Category

data

Serverless Data processing Pipeline with AWS Step Functions and Lambda

By | Blogs, data, Data Lake, Data pipeline | One Comment

Written by Arun Kumar, Associate Cloud Architect at Powerupcloud Technologies

In the traditional ETL world generally, we use our own scripts or any paid tool or Open source Data processing tool or an orchestrator to deploy our data pipeline. If the Data processing pipeline is not complex, if we use these server-based solutions then sometimes it would add additional costs(considering deploying some non-complex pipelines). In AWS we have multiple serverless solutions Lambda and Glue. But lambda has the execution time limitation and Glue is running an EMR cluster in the background, so ultimately it’ll charge you a lot. So we decided to explore AWS Step functions with Lambda which are serverless at the same time as an orchestration service that executes our process on the event bases and terminates the resources post-execution of the process. Let’s see how we can build a data pipeline with this.

Architecture Description:

  1. The Teradata server from on-prem will send the input.csv  file to the S3  bucket(data_processing folder) on schedule bases.
  2. CloudWatch Event Rule will trigger the step function in the case of PutObject in the specified S3 bucket and start processing the input file.
  3. The cleansing script placed on ECS.
  4. AWS Step function will call Lambda Function and it will trigger ECS tasks(a bunch of Python and R script).
  5. Once the cleansing is done the output file will be uploaded to the target S3 bucket.
  6. AWS lambda function will be triggered to get the output file from the target bucket and send it to the respective team.

Create a custom CloudWatch Event Rule for S3 put object operation

Choose Event Pattern -> Service Name -> S3 -> Event Type -> Object level operations -> choose put object -> give the bucket name.

  • In targets choose the step function to be triggered -> give the name of the state machine created.
  • Create a new role or existing role as a cloud watch event requires permission to send events to your step function.
  • Choose one more target to trigger the lambda function -> choose the function which we created before.

AWS Management Console and search for Step Function

  • Create a state machine
  • On the Define state machine page select Author with code snippets.
  • Give a name. Review the State machine definition and visual workflow.
  • Use the graph in the Visual Workflow pane to check that your Amazon States Language code describes your state machine correctly.
  • Create or If you have previously created an IAM role select created an IAM role.

Create an ECS with Fargate

ECS console -> choose create cluster -> choose cluster template -> Networking only -> Next -> Configure -> cluster -> give cluster name -> create

In the navigation pane, choose Task Definitions, Create new Task Definition.

On the Select compatibilities page, select the launch type that your task should use and choose Next step. Choose Fargate launch type.

For Task Definition Name, type a name for your task definition.

For Task Role, choose an IAM role that provides permissions for containers in your task to make calls to AWS API operations on your behalf.

To create an IAM role for your tasks

a.   Open the IAM console.

b.   In the navigation pane, choose Roles, Create New Role.

c.   In the Select Role Type section, for the Amazon Elastic Container Service Task Role service role, choose Select.

d.   In the Attach Policy section, select the policy to use for your tasks and then choose Next Step.

e.   For Role Name, enter a name for your role. Choose Create Role to finish.

Task execution IAM role, either select your task execution role or choose to Create a new role so that the console can create one for you.

Task size, choose a value for Task memory (GB) and Task CPU (vCPU).

For each container in your task definition, complete the following steps:

a.   Choose Add container.

b.   Fill out each required field and any optional fields to use in your container definitions.

c.   Choose Add to add your container to the task definition.

Create a Lambda function

  • Create lambda function to call the ECS

In lambda console -> Create function -> Choose Author from scratch -> give function name -> Give runtime -> choose python 3.7 -> Create a new role or if you have existing role choose the role with required permission [ Amazon ECS Full Access, AWS Lambda Basic Execution Role ]

import boto3
import os
import time
 client = boto3.client('ecs')
def lambda_handler(event,context):
	response = client.run_task(
    	cluster='Demo',
    	launchType='FARGATE',
    	taskDefinition='Demo-ubuntu-new',
    	count=1,
    	platformVersion='LATEST',
    	networkConfiguration={
        	'awsvpcConfiguration': {
            	'subnets': ['subnet-f5e959b9','subnet-11713279'],
            	'assignPublicIp': 'ENABLED',
            	'securityGroups': ['sg-0462860d9c60d87d3']
        	},
    	}
	)
	print("this is the response",response)
	task_arn=response['tasks'][0]['taskArn']
	print (task_arn)
	time.sleep(31.5)
	stop_response = client.stop_task(
	cluster='Demo',
	task=task_arn
	)
	print (stop_response)
	return str(stop_response) ]
  • Give the required details such as cluster, launch Type, task Definition, count, platform Version, network Configuration.
  • Applications hosted in ECS Fargate will process the data_process.csv file and out file will be pushed to S3 bucket of output folder.

