| | """ |
| | Lambda handler to export DynamoDB usage log table to CSV and upload to S3. |
| | |
| | All inputs are read from environment variables (no argparse). |
| | Intended to run as an AWS Lambda function; can also be invoked locally |
| | by setting env vars and calling lambda_handler({}, None). |
| | |
| | Environment variables (same semantics as load_dynamo_logs.py CLI): |
| | DYNAMODB_TABLE_NAME - DynamoDB table name (default: redaction_usage) |
| | AWS_REGION - AWS region (optional; if unset, uses AWS_DEFAULT_REGION, |
| | then region from Lambda context ARN, then eu-west-2) |
| | OUTPUT_FOLDER - Local output directory, e.g. /tmp (optional) |
| | OUTPUT_FILENAME - Local output file name (default: dynamodb_logs_export.csv) |
| | OUTPUT - Full local output path (overrides folder + filename if set). |
| | In Lambda only /tmp is writable; relative paths are auto-resolved to /tmp. |
| | FROM_DATE - Only include entries on/after this date YYYY-MM-DD (optional) |
| | TO_DATE - Only include entries on/before this date YYYY-MM-DD (optional) |
| | DATE_ATTRIBUTE - Attribute name for date filtering (default: timestamp) |
| | S3_OUTPUT_BUCKET - S3 bucket for the output CSV (required for upload) |
| | S3_OUTPUT_KEY - S3 object key/path for the output CSV (required for upload) |
| | """ |
| |
|
| | import csv |
| | import datetime |
| | import os |
| | from decimal import Decimal |
| | from io import StringIO |
| |
|
| | import boto3 |
| |
|
| |
|
| | def _get_region_from_context(context): |
| | """Extract region from Lambda context invoked_function_arn (arn:aws:lambda:REGION:ACCOUNT:function:NAME).""" |
| | if context is None: |
| | return None |
| | arn = getattr(context, "invoked_function_arn", None) |
| | if not arn or not isinstance(arn, str): |
| | return None |
| | parts = arn.split(":") |
| | if len(parts) >= 4: |
| | return parts[3] |
| | return None |
| |
|
| |
|
def get_config_from_env(context=None):
    """Read all settings from environment variables (same inputs as load_dynamo_logs.py).

    When running in Lambda, *context* can be passed to derive the region from
    the function ARN if no region env var is set.

    Returns:
        dict with keys: table_name, region, local_output_path, from_date,
        to_date, date_attribute, s3_output_bucket, s3_output_key.
        from_date/to_date are datetime.date; missing bounds default to the
        last 365 days.

    Raises:
        ValueError: if FROM_DATE or TO_DATE is not in YYYY-MM-DD format.
    """
    today = datetime.datetime.now().date()
    one_year_ago = today - datetime.timedelta(days=365)

    table_name = os.environ.get("DYNAMODB_TABLE_NAME") or os.environ.get(
        "USAGE_LOG_DYNAMODB_TABLE_NAME", "redaction_usage"
    )
    region = (
        os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION") or ""
    ).strip()
    output = os.environ.get("OUTPUT")
    output_folder = os.environ.get("OUTPUT_FOLDER", "output/")
    output_filename = os.environ.get("OUTPUT_FILENAME", "dynamodb_logs_export.csv")
    from_date_str = os.environ.get("FROM_DATE")
    to_date_str = os.environ.get("TO_DATE")
    date_attribute = os.environ.get("DATE_ATTRIBUTE", "timestamp")
    s3_output_bucket = os.environ.get("S3_OUTPUT_BUCKET")
    s3_output_key = os.environ.get("S3_OUTPUT_KEY")

    # OUTPUT (full path) overrides OUTPUT_FOLDER + OUTPUT_FILENAME.
    if output:
        local_output_path = output
    else:
        folder = output_folder.rstrip("/").rstrip("\\")
        local_output_path = os.path.join(folder, output_filename)

    # In Lambda only /tmp is writable; redirect any other path there,
    # keeping just the file name.
    if os.environ.get("AWS_LAMBDA_FUNCTION_NAME"):
        resolved = os.path.abspath(local_output_path)
        if not resolved.startswith("/tmp"):
            local_output_path = os.path.join(
                "/tmp", os.path.basename(local_output_path)
            )

    # Region fallback chain: env vars -> Lambda context ARN -> eu-west-2.
    if not region and context is not None:
        region = _get_region_from_context(context) or ""
    if not region:
        # BUG FIX: was the unfinished placeholder "FILL IN DEFAULT REGION HERE";
        # the module docstring documents eu-west-2 as the final default.
        region = "eu-west-2"

    from_date = None
    to_date = None
    if from_date_str:
        from_date = datetime.datetime.strptime(from_date_str, "%Y-%m-%d").date()
    if to_date_str:
        to_date = datetime.datetime.strptime(to_date_str, "%Y-%m-%d").date()
    # Fill whichever bounds are missing so the export always has a window.
    if from_date is None:
        from_date = one_year_ago
    if to_date is None:
        to_date = today

    return {
        "table_name": table_name,
        "region": region,
        "local_output_path": local_output_path,
        "from_date": from_date,
        "to_date": to_date,
        "date_attribute": date_attribute,
        "s3_output_bucket": s3_output_bucket,
        "s3_output_key": s3_output_key,
    }
