API V2 Gateway

The API V2 Gateway service exposes a WebSocket API that is backed by a set of Lambda functions. The AWS blog post supplies an excellent overview of the pros and cons of this approach, which enables a near-real-time, push-based application. This section provides an overview of how to configure a WebSocket API using Sparta. It is based on the SpartaWebSocket sample project.

Payload

Similar to the AWS blog post, our WebSocket API will transmit messages of the form:

{
  "message": "sendmessage",
  "data": "hello world !"
}
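
For illustration, the payload above can be modeled as a small Go type. This is a minimal sketch, not part of the sample project: the wsMessage type and parseMessage helper are hypothetical names, and the sketch assumes the data property is always a string (the sample itself treats it as raw JSON).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wsMessage is a hypothetical type mirroring the payload shown above.
type wsMessage struct {
	Message string `json:"message"` // route key, for example "sendmessage"
	Data    string `json:"data"`    // content to broadcast (assumed to be a string)
}

// parseMessage decodes a raw WebSocket frame body into a wsMessage.
func parseMessage(body string) (wsMessage, error) {
	var msg wsMessage
	err := json.Unmarshal([]byte(body), &msg)
	return msg, err
}

func main() {
	msg, err := parseMessage(`{"message":"sendmessage","data":"hello world !"}`)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("route=%s data=%q\n", msg.Message, msg.Data)
}
```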

We’ll use ws to test the API from the command line.

Routes

The Sparta service consists of three lambda functions:

  • connectWorld(context.Context, awsEvents.APIGatewayWebsocketProxyRequest) (*wsResponse, error)
  • disconnectWorld(context.Context, awsEvents.APIGatewayWebsocketProxyRequest) (*wsResponse, error)
  • sendMessage(context.Context, awsEvents.APIGatewayWebsocketProxyRequest) (*wsResponse, error)

Our functions will use the PROXY style integration and therefore accept an instance of the APIGatewayWebsocketProxyRequest type.

Each function returns a *wsResponse instance that satisfies the PROXY response:

type wsResponse struct {
  StatusCode int    `json:"statusCode"`
  Body       string `json:"body"`
}
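
To see what the struct tags produce, the following sketch serializes a response with the standard library. The encodeResponse helper is a hypothetical name introduced only for this example; in the deployed service the Lambda runtime performs the serialization.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wsResponse matches the type shown above.
type wsResponse struct {
	StatusCode int    `json:"statusCode"`
	Body       string `json:"body"`
}

// encodeResponse is a hypothetical helper that renders a response as JSON.
func encodeResponse(resp *wsResponse) (string, error) {
	raw, err := json.Marshal(resp)
	if err != nil {
		return "", err
	}
	return string(raw), nil
}

func main() {
	out, err := encodeResponse(&wsResponse{StatusCode: 200, Body: "Connected"})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(out) // prints {"statusCode":200,"body":"Connected"}
}
```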

connectWorld

The connectWorld AWS Lambda function is responsible for saving the incoming connectionID into a dynamically provisioned DynamoDB table so that subsequent sendMessage requests can broadcast to all subscribed parties.

The table name is advertised in the Lambda function via a user-defined environment variable. The specifics of how that table is provisioned will be addressed in a section below.

...
// Operation
putItemInput := &dynamodb.PutItemInput{
  TableName: aws.String(os.Getenv(envKeyTableName)),
  Item: map[string]*dynamodb.AttributeValue{
    ddbAttributeConnectionID: &dynamodb.AttributeValue{
      S: aws.String(request.RequestContext.ConnectionID),
    },
  },
}
_, putItemErr := dynamoClient.PutItem(putItemInput)
...
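
Note that os.Getenv returns an empty string when the variable is unset, so a misconfigured function would only fail once DynamoDB rejects the request. A defensive lookup might look like the sketch below. The tableNameFrom helper and the CONNECTIONS_TABLE value are assumptions made for this example; the source does not show the value of envKeyTableName.

```go
package main

import (
	"fmt"
	"os"
)

// envKeyTableName's value is an assumption; the sample's actual value is not shown.
const envKeyTableName = "CONNECTIONS_TABLE"

// tableNameFrom is a hypothetical helper. It takes the lookup function as a
// parameter so that it can be exercised without touching the real environment.
func tableNameFrom(lookup func(string) (string, bool), key string) (string, error) {
	name, ok := lookup(key)
	if !ok || name == "" {
		return "", fmt.Errorf("environment variable %s is not set", key)
	}
	return name, nil
}

func main() {
	name, err := tableNameFrom(os.LookupEnv, envKeyTableName)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	fmt.Println("using table:", name)
}
```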

disconnectWorld

The complement to connectWorld is disconnectWorld, which is responsible for removing the connectionID from the list of registered connections:

  delItemInput := &dynamodb.DeleteItemInput{
    TableName: aws.String(os.Getenv(envKeyTableName)),
    Key: map[string]*dynamodb.AttributeValue{
      ddbAttributeConnectionID: &dynamodb.AttributeValue{
        S: aws.String(connectionID),
      },
    },
  }
  _, delItemErr := ddbService.DeleteItem(delItemInput)

sendMessage

With the connectWorld and disconnectWorld connection management functions created, the core of the WebSocket API is sendMessage. This function scans the set of registered connectionIDs and calls PostToConnectionWithContext for each one, which delivers the message to that connection.

The sendMessage function can be broken down into a few sections.

Setup API Gateway Management Instance

The first requirement is to set up the API Gateway Management service instance using the proper endpoint. The endpoint can be constructed from the incoming APIGatewayWebsocketProxyRequestContext member of the request.

  endpointURL := fmt.Sprintf("%s/%s",
    request.RequestContext.DomainName,
    request.RequestContext.Stage)
  logger.WithField("Endpoint", endpointURL).Info("API Gateway Endpoint")
  dynamoClient := dynamodb.New(sess)
  apigwMgmtClient := apigwManagement.New(sess, aws.NewConfig().WithEndpoint(endpointURL))
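
Pulling the endpoint construction into its own function makes it easy to check in isolation. The sketch below is illustrative only: requestContext is a hypothetical stand-in for the two fields of APIGatewayWebsocketProxyRequestContext used here, and managementEndpoint is not a name from the sample project.

```go
package main

import "fmt"

// requestContext is a hypothetical stand-in for the DomainName and Stage
// fields of the incoming request context.
type requestContext struct {
	DomainName string
	Stage      string
}

// managementEndpoint joins the domain name and stage in the same
// "<domain>/<stage>" form used by the snippet above.
func managementEndpoint(rc requestContext) string {
	return fmt.Sprintf("%s/%s", rc.DomainName, rc.Stage)
}

func main() {
	rc := requestContext{
		DomainName: "gu4vmnia27.execute-api.us-west-2.amazonaws.com",
		Stage:      "v1",
	}
	fmt.Println(managementEndpoint(rc))
}
```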

Validate Input

The next step is to unmarshal and validate the incoming JSON request body:

  // Get the input request...
  var objMap map[string]*json.RawMessage
  unmarshalErr := json.Unmarshal([]byte(request.Body), &objMap)
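  if unmarshalErr != nil {
    return &wsResponse{
      StatusCode: 500,
      Body:       "Failed to unmarshal request: " + unmarshalErr.Error(),
    }, nil
  }
  // Check the data property separately: unmarshalErr is nil on this path,
  // so calling unmarshalErr.Error() here would panic
  if objMap["data"] == nil {
    return &wsResponse{
      StatusCode: 500,
      Body:       "Request is missing the data property",
    }, nil
  }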
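
The same validation can be expressed as a standalone function that depends only on the standard library, which makes the two failure cases (malformed JSON and a missing data property) easy to test separately. This is a sketch under that assumption; extractData is a hypothetical name and is not part of the sample project.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// extractData is a hypothetical helper that returns the raw JSON stored
// under the "data" key of the request body.
func extractData(body string) (json.RawMessage, error) {
	var objMap map[string]*json.RawMessage
	if err := json.Unmarshal([]byte(body), &objMap); err != nil {
		return nil, fmt.Errorf("failed to unmarshal request: %w", err)
	}
	data := objMap["data"]
	if data == nil {
		// Covers both an absent key and an explicit JSON null.
		return nil, errors.New("request is missing the data property")
	}
	return *data, nil
}

func main() {
	data, err := extractData(`{"message":"sendmessage","data":"hello world !"}`)
	if err != nil {
		fmt.Println("invalid request:", err)
		return
	}
	fmt.Println(string(data)) // the raw JSON value, quotes included
}
```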

Once we have verified that the input is valid, the final step is to notify all the subscribers.

Scan and Publish

Once the incoming data property is validated, the next step is to scan the DynamoDB table for the registered connections and post a message to each one. Note that the scan callback also attempts to clean up connections that are no longer valid, but which haven’t been cleanly removed.

  scanCallback := func(output *dynamodb.ScanOutput, lastPage bool) bool {
    // Send the message to all the clients
    for _, eachItem := range output.Items {
      // Get the connectionID
      receiverConnection := ""
      if eachItem[ddbAttributeConnectionID].S != nil {
        receiverConnection = *eachItem[ddbAttributeConnectionID].S
      }

      // Post to this connectionID
      postConnectionInput := &apigwManagement.PostToConnectionInput{
        ConnectionId: aws.String(receiverConnection),
        Data:         *objMap["data"],
      }
      _, respErr := apigwMgmtClient.PostToConnectionWithContext(ctx, postConnectionInput)
      if respErr != nil {
        if receiverConnection != "" &&
          strings.Contains(respErr.Error(), apigwManagement.ErrCodeGoneException) {
          // Cleanup in case the connection is stale
          go deleteConnection(receiverConnection, dynamoClient)
        } else {
          logger.WithField("Error", respErr).Warn("Failed to post to connection")
        }
      }
    }
    // Returning true asks the SDK to continue with the next page of results
    return true
  }

  // Scan the connections table
  scanInput := &dynamodb.ScanInput{
    TableName: aws.String(os.Getenv(envKeyTableName)),
  }
  scanItemErr := dynamoClient.ScanPagesWithContext(ctx,
    scanInput,
    scanCallback)
  ...
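
The broadcast-and-cleanup pattern can be sketched without the AWS SDK by passing the post operation in as a function. Everything named here is hypothetical: errGone stands in for the service's GoneException, and broadcast is not a function from the sample project. The sketch visits every connection ID it is given and collects the stale ones so that the caller can delete them.

```go
package main

import (
	"errors"
	"fmt"
)

// errGone is a hypothetical stand-in for the GoneException that is returned
// when a connection no longer exists.
var errGone = errors.New("GoneException")

// broadcast posts data to each connection ID. It returns the number of
// successful deliveries and the IDs that reported errGone.
func broadcast(ids []string, data []byte,
	post func(id string, data []byte) error) (delivered int, stale []string) {
	for _, id := range ids {
		err := post(id, data)
		switch {
		case err == nil:
			delivered++
		case errors.Is(err, errGone):
			// Stale connection: report it so the caller can remove it
			stale = append(stale, id)
		default:
			fmt.Printf("failed to post to %s: %v\n", id, err)
		}
	}
	return delivered, stale
}

func main() {
	// Fake post function: conn-2 behaves like a disconnected client
	post := func(id string, _ []byte) error {
		if id == "conn-2" {
			return errGone
		}
		return nil
	}
	ids := []string{"conn-1", "conn-2", "conn-3"}
	delivered, stale := broadcast(ids, []byte(`"hello world !"`), post)
	fmt.Println("delivered:", delivered, "stale:", stale)
}
```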

These three functions are the core of the WebSocket service.

API V2 Gateway Decorator

The next step is to create the API V2 API object which is comprised of:

  • Stage
  • API
  • Routes

There is one Stage and one API per service, but a given service (including this one) may include multiple Routes.

// APIv2 Websockets
stage, _ := sparta.NewAPIV2Stage("v1")
stage.Description = "New deploy!"

apiGateway, _ := sparta.NewAPIV2(sparta.Websocket,
  "sample",
  "$request.body.message",
  stage)

The NewAPIV2 creation function requires:

  • The protocol to use (sparta.Websocket)
  • The name of the API (sample)
  • The route selection expression, a JSONPath expression that maps input data to the corresponding lambda function.
  • The stage

Once the API is defined, each route is associated with the API as in:

apiv2ConnectRoute, _ := apiGateway.NewAPIV2Route("$connect",
    lambdaConnect)
apiv2ConnectRoute.OperationName = "ConnectRoute"
...
apiv2SendRoute, _ := apiGateway.NewAPIV2Route("sendmessage",
    lambdaSend)
apiv2SendRoute.OperationName = "SendRoute"
...

The $connect routeKey is a special route key value that is sent when a client first connects to the WebSocket API. See the official documentation for more information.

In comparison, the routeKey value of sendmessage means that a payload of the form:

{
  "message": "sendmessage",
  "data": "hello world !"
}

will trigger the lambdaSend function given the parent API’s route selection expression of $request.body.message.
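
The route selection performed by API Gateway can be modeled in a few lines. The sketch below is only an illustrative model of evaluating $request.body.message, not Sparta or API Gateway code; selectRoute is a hypothetical name. When no custom route key matches, API Gateway falls back to the $default route, if one is configured.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// selectRoute is a hypothetical model of the $request.body.message route
// selection expression: it reads the message property from the body and
// returns it when it matches a known custom route key.
func selectRoute(body string, customRoutes map[string]bool) string {
	var envelope struct {
		Message string `json:"message"`
	}
	err := json.Unmarshal([]byte(body), &envelope)
	if err == nil && customRoutes[envelope.Message] {
		return envelope.Message
	}
	// No match: the $default route handles the message, if configured
	return "$default"
}

func main() {
	routes := map[string]bool{"sendmessage": true}
	body := `{"message":"sendmessage","data":"hello world !"}`
	fmt.Println(selectRoute(body, routes))
}
```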

Additional Privileges

Because the lambdaSend function also needs to invoke the API Gateway Management APIs to broadcast, an additional IAM privilege must be enabled:

  var apigwPermissions = []sparta.IAMRolePrivilege{
    {
      Actions: []string{"execute-api:ManageConnections"},
      Resource: gocf.Join("",
        gocf.String("arn:aws:execute-api:"),
        gocf.Ref("AWS::Region"),
        gocf.String(":"),
        gocf.Ref("AWS::AccountId"),
        gocf.String(":"),
        gocf.Ref(apiGateway.LogicalResourceName()),
        gocf.String("/*")),
    },
  }
  lambdaSend.RoleDefinition.Privileges = append(lambdaSend.RoleDefinition.Privileges, apigwPermissions...)
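
To make the joined expression easier to read, the sketch below builds the equivalent string from plain values. It assumes that the Ref to the API resource resolves to the API identifier at deployment time; manageConnectionsARN is a hypothetical name used only for this example.

```go
package main

import "fmt"

// manageConnectionsARN is a hypothetical helper that mirrors the shape of
// the resource produced by the gocf.Join call above.
func manageConnectionsARN(region, accountID, apiID string) string {
	return fmt.Sprintf("arn:aws:execute-api:%s:%s:%s/*", region, accountID, apiID)
}

func main() {
	// Placeholder values for illustration
	fmt.Println(manageConnectionsARN("us-west-2", "123412341234", "gu4vmnia27"))
}
```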

Annotating Lambda Functions

The final configuration step is to use the API gateway to create an instance of the APIV2GatewayDecorator. This decorator is responsible for:

  • Provisioning the DynamoDB table.
  • Ensuring DynamoDB CRUD permissions for all the AWS Lambda functions.
  • Publishing the table name into the Lambda function’s Environment block.
  • Adding the WebSocket wss://... URL to the Stack’s Outputs.

The decorator is created by a call to NewConnectionTableDecorator which accepts:

  • The environment variable to populate with the dynamically assigned DynamoDB table
  • The DynamoDB attribute name to use to store the connectionID
  • The read capacity units
  • The write capacity units

For instance:

  decorator, _ := apiGateway.NewConnectionTableDecorator(envKeyTableName,
      ddbAttributeConnectionID,
      5,
      5)

  var lambdaFunctions []*sparta.LambdaAWSInfo
  lambdaFunctions = append(lambdaFunctions,
      lambdaConnect,
      lambdaDisconnect,
      lambdaSend)
  decorator.AnnotateLambdas(lambdaFunctions)

Provision

With everything defined, provide the API V2 Decorator as a Workflow hook as in:

  // Set everything up and run it...
  workflowHooks := &sparta.WorkflowHooks{
    ServiceDecorators: []sparta.ServiceDecoratorHookHandler{decorator},
  }
  err := sparta.MainEx(awsName,
    "Sparta application that demonstrates API v2 Websocket support",
    lambdaFunctions,
    apiGateway,
    nil,
    workflowHooks,
    false)

and then provision the application:

go run main.go provision --s3Bucket $S3_BUCKET
INFO[0000] ════════════════════════════════════════════════
INFO[0000] ╔═╗╔═╗╔═╗╦═╗╔╦╗╔═╗   Version : 1.9.4
INFO[0000] ╚═╗╠═╝╠═╣╠╦╝ ║ ╠═╣   SHA     : cfd44e2
INFO[0000] ╚═╝╩  ╩ ╩╩╚═ ╩ ╩ ╩   Go      : go1.12.6
INFO[0000] ════════════════════════════════════════════════
INFO[0000] Service: SpartaWebSocket-123412341234         LinkFlags= Option=provision UTC="2019-07-25T05:26:57Z"
INFO[0000] ════════════════════════════════════════════════
INFO[0000] Using `git` SHA for StampedBuildID            Command="git rev-parse HEAD" SHA=6b26f8e645e9d58c1b678e46576e19bbc29886c0
INFO[0000] Provisioning service                          BuildID=6b26f8e645e9d58c1b678e46576e19bbc29886c0 CodePipelineTrigger= InPlaceUpdates=false NOOP=false Tags=
INFO[0000] Verifying IAM Lambda execution roles
INFO[0000] IAM roles verified                            Count=3
INFO[0000] Checking S3 versioning                        Bucket=weagle VersioningEnabled=true
INFO[0000] Checking S3 region                            Bucket=weagle Region=us-west-2
INFO[0000] Running `go generate`
INFO[0000] Compiling binary                              Name=Sparta.lambda.amd64
INFO[0002] Creating code ZIP archive for upload          TempName=./.sparta/SpartaWebSocket_123412341234-code.zip
INFO[0002] Lambda code archive size                      Size="23 MB"
INFO[0002] Uploading local file to S3                    Bucket=weagle Key=SpartaWebSocket-123412341234/SpartaWebSocket_123412341234-code.zip Path=./.sparta/SpartaWebSocket_123412341234-code.zip Size="23 MB"
INFO[0011] Calling WorkflowHook                          ServiceDecoratorHook= WorkflowHookContext="map[]"
INFO[0011] Uploading local file to S3                    Bucket=weagle Key=SpartaWebSocket-123412341234/SpartaWebSocket_123412341234-cftemplate.json Path=./.sparta/SpartaWebSocket_123412341234-cftemplate.json Size="14 kB"
INFO[0011] Creating stack                                StackID="arn:aws:cloudformation:us-west-2:123412341234:stack/SpartaWebSocket-123412341234/d8a405b0-ae9c-11e9-a05a-0a1528792fce"
INFO[0122] CloudFormation Metrics ▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬
...
INFO[0122] Stack Outputs ▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬
INFO[0122]     APIGatewayURL                             Description="API Gateway Websocket URL" Value="wss://gu4vmnia27.execute-api.us-west-2.amazonaws.com/v1"
INFO[0122] Stack provisioned                             CreationTime="2019-07-25 05:27:08.687 +0000 UTC" StackId="arn:aws:cloudformation:us-west-2:123412341234:stack/SpartaWebSocket-123412341234/d8a405b0-ae9c-11e9-a05a-0a1528792fce" StackName=SpartaWebSocket-123412341234
INFO[0122] ════════════════════════════════════════════════
INFO[0122] SpartaWebSocket-123412341234 Summary
INFO[0122] ════════════════════════════════════════════════
INFO[0122] Verifying IAM roles                           Duration (s)=0
INFO[0122] Verifying AWS preconditions                   Duration (s)=0
INFO[0122] Creating code bundle                          Duration (s)=1
INFO[0122] Uploading code                                Duration (s)=9
INFO[0122] Ensuring CloudFormation stack                 Duration (s)=112
INFO[0122] Total elapsed time                            Duration (s)=122

Test

With the API Gateway deployed, the last step is to test it. Download and install the ws tool:

go get -u github.com/hashrocket/ws

then connect to your new API and send a message as in:

$ ws wss://gu4vmnia27.execute-api.us-west-2.amazonaws.com/v1
> {"message":"sendmessage", "data":"hello world !"}
< "hello world !"

You can also send messages with Firecamp, a Chrome extension, and exchange messages between your ws session and the web (or vice versa).

Conclusion

While a production-ready application would likely need to include authentication and authorization, this is the beginning of a full-featured WebSocket service in fewer than 200 lines of application code:

-------------------------------------------------------------------------------
Language                     files          blank        comment           code
-------------------------------------------------------------------------------
Go                               1             21             52            183
Markdown                         1              0              2              0
-------------------------------------------------------------------------------
TOTAL                            2             21             54            183
-------------------------------------------------------------------------------

Remember to terminate the stack when you’re done to avoid any unintentional costs!