Create Notification to trigger lambda function(Send Email)

  • To enable the event notifications for an S3 bucket -> open the Amazon S3 console.
  • In the Bucket name list, choose the name of the bucket that you want to enable events for.
  • Choose Properties ->Under Advanced settings, choose Events ->Choose Add notification.
  •  In Name, type a descriptive name for your event configuration.
  • Under Events, select one or more of the type of event occurrences that you want to receive notifications for. When the event occurs a notification is sent to a destination that you choose.
  • Type an object name Prefix and/or a Suffix to filter the event notifications by the prefix and/or suffix.
  •  Select the type of destination to have the event notifications sent to.
  • If you select the Lambda Function destination type, do the following:
  • In Lambda Function, type or choose the name of the Lambda function that you want to receive notifications from Amazon S3 and choose to save.
  • Create a lambda function with Node.js
    • Note: Give bucket name, folder, file name, Email address verified.
[ var aws = require('aws-sdk');
var nodemailer = require('nodemailer');
var ses = new aws.SES({region:'us-east-1'});
var s3 = new aws.S3();
 function getS3File(bucket, key) {
	return new Promise(function (resolve, reject) {
    	s3.getObject(
        	{
            	Bucket: bucket,
            	Key: key
        	},
        	function (err, data) {
            	if (err) return reject(err);
            	else return resolve(data);
        	}
    	);
	})
}
 exports.handler = function (event, context, callback) {
     getS3File('window-demo-1', 'output/result.csv')
    	.then(function (fileData) {
        	var mailOptions = {
            	from: 'arun.kumar@powerupcloud.com',
            	subject: 'File uploaded in S3 succeeded!',
            	html: `<p>You got a contact message from: <b>${event.emailAddress}</b></p>`,
            	to: 'arun.kumar@powerupcloud.com',
            	attachments: [
                	{
                        filename: "result.csv",
                        content: fileData.Body
                	}
            	]
        	};
            console.log('Creating SES transporter');
        	// create Nodemailer SES transporter
        	var transporter = nodemailer.createTransport({
            	SES: ses
        	});
        	// send email
            transporter.sendMail(mailOptions, function (err, info) {
            	if (err) {
                    console.log(err);
                    console.log('Error sending email');
                    callback(err);
            	} else {
                    console.log('Email sent successfully');
                    callback();
            	}
        	});
    	})
    	.catch(function (error) {
        	console.log(error);
            console.log('Error getting attachment from S3');
        	callback(error);
    	});
}; ]

Conclusion:

If you are looking for a serverless orchestrator for your batch processing or on a complex Data Processing pipeline then give a try with AW Step functions and Lambda here we use  ECS Farget to cleanse the data. If your Data processing script is more complex you can integrate with Glue but still Step function will act as your orchestrator. 

Azure Data Factory – Setting up Self-Hosted IR HA enabled

By | Blogs, data | No Comments

Written by Tejaswee Das, Software Engineer, Powerupcloud Technologies

Introduction

In the world of big data, raw, unorganized data is often stored in relational, non-relational, and other storage systems. However, on its own, raw data doesn’t have the proper context or meaning to provide meaningful insights to analysts, data scientists, or business decision-makers.

Big data requires service that can orchestrate and operationalize processes to refine these enormous stores of raw data into actionable business insights. Azure Data Factory(ADF) is a managed cloud service that’s built for these complex hybrid extract-transform-load (ETL), extract-load-transform (ELT), and data integration projects.

This is how Azure introduces you to ADF. You can refer to the Azure documentation on ADF to know more.

Simply said, ADF is an ETL tool that will help you connect to various data sources to load data, perform transformations as per your business logic, and store them into different types of storages. It is a powerful tool and will help solve a variety of use cases.

In this blog, we will create a self hosted integration runtime (IR) with two nodes for high availability.

Use Case

A reputed client on OTT building an entire Content Management System (CMS) application on Azure having to migrate their old data or historical data from AWS which is hosting their current production environment. That’s when ADFs with self-hosted IRs come to your rescue – we were required  to connect to a different cloud, different VPC, private network, or on-premise data sources.

Our use-case here was to read data from a production AWS RDS MySQL Server inside a private VPC from ADF. To make this happen, we set up a two node self-hosted IR with high availability (HA).

Pre-requisites

  •  Windows Server VMs (Min 2 – Node1 & Node2)
  • .NET Framework 4.6.1 or later
  • For working with Parquet, ORC, and Avro formats you will require 
    • Visual C++ 2010 Redistributable Package (x64)
    • Java

Installation Steps

Step1: Login to the Azure Portal. Go to https://portal.azure.com

Step 2: Search for Data Factory in the Search bar. Click on + Add to create a new Data Factory.

Step 3: Enter a valid name for your ADF.

Note: The name can contain only letters, numbers, and hyphens. The first and last characters must be a letter or number. Spaces are not allowed.

Select the Subscription & Resource Group you want to create this ADF in. It is usually a good practice to enable Git for your ADF. Apart from being able to  store all your code safely, this also helps you when you have to migrate your ADF to a production subscription. You can get all your pipelines on the go.

Step 4: Click Create

