By default, Kinesis Firehose delivers stream batches by appending each JSON record directly to the previous one, resulting in a concatenated JSON stream. An optional Newline delimiter setting allows each record to be separated by a newline character, producing a JSON Lines (JSONL) file. However, Firehose does not provide a built-in option to save a batch as a single JSON array. To achieve this, a custom AWS Lambda function must be used as a data transformation processor within the Firehose delivery stream. This Lambda function can wrap the batch of records into a properly formatted JSON array before delivery.
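To make the difference concrete, here is what the body of a delivered S3 object would look like for two hypothetical records, {"a":1} and {"b":2}, under each setting:

Default (concatenated JSON): {"a":1}{"b":2}
Newline delimiter (JSONL): {"a":1}\n{"b":2}
Lambda transformation (JSON array): [{"a":1},{"b":2}]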
Lambda
import type {
  FirehoseRecordTransformationStatus,
  FirehoseTransformationHandler,
  FirehoseTransformationResult
} from 'aws-lambda'

export const handler: FirehoseTransformationHandler = async (event) => {
  // Decode and parse every record in the incoming batch
  const parsedRecords = event.records.map((record) =>
    JSON.parse(Buffer.from(record.data, 'base64').toString())
  )

  // Serialize the whole batch as a single JSON array; the trailing newline
  // separates the arrays of consecutive batches within one S3 object
  const jsonArrayString = JSON.stringify(parsedRecords) + '\n'
  const base64Encoded = Buffer.from(jsonArrayString).toString('base64')

  // Firehose expects an answer for every recordId it sent, so the combined
  // payload is attached to the first record and the rest are marked Dropped
  const result: FirehoseTransformationResult = {
    records: [
      {
        recordId: event.records[0]!.recordId,
        result: 'Ok',
        data: base64Encoded
      },
      ...event.records.slice(1).map((record) => ({
        recordId: record.recordId,
        result: 'Dropped' as FirehoseRecordTransformationStatus,
        data: ''
      }))
    ]
  }
  return result
}
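Returning a Dropped status is the supported way to remove a record from the stream; simply omitting the remaining recordIds would make Firehose treat the invocation as failed. Keep batch sizes in mind as well: the transformed data travels back in the synchronous invocation response, which Lambda caps at 6 MB.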
If you happen to use the AWS CDK in your project, the recent L2 constructs make this straightforward, and you could end up with something like this:
CDK
import { Bucket } from 'aws-cdk-lib/aws-s3'
import {
  Compression,
  DeliveryStream,
  LambdaFunctionProcessor,
  S3Bucket
} from 'aws-cdk-lib/aws-kinesisfirehose'

// Wrap the transformation Lambda so Firehose can invoke it on each batch
const lambdaProcessor = new LambdaFunctionProcessor(
  firehoseBatchJSONArrayRecordsFunction
)

// Destination bucket for the delivered batches
const bucket = new Bucket(this, 'Bucket')

// S3 destination: gzip-compressed .json.gz objects, run through the processor
const s3Destination = new S3Bucket(bucket, {
  compression: Compression.GZIP,
  fileExtension: '.json.gz',
  processor: lambdaProcessor
})

const deliveryStream = new DeliveryStream(this, 'DeliveryStream', {
  destination: s3Destination
})
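The L2 constructs also take care of the IAM wiring, granting the delivery stream permission to invoke the processor function. If you want to influence how much data Firehose buffers before each invocation, LambdaFunctionProcessor accepts optional tuning props; a minimal sketch, with illustrative values rather than recommendations:

import { Duration, Size } from 'aws-cdk-lib'

// Optional buffering and retry hints; the values below are assumptions
const tunedProcessor = new LambdaFunctionProcessor(
  firehoseBatchJSONArrayRecordsFunction,
  {
    bufferInterval: Duration.seconds(60), // invoke at least once per minute
    bufferSize: Size.mebibytes(3), // or as soon as 3 MiB has accumulated
    retries: 1 // retry a failed invocation once
  }
)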