This week I will talk about Amazon Web Services CloudTrail. CloudTrail is the service that keeps logs of all AWS API calls: for example, restarting an instance, creating a bucket or logging in to the console. When we enable CloudTrail, the logs have to be sent to an S3 bucket, and all logs are encrypted using server-side encryption. Logs can also be sent to a CloudWatch Logs log group (to create metrics and alarms) and to an SNS topic (to be notified as soon as new logs are delivered). A trail is created per region, so you have to enable it in every region you operate in. You don’t need a separate bucket for each region; you can reuse a bucket you created before (with the right permissions in its bucket policy, of course). According to AWS, logs are delivered within 15 minutes.
To create a trail, we go through these steps:
- Enable CloudTrail.
- Set up an S3 bucket. If we create the bucket while creating the trail, AWS sets the necessary permissions. Otherwise we have to add a bucket policy ourselves.
- Set up a log group for CloudWatch Logs. This step is optional. Again, if we set it up through CloudTrail, all the necessary permissions are created for us.
- Set up SNS. This step is optional too. When we configure the topic, CloudTrail only manages the notifications. We have to configure the subscriptions manually.
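If you bring your own bucket, the bucket policy from the second step can be sketched roughly like this. It is a minimal sketch, assuming the default `AWSLogs/<account id>/` key layout; the account ID is a placeholder, and `cloudtrail_bucket_policy` and `apply_policy` are names I made up for illustration.

```python
import json


def cloudtrail_bucket_policy(bucket, account_id):
    """Build a bucket policy that lets CloudTrail deliver logs to `bucket`."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # CloudTrail reads the bucket ACL before it delivers logs
                "Sid": "AWSCloudTrailAclCheck",
                "Effect": "Allow",
                "Principal": {"Service": "cloudtrail.amazonaws.com"},
                "Action": "s3:GetBucketAcl",
                "Resource": f"arn:aws:s3:::{bucket}",
            },
            {
                # CloudTrail writes log files under AWSLogs/<account id>/
                "Sid": "AWSCloudTrailWrite",
                "Effect": "Allow",
                "Principal": {"Service": "cloudtrail.amazonaws.com"},
                "Action": "s3:PutObject",
                "Resource": f"arn:aws:s3:::{bucket}/AWSLogs/{account_id}/*",
                "Condition": {
                    "StringEquals": {"s3:x-amz-acl": "bucket-owner-full-control"}
                },
            },
        ],
    }


def apply_policy(bucket, account_id):
    """Attach the policy to the bucket (needs boto3 and AWS credentials)."""
    import boto3  # imported here so the builder above works without the SDK

    policy = cloudtrail_bucket_policy(bucket, account_id)
    boto3.client("s3").put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))


# 123456789012 is a placeholder account ID, not a real one
print(json.dumps(cloudtrail_bucket_policy("wekanban-cloudtrail", "123456789012"), indent=2))
```

Keeping the policy in a plain function makes it easy to print and review before attaching it to the bucket.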
Let’s see how to configure CloudTrail using the AWS Console.
As you see, I created a bucket named “wekanban-cloudtrail” and a bucket policy with the necessary permissions was generated automatically. I also enabled SNS notifications since I will use them in my demo. Finally, I enabled CloudWatch Logs. Again, if you configure these settings through the CloudTrail console, CLI etc., it will create all the roles, policies and permissions for you. By default, global services (services that are not specific to any region, like IAM, STS etc.) are included.
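The same settings can also be applied from code. This is only a sketch with boto3: it assumes the bucket, the topic and the CloudWatch Logs role already exist with the right permissions, and the trail name and topic name are made up for the example.

```python
def trail_settings(name, bucket, sns_topic=None,
                   log_group_arn=None, logs_role_arn=None):
    """Collect the keyword arguments for CloudTrail's create_trail call."""
    settings = {
        "Name": name,
        "S3BucketName": bucket,
        # Include global services such as IAM and STS
        "IncludeGlobalServiceEvents": True,
    }
    if sns_topic:
        settings["SnsTopicName"] = sns_topic
    # CloudWatch Logs delivery needs both the log group and a role
    if log_group_arn and logs_role_arn:
        settings["CloudWatchLogsLogGroupArn"] = log_group_arn
        settings["CloudWatchLogsRoleArn"] = logs_role_arn
    return settings


def create_trail(settings):
    """Create the trail and start logging (needs boto3 and AWS credentials)."""
    import boto3  # imported here so trail_settings works without the SDK

    cloudtrail = boto3.client("cloudtrail")
    cloudtrail.create_trail(**settings)
    # A trail created through the API does not log until it is started
    cloudtrail.start_logging(Name=settings["Name"])


# "wekanban-trail" and "cloudtrail-topic" are hypothetical names
print(trail_settings("wekanban-trail", "wekanban-cloudtrail",
                     sns_topic="cloudtrail-topic"))
```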
After we create the trail, we can search for events in the API activity history using the Username, Event Name, Resource Type, Resource Name and Time Range filters.
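A similar search can be done from code with the `lookup_events` call. The sketch below assumes boto3 and valid credentials; the helper names are my own. As far as I know, the API accepts only one lookup attribute per request, so the helper takes a single key and value.

```python
from datetime import datetime, timedelta, timezone


def lookup_request(attribute_key, attribute_value, hours=24, now=None):
    """Build a lookup_events request for one attribute and a time range."""
    now = now or datetime.now(timezone.utc)
    return {
        "LookupAttributes": [
            {"AttributeKey": attribute_key, "AttributeValue": attribute_value}
        ],
        "StartTime": now - timedelta(hours=hours),
        "EndTime": now,
    }


def lookup_events(request):
    """Yield (time, event name, username) for every matching event."""
    import boto3  # imported here so lookup_request works without the SDK

    paginator = boto3.client("cloudtrail").get_paginator("lookup_events")
    for page in paginator.paginate(**request):
        for event in page["Events"]:
            yield event["EventTime"], event["EventName"], event.get("Username")


# Example: everything the user "onur" did in the last 24 hours
request = lookup_request("Username", "onur")
print(request["LookupAttributes"])
```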
As I said, if you enable CloudWatch Logs for CloudTrail, you can create metrics and alarms from the logs. For example, you can create a metric for failed authentication attempts, then create an alarm to be notified. You can see more examples here
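The failed authentication metric and its alarm could look roughly like this with boto3. Treat it as a sketch: the filter name, metric name, namespace, period and threshold are values I picked for illustration, and the log group and the SNS topic for alarm notifications are assumed to exist already.

```python
# Matches console logins that failed authentication
FAILED_LOGIN_PATTERN = (
    '{ ($.eventName = ConsoleLogin) && ($.errorMessage = "Failed authentication") }'
)


def metric_filter_settings(log_group):
    """Arguments for CloudWatch Logs put_metric_filter."""
    return {
        "logGroupName": log_group,
        "filterName": "ConsoleLoginFailures",
        "filterPattern": FAILED_LOGIN_PATTERN,
        "metricTransformations": [
            {
                "metricName": "ConsoleLoginFailureCount",
                "metricNamespace": "CloudTrailMetrics",
                "metricValue": "1",  # count one per matching log event
            }
        ],
    }


def alarm_settings(topic_arn, threshold=3.0):
    """Arguments for CloudWatch put_metric_alarm."""
    return {
        "AlarmName": "ConsoleLoginFailures",
        "MetricName": "ConsoleLoginFailureCount",
        "Namespace": "CloudTrailMetrics",
        "Statistic": "Sum",
        "Period": 300,  # seconds
        "EvaluationPeriods": 1,
        "Threshold": threshold,
        "ComparisonOperator": "GreaterThanOrEqualToThreshold",
        "AlarmActions": [topic_arn],
    }


def create_metric_and_alarm(log_group, topic_arn):
    """Create both resources (needs boto3 and AWS credentials)."""
    import boto3  # imported here so the builders work without the SDK

    boto3.client("logs").put_metric_filter(**metric_filter_settings(log_group))
    boto3.client("cloudwatch").put_metric_alarm(**alarm_settings(topic_arn))
```

With these values, the alarm fires when there are three or more failed logins within five minutes.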
Today I will use sample code that implements the same example. In my setup, I enabled CloudTrail and configured it to publish to an SNS topic. An SQS queue is subscribed to this topic and my code polls this queue. If there is an authentication failure event, it prints the username and the IP address.
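I created the subscription by hand, but it could also be scripted. The sketch below assumes boto3; the function names are mine, and the ARNs are passed in by the caller. The queue needs a policy that lets the topic send messages to it, otherwise the notifications never arrive.

```python
import json


def queue_policy(queue_arn, topic_arn):
    """Build a queue policy that lets one SNS topic send to the queue."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "sns.amazonaws.com"},
                "Action": "sqs:SendMessage",
                "Resource": queue_arn,
                # Only accept messages that come from this topic
                "Condition": {"ArnEquals": {"aws:SourceArn": topic_arn}},
            }
        ],
    }


def subscribe_queue(queue_url, queue_arn, topic_arn):
    """Attach the policy and subscribe the queue (needs AWS credentials)."""
    import boto3  # imported here so queue_policy works without the SDK

    boto3.client("sqs").set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={"Policy": json.dumps(queue_policy(queue_arn, topic_arn))},
    )
    boto3.client("sns").subscribe(
        TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn
    )
```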
In this setup, the only manual step is subscribing the SQS queue to the SNS topic. First I check the queue for messages. If there are any, I get the bucket name and the object key from the message. Then I connect to S3 with the bucket name and the object key and download the object to my tmp folder. Since it is a gzip file, I unzip it and search the content for the “Failed authentication” message. If I find one, I print it to the console. Finally, I delete the message from the queue.
    import gzip
    import json

    import boto3


    def poll_queue():
        print('Polling queue...')
        sqs = boto3.client('sqs')
        url = sqs.get_queue_url(QueueName='Cloudtrail-queue')['QueueUrl']
        response = sqs.receive_message(QueueUrl=url, MaxNumberOfMessages=10)

        # receive_message omits the 'Messages' key when the queue is empty
        if 'Messages' not in response:
            print("There are no messages in the queue")
            return

        # 'Messages' is a list, so handle every message that was returned
        for message in response['Messages']:
            notification = json.loads(message['Body'])
            # Without raw message delivery, SNS wraps the CloudTrail
            # notification in an envelope and puts it under 'Message'
            if 'Message' in notification:
                notification = json.loads(notification['Message'])

            bucket = notification['s3Bucket']
            # 's3ObjectKey' is a list of log file keys
            for object_key in notification['s3ObjectKey']:
                parse_log(bucket, object_key)

            # The receipt handle identifies the message we want to delete
            sqs.delete_message(QueueUrl=url,
                               ReceiptHandle=message['ReceiptHandle'])


    def parse_log(bucket, logfile):
        s3 = boto3.client('s3')
        local_file = '/tmp/cloudtrail'
        # Download the log file to the local disk
        s3.download_file(bucket, logfile, local_file)

        # The log file is gzipped JSON with the events under 'Records'
        with gzip.open(local_file, 'rt', encoding='utf-8') as log:
            records = json.load(log)['Records']

        # Search for the pattern, failed authentication
        for record in records:
            if record.get('errorMessage') == 'Failed authentication':
                print("Authentication failed username",
                      record['userIdentity'].get('userName'),
                      "from the ip address", record['sourceIPAddress'])


    if __name__ == "__main__":
        poll_queue()
If there are no messages in the queue:
    Polling queue...
    There are no messages in the queue
If we match the pattern:
    Polling queue...
    Authentication failed username onur from the ip address 220.127.116.11
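If you want to try the matching logic without an AWS account, something like the following should work. It is a sketch that builds a tiny gzipped log in memory; the two records are made up and contain only the fields the check needs, so they are not real CloudTrail events.

```python
import gzip
import json


def failed_logins(gzipped_log):
    """Return (username, ip) pairs for failed authentications in a log file."""
    records = json.loads(gzip.decompress(gzipped_log).decode("utf-8"))["Records"]
    return [
        (r.get("userIdentity", {}).get("userName"), r.get("sourceIPAddress"))
        for r in records
        if r.get("errorMessage") == "Failed authentication"
    ]


# Made-up sample data, trimmed to the fields used above
sample = {
    "Records": [
        {"eventName": "ConsoleLogin", "errorMessage": "Failed authentication",
         "userIdentity": {"userName": "alice"}, "sourceIPAddress": "192.0.2.10"},
        {"eventName": "CreateBucket",
         "userIdentity": {"userName": "bob"}, "sourceIPAddress": "192.0.2.11"},
    ]
}
log_bytes = gzip.compress(json.dumps(sample).encode("utf-8"))
print(failed_logins(log_bytes))  # [('alice', '192.0.2.10')]
```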
You can find the sample code here
This was a basic introduction to the CloudTrail service. I hope you found it useful. If you have any questions or comments, please feel free to write, and don’t forget to share.