You will need to wait for a few minutes, till your deployment is complete. If you get any error messages here, check your Subscription & Permission level to make sure you have the required permissions to create data factories.

Click on Go to resource

Step 5:

Click on Author & Monitor

Next, click on the Pencil button on the left side panel

Step 6: Click on Connections

Step 7: Under Connections tab, click on Integration runtimes, click on + New to create a new IR

Step 8: On clicking New, you will be taken to the IR set-up wizard.

Select Azure, Self-Hosted and click on Continue

Step 9: Select Self-Hosted  and Continue

Step 10: Enter a valid name for your IR, and click Create

Note: Integration runtime Name can contain only letters, numbers and the dash (-) character. The first and last characters must be a letter or number. Every dash (-) character must be immediately preceded and followed by a letter or a number. Consecutive dashes are not permitted in integration runtime names.

Step 11:

On clicking Create, your IR will be created.

Next you will need to install the IRs in your Windows VMs. At this point you should login to your VM (Node1) or wherever you want to install your

You are provided with two options for installation :

  • Express Setup – This is the easiest way to install and configure your IRs.  We are following the Express Setup in this setup. Connect to your Windows Server where you want to install.

Login to Azure Portal in your browser (inside your VM) → Data Factory →  select your ADF → Connections → Integration Runtimes →  integrationRuntime1 → Click Express Setup → Click on the link to download setup files.

  • Manual Setup – You can download the integration runtime and add the authentication keys to validate your installation.

Step 12: Express Setup

Click on the downloaded file.

On clicking on the downloaded file, your installation will start automatically.

Step 13:

Once the installation and authentication is successfully completed, go to the Start Menu → Microsoft Integration Runtime → Microsoft Integration Runtime

Step 14: You will need to wait till your node is able to connect to the cloud service. If for any reason, you get any error at this step, you can troubleshoot by referring to self hosted integration runtime troubleshoot guide

Step 15: High availability 

One node setup is complete. For high availability, we will need to set up at least 2 nodes. An IR can have a max of 4 nodes.

Note: Before setting up other nodes, you need to enable remote access. To enable remote access, you need to make sure you are doing it in your very first node, i.e, you have a single node when you are doing this configuration, you might face issues with connectivity later if you forget this step.

Go to Settings tab and  Click on Change under Remote access from intranet

Step 16:

Select Enable without TLS/SSL certificate (Basic) for dev/test purpose, or use TLS/SSL for a more secured connection.

You can select a different TCP port – else use the default 8060

Step 17:

Click on OK. Your IR will need to be restarted for this change to be effected. Click OK again.

You will notice remote access enabled for your node.

Step 18:

Login to your other VM (Node2). Repeat Steps 11 to 17. At this point you will probably get a Connection Limited message stating your nodes are not able to connect to each other. Guess why? We will need to enable inbound access to port 8060 for both nodes.

Go to Azure Portal → Virtual Machines → Select your VM (Node1) → Networking.

Click on Add inbound port rule

Step 19:

Select Source → IP Addresses → Set Source IP as the IP of your Node2. Node2 will need to connect to Port 8060 of Node 1. Click Add

Node1 IP – 10.0.0.1 & Node2 IP – 10.0.0.2. You can use either of private or public IP addresses.

We will need to do a similar exercise for Node2.

Go to the VM page of Node2 and add Inbound rule for Port 8060. Node1 & Node2 need to be able to communicate with each other via port 8060.

Step 20:

If you go to your IR inside your Node1 and Node2, you will see the green tick implying your nodes are successfully connected to each other and also to the cloud. You can wait for some time for this sync to happen. If for some reason, you get an error at this step, you can view integration runtime logs from Windows Event Viewer to further troubleshoot. Restart both of your nodes.

To verify this connection, you can also check in the ADF Console.

Go to your Data Factory → Monitor (Watch symbol on the left panel, below Pencil symbol – Check Step 5) → Integration runtimes

Here you can see the number of registered nodes and their resource utilization. The HIGH AVAILABILITY ENABLED featured is turned ON now.

Step 21: Test Database connectivity from your Node

If you want to test database connectivity from your Node, make sure you have whitelisted the Public IP of your Node at the Database Server inbound security rules.

For e.g, if your Node1 has an IP address 66.666.66.66 and needs to connect to an AWS RDS MySQL Server. Go to your RDS security group and add Inbound rules of your MySQL Port for this IP.

To test this. Login to your Node1 → Start → Microsoft Integration Runtime → Diagnostics → Add your RDS connection details → Click on Test

Conclusion

This brings you to the end of successfully setting up a self-hosted IR with high availability enabled.

Hope this was informative. Do leave your comments below. Thanks for reading.

References

Transfer Data from Mysql to BigQuery using Data Fusion Pipeline

By | Blogs, Cloud Assessment, data, Data pipeline | One Comment

Written by Anjali Sharma, Software Engineer at Powerupcloud Technologies

What is Cloud Data Fusion Pipeline? –

