AWS Lambda python3 on Unix/Linux

At work I was recently asked to write an AWS Lambda function with the following logic:

  • An application writes CSV files into an AWS S3 bucket.
  • You need to write a Python script that authenticates against that bucket using AWS credentials and fetches the files.
  • Parse the CSV files received from the AWS S3 bucket and convert the data into JSON format.
  • Send the JSON data to ElasticSearch, into a specific index.
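
The core of that pipeline, stripped of the AWS specifics, is turning each CSV row into a JSON document. A minimal sketch (the sample data and helper name are made up for illustration):

```python
import csv
import json
from io import StringIO


def csv_rows_to_json(csv_text):
    # DictReader maps each row to a dict keyed by the CSV header,
    # which json.dumps then serializes into a JSON document
    reader = csv.DictReader(StringIO(csv_text))
    return [json.dumps(row) for row in reader]


docs = csv_rows_to_json("name,age\nalice,30\nbob,25\n")
print(docs[0])  # {"name": "alice", "age": "30"}
```

The full script below does exactly this per S3 object before shipping each document to ElasticSearch.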

In general, the task is quite trivial and was very simple for me: I know Python and AWS boto3, which is all it takes to write this solution.

Without further ado, here is the code:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import argparse
import datetime
import time
import csv
import json
import logging
import urllib3
import os
from gzip import GzipFile
from io import BytesIO, StringIO

import boto3
import botocore
from botocore.config import Config

# Initialize the Logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Can override the global variables using Lambda Environment Parameters
globalVars = {}
globalVars['esIndexPrefix'] = "s3-to-es-"
globalVars['esIndexDocType'] = "s3_to_es_docs"


class Bgcolors:
    def __init__(self):
        self.colors = {
            'PURPLE': '\033[95m',
            'BLUE': '\033[94m',
            'GREEN': '\033[92m',
            'YELLOW': '\033[93m',
            'RED': '\033[91m',
            'ENDC': '\033[0m',
            'BOLD': '\033[1m',
            'UNDERLINE': '\033[4m'
        }


def s3_connector(aws_auth):
    if (aws_auth['role_name'] is None) and (aws_auth['role_session'] is None):
        try:
            session = boto3.session.Session(profile_name=aws_auth['profile_name'])
            # Will retry any method call at most 3 time(s)
            s3 = session.client(service_name=aws_auth['client'],
                                region_name=aws_auth['region'],
                                config=Config(retries={'max_attempts': 3}))
            return s3
        except Exception as err:
            print("Failed to create a client connection to boto3 S3:\n",
                  Bgcolors().colors['RED'] + str(err), Bgcolors().colors['ENDC'])
            logger.error('ERROR: Failed to create a client connection to boto3 S3')
            return False
    elif (aws_auth['profile_name'] is None) and \
            (aws_auth['role_name'] is not None) and (aws_auth['role_session'] is not None):
        try:
            session = boto3.session.Session()
            sts = session.client(service_name='sts',
                                 region_name=aws_auth['region'],
                                 config=Config(retries={'max_attempts': 3}))
            # Any session name can be used, but a restriction for the name should be added!
            assumed_role_object = sts.assume_role(RoleArn='{0}'.format(aws_auth['role_name']),
                                                  RoleSessionName='{0}'.format(aws_auth['role_session']))
            s3 = session.client(aws_access_key_id=assumed_role_object['Credentials']['AccessKeyId'],
                                aws_secret_access_key=assumed_role_object['Credentials']['SecretAccessKey'],
                                aws_session_token=assumed_role_object['Credentials']['SessionToken'],
                                service_name=aws_auth['client'],
                                region_name=aws_auth['region'],
                                config=Config(retries={'max_attempts': 3}))
            return s3
        except Exception as err:
            print("Failed to create a client connection to boto3 S3:\n",
                  Bgcolors().colors['RED'] + str(err), Bgcolors().colors['ENDC'])
            logger.error('ERROR: Failed to create a client connection to boto3 S3')
            return False
    else:
        print('Please use/set [--profile-name] or [--role-name] with [--role-session]')
        return False


def s3_bucket(aws_auth, s3_bucket_name):
    s3_bucket_status = False
    s3 = s3_connector(aws_auth)
    if s3:
        try:
            s3.head_bucket(Bucket=s3_bucket_name)
            print("The {} bucket already exists!".format(s3_bucket_name))
            s3_bucket_status = True
            return s3_bucket_status
        except botocore.exceptions.ClientError as e:
            error_code = int(e.response['Error']['Code'])
            if error_code == 403:
                print("Private {} bucket. Forbidden Access!".format(s3_bucket_name))
                logger.error('ERROR: Private {0} bucket. Forbidden Access!'.format(s3_bucket_name))
            elif error_code == 404:
                print("The {} bucket does not exist!".format(s3_bucket_name))
                logger.error('ERROR: The {0} bucket does not exist!'.format(s3_bucket_name))
            s3_bucket_status = False
            return s3_bucket_status
    else:
        exit(-1)
    return s3_bucket_status


def s3_objects(aws_auth, s3_bucket_name):
    s3objects = []
    s3 = s3_connector(aws_auth)
    bucket_exists = s3_bucket(aws_auth, s3_bucket_name)
    if bucket_exists:
        try:
            for key in s3.list_objects(Bucket=s3_bucket_name)['Contents']:
                print(key['Key'])
                key_name = key['Key']
                if key_name.endswith('.gz') or key_name.endswith('.tar.gz'):
                    retr = s3.get_object(Bucket=s3_bucket_name, Key=key_name)
                    bytestream = BytesIO(retr['Body'].read())
                    content = GzipFile(None, 'rb', fileobj=bytestream).read().decode('utf-8')
                    s3objects.append(content)
                else:
                    response = s3.get_object(Bucket=s3_bucket_name, Key=key_name)
                    content = response['Body'].read().decode('utf-8').replace("'", '"')
                    s3objects.append(content)
            logger.info('SUCCESS: Retrieved object(s) from S3 {0} bucket'.format(s3_bucket_name))
        except Exception as e:
            print(e)
            logger.error('ERROR: Could not retrieve object(s) from S3 {0} bucket'.format(s3_bucket_name))
    return s3objects


def sending_data_to_elastisearch(es_url, docData):
    # Index each line to the ES Domain
    index_name = globalVars['esIndexPrefix'] + str(datetime.date.today().year) + '-' + str(datetime.date.today().month)
    elastic_search_url = es_url + '/' + index_name + '/' + globalVars['esIndexDocType']
    try:
        headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
        http = urllib3.PoolManager()
        response = http.request('POST', elastic_search_url,
                                body=json.dumps(docData),
                                headers=headers,
                                retries=False,
                                timeout=30)
        print('Response status: ', response.status, "\nResponse data: ", response.data.decode('utf-8'))
        if response.status == 201:
            logger.info('INFO: Response status: {0}\nResponse data: {1}'.format(response.status,
                                                                                response.data.decode('utf-8')))
            logger.info('INFO: Successfully inserted element into ES')
        elif response.status == 405:
            logger.error('ERROR: Something is wrong with sending DATA: \n\t {}'.format(response.data.decode('utf-8')))
            exit(1)
        else:
            logger.error('FAILURE: Got an error: \n\t {}'.format(response.data.decode('utf-8')))
            exit(1)
    except Exception as e:
        logger.error('ERROR: {0}'.format(str(e)))
        logger.error('ERROR: Unable to index line: "{0}"'.format(str(docData['content'])))
        print(e)
    return


def pushing_locally(aws_auth, s3_bucket_name, es_url):
    s3objects = s3_objects(aws_auth, s3_bucket_name)
    for obj in s3objects:
        reader = csv.DictReader(StringIO(obj), fieldnames=None, restkey=None, restval=None, dialect='excel')
        for row in reader:
            json_out = json.loads(json.dumps(row))
            docData = {}
            docData['content'] = str(json.dumps(json_out))
            docData['createdDate'] = '{}'.format(datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ"))
            sending_data_to_elastisearch(es_url, docData)
    return


def lambda_handler(event, context):
    aws_auth = {
        "client": os.environ['boto3_client'],
        "region": os.environ['aws_region'],
        "profile_name": os.environ['aws_profile_name'],
        "role_name": os.environ['aws_role_name'],
        "role_session": os.environ['aws_role_session']
    }
    # The bucket name and the ES URL are taken from the environment as well
    s3_bucket_name = os.environ['s3_bucket_name']
    es_url = os.environ['es_url']
    logger.info("Received event: " + json.dumps(event, indent=2))
    pushing_locally(aws_auth, s3_bucket_name, es_url)
    logger.info('SUCCESS: Successfully indexed the entire doc into ElasticSearch')
    return {"Status": "AWS Lambda handler has been finished"}


if __name__ == '__main__':
    parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
                                     usage='python3 script_name.py {ARGS}',
                                     add_help=True,
                                     prefix_chars='--/',
                                     epilog="created by Vitalii Natarov")
    parser.add_argument('--version', action='version', version='v0.1.0')
    parser.add_argument('--bclient', dest='boto3_client', help='Set boto3 client', default='s3')
    parser.add_argument('--region', dest='region', help='Set AWS region for boto3', default='us-east-1')
    parser.add_argument('--pname', '--profile-name', dest='profile_name', help='Set AWS profile name', default=None)
    parser.add_argument('--rname', '--role-name', dest='role_name', help='Set the role ARN name', default=None)
    parser.add_argument('--rsession', '--role-session', dest='role_session', help='Set role session name', default=None)
    parser.add_argument('--s3-bucket', '--s3bucket', dest='s3_bucket', help='Set the S3 bucket name',
                        default="test-s3-to-elastisearch")
    parser.add_argument('--es-url', '--esurl', dest='es_url', help='Set ElasticSearch URL',
                        default="http://localhost:9200")
    parser.add_argument('--lambda', dest='aws_lambda', help='Set lambda usage', default='True')

    results = parser.parse_args()
    boto3_client = results.boto3_client
    region = results.region
    profile_name = results.profile_name
    role_name = results.role_name
    role_session = results.role_session
    s3_bucket_name = results.s3_bucket
    es_url = results.es_url
    aws_lambda = results.aws_lambda

    if aws_lambda == 'True':
        lambda_handler(None, None)
    else:
        start__time = time.time()
        aws_auth = {
            "client": boto3_client,
            "region": region,
            "profile_name": profile_name,
            "role_name": role_name,
            "role_session": role_session
        }
        pushing_locally(aws_auth, s3_bucket_name, es_url)
        end__time = round(time.time() - start__time, 2)
        print("--- %s seconds ---" % end__time)
        print(Bgcolors().colors['GREEN'], "============================================================", Bgcolors().colors['ENDC'])
        print(Bgcolors().colors['GREEN'], "==========================FINISHED==========================", Bgcolors().colors['ENDC'])
        print(Bgcolors().colors['GREEN'], "============================================================", Bgcolors().colors['ENDC'])

To see the help:

$ python3 s3-to-elastisearch.py --help
usage: python3 script_name.py {ARGS}

optional arguments:
  -h, --help            show this help message and exit
  --version             show program's version number and exit
  --bclient BOTO3_CLIENT
                        Set boto3 client
  --region REGION       Set AWS region for boto3
  --pname PROFILE_NAME, --profile-name PROFILE_NAME
                        Set AWS profile name
  --rname ROLE_NAME, --role-name ROLE_NAME
                        Set the role ARN name
  --rsession ROLE_SESSION, --role-session ROLE_SESSION
                        Set role session name
  --s3-bucket S3_BUCKET, --s3bucket S3_BUCKET
                        Set the S3 bucket name
  --es-url ES_URL, --esurl ES_URL
                        Set ElasticSearch URL
  --lambda AWS_LAMBDA   Set lambda usage

created by Vitalii Natarov

Example usage:

$ python3 s3-to-elastisearch.py --lambda=False --profile-name=default

Or:

$ python3 s3-to-elastisearch.py --lambda=False --role-name="role_here" --role-session="session"

As for AWS Lambda, you will need to set environment variables with the following keys:

  • boto3_client — the boto3 client used for the S3 connection (s3).
  • aws_region — the AWS region to use, for example: us-east-1.
  • aws_profile_name — the AWS profile used to connect and retrieve resources, for example: default.
  • aws_role_name — if aws_profile_name is not set, you need to set aws_role_name (a role ARN) to use resources in AWS.
  • aws_role_session — together with aws_role_name, the session name to use when assuming the role.
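
For reference, a sketch of how such variables could be set with the AWS CLI (the function name here is hypothetical):

```shell
# "s3-to-elastisearch" is a hypothetical Lambda function name;
# update-function-configuration replaces the function's environment variables
aws lambda update-function-configuration \
    --function-name s3-to-elastisearch \
    --environment "Variables={boto3_client=s3,aws_region=us-east-1,aws_profile_name=default}"
```

The same values can of course be set in the Lambda console instead.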

Feel free to use it! If you need help setting it up, write to me and I will be able to help!

And yes, I have pushed the code to GitHub, so you can pull it:

$ git clone [email protected]:SebastianUA/lambda-s3-elastisearch.git

That’s all, “the AWS Lambda python3 on Unix/Linux” is completed.

Source: linux-notes.org
