Using Lambda as Kinesis events processor

Amazon Kinesis is widely used for building real-time applications: you ingest data with Kinesis Streams and process it with Kinesis Analytics or with other services that read from the stream.

In this tutorial, we will learn how we can use AWS Lambda service to process Kinesis events.

lambda diagram

I will use the AWS CLI throughout this tutorial so that you get a better grasp of it. For real projects, I suggest using CloudFormation or a higher-level abstraction such as SAM (Serverless Application Model) or Terraform.

Prerequisites

Make sure you have completed the following steps before you start this tutorial:

  • Signed up for an AWS account and created an administrator user in the account.
  • Installed and set up the AWS CLI. For instructions, see Step 1: Set Up an AWS Account and the AWS CLI.
  • Installed Git and Gradle.
  • Added the git, gradle, and aws executables to your system path.
  • Run the following commands to verify that the tools have been installed.
git --version

gradle --version

aws --version

Use case

Assume that you have a high-traffic web application that writes to Kinesis to enable asynchronous processing. In this tutorial, we will write a Lambda function to process the Kinesis records.

The high-level flow is as follows:

  • Your application writes records to the stream.
  • AWS Lambda polls the stream and whenever it finds new records, it executes your lambda function.

Tutorial

If you run into any errors, refer to the Common Errors section to see if it helps.

# Create project directory
mkdir lambda-kinesis; cd lambda-kinesis

# Create config directory to store config files such as AWS IAM policies and test events
mkdir config; mkdir config/policies; mkdir config/test

Create Lambda code using Java

# Create Java project directory
mkdir KinesisConsumer;cd KinesisConsumer
gradle init --type java-library

gradle build

# Remove default files created by project
rm -rf src/main/java/Library.java;rm -rf src/test/java/LibraryTest.java

Add the Lambda dependencies below to the dependencies section of the build.gradle file.

compile 'com.amazonaws:aws-lambda-java-core:1.1.0'
compile "com.amazonaws:aws-lambda-java-events:1.3.0"

Add the task below at the end of build.gradle to create a zip file that bundles the compiled classes with all the project's dependencies. This is required because AWS Lambda expects a single zip/jar file containing all the dependencies.

/*
** Task to create zip file bundling all the dependencies
*/
task buildZip(type: Zip) {
    from compileJava
    from processResources
    into('lib') {
        from configurations.runtime
    }
}

build.dependsOn buildZip

Add KinesisConsumer class

Tip: You can open the project as a Gradle project in your favorite IDE, such as Eclipse or IntelliJ IDEA.

touch src/main/java/KinesisConsumer.java

Add the following code to KinesisConsumer.java

import java.nio.charset.StandardCharsets;

import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord;

public class KinesisConsumer {
  // Entry point, referenced later as the handler KinesisConsumer::handleRequest
  public String handleRequest(KinesisEvent event) {

    System.out.println("Record Size - " + event.getRecords().size());
    for (KinesisEventRecord rec : event.getRecords()) {
      // The sequence number is already a String
      System.out.println(rec.getKinesis().getSequenceNumber());
      // Record data is a ByteBuffer; this assumes the payload is UTF-8 text
      System.out.println(StandardCharsets.UTF_8.decode(rec.getKinesis().getData()).toString());
    }
    return "success";
  }
}
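
Record data reaches the handler as a java.nio.ByteBuffer. The following is a minimal, self-contained sketch of decoding such a buffer as UTF-8 text while reading only its readable region (position to limit). The class name RecordDataDecoder and the sample bytes are hypothetical and exist only for illustration. Whether the Lambda runtime ever hands you a buffer with a non-zero offset is an assumption here, so treat this as a defensive pattern rather than a required change.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the tutorial project.
final class RecordDataDecoder {

    // Decodes the bytes between position and limit as UTF-8.
    // duplicate() shares the content but has its own position,
    // so the caller's buffer is left untouched.
    static String decodeUtf8(ByteBuffer data) {
        return StandardCharsets.UTF_8.decode(data.duplicate()).toString();
    }

    public static void main(String[] args) {
        byte[] backing = "xxHello, Kinesisxx".getBytes(StandardCharsets.UTF_8);

        // A buffer that exposes only the 14 bytes in the middle of its backing array.
        ByteBuffer view = ByteBuffer.wrap(backing, 2, 14);

        // Reads only the exposed region: prints "Hello, Kinesis"
        System.out.println(decodeUtf8(view));

        // array() returns the whole backing array: prints "xxHello, Kinesisxx"
        System.out.println(new String(view.array(), StandardCharsets.UTF_8));
    }
}
```

The difference only matters when the buffer is a view over a larger array, but decoding through the buffer itself costs nothing and also makes the character set explicit.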

Build the project now to make sure there are no errors.

gradle clean build

Verify that the zip file (the fat jar) has been created in the distributions folder

ls build/distributions

Create AWS roles and policies

lambda diagram

Create the trust policy

The trust policy makes this an AWS service role of the type AWS Lambda: it grants the AWS Lambda service permission to assume the role.

cd ../
touch config/policies/KinesisConsumer-lambda-trustpolicy.json

Add the following contents to KinesisConsumer-lambda-trustpolicy.json

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

Create the Lambda Execution role

Create role

aws iam create-role --role-name KinesisConsumer-lambda-execution-role --assume-role-policy-document  file://config/policies/KinesisConsumer-lambda-trustpolicy.json

You will see the following output

{
    "Role": {
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "sts:AssumeRole",
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "lambda.amazonaws.com"
                    }
                }
            ]
        },
        "RoleId": "AROAIRJRHN62AKLSK2L3W",
        "CreateDate": "2017-07-11T06:18:09.277Z",
        "RoleName": "KinesisConsumer-lambda-execution-role",
        "Path": "/",
        "Arn": "arn:aws:iam::YOUR_ACCOUNT_ID:role/KinesisConsumer-lambda-execution-role"
    }
}

Note your account ID from the ARN and substitute it for YOUR_ACCOUNT_ID in the line below

export AWS_ACCOUNT_ID=YOUR_ACCOUNT_ID

# Validate that it has been set up properly
echo $AWS_ACCOUNT_ID

Create Cloudwatch Permissions Policy

touch config/policies/lambda-cloudwatch-permissionspolicy.json

Add the following contents to lambda-cloudwatch-permissionspolicy.json

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    }
  ]
}

Create Kinesis Permissions Policy

touch config/policies/lambda-kinesis-permissionspolicy.json

Add the following contents to lambda-kinesis-permissionspolicy.json

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListStreams"
            ],
            "Resource": "*"
        }
    ]
}

Attach Cloudwatch and Kinesis policies to the role

Embed the permissions policies (inline policies in this example) in the role to specify what it is allowed to do.

# Attach cloudwatch write policy
aws iam put-role-policy --role-name KinesisConsumer-lambda-execution-role --policy-name lambda-cloudwatch-permissions-Policy --policy-document file://config/policies/lambda-cloudwatch-permissionspolicy.json

# Attach Kinesis Stream read policy
aws iam put-role-policy --role-name KinesisConsumer-lambda-execution-role --policy-name KinesisConsumer-kinesis-permissions-Policy --policy-document  file://config/policies/lambda-kinesis-permissionspolicy.json

Create Lambda function

aws lambda create-function \
--region us-west-2 \
--function-name KinesisConsumer \
--zip-file fileb://KinesisConsumer/build/distributions/KinesisConsumer.zip \
--role arn:aws:iam::$AWS_ACCOUNT_ID:role/KinesisConsumer-lambda-execution-role \
--handler KinesisConsumer::handleRequest \
--runtime java8 \
--timeout 30 \
--memory-size 1024


# Output

{
    "TracingConfig": {
        "Mode": "PassThrough"
    },
    "CodeSha256": "TySSxcICge5hqXTG4sQadlMeH7ZERwpv2xmy12pYxJs=",
    "FunctionName": "KinesisConsumer",
    "CodeSize": 8082643,
    "MemorySize": 1024,
    "FunctionArn": "arn:aws:lambda:us-west-2:YOUR_ACCOUNT_ID:function:KinesisConsumer",
    "Version": "$LATEST",
    "Role": "arn:aws:iam::YOUR_ACCOUNT_ID:role/KinesisConsumer-lambda-execution-role",
    "Timeout": 30,
    "LastModified": "2017-07-11T06:23:32.982+0000",
    "Handler": "KinesisConsumer::handleRequest",
    "Runtime": "java8",
    "Description": ""
}

Test Lambda invocation using Kinesis mock event

Create sample kinesis input event file

touch config/test/KinesisConsumer-input.json

Add the following content to KinesisConsumer-input.json

{
    "Records": [
        {
            "kinesis": {
                "partitionKey": "partitionKey-3",
                "kinesisSchemaVersion": "1.0",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=",
                "sequenceNumber": "49545115243490985018280067714973144582180062593244200961"
            },
            "eventSource": "aws:kinesis",
            "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",
            "invokeIdentityArn": "arn:aws:iam::account-id:role/testLEBRole",
            "eventVersion": "1.0",
            "eventName": "aws:kinesis:record",
            "eventSourceARN": "arn:aws:kinesis:us-west-2:35667example:stream/examplestream",
            "awsRegion": "us-west-2"
        }
    ]
}
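
The data field in the event above is base64-encoded, which is how a Kinesis record payload appears in the JSON event. As a quick sanity check, the sketch below decodes that value with the standard java.util.Base64 class and shows how to produce a value for your own test events. The class name MockEventPayload and its two helper methods are hypothetical names chosen for this illustration; they are not part of the tutorial's project.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for inspecting and building mock event payloads.
final class MockEventPayload {

    // Decodes a base64 "data" value from a Kinesis event into UTF-8 text.
    static String decode(String base64Data) {
        byte[] raw = Base64.getDecoder().decode(base64Data);
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Encodes UTF-8 text into a base64 value suitable for the "data" field.
    static String encode(String text) {
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // The value used in KinesisConsumer-input.json
        String data = "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=";

        // prints "Hello, this is a test 123."
        System.out.println(decode(data));

        // Round trip: encoding the decoded text gives back the original value
        System.out.println(encode(decode(data)).equals(data));
    }
}
```

When the function is invoked with this event, the decoded text is what you should expect to find in the function's log output.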

Invoke Lambda

aws lambda  invoke \
--invocation-type Event \
--function-name KinesisConsumer \
--region us-west-2 \
--payload file://config/test/KinesisConsumer-input.json \
outputfile.txt

{
    "StatusCode": 200
}
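
To invoke the function synchronously and write its return value to outputfile.txt, use the RequestResponse invocation type:

aws lambda invoke \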
--invocation-type RequestResponse \
--function-name KinesisConsumer \
--region us-west-2 \
--payload file://config/test/KinesisConsumer-input.json \
 outputfile.txt

Testing with Kinesis

Create an Amazon Kinesis Stream and Associate It with Your Lambda Function

Create Kinesis Stream

aws kinesis create-stream \
--stream-name LambdaKinesisStreamExample \
--shard-count 1 \
--region us-west-2

Run the following Amazon Kinesis describe-stream AWS CLI command to get the stream ARN.

aws kinesis describe-stream \
--stream-name LambdaKinesisStreamExample \
--region us-west-2
{
    "StreamDescription": {
        "RetentionPeriodHours": 24,
        "StreamName": "LambdaKinesisStreamExample",
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "EndingHashKey": "340282366920938463463374607431768211455",
                    "StartingHashKey": "0"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49574047567355003591396290706428592663441186123818205186"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-west-2:YOUR_ACCOUNT_ID:stream/LambdaKinesisStreamExample",
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "StreamStatus": "ACTIVE"
    }
}

Note the StreamARN from the output: arn:aws:kinesis:us-west-2:YOUR_ACCOUNT_ID:stream/LambdaKinesisStreamExample

Add an Event Source in AWS Lambda

Add an Event Source in AWS Lambda so that your lambda function can start polling the Amazon Kinesis stream

aws lambda create-event-source-mapping \
--region us-west-2 \
--function-name KinesisConsumer \
--event-source-arn arn:aws:kinesis:us-west-2:$AWS_ACCOUNT_ID:stream/LambdaKinesisStreamExample \
--batch-size 100 \
--starting-position TRIM_HORIZON
{
    "UUID": "1538750c-8dcb-4cd1-a4d5-a753fdb01895",
    "StateTransitionReason": "User action",
    "LastModified": 1497120187.749,
    "BatchSize": 100,
    "EventSourceArn": "arn:aws:kinesis:us-west-2:YOUR_ACCOUNT_ID:stream/LambdaKinesisStreamExample",
    "FunctionArn": "arn:aws:lambda:us-west-2:YOUR_ACCOUNT_ID:function:KinesisConsumer",
    "State": "Creating",
    "LastProcessingResult": "No records processed"
}

You can get a list of event source mappings by running the following command.

aws lambda list-event-source-mappings \
--region us-west-2 \
--function-name KinesisConsumer \
--event-source-arn arn:aws:kinesis:us-west-2:$AWS_ACCOUNT_ID:stream/LambdaKinesisStreamExample \
--debug
{
    "EventSourceMappings": [
        {
            "UUID": "1538750c-8dcb-4cd1-a4d5-a753fdb01895",
            "StateTransitionReason": "User action",
            "LastModified": 1497120240.0,
            "BatchSize": 100,
            "State": "Enabled",
            "FunctionArn": "arn:aws:lambda:us-west-2:YOUR_ACCOUNT_ID:function:KinesisConsumer",
            "EventSourceArn": "arn:aws:kinesis:us-west-2:YOUR_ACCOUNT_ID:stream/LambdaKinesisStreamExample",
            "LastProcessingResult": "No records processed"
        }
    ]
}

Verify that the State value is Enabled.

If you disable the event source mapping, AWS Lambda stops polling the Amazon Kinesis stream. If you re-enable event source mapping, it will resume polling from the sequence number where it stopped, so each record is processed either before you disabled the mapping or after you enabled it. If the sequence number falls behind TRIM_HORIZON, when you re-enable it polling will start from TRIM_HORIZON. However, if you create a new event source mapping, polling will always start from TRIM_HORIZON, LATEST or AT_TIMESTAMP, depending on the starting position you specify. This applies even if you delete an event source mapping and create a new one with the same configuration as the deleted one.

Test the Setup

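Using the following AWS CLI command, add event records to your Amazon Kinesis stream. The --data value is the record payload, here the plain string "This is a test. final". AWS CLI version 1 base64-encodes it for you; AWS CLI version 2 expects base64 input by default unless you pass --cli-binary-format raw-in-base64-out. You can run the same command more than once to add multiple records to the stream.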

aws kinesis put-record \
--stream-name LambdaKinesisStreamExample \
--data "This is a test. final" \
--partition-key shardId-000000000000 \
--region us-west-2
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49574047567355003591396290707342540583069889897800138754"
}
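
Kinesis chooses the shard for a record by taking an MD5 hash of the partition key and matching the resulting 128-bit integer against each shard's HashKeyRange, as seen in the describe-stream output earlier. With a single shard covering the whole range, every partition key lands on shardId-000000000000. The sketch below reproduces that mapping locally under stated assumptions: the class name PartitionKeyHash and its methods are hypothetical, and the key is assumed to be hashed as UTF-8 bytes.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper that mimics how a partition key maps to a hash key.
final class PartitionKeyHash {

    // Returns the MD5 digest of the key as a non-negative 128-bit integer.
    // Assumption: the key is hashed as UTF-8 bytes.
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1 keeps the value positive
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // True if the hash lies within the inclusive range [startingHashKey, endingHashKey].
    static boolean inRange(BigInteger hash, String startingHashKey, String endingHashKey) {
        return hash.compareTo(new BigInteger(startingHashKey)) >= 0
            && hash.compareTo(new BigInteger(endingHashKey)) <= 0;
    }

    public static void main(String[] args) {
        // Range of the single shard reported by describe-stream
        String start = "0";
        String end = "340282366920938463463374607431768211455";

        BigInteger hash = hashKey("shardId-000000000000");
        System.out.println("Hash key: " + hash);
        // prints "In shard range: true" because the shard covers all 128-bit values
        System.out.println("In shard range: " + inRange(hash, start, end));
    }
}
```

With more than one shard, the same check against each shard's range tells you where a given key will go, which is useful when choosing partition keys that spread load evenly.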

Validate that Lambda processed the events by checking the logs in AWS CloudWatch

aws logs describe-log-streams --log-group-name '/aws/lambda/KinesisConsumer' --region us-west-2 --order-by  LastEventTime --descending

# Note the logStreamName of the latest stream (the first one, as we are using descending order)
# and use it as the --log-stream-name parameter below
aws logs get-log-events --log-group-name /aws/lambda/KinesisConsumer --log-stream-name '2017/06/10/[$LATEST]b678e638f5d9423e9738ccdd8639babb' --region us-west-2

Conclusion

You learnt the following things:

  • Lambda can be used as a Kinesis Streams consumer.
  • To allow Lambda to read from Kinesis Streams, we created an execution role that grants access to CloudWatch and Kinesis.
  • We also created a Kinesis stream and added it as an event source for our Lambda function.
  • The Lambda function is invoked automatically whenever new records arrive in the stream.

Let me know if you run into any issues.

Resource Cleanup

aws lambda delete-function \
 --function-name KinesisConsumer \
 --region us-west-2
aws iam delete-role-policy --role-name KinesisConsumer-lambda-execution-role --policy-name  lambda-cloudwatch-permissions-Policy

aws iam delete-role-policy --role-name KinesisConsumer-lambda-execution-role --policy-name  KinesisConsumer-kinesis-permissions-Policy

aws iam delete-role --role-name KinesisConsumer-lambda-execution-role
aws kinesis delete-stream \
--stream-name LambdaKinesisStreamExample \
--region us-west-2

Common Errors

Lambda function failing due to timeout

You can increase the timeout with the --timeout parameter (the value is in seconds)

aws lambda update-function-configuration \
    --function-name KinesisConsumer  \
    --region us-west-2 \
    --timeout 30 \
    --memory-size 1024

How can I update only the function code?

aws lambda update-function-code \
 --function-name KinesisConsumer \
 --zip-file fileb://KinesisConsumer/build/distributions/KinesisConsumer.zip \
 --region us-west-2

Getting JSON parsing error

I was getting the error below because I was using java-events-1.1.0, which depended on kms 1.11.160, which introduced a bug that causes this issue. As a workaround, I switched to java-events-1.3.0, which does not depend on 1.11.160.

An error occurred during JSON parsing: java.lang.RuntimeException
java.lang.RuntimeException: An error occurred during JSON parsing
Caused by: java.io.UncheckedIOException: com.fasterxml.jackson.databind.JsonMappingException: Conflicting setter definitions for property "encryptionType": com.amazonaws.services.kinesis.model.Record#setEncryptionType(1 params) vs com.amazonaws.services.kinesis.model.Record#setEncryptionType(1 params)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Conflicting setter definitions for property "encryptionType": com.amazonaws.services.kinesis.model.Record#setEncryptionType(1 params) vs com.amazonaws.services.kinesis.model.Record#setEncryptionType(1 params)
at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:269)

References

AWS Lambda documentation (Kinesis example): http://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example-deployment-pkg.html

Version History


Date Description
2017-07-04    Initial Version