Saving Firehose stream batches as JSON arrays

AWS
Firehose
CDK

A quick guide to saving Kinesis Firehose stream batches as JSON arrays for easier processing and storage.

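By default, Kinesis Firehose delivers a batch by appending each JSON record directly to the previous one, producing a concatenated JSON stream such as {"id":1}{"id":2} that a plain JSON parser cannot read. An optional newline delimiter setting separates records with a newline character instead, producing a JSON Lines (JSONL) file. Firehose has no built-in option to save a batch as a single JSON array, however.

To get one, you need a custom AWS Lambda function configured as a data transformation processor on the delivery stream. The function receives the records buffered for one invocation, wraps them in a JSON array, and returns that array as the payload of the first record while marking the remaining records as Dropped, because Firehose expects a result for every incoming recordId. Firehose may invoke the function several times before it writes an S3 object, so a single object can contain several arrays; ending each array with a newline keeps them on separate lines.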
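To make the difference between the three layouts concrete, here is a small sketch that renders the same two records as Firehose's default concatenation, as the newline-delimited (JSON Lines) variant, and as a single JSON array. The helper names are illustrative only and are not part of any AWS API.

```typescript
// Hypothetical helpers showing how the same records look in each layout.
type JsonRecord = Record<string, unknown>

// Default Firehose behavior: records are appended back to back.
function concatenated(records: JsonRecord[]): string {
  return records.map((r) => JSON.stringify(r)).join('')
}

// With the newline delimiter enabled: one record per line (JSON Lines).
function jsonLines(records: JsonRecord[]): string {
  return records.map((r) => JSON.stringify(r) + '\n').join('')
}

// The goal of this post: the whole batch as one JSON array.
function jsonArray(records: JsonRecord[]): string {
  return JSON.stringify(records)
}

const sample: JsonRecord[] = [{ id: 1 }, { id: 2 }]
console.log(concatenated(sample)) // {"id":1}{"id":2}
console.log(jsonArray(sample)) // [{"id":1},{"id":2}]
```

Only the array form can be handed straight to JSON.parse; the concatenated form needs a streaming parser, and JSON Lines must be split on newlines first.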

Lambda

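import type {
  FirehoseRecordTransformationStatus,
  FirehoseTransformationHandler,
  FirehoseTransformationResult
} from 'aws-lambda'

export const handler: FirehoseTransformationHandler = async (event) => {
  // Defensive guard: nothing to wrap if the invocation carries no records
  if (event.records.length === 0) {
    return { records: [] }
  }

  // Decode each base64 payload and parse it as JSON. A payload that is not
  // valid JSON makes JSON.parse throw, which fails the whole invocation.
  const parsedRecords = event.records.map((record) =>
    JSON.parse(Buffer.from(record.data, 'base64').toString('utf8'))
  )

  // Serialize the invocation as one JSON array; the trailing newline separates
  // arrays from different invocations that end up in the same S3 object
  const jsonArrayString = JSON.stringify(parsedRecords) + '\n'
  const base64Encoded = Buffer.from(jsonArrayString).toString('base64')

  // Firehose expects a result for every incoming recordId: the first record
  // carries the whole array, the others are marked Dropped
  const result: FirehoseTransformationResult = {
    records: [
      {
        recordId: event.records[0]!.recordId,
        result: 'Ok',
        data: base64Encoded
      },
      ...event.records.slice(1).map((record) => ({
        recordId: record.recordId,
        result: 'Dropped' as FirehoseRecordTransformationStatus,
        data: ''
      }))
    ]
  }

  return result
}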
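The same transformation can be factored into a pure function that is easy to unit test without deploying anything. The sketch below is a variant, not the handler above: the name wrapAsJsonArray and the local types are hypothetical stand-ins for the aws-lambda types, and, as an added design choice, a record whose payload is not valid JSON is reported as ProcessingFailed instead of failing the whole invocation.

```typescript
import { Buffer } from 'node:buffer'

// Local stand-ins for the aws-lambda Firehose record types (assumption:
// only recordId, result and data matter for this transformation).
type Status = 'Ok' | 'Dropped' | 'ProcessingFailed'
interface InputRecord {
  recordId: string
  data: string // base64-encoded payload
}
interface OutputRecord {
  recordId: string
  result: Status
  data: string
}

// Hypothetical helper: wraps every parseable record of one invocation into a
// single JSON array carried by the first parseable record.
export function wrapAsJsonArray(records: InputRecord[]): OutputRecord[] {
  const values: unknown[] = []

  const output = records.map((record): OutputRecord => {
    try {
      values.push(JSON.parse(Buffer.from(record.data, 'base64').toString('utf8')))
      // Provisionally drop every parsed record; one is promoted below.
      return { recordId: record.recordId, result: 'Dropped', data: '' }
    } catch {
      // Not valid JSON: Firehose treats this record as a transformation
      // failure instead of delivering it with the batch.
      return { recordId: record.recordId, result: 'ProcessingFailed', data: record.data }
    }
  })

  // Promote the first successfully parsed record to carry the whole array.
  const carrier = output.find((r) => r.result === 'Dropped')
  if (carrier !== undefined) {
    carrier.result = 'Ok'
    carrier.data = Buffer.from(JSON.stringify(values) + '\n').toString('base64')
  }
  return output
}
```

With this helper, the Lambda handler could reduce to returning { records: wrapAsJsonArray(event.records) }, since Firehose event records already carry recordId and data.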

If your project uses the AWS CDK, the recent L2 constructs in aws-cdk-lib/aws-kinesisfirehose let you attach the function as a processor on the S3 destination, along these lines:

CDK

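import * as s3 from 'aws-cdk-lib/aws-s3'
import {
  Compression,
  DeliveryStream,
  LambdaFunctionProcessor,
  S3Bucket
} from 'aws-cdk-lib/aws-kinesisfirehose'

// Inside a Stack constructor; firehoseBatchJSONArrayRecordsFunction is the
// Lambda function above, defined elsewhere in the stack
const lambdaProcessor = new LambdaFunctionProcessor(
  firehoseBatchJSONArrayRecordsFunction
)

// The S3Bucket destination wraps an actual bucket (construct ID is illustrative)
const bucket = new s3.Bucket(this, 'FirehoseBucket')

const s3Destination = new S3Bucket(bucket, {
  compression: Compression.GZIP,
  fileExtension: '.json.gz',
  processor: lambdaProcessor
})

const deliveryStream = new DeliveryStream(this, 'DeliveryStream', {
  destination: s3Destination
})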
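Because Firehose may invoke the Lambda several times while filling one S3 object, a delivered .json.gz file can hold more than one array, each on its own line (that is what the trailing newline in the handler is for). A consumer can therefore gunzip the object, split it on newlines, and flatten the arrays. This sketch assumes the object body has already been downloaded into a Buffer (for example via an S3 GetObject call); the function name readJsonArrayObject is hypothetical.

```typescript
import { Buffer } from 'node:buffer'
import { gunzipSync, gzipSync } from 'node:zlib'

// Hypothetical helper: turns one delivered .json.gz object into a flat list
// of records. Each non-empty line is one JSON array written by one Lambda
// invocation.
export function readJsonArrayObject(body: Buffer): unknown[] {
  const text = gunzipSync(body).toString('utf8')
  return text
    .split('\n')
    .filter((line) => line.trim() !== '')
    .flatMap((line) => JSON.parse(line) as unknown[])
}

// Simulate an object holding the output of two Lambda invocations.
const body = gzipSync('[{"id":1},{"id":2}]\n[{"id":3}]\n')
console.log(readJsonArrayObject(body)) // [ { id: 1 }, { id: 2 }, { id: 3 } ]
```

Splitting on newlines first keeps each JSON.parse call small and independent, which also makes it easy to switch to a streaming line reader for large objects.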