Cloud Data fusion is an enterprise data integration service provided by Google for quickly building and managing pipelines. A fully managed, cloud-native platform where a source(MySQL) and sink(bigquery) can be connected easily without using any code.

Since it’s a code-free environment anyone can use it easily without having any hindrances of technical skills or coding knowledge.

Cloud Data Fusion is built on the open-source project CDAP, and this open core ensures data pipeline portability for users. CDAP’s broad integration with on-premises and public cloud platforms gives Cloud Data Fusion users the ability to break down silos and deliver insights that were previously inaccessible.

How it does look on Cloud Data Fusion Platform

Why Cloud Data Fusion?

Now the question arises, why do we use cloud data fusion as we have other options to make a connection from MySQL DB to bigquery for ETL/ELT.

Data fusion pipelines provide fully managed, virtual interface, easy to use, fully scalable, fully distributed platform that enables you to connect to many different data sources easily.

Data fusion pipelines have the flexibility to have all the pipelines as code and enable you to use rest API calls to create and trigger pipelines. Hence cloud data fusion is a complete package to develop data pipelines easily and efficiently.

How do we create the data pipeline? –

Creating data fusion pipeline is quite easy on the Google cloud platform, we can get it done by following a few steps-

Step1- Go to GCP console find Cloud data fusion click on ‘Create Instance’.

Step2- Fill the instance name and region name and click on create.

Step3- It will take 10-15 minutes to create an instance, now go to view instance and click on redirect URL.

Step4-  Now you are inside cloud data fusion instance, click on HUB and choose a pipeline(import data from MySQL).

Step5- Along with pipelines in HUB you are getting several options. Choose import data from MySQL. Now we’re going to install Driver.

Step6-  Install Google cloud JDBC driver which will make a connection to let MySQL database communicate bigquery. We can find the driver from here itself but make sure the driver is of the latest version.

Step7-  Now go to Navigation Bar and click on the control center.

Step8-  Go to green encircled plus symbol and upload the latest version of JDBC driver.

Step9- Give a name to the driver and a suitable class name which is invalid format com.example.myclass and click on finish.

Step10- Now again go to HUB, click on import data from MySQL pipeline and click on create. Give a name to the pipeline and finish. Now you are able to customize your pipeline.

Here in the cloud data fusion studio we can change source and sink accordingly as here we need to connect Database(source) to Bigquery(sink).

Step11- Go to database properties and fill the plugin name and types. After filling the details Browse database and click on Add connection.

Step12- Here you will find installed Mysql driver click on it and put connection name, host, port, database name, id, password.

Step13- Test the connection and add it.

Step14- Now you are able to import your query. Deploy the Pipeline.

Step15- You have deployed your data fusion pipeline successfully.

Conclusion

Cloud Data Fusion takes care of most of ETL/ELT works for you. And since it’s part of Google Cloud, you can take advantage of built-in security benefits when using Cloud Data Fusion rather than self-managed CDAP servers:

  • Cloud-native security control with Cloud IAM—Identity management and authentication efforts are taken care of by Cloud Identity.
  • Full observability with Stackdriver Logging and Monitoring—Logs include pipeline logs and audit logs
  • Reduced exposure to the public internet with private networking.

Cloud Data Fusion offers both preconfigured transformations from an OSS library as well as the ability to create an internal library of custom connections and transformations that can be validated, shared, and reused across an organization. It lays the foundation for collaborative data engineering and improves productivity. That means less waiting for data engineers and, importantly, less sweating about code quality.

How to Convert Historical Data into Parquet Format with Date Partitioning

By | Blogs, data | No Comments

Written by: Nagarjun K, Software engineer at powerupcloud technologies

Given the cloud imperative, a lot of organizations migrate their workloads from on-prem/cloud to AWS. However, while migrating old data into AWS S3, organizations find it hard to enable date-based partitioning. Given the inability to retrospectively implement this feature, organizations usually end-up with disparate storage sources within their AWS environment. The blog equips you with some best practices on implementing date-based partitioning in historical data, as well as, provides key guidelines to convert CSV/Json files to Parquet format before migrating your data.

Source: aws.amazon.com

Source: aws.amazon.com

It is common knowledge that Parquet file format is desirable because of size and cost benefits. Hence a recommended approach  for converting old data to Parquet format is crucial from a migration success point of view. To enable this, organizations often explore AWS EMR and DataProc clusters. However, these approaches introduce other challenges such as large cluster size and associated cost for running the clusters. Therefore, a solution that can address these concerns and also rid the organization from cluster administrative chores is deeply valuable. For these reasons, AWS Glue seems to be a prudent choice. Below is the list on interchangeable format conversions supported by Glue:

  • CSV
  • JSON
  • Parquet
  • Avro
  • ORC

Why Parquet?

Data is usually constrained by storage, which has a bearing on costing aspects. Correspondingly, Parquet is a columnar file format and allows unparalleled storage optimization due to its size benefits. Additionally, there are a great deal of options available in the market for compression and encoding of Parquet files. Date warehousing services such as BigQuery and Snowflake support Parquet file format, enabling granular control on performance and cost.

