Handling Asynchronous Workflow-Driven pipeline with AWS CodePipeline and AWS Lambda

By April 20, 2020 May 27th, 2020 AWS, Blogs, Cloud, Cloud Assessment, Data pipeline

Written by Praful Tamrakar Senior Cloud Engineer, Powerupcloud Technologies

Most of the AWS customers use AWS lambda widely for performing almost every task, especially its a very handy tool when it comes to customizing the way your pipeline works. If we are talking about pipelines, then AWS Lambda is a service that can be directly integrated with AWS CodePipeline. And the combination of these two services make it possible for AWS customers to successfully automate various tasks, including infrastructure provisioning, blue/green deployments, serverless deployments, AMI baking, database provisioning, and deal with asynchronous behavior.

Problem Statement :

Our customer has a requirement to trigger and monitor the status of the Step Function state machine, which is a long-running asynchronous process. The customer is using the AWS Step Function to run the ETL jobs with the help of AWS Glue jobs and AWS EMR. We proposed to achieve this with Lambda but lambda has a limitation of its timeout i.e. 15 min. Now the real problem is that such an asynchronous process needs to continue and succeed even if it exceeds a fifteen-minute runtime (a limit in Lambda).

Here in this blog we have a solution in which we have figured out how we can solve and automate this approach, with the combination of lambda and AWS CodePipeline with Continuous token.

Assumptions :

This blog assumes you are familiar with AWS CodePipeline and AWS Lambda and know how to create pipelines, functions, Glue jobs and the IAM policies and roles on which they depend.

Pre-requisites:

  1. Glue jobs has already been configured
  2. A StepFunction StateMachine configured to run  Glue Jobs.
  3. CodeCommit repository for Glue scripts

Solution :

In this blog post, we discuss how a CodePipeline action can trigger a Step Functions state machine and how the pipeline and the state machine are kept decoupled through a Lambda function.

The source code for the sample pipeline, pipeline actions, and state machine used in this post is available at https://github.com/powerupcloud/lambdacodepipeline.git.

The below diagram highlights the CodePipeline-StepFunctions integration that will be described in this post. The pipeline contains two stages: a Source stage represented by a CodeCommit Git repository and a DEV stage with CodeCommit, CodeBuild and Invoke Lambda actions that represent the workflow-driven action.

The Steps involved  in the CI/CD pipeline:

  1. Developers commit AWS Glue job’s Code in the SVC (AWS CodeCommit)
  2. The AWS CodePipeline in the Tools Account gets triggered due to step
  3. The Code build steps involve multiple things as mentioned below
    • Installations of dependencies and packages needed
    • Copying the Glue and EMR jobs to S3 location where the Glue jobs will pick the script from.
  4. CHECK_OLD_SFN: The Lambda is invoked to ensure that the Previous Step function execution is not still in a running state before we run the actual Step function. Please find below the process.
    • This action invokes a Lambda function (1).
    • In (2) Lamba Checks the State Machine  Status, which returns a Step Functions State Machine status.
    • In (3) The lambda gets the execution state of the State Machine ( RUNNING || COMPLETED || TIMEOUT )
    • In (4) The Lambda function sends a continuation token back to the pipeline

If The State Machine State is RUNNING in Seconds later, the pipeline invokes the Lambda function again (4), passing the continuation token received. The Lambda function checks the execution state of the state machine and communicates the status to the pipeline. The process is repeated until the state machine execution is complete.

Else (5) Lambda  sends a Job completion token  and completes the pipeline stage.

  1.  TRIGGER_SFN_and_CONTINUE : Invoking Lambda to execute the new Step function execution and Check the status of the new execution. Please find below the process.
    • This action invokes a Lambda function (1) called the State Machine, which, in turn, triggers a Step Functions State Machine to process the request (2).
    • The Lambda function sends a continuation token back to the pipeline (3) to continue its execution later and terminates.
    • Seconds later, the pipeline invokes the Lambda function again (4), passing the continuation token received. The Lambda function checks the execution state of the state machine (5,6) and communicates the status to the pipeline. The process is repeated until the state machine execution is complete.
    • Then the Lambda function notifies the pipeline that the corresponding pipeline action is complete (7). If the state machine has failed, the Lambda function will then fail the pipeline action and stop its execution (7). While running, the state machine triggers various Glue Jobs to perform ETL operations. The state machine and the pipeline are fully decoupled. Their interaction is handled by the Lambda function.
  2. Approval to the Higher Environment. In this stage, we Add a Manual Approval Action to a Pipeline in CodePipeline. Which can be implemented using https://docs.aws.amazon.com/codepipeline/latest/userguide/approvals-action-add.html

