Event Source - S3

In this section we’ll walk through how to trigger your lambda function in response to S3 events. This overview is based on the SpartaImager sample code, if you’d rather jump to the end result.

Goal

Assume we have an S3 bucket that stores images. You’ve been asked to write a service that creates a duplicate of each image with a characteristic stamp overlay and stores it in the same S3 bucket.

Getting Started

We’ll start with an empty lambda function and build up the needed functionality.

func transformImage(event *json.RawMessage,
  context *sparta.LambdaContext,
  w http.ResponseWriter,
  logger *logrus.Logger) {
  // Log the raw event so we can inspect what S3 delivers
  logger.WithFields(logrus.Fields{
    "RequestID": context.AWSRequestID,
    "Event":     string(*event),
  }).Info("Request received")
}

Unmarshalling the S3 Event

Since transformImage is expected to be triggered by S3 events, we will unmarshal the *json.RawMessage data into an S3-specific event type provided by Sparta:

var lambdaEvent spartaS3.Event
err := json.Unmarshal([]byte(*event), &lambdaEvent)
if err != nil {
  logger.Error("Failed to unmarshal event data: ", err.Error())
  http.Error(w, err.Error(), http.StatusInternalServerError)
  // Nothing to process if the payload could not be decoded
  return
}
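To make the unmarshalling step concrete without depending on Sparta, here is a minimal, self-contained sketch that decodes a trimmed-down S3 notification payload. The s3Event and s3EventRecord types, the parseS3Event helper, and the sample payload are hypothetical stand-ins for illustration; they model only the fields this walkthrough uses and are not Sparta's spartaS3.Event definition.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// s3Event mirrors only the parts of the S3 notification payload used in this
// walkthrough. These are illustrative types, not Sparta's own.
type s3Event struct {
	Records []s3EventRecord `json:"Records"`
}

type s3EventRecord struct {
	EventName string `json:"eventName"`
	S3        struct {
		Bucket struct {
			Name string `json:"name"`
		} `json:"bucket"`
		Object struct {
			Key string `json:"key"`
		} `json:"object"`
	} `json:"s3"`
}

// parseS3Event decodes a raw notification payload into the trimmed-down type.
func parseS3Event(raw []byte) (*s3Event, error) {
	var event s3Event
	if err := json.Unmarshal(raw, &event); err != nil {
		return nil, fmt.Errorf("failed to unmarshal S3 event: %w", err)
	}
	return &event, nil
}

// samplePayload is a hand-written example, reduced to the fields we decode.
const samplePayload = `{
  "Records": [
    {
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": {"name": "MyImagingS3Bucket"},
        "object": {"key": "cat.png"}
      }
    }
  ]
}`

func main() {
	event, err := parseS3Event([]byte(samplePayload))
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, record := range event.Records {
		fmt.Printf("%s: %s/%s\n",
			record.EventName, record.S3.Bucket.Name, record.S3.Object.Key)
	}
}
```

Returning the error to the caller, rather than logging and continuing, keeps the handler from iterating over a partially decoded event.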

S3 events are delivered in batches, via lists of EventRecords, so we’ll need to process each record.

for _, eachRecord := range lambdaEvent.Records {
  err = nil
  switch eachRecord.EventName {
  case "ObjectCreated:Put":
    {
      err = stampImage(eachRecord.S3.Bucket.Name, eachRecord.S3.Object.Key, logger)
    }
  case "ObjectRemoved:Delete":
    {
      // Event names in the record carry no "s3:" prefix.
      // Delete stamped image
    }
  default:
    {
      logger.Info("Unsupported event: ", eachRecord.EventName)
    }
  }

  // Report the first failure and stop, so the response is written only once
  if err != nil {
    logger.Error("Failed to process event: ", err.Error())
    http.Error(w, err.Error(), http.StatusInternalServerError)
    return
  }
}

The stampImage function does most of the work: it fetches the S3 image into memory, applies the stamp, and puts the transformed content back to S3 under a new name. It uses a simple xformed_ key name prefix to identify items that have already been stamped. This prevents an “event storm”, because without the check each stamped image written back to the bucket would trigger the function again. This simple approach is acceptable for an example, but in production you should use a more durable approach.
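The prefix guard described above can be sketched as follows. This is a minimal illustration, not the SpartaImager implementation: the isStamped and stampedKey helpers and the stampedPrefix constant are hypothetical names, and the sketch assumes keys have no folder-style path component, since it prepends the prefix to the whole key.

```go
package main

import (
	"fmt"
	"strings"
)

// stampedPrefix marks objects that this service has already produced.
const stampedPrefix = "xformed_"

// isStamped reports whether a key names an already-stamped object, in which
// case the handler should skip it instead of stamping it again.
func isStamped(key string) bool {
	return strings.HasPrefix(key, stampedPrefix)
}

// stampedKey derives the key under which the stamped copy is stored.
// Assumption: the prefix is applied to the whole key, not just a base name.
func stampedKey(key string) string {
	return stampedPrefix + key
}

func main() {
	for _, key := range []string{"cat.png", "xformed_cat.png"} {
		if isStamped(key) {
			fmt.Printf("skip %s\n", key)
			continue
		}
		fmt.Printf("stamp %s -> %s\n", key, stampedKey(key))
	}
}
```

Because the key derived by stampedKey always satisfies isStamped, the Put of the stamped copy produces an event that the handler ignores, which is what breaks the loop.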

Sparta Integration

With the core of transformImage complete, the next step is to integrate the Go function with Sparta. This is performed by the imagerFunctions source.

Our lambda function needs to both Get and Put items back to an S3 bucket, so we need an IAM Role that grants those privileges under which the function will execute:

// Provision an IAM::Role as part of this application
var iamRole = sparta.IAMRoleDefinition{}

// Setup the ARN that includes all child keys
resourceArn := fmt.Sprintf("%s/*", s3EventBroadcasterBucket)
iamRole.Privileges = append(iamRole.Privileges, sparta.IAMRolePrivilege{
  Actions: []string{"s3:GetObject",
    "s3:PutObject",
  },
  Resource: resourceArn,
})

The s3EventBroadcasterBucket parameter is the ARN of the S3 bucket that will trigger your lambda function (e.g. arn:aws:s3:::MyImagingS3Bucket).
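As a quick illustration of the ARN handling, the sketch below shows what the resource string looks like for the example bucket. The objectsARN helper is a hypothetical name introduced here; it simply wraps the same fmt.Sprintf call used in the role definition.

```go
package main

import "fmt"

// objectsARN returns the ARN pattern matching every object in a bucket.
// s3:GetObject and s3:PutObject are object-level actions, so the privilege
// must target the objects ("<bucket ARN>/*") rather than the bucket itself.
func objectsARN(bucketARN string) string {
	return fmt.Sprintf("%s/*", bucketARN)
}

func main() {
	fmt.Println(objectsARN("arn:aws:s3:::MyImagingS3Bucket"))
	// prints arn:aws:s3:::MyImagingS3Bucket/*
}
```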

With the IAM Role defined, we can create the Sparta lambda function for transformImage:

// The default timeout is 3 seconds - increase that to 30 seconds so that the
// transform lambda doesn't fail early.
transformOptions := &sparta.LambdaFunctionOptions{
  Description: "Stamp assets in S3",
  MemorySize:  128,
  Timeout:     30,
}
lambdaFn := sparta.NewLambda(iamRole, transformImage, transformOptions)

It typically takes more than 3 seconds to apply the transform, so we increase the execution timeout and provision a new lambda function using the iamRole we defined earlier.

Event Source Registration

If we were to deploy this Sparta application, the transformImage function would have the ability to Get and Put back to the s3EventBroadcasterBucket, but would not be invoked in response to events triggered by that bucket. To register for state change events, we need to configure the lambda’s Permissions:

//////////////////////////////////////////////////////////////////////////////
// S3 configuration
//
lambdaFn.Permissions = append(lambdaFn.Permissions, sparta.S3Permission{
  BasePermission: sparta.BasePermission{
    SourceArn: s3EventBroadcasterBucket,
  },
  Events: []string{"s3:ObjectCreated:*", "s3:ObjectRemoved:*"},
})
lambdaFunctions = append(lambdaFunctions, lambdaFn)

When Sparta generates the CloudFormation template, it scans for Permission configurations. For push-based sources like S3, Sparta uses that service’s APIs to register your lambda function as a publishing target for events. This remote registration is handled automatically by CustomResources added to the CloudFormation template.
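Note that the registration subscribes to whole event families (s3:ObjectCreated:* and s3:ObjectRemoved:*), so the handler can receive names beyond ObjectCreated:Put, such as ObjectCreated:Copy or ObjectRemoved:DeleteMarkerCreated. One way to handle this, sketched below under the assumption that every event in a family should be treated alike, is to classify by family prefix rather than by exact name. The eventKind type and classifyEvent helper are hypothetical names, not part of Sparta.

```go
package main

import (
	"fmt"
	"strings"
)

// eventKind groups S3 event names into the families the handler cares about.
type eventKind int

const (
	kindUnsupported eventKind = iota
	kindCreated
	kindRemoved
)

// classifyEvent maps an event name to its family. An optional "s3:" prefix is
// tolerated so that both the registration spelling ("s3:ObjectCreated:Put")
// and the record spelling ("ObjectCreated:Put") are accepted.
func classifyEvent(eventName string) eventKind {
	name := strings.TrimPrefix(eventName, "s3:")
	switch {
	case strings.HasPrefix(name, "ObjectCreated:"):
		return kindCreated
	case strings.HasPrefix(name, "ObjectRemoved:"):
		return kindRemoved
	default:
		return kindUnsupported
	}
}

func main() {
	names := []string{
		"ObjectCreated:Put",
		"ObjectCreated:Copy",
		"ObjectRemoved:Delete",
		"ReducedRedundancyLostObject",
	}
	for _, name := range names {
		fmt.Printf("%s -> %d\n", name, classifyEvent(name))
	}
}
```

Whether every ObjectCreated event should trigger a stamp is a design decision for your service; the exact-match switch shown earlier is the stricter alternative.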

Wrapping Up

With the lambdaFn fully defined, we can provide it to sparta.Main() and deploy our service. The workflow below is shared by all S3-triggered lambda functions:

  • Define the lambda function (transformImage).
  • Implement the associated business logic (stampImage).
  • If needed, create the required IAMRoleDefinition with appropriate privileges.
  • Provide the lambda function and IAMRoleDefinition to sparta.NewLambda().
  • Add the necessary Permissions to the LambdaAWSInfo struct so that the lambda function is triggered.

The SpartaImager repo contains the full code, and includes API Gateway support that allows you to publicly fetch the stamped image via an expiring S3 URL.

Other Resources