All files / resources eventFileQueueHandler.ts

0% Statements 0/90
0% Branches 0/1
0% Functions 0/1
0% Lines 0/90

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119                                                                                                                                                                                                                                             
import {
  BatchCreatePartitionCommand,
  BatchGetPartitionCommand,
  GlueClient
} from '@aws-sdk/client-glue';
import {
  S3Event, S3EventRecord
} from 'aws-lambda';
 
/**
 * Glue catalog coordinates, read once from the environment at module load.
 *
 * - `glueDatabase` comes from `GLUE_DATABASE`.
 * - `glueTable` comes from `GLUE_TABLE`.
 *
 * Either value is `undefined` when the corresponding variable is not set.
 */
const {
  GLUE_DATABASE: glueDatabase,
  GLUE_TABLE: glueTable,
} = process.env;
 
/** A Glue partition (`datetime`, `event`) plus the S3 prefix its objects were written under. */
interface EventPartition {
  datetime: string;
  event: string;
  path: string;
}

/** Maximum number of `PartitionsToGet` entries Glue accepts in one BatchGetPartition call. */
const MAX_BATCH_GET_PARTITIONS = 1000;

/** Maximum number of `PartitionInputList` entries Glue accepts in one BatchCreatePartition call. */
const MAX_BATCH_CREATE_PARTITIONS = 100;

/** Column layout registered on every partition that this handler creates. */
const EVENT_PARTITION_COLUMNS = [
  {
    Name: 'radioid',
    Type: 'string',
  },
  {
    Name: 'talkgroup',
    Type: 'string',
  },
  {
    Name: 'talkgrouplist',
    Type: 'string',
  },
  {
    Name: 'tower',
    Type: 'string',
  },
  {
    Name: 'timestamp',
    Type: 'bigint',
  },
];

/** Splits `items` into consecutive slices of at most `size` elements. */
function chunkItems<T>(items: readonly T[], size: number): T[][] {
  const chunks: T[][] = [];
  for (let start = 0; start < items.length; start += size) {
    chunks.push(items.slice(start, start + size));
  }
  return chunks;
}

/** Builds an unambiguous lookup key for a pair of partition values. */
function partitionKey(datetime: string, eventName: string): string {
  return JSON.stringify([ datetime, eventName ]);
}

/**
 * Lambda handler for S3 object notifications: registers a Glue partition for
 * every `datetime=<value>/event=<value>/` prefix referenced by the incoming
 * records that does not already exist in the catalog.
 *
 * Steps:
 *  1. Reduce each object key to its decoded directory prefix.
 *  2. Extract the `datetime` / `event` partition values from each prefix;
 *     prefixes that do not contain that pattern are skipped with a warning.
 *  3. Ask Glue which of those partitions already exist.
 *  4. Create the missing ones, pointing at `s3://$EVENTS_S3_BUCKET/<prefix>`.
 *
 * @param event The S3 notification delivered to the Lambda.
 * @returns Resolves once all Glue calls have completed. Nothing is returned.
 * @throws Whatever the Glue client throws for a failed request, and `URIError`
 *   if an object key is not validly percent-encoded.
 */
export async function main(event: S3Event): Promise<void> {
  // Get the S3 events
  const s3EventRecords: S3EventRecord[] = event.Records ?? [];

  // Exit early if no messages
  if (s3EventRecords.length === 0) {
    return;
  }

  // Get all of the distinct prefixes that were written to. Keys in S3
  // notifications are form-URL-encoded, so a space arrives as "+", which
  // decodeURIComponent does not translate; convert it first. A literal "+"
  // in a key arrives as "%2B" and is therefore unaffected.
  const paths = [ ...new Set(s3EventRecords.map(record => {
    const encodedPrefix = record.s3.object.key
      .replace(/[^\/]+$/, '');
    return decodeURIComponent(encodedPrefix.replace(/\+/g, ' '));
  })) ];

  // Work out the partition for each prefix, keeping one entry per distinct
  // (datetime, event) pair so that a batch never contains duplicates. When
  // several prefixes map to the same partition the first one seen is used.
  const partitionsByKey = new Map<string, EventPartition>();
  const unmatchedPaths: string[] = [];
  for (const path of paths) {
    const matches = path.match(/datetime=([^\/]+)\/event=([^\/]+)\//);
    if (matches === null) {
      // Not under a partition prefix; there is nothing to register for it.
      unmatchedPaths.push(path);
      continue;
    }

    const key = partitionKey(matches[1], matches[2]);
    if (!partitionsByKey.has(key)) {
      partitionsByKey.set(key, {
        datetime: matches[1],
        event: matches[2],
        path,
      });
    }
  }
  const eventPartitions = [ ...partitionsByKey.values() ];

  if (unmatchedPaths.length > 0) {
    console.warn(`Ignoring ${unmatchedPaths.length} path(s) without partition values: ` +
      JSON.stringify(unmatchedPaths));
  }

  const glue = new GlueClient();

  // Fetch the partitions that already exist, respecting the per-call limit
  const existingKeys = new Set<string>();
  for (const batch of chunkItems(eventPartitions, MAX_BATCH_GET_PARTITIONS)) {
    const existingPartitions = await glue.send(new BatchGetPartitionCommand({
      DatabaseName: glueDatabase,
      TableName: glueTable,
      PartitionsToGet: batch.map(p => ({
        Values: [
          p.datetime,
          p.event,
        ],
      })),
    }));

    for (const existing of existingPartitions.Partitions ?? []) {
      const datetime = existing.Values?.[0];
      const eventName = existing.Values?.[1];
      if (datetime !== undefined && eventName !== undefined) {
        existingKeys.add(partitionKey(datetime, eventName));
      }
    }
  }

  // Figure out which partitions are new
  const newPartitions = eventPartitions
    .filter(p => !existingKeys.has(partitionKey(p.datetime, p.event)));

  // Make the new partitions, respecting the per-call limit
  for (const batch of chunkItems(newPartitions, MAX_BATCH_CREATE_PARTITIONS)) {
    const created = await glue.send(new BatchCreatePartitionCommand({
      DatabaseName: glueDatabase,
      TableName: glueTable,
      PartitionInputList: batch.map(p => ({
        Values: [
          p.datetime,
          p.event,
        ],
        StorageDescriptor: {
          Location: `s3://${process.env.EVENTS_S3_BUCKET}/${p.path}`,
          Columns: EVENT_PARTITION_COLUMNS,
          InputFormat: 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat',
          OutputFormat: 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat',
          SerdeInfo: {
            SerializationLibrary: 'org.apache.hadoop.hive.ql.io.orc.OrcSerde',
          },
        },
      })),
    }));

    // Per-partition failures come back in the response instead of being
    // thrown. "Already exists" only means another invocation created the
    // partition first, so it is not reported; anything else is logged.
    const failures = (created.Errors ?? [])
      .filter(e => e.ErrorDetail?.ErrorCode !== 'AlreadyExistsException');
    if (failures.length > 0) {
      console.error(`Failed to create ${failures.length} partition(s): ` +
        JSON.stringify(failures));
    }
  }

  // Log the messages received
  console.log(`Records: ${s3EventRecords.length}, ` +
    `Partitions: ${eventPartitions.length}, New Partitions: ${newPartitions.length}`);
}