Event Sources - Kinesis

In this section we’ll walk through how to trigger your lambda function in response to Amazon Kinesis stream events. This overview is based on the SpartaApplication sample code, if you’d rather jump to the end result.

Goal

The goal of this example is to provision a Sparta lambda function that logs Amazon Kinesis events to CloudWatch Logs.

Getting Started

We’ll start with an empty lambda function and build up the needed functionality.

func echoKinesisEvent(event *json.RawMessage,
	context *sparta.LambdaContext,
	w http.ResponseWriter,
	logger *logrus.Logger) {

	// Log the raw event together with the AWS request ID.
	logger.WithFields(logrus.Fields{
		"RequestID": context.AWSRequestID,
		"Event":     string(*event),
	}).Info("Request received")

For this sample, all we’re going to do is unmarshal the Kinesis event into a Sparta Kinesis event and log each record’s event ID to CloudWatch Logs:

	var lambdaEvent spartaKinesis.Event
	err := json.Unmarshal([]byte(*event), &lambdaEvent)
	if err != nil {
		logger.Error("Failed to unmarshal event data: ", err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		// Stop here: lambdaEvent is not usable after a failed unmarshal.
		return
	}
	// Log the ID of every record in the batch.
	for _, eachRecord := range lambdaEvent.Records {
		logger.WithFields(logrus.Fields{
			"EventID": eachRecord.EventID,
		}).Info("Kinesis Event")
	}
}
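
To try the unmarshalling step outside of Lambda, the following is a minimal standard-library sketch. The kinesisEvent, kinesisRecord and kinesisData types and the decodeKinesisEvent helper are hypothetical stand-ins for the Sparta Kinesis event types, declared here only so the example is self-contained; the field names follow the JSON shape AWS Lambda delivers for Kinesis records, and the sample payload is made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// kinesisData, kinesisRecord and kinesisEvent are hypothetical stand-ins
// for the Sparta Kinesis event types. Only the fields used here are declared.
type kinesisData struct {
	PartitionKey   string `json:"partitionKey"`
	SequenceNumber string `json:"sequenceNumber"`
	// The record payload arrives base64-encoded; encoding/json decodes
	// base64 JSON strings into []byte fields automatically.
	Data []byte `json:"data"`
}

type kinesisRecord struct {
	EventID     string      `json:"eventID"`
	EventSource string      `json:"eventSource"`
	Kinesis     kinesisData `json:"kinesis"`
}

type kinesisEvent struct {
	Records []kinesisRecord `json:"Records"`
}

// decodeKinesisEvent unmarshals a raw Lambda payload into a kinesisEvent.
func decodeKinesisEvent(raw []byte) (*kinesisEvent, error) {
	var event kinesisEvent
	if err := json.Unmarshal(raw, &event); err != nil {
		return nil, fmt.Errorf("failed to unmarshal event data: %w", err)
	}
	return &event, nil
}

func main() {
	// Made-up sample payload with a single record.
	sample := []byte(`{
	  "Records": [{
	    "eventID": "shardId-000000000000:1",
	    "eventSource": "aws:kinesis",
	    "kinesis": {
	      "partitionKey": "partitionKey-1",
	      "sequenceNumber": "1",
	      "data": "SGVsbG8="
	    }
	  }]
	}`)

	event, err := decodeKinesisEvent(sample)
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, eachRecord := range event.Records {
		fmt.Printf("EventID=%s Data=%s\n", eachRecord.EventID, eachRecord.Kinesis.Data)
	}
}
```

Declaring the payload field as []byte keeps the base64 handling out of the handler code; whether the Sparta type does the same is not shown in this walkthrough.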

With the function defined, let’s register it with Sparta.

Sparta Integration

First we wrap the Go function in a LambdaAWSInfo struct:

lambdaFn := sparta.NewLambda(sparta.IAMRoleDefinition{}, echoKinesisEvent, nil)

Since our lambda function doesn’t access any other AWS services, we can use an empty IAMRoleDefinition (sparta.IAMRoleDefinition{}).

Event Source Registration

The last step is to configure our AWS Lambda function with Kinesis as the EventSource:

lambdaFn.EventSourceMappings = append(lambdaFn.EventSourceMappings, &lambda.CreateEventSourceMappingInput{
  EventSourceArn:   aws.String(kinesisTestStream),
  StartingPosition: aws.String("TRIM_HORIZON"),
  BatchSize:        aws.Int64(100),
  Enabled:          aws.Bool(true),
})

The kinesisTestStream parameter is the ARN of the Kinesis stream (e.g., arn:aws:kinesis:us-west-2:123412341234:stream/kinesisTestStream) whose events will trigger lambda execution.
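
Because a mistyped ARN only surfaces at provisioning time, it can help to sanity-check the value first. The sketch below is a standard-library illustration, not part of Sparta: the streamARN type, the parseStreamARN helper and the deployRegion value are all hypothetical names, and the check assumes the general arn:partition:service:region:account-id:resource layout.

```go
package main

import (
	"fmt"
	"strings"
)

// streamARN holds the parts of a Kinesis stream ARN.
// Hypothetical helper type, not part of Sparta or the AWS SDK.
type streamARN struct {
	Region     string
	AccountID  string
	StreamName string
}

// parseStreamARN splits an ARN of the form
// arn:partition:kinesis:region:account-id:stream/name into its parts.
func parseStreamARN(arn string) (*streamARN, error) {
	parts := strings.SplitN(arn, ":", 6)
	if len(parts) != 6 || parts[0] != "arn" || parts[2] != "kinesis" {
		return nil, fmt.Errorf("not a Kinesis ARN: %q", arn)
	}
	name := strings.TrimPrefix(parts[5], "stream/")
	// TrimPrefix returns its input unchanged when the prefix is absent.
	if name == parts[5] || name == "" {
		return nil, fmt.Errorf("ARN resource is not a stream: %q", arn)
	}
	return &streamARN{Region: parts[3], AccountID: parts[4], StreamName: name}, nil
}

func main() {
	// deployRegion is an assumed value standing in for the region
	// the lambda function will be provisioned in.
	const deployRegion = "us-west-2"
	const kinesisTestStream = "arn:aws:kinesis:us-west-2:123412341234:stream/kinesisTestStream"

	parsed, err := parseStreamARN(kinesisTestStream)
	if err != nil {
		fmt.Println(err)
		return
	}
	if parsed.Region != deployRegion {
		fmt.Printf("stream region %s differs from deploy region %s\n", parsed.Region, deployRegion)
		return
	}
	fmt.Printf("stream %s in %s looks usable\n", parsed.StreamName, parsed.Region)
}
```

The region comparison mirrors the requirement that the stream and the lambda function live in the same region.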

Wrapping Up

With the lambdaFn fully defined, we can provide it to sparta.Main() and deploy our service. The workflow below is shared by all Kinesis-triggered lambda functions:

  • Define the lambda function (echoKinesisEvent).
  • If the lambda function accesses other AWS services, create an IAMRoleDefinition with the appropriate privileges.
  • Provide the lambda function and IAMRoleDefinition to sparta.NewLambda().
  • Add the necessary EventSourceMappings to the LambdaAWSInfo struct so that the lambda function is properly configured.

Notes

  • The Kinesis stream and the AWS Lambda function must be provisioned in the same region.
  • The AWS docs have an excellent Kinesis EventSource walkthrough.