Terraform Automation: Daily CloudFront Parquet

Hi, DevOps fans

This article is a continuation of the first part: Making AWS Log Analytics Faster and Cheaper with Athena Partitions.

Here we will concentrate on a Terraform module that converts raw CloudFront logs into Parquet with partitions and stores the results in a Parquet table on S3. Let's start.

Here is the physical file structure of the module:

main.tf
locals.tf
data.tf
s3_logs_bucket.tf
lambda.tf
eventbridge.tf
iam.tf
alarms.tf
variables.tf
variables-env.tf
templates/
  handler.py

Now let's go through the files one by one. First, a few basic files that need little explanation:

# main.tf

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = ">=5.54.0"
    }
  }

  required_version = ">=1.8.5"
}
# locals.tf 

locals {
  name_prefix = format("%s-%s", var.project, var.env)

  common_tags = {
    Env       = var.env
    ManagedBy = "terraform"
    Project   = var.project
  }
}
# data.tf 

data "aws_sns_topic" "alarm_topic" {
  name = var.alarm_sns_topic_name
}

The S3 bucket for the Parquet results (created only when create_parquet_bucket is true, via a single-entry for_each map):

# s3_logs_bucket.tf

locals {
  create_parquet = var.create_parquet_bucket ? { create = true } : {}
}

resource "aws_s3_bucket" "parquet" {
  for_each = local.create_parquet
  bucket   = var.parquet_bucket
  tags     = local.common_tags
}

resource "aws_s3_bucket_public_access_block" "parquet_pab" {
  for_each                = local.create_parquet
  bucket                  = aws_s3_bucket.parquet[each.key].id
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

resource "aws_s3_bucket_server_side_encryption_configuration" "parquet_enc" {
  for_each = local.create_parquet
  bucket   = aws_s3_bucket.parquet[each.key].id
  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "parquet_lifecycle" {
  for_each = local.create_parquet
  bucket   = aws_s3_bucket.parquet[each.key].id

  rule {
    id     = "expire-all-after-60-days"
    status = "Enabled"

    expiration { days = 60 }

    noncurrent_version_expiration { noncurrent_days = 60 }

    abort_incomplete_multipart_upload { days_after_initiation = 7 }
  }
}
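
One detail worth noting: the noncurrent_version_expiration rule above only has an effect if bucket versioning is enabled, which the module does not do by itself. If you want versioning, a minimal sketch (the resource name parquet_ver is my own):

resource "aws_s3_bucket_versioning" "parquet_ver" {
  for_each = local.create_parquet
  bucket   = aws_s3_bucket.parquet[each.key].id

  versioning_configuration {
    status = "Enabled"
  }
}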

The Lambda Terraform part:

# lambda.tf

data "archive_file" "lambda_zip" {
  type        = "zip"
  source_dir  = "${path.module}/templates"
  output_path = "${path.module}/lambda.zip"
}

resource "aws_lambda_function" "cloudfront_daily_append" {
  function_name = "${local.name_prefix}-cf-parquet-append"
  role          = aws_iam_role.lambda_exec_role.arn
  handler       = "handler.lambda_handler"
  runtime       = "python3.11"
  timeout       = 300 # 5 minutes hard limit

  filename         = data.archive_file.lambda_zip.output_path
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256

  environment {
    variables = {
      ATHENA_DB          = var.athena_db
      ATHENA_WORKGROUP   = var.athena_workgroup
      OUTPUT_S3          = "s3://${var.athena_results_bucket}/"
      MAX_WAIT_SECONDS   = "270" # leave buffer under 300s Lambda timeout
      POLL_DELAY_SECONDS = "10"
    }
  }

  tags = local.common_tags
}

resource "aws_cloudwatch_log_group" "lambda" {
  name              = "/aws/lambda/${aws_lambda_function.cloudfront_daily_append.function_name}"
  retention_in_days = 30
  tags              = local.common_tags
}

Now, the Lambda Python code, which:

  • Inserts yesterday's partition from the raw CloudFront logs table into the Parquet table
  • Uses ClientRequestToken to make StartQueryExecution idempotent on retries for the same day
  • Polls Athena every POLL_DELAY_SECONDS (default 10s), for up to MAX_WAIT_SECONDS (default 270s), to get the query result
  • If the query ends FAILED/CANCELLED, raises an exception -> then the Lambda Errors alarm will fire
  • If time runs out, calls StopQueryExecution and then raises a TimeoutError
  • Standardizes the partition expressions to date_format
# templates/handler.py

import boto3, os, datetime, json, time

athena = boto3.client("athena")

SQL = """
-- job: cf-parquet-daily
INSERT INTO cloudfront_parquet
SELECT
  date, time, location, bytes, request_ip, method, host, uri, status, referrer,
  user_agent, query_string, cookie, result_type, request_id, host_header,
  request_protocol, request_bytes, time_taken, xforwarded_for,
  ssl_protocol, ssl_cipher, response_result_type, http_version,
  fle_status, fle_encrypted_fields, c_port, time_to_first_byte,
  x_edge_detailed_result_type, sc_content_type, sc_content_len,
  sc_range_start, sc_range_end,
  CAST(EXTRACT(year FROM date) AS INT)  AS year,
  date_format(date, '%%m')              AS month,
  date_format(date, '%%d')              AS day,
  substr(time,1,2)                      AS hour
FROM cloudfront_logs
WHERE date = DATE '%(yesterday)s';
"""

def _now_iso():
    return datetime.datetime.utcnow().isoformat() + "Z"

def lambda_handler(event, context):
    # yesterday in UTC (explicit, instead of relying on the container's local TZ)
    y = (datetime.datetime.utcnow().date() - datetime.timedelta(days=1)).isoformat()
    query = SQL % {"yesterday": y}

    # soft limit inside Lambda timeout (Lambda timeout remains 300s)
    max_wait = int(os.environ.get("MAX_WAIT_SECONDS", "270"))
    poll_delay = int(os.environ.get("POLL_DELAY_SECONDS", "10"))

    # idempotent start token (prevents duplicate starts if retried);
    # Athena requires ClientRequestToken to be 32-128 characters long
    token = f"cf-parquet-daily-append-token-{y}"

    try:
        resp = athena.start_query_execution(
            QueryString=query,
            ClientRequestToken=token,
            QueryExecutionContext={"Database": os.environ["ATHENA_DB"]},
            WorkGroup=os.environ.get("ATHENA_WORKGROUP", "primary"),
            ResultConfiguration={"OutputLocation": os.environ["OUTPUT_S3"]},
        )
        qid = resp["QueryExecutionId"]
        t0 = time.time()

        while True:
            q = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]
            state = q["Status"]["State"]

            if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
                break

            if time.time() - t0 >= max_wait:
                # stop the query if we're out of time and mark the run as a failure
                try:
                    athena.stop_query_execution(QueryExecutionId=qid)
                except Exception:
                    pass
                raise TimeoutError(f"Athena query {qid} exceeded {max_wait}s; stopped")

            time.sleep(poll_delay)

        result = {
            "time": _now_iso(),
            "yesterday": y,
            "query_execution_id": qid,
            "state": state,
        }

        if state != "SUCCEEDED":
            # Bubble up the reason so CW alarm on Lambda Errors fires
            reason = q["Status"].get("StateChangeReason", "unknown")
            result["reason"] = reason
            print(json.dumps(result))
            raise RuntimeError(f"Athena {qid} finished {state}: {reason}")

        # Optional: include some execution stats for logs
        stats = q.get("Statistics", {})
        result["execution_stats"] = {
            "engine": q.get("EngineVersion", {}).get("SelectedEngineVersion"),
            "data_scanned_bytes": stats.get("DataScannedInBytes"),
            "execution_time_ms": stats.get("EngineExecutionTimeInMillis"),
        }

        print(json.dumps(result))
        return result

    except Exception as e:
        print(json.dumps({"time": _now_iso(), "error": str(e)}))
        raise
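
If you want a quick smoke test right after an apply, one option (a sketch, not part of the module) is the aws_lambda_invocation data source, which invokes the function and exposes its JSON result. It re-invokes on every plan/refresh, so treat it as one-off verification rather than something to keep:

# smoke_test.tf (hypothetical, for one-off verification)

data "aws_lambda_invocation" "smoke_test" {
  function_name = aws_lambda_function.cloudfront_daily_append.function_name
  input         = jsonencode({}) # the handler ignores its event payload
}

output "smoke_test_result" {
  value = data.aws_lambda_invocation.smoke_test.result
}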

EventBridge is used to run the job on a schedule:

# eventbridge.tf

resource "aws_cloudwatch_event_rule" "daily" {
  name                = "${local.name_prefix}-cf-parquet-append-daily"
  schedule_expression = var.schedule_cron
  tags                = local.common_tags
}

resource "aws_cloudwatch_event_target" "lambda_target" {
  rule      = aws_cloudwatch_event_rule.daily.name
  target_id = "lambda"
  arn       = aws_lambda_function.cloudfront_daily_append.arn

  retry_policy {
    maximum_retry_attempts       = 0 # disable to avoid double-runs
    maximum_event_age_in_seconds = 3600
  }
}

resource "aws_lambda_permission" "allow_eventbridge" {
  statement_id  = "AllowExecutionFromEventBridge"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.cloudfront_daily_append.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.daily.arn
}
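
Since retries are disabled, a failed event delivery is simply dropped. If you prefer to capture such events, EventBridge targets support a dead-letter queue; a minimal sketch with an SQS queue I named myself (the queue additionally needs a resource policy allowing events.amazonaws.com to send messages to it):

resource "aws_sqs_queue" "events_dlq" {
  name = "${local.name_prefix}-cf-parquet-events-dlq"
  tags = local.common_tags
}

# and inside aws_cloudwatch_event_target.lambda_target:
#
#   dead_letter_config {
#     arn = aws_sqs_queue.events_dlq.arn
#   }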

The permissions-related code:

# iam.tf

resource "aws_iam_role" "lambda_exec_role" {
  name = "${local.name_prefix}-cf-parquet-append-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [{
      Effect    = "Allow",
      Principal = { Service = "lambda.amazonaws.com" },
      Action    = "sts:AssumeRole"
    }]
  })
}

# Basic Lambda logging
resource "aws_iam_role_policy_attachment" "lambda_basic" {
  role       = aws_iam_role.lambda_exec_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

# Least-privilege inline policy for Athena + S3
resource "aws_iam_role_policy" "lambda_permissions" {
  name = "${local.name_prefix}-cf-parquet-append-policy"
  role = aws_iam_role.lambda_exec_role.id

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Sid    = "AthenaRun"
        Effect = "Allow",
        Action = [
          "athena:StartQueryExecution",
          "athena:GetQueryExecution",
          "athena:StopQueryExecution"
        ],
        Resource = "*"
      },
      {
        Sid    = "GlueReadCatalog"
        Effect = "Allow",
        Action = [
          "glue:GetDatabase",
          "glue:GetDatabases",
          "glue:GetTable",
          "glue:GetTables"
        ],
        Resource = "*"
      },
      {
        Sid    = "ReadRawLogs"
        Effect = "Allow",
        Action = ["s3:GetObject", "s3:ListBucket"],
        Resource = [
          "arn:aws:s3:::${var.raw_logs_bucket}",
          "arn:aws:s3:::${var.raw_logs_bucket}/*"
        ]
      },
      {
        Sid    = "WriteParquetAndResults"
        Effect = "Allow",
        Action = ["s3:PutObject", "s3:GetObject", "s3:ListBucket"],
        Resource = [
          "arn:aws:s3:::${var.parquet_bucket}",
          "arn:aws:s3:::${var.parquet_bucket}/*",
          "arn:aws:s3:::${var.athena_results_bucket}",
          "arn:aws:s3:::${var.athena_results_bucket}/*"
        ]
      },
    ]
  })
}
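
The AthenaRun statement above uses Resource = "*" for simplicity. If you want to tighten it, the Athena query actions can be scoped to a workgroup ARN; a sketch of a drop-in replacement for that statement inside aws_iam_role_policy.lambda_permissions (assuming the module runs in us-east-1, as in the example below):

      {
        Sid    = "AthenaRun"
        Effect = "Allow",
        Action = [
          "athena:StartQueryExecution",
          "athena:GetQueryExecution",
          "athena:StopQueryExecution"
        ],
        # workgroup ARN format: arn:aws:athena:region:account-id:workgroup/name
        Resource = "arn:aws:athena:us-east-1:${var.account_id}:workgroup/${var.athena_workgroup}"
      }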

It is also worth having an alarm on Lambda failures:

# alarms.tf

resource "aws_cloudwatch_metric_alarm" "lambda_errors" {
  alarm_name          = "${local.name_prefix}-cf-parquet-lambda-errors"
  alarm_description   = "Daily Athena append Lambda reported an error (incl. timeouts)."
  namespace           = "AWS/Lambda"
  metric_name         = "Errors"
  statistic           = "Sum"
  period              = 300
  evaluation_periods  = 1
  threshold           = 0
  comparison_operator = "GreaterThanThreshold"

  treat_missing_data        = "ignore" # don't flip state when no datapoints
  insufficient_data_actions = []       # no emails while waiting for first datapoint

  dimensions = {
    FunctionName = aws_lambda_function.cloudfront_daily_append.function_name
  }

  alarm_actions = [data.aws_sns_topic.alarm_topic.arn]
  ok_actions    = [data.aws_sns_topic.alarm_topic.arn]
}
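
The alarm above only catches runs that happened and failed. To also notice when the schedule silently does not fire at all, a hedged sketch of a second alarm (my own addition; it breaches when no invocation is recorded within a day):

resource "aws_cloudwatch_metric_alarm" "lambda_not_invoked" {
  alarm_name          = "${local.name_prefix}-cf-parquet-lambda-not-invoked"
  alarm_description   = "Daily Athena append Lambda was not invoked in the last 24h."
  namespace           = "AWS/Lambda"
  metric_name         = "Invocations"
  statistic           = "Sum"
  period              = 86400
  evaluation_periods  = 1
  threshold           = 1
  comparison_operator = "LessThanThreshold"
  treat_missing_data  = "breaching" # no datapoints means no runs

  dimensions = {
    FunctionName = aws_lambda_function.cloudfront_daily_append.function_name
  }

  alarm_actions = [data.aws_sns_topic.alarm_topic.arn]
}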

Finally, the variables:

# variables.tf

variable "raw_logs_bucket" {
  type        = string
  description = "S3 bucket with raw CloudFront logs (text)."
}

variable "parquet_bucket" {
  type        = string
  description = "S3 bucket for Parquet dataset."
}

variable "athena_results_bucket" {
  type        = string
  description = "S3 bucket for Athena query results."
}

variable "athena_db" {
  type    = string
  default = "cloudfront"
}

variable "athena_workgroup" {
  type    = string
  default = "primary"
}

# Schedule in cron format (UTC). Example: 01:30 UTC daily.
variable "schedule_cron" {
  type    = string
  default = "cron(30 1 * * ? *)"
}

variable "alarm_sns_topic_name" {
  type        = string
  description = "Name of the existing SNS topic used for alarm notifications."
}

variable "create_parquet_bucket" {
  type    = bool
  default = false
}
# variables-env.tf

variable "account_id" {
  type        = string
  description = "AWS Account ID"
}

variable "env" {
  type        = string
  description = "Environment name"
}

variable "project" {
  type        = string
  description = "Project name"
}

variable "region" {
  type        = string
  description = "AWS Region"
}

And here is an example of how the module can be used:

data "terraform_remote_state" "sns" {
  backend = "s3"

  config = {
    bucket = "terraform-state-xxx"
    key    = "dev-sns.tfstate"
    region = var.region
  }
}

provider "aws" {
  region = var.region
}

provider "aws" {
  alias  = "use1"
  region = "us-east-1"
}

module "cloudfront" {
  source    = "../../modules/cf-parquet"
  providers = { aws = aws.use1 }

  account_id = var.account_id
  env        = var.env
  project    = var.project
  region     = var.region

  # buckets
  raw_logs_bucket       = "dev-cloudfront-logs" # raw CF logs (text)
  parquet_bucket        = "dev-analytics-cloudfront" # parquet dataset (must be us-east-1)
  create_parquet_bucket = false                     # set true if you want module to create it

  # Athena
  athena_results_bucket = "athena-query-outcome-us-east-1" # must be us-east-1
  athena_db             = "cloudfront"
  athena_workgroup      = "primary"
  schedule_cron         = "cron(30 1 * * ? *)" # 01:30 UTC daily

  # SNS topic name (in us-east-1); remote state exposes a names map
  alarm_sns_topic_name = data.terraform_remote_state.sns.outputs.name["dev_n_virginia"]
}

Thank you for your attention. If you are interested in similar content, you can sign up for my newsletter.

Best regards.