| |
|
| |
|
| | |
def convert_types(item):
    """Return a copy of *item* with values normalized for CSV output.

    Decimals become int (when whole) or float; strings that parse as
    ISO-8601 datetimes ("Z" treated as UTC) are reformatted as
    "YYYY-MM-DD HH:MM:SS.mmm"; everything else passes through unchanged.
    """
    converted = {}
    for key, raw in item.items():
        if isinstance(raw, Decimal):
            converted[key] = int(raw) if raw % 1 == 0 else float(raw)
            continue
        if not isinstance(raw, str):
            converted[key] = raw
            continue
        try:
            parsed = datetime.datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except (ValueError, TypeError):
            converted[key] = raw
        else:
            # Millisecond precision: trim the last three microsecond digits.
            converted[key] = parsed.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    return converted
| |
|
| |
|
| | def _parse_item_date(value): |
| | """Parse a DynamoDB attribute value to datetime for comparison. Returns None if unparseable.""" |
| | if value is None: |
| | return None |
| | if isinstance(value, Decimal): |
| | try: |
| | return datetime.datetime.utcfromtimestamp(float(value)) |
| | except (ValueError, OSError): |
| | return None |
| | if isinstance(value, (int, float)): |
| | try: |
| | return datetime.datetime.utcfromtimestamp(float(value)) |
| | except (ValueError, OSError): |
| | return None |
| | if isinstance(value, str): |
| | for fmt in ( |
| | "%Y-%m-%d %H:%M:%S.%f", |
| | "%Y-%m-%d %H:%M:%S", |
| | "%Y-%m-%d", |
| | "%Y-%m-%dT%H:%M:%S", |
| | ): |
| | try: |
| | return datetime.datetime.strptime(value, fmt) |
| | except (ValueError, TypeError): |
| | continue |
| | try: |
| | return datetime.datetime.fromisoformat(value.replace("Z", "+00:00")) |
| | except (ValueError, TypeError): |
| | pass |
| | return None |
| |
|
| |
|
def filter_items_by_date(items, from_date, to_date, date_attribute: str):
    """Return items whose date attribute falls within [from_date, to_date] (inclusive).

    Either bound may be None, meaning "unbounded" on that side. Items whose
    attribute is missing or unparseable are dropped; tz-aware parsed values
    have their tzinfo stripped (not converted) before comparison.
    """
    if from_date is None and to_date is None:
        return items
    # BUG FIX: the original called datetime.combine(None, ...) and raised
    # TypeError when exactly one bound was None; open-ended bounds now fall
    # back to the extremes of representable datetimes.
    start = (
        datetime.datetime.combine(from_date, datetime.time.min)
        if from_date is not None
        else datetime.datetime.min
    )
    end = (
        datetime.datetime.combine(to_date, datetime.time.max)
        if to_date is not None
        else datetime.datetime.max
    )
    filtered = []
    for item in items:
        dt = _parse_item_date(item.get(date_attribute))
        if dt is None:
            continue
        if dt.tzinfo:
            dt = dt.replace(tzinfo=None)
        if start <= dt <= end:
            filtered.append(item)
    return filtered
| |
|
| |
|
def scan_table(table):
    """Scan *table* to exhaustion, following pagination via LastEvaluatedKey."""
    page = table.scan()
    all_items = list(page["Items"])
    while "LastEvaluatedKey" in page:
        page = table.scan(ExclusiveStartKey=page["LastEvaluatedKey"])
        all_items.extend(page["Items"])
    return all_items
