Serverless Data Processing Pipeline with AWS Step Functions and Lambda

Written by Arun Kumar, Associate Cloud Architect at Powerupcloud Technologies

In the traditional ETL world, we generally deploy data pipelines with our own scripts, a paid tool, an open-source data processing tool, or an orchestrator. If the pipeline itself is not complex, these server-based solutions can add unnecessary cost. AWS offers serverless options such as Lambda and Glue, but Lambda has an execution-time limit and Glue runs an EMR cluster in the background, which can become expensive. So we decided to explore AWS Step Functions with Lambda: a serverless orchestration service that runs our process on an event basis and terminates the resources once the process completes. Let's see how we can build a data pipeline with this.

Architecture Description:

  1. The Teradata server on-premises sends the input.csv file to the S3 bucket (data_processing folder) on a scheduled basis.
  2. A CloudWatch Event Rule triggers the Step Function when a PutObject occurs in the specified S3 bucket and starts processing the input file.
  3. The cleansing scripts are hosted on ECS.
  4. The Step Function calls a Lambda function, which triggers the ECS tasks (a bunch of Python and R scripts).
  5. Once the cleansing is done, the output file is uploaded to the target S3 bucket.
  6. Another Lambda function is triggered to fetch the output file from the target bucket and send it to the respective team.

Create a custom CloudWatch Event Rule for S3 put object operation

Choose Event Pattern -> Service Name -> S3 -> Event Type -> Object Level Operations -> choose PutObject -> give the bucket name.

  • In Targets, choose the Step Function to be triggered -> give the name of the state machine created.
  • Create a new role or use an existing role, since CloudWatch Events needs permission to send events to your Step Function.
  • Choose one more target to trigger the Lambda function -> choose the function we created earlier. (A boto3 sketch of this rule and its targets follows.)
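For reference, the same rule can be created programmatically. The sketch below is a minimal boto3 example; the rule name, bucket name, state machine ARN, and role ARN are placeholder values, and it assumes CloudTrail data events are enabled on the bucket so that object-level S3 operations reach CloudWatch Events.

import json
import boto3

events = boto3.client('events')

# Rule fires when CloudTrail records a PutObject call on the input bucket
events.put_rule(
    Name='s3-put-object-rule',
    EventPattern=json.dumps({
        "source": ["aws.s3"],
        "detail-type": ["AWS API Call via CloudTrail"],
        "detail": {
            "eventSource": ["s3.amazonaws.com"],
            "eventName": ["PutObject"],
            "requestParameters": {"bucketName": ["my-input-bucket"]}  # placeholder bucket
        }
    }),
    State='ENABLED'
)

# Point the rule at the state machine; the role must allow states:StartExecution
events.put_targets(
    Rule='s3-put-object-rule',
    Targets=[{
        'Id': 'step-function-target',
        'Arn': 'arn:aws:states:us-east-1:123456789012:stateMachine:data-pipeline',   # placeholder ARN
        'RoleArn': 'arn:aws:iam::123456789012:role/cloudwatch-events-stepfn-role'    # placeholder ARN
    }]
)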

Open the AWS Management Console and search for Step Functions

  • Create a state machine.
  • On the Define state machine page, select Author with code snippets.
  • Give it a name, then review the state machine definition and visual workflow.
  • Use the graph in the Visual Workflow pane to check that your Amazon States Language code describes your state machine correctly.
  • Create an IAM role, or select the one you created previously if you already have an IAM role for Step Functions. (A minimal boto3 sketch of this state machine follows.)
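As a starting point, a state machine that simply invokes the cleansing Lambda can be defined with a single Task state. The boto3 sketch below is illustrative only; the Lambda ARN, state machine name, and role ARN are placeholders.

import json
import boto3

sfn = boto3.client('stepfunctions')

# Minimal Amazon States Language definition: one Task state that invokes the Lambda
definition = {
    "Comment": "Trigger the ECS cleansing task through Lambda",
    "StartAt": "RunCleansingLambda",
    "States": {
        "RunCleansingLambda": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:trigger-ecs-task",  # placeholder ARN
            "End": True
        }
    }
}

sfn.create_state_machine(
    name='data-pipeline',                                                   # placeholder name
    definition=json.dumps(definition),
    roleArn='arn:aws:iam::123456789012:role/step-function-execution-role'   # placeholder ARN
)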

Create an ECS with Fargate

ECS console -> choose Create Cluster -> choose cluster template -> Networking only -> Next -> Configure cluster -> give the cluster name -> Create.

In the navigation pane, choose Task Definitions, Create new Task Definition.

On the Select compatibilities page, select the launch type that your task should use and choose Next step. Choose Fargate launch type.

For Task Definition Name, type a name for your task definition.

For Task Role, choose an IAM role that provides permissions for containers in your task to make calls to AWS API operations on your behalf.

To create an IAM role for your tasks

a.   Open the IAM console.

b.   In the navigation pane, choose Roles, Create New Role.

c.   In the Select Role Type section, for the Amazon Elastic Container Service Task Role service role, choose Select.

d.   In the Attach Policy section, select the policy to use for your tasks and then choose Next Step.

e.   For Role Name, enter a name for your role. Choose Create Role to finish.

For Task execution IAM role, either select your existing task execution role or choose Create new role so that the console can create one for you.

For Task size, choose a value for Task memory (GB) and Task CPU (vCPU).

For each container in your task definition, complete the following steps:

a.   Choose Add container.

b.   Fill out each required field and any optional fields to use in your container definitions.

c.   Choose Add to add your container to the task definition. (A boto3 sketch of an equivalent Fargate task definition follows.)
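For reference, the same Fargate task definition can also be registered with boto3. The sketch below keeps the task definition name used later in the Lambda function; the container image, role ARNs, and log settings are placeholder values.

import boto3

ecs = boto3.client('ecs')

# Fargate task definitions require awsvpc networking and task-level CPU/memory
ecs.register_task_definition(
    family='Demo-ubuntu-new',
    requiresCompatibilities=['FARGATE'],
    networkMode='awsvpc',
    cpu='1024',       # 1 vCPU
    memory='2048',    # 2 GB
    executionRoleArn='arn:aws:iam::123456789012:role/ecsTaskExecutionRole',  # placeholder ARN
    taskRoleArn='arn:aws:iam::123456789012:role/ecsTaskRole',                # placeholder ARN
    containerDefinitions=[{
        'name': 'cleansing-container',                                              # placeholder name
        'image': '123456789012.dkr.ecr.us-east-1.amazonaws.com/cleansing:latest',   # placeholder image
        'essential': True,
        'logConfiguration': {
            'logDriver': 'awslogs',
            'options': {
                'awslogs-group': '/ecs/cleansing',
                'awslogs-region': 'us-east-1',
                'awslogs-stream-prefix': 'ecs'
            }
        }
    }]
)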

Create a Lambda function

  • Create a Lambda function to call the ECS task.

In the Lambda console -> Create function -> choose Author from scratch -> give the function name -> choose the runtime Python 3.7 -> create a new role, or if you have an existing role, choose one with the required permissions [ Amazon ECS Full Access, AWS Lambda Basic Execution Role ].

import time

import boto3

client = boto3.client('ecs')

def lambda_handler(event, context):
    # Launch the cleansing task on the Fargate cluster
    response = client.run_task(
        cluster='Demo',
        launchType='FARGATE',
        taskDefinition='Demo-ubuntu-new',
        count=1,
        platformVersion='LATEST',
        networkConfiguration={
            'awsvpcConfiguration': {
                'subnets': ['subnet-f5e959b9', 'subnet-11713279'],
                'assignPublicIp': 'ENABLED',
                'securityGroups': ['sg-0462860d9c60d87d3']
            },
        }
    )
    print("this is the response", response)

    # Capture the ARN of the task that was just started
    task_arn = response['tasks'][0]['taskArn']
    print(task_arn)

    # Wait for the processing to finish, then stop the task
    time.sleep(31.5)
    stop_response = client.stop_task(
        cluster='Demo',
        task=task_arn
    )
    print(stop_response)
    return str(stop_response)
  • Give the required details such as cluster, launchType, taskDefinition, count, platformVersion, and networkConfiguration.
  • The applications hosted on ECS Fargate will process the data_process.csv file, and the output file will be pushed to the output folder of the target S3 bucket (a sketch of such a wrapper script is shown below).
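The actual cleansing logic lives in the Python and R scripts baked into the container image; the sketch below only illustrates the S3 read/write wrapper such a script might use. The input key is an assumption based on the data_processing folder mentioned above; the output key matches the one read by the notification Lambda later in this post.

import csv
import io
import boto3

s3 = boto3.client('s3')

SOURCE_BUCKET = 'window-demo-1'              # bucket used elsewhere in this post
INPUT_KEY = 'data_processing/input.csv'      # assumed input key
OUTPUT_KEY = 'output/result.csv'             # output key read by the notification Lambda

def cleanse():
    # Read the raw file from the input folder
    obj = s3.get_object(Bucket=SOURCE_BUCKET, Key=INPUT_KEY)
    rows = list(csv.reader(io.StringIO(obj['Body'].read().decode('utf-8'))))

    # Example cleansing step: trim whitespace and drop empty rows
    cleaned = [[field.strip() for field in row]
               for row in rows if any(field.strip() for field in row)]

    # Write the cleansed file to the output folder
    out = io.StringIO()
    csv.writer(out).writerows(cleaned)
    s3.put_object(Bucket=SOURCE_BUCKET, Key=OUTPUT_KEY, Body=out.getvalue())

if __name__ == '__main__':
    cleanse()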

Create a notification to trigger the Lambda function (send email)

  • To enable event notifications for an S3 bucket -> open the Amazon S3 console.
  • In the Bucket name list, choose the name of the bucket that you want to enable events for.
  • Choose Properties -> under Advanced settings, choose Events -> choose Add notification.
  • In Name, type a descriptive name for your event configuration.
  • Under Events, select one or more of the event types that you want to receive notifications for. When the event occurs, a notification is sent to the destination that you choose.
  • Type an object name Prefix and/or Suffix to filter the event notifications by prefix and/or suffix.
  • Select the type of destination to have the event notifications sent to.
  • If you select the Lambda Function destination type, do the following:
  • In Lambda Function, type or choose the name of the Lambda function that you want to receive notifications from Amazon S3 and choose Save.
  • Create the Lambda function with Node.js.
    • Note: give the bucket name, folder, file name, and a verified email address.
var aws = require('aws-sdk');
var nodemailer = require('nodemailer');
var ses = new aws.SES({region: 'us-east-1'});
var s3 = new aws.S3();

// Fetch the output file from S3 so it can be attached to the email
function getS3File(bucket, key) {
    return new Promise(function (resolve, reject) {
        s3.getObject(
            {
                Bucket: bucket,
                Key: key
            },
            function (err, data) {
                if (err) return reject(err);
                else return resolve(data);
            }
        );
    });
}

exports.handler = function (event, context, callback) {
    getS3File('window-demo-1', 'output/result.csv')
        .then(function (fileData) {
            var mailOptions = {
                from: 'arun.kumar@powerupcloud.com',
                subject: 'File uploaded in S3 succeeded!',
                html: `<p>You got a contact message from: <b>${event.emailAddress}</b></p>`,
                to: 'arun.kumar@powerupcloud.com',
                attachments: [
                    {
                        filename: "result.csv",
                        content: fileData.Body
                    }
                ]
            };
            console.log('Creating SES transporter');
            // create Nodemailer SES transporter
            var transporter = nodemailer.createTransport({
                SES: ses
            });
            // send email
            transporter.sendMail(mailOptions, function (err, info) {
                if (err) {
                    console.log(err);
                    console.log('Error sending email');
                    callback(err);
                } else {
                    console.log('Email sent successfully');
                    callback();
                }
            });
        })
        .catch(function (error) {
            console.log(error);
            console.log('Error getting attachment from S3');
            callback(error);
        });
};

Conclusion:

If you are looking for a serverless orchestrator for your batch processing or for a complex data processing pipeline, give AWS Step Functions and Lambda a try; here we used ECS Fargate to cleanse the data. If your data processing script is more complex, you can integrate with Glue, and Step Functions will still act as your orchestrator.

Managed Data Lake on Cloud Improved Driver Notification by 95%

Customer: The pioneer of Electric vehicles and related technologies in India.

 

Summary

The customer is the pioneer of electric vehicles and related technologies in India, involved in designing and manufacturing compact electric vehicles. Moving to a fully managed and scalable infrastructure, along with the configuration shift, resulted in a cost saving of 30%.

About Customer

The customer is the pioneer of Electric Vehicle technology in India. Their mission is to bring tomorrow’s movement today. They have a wide variety of electric vehicles and will be increasing this range even further with products spanning across personal and commercial segments. Their design supports the new paradigm of shared, electric, and connected mobility. Currently, the total number of connected cars is at 7000 and is further expected to grow to 50,000.

Problem Statement

The customer was looking for:

  • A fully managed and scalable infrastructure set up and configured on AWS.
  • Application and services migration from their current Azure setup to AWS.
  • Setting up an Extract, Transform, Load (ETL) pipeline for analytics along with a managed data lake.
  • Availability and structure of historical data similar to live data for analytics, and
  • A framework for near real-time notification.

They were also looking at maintaining the reliability of data in Postgres and Cassandra as well as on their backup servers.

Proposed Solution

All application microservices and MQTT/TCP IoT brokers will be containerized and deployed on AWS Fargate. All the latest IoT sensor data will be sent to the AWS environment: sensor data will be pushed to a Kinesis stream, and a Lambda function will query the stream to find critical data (low battery, door open, etc.) and call the notification microservice. Data from old sensors will initially be sent to the Azure environment because of existing public IP whitelisting; MQTT bridging and TCP port forwarding will proxy those requests from Azure to AWS. Once the old sensors are updated, there will be a full cut-over to AWS.

Identity and Access Management (IAM) roles are to be created to access the different AWS services. The network is to be set up using the Virtual Private Cloud (VPC) service with an appropriate CIDR range, subnets, and route tables. A Network Address Translation (NAT) gateway is set up to enable internet access for servers in the private subnets, and all Docker images will be stored in Elastic Container Registry (ECR).

AWS Elastic Container Service (ECS) with Fargate is used to run the Docker containers and deploy all the container images on the worker nodes. ECS task definitions are configured for each container to be run. With Fargate, the control plane and worker nodes are managed by AWS, and scaling, high availability (HA), and patching are handled by AWS as well.

An Application Load Balancer (ALB) will be deployed as the front end to all the application microservices. The ALB will forward requests to the Kong API gateway, which in turn will route them to the microservices. Service-level scaling will be configured in Fargate so that more containers spin up based on load (a sketch of this configuration follows). Amazon ElastiCache, a managed service with the Redis engine, will be deployed across multiple Availability Zones (AZ) for HA, patching, and updates.
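Service-level scaling for a Fargate service is driven by Application Auto Scaling. The boto3 sketch below shows a minimal CPU-based target-tracking policy; the cluster and service names are assumptions for illustration.

import boto3

autoscaling = boto3.client('application-autoscaling')

# Register the ECS service's desired count as a scalable target
autoscaling.register_scalable_target(
    ServiceNamespace='ecs',
    ResourceId='service/app-cluster/api-service',   # assumed cluster/service names
    ScalableDimension='ecs:service:DesiredCount',
    MinCapacity=2,
    MaxCapacity=10
)

# Scale out/in to keep average CPU utilisation around 60%
autoscaling.put_scaling_policy(
    PolicyName='cpu-target-tracking',
    ServiceNamespace='ecs',
    ResourceId='service/app-cluster/api-service',
    ScalableDimension='ecs:service:DesiredCount',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 60.0,
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'ECSServiceAverageCPUUtilization'
        }
    }
)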

Aurora PostgreSQL will be used to host the PostgreSQL database. A SQL dump will be taken from the Azure PostgreSQL Virtual Machine (VM) and then restored on Aurora. A 3-node Cassandra cluster, with 2 nodes running in one AZ and the remaining node in a second AZ, will be set up for HA. A 3-node Elasticsearch cluster will also be set up using the AWS managed service.

In the bi-directional notification workflow, the TCP and MQTT gateways will run on EC2 machines and the Parser application on a separate EC2 instance. AWS public IP addresses will be whitelisted on the IoT sensor during manufacturing so the device can securely connect to AWS. The gateway server will push the raw data coming from the sensors to a Kinesis stream, and the Parser server will push the converted and processed data to the same or another Kinesis stream.
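On the gateway side, pushing a reading into the stream is a single put_record call. The sketch below is illustrative only; the stream name and payload fields are assumptions.

import json
import boto3

kinesis = boto3.client('kinesis')

def push_sensor_reading(reading):
    # Partition by device id so readings from one vehicle stay ordered on a shard
    kinesis.put_record(
        StreamName='iot-sensor-stream',               # assumed stream name
        Data=json.dumps(reading).encode('utf-8'),
        PartitionKey=reading['device_id']
    )

push_sensor_reading({'device_id': 'vehicle-001', 'battery_pct': 12, 'door_open': True})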

The Lambda function will query the data in the Kinesis stream to find fault or notification-type data and will invoke the notification microservice/SNS to notify the customer team. This reduces the notification time from 6-8 minutes to near real time. The plan is to have Kinesis Firehose as a consumer reading from the Kinesis streams to push processed data to a different S3 bucket. Another Firehose will push the processed data to the Cassandra database and a different S3 bucket.
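A Lambda consumer along these lines could look like the sketch below; the SNS topic ARN, payload field names, and thresholds are assumptions for illustration.

import base64
import json
import boto3

sns = boto3.client('sns')
TOPIC_ARN = 'arn:aws:sns:us-east-1:123456789012:driver-notifications'  # placeholder ARN

def lambda_handler(event, context):
    # Kinesis delivers records to Lambda base64-encoded
    for record in event['Records']:
        payload = json.loads(base64.b64decode(record['kinesis']['data']))

        # Flag critical conditions such as low battery or an open door
        if payload.get('battery_pct', 100) < 15 or payload.get('door_open'):
            sns.publish(
                TopicArn=TOPIC_ARN,
                Subject='Vehicle alert',
                Message=json.dumps(payload)
            )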

AWS Glue will be used for the data aggregation previously done using Spark jobs, pushing the aggregated data to a separate S3 bucket. Athena will be used to query the S3 buckets, and standard SQL queries work with Athena. Dashboards will be created using Tableau.
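Running an Athena query against the processed data in S3 is a single API call. In the sketch below, the database, table, and results-bucket names are assumptions.

import boto3

athena = boto3.client('athena')

# Standard SQL against the S3-backed table; results land in the results bucket
athena.start_query_execution(
    QueryString=(
        "SELECT device_id, COUNT(*) AS alerts "
        "FROM sensor_data WHERE door_open = true "
        "GROUP BY device_id"
    ),
    QueryExecutionContext={'Database': 'telemetry'},                              # assumed Glue database
    ResultConfiguration={'OutputLocation': 's3://athena-query-results-bucket/'}   # assumed bucket
)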

Cloud platform

AWS.

Technologies used

Cassandra, Amazon Kinesis, Amazon Redshift, Amazon Athena, Tableau.

Business benefit

  • The customer can send and receive notifications in real time, and the time taken to send notifications to the driver is reduced by 95%. Using AWS, applications can scale on a secure, fault-tolerant, and low-latency global cloud. With the implementation of the CI/CD pipeline, the customer team no longer spends its valuable time on mundane administrative tasks. Powerup helped the customer achieve its goal of securing data while lowering cloud bills and simplifying compliance.
  • API Gateway proved to be one of the most beneficial services offered by AWS, and its wide range of functionality helped Powerup address the customer's issues.
  • There was a parallel and collaborative effort from the customer and the Powerup team on containerization of the microservices.
  • Data-driven business decisions taken by the customer team enabled easier movement of data and eliminated repetitive processes.
  • 30% cost savings with the new architecture.