Automatically Create AWS Athena Partitions For Cloudtrail Between Two Dates

AWS Athena is a schema-on-read platform, and it is one of the best services in AWS for building data lake solutions and running analytics on flat files stored in S3. In the backend it actually uses Presto clusters, so most Presto functions are supported and you can use native SQL to query your data. You can create partitions to speed up your queries and reduce the cost of scanning. Here I'm going to explain how to automatically create AWS Athena partitions for CloudTrail logs between two dates. When Athena was introduced, I used it to analyze CloudTrail logs, which was very helpful for answering questions like who launched a particular instance, tracking a particular user's activity, and so on.

But the challenge was that I had 3 years of CloudTrail logs, which is a huge amount of data. If I ran a query to find when an instance was launched, Athena would scan the complete dataset, which took more time and added extra cost. On top of that, I used around 8 regions.

Here is my AWS CloudTrail Log path in S3.

s3://bucket/AWSLogs/Account_ID/Cloudtrail/regions/year/month/day/log_files
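
A quick way to see the region folders, and the same trick the Lambda function below uses, is to list that prefix with a delimiter. A minimal sketch with boto3 (bucket and account ID are the placeholders from the path above):

import boto3

s3 = boto3.client('s3')
# With a delimiter, S3 returns one CommonPrefixes entry per region folder.
response = s3.list_objects(Bucket='bucket', Prefix='AWSLogs/Account_ID/Cloudtrail/', Delimiter='/')
for prefix in response.get('CommonPrefixes', []):
    print(prefix['Prefix'])   # e.g. AWSLogs/Account_ID/Cloudtrail/us-east-1/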

So I need to create partitions from /regions/year/month/day/. Executing the create-partition query by hand for 3 years is not an easy job, and I would also need to run it for 8 regions. Uuuuuhhhhh!! It doesn't make sense, so I created a Lambda function to make the job easier.
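For context, a single manually added partition looks like this (the table name, bucket, and account ID match the ones used later in this post; the date is just an example):

ALTER TABLE cloudtrail_log ADD PARTITION (region='us-east-1', year='2018', month='01', day='01')
LOCATION 's3://sqladmin-cloudtrail/AWSLogs/XXXXXXXXXX/CloudTrail/us-east-1/2018/01/01/';

Multiply that by roughly 1,095 days and 8 regions and you get close to 9,000 statements, which is exactly the kind of work to hand to a script.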

Automatically Create AWS Athena Partitions

Create the table with Partitions

CREATE EXTERNAL TABLE cloudtrail_log (
eventversion STRING,
useridentity STRUCT<
    type:STRING,
    principalid:STRING,
    arn:STRING,
    accountid:STRING,
    invokedby:STRING,
    accesskeyid:STRING,
    userName:STRING,
    sessioncontext:STRUCT<
        attributes:STRUCT<
            mfaauthenticated:STRING,
            creationdate:STRING>,
        sessionissuer:STRUCT<
            type:STRING,
            principalId:STRING,
            arn:STRING,
            accountId:STRING,
            userName:STRING>>>,
eventtime STRING,
eventsource STRING,
eventname STRING,
awsregion STRING,
sourceipaddress STRING,
useragent STRING,
errorcode STRING,
errormessage STRING,
requestparameters STRING,
responseelements STRING,
additionaleventdata STRING,
requestid STRING,
eventid STRING,
resources ARRAY<STRUCT<
    ARN:STRING,
    accountId:STRING,
    type:STRING>>,
eventtype STRING,
apiversion STRING,
readonly STRING,
recipientaccountid STRING,
serviceeventdetails STRING,
sharedeventid STRING,
vpcendpointid STRING
)
PARTITIONED BY (region string, year string, month string, day string)
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://sqladmin-cloudtrail/AWSLogs/XXXXXXXXXX/CloudTrail/';
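
Note that MSCK REPAIR TABLE cannot discover these partitions, because CloudTrail's folder layout is not in Hive's key=value format; that is why each partition has to be added explicitly. Once partitions exist, filtering on the partition columns in the WHERE clause lets Athena prune the scan to a handful of S3 prefixes instead of the whole 3 years. A sample query, assuming a January 2018 partition has been added:

SELECT eventtime, eventname, useridentity.username, sourceipaddress
FROM cloudtrail_log
WHERE region = 'us-east-1'
  AND year = '2018'
  AND month = '01'
  AND eventname = 'RunInstances';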

Create the Lambda function

Go to Lambda and create a new function with Python 3.6.

IAM role for Lambda:

  • S3 – ListBucket.
  • S3 – Read objects on the CloudTrail log bucket (my bucket name: sqladmin-cloudtrail).
  • S3 – Write objects on the Athena results bucket (my results bucket: aws-athena-query-results-XXXXXXXXXXXXXXXX-us-east-1).
  • Athena – Create named queries and start query executions.

Here is the policy I used:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "s3:ListBucket",
            "Resource": [
                "arn:aws:s3:::aws-athena-query-results-XXXXXXXXXX-us-east-1",
                "arn:aws:s3:::sqladmin-cloudtrail"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::aws-athena-query-results-XXXXXXXXXXXXXXXX-us-east-1/*"
        },
        {
            "Sid": "VisualEditor2",
            "Effect": "Allow",
            "Action": [
                "s3:GetObjectAcl",
                "s3:GetObject",
                "s3:GetObjectTagging",
                "s3:GetBucketPolicy"
            ],
            "Resource": [
                "arn:aws:s3:::sqladmin-cloudtrail",
                "arn:aws:s3:::sqladmin-cloudtrail/*"
            ]
        },
        {
            "Sid": "VisualEditor3",
            "Effect": "Allow",
            "Action": [
                "athena:StartQueryExecution",
                "athena:CreateNamedQuery",
                "athena:RunQuery"
            ],
            "Resource": "*"
        }
    ]
}

NOTE

AWS Lambda functions can run for up to 5 minutes only (the maximum timeout at the time of writing). So if you want to create partitions for more than about 3 months, pass a start_date and end_date covering only 3 months per execution, or run the script on a server instead.
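
If you do need to cover a longer window from Lambda, one option is to split the range and invoke the function once per chunk. A minimal sketch (chunk_dates is a hypothetical helper, not part of the original script):

from datetime import date, timedelta

def chunk_dates(start_date, end_date, chunk_days=90):
    # Split [start_date, end_date) into windows of at most chunk_days days.
    current = start_date
    while current < end_date:
        chunk_end = min(current + timedelta(days=chunk_days), end_date)
        yield current, chunk_end
        current = chunk_end

# Example: 3 years split into roughly 3-month windows.
for s, e in chunk_dates(date(2015, 1, 1), date(2018, 1, 1)):
    print(s, e)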

Parameters for S3 bucket and Athena

  • s3_bucket – Bucket name where your CloudTrail logs are stored.
  • s3_prefix – Path to your CloudTrail logs (give the prefix before the regions. For example, for s3://bucket/AWSLogs/AccountID/Cloudtrail/regions/year/month/day/log_files you need to use the path AWSLogs/AccountID/Cloudtrail/).
  • s3_output – Path where your Athena query results should be saved.
  • database – Name of the database where your CloudTrail logs table is located.
  • table_name – Name of your CloudTrail logs table.
  • start_date – Start date for creating partitions.
  • end_date – Last date for creating partitions.
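
The snippets below assume these parameters, together with a few imports and boto3 clients, are defined at the top of the script. A minimal setup sketch (all values are placeholders; s3_input, the full S3 URI the partition locations are built from, is derived from the bucket and prefix):

import time
from datetime import date, timedelta

import boto3

athena = boto3.client('athena')
s3 = boto3.client('s3')

s3_bucket = 'sqladmin-cloudtrail'                 # bucket that holds the CloudTrail logs
s3_prefix = 'AWSLogs/XXXXXXXXXX/CloudTrail/'      # prefix before the region folders
s3_input = 's3://' + s3_bucket + '/' + s3_prefix  # base URI for partition locations
s3_output = 's3://aws-athena-query-results-XXXXXXXXXXXXXXXX-us-east-1/'  # Athena results
database = 'default'                              # database that holds the table
table_name = 'cloudtrail_log'
start_date = date(2015, 1, 1)                     # first day to partition
end_date = date(2018, 1, 1)                       # last day (exclusive, see below)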

Function to get the date range between two dates

# Yield every date from start_date up to, but not including, end_date.
def daterange(start_date, end_date):
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)
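
Note that the range is end-exclusive, so the end date itself is not included. For example:

for single_date in daterange(date(2018, 1, 1), date(2018, 1, 4)):
    print(single_date)   # prints 2018-01-01, 2018-01-02, 2018-01-03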

Function to execute the Athena query

# Submit a query to Athena and return the response, which includes
# the QueryExecutionId for tracking the run.
def run_query(query, database, s3_output):
    query_response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output,
        }
    )
    print('Execution ID: ' + query_response['QueryExecutionId'])
    return query_response
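
Keep in mind that start_query_execution is asynchronous: it returns as soon as the query is submitted, not when it finishes. If you ever need to block until a partition is actually added, you can poll get_query_execution, roughly like this (a sketch, not part of the original script):

def wait_for_query(execution_id):
    # Poll Athena until the query reaches a terminal state.
    while True:
        status = athena.get_query_execution(QueryExecutionId=execution_id)
        state = status['QueryExecution']['Status']['State']
        if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            return state
        time.sleep(1)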

Main Function to Automatically create AWS Athena partitions

Athena supports only 20 concurrent query executions, so we need the Lambda/Python code to wait a moment before the next loop iteration. I sleep for 2 seconds after each query.

def lambda_handler(event, context):
    # One CommonPrefixes entry per region folder under the CloudTrail prefix.
    result = s3.list_objects(Bucket=s3_bucket, Prefix=s3_prefix, Delimiter='/')
    for regions in result.get('CommonPrefixes', []):
        get_region = regions.get('Prefix', '').replace(s3_prefix, '').replace('/', '')
        for single_date in daterange(start_date, end_date):
            # Zero-pad day and month to match the S3 folder names.
            partition_day = str(single_date.day).rjust(2, '0')
            partition_month = str(single_date.month).rjust(2, '0')
            partition_year = str(single_date.year)
            print(partition_day, partition_month, partition_year)
            query = str("ALTER TABLE " + table_name + " ADD PARTITION (region='"
                        + get_region + "',year="
                        + partition_year + ",month="
                        + partition_month + ",day="
                        + partition_day
                        + ") location '" + s3_input
                        + get_region
                        + "/" + partition_year + "/" + partition_month + "/"
                        + partition_day + "';")
            #print(get_region)
            #print(query)
            run_query(query, database, s3_output)
            # Pause between queries to stay within Athena's concurrency limit.
            time.sleep(2)
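
You can test it from the Lambda console with an empty test event, or run the same script locally (assuming the setup variables above are filled in):

if __name__ == '__main__':
    # The handler ignores event and context, so empty values are fine for a local run.
    lambda_handler({}, None)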

Find the Final Code

I have published the complete script as a GitHub Gist. You can get it from the link below.

Automate AWS Athena Create Partition On Daily Basis