Why Partition?

As discussed above, partitioning files on the basis of date directly confines the amount of data that needs to be processed and, therefore, allows read-optimization. While unpartitioned data can also be queried, the antiquated approach introduces performance and cost inefficiencies. In essence, partitioning helps optimize data that needs to be scanned by the user, enabling higher performance throughputs.

Steps to convert the files into Parquet

 

Step 1: Extract of Old Data

As first steps, extract historical data from the source database along with with headers in CSV format. To enable better readability of data, you may also use Pipe separator(). After structuring the data with Pipe separator, store the CSV file in S3 bucket.

Step 2: Creating Crawlers for Fetching File Meta-data

With the purpose of identifying the schema of CSV files, you need to create and run Crawlers. Find the steps below:

  • Go to AWS Glue home page.
  • After selecting Crawlers section , click “Add crawler”
  • Name your crawler.

  • Select the path of your CSV folder in S3 (Do not select specific CSV files). As a prerequisite, create a folder that includes all your CSV files.

  • As demonstrated below, we give a path name instead of selecting the filename s3://Bucketname/foldername

  • You may add additional data sources, else click “NO”
  • Since the crawlers need both read and write access in order to read the source file and write the parquet file back to S3, you need to create an IAM that allows both read and write access.

  • Set up the crawler as Run as On Demand

  • Enter the database name to create a table schema for the CSV file

Step 3: Running the Crawler

After you successfully create the crawlers, click “Run it Now” and wait for a few minutes. Shortly you will see a new table that has the same schema as your CSV file in the Data Catalog section

  • Here, we see the csv file table created by the crawler

Step 4: Adding the partition columns to Historical data using Athena

  • Once the table is created by the crawler open athena and click “Run query”.

  • As illustrated in the figure below, the Date Column is in yyyy/mm/dd As part of the partitioning procedure, you can separate columns for year, month and day by running the partitioning query:

Step 5: Running ETL for converting to Parquet format

  • Select ETL Section, go to Jobs and click “Add Job”
  • Name your job and select the IAM role(select the role you created in the earlier step).

  • Select the data source created by the crawler

  • Choose your data target as s3

  • The next screen allows column mapping. If you need to remap or remove any column from CSV, you may modify it from this screen.

  • The following screen shows you the Diagram and source code for the job. As a next step, add PartitionKey and mention the column name for year,month and day to enable partition in that order. See example  below: “partitionKeys”:[“year”,”month”,”day”]

  • Save the changes and click “Run Job” button. Standby for a few mins( based on your total data size) to allow the job to complete. You can see the logs from the bottom.

Step6 : Verifying the files in S3.

Go to s3 bucket where you have saved the parquet file.  You will see that there new folders structured in year–month–date format.

Conclusion:

As organizations continue to move workloads on the cloud, there will be considerable increase  in volume, velocity and variety of data. In order to maintain a healthy trade off between cost and performance, measures such as converting to Parquet format and date-based partitioning can help organizations manage their data requirements with more effectively.

Enabling the leadership of a large OTT access business data just by asking for it

By | Alexa, AWS, Blogs, data | No Comments

Written By: Kartikeya Sinha, Lead Data Architect, Powerupcloud & Siva S, CEO, Powerupcloud Technologies

Just imagine the work-life of a Chief Executive or someone from the senior leadership team of a company. You would see them getting into meetings after meetings. They always seem to be thinking about something. To make better business decisions, they need to understand their business data. In their super busy schedule, it often turns out to be cumbersome for them to navigate through complex Business Intelligence (BI) dashboards and tens & hundreds of reports to find the metrics they need.

With the introduction of Natural Language Processing (NLP) APIs from leading pubic cloud providers like AWS, Azure & Google, we have started receiving a lot of requirements around integrating these NLP APIs with BI dashboards so that the senior business executives can simply ask for specific data and hear them out instantly.

One such case is discussed in this blog post.

 

Problem Statement

One of our customers is a large video streaming company. They collect several metrics including video streaming, customer behaviour, application usage, network usage, etc. But these metrics were distributed across several software used by them for video streaming including the likes of Mixpanel, Youbora, Appsee, etc. The customer had the following requirements:

 

  1. Build a data lake so that all data can be accessed from one centralized location
  2. Build ML engines for prediction, correlation of the app data
  3. Build a highly responsive and graphically rich reporting dashboard
  4. Enable NLP to search metrics using voice or text query

In this blog, we will be covering the custom reporting dashboard and NLP integration modules.

 

Data Lake Solution

Powerupcloud’s data team built a data lake using Amazon Redshift, Amazon S3 to support the data analysis processes. The data was loaded to Amazon S3 by Talend jobs. An ETL job converts the raw data files to readable CSV files and pushes to a target bucket. This allows the data to be queried either by Redshift Spectrum or Athena directly from Amazon S3 and this brings down the data storage costs quite a bit.

