Copying objects using AWS Lambda based on S3 events – Part 2 – date partition

April 21, 2020 (updated May 18, 2020) | AWS, Blogs, Cloud

Written by Tejaswee Das, Software Engineer, Powerupcloud Technologies

Introduction

If you are coming from the first part of this series on S3 events with AWS Lambda, this post handles more complex S3 object keys than the ones we covered there.

If you are new here, you may want to start with the first part, which covers the basics: creating your Lambda function and configuring S3 event triggers.

You can find the link to Part 1 here:

Use Case

The use case is similar to Part 1: copy newly uploaded files to a different location (bucket/path) while preserving the folder hierarchy. In addition, we will parse the date and time out of each file name and store the copies in a date-partitioned structure.

Problem Statement

Our Tech Lead suggested a change in the application logic, so the same application now writes files to the S3 bucket in a different fashion. The activity file for Ravi Bharti is written to source-bucket-006/RaviRanjanKumarBharti/20200406-1436246999.parquet.

Haha! Now say our Manager wants to check Ravi Bharti's activity files date-wise, hour-wise, minute-wise, and... no, not second-wise, we can skip that!

So we need to store them in our destination bucket as:

  • destination-test-bucket-006/RaviRanjanKumarBharti/2020-04-06/20200406-1436246999.parquet — Date wise
  • destination-test-bucket-006/RaviRanjanKumarBharti/2020-04-06/14/20200406-1436246999.parquet — Hour wise
  • destination-test-bucket-006/RaviRanjanKumarBharti/2020-04-06/14/36/20200406-1436246999.parquet — Hour/Min wise

Tree:

source-bucket-006
| - AjayMuralidhar
| - GopinathP
| - IshitaSaha
| - RachanaSharma
| - RaviRanjanKumarBharti
		| - 20200406-1436246999.parquet
| - Sagar Gupta
| - SiddhantPathak

Solution

The problem is not that complex; a quick play with splitting and joining strings should solve it. You can use any programming language for this, but we will continue with Python and the AWS Python SDK, boto3.

Python Script

Everything else remains the same; we only need to adapt the script to each sub-requirement. We use the event dictionary to get the bucket name and the key (path + file name) of the uploaded object.

source_bucket_name = event['Records'][0]['s3']['bucket']['name']

file_key_name = event['Records'][0]['s3']['object']['key']
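
For reference, the part of the S3 event payload that Lambda receives and that we read looks roughly like this (trimmed to the relevant fields; the values shown are illustrative):

{
  "Records": [
    {
      "s3": {
        "bucket": { "name": "source-bucket-006" },
        "object": { "key": "RaviRanjanKumarBharti/20200406-1436246999.parquet" }
      }
    }
  ]
}
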
Let's start with the first requirement, the date-wise partition:

  • destination-test-bucket-006/RaviRanjanKumarBharti/2020-04-06/20200406-1436246999.parquet

Format: source_file_path/YYYY-MM-DD/file.parquet

To keep the walkthrough concrete, assume the uploaded object's key is:

file_key_name = "RaviRanjanKumarBharti/20200406-1436246999.parquet"

Split file_key_name on '/' to extract the employee (folder name) and the file name:

file_root_dir_struct = file_key_name.split('/')[0]

file_path_struct = file_key_name.split('/')[1]

Split file_path_struct on '-' to extract the date part:

date_file_path_struct = file_path_struct.split('-')[0]

Since the date string always has the same fixed format (YYYYMMDD), we can slice it by position and join the parts with hyphens:

YYYY        -   MM            -   DD
string[:4]  -   string[4:6]   -   string[6:8]


date_partition_path_struct = date_file_path_struct[:4] + "-" + date_file_path_struct[4:6] + "-" + date_file_path_struct[6:8]

Since Python is all about one-liners, we can also solve this with a list comprehension:

n_split = [4, 2, 2]

date_partition_path_struct = "-".join([date_file_path_struct[sum(n_split[:i]):sum(n_split[:i+1])] for i in range(len(n_split))])

We get date_partition_path_struct as '2020-04-06'.
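
If you want a more defensive version, the standard library's datetime module can parse and validate the date in one go. This is an optional alternative, not what the rest of the post uses:

from datetime import datetime

# Raises ValueError if the file name does not start with a valid YYYYMMDD date
date_partition_path_struct = datetime.strptime(date_file_path_struct, "%Y%m%d").strftime("%Y-%m-%d")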

Next, the hour-wise partition:

  • destination-test-bucket-006/RaviRanjanKumarBharti/2020-04-06/14/20200406-1436246999.parquet

time_file_path_struct = file_key_name.split('/')[1]

We further need to split off the file extension. Reusing the same variable for simplicity:

time_file_path_struct = file_key_name.split('/')[1].split('-')[1].split('.')[0]


This gives us time_file_path_struct as '1436246999'. The hour is simply the first two characters:

hour_time_file_path_struct = time_file_path_struct[:2]
Finally, the hour/minute-wise partition:

  • destination-test-bucket-006/RaviRanjanKumarBharti/2020-04-06/14/36/20200406-1436246999.parquet

Similarly, the minutes are the next two characters:

min_time_file_path_struct = time_file_path_struct[2:4]
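
Putting the pieces together, here is a quick local sanity check with the sample key (not part of the Lambda, just to verify the slicing):

file_key_name = "RaviRanjanKumarBharti/20200406-1436246999.parquet"
date_part = file_key_name.split('/')[1].split('-')[0]                 # '20200406'
time_part = file_key_name.split('/')[1].split('-')[1].split('.')[0]   # '1436246999'

print(date_part[:4] + "-" + date_part[4:6] + "-" + date_part[6:8])    # 2020-04-06
print(time_part[:2], time_part[2:4])                                  # 14 36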

Complete Code

import json
import boto3

# boto3 S3 initialization
s3_client = boto3.client("s3")


def lambda_handler(event, context):
  destination_bucket_name = 'destination-test-bucket-006'

  source_bucket_name = event['Records'][0]['s3']['bucket']['name']

  file_key_name = event['Records'][0]['s3']['object']['key']

  # Split file_key_name on '/' to extract the employee folder & file name
  file_root_dir_struct = file_key_name.split('/')[0]

  file_path_struct = file_key_name.split('/')[1]

  # Split the file name on '-' to extract the date part
  date_file_path_struct = file_path_struct.split('-')[0]

  # Date Partition Lazy Solution

  # date_partition_path_struct = date_file_path_struct[:4] + "-" + date_file_path_struct[4:6] + "-" + date_file_path_struct[6:8]

  # Date Partition using List Comprehension

  n_split = [4, 2, 2]

  date_partition_path_struct = "-".join([date_file_path_struct[sum(n_split[:i]):sum(n_split[:i+1])] for i in range(len(n_split))])

  # Time Partition: take the part after '-' and strip the .parquet extension
  time_file_path_struct = file_path_struct.split('-')[1].split('.')[0]

  # Hour Partition
  hour_time_file_path_struct = time_file_path_struct[:2]

  # Minute Partition
  min_time_file_path_struct = time_file_path_struct[2:4]

  # Build the destination path -- keep exactly one of the variants below uncommented

  # Date partition: employee/YYYY-MM-DD/file
  # destination_file_path = file_root_dir_struct + "/" \
  #     + date_partition_path_struct + "/" + file_path_struct

  # Hour partition: employee/YYYY-MM-DD/HH/file
  # destination_file_path = file_root_dir_struct + "/" + date_partition_path_struct + "/" + \
  #                         hour_time_file_path_struct + "/" + file_path_struct

  # Minute partition: employee/YYYY-MM-DD/HH/MM/file
  destination_file_path = file_root_dir_struct + "/" + date_partition_path_struct + "/" + \
                          hour_time_file_path_struct + "/" + min_time_file_path_struct + "/" + file_path_struct

  # Copy Source Object
  copy_source_object = {'Bucket': source_bucket_name, 'Key': file_key_name}

  # S3 copy object operation
  s3_client.copy_object(CopySource=copy_source_object, Bucket=destination_bucket_name, Key=destination_file_path)

  return {
      'statusCode': 200,
      'body': json.dumps('Hello from S3 events Lambda!')
  }
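
One caveat worth noting: object keys in S3 event notifications arrive URL-encoded, so a folder name containing a space (like Sagar Gupta in our tree) comes through as Sagar+Gupta, and the copy can fail because no object exists under the encoded key. A small tweak at the top of the handler takes care of it:

from urllib.parse import unquote_plus

# Decode the URL-encoded key from the event before splitting it,
# so folder/file names with spaces or special characters resolve to real objects
file_key_name = unquote_plus(event['Records'][0]['s3']['object']['key'])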

You can test your implementation by uploading a file to any employee folder in your source bucket, and then checking the corresponding employee folder in your destination bucket.

(Screenshots in the original post show the folder listings of source-test-bucket-006 and destination-test-bucket-006.)

Conclusion

This approach solves a very common data-migration use case: storing files in a partitioned structure for better organization and readability.

We hope this two-part series was useful for understanding how to use AWS Lambda to process your S3 objects based on event triggers.

Do leave your comments. Happy reading.

References

https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html

https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html

https://stackoverflow.com/questions/44648145/split-the-string-into-different-lengths-chunks

Tags: Amazon S3, AWS Lambda, S3 events, Python, Boto3, S3 Triggers, Lambda Trigger, S3 copy objects, date-partitioned, time-partitioned
