In this section we’ll walkthrough how to trigger your lambda function in response to inbound email. This overview is based on the SpartaApplication sample code if you’d rather jump to the end result.
Assume that we have already verified our email domain with AWS. This allows our domain’s email to be handled by SES.
We’ve been asked to write a lambda function that logs inbound messages, including the metadata associated with the message body itself.
There is also an additional requirement to support immutable infrastructure, so our service needs to manage the S3 bucket to which message bodies should be stored. Our service cannot rely on a pre-existing S3 bucket. The infrastructure (and associated security policies) together with the application logic is coupled.
We’ll start with an empty lambda function and build up the needed functionality.
import (
spartaSES "github.com/mweagle/Sparta/v3/aws/ses"
)
func echoSESEvent(ctx context.Context, sesEvent spartaSES.Event) (*spartaSES.Event, error) {
logger, _ := ctx.Value(sparta.ContextKeyRequestLogger).(*zerolog.Logger)
configuration, configErr := sparta.Discover()
logger.Info().
Err(configErr).
Interface("Configuration", configuration).
Msg("Discovery results")
}
At this point we would normally continue processing the SES event, using Sparta types until the official events are available.
However, before moving on to the event processing, we need to take a detour into dynamic infrastructure because of the immutable infrastructure requirement.
This requirement implies that our service must be self-contained: we can’t assume that the S3 bucket already exists. How can our locally compiled code access AWS-created resources?
The immutable infrastructure requirement makes this lambda function a bit more complex. Our service needs to:
Let’s first take a look at how the SES lambda handler provisions a new S3 bucket via the MessageBodyStorage type:
func appendSESLambda(api *sparta.API,
lambdaFunctions []*sparta.LambdaAWSInfo)
[]*sparta.LambdaAWSInfo {
// Our lambda function will need to be able to read from the bucket, which
// will be handled by the S3MessageBodyBucketDecorator below
lambdaFn, _ := sparta.NewAWSLambda(sparta.LambdaName(echoSESEvent),
echoSESEvent,
sparta.IAMRoleDefinition{})
// Setup options s.t. the lambda function has time to consume the message body
lambdaFn.Options = &sparta.LambdaFunctionOptions{
Description: "",
MemorySize: 128,
Timeout: 10,
}
// Add a Permission s.t. the Lambda function automatically manages SES registration
sesPermission := sparta.SESPermission{
BasePermission: sparta.BasePermission{
// SES only supports wildcard ARNs
SourceArn: "*",
},
InvocationType: "Event",
}
// Store the message body
bodyStorage, _ := sesPermission.NewMessageBodyStorageResource("Special")
sesPermission.MessageBodyStorage = bodyStorage
The MessageBodyStorage
type (and the related MessageBodyStorageOptions type)
cause our SESPermission handler to add an S3 ReceiptRule
at the head of the rules list. This rule instructs SES to store the message body in the supplied bucket before invoking our lambda function.
The single parameter "Special"
is an application-unique literal value that is used to create a stable CloudFormation
resource identifier so that new buckets are not created in response to stack update requests.
Our SES handler then adds two ReceiptRules:
sesPermission.ReceiptRules = make([]sparta.ReceiptRule, 0)
sesPermission.ReceiptRules = append(sesPermission.ReceiptRules,
sparta.ReceiptRule{
Name: "Special",
Recipients: []string{"[email protected]"},
TLSPolicy: "Optional",
})
sesPermission.ReceiptRules = append(sesPermission.ReceiptRules,
sparta.ReceiptRule{
Name: "Default",
Recipients: []string{},
TLSPolicy: "Optional",
})
Our lambda function is required to access the message body in the dynamically created MessageBodyStorage
resource, but
the S3 resource Arn is only defined after the service is provisioned. The solution to this is to reference the
dynamically generated BucketArnAllKeys()
value in the sparta.IAMRolePrivilege
initializer:
// Then add the privilege to the Lambda function s.t. we can actually get at the data
lambdaFn.RoleDefinition.Privileges = append(lambdaFn.RoleDefinition.Privileges,
sparta.IAMRolePrivilege{
Actions: []string{"s3:GetObject", "s3:HeadObject"},
Resource: sesPermission.MessageBodyStorage.BucketArnAllKeys(),
})
The last step is to register the SESPermission
with the lambda info:
// Finally add the SES permission to the lambda function
lambdaFn.Permissions = append(lambdaFn.Permissions, sesPermission)
At this point we’ve implicitly created an S3 bucket via the MessageBodyStorage
value. Our lambda function now needs to dynamically determine the AWS-assigned bucket name.
Our echoSESEvent
function needs to determine, at execution time, the MessageBodyStorage
S3 bucket name. This is done via sparta.Discover()
:
configuration, configErr := sparta.Discover()
logger.Info().
Err(configErr).
Interface("Configuration", configuration).
Msg("Discovery results")
// The message bucket is an explicit `DependsOn` relationship, so it'll be in the
// resources map. We'll find it by looking for the dependent resource with the "AWS::S3::Bucket" type
bucketName := ""
for _, eachResourceInfo := range configuration.Resources {
if eachResourceInfo.ResourceType == "AWS::S3::Bucket" {
bucketName = eachResourceInfo.Properties["Ref"]
}
}
if "" == bucketName {
return nil, errors.Errorf("Failed to discover SES bucket from sparta.Discovery: %#v", configuration)
}
The sparta.Discover()
function returns a DiscoveryInfo structure. This
data is published into the Lambda’s environment variables to enable it to discover other
resources published in the same Stack.
The structure includes the stack’s Pseudo Parameters
as well information about any immediate resource dependencies.
Eg, those that were explicitly marked as DependsOn
. See the discovery documentation for more details.
As we only have a single dependency, our discovery filter is:
// The message bucket is an explicit `DependsOn` relationship, so it'll be in the
// resources map. We'll find it by looking for the dependent resource with the "AWS::S3::Bucket" type
bucketName := ""
for _, eachResourceInfo := range configuration.Resources {
if eachResourceInfo.ResourceType == "AWS::S3::Bucket" {
bucketName = eachResourceInfo.Properties["Ref"]
}
}
if "" == bucketName {
return nil, errors.Errorf("Failed to discover SES bucket from sparta.Discovery: %#v", configuration)
}
The rest of echoSESEvent
satisfies the other requirements, with a bit of help from the SES event types:
// Get the metdata about the item...
svc := s3.New(session.New())
for _, eachRecord := range sesEvent.Records {
logger.Info().
Str("Source", eachRecord.SES.Mail.Source).
Str("MessageID", eachRecord.SES.Mail.MessageID).
Str("BucketName", bucketName).
Msg("SES Event")
if "" != bucketName {
params := &s3.HeadObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(eachRecord.SES.Mail.MessageID),
}
resp, err := svc.HeadObject(params)
logger.Info().
Interface("Metadata", resp).
Err(err).
Msg("SES MessageBody")
}
}
return &sesEvent, nil
With the lambdaFn
fully defined, we can provide it to sparta.Main()
and deploy our service. The workflow below is shared by all SES-triggered lambda function:
echoSESEvent
).sparta.NewAWSLambda()
LambdaAWSInfo
struct so that the lambda function is triggered.Additionally, if the SES handler needs to access the raw email message body:
sesPermission.NewMessageBodyStorageResource("Special")
value to store the message bodysesPermission.MessageBodyStorage
fieldsesPermission.[]IAMPrivilege
that includes the sesPermission.MessageBodyStorage.BucketArnAllKeys()
Arnsparta.Discover()