Below is a high-level architecture diagram without the Redshift Spectrum or Athena component.

 

 

Tech Stack

– Amazon Redshift as DWH.

– Amazon Lex to do NLP on the query text and extract intent and slot values.

– Elasticbeanstalk based Query processing engine written in Python3

– Webkit Speech Recognition API to convert speech to text.

– Elasticbeanstalk to host the BI dashboard

– Tech stack for the BI dashboard — Bootstrap, jQuery, Morris.js charts

 

Rich Reporting Dashboard

Once the data lake was implemented, we were faced with the next big problem-how can you integrate NLP into a BI platform? We tried several out-of-the-box BI platforms like Redash, PowerBI, etc. But integrating a browser-based voice-to-text converter was a challenge. So we decided to go with Google Web Kit and a custom reporting dashboard.

As the customer needed a rich UI, we chose morris.js charts running on a bootstrap theme. Morris.js allowed us to have rich colours and graphics in the graphs while the bootstrap theme helped in a high level of customization.

 

 

Integrating Amazon Lex

This architecture gives you a flow of data from the browser to Redshift.

The queries generated by Google Webkit is passed to Amazon NLP for intents and associated slots. Once the slots are identified, the parameters are passed to the Query Processing API which queries the Redshift for relevant data. This data is then presented through the custom reports built.

 

How does the solution work?

 

  1. Click on the ‘mic’ icon and ask your query.
  2. The BI tool does the speech to text conversion using Webkit Speech API.
  3. The text query is then sent to a Query Processing engine.
  4. Query processing engine sends a request to Amazon Lex for extracting intent and slot values from the query.
  5. Amazon Lex responds back with the intent name and slot values.
  6. Query processing engine uses the intent name and slot values to form a SQL query to the backend DWH-Amazon Redshift.
  7. Using the result of the query from Redshift, the query processing engine forms a response back to the frontend dashboard (BI).
  8. The frontend (BI) dashboard uses the response data to plot the graph/display it in the table.

 

Training Amazon Lex

The utterances are trained as below. Please note that the more utterances you train, the smarter the engine gets. The slots can be added as per the reports built in the dashboard. In this example, we chose ‘DeviceOS’, ‘GraphType’ and ‘# of days’ as the slots that are needed to be supplied from the customer’s query.

 

 

 

Challenges Faced

 

  1. Webkit Speech API does a pretty good job of converting speech to text. However, it works only on Google Chrome browser. Firefox has recently launched support for speech recognition, but that is still in very nascent stage.
  2. Although the ideal situation would be that you ask any meaningful query to the BI tool and it should be able to answer it. However, in order to do that Query processing engine needs to be really super smart to form dynamic SQL queries based on the user query. We have not yet achieved that and are evolving the Query processing engine to handle as many queries as possible without a need for modification.

 

Voice-Based BI Engine in Action

The voice search can pull reports based on 3 inputs,

 

  • Metrics-Visitors or Viewers or Video Views
  • Devices-iOS or Android or TV or PWA
  • Time-Last X days
  • Sample Query: Can you show me the number of visitors from iOS for the last 10 days?
  • Note: Voice search for terms like ‘Video Views’ and ‘PWA’ might be a little difficult for Lex to comprehend. Text search works better.

Hope this read was insightful. The future is voice-based platforms, be it apps, reports, customer service, etc.

If you would like to know more details on this project or if you want us to build something similar for you, please write to us at data@powerupcloud.com.

 

AWS Autoscaling Based On Database Query Custom Metrics

By | AWS, Blogs, data | No Comments

Written by Priyanka Sharma, Senior Cloud Engineer at Powerupcloud Technologies.

Did you ever run into a situation where the generic autoscaling triggers available like Avg CPU, Network IO, custom metrics memory, etc are not enough to rightly decide whether you should scale up or scale down? We ran into this problem for a client with a very specific workload and we had to find a way to scale up and down based on the result of a SQL query on the target database?

So how did we go about achieving this? Custom metrics again, of course. We created a lambda function to poll the database at regular intervals and the results were shipped to CloudWatch as custom metrics. This custom metric was used to trigger scale up and scale down events. What follows is a detailed step by step for configuring this with a generic SELECT COUNT(*) SQL Query on an MYSQL database. Depending on your needs, you can make changes to what gets polled for custom metrics.

Prerequisites

DB Instance

The Instance for which you want to put count value to Cloudwatch. 
We have an RDS Instance available in private subnet with publicly accessible set to No

The RDS instance has a database named powerdb which has an employeestable. The employee’s table has two columns of type INT – id and count. The count value is being added to the cloudwatch metric through Lambda Function. Like I said already, this is a simplistic example. Your database query in real life might be more complex than a count(*)

AutoScaling Group

The autoscaling group which will scale with the Cloudwatch Metric Data. We have one available with initially one server in running state and no scaling policy assigned.

Lambda Execution IAM Role

Create a lambda_basic_execution role with the following inline policy attached

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "cloudwatch:PutMetricData", "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DeleteNetworkInterface" ], "Resource": "*" } ] }

This policy will allow you to create a lambda function within a VPC.

Creating The Lambda Function

We have posted the lambda function code on this public repo. Few snippets from the code are explained below:

We have used mysql nodejs module to connect the function to the Mysql RDS Instance. If you are using a different database, there is nothing stopping you from connecting as long as the node has a connector. Provide the database credentials in the code as shown below.

var connection = mysql.createConnection({ host : 'xxxxxx.us-east-1.rds.amazonaws.com', user : 'puc', password : 'xxxxxx', database : 'powerdb' }); connection.connect();

Next, we are querying the database and getting the value that needs to be put in the Cloudwatch. This is the place to change the query to your liking. If your database supports NOLOCK hints etc, you might want to add them too to avoid locking.

connection.query('SELECT count from employees where id=1', function(err, rows, fields) { if (err) throw err; console.log('The count is: ', rows[0].count); value = rows[0].count putCloudWatchMetric('DBCount', value); });

Once we get the count value in a variable ‘value’, we are calling a function putCloudWatchMetric() that we defined as

function putCloudWatchMetric(metricName, count){ cw.putMetricData({ Namespace: 'DBData', MetricData: [{ 'MetricName': metricName, 'Unit': 'Count', 'Value': count }]}, function(err, data) { if (err) console.log(err, err.stack); // an error occurred else console.log(data); // successful response } ) }

Create this lambda function in the same VPC as RDS. Ensure to use private subnets for this function.

Also, for the lambda function to access the RDS, open the DB instance port in its security group to the security group used by the lambda function.

We have scheduled this lambda function to trigger every 15 mins. You may change this frequency as well if you need faster scaleup and down events.

Creating CloudWatch Alarms From Polled Query Results

Once the lambda function triggered successfully, go to Cloudwatch console and see the graph for the DB Count.

Create an alarm for this count graph.

We have created an alarm for count value greater than 10, this is for scale-up. Create one more Cloudwatch alarm for Scale down policy when the count value is less than 10.

Creating Scaling Policies

Go to Autoscaling Console and create a scale-up policy with alarm count_alarm. So whenever the count value will be greater than 10 for 300 seconds, it will set the instance count to 2.

Create a scale down policy too with other alarm count_alarm01. So, whenever the count value will be less than 10 for 300 seconds, it will set the instance count to 1.

Once we create both policies, we will have scale up and scale down policies as shown below in the screenshot.

Verify Autoscaling

Connect to the Mysql Database and update the count value to 14 (greater than 10).

We have scheduled the lambda function to trigger every 15 mins. So, the count value will get updated in the Cloudwatch Metric every 15 mins. We can see the count graph in Cloudwatch.

Due to the Scale-Up policy, the auto-scaling will happen and we can see the updated instance count to 2.

Next, we can verify the scale down policy too by updating the count value again to less than 10.

Once the lambda function will trigger, we can see the updated count value in the Cloudwatch graph.

Autoscaling will happen and the instance count will scale down to 1

Also, you can set the DBCount range in Autoscaling policies.

Next, we have updated the count value to 25.

As per the policy, the autoscaling will set the instance count to 3.

Hope you found this useful.

Happy scaling! 🙂

Originally published at blog.powerupcloud.com on January 27, 2017.


Export/Import Data on Amazon Oracle RDS using Data Pump utility

By | AWS, Blogs, data | No Comments

Written by Selvakumar K, Associate Tech Lead at Powerupcloud

It has been a long time plan to test the Data Pump Scenario between Oracle RDS instances

Problem Statement 

On a daily basis, the data is been restored between Oracle RDS instances. (i.e)  making the copy of particular production schema to dev Oracle RDS instances 

Steps to implement Backup / Restore 

1. Take backup of  the Oracle schema from source RDS into data_pump_dir as master user 

DECLARE

Bkp NUMBER;

BEGIN

Bkp := DBMS_DATAPUMP.OPEN( operation => ‘EXPORT’, job_mode => ‘SCHEMA’, job_name=>null);

DBMS_DATAPUMP.ADD_FILE( handle => Bkp, filename => ‘rdsdumpsrc1.dmp’, directory => ‘DATA_PUMP_DIR’, filetype => dbms_datapump.ku$_file_type_dump_file,reusefile => 1);

DBMS_DATAPUMP.ADD_FILE( handle => Bkp, filename => ‘rdsdumpsrc1.log’, directory => ‘DATA_PUMP_DIR’, filetype => dbms_datapump.ku$_file_type_log_file);

DBMS_DATAPUMP.METADATA_FILTER(hdnl,’SCHEMA_EXPR’,’IN (”rdsdumpsrc”)’);

DBMS_DATAPUMP.START_JOB(Bkp);

END;

/

Source Schema Name: rdsdumpsrc

Once the backup is completed , please use the below queries to verify its in data_pump_dir directory 

select * from table(RDSADMIN.RDS_FILE_UTIL.LISTDIR(‘DATA_PUMP_DIR’)) order by mtime;

2. Create Database Link to Target Instance 

create database link to_target_transfer connect to admin identified by admin123 using 

‘(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=selvadump.cv7jsqukf31b.us-east-1.rds.amazonaws.com)(PORT=1521))(CONNECT_DATA=(SID=ORCL)))’;

Target Instance : selvadump.cv7jsqukf31b.us-east-1.rds.amazonaws.com

Target Instance Port : 1521

Target Instance SID : ORCL

3. Transfer the Exported Dump into target instance Data_Pump_Dir

Using DBMS_FILE_TRANSFER utility , copy and transfer the file to target instance on destination directory 

BEGIN

DBMS_FILE_TRANSFER.PUT_FILE(

source_directory_object       => ‘DATA_PUMP_DIR’,

source_file_name              => ‘rdsdumpsrc1.dmp’,

destination_directory_object  => ‘DATA_PUMP_DIR’,

destination_file_name         => ‘rdsdumpsrc_copied1.dmp’, 

destination_database          => ‘to_target_transfer’ 

);

END;

Using Step 1 command verify the file is copied to target instance

4. Import the schema into the target instance 

Restored the source schema into different schema using DBMS_DATAPUMP.METADATA_REMAP , If required use more data pump utilities to restore into different tablespace as well 

DECLARE

restre NUMBER;

BEGIN

restre := DBMS_DATAPUMP.OPEN( operation => ‘IMPORT’, job_mode => ‘SCHEMA’, job_name=>null);

DBMS_DATAPUMP.METADATA_REMAP(restore,’REMAP_SCHEMA’,’RDSDUMPSRC’,’rdsdumpsrc_copied3′);

DBMS_DATAPUMP.START_JOB(restre);

END;

/

We can use a python script to automate this process , use the below script to backup and restore the oracle schema to target instance

Conclusion:

Data pump is the most convenient way to do frequent backup and restore oracle schema between prod, dev environments for data refresh purpose


DynamoDB attributes Batch and Bulk Update

By | AWS, data | No Comments

Written by Selvakumar K, Associate Tech Lead — DBA & Kavitha L, Software Developer at Powerupcloud Technologies.

It’s been a couple of weeks, me and my colleague were struggling to get the work together to put learning and solutions for help.

In the beginning, we have written scripts which fortunately worked for Dev and QA Dynamodb Environment but if we look for the real-time scenarios where there could be numerous (say 3 crores) records in the Dynamodb table, the solution would not work. After some days of research, we accomplish a solution using python.

Problem Statement

Retrieve the primary key from the Dynamodb table for the particular policy number and update the dependent items in the excel sheet.

Problems and Limitations in DynamoDB

  1. Dynamodb Read and Write capacity is limited to 20, so we have changed unlimited the provisioned capacity
  2. To perform an update in one shot it’s difficult in case of huge data size. (e.g Compare the policy number from an excel sheet with Dynamodb table). The BatchGetItem operation can retrieve a maximum of 100 items at a time. The total size of all the items retrieved cannot exceed 16 MB
  3. Batch wise update consumes more memory resources so we have increased instance type and updated the items

How does it work?

First, we would read the excel data and convert into the python dictionary.

with open(‘QA.csv’, ‘r’) as f:

reader = csv.reader(f)

your_list = list(reader)

list_1=[]

dict1={}

#convert all value to list

for i in range(1, len(your_list)):

dict1[your_list[0][0]]=your_list[i][0]

dict1[your_list[0][1]]=your_list[i][1]

dict1[your_list[0][2]]=your_list[i][2]

dict1[your_list[0][3]]=your_list[i][3]

list_1.append(dict1)

dict1={}

In the above scenario, each ID has multiple policy information and we are fetching single policy ID from the excel sheet and storing in memory.

If we have more than one policy information we would need to separate and retrieve the policy ID for the update.

Before we begin a comparison of policy number with Dynamodb table, establish the connectivity with DynamoDB.

dynamodb=boto3.resource(‘dynamodb’,region_name=’us-east-1′,aws_access_key_id=’AAAAAAAAAAAAAA’, aws_secret_access_key=’SSSSSSSSSSSSSS’)

table = dynamodb.Table(‘testclaim’)

response = table.scan()

response_text = response[‘Items’]

Comparing the policy number from excel and DynamoDB table to fetch the ID of the DynamoDB table.

Finally, update the records in two batches. First for the ID’s which has more than one policy information and than ID’s which has one policy information.


response=table.update_item(

Key={

‘id’:i[0],

},

UpdateExpression=”SET claims= :val1″,

ExpressionAttributeValues={

‘:val1’: val1

},

ReturnValues=”UPDATED_NEW”

print(“\nUpdated — — — — — — — — — -\n”)

Conclusion

Frequent Modification of data is very important and it’s required for customer business. Python is a convenient program to automate the update tasks in an easy way. In the above experiment, we have compared two different items and updated the records in the DynamoDB table.