Event Source - DynamoDB

In this section we’ll walk through how to trigger your lambda function in response to DynamoDB stream events. This overview is based on the SpartaApplication sample code, if you’d rather jump to the end result.

Goal

Assume that we’re given a DynamoDB stream (see the appendix for details on how to create one). We’ve been asked to write a lambda function that logs the operations performed on the table so that we can perform offline analysis.

Getting Started

We’ll start with an empty lambda function and build up the needed functionality.

func echoDynamoDBEvent(event *json.RawMessage,
                       context *sparta.LambdaContext,
                       w http.ResponseWriter,
                       logger *logrus.Logger) {
  logger.WithFields(logrus.Fields{
    "RequestID": context.AWSRequestID,
  }).Info("Request received")
}

Unmarshalling the DynamoDB Event

Since echoDynamoDBEvent is expected to be triggered by DynamoDB events, we will unmarshal the *json.RawMessage data into a DynamoDB-specific event type provided by Sparta:

var lambdaEvent spartaDynamoDB.Event
err := json.Unmarshal([]byte(*event), &lambdaEvent)
if err != nil {
  logger.Error("Failed to unmarshal event data: ", err.Error())
  http.Error(w, err.Error(), http.StatusInternalServerError)
  // Stop here: without a valid event there are no records to process.
  return
}

DynamoDB events are delivered in batches, via lists of EventRecords, so we’ll need to process each record.

for _, eachRecord := range lambdaEvent.Records {
  logger.WithFields(logrus.Fields{
    "Keys":     eachRecord.DynamoDB.Keys,
    "NewImage": eachRecord.DynamoDB.NewImage,
  }).Info("DynamoDb Event")
}

That’s enough to get the data into CloudWatch Logs.
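
To make the batch structure concrete, here is a minimal, standalone sketch of the same unmarshal-and-iterate pattern. It does not use Sparta: streamEvent, streamRecord, parseStreamEvent, and the sample payload are hypothetical stand-ins that mirror only the fields logged above, so treat it as an illustration rather than Sparta's actual event type.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// streamRecord is a hypothetical, trimmed-down stand-in for a single
// DynamoDB stream record. It keeps only the fields logged in this section.
type streamRecord struct {
	EventName string `json:"eventName"`
	DynamoDB  struct {
		Keys     map[string]json.RawMessage `json:"Keys"`
		NewImage map[string]json.RawMessage `json:"NewImage"`
	} `json:"dynamodb"`
}

// streamEvent is the batch envelope: one invocation carries many records.
type streamEvent struct {
	Records []streamRecord `json:"Records"`
}

// parseStreamEvent decodes a raw event payload into a streamEvent.
func parseStreamEvent(raw []byte) (*streamEvent, error) {
	var event streamEvent
	if err := json.Unmarshal(raw, &event); err != nil {
		return nil, fmt.Errorf("failed to unmarshal event data: %w", err)
	}
	return &event, nil
}

// sampleEvent is an invented two-record batch used only for illustration.
const sampleEvent = `{
  "Records": [
    {
      "eventName": "INSERT",
      "dynamodb": {
        "Keys": {"Id": {"N": "101"}},
        "NewImage": {"Id": {"N": "101"}, "Message": {"S": "New item!"}}
      }
    },
    {
      "eventName": "REMOVE",
      "dynamodb": {
        "Keys": {"Id": {"N": "101"}}
      }
    }
  ]
}`

func main() {
	event, err := parseStreamEvent([]byte(sampleEvent))
	if err != nil {
		log.Fatal(err)
	}
	for _, record := range event.Records {
		// NewImage is absent for REMOVE records, so the map stays nil.
		fmt.Printf("%s keys=%d hasNewImage=%t\n",
			record.EventName, len(record.DynamoDB.Keys), record.DynamoDB.NewImage != nil)
	}
}
```

Note that a record may not carry a NewImage (for example, when an item is removed), so downstream analysis should not assume the field is always present.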

Sparta Integration

With the core of echoDynamoDBEvent complete, the next step is to integrate the Go function with Sparta. This is performed by the appendDynamoDBLambda function. Since echoDynamoDBEvent doesn’t access any additional services (Sparta enables CloudWatch Logs privileges by default), the integration is straightforward:

lambdaFn = sparta.NewLambda(sparta.IAMRoleDefinition{}, echoDynamoDBEvent, nil)

Event Source Mappings

If we were to deploy this Sparta application, the echoDynamoDBEvent function would have the ability to log DynamoDB stream events, but would not be invoked in response to events published by the stream. To register for notifications, we need to configure the lambda’s EventSourceMappings:

lambdaFn.EventSourceMappings = append(lambdaFn.EventSourceMappings, &lambda.CreateEventSourceMappingInput{
  EventSourceArn:   aws.String(dynamoTestStream),
  StartingPosition: aws.String("TRIM_HORIZON"),
  BatchSize:        aws.Int64(10),
  Enabled:          aws.Bool(true),
})
lambdaFunctions = append(lambdaFunctions, lambdaFn)

The dynamoTestStream value is the ARN of the DynamoDB stream that your lambda function will poll (e.g., arn:aws:dynamodb:us-west-2:000000000000:table/myDynamoDBTable/stream/2015-12-05T16:28:11.869).
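
A common mistake is to supply the table ARN instead of the stream ARN. The sketch below shows one way to sanity-check the value before deploying. The streamARN type and parseStreamARN helper are hypothetical (they are not part of Sparta or the AWS SDK), and the check assumes the ARN layout shown in the example above.

```go
package main

import (
	"fmt"
	"log"
	"strings"
)

// streamARN holds the pieces of a DynamoDB stream ARN (hypothetical type).
type streamARN struct {
	Region      string
	AccountID   string
	Table       string
	StreamLabel string
}

// parseStreamARN splits an ARN of the assumed form
// arn:<partition>:dynamodb:<region>:<account>:table/<table>/stream/<label>.
func parseStreamARN(arn string) (streamARN, error) {
	// Limit the split to 6 parts because the stream label itself contains colons.
	parts := strings.SplitN(arn, ":", 6)
	if len(parts) != 6 || parts[0] != "arn" || parts[2] != "dynamodb" {
		return streamARN{}, fmt.Errorf("not a DynamoDB ARN: %q", arn)
	}
	resource := strings.SplitN(parts[5], "/", 4)
	if len(resource) != 4 || resource[0] != "table" || resource[2] != "stream" {
		return streamARN{}, fmt.Errorf("not a DynamoDB stream ARN: %q", arn)
	}
	return streamARN{
		Region:      parts[3],
		AccountID:   parts[4],
		Table:       resource[1],
		StreamLabel: resource[3],
	}, nil
}

func main() {
	arn := "arn:aws:dynamodb:us-west-2:000000000000:table/myDynamoDBTable/stream/2015-12-05T16:28:11.869"
	parsed, err := parseStreamARN(arn)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("region=%s table=%s label=%s\n", parsed.Region, parsed.Table, parsed.StreamLabel)
}
```

A table ARN without the trailing /stream/... segment is rejected, which catches the mix-up before the event source mapping is created.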

The EventSourceMappings field is transformed into the appropriate CloudFormation resource, which enables automatic polling of the DynamoDB stream.

Wrapping Up

With lambdaFn fully defined, we can provide it to sparta.Main() and deploy our service. The workflow below is shared by all DynamoDB stream-based lambda functions:

  • Define the lambda function (echoDynamoDBEvent).
  • If the lambda function accesses other AWS services, create an IAMRoleDefinition with the appropriate privileges.
  • Provide the lambda function and IAMRoleDefinition to sparta.NewLambda().
  • Add the necessary EventSourceMappings to the LambdaAWSInfo struct so that the lambda function is properly configured.

Appendix

Creating a DynamoDB Stream

To create a DynamoDB stream for a given table, follow the steps below:

  • Select Table
  • Enable Stream
  • Copy ARN

The Latest stream ARN value is the one to provide as the EventSourceArn in the event source mapping.