Deployment Steps :

Step 1: Create a Pipeline

  1. Sign in to the AWS Management Console and open the CodePipeline console at http://console.aws.amazon.com/codesuite/codepipeline/home.
  2. On the Welcome page, Getting started page, or the Pipelines page, choose Create pipeline.
  3. In Choose pipeline settings, in Pipeline name, enter the pipeline name.
  4. In-Service role, do one of the following:
    • Choose a New service role to allow CodePipeline to create a new service role in IAM.
    • Choose the Existing service role to use a service role already created in IAM. In Role name, choose your service role from the list.
  5. Leave the settings under Advanced settings at their defaults, and then choose Next.

6. In the Add source stage, in Source provider, Choose Source Provider as CodeCommit.

7. Provide Repository name and Branch Name

8. In Change detection options Choose AWS CodePipeline

9. In Add build stage,  in Build provider choose AWS CodeBuild, choose the Region

10. Select the existing Project name or Create project

11. You Can add Environment Variables, which you may use in buildspec.yaml file , and click Next

NOTE: The build Step has a very special reason. Here we copy the glue script from SVC (AWS CodeCommit ) to the S3 bucket, from where the Glue job picks its script to execute in its next execution.

12. Add deploy stage, Skip deploy Stage.

13. Now Finally click Create Pipeline.

Step 2: Create the CHECK OLD SFN LAMBDA Lambda Function

  1. Create the execution role
  • Sign in to the AWS Management Console and open the IAM console

Choose Policies, and then choose Create Policy. Choose the JSON tab, and then paste the following policy into the field.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "states:*",
                "codepipeline:PutJobFailureResult",
                "codepipeline:PutJobSuccessResult"
            ],
            "Resource": "*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "logs:*",
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}
  • Choose Review policy.
  • On the Review policy page, in Name, type a name for the policy (for example, CodePipelineLambdaExecPolicy). In Description, enter Enables Lambda to execute code.
  • Choose Create Policy.
  • On the policy dashboard page, choose Roles, and then choose to Create role.
  • On the Create role page, choose AWS service. Choose Lambda, and then choose Next: Permissions.
  • On the Attach permissions policies page, select the checkbox next to CodePipelineLambdaExecPolicy, and then choose Next: Tags. Choose Next: Review.
  • On the Review page, in Role name, enter the name, and then choose to Create role.

2. Create the CHECK_OLD_SFN_LAMBDA Lambda function to use with CodePipeline

  • Open the Lambda console and choose the Create function.
  • On the Create function page, choose Author from scratch. In the Function name, enter a name for your Lambda function (for example, CHECK_OLD_SFN_LAMBDA ) .
  • In Runtime, choose Python 2.7.
  • Under Role, select Choose an existing role. In Existing role, choose your role you created earlier, and then choose the Create function.
  • The detail page for your created function opens.
  • Copy the check_StepFunction.py code into the Function code box
  • In Basic settings, for Timeout, replace the default of 3 seconds with 5 Min.
  • Choose Save.

3. Create the TRIGGER_SFN_and_CONTINUE Lambda function to use with CodePipeline

  • Open the Lambda console and choose the Create function.
  • On the Create function page, choose Author from scratch. In Function name, enter a name for your Lambda function (for example, TRIGGER_SFN_and_CONTINUE ) .
  • In Runtime, choose Python 2.7.
  • Under Role, select Choose an existing role. In Existing role, choose your role you created earlier, and then choose the Create function.
  • The detail page for your created function opens.
  • Copy the trigger_StepFunction.py code into the Function code box
  • In Basic settings, for Timeout, replace the default of 3 seconds with 5 Min.
  • Choose Save.

Step 3: Add the CHECK OLD SFN LAMBDA, Lambda Function to a Pipeline in the CodePipeline Console

In this step, you add a new stage to your pipeline, and then add a Lambda action that calls your function to that stage.

