Kinesis Firehose

There are two ways to create a Firehose Transform reactor that transforms a KinesisFirehoseEventRecord with a Lambda function:

  • NewKinesisFirehoseLambdaTransformer
    • Transform using a Lambda function
  • NewKinesisFirehoseTransformer

NewKinesisFirehoseLambdaTransformer

import (
awsEvents "github.com/aws/aws-lambda-go/events"
  spartaArchetype "github.com/mweagle/Sparta/archetype"
)
// KinesisStream reactor function
func reactorFunc(ctx context.Context,
                    record *awsEvents.KinesisFirehoseEventRecord)
                    (*awsEvents.KinesisFirehoseResponseRecord, error) {
  logger, _ := ctx.Value(sparta.ContextKeyRequestLogger).(*logrus.Entry)

  logger.WithFields(logrus.Fields{
    "Record": record,
  }).Info("Kinesis Firehose Event")

  responseRecord = &awsEvents.KinesisFirehoseResponseRecord{
    RecordID: record.RecordID,
    Result:   awsEvents.KinesisFirehoseTransformedStateOk,
    Data:     record.Data,
  }
  return responseRecord, nil
}

func main() {
  // ...
  handler := spartaArchetype.KinesisFirehoseReactorFunc(reactorFunc)
  lambdaFn, lambdaFnErr := spartaArchetype.NewKinesisFirehoseLambdaTransformer(handler,
    5*time.Minute /* Duration: recommended minimum of 1m */)
  // ...
}

This is the lowest level transformation type supported and it enables the most flexibility.

NewKinesisFirehoseTransformer

Another option for creating Kinesis Firehose Transformers is to leverage the text/template package to define a transformation template. For instance:

{{/* file: transform.template */}}
{{if eq (.Record.Data.JMESPathAsString "sector") "TECHNOLOGY"}}
{
    "region" : "{{ .Record.KinesisEventHeader.Region }}",
    "ticker_symbol" : {{ .Record.Data.JMESPath "ticker_symbol"}}
}
{{else}}
{{ KinesisFirehoseDrop }}
{{end}}

A new *sparta.LambdaAWSInfo instance can be created from transform.template as in:

func main() {
  // ...
  hooks := &sparta.WorkflowHooks{}
  reactorFunc, reactorFuncErr := archetype.NewKinesisFirehoseTransformer("transform.template",
    5*time.Minute,
    hooks)
  // ...

  var lambdaFunctions []*sparta.LambdaAWSInfo
  lambdaFunctions = append(lambdaFunctions, reactorFunc)
  err := sparta.MainEx(awsName,
    "Simple Sparta application that demonstrates core functionality",
    lambdaFunctions,
    nil,
    nil,
    hooks,
    false)
}

The template execution context includes the the following:

Data Model

  • Data (string)
    • The data available in the Kinesis Firehose Record. Values can be extracted from the Data content by either JMESPath expressions (JMESPath, JMESPathAsString, JMESPathAsFormattedString) or regexp capture groups (RegExpGroup, RegExpGroupAsString, RegExpGroupAsJSON).
    • See for more information
  • RecordID (string)
    • The specific record id being processed
  • Metadata (struct)
    • The metadata associated with the specific record being processed
  • ApproximateArrivalTimestamp (awsEvents.MilliSecondsEpochTime)
    • The time at which the record arrived
  • KinesisEventHeader (struct)
    • Metadata associated with the set of records being processed

Functions

Functions available in the template’s FuncMap include: