Serverless Data Processing Pipeline with AWS Step Functions and Lambda

Written by Arun Kumar, Associate Cloud Architect at Powerupcloud Technologies

In the traditional ETL world, we generally deploy data pipelines using our own scripts, a paid tool, an open-source data processing tool, or an orchestrator. If the data processing pipeline is not complex, these server-based solutions can add unnecessary cost. AWS offers serverless options such as Lambda and Glue, but Lambda has an execution time limit, and Glue runs an EMR cluster in the background, so ultimately it will charge you a lot. So we decided to explore AWS Step Functions with Lambda, both serverless, where Step Functions acts as an orchestration service that executes our process on an event basis and terminates the resources once the process completes. Let's see how we can build a data pipeline with this.

Architecture Description:

  1. The on-prem Teradata server sends the input.csv file to the S3 bucket (data_processing folder) on a schedule.
  2. A CloudWatch Event Rule triggers the Step Function on a PutObject in the specified S3 bucket and starts processing the input file.
  3. The cleansing scripts are hosted on ECS.
  4. The Step Function calls a Lambda function, which triggers the ECS task (a bunch of Python and R scripts).
  5. Once the cleansing is done, the output file is uploaded to the target S3 bucket.
  6. A Lambda function is triggered to fetch the output file from the target bucket and send it to the respective team.

Create a custom CloudWatch Event Rule for the S3 PutObject operation

Choose Event Pattern -> Service Name -> S3 -> Event Type -> Object Level Operations -> choose PutObject -> give the bucket name.

  • In Targets, choose the Step Function to be triggered -> give the name of the state machine you created.
  • Create a new role or use an existing one; CloudWatch Events requires permission to send events to your Step Function.
  • Add one more target to trigger the Lambda function -> choose the function we created earlier. (A programmatic sketch of this rule follows below.)
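The same rule can also be created with boto3. This is a minimal sketch with hypothetical bucket, rule, and ARN names; note that object-level S3 operations only reach CloudWatch Events if a CloudTrail trail is recording data events for the bucket:

import boto3
import json

events = boto3.client('events')

# Match PutObject calls on the source bucket, as recorded by CloudTrail.
event_pattern = {
    "source": ["aws.s3"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
        "eventSource": ["s3.amazonaws.com"],
        "eventName": ["PutObject"],
        "requestParameters": {"bucketName": ["my-data-processing-bucket"]},
    },
}

events.put_rule(
    Name='s3-putobject-rule',
    EventPattern=json.dumps(event_pattern),
    State='ENABLED',
)

# Point the rule at the state machine; RoleArn must allow
# events.amazonaws.com to call states:StartExecution.
events.put_targets(
    Rule='s3-putobject-rule',
    Targets=[{
        'Id': 'step-function-target',
        'Arn': 'arn:aws:states:us-east-1:123456789012:stateMachine:data-pipeline',
        'RoleArn': 'arn:aws:iam::123456789012:role/cwe-to-stepfunctions',
    }],
)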

Open the AWS Management Console and search for Step Functions

  • Create a state machine.
  • On the Define state machine page, select Author with code snippets.
  • Give it a name, then review the state machine definition and visual workflow.
  • Use the graph in the Visual Workflow pane to check that your Amazon States Language code describes your state machine correctly.
  • Create an IAM role, or select an existing one if you have previously created an IAM role. (A minimal definition is sketched below.)
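For this pipeline, a single Task state that invokes the ECS-triggering Lambda function is enough. Here is a minimal sketch of such a definition created with boto3; the ARNs and names are hypothetical placeholders:

import boto3
import json

sfn = boto3.client('stepfunctions')

# One Task state: invoke the Lambda that starts the ECS cleansing task.
definition = {
    "Comment": "Data processing pipeline: Lambda starts the ECS cleansing task",
    "StartAt": "RunCleansingTask",
    "States": {
        "RunCleansingTask": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:run-ecs-task",
            "End": True
        }
    }
}

sfn.create_state_machine(
    name='data-processing-pipeline',
    definition=json.dumps(definition),
    roleArn='arn:aws:iam::123456789012:role/StepFunctionsExecutionRole'
)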

Create an ECS cluster with Fargate

ECS console -> Create Cluster -> choose the Networking only cluster template -> Next step -> Configure cluster -> give a cluster name -> Create.
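The equivalent boto3 call is a one-liner; the cluster name Demo matches what the Lambda function below references:

import boto3

ecs = boto3.client('ecs')

# 'Networking only' clusters need no EC2 instances; Fargate supplies the compute.
ecs.create_cluster(clusterName='Demo')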

In the navigation pane, choose Task Definitions, Create new Task Definition.

On the Select compatibilities page, select the Fargate launch type for your task and choose Next step.

For Task Definition Name, type a name for your task definition.

For Task Role, choose an IAM role that provides permissions for containers in your task to make calls to AWS API operations on your behalf.

To create an IAM role for your tasks

a.   Open the IAM console.

b.   In the navigation pane, choose Roles, Create New Role.

c.   In the Select Role Type section, for the Amazon Elastic Container Service Task Role service role, choose Select.

d.   In the Attach Policy section, select the policy to use for your tasks and then choose Next Step.

e.   For Role Name, enter a name for your role. Choose Create Role to finish.
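The same role can be created with boto3. This sketch assumes the containers need S3 access; the role name and the attached policy are illustrative, so substitute whatever your tasks actually require:

import boto3
import json

iam = boto3.client('iam')

# Trust policy allowing ECS tasks to assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "ecs-tasks.amazonaws.com"},
        "Action": "sts:AssumeRole"
    }]
}

iam.create_role(
    RoleName='ecsTaskRole',
    AssumeRolePolicyDocument=json.dumps(trust_policy)
)

# Attach the permissions your containers need, e.g. S3 read/write.
iam.attach_role_policy(
    RoleName='ecsTaskRole',
    PolicyArn='arn:aws:iam::aws:policy/AmazonS3FullAccess'
)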

For Task execution IAM role, either select your existing task execution role or choose Create a new role so that the console can create one for you.

For Task size, choose a value for Task memory (GB) and Task CPU (vCPU).

For each container in your task definition, complete the following steps:

a.   Choose Add container.

b.   Fill out each required field and any optional fields to use in your container definitions.

c.   Choose Add to add your container to the task definition.
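The console steps above can also be expressed with boto3. This is a rough sketch assuming a hypothetical container image in ECR and the role ARNs created earlier; the family name Demo-ubuntu-new matches what the Lambda function below references:

import boto3

ecs = boto3.client('ecs')

# A Fargate-compatible task definition mirroring the console steps above.
ecs.register_task_definition(
    family='Demo-ubuntu-new',
    requiresCompatibilities=['FARGATE'],
    networkMode='awsvpc',           # required for Fargate tasks
    cpu='256',                      # 0.25 vCPU
    memory='512',                   # 0.5 GB
    executionRoleArn='arn:aws:iam::123456789012:role/ecsTaskExecutionRole',
    taskRoleArn='arn:aws:iam::123456789012:role/ecsTaskRole',
    containerDefinitions=[
        {
            'name': 'cleansing-scripts',
            'image': '123456789012.dkr.ecr.us-east-1.amazonaws.com/cleansing:latest',
            'essential': True,
        }
    ],
)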

Create a Lambda function

  • Create a Lambda function to call ECS.

In the Lambda console -> Create function -> choose Author from scratch -> give a function name -> choose Python 3.7 as the runtime -> create a new role, or if you have an existing role, choose one with the required permissions [Amazon ECS Full Access, AWS Lambda Basic Execution Role].

import boto3
import time

client = boto3.client('ecs')

def lambda_handler(event, context):
    # Launch the cleansing task on the Fargate cluster created earlier.
    response = client.run_task(
        cluster='Demo',
        launchType='FARGATE',
        taskDefinition='Demo-ubuntu-new',
        count=1,
        platformVersion='LATEST',
        networkConfiguration={
            'awsvpcConfiguration': {
                'subnets': ['subnet-f5e959b9', 'subnet-11713279'],
                'assignPublicIp': 'ENABLED',
                'securityGroups': ['sg-0462860d9c60d87d3']
            },
        }
    )
    print("this is the response", response)
    task_arn = response['tasks'][0]['taskArn']
    print(task_arn)
    # Give the task time to finish its work before stopping it.
    time.sleep(31.5)
    stop_response = client.stop_task(
        cluster='Demo',
        task=task_arn
    )
    print(stop_response)
    return str(stop_response)
  • Give the required details such as cluster, launchType, taskDefinition, count, platformVersion, and networkConfiguration.
  • The application hosted on ECS Fargate will process the data_process.csv file, and the output file will be pushed to the output folder of the S3 bucket.

Create a Notification to trigger the Lambda function (Send Email)

  • To enable event notifications for an S3 bucket -> open the Amazon S3 console.
  • In the Bucket name list, choose the name of the bucket that you want to enable events for.
  • Choose Properties -> under Advanced settings, choose Events -> choose Add notification.
  • In Name, type a descriptive name for your event configuration.
  • Under Events, select one or more types of event occurrences that you want to receive notifications for. When the event occurs, a notification is sent to a destination that you choose.
  • Type an object name Prefix and/or Suffix to filter the event notifications by prefix and/or suffix.
  • Select the type of destination to have the event notifications sent to.
  • If you select the Lambda Function destination type, do the following:
  • In Lambda Function, type or choose the name of the Lambda function that you want to receive notifications from Amazon S3, and choose Save.
  • Create a Lambda function with Node.js:
    • Note: give the bucket name, folder, and file name, and make sure the sender email address is verified in SES.
var aws = require('aws-sdk');
var nodemailer = require('nodemailer');
var ses = new aws.SES({region: 'us-east-1'});
var s3 = new aws.S3();

// Wrap s3.getObject in a Promise so the handler can chain on it.
function getS3File(bucket, key) {
    return new Promise(function (resolve, reject) {
        s3.getObject(
            {
                Bucket: bucket,
                Key: key
            },
            function (err, data) {
                if (err) return reject(err);
                else return resolve(data);
            }
        );
    });
}

exports.handler = function (event, context, callback) {
    // Fetch the processed output file and mail it as an attachment.
    getS3File('window-demo-1', 'output/result.csv')
        .then(function (fileData) {
            var mailOptions = {
                from: 'arun.kumar@powerupcloud.com',
                subject: 'File uploaded in S3 succeeded!',
                html: `<p>You got a contact message from: <b>${event.emailAddress}</b></p>`,
                to: 'arun.kumar@powerupcloud.com',
                attachments: [
                    {
                        filename: "result.csv",
                        content: fileData.Body
                    }
                ]
            };
            console.log('Creating SES transporter');
            // create Nodemailer SES transporter
            var transporter = nodemailer.createTransport({
                SES: ses
            });
            // send email
            transporter.sendMail(mailOptions, function (err, info) {
                if (err) {
                    console.log(err);
                    console.log('Error sending email');
                    callback(err);
                } else {
                    console.log('Email sent successfully');
                    callback();
                }
            });
        })
        .catch(function (error) {
            console.log(error);
            console.log('Error getting attachment from S3');
            callback(error);
        });
};
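Instead of clicking through the console, the notification can also be attached with boto3. This sketch uses the window-demo-1 bucket from the code above and a hypothetical function ARN; S3 must separately be granted lambda:InvokeFunction permission on that function (for example via the Lambda add_permission API):

import boto3

s3 = boto3.client('s3')

# Fire the notification Lambda whenever a new .csv lands in output/.
s3.put_bucket_notification_configuration(
    Bucket='window-demo-1',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:send-output-email',
            'Events': ['s3:ObjectCreated:Put'],
            'Filter': {
                'Key': {
                    'FilterRules': [
                        {'Name': 'prefix', 'Value': 'output/'},
                        {'Name': 'suffix', 'Value': '.csv'},
                    ]
                }
            },
        }]
    },
)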

Conclusion:

If you are looking for a serverless orchestrator for your batch processing or data processing pipeline, give AWS Step Functions and Lambda a try; here we used ECS Fargate to cleanse the data. If your data processing script is more complex, you can integrate with Glue, and Step Functions will still act as your orchestrator.
