AWS Kinesis is widely used to build real-time applications: you ingest data with Kinesis Streams and process it with Kinesis Analytics or other services that read from the stream.
In this tutorial, we will learn how we can use AWS Lambda service to process Kinesis events.
I will be using the AWS CLI throughout this tutorial so that you get a better grasp of it. For real projects, I suggest using CloudFormation or higher-level abstractions such as SAM (Serverless Application Model) or Terraform.
Prerequisites
Make sure you have completed the following steps before you start this tutorial:
- Signed up for an AWS account and created an administrator user in the account.
- Installed and set up the AWS CLI.
- For instructions, see Step 1: Set Up an AWS Account and the AWS CLI
- Installed Gradle.
- Added the git, gradle and aws executables to your system PATH.
- Run the following commands to verify that these tools are installed:
git --version
gradle --version
aws --version
Use case
Assume that you have a high-traffic web application that writes to Kinesis so that records can be processed asynchronously. In this tutorial, we will write a Lambda function to process the Kinesis records.
The high-level flow is as follows:
- Your application writes records to the stream.
- AWS Lambda polls the stream and, whenever it finds new records, invokes your Lambda function with them.
Tutorial
If you run into any errors, refer to the Common Errors section to see if it helps.
# Create project directory
mkdir lambda-kinesis; cd lambda-kinesis
# Create config directories to store config files such as AWS IAM policies and test events
mkdir config; mkdir config/policies; mkdir config/test
Create Lambda code using Java
# Create Java project directory
mkdir KinesisConsumer;cd KinesisConsumer
gradle init --type java-library
gradle build
# Remove the default files created by gradle init
rm -f src/main/java/Library.java src/test/java/LibraryTest.java
Add the following Lambda dependencies to the dependencies section of build.gradle:
compile 'com.amazonaws:aws-lambda-java-core:1.1.0'
compile "com.amazonaws:aws-lambda-java-events:1.3.0"
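Add the following task at the end of build.gradle. It creates a deployment zip that contains the compiled classes and resources at its root and all dependency jars under lib/. This is required because AWS Lambda expects you to upload a single zip or jar file that contains the function code and all of its dependencies. On Gradle 7 or later, the compile and runtime configurations no longer exist: use implementation instead of compile for the dependencies above, and configurations.runtimeClasspath instead of configurations.runtime in this task.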
/*
 * Task to create a zip file bundling all the dependencies
 */
task buildZip(type: Zip) {
    from compileJava
    from processResources
    into('lib') {
        from configurations.runtime
    }
}

build.dependsOn buildZip
Add KinesisConsumer class
Tip: You can open the project as a Gradle project in your favorite IDE, such as Eclipse or IntelliJ IDEA.
touch src/main/java/KinesisConsumer.java
Add the following code to KinesisConsumer.java:
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord;

public class KinesisConsumer {

    public String handleRequest(KinesisEvent event) {
        System.out.println("Record count - " + event.getRecords().size());
        for (KinesisEventRecord rec : event.getRecords()) {
            // The sequence number is already a String
            System.out.println(rec.getKinesis().getSequenceNumber());
            // Decode only the buffer's readable bytes as UTF-8, not the whole backing array
            System.out.println(StandardCharsets.UTF_8.decode(rec.getKinesis().getData()).toString());
        }
        return "success";
    }
}
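In the Lambda event classes, getData() returns a java.nio.ByteBuffer. Calling array() on that buffer returns the entire backing array, which is only correct when the buffer's readable region covers the whole array. In addition, new String(byte[]) without a charset uses the platform default encoding. The standalone sketch below uses a hypothetical PayloadDecoder class that needs only the JDK. It contrasts decoding just the readable bytes with decoding the backing array, and it assumes that producers write UTF-8 text.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, independent of the Lambda event classes. */
final class PayloadDecoder {
    private PayloadDecoder() {}

    /** Decodes only the readable bytes (position up to limit) as UTF-8. */
    static String decodeReadable(ByteBuffer data) {
        // duplicate() shares the bytes but has its own position and limit,
        // so decoding does not consume the caller's buffer
        return StandardCharsets.UTF_8.decode(data.duplicate()).toString();
    }

    /** Naive approach: decodes the entire backing array, ignoring position and limit. */
    static String decodeBackingArray(ByteBuffer data) {
        return new String(data.array(), StandardCharsets.UTF_8);
    }
}
```

Inside the handler you would call PayloadDecoder.decodeReadable(rec.getKinesis().getData()).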
Build the project to make sure there are no errors.
gradle clean build
Verify that the deployment zip (KinesisConsumer.zip) has been created in build/distributions:
ls build/distributions
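The buildZip task places the compiled classes at the root of the archive and the dependency jars under lib/. This is the layout Lambda expects for a Java deployment package. To check an archive programmatically, you could use something like the hypothetical PackageLayout helper below. It is a sketch that uses only java.util.zip.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

/** Hypothetical checker for the layout produced by the buildZip task. */
final class PackageLayout {
    private PackageLayout() {}

    /** Lists all entry names in a zip stream (the stream is closed afterwards). */
    static List<String> entryNames(InputStream zipBytes) throws IOException {
        List<String> names = new ArrayList<>();
        try (ZipInputStream zip = new ZipInputStream(zipBytes)) {
            ZipEntry entry;
            while ((entry = zip.getNextEntry()) != null) {
                names.add(entry.getName());
            }
        }
        return names;
    }

    /** True if the handler class is at the archive root and at least one jar is under lib/. */
    static boolean looksDeployable(List<String> names, String handlerClass) {
        boolean hasHandler = names.contains(handlerClass.replace('.', '/') + ".class");
        boolean hasLibJar = names.stream().anyMatch(n -> n.startsWith("lib/") && n.endsWith(".jar"));
        return hasHandler && hasLibJar;
    }
}
```

For the real archive, pass Files.newInputStream(Paths.get("build/distributions/KinesisConsumer.zip")) to entryNames and check the result with looksDeployable(names, "KinesisConsumer").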
Create AWS roles and policies
Create the trust policy
The trust policy makes this an AWS service role of the type AWS Lambda: it grants the AWS Lambda service permission to assume the role.
cd ../
touch config/policies/KinesisConsumer-lambda-trustpolicy.json
Add the following contents to KinesisConsumer-lambda-trustpolicy.json:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
Create the Lambda Execution role
Create role
aws iam create-role --role-name KinesisConsumer-lambda-execution-role --assume-role-policy-document file://config/policies/KinesisConsumer-lambda-trustpolicy.json
You will see output similar to the following:
{
    "Role": {
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "sts:AssumeRole",
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "lambda.amazonaws.com"
                    }
                }
            ]
        },
        "RoleId": "AROAIRJRHN62AKLSK2L3W",
        "CreateDate": "2017-07-11T06:18:09.277Z",
        "RoleName": "KinesisConsumer-lambda-execution-role",
        "Path": "/",
        "Arn": "arn:aws:iam::YOUR_ACCOUNT_ID:role/KinesisConsumer-lambda-execution-role"
    }
}
Note down your account ID from the Arn value and substitute it for YOUR_ACCOUNT_ID in the line below:
export AWS_ACCOUNT_ID=YOUR_ACCOUNT_ID
# Validate that it has been set up properly
echo $AWS_ACCOUNT_ID
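The account ID is the fifth colon-separated field of an ARN, which has the form arn:partition:service:region:account-id:resource. IAM ARNs leave the region field empty, which is why the role ARN contains "::". Below is a small sketch with a hypothetical Arns helper. It uses the placeholder account 123456789012.

```java
/** Hypothetical helper for reading fields out of an ARN. */
final class Arns {
    private Arns() {}

    /**
     * ARNs have the form arn:partition:service:region:account-id:resource.
     * The resource part may itself contain ':' or '/', so split into at most 6 fields.
     * Empty fields (such as the region of an IAM ARN) are preserved by split.
     */
    static String accountId(String arn) {
        String[] parts = arn.split(":", 6);
        if (parts.length < 6 || !parts[0].equals("arn")) {
            throw new IllegalArgumentException("Not an ARN: " + arn);
        }
        return parts[4];
    }
}
```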
Create CloudWatch Permissions Policy
touch config/policies/lambda-cloudwatch-permissionspolicy.json
Add the following contents to lambda-cloudwatch-permissionspolicy.json:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}
Create Kinesis Permissions Policy
touch config/policies/lambda-kinesis-permissionspolicy.json
Add the following contents to lambda-kinesis-permissionspolicy.json:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListStreams"
            ],
            "Resource": "*"
        }
    ]
}
Attach CloudWatch and Kinesis policies to the role
Embed the permissions policies in the role (as inline policies in this example) to specify what the role is allowed to do.
# Attach cloudwatch write policy
aws iam put-role-policy --role-name KinesisConsumer-lambda-execution-role --policy-name lambda-cloudwatch-permissions-Policy --policy-document file://config/policies/lambda-cloudwatch-permissionspolicy.json
# Attach Kinesis Stream read policy
aws iam put-role-policy --role-name KinesisConsumer-lambda-execution-role --policy-name KinesisConsumer-kinesis-permissions-Policy --policy-document file://config/policies/lambda-kinesis-permissionspolicy.json
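Create Lambda function
Note: AWS has since deprecated the java8 runtime used below; if create-function rejects it, use a newer Java runtime such as java17.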
aws lambda create-function \
--region us-west-2 \
--function-name KinesisConsumer \
--zip-file fileb://KinesisConsumer/build/distributions/KinesisConsumer.zip \
--role arn:aws:iam::$AWS_ACCOUNT_ID:role/KinesisConsumer-lambda-execution-role \
--handler KinesisConsumer::handleRequest \
--runtime java8 \
--timeout 30 \
--memory-size 1024
# Output
{
    "TracingConfig": {
        "Mode": "PassThrough"
    },
    "CodeSha256": "TySSxcICge5hqXTG4sQadlMeH7ZERwpv2xmy12pYxJs=",
    "FunctionName": "KinesisConsumer",
    "CodeSize": 8082643,
    "MemorySize": 1024,
    "FunctionArn": "arn:aws:lambda:us-west-2:YOUR_ACCOUNT_ID:function:KinesisConsumer",
    "Version": "$LATEST",
    "Role": "arn:aws:iam::YOUR_ACCOUNT_ID:role/KinesisConsumer-lambda-execution-role",
    "Timeout": 30,
    "LastModified": "2017-07-11T06:23:32.982+0000",
    "Handler": "KinesisConsumer::handleRequest",
    "Runtime": "java8",
    "Description": ""
}
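The --handler value has the form ClassName::methodName. A class inside a package would be written as, for example, com.example.KinesisConsumer::handleRequest. The hypothetical HandlerRef class below is a simplified illustration of what that string refers to, and it resolves the string with plain reflection. Lambda's Java runtime has its own resolution rules, for example for choosing between overloads, and this sketch does not reproduce them.

```java
import java.lang.reflect.Method;

/** Simplified, hypothetical resolver for a "Class::method" handler string. */
final class HandlerRef {
    private HandlerRef() {}

    static Method resolve(String handler) throws ClassNotFoundException, NoSuchMethodException {
        String[] parts = handler.split("::");
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException("Expected Class::method but got " + handler);
        }
        Class<?> type = Class.forName(parts[0]);
        for (Method m : type.getMethods()) {
            if (m.getName().equals(parts[1])) {
                // First public method with that name; overloads are not disambiguated here
                return m;
            }
        }
        throw new NoSuchMethodException(handler);
    }
}
```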
Test Lambda invocation using Kinesis mock event
Create a sample Kinesis input event file
touch config/test/KinesisConsumer-input.json
Add the following content to KinesisConsumer-input.json:
{
    "Records": [
        {
            "kinesis": {
                "partitionKey": "partitionKey-3",
                "kinesisSchemaVersion": "1.0",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=",
                "sequenceNumber": "49545115243490985018280067714973144582180062593244200961"
            },
            "eventSource": "aws:kinesis",
            "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",
            "invokeIdentityArn": "arn:aws:iam::account-id:role/testLEBRole",
            "eventVersion": "1.0",
            "eventName": "aws:kinesis:record",
            "eventSourceARN": "arn:aws:kinesis:us-west-2:35667example:stream/examplestream",
            "awsRegion": "us-west-2"
        }
    ]
}
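The data field of a Kinesis event is Base64-encoded. In this sample it decodes to "Hello, this is a test 123.", which is what the handler prints. To put your own text into a test event, the hypothetical TestEventData helper below shows one way to produce and check the field with java.util.Base64. It assumes the text is UTF-8.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper for the "data" field of a Kinesis test event. */
final class TestEventData {
    private TestEventData() {}

    /** Encodes UTF-8 text as standard Base64 for the event's "data" field. */
    static String encode(String text) {
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes a "data" field back to UTF-8 text. */
    static String decode(String data) {
        return new String(Base64.getDecoder().decode(data), StandardCharsets.UTF_8);
    }
}
```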
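Invoke Lambda
First, invoke the function asynchronously with --invocation-type Event. Lambda queues the event and returns without waiting for the function to finish. If you use AWS CLI version 2, also pass --cli-binary-format raw-in-base64-out so that the JSON payload file is not interpreted as base64.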
aws lambda invoke \
--invocation-type Event \
--function-name KinesisConsumer \
--region us-west-2 \
--payload file://config/test/KinesisConsumer-input.json \
outputfile.txt
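{
    "StatusCode": 202
}
Next, invoke the function synchronously with --invocation-type RequestResponse. The CLI waits for the function to finish, reports StatusCode 200, and writes the handler's return value to outputfile.txt.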
aws lambda invoke \
--invocation-type RequestResponse \
--function-name KinesisConsumer \
--region us-west-2 \
--payload file://config/test/KinesisConsumer-input.json \
outputfile.txt
Testing with Kinesis
Create an Amazon Kinesis Stream and Associate It with Your Lambda Function
Create Kinesis Stream
aws kinesis create-stream \
--stream-name LambdaKinesisStreamExample \
--shard-count 1 \
--region us-west-2
Run the following Amazon Kinesis describe-stream AWS CLI command to get the stream ARN.
aws kinesis describe-stream \
--stream-name LambdaKinesisStreamExample \
--region us-west-2
{
    "StreamDescription": {
        "RetentionPeriodHours": 24,
        "StreamName": "LambdaKinesisStreamExample",
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "EndingHashKey": "340282366920938463463374607431768211455",
                    "StartingHashKey": "0"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49574047567355003591396290706428592663441186123818205186"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-west-2:YOUR_ACCOUNT_ID:stream/LambdaKinesisStreamExample",
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "StreamStatus": "ACTIVE"
    }
}
Note down the StreamARN value: arn:aws:kinesis:us-west-2:YOUR_ACCOUNT_ID:stream/LambdaKinesisStreamExample
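The HashKeyRange in this output spans 0 to 2^128 - 1 because the stream has a single shard. Kinesis maps each partition key to a 128-bit integer with MD5, and it writes the record to the shard whose hash key range contains that integer. The hypothetical ShardRouting class below sketches that mapping. It assumes the partition key is hashed as its UTF-8 bytes.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical model of how a partition key falls into a shard's hash key range. */
final class ShardRouting {
    private ShardRouting() {}

    /** MD5 of the UTF-8 partition key, read as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1: treat the bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 must be available on every Java platform", e);
        }
    }

    /** True if the key's hash lies in [startingHashKey, endingHashKey], both inclusive. */
    static boolean inRange(String partitionKey, String startingHashKey, String endingHashKey) {
        BigInteger h = hashKey(partitionKey);
        return h.compareTo(new BigInteger(startingHashKey)) >= 0
                && h.compareTo(new BigInteger(endingHashKey)) <= 0;
    }
}
```

With one shard, inRange returns true for every key. With more shards, you would compare the key's hash against each shard's StartingHashKey and EndingHashKey.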
Add an Event Source in AWS Lambda
Add an event source mapping so that AWS Lambda starts polling the Amazon Kinesis stream and invokes your function with new records.
aws lambda create-event-source-mapping \
--region us-west-2 \
--function-name KinesisConsumer \
--event-source-arn arn:aws:kinesis:us-west-2:$AWS_ACCOUNT_ID:stream/LambdaKinesisStreamExample \
--batch-size 100 \
--starting-position TRIM_HORIZON
{
    "UUID": "1538750c-8dcb-4cd1-a4d5-a753fdb01895",
    "StateTransitionReason": "User action",
    "LastModified": 1497120187.749,
    "BatchSize": 100,
    "EventSourceArn": "arn:aws:kinesis:us-west-2:YOUR_ACCOUNT_ID:stream/LambdaKinesisStreamExample",
    "FunctionArn": "arn:aws:lambda:us-west-2:YOUR_ACCOUNT_ID:function:KinesisConsumer",
    "State": "Creating",
    "LastProcessingResult": "No records processed"
}
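With --batch-size 100, each invocation receives at most 100 records from one shard, in order. When fewer records are available, the batch is smaller. The hypothetical Batches class below is a simplified model of how a backlog splits into batches. It is not Lambda's implementation, which also applies other limits.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of batch-size: consecutive records, at most batchSize per invocation. */
final class Batches {
    private Batches() {}

    static <T> List<List<T>> split(List<T> records, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < records.size(); start += batchSize) {
            int end = Math.min(start + batchSize, records.size());
            // Copy so each batch is independent of the source list
            batches.add(new ArrayList<>(records.subList(start, end)));
        }
        return batches;
    }
}
```

For example, a backlog of 250 records splits into batches of 100, 100 and 50, and record order is preserved across the batches.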
You can get a list of event source mappings by running the following command.
aws lambda list-event-source-mappings \
--region us-west-2 \
--function-name KinesisConsumer \
--event-source-arn arn:aws:kinesis:us-west-2:$AWS_ACCOUNT_ID:stream/LambdaKinesisStreamExample
{
    "EventSourceMappings": [
        {
            "UUID": "1538750c-8dcb-4cd1-a4d5-a753fdb01895",
            "StateTransitionReason": "User action",
            "LastModified": 1497120240.0,
            "BatchSize": 100,
            "State": "Enabled",
            "FunctionArn": "arn:aws:lambda:us-west-2:YOUR_ACCOUNT_ID:function:KinesisConsumer",
            "EventSourceArn": "arn:aws:kinesis:us-west-2:YOUR_ACCOUNT_ID:stream/LambdaKinesisStreamExample",
            "LastProcessingResult": "No records processed"
        }
    ]
}
Verify that the State value is Enabled. It shows Creating immediately after create-event-source-mapping returns.
If you disable the event source mapping, AWS Lambda stops polling the Amazon Kinesis stream. If you re-enable the event source mapping, polling resumes from the sequence number where it stopped, so each record is processed either before you disabled the mapping or after you re-enabled it. If that sequence number has fallen behind TRIM_HORIZON (that is, it has aged out of the stream's retention period), polling starts from TRIM_HORIZON when you re-enable the mapping. However, a new event source mapping always starts polling from TRIM_HORIZON, LATEST or AT_TIMESTAMP, depending on the starting position you specify. This applies even if you delete an event source mapping and create a new one with the same configuration as the deleted one.
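The rules in the paragraph above can be modelled in a few lines. The PollingStart class below is a hypothetical sketch of the described behavior, not Lambda's implementation. The AFTER_SEQUENCE_NUMBER and TRIM_HORIZON names mirror Kinesis shard iterator types. The sketch assumes that sequence numbers within a shard increase over time, and it compares them as BigInteger because they are decimal strings too long for a long.

```java
import java.math.BigInteger;

/** Hypothetical model of where polling starts; not Lambda's implementation. */
final class PollingStart {
    private PollingStart() {}

    enum StartingPosition { TRIM_HORIZON, LATEST, AT_TIMESTAMP }

    /**
     * newMapping:     true for a newly created mapping, false for a re-enabled one
     * configured:     the --starting-position given when the mapping was created
     * checkpoint:     last sequence number processed before the mapping was disabled
     * oldestRetained: oldest sequence number still in the shard (the TRIM_HORIZON record)
     */
    static String startFrom(boolean newMapping, StartingPosition configured,
                            String checkpoint, String oldestRetained) {
        if (newMapping) {
            // A new mapping ignores earlier progress, even with identical settings
            return configured.name();
        }
        BigInteger last = new BigInteger(checkpoint);
        BigInteger oldest = new BigInteger(oldestRetained);
        if (last.compareTo(oldest) < 0) {
            // The checkpoint has aged out of the retention period
            return StartingPosition.TRIM_HORIZON.name();
        }
        // Resume right after the last processed record
        return "AFTER_SEQUENCE_NUMBER " + checkpoint;
    }
}
```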
Test the Setup
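Use the following AWS CLI command to add a record to your Amazon Kinesis stream. The --data value is a plain string. The partition key is an arbitrary string that Kinesis hashes to choose a shard; with a single shard, every record lands in shardId-000000000000. AWS CLI version 1 base64-encodes --data for you. With AWS CLI version 2, add --cli-binary-format raw-in-base64-out so that the string is not treated as already base64-encoded. You can run the command more than once to add multiple records to the stream.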
aws kinesis put-record \
--stream-name LambdaKinesisStreamExample \
--data "This is a test. final" \
--partition-key shardId-000000000000 \
--region us-west-2
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49574047567355003591396290707342540583069889897800138754"
}
Validate that Lambda processed the records by checking the function's logs in CloudWatch Logs:
aws logs describe-log-streams --log-group-name '/aws/lambda/KinesisConsumer' --region us-west-2 --order-by LastEventTime --descending
# Note down the logStreamName of the latest stream (the first one, because of the descending order)
# and use it as the --log-stream-name value below; the single quotes stop the shell from expanding $LATEST
aws logs get-log-events --log-group-name /aws/lambda/KinesisConsumer --log-stream-name '2017/06/10/[$LATEST]b678e638f5d9423e9738ccdd8639babb' --region us-west-2
Conclusion
You learned the following:
- Lambda can be used as a Kinesis Streams consumer.
- To allow Lambda to read from Kinesis Streams and write logs, we created an execution role with access to CloudWatch Logs and Kinesis.
- We created a Kinesis stream and added it as an event source for our Lambda function.
- The Lambda function is invoked automatically whenever new records arrive in the stream.
Let me know if you run into any issues.
Resource Cleanup
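# Delete the event source mapping (use the UUID from the list-event-source-mappings output)
aws lambda delete-event-source-mapping \
--uuid YOUR_MAPPING_UUID \
--region us-west-2
aws lambda delete-function \
--function-name KinesisConsumer \
--region us-west-2
# Delete the function's CloudWatch log group
aws logs delete-log-group --log-group-name /aws/lambda/KinesisConsumer --region us-west-2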
aws iam delete-role-policy --role-name KinesisConsumer-lambda-execution-role --policy-name lambda-cloudwatch-permissions-Policy
aws iam delete-role-policy --role-name KinesisConsumer-lambda-execution-role --policy-name KinesisConsumer-kinesis-permissions-Policy
aws iam delete-role --role-name KinesisConsumer-lambda-execution-role
aws kinesis delete-stream \
--stream-name LambdaKinesisStreamExample \
--region us-west-2
Common Errors
Lambda function failing due to timeout
You can increase the timeout (in seconds) with the --timeout parameter. Raising --memory-size also gives the function proportionally more CPU.
aws lambda update-function-configuration \
--function-name KinesisConsumer \
--region us-west-2 \
--timeout 60 \
--memory-size 1024
How can I update only the function code?
Rebuild the project with gradle clean build, then upload the new zip:
aws lambda update-function-code \
--function-name KinesisConsumer \
--zip-file fileb://KinesisConsumer/build/distributions/KinesisConsumer.zip \
--region us-west-2
Getting JSON parsing error
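I was getting the error below because I was using aws-lambda-java-events 1.1.0, which pulled in version 1.11.160 of the AWS SDK (the com.amazonaws.services.kinesis.model.Record class in the trace below). That version introduced a bug that causes this error. As a workaround, I switched to aws-lambda-java-events 1.3.0, which does not depend on 1.11.160.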
An error occurred during JSON parsing: java.lang.RuntimeException
java.lang.RuntimeException: An error occurred during JSON parsing
Caused by: java.io.UncheckedIOException: com.fasterxml.jackson.databind.JsonMappingException: Conflicting setter definitions for property "encryptionType": com.amazonaws.services.kinesis.model.Record#setEncryptionType(1 params) vs com.amazonaws.services.kinesis.model.Record#setEncryptionType(1 params)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Conflicting setter definitions for property "encryptionType": com.amazonaws.services.kinesis.model.Record#setEncryptionType(1 params) vs com.amazonaws.services.kinesis.model.Record#setEncryptionType(1 params)
at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:269)
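The message says that Record has two one-argument setEncryptionType setters. Jackson cannot decide which one to use when it deserializes the event. The hypothetical SetterConflicts diagnostic below lists such overloaded one-argument setters with plain reflection, and RecordLike is a hypothetical stand-in class. Running it against the real Record class assumes the Kinesis SDK jar is on your classpath.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical diagnostic: finds public one-argument setters that share a name. */
final class SetterConflicts {
    private SetterConflicts() {}

    static Map<String, Integer> overloadedSetters(Class<?> type) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Method m : type.getMethods()) {
            if (m.getName().startsWith("set") && m.getParameterCount() == 1) {
                counts.merge(m.getName(), 1, Integer::sum);
            }
        }
        // Keep only setter names that appear two or more times
        counts.values().removeIf(c -> c < 2);
        return counts;
    }
}

/** Hypothetical stand-in for a model class with two setEncryptionType overloads. */
class RecordLike {
    public void setEncryptionType(String value) {}
    public void setEncryptionType(Integer value) {}
    public void setData(byte[] value) {}
}
```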
Version History
Date | Description |
---|---|
2017-07-04 | Initial Version |