Troubleshooting Kinesis Streams as Used by StreamAlert
One of the tools we use for monitoring our logs at Duo Security is StreamAlert, an open-source framework developed by Airbnb that was released in January 2017. StreamAlert is a serverless, real-time data analysis framework that runs on Amazon Web Services (AWS), which means it can continually scale to accept increasing amounts of logs.
StreamAlert matches rules (kept in source control) against these log messages and creates alerts that are sent to our Slack channels and other outputs. StreamAlert also helps to aggregate and normalize these logs from various sources and stores them in S3 buckets for long-term retention.
StreamAlert makes use of a number of technologies from AWS, including Kinesis Streams and Lambda, in order to scale easily with little maintenance needed. However, when you first start off, or if your log volume changes dramatically, there is a little tuning that needs to be done. Because few resources are available for managing Kinesis Streams, we wanted to share some lessons learned.
Example StreamAlert Use Case
Let's begin by describing the basic architecture behind StreamAlert by looking at one possible data flow. Let's say you wanted to monitor the network traffic of your EC2 instances and other network resources. One way to do this is with VPC Flow Logs, which record which network interfaces and associated IP addresses communicated with which other network interfaces. These are recorded by AWS to CloudWatch Logs.
With a few API calls, you can connect CloudWatch Logs to Kinesis Streams. Kinesis Streams is a real-time data-streaming service, similar to Apache Kafka, and Lambda functions can be attached to it to read the data as it arrives. This is just one of many flows that StreamAlert supports, and it will be the focus of this article. This may all sound complicated, but with StreamAlert, you simply write the detection rules you want in Python, set up some configuration files, and StreamAlert takes care of creating the required infrastructure for you.
When a record is sent to a Kinesis Stream, you assign it a partition key, a string that Kinesis hashes with MD5 into a 128-bit integer. This hash value determines which shard in the Kinesis Stream the record goes to. When a Kinesis Stream is first created, it has a single shard that all records will go to. A shard can only support certain throughput limits, so you will want to split shards multiple times until your Kinesis Stream, as a whole, can support the throughput you need. To distribute messages evenly across all of your shards, a random partition key is recommended.
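To make the effect of the partition key concrete, here is a minimal sketch of how a key lands on a shard. Kinesis maps each partition key to a 128-bit integer with an MD5 hash, and each shard owns a range of those integers. The sketch assumes the hash key space is divided into equal ranges, which is a simplification; the function names are ours, not part of any AWS or StreamAlert API.

```python
import hashlib
import uuid

HASH_KEY_SPACE = 2 ** 128  # MD5 produces a 128-bit hash key


def hash_key(partition_key: str) -> int:
    """Map a partition key to its 128-bit integer hash key via MD5."""
    return int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)


def shard_for(partition_key: str, shard_count: int) -> int:
    """Return the index of the shard whose hash range contains the key.

    Assumption: the hash key space is split into `shard_count` equal ranges.
    """
    return hash_key(partition_key) * shard_count // HASH_KEY_SPACE


def distribution(keys, shard_count):
    """Count how many of the given keys land on each shard."""
    counts = [0] * shard_count
    for key in keys:
        counts[shard_for(key, shard_count)] += 1
    return counts


if __name__ == "__main__":
    # A constant key sends every record to the same shard.
    print(distribution(["same-key"] * 1000, 4))
    # Random keys spread records across all shards.
    print(distribution([str(uuid.uuid4()) for _ in range(1000)], 4))
```

With a constant key, one shard receives all 1,000 records; with random keys, each shard receives roughly a quarter of them.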
If you have provisioned more shards than you need, you can merge shards. However, splitting and merging shards results in extra costs, so it should not be done often. After your initial setup, resharding only needs to be done if your log throughput changes dramatically, which is good, because one of the benefits of Kinesis Streams is the limited amount of maintenance they require.
Through StreamAlert, a Lambda function is associated with the Kinesis Stream to consume the records that are sent through it. A single record can contain multiple log events.
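To see how many log events a single record carries, you can unpack its payload yourself. The sketch below assumes the record came from a CloudWatch Logs subscription, which delivers gzip-compressed JSON with a `logEvents` list; the function name and the sample values are made up for illustration.

```python
import gzip
import json


def log_events_in_record(data: bytes) -> list:
    """Return the log events packed into one Kinesis record's payload.

    Assumption: `data` is the gzip-compressed JSON document that a
    CloudWatch Logs subscription writes to Kinesis.
    """
    payload = json.loads(gzip.decompress(data))
    return payload.get("logEvents", [])


# A sample payload shaped like a CloudWatch Logs delivery (values are made up).
sample = gzip.compress(json.dumps({
    "messageType": "DATA_MESSAGE",
    "logGroup": "example-flow-logs",
    "logEvents": [
        {"id": "1", "timestamp": 0, "message": "example flow log line 1"},
        {"id": "2", "timestamp": 0, "message": "example flow log line 2"},
    ],
}).encode("utf-8"))

if __name__ == "__main__":
    print(len(log_events_in_record(sample)))
```

Here one record yields two log events; in production a single record can hold far more.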
Let's go through some common problems:
Lambda functions run by default for a maximum of 60 seconds in StreamAlert. If you experience time-outs, you can increase this value to a maximum of 300 seconds (5 minutes). Your Lambdas should be completing within half of the time-out, so the extra time available is just a buffer for rare events that take longer. The CloudWatch metrics for Lambda functions include a duration metric that you should check to confirm this.
Another option is to give the Lambda function more CPU power so it runs faster. You can't directly control the compute power of the Lambda, but you can control how much memory it has. AWS proportionally scales the amount of compute with the allocated amount of memory of the Lambda. So, if you double the memory, it will double the CPU, and this should reduce the time it takes the function to complete. By default, StreamAlert creates a Lambda function with 128 MB of memory. Your Lambdas will likely be more bound by IO and CPU than by memory, so when designing your rules, keep in mind that you will likely have extra memory.
Another option is to make your Lambda work on fewer records. By default, StreamAlert’s Rule Processor Lambda function reads 100 records at a time from a given Kinesis Stream. You can adjust this so your Lambda function works on more or fewer records at a time by changing the batch_size. Processing 100 records within the maximum Lambda time-out period of five minutes might not sound like a lot, but a single record can contain multiple log events when dealing with nested types, such as VPC Flow Logs.
In the case of VPC Flow Logs, AWS might pack tens of thousands of log events into a single record, which means that with 100 records and a 300-second time-out, your Lambda would have to process thousands of log lines per second. VPC Flow Logs arrive in bunches every five minutes, so it is better to smooth out these spikes over the five-minute periods by working on fewer records at a time.
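The arithmetic behind that estimate is easy to check. This sketch assumes 10,000 log events per record as the low end of "tens of thousands"; the function name is ours.

```python
def required_events_per_second(events_per_record, batch_size, timeout_seconds):
    """Log events the Lambda must process each second to finish one batch in time."""
    return events_per_record * batch_size / timeout_seconds


if __name__ == "__main__":
    # Default batch of 100 records at the 300-second maximum time-out.
    print(round(required_events_per_second(10_000, 100, 300)))
    # A smaller batch of 10 records lowers the required rate tenfold.
    print(round(required_events_per_second(10_000, 10, 300)))
```

A batch of 100 such records needs roughly 3,333 events per second to finish within the time-out, while a batch of 10 needs about 333.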
Lambda duration before and after fixing a coding error that caused time-outs.
Unless your log volume has suddenly and dramatically changed, a common source of time-outs is errors in your code, which can be identified by searching CloudWatch Logs. One error in particular to look for is a function call made by your rule that fails and then retries after some amount of time. Those repeated failures, waits, and retries will eat up the Lambda function's duration. The above graph was the result of one such bug in the code, and its subsequent fix.
Ensure your code is not making calls to slow services. If your rule calls out to an external service for every log event, the latency of each call adds up and can consume most of the Lambda function's run time.
Finally, review the efficiency of your rule code by looking for unnecessary loops.
If alerts are not timely, it may be because the Kinesis Stream IteratorAge is high. The IteratorAge is the difference between the current time and when the last record from the GetRecords call was written to the stream. The IteratorAge should be at or near zero. If it shows as roughly 86.4M ms (24 hours), your IteratorAge has reached the default retention period of Kinesis Streams, and your Lambda functions are not keeping up with the rate of incoming data.
For example, a high IteratorAge could cause an alert to fire for an event that happened hours ago instead of firing in real time. If the IteratorAge is high enough, records will not be processed before their retention time expires and will be dropped, meaning no alert will fire. By default, StreamAlert creates a CloudWatch Alarm at 1M ms (about 17 minutes) to catch errors early, before they cause production issues.
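The two thresholds mentioned above can be turned into a quick sanity check. This is only an illustrative sketch: the constant names, function name, and status labels are ours, and the values are the defaults described in this article.

```python
RETENTION_MS = 24 * 60 * 60 * 1000  # 86,400,000 ms, the default retention period
ALARM_MS = 1_000_000                # the default alarm threshold described above


def iterator_age_status(iterator_age_ms, alarm_ms=ALARM_MS, retention_ms=RETENTION_MS):
    """Classify an IteratorAge reading (in milliseconds)."""
    if iterator_age_ms >= retention_ms:
        return "dropping records"  # records expire before they are processed
    if iterator_age_ms >= alarm_ms:
        return "falling behind"    # alerts are delayed; investigate
    return "ok"


if __name__ == "__main__":
    print(iterator_age_status(0))
    print(iterator_age_status(5_000_000))
    print(iterator_age_status(86_400_000))
    print(round(ALARM_MS / 1000 / 60, 1), "minutes until the alarm fires")
```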
Kinesis will only call one Lambda function per shard at a time. I originally thought that if a ton of records came in, Kinesis would just spin up hundreds of Lambdas simultaneously, but that is not the case. If you are hitting IteratorAge issues, you either need to increase the number of shards or ensure that data is being evenly distributed. Having more shards will result in more Lambdas being run simultaneously, which will result in more records being churned through.
Balanced shards receiving similar amounts of data
All shards should have roughly the same number of IncomingRecords. If they do not, then the partition keys are not being randomly assigned. This problem will likely reveal itself through a high IteratorAge. Splitting an individual shard could help with this problem, but you'll be better off changing how your partition key is assigned.
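Once you have per-shard IncomingRecords counts, for example read off the CloudWatch graphs, a small helper can flag imbalance. This is a sketch only: the function name, the shard names, and the 50% tolerance are assumptions for illustration, not StreamAlert settings.

```python
from statistics import mean


def hot_shards(incoming_records, tolerance=0.5):
    """Return shard IDs whose record count exceeds the mean by more than `tolerance`.

    `incoming_records` maps shard ID to its IncomingRecords count for the
    same time window. The 0.5 (50%) default tolerance is arbitrary.
    """
    if not incoming_records:
        return []
    threshold = mean(incoming_records.values()) * (1 + tolerance)
    return sorted(shard for shard, count in incoming_records.items() if count > threshold)


if __name__ == "__main__":
    balanced = {"shard-0": 100, "shard-1": 110, "shard-2": 95, "shard-3": 105}
    skewed = {"shard-0": 100, "shard-1": 110, "shard-2": 95, "shard-3": 900}
    print(hot_shards(balanced))
    print(hot_shards(skewed))
```

The balanced example flags nothing, while the skewed example flags `shard-3`.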
The error "ProvisionedThroughputExceededException" means too much data is being sent in for a shard to handle, which means more shards are needed.
Another solution would be to reduce the amount of log data being sent in, or implement exponential backoff when making PutRecord calls into Kinesis directly.
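Exponential backoff can be sketched in a few lines. The example below is deliberately independent of any AWS SDK: `put_record` is any callable that performs the write, and `ThroughputExceeded` is a hypothetical stand-in for the SDK's ProvisionedThroughputExceededException error. The delay values and attempt count are illustrative, not recommendations.

```python
import random
import time


class ThroughputExceeded(Exception):
    """Hypothetical stand-in for ProvisionedThroughputExceededException."""


def put_with_backoff(put_record, max_attempts=5, base_delay=0.1,
                     max_delay=5.0, sleep=time.sleep):
    """Call `put_record`, retrying with exponential backoff and jitter.

    The wait ceiling doubles after each failed attempt, capped at
    `max_delay`. The final failure is re-raised to the caller.
    """
    for attempt in range(max_attempts):
        try:
            return put_record()
        except ThroughputExceeded:
            if attempt == max_attempts - 1:
                raise
            ceiling = min(max_delay, base_delay * 2 ** attempt)
            # Random jitter keeps many producers from retrying in lockstep.
            sleep(random.uniform(0, ceiling))
```

With a real client, you would wrap the PutRecord call in a small function, pass it in as `put_record`, and catch the SDK's corresponding error instead of the stand-in class.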
The main technique you'll need to use to debug Kinesis Streams is looking at CloudWatch metric graphs. To identify many problems, you'll want to look at the maximum or minimum values, not the averages. You may also need to look at per-shard metrics, as opposed to the Kinesis Stream as a whole, to identify "hot shards."
You should also read the data being passed through your Kinesis Stream manually to confirm it matches your assumptions. This can be done using the techniques described in AWS's guide Perform Basic Stream Operations.
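As a rough sketch of what that manual read can look like in Python, the helper below fetches a handful of records from one shard. It assumes `client` is a boto3 Kinesis client (or anything exposing the same `get_shard_iterator` and `get_records` calls); the function name is ours, and the stream and shard names in the usage note are placeholders.

```python
def peek_records(client, stream_name, shard_id, limit=10):
    """Return (partition_key, data) pairs for the oldest records in a shard.

    Assumption: `client` behaves like a boto3 Kinesis client.
    """
    # TRIM_HORIZON starts reading at the oldest record still retained.
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]
    response = client.get_records(ShardIterator=iterator, Limit=limit)
    return [(record["PartitionKey"], record["Data"]) for record in response["Records"]]
```

With boto3 installed and credentials configured, a call might look like `peek_records(boto3.client("kinesis"), "example-stream", "shardId-000000000000")`. Inspecting the partition keys this way is a quick check on whether they are actually random.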
We hope that this post provides some security benefit to you and your organization. If you’re interested in the intersection between security and running a highly-available service on AWS, please contact Duo's Production Engineering team at email@example.com.