"""Append-only structured audit logging. Writes one JSON record per line to a JSONL file. Supports log rotation and HMAC-SHA256 IP hashing for GDPR compliance. """ from __future__ import annotations import hashlib import hmac import json import os import shutil import threading import uuid from datetime import datetime, timezone from pathlib import Path import structlog logger = structlog.get_logger() class AuditLogger: """Append-only JSONL audit logger with optional rotation.""" def __init__( self, path: str = "logs/audit.jsonl", max_size_bytes: int = 100 * 1024 * 1024, # 100 MB rotate: bool = True, hmac_key: str = "", ) -> None: self.path = Path(path) self.max_size_bytes = max_size_bytes self.rotate = rotate self._lock = threading.Lock() # HMAC key: explicit arg > env var > random per-process key key_str = hmac_key or os.environ.get("AUDIT_HMAC_KEY", "") if key_str: self._hmac_key = key_str.encode() else: self._hmac_key = os.urandom(32) logger.warning( "audit_hmac_key_missing", msg="No HMAC key provided; using random per-process key. " "IP hashes will not be stable across restarts or instances. " "Set AUDIT_HMAC_KEY env var or pass hmac_key for stable audit correlation.", ) def log(self, record: dict) -> None: """Append a record to the audit log. Adds a timestamp if not present. Thread-safe. Best-effort: any exception during the write is caught and logged via structlog, never raised. Audit writes must not be able to crash the request path — a misconfigured filesystem, a rotation bug, or a serialization issue should degrade gracefully to "audit missed, request served," not "audit failed, 500 to user." """ if "timestamp" not in record: record["timestamp"] = datetime.now(timezone.utc).isoformat() try: with self._lock: self.path.parent.mkdir(parents=True, exist_ok=True) if self.rotate and self.path.exists(): if self.path.stat().st_size >= self.max_size_bytes: self._do_rotate() with open(self.path, "a") as f: f.write(json.dumps(record, default=str) + "\n") except Exception as exc: logger.error( "audit_write_failed", error=str(exc), error_type=type(exc).__name__, path=str(self.path), ) def hash_ip(self, ip: str) -> str: """HMAC-SHA256 hash an IP address. Keyed and irreversible.""" return hmac.new(self._hmac_key, ip.encode(), hashlib.sha256).hexdigest() def _do_rotate(self) -> None: """Rotate the current log file with a globally unique suffix.""" ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S") uid = uuid.uuid4().hex[:8] rotated = self.path.with_name(f"{self.path.name}.{ts}.{uid}") shutil.move(str(self.path), str(rotated))