Connecting SNS to a Lambda function using CloudFormation

We use AWS CloudFormation to manage our infrastructure as code. Our video processing runs in a Lambda function that is triggered by messages published to an SNS topic. The documentation on how to set this up in CloudFormation is fairly poor, so in this article I will show some troposphere code that does it.

First create the Lambda function. This one just logs its invocation event to CloudWatch Logs:

from troposphere import GetAtt, Join
from troposphere.awslambda import Code, Function

def create_lambda(self):
    t = self.template
 
    code = [
        "exports.handler = function(event, context) {" +
        "    console.log(\"event: \", JSON.stringify(event, null, 4));" +
        "    context.succeed(\"success\");" +
        "}"
    ]
 
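    return t.add_resource(Function(
        "LambdaFunction",
        # Inline source: CloudFormation stores ZipFile content in a file
        # named index, which is why the handler is "index.handler"
        Code=Code(
            ZipFile=Join("", code)
        ),
        Handler="index.handler",
        # Refers by name to the role created in give_permission_to_lambda
        Role=GetAtt("LambdaExecutionRole", "Arn"),
        # nodejs4.3 has since been retired by AWS; substitute a currently
        # supported Node.js runtime when deploying
        Runtime="nodejs4.3",
    ))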
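
The handler above only dumps the event. For orientation, an SNS-triggered invocation carries a Records list, and each record holds the notification under an Sns key, with the message body delivered as a string. The sketch below is in Python rather than Node.js and uses a hand-written sample event with made-up values; extract_messages is a hypothetical helper, not part of any AWS library.

```python
import json

# Hand-written sample of the event SNS passes to a subscribed Lambda.
# Only a few fields are shown; the ARN and message values are made up.
SAMPLE_EVENT = {
    "Records": [
        {
            "EventSource": "aws:sns",
            "Sns": {
                "TopicArn": "arn:aws:sns:eu-west-1:123456789012:example-topic",
                "Subject": "example",
                "Message": json.dumps({"video": "input/example.mp4"}),
            },
        }
    ]
}


def extract_messages(event):
    """Return the raw message body of every SNS record in the event."""
    return [
        record["Sns"]["Message"]
        for record in event.get("Records", [])
        if record.get("EventSource") == "aws:sns"
    ]


def handler(event, context):
    # Publishers often send JSON, but SNS delivers it as a plain string,
    # so it has to be decoded explicitly.
    return [json.loads(body) for body in extract_messages(event)]
```

Calling handler(SAMPLE_EVENT, None) locally is a cheap way to exercise message parsing before anything is deployed.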

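Creating the SNS topic is straightforward. The method below takes an existing troposphere Topic object and attaches the Lambda function to it as a subscription: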

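from troposphere import GetAtt
from troposphere.sns import Subscription

def subscribe_lambda_to_topic(self, topic, function):
    # The "lambda" protocol tells SNS to invoke the function at Endpoint
    topic.Subscription = [Subscription(
        Protocol="lambda",
        Endpoint=GetAtt(function, "Arn")
    )]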
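
To make the result easier to check against the CloudFormation documentation, here is a rough sketch of the topic resource that the subscription should render to, written as a plain Python dict rather than with troposphere. The logical ID VideoTopic and the helper topic_with_lambda_subscription are assumptions made up for illustration; the property names are the standard AWS::SNS::Topic ones.

```python
import json


def topic_with_lambda_subscription(function_logical_id):
    """Build an AWS::SNS::Topic resource with one Lambda subscription."""
    return {
        "Type": "AWS::SNS::Topic",
        "Properties": {
            "Subscription": [
                {
                    "Protocol": "lambda",
                    # Fn::GetAtt resolves to the function's ARN at deploy time.
                    "Endpoint": {"Fn::GetAtt": [function_logical_id, "Arn"]},
                }
            ]
        },
    }


# "VideoTopic" is a hypothetical logical ID; "LambdaFunction" is the title
# given to the function resource in create_lambda.
resources = {"VideoTopic": topic_with_lambda_subscription("LambdaFunction")}
print(json.dumps({"Resources": resources}, indent=2))
```

Comparing this against the output of the template's to_json() is a quick sanity check that the subscription was attached to the topic you expected.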

The most complicated, and least well documented, part of the configuration is granting the relevant authorisations. The Lambda function itself needs authorisation to use any resources it touches; in this example it is given authority to log to CloudWatch. It is also necessary to add a Lambda permission that allows SNS to invoke the function in response to a message on the topic. Note that this permission is not a normal IAM role and policy but a statement in the function's own resource-based policy, created with an AWS::Lambda::Permission resource.

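from troposphere import GetAtt, Ref
from troposphere.awslambda import Permission
from troposphere.iam import PolicyType, Role

def give_permission_to_lambda(self, topic, function):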
    t = self.template
 
    # This role gives the lambda the permissions it needs during execution
    lambda_execution_role = t.add_resource(Role(
        "LambdaExecutionRole",
        Path="/",
        AssumeRolePolicyDocument={"Version": "2012-10-17", "Statement": [
            {
                "Action": ["sts:AssumeRole"],
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "lambda.amazonaws.com",
                    ]
                }
            }
        ]},
    ))
 
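    # Policy attached to the role above so the function can write its logs;
    # "logs:*" is broader than strictly needed and could be narrowed
    lambda_execution_policy = t.add_resource(PolicyType(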
        "LambdaExecutionPolicy",
        PolicyName="LambdaExecutionPolicy",
        PolicyDocument={
            "Version": "2012-10-17", "Statement": [
                {"Resource": "arn:aws:logs:*:*:*",
                 "Action": ["logs:*"],
                 "Effect": "Allow",
                 "Sid": "logaccess"}]},
        Roles=[Ref(lambda_execution_role)]
    ))
 
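    # Lambda-specific permission: lets SNS invoke the function, but only for
    # messages from this topic (Ref on an SNS topic returns its ARN)
    t.add_resource(Permission(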
        "InvokeLambdaPermission",
        FunctionName=GetAtt(function, "Arn"),
        Action="lambda:InvokeFunction",
        SourceArn=Ref(topic),
        Principal="sns.amazonaws.com"
    ))
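
Since the permission is the piece most easily forgotten, it may help to see roughly what it should look like in the generated template. The sketch below builds the resource as a plain Python dict, without troposphere; invoke_permission and the logical ID VideoTopic are hypothetical names chosen for illustration, while the property names are those of AWS::Lambda::Permission.

```python
import json


def invoke_permission(function_logical_id, topic_logical_id):
    """Build the AWS::Lambda::Permission that lets one SNS topic invoke a function."""
    return {
        "Type": "AWS::Lambda::Permission",
        "Properties": {
            "Action": "lambda:InvokeFunction",
            "FunctionName": {"Fn::GetAtt": [function_logical_id, "Arn"]},
            # The service principal for SNS, not an IAM user or role.
            "Principal": "sns.amazonaws.com",
            # Ref on an AWS::SNS::Topic returns the topic ARN, which limits
            # the grant to messages coming from that one topic.
            "SourceArn": {"Ref": topic_logical_id},
        },
    }


# "VideoTopic" is a hypothetical logical ID for the topic resource.
permission = invoke_permission("LambdaFunction", "VideoTopic")
print(json.dumps({"InvokeLambdaPermission": permission}, indent=2))
```

If the rendered template has the lambda subscription but no resource like this one, the stack will typically still deploy, yet SNS will not be allowed to invoke the function.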