To add stage

  • Sign in to the AWS Management Console and open the CodePipeline console at http://console.aws.amazon.com/codesuite/codepipeline/home.
  • On the Welcome page, choose the pipeline you created.
  • On the pipeline view page, choose Edit.
  • On the Edit page, choose + Add stage to add a stage after the Build stage with thaction. Enter a name for the stage (for example, CHECK_OLD_SFN_LAMBDA ), and choose Add stage.
  • Choose + Add action group. In Edit action, in Action name, enter a name for your Lambda action (for example, CHECK_OLD_SFN_LAMBDA ). In Provider, choose AWS Lambda. In Function name, choose or enter the name of your Lambda function (for example, CHECK_OLD_SFN_LAMBDA )
  • In UserParameters, you must provide a JSON string with a parameter: { “stateMachineARN”: “<ARN_OF_STATE_MACHINE>” } EG: 
  • choose Save.

Step 4: Add the TRIGGER_SFN_and_CONTINUE  Lambda Function to a Pipeline in the CodePipeline Console

In this step, you add a new stage to your pipeline, and then add a Lambda action that calls your function to that stage.

To add a stage

  • Sign in to the AWS Management Console and open the CodePipeline console at http://console.aws.amazon.com/codesuite/codepipeline/home.
  • On the Welcome page, choose the pipeline you created.
  • On the pipeline view page, choose Edit.
  • On the Edit page, choose + Add stage to add a stage after the Build stage with thaction. Enter a name for the stage (for example, TRIGGER_SFN_and_CONTINUE ), and choose Add stage.
  • Choose + Add action group. In Edit action, in Action name, enter a name for your Lambda action (for example, TRIGGER_SFN_and_CONTINUE ). In Provider, choose AWS Lambda. In Function name, choose or enter the name of your Lambda function (for example, TRIGGER_SFN_and_CONTINUE )
  • In UserParameters, you must provide a JSON string with a parameter: { “stateMachineARN”: “<ARN_OF_STATE_MACHINE>” }
  • choose Save.

Step 5: Test the Pipeline with the Lambda function

  • To test the function, release the most recent change through the pipeline.
  • To use the console to run the most recent version of an artifact through a pipeline
  • On the pipeline details page, choose Release change. This runs the most recent revision available in each source location specified in a source action through the pipeline.
  • When the Lambda action is complete, choose the Details link to view the log stream for the function in Amazon CloudWatch, including the billed duration of the event. If the function failed, the CloudWatch log provides information about the cause.

Example JSON Event

The following example shows a sample JSON event sent to Lambda by CodePipeline. The structure of this event is similar to the response to the GetJobDetails API, but without the actionTypeId and pipelineContext data types. Two action configuration details, FunctionName and UserParameters, are included in both the JSON event and the response to the GetJobDetails API. The values in green text are examples or explanations, not real values.

{
    "CodePipeline.job": {
        "id": "11111111-abcd-1111-abcd-111111abcdef",
        "accountId": "111111111111",
        "data": {
            "actionConfiguration": {
                "configuration": {
                    "FunctionName": "MyLambdaFunctionForAWSCodePipeline",
                    "UserParameters": "some-input-such-as-a-URL"
                }
            },
            "inputArtifacts": [
                {
                    "location": {
                        "s3Location": {
                            "bucketName": "s3-bucket-name",
                            "objectKey": "for example CodePipelineDemoApplication.zip"
                        },
                        "type": "S3"
                    },
                    "revision": null,
                    "name": "ArtifactName"
                }
            ],
            "outputArtifacts": [],
            "artifactCredentials": {
                "secretAccessKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
                "sessionToken": "MIICiTCCAfICCQD6m7oRw0uXOjANBgkqhkiG9w
0BAQUFADCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZEDmFJl0ZxBHjJnyp378OD8uTs7fLvjx79LjSTbNYiytVbZPQUQ5Yaxu2jXnimvwdasdadasljdajldlakslkdjakjdkaljdaljdasljdaljdalklakkoi9494k3k3owlkeroieowiruwpirpdk3k23j2jk234hjl2343rrszlaEXAMPLE=",
                "accessKeyId": "AKIAIOSFODNN7EXAMPLE"
            },
            "continuationToken": "A continuation token if continuing job",
            "encryptionKey": { 
              "id": "arn:aws:kms:us-west-2:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab",
              "type": "KMS"
            }
        }
    }
}

Conclusion

In this blog post, we discussed how a Lambda function can be used fully to decouple the pipeline and the state machine and manage their interaction. We also learned how asynchronous processes that need to continue and succeed, even if it exceeds a fifteen-minute runtime (a limit in Lambda) are handled using Continuous Token.

Please Visit our Blogs for more interesting articles.

Leave a Reply