
DevOps for Databases using Liquibase, Jenkins and CodeCommit


Written by Arun Kumar, Associate Cloud Architect at Powerupcloud Technologies

As part of infrastructure modernization, we move complete architectures to microservices with CI/CD deployment methods, and these techniques suit most application deployments. Database deployments, however, are still largely manual. Applications and databases keep growing; the database size and operational activities in particular get complex, and maintaining them becomes a tedious task for a database administrator.

For enterprise organizations this is even more complex when it comes to managing multiple DB engines with hundreds of databases or multi-tenant databases. Below are a few of the challenges the DBA faces today, all of which are manual activities:

  • Creating or modifying stored procedures, triggers and functions in the database.
  • Altering tables in the database.
  • Rolling back any database deployment.
  • Developers have to wait for the DBA to make any new change in the database, which increases the TAT (turnaround time) to test new features, even in non-production environments.
  • Security concerns: granting database access for changes and maintaining that access is a huge overhead.
  • With different database engines scaled vertically, the databases become difficult to manage.

One of our enterprise customers had all the above challenges. To overcome them, we explored various tools and came up with a strategy to use Liquibase for deployments. Liquibase supports the standard SQL databases (SQL Server, MySQL, Oracle, PostgreSQL, Redshift, Snowflake, DB2, etc.), and the community is improving its support for NoSQL databases; it now supports MongoDB and Cassandra. Liquibase helps us with versioning, deployment and rollback.
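Liquibase tracks what has already been applied through changesets in a changelog file. Purely as an illustration (the author, changeset id and statements below are placeholders, not the customer's actual changelog), a formatted SQL changelog looks like this; the deployment scripts shown later substitute the Git commit id into the changeset id so that every deployment is uniquely versioned:

--liquibase formatted sql

-- Placeholder changeset: the deployment scripts below replace the id "change" with the Git commit hash
--changeset dbdeploy:change
INSERT INTO [dbo].[employees] VALUES ('4', 'doe', 'jane', '750000', '7777766666', 'data', 'engineer');
--rollback DELETE FROM [dbo].[employees] WHERE employee_id = 4;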

With our DevOps experience, we integrated two open-source tools, Liquibase and the Jenkins automation server, for continuous deployment. This solution can be implemented on any cloud platform or on-premises.

Architecture

For this demonstration we will use the AWS platform. MS SQL Server is our main database; let's see how to set up a CI/CD pipeline for the database.

Prerequisites:

  • A sample repo set up in CodeCommit.
  • A Jenkins server up and running.
  • A notification service configured in Jenkins.
  • RDS for SQL Server up and running.

Set up the AWS CodeCommit repo:

To create a code repo in AWS CodeCommit, refer to the following link.

https://docs.aws.amazon.com/codecommit/latest/userguide/setting-up-https-windows.html

Integration of CodeCommit with Jenkins:

To trigger the webhook from AWS CodeCommit, we need to configure AWS SQS and SNS. Please follow the link below.

https://github.com/riboseinc/aws-codecommit-trigger-plugin

For the webhook connection from CodeCommit to Jenkins, we need to install the AWS CodeCommit Trigger Plugin.

Select -> Manage Jenkins -> Manage Plugins -> Available ->  AWS CodeCommit Trigger Plugin.

  • In Jenkins, create a new freestyle project.
  • In Source Code Management, add your CodeCommit repo URL and credentials.

Jenkins -> Manage Jenkins -> Configure System -> AWS CodeCommit Trigger SQS Plugin.

Installation and configuration of Liquibase:

# Install Java 8 (required by Liquibase)
sudo apt update
sudo apt install openjdk-8-jdk
java -version
# Download Liquibase 3.8.1 and extract it under /opt/liquibase
wget https://github.com/liquibase/liquibase/releases/download/v3.8.1/liquibase-3.8.1.tar.gz
sudo mkdir -p /opt/liquibase
sudo mv liquibase-3.8.1.tar.gz /opt/liquibase/
cd /opt/liquibase/
sudo tar -xvzf liquibase-3.8.1.tar.gz

Based on your database, download the JDBC driver (jar file) into the same Liquibase directory. For the Microsoft SQL Server driver, go through the following link.

https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver15

Integration of Jenkins with Liquibase:

During deployment, Jenkins will SSH into the Liquibase instance, so we need to generate an SSH key pair for the Jenkins user and add the public key to the Linux user on the Liquibase server. Here we have an ubuntu user on the Liquibase server.
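A minimal sketch of that key exchange is shown below, assuming Jenkins runs as the jenkins user with its home at /var/lib/jenkins; the Liquibase server hostname is a placeholder:

# On the Jenkins server: generate a key pair for the jenkins user (no passphrase); paths are assumptions
sudo -u jenkins ssh-keygen -t rsa -b 4096 -f /var/lib/jenkins/.ssh/id_rsa -N ""
# Copy the public key into the ubuntu user's authorized_keys on the Liquibase server
sudo -u jenkins ssh-copy-id -i /var/lib/jenkins/.ssh/id_rsa.pub ubuntu@liquibase-server
# Verify that passwordless SSH works
sudo -u jenkins ssh ubuntu@liquibase-server "echo connected"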

Prepare the deployment scripts on the Liquibase server.

For Single database server deployment: singledb-deployment.sh

#!/bin/bash
# Script name: singledb-deployment.sh
set -x
# Use the Git commit id (written by the Jenkins job) as the Liquibase changeset id
GIT_COMMIT=$(cat /tmp/gitcommit.txt)
sudo cp /opt/db/temp/temp.sql /opt/db/db-script.sql
old=$(sudo grep 'change' /opt/db/db-script.sql | cut -d ":" -f 2)
sudo sed -i "s/$old/$GIT_COMMIT/g" /opt/db/db-script.sql
# First line of the test file holds <RDS endpoint>:<database name(s)>; the rest is the SQL to deploy
dburl=$(head -1 /home/ubuntu/test | cut -d ":" -f 1)
dbname=$(head -1 /home/ubuntu/test | cut -d ":" -f 2)
sed -i -e "1d" /home/ubuntu/test
sudo sh -c 'cat /home/ubuntu/test >> /opt/db/db-script.sql'
export PATH=/opt/liquibase/:$PATH
echo "DB_URL is $dburl"
echo "DB_Names is $dbname"
# Run a Liquibase update against every database listed for this server
for prepare in $dbname; do
  liquibase --driver=com.microsoft.sqlserver.jdbc.SQLServerDriver --classpath="/opt/liquibase/mssql-jdbc-7.4.1.jre8.jar" --url="jdbc:sqlserver://$dburl:1433;databaseName=$prepare;integratedSecurity=false;" --changeLogFile="/opt/db/db-script.sql" --username=xxxx --password=xxxxx update
done
# Clean up temporary files
sudo rm -rf /opt/db/db-script.sql /home/ubuntu/test /tmp/gitcommit.txt

For Multi database server deployment: Multidb-deployment.sh

#!/bin/bash
# Script name: Multidb-deployment.sh
set -x
# Use the Git commit id (written by the Jenkins job) as the Liquibase changeset id
GIT_COMMIT=$(cat /tmp/gitcommit.txt)
sudo cp /opt/db/temp/temp.sql /opt/db/db-script.sql
old=$(sudo grep 'change' /opt/db/db-script.sql | cut -d ":" -f 2)
sudo sed -i "s/$old/$GIT_COMMIT/g" /opt/db/db-script.sql

# Split the test file on the #----# separator:
# test00 holds the server list (<RDS endpoint>:<database name(s)> per line), test01 holds the SQL
csplit -sk /home/ubuntu/test '/#----#/' --prefix=/home/ubuntu/test
sed -i -e "1d" /home/ubuntu/test01
# Append the SQL part to the changelog once
sudo sh -c 'cat /home/ubuntu/test01 >> /opt/db/db-script.sql'
export PATH=/opt/liquibase/:$PATH

# Run a Liquibase update against every database on every listed server
while IFS=: read -r db_url db_name; do
  echo "########"
  echo "db_url is $db_url"
  echo "db_name is $db_name"
  for prepare in $db_name; do
    liquibase --driver=com.microsoft.sqlserver.jdbc.SQLServerDriver --classpath="/opt/liquibase/mssql-jdbc-7.4.1.jre8.jar" --url="jdbc:sqlserver://$db_url:1433;databaseName=$prepare;integratedSecurity=false;" --changeLogFile="/opt/db/db-script.sql" --username=xxxx --password=xxxx update
  done
done < /home/ubuntu/test00
# Clean up temporary files
sudo rm -rf /opt/db/db-script.sql /home/ubuntu/test* /tmp/gitcommit.txt

  • In your Jenkins job, use a shell build step to execute these commands.
  • The file test comes from your CodeCommit repo and contains the SQL Server information and the SQL queries to deploy (sample files are shown in the deployment examples below).
  • Below is an example job for multiple database servers, so we trigger Multidb-deployment.sh. If you are using a single SQL Server deployment, use singledb-deployment.sh instead.

Prepare a sample SQL database for the demo:

CREATE DATABASE employee;
USE employee;
CREATE TABLE employees
( employee_id INT NOT NULL,
  last_name VARCHAR(30) NOT NULL,
  first_name VARCHAR(30),
  salary VARCHAR(30),
  phone BIGINT NOT NULL,
  department VARCHAR(30),
  emp_role VARCHAR(30)
);
INSERT INTO [dbo].[employees] VALUES ('1', 'kumar', 'arun', '1000000', '9999998888', 'devops', 'architect');
INSERT INTO [dbo].[employees] VALUES ('2', 'hk', 'guna', '5000000', '9398899434', 'cloud', 'engineer');
INSERT INTO [dbo].[employees] VALUES ('3', 'kumar', 'manoj', '900000', '98888', 'lead', 'architect');

Deployment 1: (for single SQL server deployment)

We are going to insert a new row using the CI/CD pipeline.

  • db-mssql: the CodeCommit repo.
  • test: the SQL Server information (RDS endpoint: DB name) and the SQL that we need to deploy (a sample is shown below).
  • Once we commit our code to our repository (CodeCommit), the webhook triggers the deployment.
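Purely as an illustration, a test file for this single-server deployment could look like the following, where the first line is <RDS endpoint>:<database name> and everything after it is the SQL to apply (the endpoint and values here are placeholders):

demo-sql.xxxxxx.us-east-1.rds.amazonaws.com:employee
INSERT INTO [dbo].[employees] VALUES ('4', 'doe', 'jane', '750000', '7777766666', 'data', 'engineer');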

Check the SQL Server to verify that the row was inserted:

Deployment 2: (for multiple SQL servers deploying the same SQL statements)

  • db-mssql: the CodeCommit repo.
  • test: the SQL Server information (RDS endpoint: DB name) and the SQL that we need to deploy (a sample is shown below).
  • #----#: this is the separator between the servers and the SQL queries, so don't remove it.
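A hypothetical test file for this multi-server case might look like this: the lines before the #----# separator list the servers as <RDS endpoint>:<database name>, and the lines after it hold the SQL to run on each of them (all values are placeholders):

demo-sql-1.xxxxxx.us-east-1.rds.amazonaws.com:employee
demo-sql-2.xxxxxx.us-east-1.rds.amazonaws.com:employee
#----#
INSERT INTO [dbo].[employees] VALUES ('5', 'roe', 'john', '650000', '6666655555', 'cloud', 'engineer');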

Deployment 3: (for multiple SQL servers deploying the same SQL stored procedure)

  • db-mssql: the CodeCommit repo.
  • test: the SQL Server information (RDS endpoint: DB name) and the SQL stored procedure that we need to deploy.
  • #----#: this is the separator between the servers and the SQL queries, so don't remove it.

Notification:

  • Once the job is executed, you will get an email notification.

Liquibase Limitations:

  • Comments inside a function or stored procedure will not get updated in the database.

Conclusion:

Here we used Liquibase on AWS, so we used RDS, CodeCommit, etc. But you can use the same method to configure an automatic deployment pipeline for databases, with versioning and rollback, on AWS RDS, Azure SQL Database, Google Cloud SQL or Snowflake using the open-source tools Liquibase and Jenkins.

Serverless Data processing Pipeline with AWS Step Functions and Lambda


Written by Arun Kumar, Associate Cloud Architect at Powerupcloud Technologies

In the traditional ETL world, we generally use our own scripts, a paid tool, an open-source data processing tool or an orchestrator to deploy a data pipeline. If the data processing pipeline is not complex, these server-based solutions can add unnecessary cost. In AWS we have serverless options such as Lambda and Glue, but Lambda has an execution time limit and Glue runs an EMR cluster in the background, which ultimately charges you a lot. So we decided to explore AWS Step Functions with Lambda: both are serverless, and Step Functions acts as an orchestration service that executes our process on an event basis and terminates the resources after the process completes. Let's see how we can build a data pipeline with this.

Architecture Description:

  1. The on-premises Teradata server sends the input.csv file to the S3 bucket (data_processing folder) on a schedule.
  2. A CloudWatch Event Rule triggers the Step Function on a PutObject in the specified S3 bucket and starts processing the input file.
  3. The cleansing scripts are hosted on ECS.
  4. The AWS Step Function calls a Lambda function, which triggers the ECS tasks (a bunch of Python and R scripts).
  5. Once the cleansing is done, the output file is uploaded to the target S3 bucket.
  6. Another AWS Lambda function is triggered to get the output file from the target bucket and send it to the respective team.

Create a custom CloudWatch Event Rule for the S3 PutObject operation

Choose Event Pattern -> Service Name -> S3 -> Event Type -> Object Level Operations -> choose PutObject -> give the bucket name.
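For reference, the event pattern this console flow produces looks roughly like the following; object-level S3 events reach CloudWatch Events through CloudTrail data events, so CloudTrail must be logging data events for the bucket (the bucket name below is a placeholder):

{
  "source": ["aws.s3"],
  "detail-type": ["AWS API Call via CloudTrail"],
  "detail": {
    "eventSource": ["s3.amazonaws.com"],
    "eventName": ["PutObject"],
    "requestParameters": {
      "bucketName": ["my-data-processing-bucket"]
    }
  }
}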

  • In Targets, choose the Step Function to be triggered -> give the name of the state machine created.
  • Create a new role or use an existing role; the CloudWatch Event rule requires permission to send events to your Step Function.
  • Add one more target to trigger the Lambda function -> choose the function which we created before.

Go to the AWS Management Console and search for Step Functions

  • Create a state machine.
  • On the Define state machine page, select Author with code snippets.
  • Give it a name, then review the state machine definition and visual workflow.
  • Use the graph in the Visual Workflow pane to check that your Amazon States Language code describes your state machine correctly.
  • Create a new IAM role, or select an IAM role you have previously created.
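Purely as an illustration, a minimal Amazon States Language definition for this pipeline could be a single Task state that invokes the Lambda function which launches the ECS cleansing task (the function ARN and timeout below are placeholders, not values from the original setup):

{
  "Comment": "Illustrative example: invoke the Lambda that runs the ECS cleansing task (ARN is a placeholder)",
  "StartAt": "RunCleansingTask",
  "States": {
    "RunCleansingTask": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:111111111111:function:trigger-ecs-cleansing",
      "TimeoutSeconds": 300,
      "End": true
    }
  }
}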

Create an ECS with Fargate

ECS console -> choose Create Cluster -> choose cluster template -> Networking only -> Next -> Configure cluster -> give the cluster name -> Create.

In the navigation pane, choose Task Definitions, Create new Task Definition.

On the Select compatibilities page, select the launch type that your task should use and choose Next step. Choose Fargate launch type.

For Task Definition Name, type a name for your task definition.

For Task Role, choose an IAM role that provides permissions for containers in your task to make calls to AWS API operations on your behalf.

To create an IAM role for your tasks

a.   Open the IAM console.

b.   In the navigation pane, choose Roles, Create New Role.

c.   In the Select Role Type section, for the Amazon Elastic Container Service Task Role service role, choose Select.

d.   In the Attach Policy section, select the policy to use for your tasks and then choose Next Step.

e.   For Role Name, enter a name for your role. Choose Create Role to finish.

For Task execution IAM role, either select your task execution role or choose Create new role so that the console can create one for you.

For Task size, choose a value for Task memory (GB) and Task CPU (vCPU).

For each container in your task definition, complete the following steps:

a.   Choose Add container.

b.   Fill out each required field and any optional fields to use in your container definitions.

c.   Choose Add to add your container to the task definition.
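If you prefer to script this step, the same kind of task definition can also be registered with boto3. The sketch below uses assumed values: the image, roles, CPU and memory are placeholders, and only the family name Demo-ubuntu-new is reused from the Lambda example later in this post.

import boto3

ecs = boto3.client('ecs')

# Register a minimal Fargate task definition (image, roles and sizes are placeholders)
response = ecs.register_task_definition(
    family='Demo-ubuntu-new',
    requiresCompatibilities=['FARGATE'],
    networkMode='awsvpc',
    cpu='512',
    memory='1024',
    executionRoleArn='arn:aws:iam::111111111111:role/ecsTaskExecutionRole',
    taskRoleArn='arn:aws:iam::111111111111:role/ecsTaskRole',
    containerDefinitions=[
        {
            'name': 'cleansing-container',
            'image': '111111111111.dkr.ecr.us-east-1.amazonaws.com/cleansing:latest',
            'essential': True,
        }
    ],
)
print(response['taskDefinition']['taskDefinitionArn'])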

Create a Lambda function

  • Create a Lambda function to call ECS.

In the Lambda console -> Create function -> choose Author from scratch -> give the function name -> for runtime choose Python 3.7 -> create a new role, or if you have an existing role, choose the role with the required permissions [ AmazonECS_FullAccess, AWSLambdaBasicExecutionRole ].

import boto3
import time

client = boto3.client('ecs')

def lambda_handler(event, context):
    # Launch the data-cleansing task on the Fargate cluster
    response = client.run_task(
        cluster='Demo',
        launchType='FARGATE',
        taskDefinition='Demo-ubuntu-new',
        count=1,
        platformVersion='LATEST',
        networkConfiguration={
            'awsvpcConfiguration': {
                'subnets': ['subnet-f5e959b9', 'subnet-11713279'],
                'assignPublicIp': 'ENABLED',
                'securityGroups': ['sg-0462860d9c60d87d3']
            },
        }
    )
    print("this is the response", response)
    task_arn = response['tasks'][0]['taskArn']
    print(task_arn)

    # Wait for the task to finish its work, then stop it
    time.sleep(31.5)
    stop_response = client.stop_task(
        cluster='Demo',
        task=task_arn
    )
    print(stop_response)
    return str(stop_response)
  • Give the required details such as cluster, launch type, task definition, count, platform version and network configuration.
  • The application hosted on ECS Fargate will process the data_process.csv file, and the output file will be pushed to the output folder of the S3 bucket.

Create Notification to trigger lambda function(Send Email)

  • To enable event notifications for an S3 bucket, open the Amazon S3 console.
  • In the Bucket name list, choose the name of the bucket that you want to enable events for.
  • Choose Properties -> Under Advanced settings, choose Events -> Choose Add notification.
  • In Name, type a descriptive name for your event configuration.
  • Under Events, select one or more of the event types that you want to receive notifications for. When the event occurs, a notification is sent to the destination that you choose.
  • Type an object name Prefix and/or Suffix to filter the event notifications by prefix and/or suffix.
  • Select the type of destination to send the event notifications to.
  • If you select the Lambda Function destination type, do the following:
  • In Lambda Function, type or choose the name of the Lambda function that you want to receive notifications from Amazon S3, and choose Save.
  • Create a Lambda function with Node.js (shown below).
    • Note: give the bucket name, folder, file name and a verified email address.
var aws = require('aws-sdk');
var nodemailer = require('nodemailer');
var ses = new aws.SES({region:'us-east-1'});
var s3 = new aws.S3();
 function getS3File(bucket, key) {
	return new Promise(function (resolve, reject) {
    	s3.getObject(
        	{
            	Bucket: bucket,
            	Key: key
        	},
        	function (err, data) {
            	if (err) return reject(err);
            	else return resolve(data);
        	}
    	);
	})
}
 exports.handler = function (event, context, callback) {
     getS3File('window-demo-1', 'output/result.csv')
    	.then(function (fileData) {
        	var mailOptions = {
            	from: 'arun.kumar@powerupcloud.com',
            	subject: 'File uploaded in S3 succeeded!',
            	html: `<p>You got a contact message from: <b>${event.emailAddress}</b></p>`,
            	to: 'arun.kumar@powerupcloud.com',
            	attachments: [
                	{
                        filename: "result.csv",
                        content: fileData.Body
                	}
            	]
        	};
            console.log('Creating SES transporter');
        	// create Nodemailer SES transporter
        	var transporter = nodemailer.createTransport({
            	SES: ses
        	});
        	// send email
            transporter.sendMail(mailOptions, function (err, info) {
            	if (err) {
                    console.log(err);
                    console.log('Error sending email');
                    callback(err);
            	} else {
                    console.log('Email sent successfully');
                    callback();
            	}
        	});
    	})
    	.catch(function (error) {
        	console.log(error);
            console.log('Error getting attachment from S3');
        	callback(error);
    	});
};

Conclusion:

If you are looking for a serverless orchestrator for your batch processing or for a complex data processing pipeline, give AWS Step Functions and Lambda a try; here we used ECS Fargate to cleanse the data. If your data processing script is more complex you can integrate with Glue, and Step Functions will still act as your orchestrator.

Handling Asynchronous Workflow-Driven pipeline with AWS CodePipeline and AWS Lambda


Written by Praful Tamrakar, Senior Cloud Engineer at Powerupcloud Technologies

Most AWS customers use AWS Lambda widely for performing almost every task; it is an especially handy tool when it comes to customizing the way your pipeline works. Speaking of pipelines, AWS Lambda is a service that can be directly integrated with AWS CodePipeline. The combination of these two services makes it possible for AWS customers to automate various tasks, including infrastructure provisioning, blue/green deployments, serverless deployments, AMI baking, database provisioning, and dealing with asynchronous behavior.

Problem Statement:

Our customer had a requirement to trigger and monitor the status of a Step Functions state machine, which is a long-running asynchronous process. The customer uses AWS Step Functions to run ETL jobs with the help of AWS Glue jobs and AWS EMR. We proposed to achieve this with Lambda, but Lambda has a timeout limitation of 15 minutes. The real problem is that such an asynchronous process needs to continue and succeed even if it exceeds a fifteen-minute runtime (the Lambda limit).

In this blog we present a solution that automates this approach using a combination of Lambda and AWS CodePipeline with a continuation token.

Assumptions:

This blog assumes you are familiar with AWS CodePipeline and AWS Lambda and know how to create pipelines, functions, Glue jobs and the IAM policies and roles on which they depend.

Pre-requisites:

  1. Glue jobs have already been configured.
  2. A Step Functions state machine configured to run the Glue jobs.
  3. A CodeCommit repository for the Glue scripts.

Solution:

In this blog post, we discuss how a CodePipeline action can trigger a Step Functions state machine and how the pipeline and the state machine are kept decoupled through a Lambda function.

The source code for the sample pipeline, pipeline actions, and state machine used in this post is available at https://github.com/powerupcloud/lambdacodepipeline.git.

The diagram below highlights the CodePipeline-Step Functions integration that will be described in this post. The pipeline contains two stages: a Source stage represented by a CodeCommit Git repository, and a DEV stage with CodeCommit, CodeBuild and Invoke Lambda actions that represent the workflow-driven action.

The Steps involved  in the CI/CD pipeline:

  1. Developers commit the AWS Glue job code to the SVC (AWS CodeCommit).
  2. The AWS CodePipeline in the Tools Account gets triggered by step 1.
  3. The CodeBuild step involves multiple things, as mentioned below:
    • Installation of the dependencies and packages needed.
    • Copying the Glue and EMR job scripts to the S3 location from where the Glue jobs pick up the script.
  4. CHECK_OLD_SFN: a Lambda is invoked to ensure that the previous Step Functions execution is no longer running before we run the actual Step Function (a sketch of such a check appears below). The process is as follows:
    • This action invokes a Lambda function (1).
    • In (2) the Lambda checks the state machine status, which returns the Step Functions state machine status.
    • In (3) the Lambda gets the execution state of the state machine ( RUNNING || COMPLETED || TIMEOUT ).
    • In (4) the Lambda function sends a continuation token back to the pipeline.

If the state machine state is RUNNING, then seconds later the pipeline invokes the Lambda function again (4), passing the continuation token it received. The Lambda function checks the execution state of the state machine and communicates the status to the pipeline. The process is repeated until the state machine execution is complete.

Else (5), the Lambda sends a job completion token and completes the pipeline stage.
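The actual implementation lives in check_StepFunction.py in the repository linked above. Purely as an illustration, a minimal sketch of this kind of check could look like the following, assuming the state machine ARN is passed in through UserParameters:

import json
import boto3

# Illustrative sketch only; the real code is check_StepFunction.py in the linked repo
sfn = boto3.client('stepfunctions')
codepipeline = boto3.client('codepipeline')

def lambda_handler(event, context):
    job = event['CodePipeline.job']
    job_id = job['id']
    params = json.loads(job['data']['actionConfiguration']['configuration']['UserParameters'])
    state_machine_arn = params['stateMachineARN']

    # Is any execution of this state machine still running?
    running = sfn.list_executions(
        stateMachineArn=state_machine_arn,
        statusFilter='RUNNING'
    )['executions']

    if running:
        # Ask CodePipeline to invoke us again later by returning a continuation token
        codepipeline.put_job_success_result(
            jobId=job_id,
            continuationToken='previous-execution-still-running'
        )
    else:
        # Nothing running: complete this pipeline action
        codepipeline.put_job_success_result(jobId=job_id)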

  5. TRIGGER_SFN_and_CONTINUE: a Lambda is invoked to start the new Step Functions execution and check the status of that execution (a sketch appears after this list). The process is as follows:
    • This action invokes a Lambda function (1), which in turn triggers a Step Functions state machine to process the request (2).
    • The Lambda function sends a continuation token back to the pipeline (3) to continue its execution later, and terminates.
    • Seconds later, the pipeline invokes the Lambda function again (4), passing the continuation token it received. The Lambda function checks the execution state of the state machine (5, 6) and communicates the status to the pipeline. The process is repeated until the state machine execution is complete.
    • The Lambda function then notifies the pipeline that the corresponding pipeline action is complete (7). If the state machine has failed, the Lambda function fails the pipeline action and stops its execution (7). While running, the state machine triggers various Glue jobs to perform ETL operations. The state machine and the pipeline are fully decoupled; their interaction is handled by the Lambda function.
  6. Approval to the higher environment. In this stage, we add a manual approval action to the pipeline in CodePipeline, which can be implemented using https://docs.aws.amazon.com/codepipeline/latest/userguide/approvals-action-add.html
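Again, the real code is trigger_StepFunction.py in the linked repository; a rough sketch of the start-and-poll pattern, assuming the Step Functions execution ARN is carried in the CodePipeline continuation token, might look like this:

import json
import boto3

# Illustrative sketch only; the real code is trigger_StepFunction.py in the linked repo
sfn = boto3.client('stepfunctions')
codepipeline = boto3.client('codepipeline')

def lambda_handler(event, context):
    job = event['CodePipeline.job']
    job_id = job['id']
    data = job['data']
    params = json.loads(data['actionConfiguration']['configuration']['UserParameters'])
    state_machine_arn = params['stateMachineARN']

    if 'continuationToken' not in data:
        # First invocation: start a new execution and hand its ARN back as the continuation token
        execution = sfn.start_execution(stateMachineArn=state_machine_arn)
        codepipeline.put_job_success_result(
            jobId=job_id,
            continuationToken=execution['executionArn']
        )
        return

    # Subsequent invocations: the continuation token carries the execution ARN
    execution_arn = data['continuationToken']
    status = sfn.describe_execution(executionArn=execution_arn)['status']

    if status == 'RUNNING':
        # Still running: ask the pipeline to call back again later
        codepipeline.put_job_success_result(jobId=job_id, continuationToken=execution_arn)
    elif status == 'SUCCEEDED':
        codepipeline.put_job_success_result(jobId=job_id)
    else:
        # FAILED / TIMED_OUT / ABORTED: fail the pipeline action
        codepipeline.put_job_failure_result(
            jobId=job_id,
            failureDetails={'type': 'JobFailed', 'message': 'Execution ended with status ' + status}
        )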

Deployment Steps:

Step 1: Create a Pipeline

  1. Sign in to the AWS Management Console and open the CodePipeline console at http://console.aws.amazon.com/codesuite/codepipeline/home.
  2. On the Welcome page, Getting started page, or the Pipelines page, choose Create pipeline.
  3. In Choose pipeline settings, in Pipeline name, enter the pipeline name.
  4. In Service role, do one of the following:
    • Choose New service role to allow CodePipeline to create a new service role in IAM.
    • Choose Existing service role to use a service role already created in IAM. In Role name, choose your service role from the list.
  5. Leave the settings under Advanced settings at their defaults, and then choose Next.

6. In Add source stage, in Source provider, choose CodeCommit.

7. Provide the repository name and branch name.

8. In Change detection options, choose AWS CodePipeline.

9. In Add build stage, in Build provider, choose AWS CodeBuild and choose the region.

10. Select an existing project name or choose Create project.

11. You can add environment variables, which you may use in the buildspec.yaml file, and click Next.

NOTE: The build step has a very specific purpose. Here we copy the Glue script from the SVC (AWS CodeCommit) to the S3 bucket, from where the Glue job picks up its script for its next execution.

12. In Add deploy stage, skip the deploy stage.

13. Finally, click Create pipeline.

Step 2: Create the CHECK OLD SFN LAMBDA Lambda Function

  1. Create the execution role
  • Sign in to the AWS Management Console and open the IAM console

Choose Policies, and then choose Create Policy. Choose the JSON tab, and then paste the following policy into the field.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "states:*",
                "codepipeline:PutJobFailureResult",
                "codepipeline:PutJobSuccessResult"
            ],
            "Resource": "*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "logs:*",
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}
  • Choose Review policy.
  • On the Review policy page, in Name, type a name for the policy (for example, CodePipelineLambdaExecPolicy). In Description, enter Enables Lambda to execute code.
  • Choose Create Policy.
  • On the policy dashboard page, choose Roles, and then choose Create role.
  • On the Create role page, choose AWS service. Choose Lambda, and then choose Next: Permissions.
  • On the Attach permissions policies page, select the checkbox next to CodePipelineLambdaExecPolicy, and then choose Next: Tags. Choose Next: Review.
  • On the Review page, in Role name, enter the name, and then choose Create role.

2. Create the CHECK_OLD_SFN_LAMBDA Lambda function to use with CodePipeline

  • Open the Lambda console and choose Create function.
  • On the Create function page, choose Author from scratch. In Function name, enter a name for your Lambda function (for example, CHECK_OLD_SFN_LAMBDA).
  • In Runtime, choose Python 2.7.
  • Under Role, select Choose an existing role. In Existing role, choose the role you created earlier, and then choose Create function.
  • The detail page for your created function opens.
  • Copy the check_StepFunction.py code into the Function code box.
  • In Basic settings, for Timeout, replace the default of 3 seconds with 5 minutes.
  • Choose Save.

3. Create the TRIGGER_SFN_and_CONTINUE Lambda function to use with CodePipeline

  • Open the Lambda console and choose Create function.
  • On the Create function page, choose Author from scratch. In Function name, enter a name for your Lambda function (for example, TRIGGER_SFN_and_CONTINUE).
  • In Runtime, choose Python 2.7.
  • Under Role, select Choose an existing role. In Existing role, choose the role you created earlier, and then choose Create function.
  • The detail page for your created function opens.
  • Copy the trigger_StepFunction.py code into the Function code box.
  • In Basic settings, for Timeout, replace the default of 3 seconds with 5 minutes.
  • Choose Save.

Step 3: Add the CHECK OLD SFN LAMBDA, Lambda Function to a Pipeline in the CodePipeline Console

In this step, you add a new stage to your pipeline, and then add a Lambda action that calls your function to that stage.

To add stage

  • Sign in to the AWS Management Console and open the CodePipeline console at http://console.aws.amazon.com/codesuite/codepipeline/home.
  • On the Welcome page, choose the pipeline you created.
  • On the pipeline view page, choose Edit.
  • On the Edit page, choose + Add stage to add a stage after the Build stage. Enter a name for the stage (for example, CHECK_OLD_SFN_LAMBDA), and choose Add stage.
  • Choose + Add action group. In Edit action, in Action name, enter a name for your Lambda action (for example, CHECK_OLD_SFN_LAMBDA). In Provider, choose AWS Lambda. In Function name, choose or enter the name of your Lambda function (for example, CHECK_OLD_SFN_LAMBDA).
  • In UserParameters, you must provide a JSON string with a parameter, for example: { "stateMachineARN": "<ARN_OF_STATE_MACHINE>" }
  • Choose Save.

Step 4: Add the TRIGGER_SFN_and_CONTINUE  Lambda Function to a Pipeline in the CodePipeline Console

In this step, you add a new stage to your pipeline, and then add a Lambda action that calls your function to that stage.

To add a stage

  • Sign in to the AWS Management Console and open the CodePipeline console at http://console.aws.amazon.com/codesuite/codepipeline/home.
  • On the Welcome page, choose the pipeline you created.
  • On the pipeline view page, choose Edit.
  • On the Edit page, choose + Add stage to add a stage after the previous Lambda stage. Enter a name for the stage (for example, TRIGGER_SFN_and_CONTINUE), and choose Add stage.
  • Choose + Add action group. In Edit action, in Action name, enter a name for your Lambda action (for example, TRIGGER_SFN_and_CONTINUE). In Provider, choose AWS Lambda. In Function name, choose or enter the name of your Lambda function (for example, TRIGGER_SFN_and_CONTINUE).
  • In UserParameters, you must provide a JSON string with a parameter: { "stateMachineARN": "<ARN_OF_STATE_MACHINE>" }
  • Choose Save.

Step 5: Test the Pipeline with the Lambda function

  • To test the function, release the most recent change through the pipeline.
  • To use the console to run the most recent version of an artifact through a pipeline
  • On the pipeline details page, choose Release change. This runs the most recent revision available in each source location specified in a source action through the pipeline.
  • When the Lambda action is complete, choose the Details link to view the log stream for the function in Amazon CloudWatch, including the billed duration of the event. If the function failed, the CloudWatch log provides information about the cause.

Example JSON Event

The following example shows a sample JSON event sent to Lambda by CodePipeline. The structure of this event is similar to the response to the GetJobDetails API, but without the actionTypeId and pipelineContext data types. Two action configuration details, FunctionName and UserParameters, are included in both the JSON event and the response to the GetJobDetails API. The values shown are examples or explanations, not real values.

{
    "CodePipeline.job": {
        "id": "11111111-abcd-1111-abcd-111111abcdef",
        "accountId": "111111111111",
        "data": {
            "actionConfiguration": {
                "configuration": {
                    "FunctionName": "MyLambdaFunctionForAWSCodePipeline",
                    "UserParameters": "some-input-such-as-a-URL"
                }
            },
            "inputArtifacts": [
                {
                    "location": {
                        "s3Location": {
                            "bucketName": "s3-bucket-name",
                            "objectKey": "for example CodePipelineDemoApplication.zip"
                        },
                        "type": "S3"
                    },
                    "revision": null,
                    "name": "ArtifactName"
                }
            ],
            "outputArtifacts": [],
            "artifactCredentials": {
                "secretAccessKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
                "sessionToken": "MIICiTCCAfICCQD6m7oRw0uXOjANBgkqhkiG9w
0BAQUFADCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZEDmFJl0ZxBHjJnyp378OD8uTs7fLvjx79LjSTbNYiytVbZPQUQ5Yaxu2jXnimvwdasdadasljdajldlakslkdjakjdkaljdaljdasljdaljdalklakkoi9494k3k3owlkeroieowiruwpirpdk3k23j2jk234hjl2343rrszlaEXAMPLE=",
                "accessKeyId": "AKIAIOSFODNN7EXAMPLE"
            },
            "continuationToken": "A continuation token if continuing job",
            "encryptionKey": { 
              "id": "arn:aws:kms:us-west-2:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab",
              "type": "KMS"
            }
        }
    }
}

Conclusion

In this blog post, we discussed how a Lambda function can be used to fully decouple the pipeline and the state machine and to manage their interaction. We also learned how asynchronous processes that need to continue and succeed even if they exceed a fifteen-minute runtime (a limit in Lambda) are handled using a continuation token.

Please visit our blogs for more interesting articles.

Transfer Data from MySQL to BigQuery using Data Fusion Pipeline


Written by Anjali Sharma, Software Engineer at Powerupcloud Technologies

What is a Cloud Data Fusion pipeline?

Cloud Data Fusion is an enterprise data integration service provided by Google for quickly building and managing pipelines: a fully managed, cloud-native platform where a source (MySQL) and a sink (BigQuery) can be connected easily without writing any code.

Since it's a code-free environment, anyone can use it easily without being held back by technical skills or coding knowledge.

Cloud Data Fusion is built on the open-source project CDAP, and this open core ensures data pipeline portability for users. CDAP’s broad integration with on-premises and public cloud platforms gives Cloud Data Fusion users the ability to break down silos and deliver insights that were previously inaccessible.

How it looks on the Cloud Data Fusion platform

Why Cloud Data Fusion?

Now the question arises: why do we use Cloud Data Fusion when we have other options to connect MySQL to BigQuery for ETL/ELT?

Data Fusion pipelines provide a fully managed, visual, easy-to-use, fully scalable and fully distributed platform that enables you to connect to many different data sources easily.

Data Fusion also gives you the flexibility to keep all your pipelines as code and lets you use REST API calls to create and trigger pipelines. Hence Cloud Data Fusion is a complete package to develop data pipelines easily and efficiently.

How do we create the data pipeline?

Creating a Data Fusion pipeline is quite easy on the Google Cloud platform; we can get it done by following a few steps.

Step 1: Go to the GCP console, find Cloud Data Fusion and click on Create Instance.

Step 2: Fill in the instance name and region and click Create.

Step 3: It will take 10-15 minutes to create the instance. Then go to View instance and click on the redirect URL.

Step 4: Now you are inside the Cloud Data Fusion instance. Click on HUB and choose a pipeline (Import data from MySQL).

Step 5: Along with pipelines, the HUB offers several other options. Choose Import data from MySQL. Next we are going to install the driver.

Step 6: Install the MySQL JDBC driver, which lets the MySQL database communicate with BigQuery. We can find the driver here itself, but make sure it is the latest version.

Step 7: Now go to the navigation bar and click on the Control Center.

Step 8: Click the green plus icon and upload the latest version of the JDBC driver.

Step 9: Give the driver a name and a suitable class name in a valid format (for the MySQL JDBC driver this is typically com.mysql.cj.jdbc.Driver), then click Finish.

Step 10: Now go back to the HUB, click on the Import data from MySQL pipeline and click Create. Give the pipeline a name and finish. Now you can customize your pipeline.

Here in the Cloud Data Fusion Studio we can change the source and sink as needed; in this case we need to connect the database (source) to BigQuery (sink).

Step 11: Go to the database properties and fill in the plugin name and type. After filling in the details, browse the database and click on Add connection.

Step 12: Here you will find the installed MySQL driver. Click on it and enter the connection name, host, port, database name, user id and password.

Step 13: Test the connection and add it.

Step 14: Now you can import your query. Deploy the pipeline.

Step 15: You have deployed your Data Fusion pipeline successfully.

Conclusion

Cloud Data Fusion takes care of most of the ETL/ELT work for you. And since it's part of Google Cloud, you can take advantage of built-in security benefits when using Cloud Data Fusion rather than self-managed CDAP servers:

  • Cloud-native security control with Cloud IAM—Identity management and authentication efforts are taken care of by Cloud Identity.
  • Full observability with Stackdriver Logging and Monitoring—Logs include pipeline logs and audit logs
  • Reduced exposure to the public internet with private networking.

Cloud Data Fusion offers both preconfigured transformations from an OSS library as well as the ability to create an internal library of custom connections and transformations that can be validated, shared, and reused across an organization. It lays the foundation for collaborative data engineering and improves productivity. That means less waiting for data engineers and, importantly, less sweating about code quality.