The API V2 Gateway service provides a way to expose a WebSocket API that is backed by a set of Lambda functions. The AWS blog post supplies an excellent overview of the pros and cons of this approach, which enables a near real-time, push-based application. This section provides an overview of how to configure a WebSocket API using Sparta. It is based on the SpartaWebSocket sample project.
Similar to the AWS blog post, our WebSocket API will transmit messages of the form
{
"message": "sendmessage",
"data": "hello world !"
}
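As a minimal sketch, the envelope above could be modeled in Go as shown here. The wsMessage type and the parseMessage helper are hypothetical names used only for illustration and are not part of the sample project. The data property is kept as raw JSON so that any payload type can pass through unchanged.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wsMessage is a hypothetical type that mirrors the message envelope.
type wsMessage struct {
	Message string          `json:"message"` // route key, e.g. "sendmessage"
	Data    json.RawMessage `json:"data"`    // payload forwarded to subscribers
}

// parseMessage decodes a raw WebSocket frame into a wsMessage.
func parseMessage(raw string) (wsMessage, error) {
	var msg wsMessage
	err := json.Unmarshal([]byte(raw), &msg)
	return msg, err
}

func main() {
	msg, err := parseMessage(`{"message": "sendmessage", "data": "hello world !"}`)
	if err != nil {
		fmt.Println("invalid message:", err)
		return
	}
	fmt.Printf("route=%s data=%s\n", msg.Message, msg.Data)
}
```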
We’ll use ws to test the API from the command line.
The Sparta service consists of three lambda functions:
connectWorld(context.Context, awsEvents.APIGatewayWebsocketProxyRequest) (*wsResponse, error)
disconnectWorld(context.Context, awsEvents.APIGatewayWebsocketProxyRequest) (*wsResponse, error)
sendMessage(context.Context, awsEvents.APIGatewayWebsocketProxyRequest) (*wsResponse, error)
Our functions will use the PROXY style integration and therefore accept an instance of the APIGatewayWebsocketProxyRequest type.
Each function returns a *wsResponse instance that satisfies the PROXY response:
type wsResponse struct {
StatusCode int `json:"statusCode"`
Body string `json:"body"`
}
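To illustrate what the struct tags produce on the wire, here is a small sketch that serializes a wsResponse. The okResponse and encodeResponse helpers are hypothetical and exist only for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wsResponse matches the struct shown above.
type wsResponse struct {
	StatusCode int    `json:"statusCode"`
	Body       string `json:"body"`
}

// okResponse is a hypothetical helper that builds a success response.
func okResponse(body string) *wsResponse {
	return &wsResponse{StatusCode: 200, Body: body}
}

// encodeResponse is a hypothetical helper that returns the JSON form
// of a response.
func encodeResponse(resp *wsResponse) (string, error) {
	encoded, err := json.Marshal(resp)
	if err != nil {
		return "", err
	}
	return string(encoded), nil
}

func main() {
	encoded, err := encodeResponse(okResponse("Connected"))
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(encoded) // {"statusCode":200,"body":"Connected"}
}
```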
The connectWorld AWS Lambda function is responsible for saving the incoming connectionID into a dynamically provisioned DynamoDB database so that subsequent sendMessage requests can broadcast to all subscribed parties.
The table name is advertised in the Lambda function via a user-defined environment variable. The specifics of how that table is provisioned will be addressed in a section below.
...
// Operation
putItemInput := &dynamodb.PutItemInput{
TableName: aws.String(os.Getenv(envKeyTableName)),
Item: map[string]*dynamodb.AttributeValue{
ddbAttributeConnectionID: &dynamodb.AttributeValue{
S: aws.String(request.RequestContext.ConnectionID),
},
},
}
_, putItemErr := dynamoClient.PutItem(putItemInput)
...
The complement to connectWorld is disconnectWorld, which is responsible for removing the connectionID from the list of registered connections:
delItemInput := &dynamodb.DeleteItemInput{
TableName: aws.String(os.Getenv(envKeyTableName)),
Key: map[string]*dynamodb.AttributeValue{
ddbAttributeConnectionID: &dynamodb.AttributeValue{
S: aws.String(connectionID),
},
},
}
_, delItemErr := ddbService.DeleteItem(delItemInput)
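The connect and disconnect bookkeeping can be pictured with an in-memory stand-in for the DynamoDB table. This is only a sketch for reasoning about the lifecycle. The connectionStore type and its methods are hypothetical and do not appear in the sample project, which persists connections in DynamoDB.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// connectionStore is a hypothetical in-memory stand-in for the
// DynamoDB connections table.
type connectionStore struct {
	mu    sync.Mutex
	conns map[string]struct{}
}

func newConnectionStore() *connectionStore {
	return &connectionStore{conns: make(map[string]struct{})}
}

// Add mirrors connectWorld: remember a newly connected client.
func (s *connectionStore) Add(connectionID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.conns[connectionID] = struct{}{}
}

// Remove mirrors disconnectWorld: forget a client.
// Removing an unknown ID is a no-op.
func (s *connectionStore) Remove(connectionID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.conns, connectionID)
}

// List returns the registered connection IDs in sorted order.
func (s *connectionStore) List() []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	ids := make([]string, 0, len(s.conns))
	for id := range s.conns {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

func main() {
	store := newConnectionStore()
	store.Add("conn-a")
	store.Add("conn-b")
	store.Remove("conn-a")
	fmt.Println(store.List())
}
```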
With the connectWorld and disconnectWorld connection management functions created, the core of the WebSocket API is sendMessage. This function is responsible for scanning over the set of registered connectionIDs and forwarding the message to each registered connection via PostToConnectionWithContext.
The sendMessage function can be broken down into a few sections.
The first requirement is to set up the API Gateway Management service instance using the proper endpoint. The endpoint can be constructed from the incoming APIGatewayWebsocketProxyRequestContext member of the request.
endpointURL := fmt.Sprintf("%s/%s",
request.RequestContext.DomainName,
request.RequestContext.Stage)
logger.WithField("Endpoint", endpointURL).Info("API Gateway Endpoint")
dynamoClient := dynamodb.New(sess)
apigwMgmtClient := apigwManagement.New(sess, aws.NewConfig().WithEndpoint(endpointURL))
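For a concrete picture of the resulting value, the sketch below joins a domain name and stage the same way as the snippet above. The managementEndpoint helper is a hypothetical name, and the sample values are taken from the provisioning output later in this section.

```go
package main

import "fmt"

// managementEndpoint is a hypothetical helper that joins the domain
// name and stage exactly as the fmt.Sprintf call above does.
func managementEndpoint(domainName, stage string) string {
	return fmt.Sprintf("%s/%s", domainName, stage)
}

func main() {
	endpoint := managementEndpoint(
		"gu4vmnia27.execute-api.us-west-2.amazonaws.com",
		"v1")
	fmt.Println(endpoint)
}
```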
The next step is to unmarshal and validate the incoming JSON request body:
// Get the input request...
var objMap map[string]*json.RawMessage
unmarshalErr := json.Unmarshal([]byte(request.Body), &objMap)
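if unmarshalErr != nil || objMap["data"] == nil {
// unmarshalErr is nil when only the data property is missing
errMsg := "missing data property"
if unmarshalErr != nil {
errMsg = unmarshalErr.Error()
}
return &wsResponse{
StatusCode: 500,
Body: "Failed to unmarshal request: " + errMsg,
}, nil
}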
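The validation step has two distinct failure modes: the body is not valid JSON, or it is valid JSON without a data property. A minimal, standard-library-only sketch that separates the two cases might look like this. The extractData helper and the errMissingData sentinel are hypothetical names and are not part of the sample project.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// errMissingData is a hypothetical sentinel for a well-formed request
// that carries no data property.
var errMissingData = errors.New(`request has no "data" property`)

// extractData returns the raw JSON value of the data property.
func extractData(body string) (json.RawMessage, error) {
	var objMap map[string]*json.RawMessage
	if err := json.Unmarshal([]byte(body), &objMap); err != nil {
		return nil, fmt.Errorf("failed to unmarshal request: %w", err)
	}
	data := objMap["data"]
	if data == nil {
		return nil, errMissingData
	}
	return *data, nil
}

func main() {
	bodies := []string{
		`{"message":"sendmessage","data":"hello world !"}`,
		`{"message":"sendmessage"}`,
		`not json`,
	}
	for _, body := range bodies {
		data, err := extractData(body)
		if err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Printf("accepted: %s\n", data)
	}
}
```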
Once the incoming data property is validated, the final step is to notify all the subscribers: scan the DynamoDB table for the registered connections and post a message to each one. Note that the scan callback also attempts to clean up connections that are no longer valid, but which haven’t been cleanly removed.
scanCallback := func(output *dynamodb.ScanOutput, lastPage bool) bool {
// Send the message to all the clients
for _, eachItem := range output.Items {
// Get the connectionID
receiverConnection := ""
if eachItem[ddbAttributeConnectionID].S != nil {
receiverConnection = *eachItem[ddbAttributeConnectionID].S
}
// Post to this connectionID
postConnectionInput := &apigwManagement.PostToConnectionInput{
ConnectionId: aws.String(receiverConnection),
Data: *objMap["data"],
}
_, respErr := apigwMgmtClient.PostToConnectionWithContext(ctx, postConnectionInput)
if respErr != nil {
if receiverConnection != "" &&
strings.Contains(respErr.Error(), apigwManagement.ErrCodeGoneException) {
// Cleanup in case the connection is stale
go deleteConnection(receiverConnection, dynamoClient)
} else {
logger.WithField("Error", respErr).Warn("Failed to post to connection")
}
}
// Continue with the remaining connections in this page
}
return true
}
// Scan the connections table
scanInput := &dynamodb.ScanInput{
TableName: aws.String(os.Getenv(envKeyTableName)),
}
scanItemErr := dynamoClient.ScanPagesWithContext(ctx,
scanInput,
scanCallback)
...
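The broadcast-and-cleanup logic can be exercised without AWS by abstracting the post call behind a function type. The following is a sketch under that assumption: poster, errGone, and broadcast are hypothetical names, with errGone standing in for the GoneException that the management API reports for stale connections. Unlike the sample, stale IDs are returned to the caller rather than deleted in a goroutine.

```go
package main

import (
	"errors"
	"fmt"
)

// errGone is a hypothetical sentinel standing in for the
// GoneException returned for stale connections.
var errGone = errors.New("connection is gone")

// poster abstracts the call that delivers data to one connection.
type poster func(connectionID string, data []byte) error

// broadcast posts data to every connection ID and returns the IDs
// that were reported as gone so the caller can delete them.
func broadcast(connectionIDs []string, data []byte, post poster) (stale []string) {
	for _, id := range connectionIDs {
		if id == "" {
			continue // nothing to post to
		}
		err := post(id, data)
		switch {
		case err == nil:
			// delivered
		case errors.Is(err, errGone):
			stale = append(stale, id)
		default:
			fmt.Printf("failed to post to %s: %v\n", id, err)
		}
	}
	return stale
}

func main() {
	live := map[string]bool{"conn-a": true, "conn-c": true}
	post := func(id string, data []byte) error {
		if !live[id] {
			return errGone
		}
		fmt.Printf("-> %s: %s\n", id, data)
		return nil
	}
	stale := broadcast(
		[]string{"conn-a", "conn-b", "conn-c"},
		[]byte(`"hello world !"`),
		post)
	fmt.Println("stale:", stale)
}
```

Every connection in the slice is visited, so a failure on one connection does not prevent delivery to the others.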
These three functions are the core of the WebSocket service.
The next step is to create the API V2 API object, which comprises a Stage, an API, and one or more Routes.
There is one Stage and one API per service, but a given service (including this one) may include multiple Routes.
// APIv2 Websockets
stage, _ := sparta.NewAPIV2Stage("v1")
stage.Description = "New deploy!"
apiGateway, _ := sparta.NewAPIV2(sparta.Websocket,
"sample",
"$request.body.message",
stage)
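The NewAPIV2 creation function requires the protocol to use (sparta.Websocket), the name of the API (sample), the route selection expression ($request.body.message), and the stage.
Once the API is defined, each route is associated with the API as in: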
apiv2ConnectRoute, _ := apiGateway.NewAPIV2Route("$connect",
lambdaConnect)
apiv2ConnectRoute.OperationName = "ConnectRoute"
...
apiv2SendRoute, _ := apiGateway.NewAPIV2Route("sendmessage",
lambdaSend)
apiv2SendRoute.OperationName = "SendRoute"
...
The $connect routeKey is a special route key value that is sent when a client first connects to the WebSocket API. See the official documentation for more information.
In comparison, the routeKey value of sendmessage means that a payload of the form:
{
"message": "sendmessage",
"data": "hello world !"
}
will trigger the lambdaSend function given the parent API’s route selection expression of $request.body.message.
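As a simplified model of how a route selection expression of $request.body.message maps a payload to a route, consider the sketch below. It is not API Gateway code: selectRoute is a hypothetical helper, and it assumes that unmatched or non-JSON messages fall through to the $default route key.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// selectRoute models $request.body.message: read the "message"
// property of the body and use it as the route key when a route with
// that key exists. Otherwise fall back to "$default".
func selectRoute(body string, routes map[string]bool) string {
	var envelope struct {
		Message string `json:"message"`
	}
	if err := json.Unmarshal([]byte(body), &envelope); err != nil {
		return "$default"
	}
	if routes[envelope.Message] {
		return envelope.Message
	}
	return "$default"
}

func main() {
	routes := map[string]bool{"sendmessage": true}
	payload := `{"message": "sendmessage", "data": "hello world !"}`
	fmt.Println(selectRoute(payload, routes))
}
```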
Because the lambdaSend function also needs to invoke the API Gateway Management APIs to broadcast, an additional IAM Privilege must be enabled:
var apigwPermissions = []sparta.IAMRolePrivilege{
{
Actions: []string{"execute-api:ManageConnections"},
Resource: gocf.Join("",
gocf.String("arn:aws:execute-api:"),
gocf.Ref("AWS::Region"),
gocf.String(":"),
gocf.Ref("AWS::AccountId"),
gocf.String(":"),
gocf.Ref(apiGateway.LogicalResourceName()),
gocf.String("/*")),
},
}
lambdaSend.RoleDefinition.Privileges = append(lambdaSend.RoleDefinition.Privileges, apigwPermissions...)
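To make the joined resource expression easier to read, this sketch builds the equivalent string from plain values. The manageConnectionsARN helper is hypothetical, and it assumes that the Ref to the API's logical resource resolves to the API identifier at deployment time. The sample values come from the provisioning output later in this section.

```go
package main

import "fmt"

// manageConnectionsARN is a hypothetical helper that mirrors the
// gocf.Join expression above using already-resolved values.
func manageConnectionsARN(region, accountID, apiID string) string {
	return fmt.Sprintf("arn:aws:execute-api:%s:%s:%s/*",
		region, accountID, apiID)
}

func main() {
	fmt.Println(manageConnectionsARN("us-west-2", "123412341234", "gu4vmnia27"))
}
```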
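The final configuration step is to use the API gateway to create an instance of the APIV2GatewayDecorator. Among its responsibilities, this decorator provisions the DynamoDB connections table, publishes the table name to the Lambda functions through the user-defined environment variable, and adds the wss://... URL to the Stack’s Outputs.
The decorator is created by a call to NewConnectionTableDecorator, which accepts the name of the environment variable that advertises the table name, the DynamoDB attribute name used to store the connectionID, and the table’s read and write capacity units.
For instance: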
decorator, _ := apiGateway.NewConnectionTableDecorator(envKeyTableName,
ddbAttributeConnectionID,
5,
5)
var lambdaFunctions []*sparta.LambdaAWSInfo
lambdaFunctions = append(lambdaFunctions,
lambdaConnect,
lambdaDisconnect,
lambdaSend)
decorator.AnnotateLambdas(lambdaFunctions)
With everything defined, provide the API V2 Decorator as a Workflow hook as in:
// Set everything up and run it...
workflowHooks := &sparta.WorkflowHooks{
ServiceDecorators: []sparta.ServiceDecoratorHookHandler{decorator},
}
err := sparta.MainEx(awsName,
"Sparta application that demonstrates API v2 Websocket support",
lambdaFunctions,
apiGateway,
nil,
workflowHooks,
false)
and then provision the application:
go run main.go provision --s3Bucket $S3_BUCKET
INFO[0000] ════════════════════════════════════════════════
INFO[0000] ╔═╗╔═╗╔═╗╦═╗╔╦╗╔═╗ Version : 1.9.4
INFO[0000] ╚═╗╠═╝╠═╣╠╦╝ ║ ╠═╣ SHA : cfd44e2
INFO[0000] ╚═╝╩ ╩ ╩╩╚═ ╩ ╩ ╩ Go : go1.12.6
INFO[0000] ════════════════════════════════════════════════
INFO[0000] Service: SpartaWebSocket-123412341234 LinkFlags= Option=provision UTC="2019-07-25T05:26:57Z"
INFO[0000] ════════════════════════════════════════════════
INFO[0000] Using `git` SHA for StampedBuildID Command="git rev-parse HEAD" SHA=6b26f8e645e9d58c1b678e46576e19bbc29886c0
INFO[0000] Provisioning service BuildID=6b26f8e645e9d58c1b678e46576e19bbc29886c0 CodePipelineTrigger= InPlaceUpdates=false NOOP=false Tags=
INFO[0000] Verifying IAM Lambda execution roles
INFO[0000] IAM roles verified Count=3
INFO[0000] Checking S3 versioning Bucket=weagle VersioningEnabled=true
INFO[0000] Checking S3 region Bucket=weagle Region=us-west-2
INFO[0000] Running `go generate`
INFO[0000] Compiling binary Name=Sparta.lambda.amd64
INFO[0002] Creating code ZIP archive for upload TempName=./.sparta/SpartaWebSocket_123412341234-code.zip
INFO[0002] Lambda code archive size Size="23 MB"
INFO[0002] Uploading local file to S3 Bucket=weagle Key=SpartaWebSocket-123412341234/SpartaWebSocket_123412341234-code.zip Path=./.sparta/SpartaWebSocket_123412341234-code.zip Size="23 MB"
INFO[0011] Calling WorkflowHook ServiceDecoratorHook= WorkflowHookContext="map[]"
INFO[0011] Uploading local file to S3 Bucket=weagle Key=SpartaWebSocket-123412341234/SpartaWebSocket_123412341234-cftemplate.json Path=./.sparta/SpartaWebSocket_123412341234-cftemplate.json Size="14 kB"
INFO[0011] Creating stack StackID="arn:aws:cloudformation:us-west-2:123412341234:stack/SpartaWebSocket-123412341234/d8a405b0-ae9c-11e9-a05a-0a1528792fce"
INFO[0122] CloudFormation Metrics ▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬
...
INFO[0122] Stack Outputs ▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬▬
INFO[0122] APIGatewayURL Description="API Gateway Websocket URL" Value="wss://gu4vmnia27.execute-api.us-west-2.amazonaws.com/v1"
INFO[0122] Stack provisioned CreationTime="2019-07-25 05:27:08.687 +0000 UTC" StackId="arn:aws:cloudformation:us-west-2:123412341234:stack/SpartaWebSocket-123412341234/d8a405b0-ae9c-11e9-a05a-0a1528792fce" StackName=SpartaWebSocket-123412341234
INFO[0122] ════════════════════════════════════════════════
INFO[0122] SpartaWebSocket-123412341234 Summary
INFO[0122] ════════════════════════════════════════════════
INFO[0122] Verifying IAM roles Duration (s)=0
INFO[0122] Verifying AWS preconditions Duration (s)=0
INFO[0122] Creating code bundle Duration (s)=1
INFO[0122] Uploading code Duration (s)=9
INFO[0122] Ensuring CloudFormation stack Duration (s)=112
INFO[0122] Total elapsed time Duration (s)=122
With the API Gateway deployed, the last step is to test it. Download and install the ws tool:
go get -u github.com/hashrocket/ws
then connect to your new API and send a message as in:
22:31 $ ws wss://gu4vmnia27.execute-api.us-west-2.amazonaws.com/v1
> {"message":"sendmessage", "data":"hello world !"}
< "hello world !"
You can also use Firecamp, a Chrome extension, to exchange messages between your ws session and the web (or vice versa).
While a production-ready application would likely need to include authentication and authorization, this is the beginning of a full-featured WebSocket service in fewer than 200 lines of application code:
-------------------------------------------------------------------------------
Language                     files          blank        comment           code
-------------------------------------------------------------------------------
Go                               1             21             52            183
Markdown                         1              0              2              0
-------------------------------------------------------------------------------
TOTAL                            2             21             54            183
-------------------------------------------------------------------------------
Remember to terminate the stack when you’re done to avoid any unintentional costs!