Hi, DevOps fans
This article is a continuation of the first part: Making AWS Log Analytics Faster and Cheaper with Athena Partitions.
Here we will concentrate on a Terraform module that converts raw CloudFront logs into Parquet with partitions and stores the result in a Parquet table on S3. Let's start.
Here is the physical file structure:
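A rough sketch of the layout (assuming the module lives under modules/cf-parquet, as in the usage example at the end of this article):
modules/cf-parquet/
├── main.tf
├── locals.tf
├── data.tf
├── s3_logs_bucket.tf
├── lambda.tf
├── eventbridge.tf
├── iam.tf
├── alarms.tf
├── variables.tf
├── variables-env.tf
└── templates/
    └── handler.py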

Now let's go over the files one by one. First, some basic files that don't require much explanation:
# main.tf
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = ">=5.54.0"
    }
  }
  required_version = ">=1.8.5"
}

# locals.tf
locals {
  name_prefix = format("%s-%s", var.project, var.env)

  common_tags = {
    Env       = var.env
    ManagedBy = "terraform"
    Project   = var.project
  }
}

# data.tf
data "aws_sns_topic" "alarm_topic" {
  name = var.alarm_sns_topic_name
}
The S3 bucket for the Parquet results. It is created only when create_parquet_bucket is true, by using for_each over a one-element map:
# s3_logs_bucket.tf
locals {
  create_parquet = var.create_parquet_bucket ? { create = true } : {}
}

resource "aws_s3_bucket" "parquet" {
  for_each = local.create_parquet
  bucket   = var.parquet_bucket
  tags     = local.common_tags
}

resource "aws_s3_bucket_public_access_block" "parquet_pab" {
  for_each = local.create_parquet
  bucket   = aws_s3_bucket.parquet[each.key].id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

resource "aws_s3_bucket_server_side_encryption_configuration" "parquet_enc" {
  for_each = local.create_parquet
  bucket   = aws_s3_bucket.parquet[each.key].id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "parquet_lifecycle" {
  for_each = local.create_parquet
  bucket   = aws_s3_bucket.parquet[each.key].id

  rule {
    id     = "expire-all-after-60-days"
    status = "Enabled"

    expiration { days = 60 }
    noncurrent_version_expiration { noncurrent_days = 60 }
    abort_incomplete_multipart_upload { days_after_initiation = 7 }
  }
}
The Lambda Terraform part:
# lambda.tf
data "archive_file" "lambda_zip" {
  type        = "zip"
  source_dir  = "${path.module}/templates"
  output_path = "${path.module}/lambda.zip"
}

resource "aws_lambda_function" "cloudfront_daily_append" {
  function_name    = "${local.name_prefix}-cf-parquet-append"
  role             = aws_iam_role.lambda_exec_role.arn
  handler          = "handler.lambda_handler"
  runtime          = "python3.11"
  timeout          = 300 # 5 minutes hard limit
  filename         = data.archive_file.lambda_zip.output_path
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256

  environment {
    variables = {
      ATHENA_DB          = var.athena_db
      ATHENA_WORKGROUP   = var.athena_workgroup
      OUTPUT_S3          = "s3://${var.athena_results_bucket}/"
      MAX_WAIT_SECONDS   = "270" # leave buffer under 300s Lambda timeout
      POLL_DELAY_SECONDS = "10"
    }
  }

  tags = local.common_tags
}

resource "aws_cloudwatch_log_group" "lambda" {
  name              = "/aws/lambda/${aws_lambda_function.cloudfront_daily_append.function_name}"
  retention_in_days = 30
  tags              = local.common_tags
}
Now, the Lambda Python code, which:
- Inserts yesterday's data from the raw CloudFront table as a new partition in the Parquet table
- Uses ClientRequestToken to make StartQueryExecution idempotent on retries for the same day
- Polls Athena every POLL_DELAY_SECONDS (10s by default), up to MAX_WAIT_SECONDS (270s by default), for the query result
- Raises an exception if the query ends FAILED/CANCELLED, so the Lambda Errors alarm fires
- Calls StopQueryExecution and then raises a TimeoutError if time runs out
- Standardizes partition expressions on date_format
# templates/handler.py
import boto3, os, datetime, json, time
athena = boto3.client("athena")
SQL = """
-- job: cf-parquet-daily
INSERT INTO cloudfront_parquet
SELECT
date, time, location, bytes, request_ip, method, host, uri, status, referrer,
user_agent, query_string, cookie, result_type, request_id, host_header,
request_protocol, request_bytes, time_taken, xforwarded_for,
ssl_protocol, ssl_cipher, response_result_type, http_version,
fle_status, fle_encrypted_fields, c_port, time_to_first_byte,
x_edge_detailed_result_type, sc_content_type, sc_content_len,
sc_range_start, sc_range_end,
CAST(EXTRACT(year FROM date) AS INT) AS year,
date_format(date, '%%m') AS month,
date_format(date, '%%d') AS day,
substr(time,1,2) AS hour
FROM cloudfront_logs
WHERE date = DATE '%(yesterday)s';
"""
def _now_iso():
    return datetime.datetime.utcnow().isoformat() + "Z"


def lambda_handler(event, context):
    # yesterday in UTC
    y = (datetime.date.today() - datetime.timedelta(days=1)).isoformat()
    query = SQL % {"yesterday": y}

    # soft limit inside Lambda timeout (Lambda timeout remains 300s)
    max_wait = int(os.environ.get("MAX_WAIT_SECONDS", "270"))
    poll_delay = int(os.environ.get("POLL_DELAY_SECONDS", "10"))

    # idempotent start token (prevents duplicate starts if retried);
    # Athena requires ClientRequestToken to be 32-128 characters, so pad it
    token = f"cf-parquet-{y}".ljust(32, "0")

    try:
        resp = athena.start_query_execution(
            QueryString=query,
            ClientRequestToken=token,
            QueryExecutionContext={"Database": os.environ["ATHENA_DB"]},
            WorkGroup=os.environ.get("ATHENA_WORKGROUP", "primary"),
            ResultConfiguration={"OutputLocation": os.environ["OUTPUT_S3"]},
        )
        qid = resp["QueryExecutionId"]

        t0 = time.time()
        while True:
            q = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]
            state = q["Status"]["State"]
            if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
                break
            if time.time() - t0 >= max_wait:
                # stop the query if we're out of time and mark the run as a failure
                try:
                    athena.stop_query_execution(QueryExecutionId=qid)
                except Exception:
                    pass
                raise TimeoutError(f"Athena query {qid} exceeded {max_wait}s; stopped")
            time.sleep(poll_delay)

        result = {
            "time": _now_iso(),
            "yesterday": y,
            "query_execution_id": qid,
            "state": state,
        }

        if state != "SUCCEEDED":
            # Bubble up the reason so the CloudWatch alarm on Lambda Errors fires
            reason = q["Status"].get("StateChangeReason", "unknown")
            result["reason"] = reason
            print(json.dumps(result))
            raise RuntimeError(f"Athena {qid} finished {state}: {reason}")

        # Optional: include some execution stats for logs
        stats = q.get("Statistics", {})
        result["execution_stats"] = {
            "engine": q.get("EngineVersion", {}).get("SelectedEngineVersion"),
            "data_scanned_bytes": stats.get("DataScannedInBytes"),
            "execution_time_ms": stats.get("EngineExecutionTimeInMillis"),
        }

        print(json.dumps(result))
        return result
    except Exception as e:
        print(json.dumps({"time": _now_iso(), "error": str(e)}))
        raise
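Before wiring up the schedule, it can be handy to invoke the handler by hand. Below is a minimal smoke-test sketch (the file name and values are illustrative): it assumes AWS credentials with a default region of us-east-1, that the database, workgroup and results bucket configured above already exist, and that it is run from the templates/ directory.
# smoke_test.py (hypothetical helper, not part of the module)
import os

# handler.py reads its configuration from environment variables,
# so set them before calling the handler
os.environ["ATHENA_DB"] = "cloudfront"
os.environ["ATHENA_WORKGROUP"] = "primary"
os.environ["OUTPUT_S3"] = "s3://athena-query-outcome-us-east-1/"
os.environ["MAX_WAIT_SECONDS"] = "270"
os.environ["POLL_DELAY_SECONDS"] = "10"

from handler import lambda_handler

if __name__ == "__main__":
    # the handler ignores event and context, so empty values are fine
    print(lambda_handler({}, None))
Because StartQueryExecution is called with a per-day ClientRequestToken, re-running the test for the same day should not start a second copy of the query.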
EventBridge is used to run the Lambda on a schedule:
# eventbridge.tf
resource "aws_cloudwatch_event_rule" "daily" {
  name                = "${local.name_prefix}-cf-parquet-append-daily"
  schedule_expression = var.schedule_cron
  tags                = local.common_tags
}

resource "aws_cloudwatch_event_target" "lambda_target" {
  rule      = aws_cloudwatch_event_rule.daily.name
  target_id = "lambda"
  arn       = aws_lambda_function.cloudfront_daily_append.arn

  retry_policy {
    maximum_retry_attempts       = 0 # disable to avoid double-runs
    maximum_event_age_in_seconds = 3600
  }
}

resource "aws_lambda_permission" "allow_eventbridge" {
  statement_id  = "AllowExecutionFromEventBridge"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.cloudfront_daily_append.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.daily.arn
}
The permissions-related code:
# iam.tf
resource "aws_iam_role" "lambda_exec_role" {
  name = "${local.name_prefix}-cf-parquet-append-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [{
      Effect    = "Allow",
      Principal = { Service = "lambda.amazonaws.com" },
      Action    = "sts:AssumeRole"
    }]
  })
}

# Basic Lambda logging
resource "aws_iam_role_policy_attachment" "lambda_basic" {
  role       = aws_iam_role.lambda_exec_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

# Least-privilege inline policy for Athena + S3
resource "aws_iam_role_policy" "lambda_permissions" {
  name = "${local.name_prefix}-cf-parquet-append-policy"
  role = aws_iam_role.lambda_exec_role.id

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Sid    = "AthenaRun"
        Effect = "Allow",
        Action = [
          "athena:StartQueryExecution",
          "athena:GetQueryExecution",
          "athena:StopQueryExecution"
        ],
        Resource = "*"
      },
      {
        Sid    = "GlueReadCatalog"
        Effect = "Allow",
        Action = [
          "glue:GetDatabase",
          "glue:GetDatabases",
          "glue:GetTable",
          "glue:GetTables"
        ],
        Resource = "*"
      },
      {
        Sid    = "ReadRawLogs"
        Effect = "Allow",
        Action = ["s3:GetObject", "s3:ListBucket"],
        Resource = [
          "arn:aws:s3:::${var.raw_logs_bucket}",
          "arn:aws:s3:::${var.raw_logs_bucket}/*"
        ]
      },
      {
        Sid    = "WriteParquetAndResults"
        Effect = "Allow",
        Action = ["s3:PutObject", "s3:GetObject", "s3:ListBucket"],
        Resource = [
          "arn:aws:s3:::${var.parquet_bucket}",
          "arn:aws:s3:::${var.parquet_bucket}/*",
          "arn:aws:s3:::${var.athena_results_bucket}",
          "arn:aws:s3:::${var.athena_results_bucket}/*"
        ]
      },
    ]
  })
}
It is also worth having an alarm on Lambda failures:
# alarms.tf
resource "aws_cloudwatch_metric_alarm" "lambda_errors" {
  alarm_name        = "${local.name_prefix}-cf-parquet-lambda-errors"
  alarm_description = "Daily Athena append Lambda reported an error (incl. timeouts)."

  namespace           = "AWS/Lambda"
  metric_name         = "Errors"
  statistic           = "Sum"
  period              = 300
  evaluation_periods  = 1
  threshold           = 0
  comparison_operator = "GreaterThanThreshold"

  treat_missing_data        = "ignore" # don't flip state when no datapoints
  insufficient_data_actions = []       # no emails while waiting for first datapoint

  dimensions = {
    FunctionName = aws_lambda_function.cloudfront_daily_append.function_name
  }

  alarm_actions = [data.aws_sns_topic.alarm_topic.arn]
  ok_actions    = [data.aws_sns_topic.alarm_topic.arn]
}
Finally, variables:
# variables.tf
variable "raw_logs_bucket" {
  type        = string
  description = "S3 bucket with raw CloudFront logs (text)."
}

variable "parquet_bucket" {
  type        = string
  description = "S3 bucket for Parquet dataset."
}

variable "athena_results_bucket" {
  type        = string
  description = "S3 bucket for Athena query results."
}

variable "athena_db" {
  type    = string
  default = "cloudfront"
}

variable "athena_workgroup" {
  type    = string
  default = "primary"
}

# Schedule in cron format (UTC). Example: 01:30 UTC daily.
variable "schedule_cron" {
  type    = string
  default = "cron(30 1 * * ? *)"
}

variable "alarm_sns_topic_name" {
  type = string
}

variable "create_parquet_bucket" {
  type    = bool
  default = false
}

# variables-env.tf
variable "account_id" {
  type        = string
  description = "AWS Account ID"
}

variable "env" {
  type        = string
  description = "Environment name"
}

variable "project" {
  type        = string
  description = "Project name"
}

variable "region" {
  type        = string
  description = "AWS Region"
}
And here is an example of how the module is used:
data "terraform_remote_state" "sns" {
backend = "s3"
config = {
bucket = "terraform-state-xxx"
key = "dev-sns.tfstate"
region = var.region
}
}
provider "aws" {
region = var.region
}
provider "aws" {
alias = "use1"
region = "us-east-1"
}
module "cloudfront" {
source = "../../modules/cf-parquet"
providers = { aws = aws.use1 }
account_id = var.account_id
env = var.env
project = var.project
region = var.region
# buckets
raw_logs_bucket = "dev-cloudfront-logs" # raw CF logs (text)
parquet_bucket = "dev-analytics-cloudfront" # parquet dataset (must be us-east-1)
create_parquet_bucket = false # set true if you want module to create it
# Athena
athena_results_bucket = "athena-query-outcome-us-east-1" # must be us-east-1
athena_db = "cloudfront"
athena_workgroup = "primary"
schedule_cron = "cron(30 1 * * ? *)" # 01:30 UTC daily
# SNS topic name (in us-east-1); remote state exposes a names map
alarm_sns_topic_name = data.terraform_remote_state.sns.outputs.name["dev_n_virginia"]
}
Thank you for your attention. If you are interested in similar content, you can sign up for my newsletter:
Best regards.