In this post we are going to learn how you can use AWS CloudWatch Insights to precisely measure the number of logs successfully processed by AWS Kinesis Firehose in a given time interval. You might think that the existing AWS Kinesis Firehose metrics already do that, but we will prove otherwise and provide a solution.
Henceforth, AWS Kinesis Firehose will be called Firehose.
If you are streaming user logs from one AWS account to another, chances are that you have used Firehose. The setup is typical -
Account-1 source -> Account-2 Firehose -> Account-2 destination.
where
Account-1 source = EC2 with Kinesis agent / AWS Lambda, etc.
Account-2 destination = AWS Elasticsearch / AWS S3 / Splunk / AWS Redshift, etc.
I wrote a Terraform module which showcases the flow
Account-1 Lambda -> Account-2 Firehose -> Account-2 Elasticsearch.
When I deployed this setup, one of the important questions that was asked was:
- How do we ensure that the number of logs generated by the source == the number of logs that arrive at the destination for each day’s worth of logs?
This means that we have to count the number of logs per day at each hop and check whether any are dropped or rejected at any stage.
Per day is the keyword here because, for the flow described above, there can be buffering at each stage and we can only compare the numbers after we know that a day’s worth of data is done. To elaborate further, consider this timeline
Flow - Lambda -> Firehose -> Elasticsearch.
Lambda generates - 100 logs per second = 6,000 logs per minute = 360,000 logs per hour.
If you were to verify that 2 hours’ worth has been sent, and you select a time window - say 0000 hrs to 0200 hrs - and check for those logs in Elasticsearch, there is a chance that Elasticsearch has not received all 720,000 logs in that time, because Firehose might still be buffering them during processing as per these settings.
The actual 720,000 logs might only reach the destination at 0201 hrs - but that does not mean they were lost. It just means that they were delivered eventually.
However, if you search for a day’s worth of data in Elasticsearch - e.g. how many logs did Elasticsearch receive on 29th March 2020 - you will get a precise answer. Assuming you use a daily index, the number of logs in the index for 29-03-2020 will not grow any further, because even if Firehose buffered the logs while processing the data, they were still delivered to the 29-03-2020 index.
This means that you can count the logs generated by the Account-1 AWS Lambda for 29-03-2020 and do the same for the logs received at the Account-2 Elasticsearch to get your answer. In short, you cannot be sure whether in-flight data will be delivered until the unit of time, i.e. a day, is over.
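To make the destination-side count concrete, here is a minimal sketch of counting a day’s documents in a daily Elasticsearch index via the _count API. The endpoint and index name are made-up placeholders, and it assumes the domain is reachable without request signing (or through a signing proxy).

```python
import requests

# Placeholder endpoint and daily index name - substitute your own.
ES_ENDPOINT = "https://my-es-domain.us-east-1.es.amazonaws.com"
DAILY_INDEX = "app-logs-2020-03-29"

# _count returns the number of documents currently in the index,
# i.e. the logs Elasticsearch has received for 29-03-2020.
resp = requests.get(f"{ES_ENDPOINT}/{DAILY_INDEX}/_count")
resp.raise_for_status()
print(resp.json()["count"])
```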
But you cannot precisely calculate the number of logs processed by Firehose per day from the out-of-the-box metrics that AWS provides.
Firehose exposes these metrics, of which IncomingRecords is the one of interest to us. If we take a day’s range and check this number, we might think it is the number of logs/records received by Firehose in one day. But what if:
- The source - AWS EC2 / Lambda - was buffering and sending records at a buffer interval or after reaching a specific buffer size? e.g. Lambda buffered logs from 28-03-2020 23:59:30 to 29-03-2020 00:00:30 and sent them together at 29-03-2020 00:01:00. This means that Firehose also received some residual logs for 28-03-2020 in 29-03-2020’s quota, so counting logs for 29-03-2020 will include some of 28-03-2020’s numbers.
That’s the catch - IncomingRecords counts records by the timestamp at which they were received, not by the timestamp at which they were generated.
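For reference, this is roughly how you would pull that per-day IncomingRecords number with boto3 - a sketch with a placeholder stream name and region. The datapoints it returns are bucketed by arrival time, which is exactly the limitation described above.

```python
from datetime import datetime, timezone

import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")

# Sum of records Firehose *received* on 29-03-2020 - bucketed by arrival
# time, not by the timestamp inside the log events themselves.
resp = cloudwatch.get_metric_statistics(
    Namespace="AWS/Firehose",
    MetricName="IncomingRecords",
    Dimensions=[{"Name": "DeliveryStreamName",
                 "Value": "test-firehose-delivery-stream"}],
    StartTime=datetime(2020, 3, 29, tzinfo=timezone.utc),
    EndTime=datetime(2020, 3, 30, tzinfo=timezone.utc),
    Period=86400,
    Statistics=["Sum"],
)
print(sum(dp["Sum"] for dp in resp["Datapoints"]))
```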
The only way to fix this is to use a transformation Lambda which, in addition to validating the logs, checks the generated timestamp of each log and counts the number of logs destined for that specific day.
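A stripped-down sketch of such a transformation Lambda is shown below. It assumes each Firehose record is a base64-encoded JSON document carrying a timestamp field; the field name and the exact stats emitted are simplified compared to the module’s real handler.

```python
import base64
import json
import logging
from collections import Counter

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    output = []
    index_dates = Counter()
    processed = failed = 0

    for record in event["records"]:
        try:
            payload = json.loads(base64.b64decode(record["data"]))
            # Attribute the record to the day it was *generated*, e.g.
            # "2020-02-01T23:59:58Z" -> "2020-02-01" (assumed field name).
            index_dates[payload["timestamp"][:10]] += 1
            processed += 1
            result = "Ok"
        except (ValueError, KeyError):
            failed += 1
            result = "ProcessingFailed"
        output.append({"recordId": record["recordId"],
                       "result": result,
                       "data": record["data"]})

    # One structured stats line per invocation - this is what the
    # CloudWatch Insights queries below aggregate on.
    logger.info(json.dumps({
        "message": "xformation_stats",
        "total_records": len(event["records"]),
        "total_processed": processed,
        "total_failed": failed,
        "all_records_processed": failed == 0,
        "index_dates": ",".join(sorted(index_dates)),
    }))

    return {"records": output}
```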
This Lambda captures and logs stats - like the following:
{
"level": "INFO",
"funcName": "lambda_handler",
"lineno": 158,
"message": "xformation_stats",
"firehose_name": "arn:aws:firehose:us-east-1:123456678:deliverystream/test-firehose-delivery-stream",
"total_records": 2,
"total_processed": 2,
"total_failed": 0,
"total_failed_max_size_exceeded": 0,
"total_failed_b64_decode": 0,
"total_failed_json_load": 0,
"total_failed_xformation": 0,
"total_event_record_size_bytes": 184,
"max_event_record_size_bytes": 92,
"min_event_record_size_bytes": 92,
"all_records_processed": true,
"index_dates": "2020-02-01",
"timestamp": "2020-02-02T00:00:01.722613Z"
}
The field index_dates is important here - it tells us how many different index dates’ worth of logs were present in the batch we received.
Normally there will be just one index date, as in the sample above. But if buffering happens at the edge, as in the earlier example, we might see an entry like the following:
{
.
.
"index_dates": "2020-02-01,2020-01-02",
}
These log entries can now be analyzed with CloudWatch Insights to get precise numbers. A sample CloudWatch Insights dashboard is generated in the Terraform module’s receiver example.
The CloudWatch Insights query is:
filter @message like /true|false/ |
filter index_dates like /2020-02-08/ |
stats sum(total_records) as @totalRecords,
      avg(total_records) as @avgBatchSize,
      sum(total_processed) as @totalProcessed,
      sum(total_failed) as @totalFailed,
      max(max_event_record_size_bytes) as @maxSize,
      sum(total_failed_b64_decode) as @failedB64,
      sum(total_failed_json_load) as @failedJson
which gives the total number of records processed (successes and failures) by Firehose for that specific index_date.
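If you want the same numbers outside the console - say from a nightly reconciliation job - the query can also be submitted through the CloudWatch Logs API. A rough boto3 sketch, with a placeholder log group name and time range:

```python
import time
from datetime import datetime, timezone

import boto3

logs = boto3.client("logs", region_name="us-east-1")

QUERY = (
    "filter @message like /true|false/ | "
    "filter index_dates like /2020-02-08/ | "
    "stats sum(total_records) as @totalRecords, "
    "sum(total_processed) as @totalProcessed, "
    "sum(total_failed) as @totalFailed"
)

# Start the Insights query against the transformation Lambda's log group
# (placeholder name) and poll until it finishes.
query_id = logs.start_query(
    logGroupName="/aws/lambda/test-firehose-xformation-lambda",
    startTime=int(datetime(2020, 2, 8, tzinfo=timezone.utc).timestamp()),
    endTime=int(datetime(2020, 2, 10, tzinfo=timezone.utc).timestamp()),
    queryString=QUERY,
)["queryId"]

while True:
    result = logs.get_query_results(queryId=query_id)
    if result["status"] not in ("Running", "Scheduled"):
        break
    time.sleep(1)

print(result["results"])
```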
That’s it. Use Firehose’s built-in metrics to get an approximate idea of the number of logs it processed for a given day, but use the approach above to get the actual number when checking against the Elasticsearch index.