| |
|
| |
|
def export_to_csv_buffer(items, fields_to_drop=None):
    """
    Render *items* as a CSV string in memory; return (csv_string, fieldnames).

    Columns are the sorted union of all item keys minus *fields_to_drop*;
    missing values render as "". Returns ("", []) when there are no items.
    Use for uploading to S3 without writing to disk.
    """
    if not items:
        return "", []

    dropped = set(fields_to_drop or [])
    columns = sorted({key for item in items for key in item} - dropped)

    out = StringIO()
    writer = csv.DictWriter(
        out, fieldnames=columns, extrasaction="ignore", restval=""
    )
    writer.writeheader()
    writer.writerows(convert_types(item) for item in items)
    return out.getvalue(), columns
| |
|
| |
|
def export_to_csv_file(items, output_path, fields_to_drop=None):
    """Write items as CSV to *output_path*; no-op when there is nothing to write."""
    content, _ = export_to_csv_buffer(items, fields_to_drop)
    if not content:
        return
    parent = os.path.dirname(os.path.abspath(output_path)) or "."
    os.makedirs(parent, exist_ok=True)
    # utf-8-sig writes a BOM so Excel detects the encoding correctly.
    with open(output_path, "w", newline="", encoding="utf-8-sig") as handle:
        handle.write(content)
| |
|
| |
|
def run_export(config):
    """
    Run the full export: scan DynamoDB, filter by date, write the CSV locally,
    and upload to S3 when both bucket and key are configured.

    Returns a summary dict: item_count, from_date, to_date, columns, plus
    local_path / local_write_error and s3_uri / s3_skip_reason as applicable.

    Raises:
        ValueError: if the configured from_date is after to_date.
    """
    from_date = config["from_date"]
    to_date = config["to_date"]
    if from_date > to_date:
        raise ValueError("FROM_DATE must be on or before TO_DATE")

    region = config["region"]
    dynamodb = boto3.resource("dynamodb", region_name=region or None)
    table = dynamodb.Table(config["table_name"])

    items = filter_items_by_date(
        scan_table(table), from_date, to_date, config["date_attribute"]
    )

    csv_string, fieldnames = export_to_csv_buffer(items, fields_to_drop=[])
    summary = {
        "item_count": len(items),
        "from_date": str(from_date),
        "to_date": str(to_date),
        "columns": fieldnames,
    }

    if csv_string:
        # Best-effort local copy: a failed local write is recorded but must
        # not abort the S3 upload.
        local_path = config["local_output_path"]
        try:
            export_to_csv_file(items, local_path, fields_to_drop=[])
            summary["local_path"] = local_path
        except Exception as exc:
            summary["local_write_error"] = str(exc)

        bucket = config["s3_output_bucket"]
        key = config["s3_output_key"]
        if bucket and key:
            s3_client = boto3.client("s3", region_name=region or None)
            s3_client.put_object(
                Bucket=bucket,
                Key=key,
                Body=csv_string.encode("utf-8-sig"),
                ContentType="text/csv; charset=utf-8",
            )
            summary["s3_uri"] = f"s3://{bucket}/{key}"
        elif bucket or key:
            summary["s3_skip_reason"] = (
                "Both S3_OUTPUT_BUCKET and S3_OUTPUT_KEY must be set"
            )

    return summary
| |
|
| |
|
def lambda_handler(event, context):
    """
    AWS Lambda entrypoint. Config is read from environment variables.

    A dict *event* is not required, but may override individual settings:
    table_name, region, date_attribute, s3_output_bucket, s3_output_key
    (strings passed through as-is), and from_date / to_date (YYYY-MM-DD
    strings, parsed to dates). Falsy event values leave the env config.
    """
    config = get_config_from_env(context=context)

    if isinstance(event, dict):
        # Plain string overrides copied straight into the config.
        for key in (
            "table_name",
            "region",
            "date_attribute",
            "s3_output_bucket",
            "s3_output_key",
        ):
            if event.get(key):
                config[key] = event[key]
        # Date overrides are parsed from YYYY-MM-DD strings.
        for key in ("from_date", "to_date"):
            if event.get(key):
                config[key] = datetime.datetime.strptime(
                    event[key], "%Y-%m-%d"
                ).date()

    return {"statusCode": 200, "body": run_export(config)}
| |
|
| |
|
if __name__ == "__main__":
    # Local smoke run: configure via env vars, then invoke the handler
    # exactly as Lambda would (empty event, no context).
    import json

    print(json.dumps(lambda_handler({}, None), indent=2))
| |
|