"""
security_logger.py
==================
Structured security event logger.

All attack attempts, flagged inputs, and guardrail violations are written
as JSON-Lines (one JSON object per line) to a rotating log file. Summaries
of blocked/flagged events are also emitted to the Python logging framework
so they appear in stdout / application log aggregators.

Log schema per request event:
    {
        "timestamp": "ISO-8601 timestamp in UTC",
        "event_type": "request_blocked|request_flagged|request_safe",
        "risk_score": 0.91,
        "risk_level": "critical",
        "attack_type": "prompt_injection",
        "attack_category": "system_override",
        "flags": [...],
        "prompt_hash": "first 16 hex chars of SHA-256",  # never log raw PII
        "sanitized_preview": "first 120 chars of sanitized prompt",
        "injection_score": ...,
        "adversarial_score": ...,
        "latency_ms": ...,
    }

Output-guardrail events use ``event_type`` "output_safe|output_blocked" and
carry "risk_score", "flags", "output_hash", "redacted" and "latency_ms".
"""

from __future__ import annotations

import hashlib
import json
import logging
import os
import time
from datetime import datetime, timezone
from logging.handlers import RotatingFileHandler
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from ai_firewall.guardrails import FirewallDecision
    from ai_firewall.output_guardrail import GuardrailResult

_pylogger = logging.getLogger("ai_firewall.security_logger")


class SecurityLogger:
    """
    Writes structured JSON-Lines security events to a rotating log file
    and forwards a summary to the Python logging system.

    All instances share the process-wide ``ai_firewall.events`` logger. The
    first instance attaches the file handler; later instances reuse it, so
    their ``log_dir`` / rotation settings do not change where events go.

    Parameters
    ----------
    log_dir : str
        Directory where `ai_firewall_security.jsonl` will be written.
    max_bytes : int
        Max log-file size before rotation (default 10 MB).
    backup_count : int
        Number of rotated backup files to keep (default 5).
    """

    def __init__(
        self,
        log_dir: str = ".",
        max_bytes: int = 10 * 1024 * 1024,
        backup_count: int = 5,
    ) -> None:
        """Create the log directory and attach the rotating file handler."""
        os.makedirs(log_dir, exist_ok=True)
        log_path = os.path.join(log_dir, "ai_firewall_security.jsonl")

        self._file_logger = logging.getLogger("ai_firewall.events")
        self._file_logger.setLevel(logging.DEBUG)
        self._file_logger.propagate = False  # don't double-log to root

        if self._file_logger.handlers:
            # The shared logger is already wired up. Constructing another
            # RotatingFileHandler here would open a file that is never
            # attached (and never closed), so reuse the existing handler
            # and report where events are really going.
            active = self._file_logger.handlers[0]
            _pylogger.info(
                "Security event log already configured; reusing %s",
                getattr(active, "baseFilename", active),
            )
            return

        handler = RotatingFileHandler(
            log_path,
            maxBytes=max_bytes,
            backupCount=backup_count,
            encoding="utf-8",
        )
        handler.setFormatter(logging.Formatter("%(message)s"))  # raw JSON lines
        self._file_logger.addHandler(handler)
        _pylogger.info("Security event log → %s", log_path)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _hash_prompt(prompt: str) -> str:
        """Return the first 16 hex characters of the SHA-256 of *prompt*.

        ``surrogatepass`` keeps hashing from raising on lone surrogates,
        which untrusted text can contain; the digest of any string that is
        valid UTF-8 is the same as with a plain ``encode()``.
        """
        raw = prompt.encode("utf-8", errors="surrogatepass")
        return hashlib.sha256(raw).hexdigest()[:16]

    @staticmethod
    def _now() -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def _write(self, event: dict) -> None:
        """Serialize *event* as one JSON line and hand it to the file logger."""
        line = json.dumps(event, ensure_ascii=False)
        # With ensure_ascii=False a lone surrogate is emitted verbatim, and
        # the UTF-8 file handler would then fail to encode the record and
        # drop it. backslashreplace rewrites only unencodable code points,
        # producing ``\udXXX`` which is a valid JSON escape for the same
        # character; everything else passes through untouched.
        line = line.encode("utf-8", errors="backslashreplace").decode("utf-8")
        self._file_logger.info(line)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def log_request(
        self,
        prompt: str,
        sanitized: str,
        decision: "FirewallDecision",
    ) -> None:
        """Log the input-check decision.

        Only a truncated hash of the raw *prompt* is recorded; the preview
        is taken from the first 120 characters of *sanitized*. Blocked and
        flagged requests additionally produce a warning on the module
        logger.
        """
        rr = decision.risk_report
        status = rr.status.value
        # Any status other than blocked/flagged is recorded as safe.
        event_type = {
            "blocked": "request_blocked",
            "flagged": "request_flagged",
        }.get(status, "request_safe")

        event = {
            "timestamp": self._now(),
            "event_type": event_type,
            "risk_score": rr.risk_score,
            "risk_level": rr.risk_level.value,
            "attack_type": rr.attack_type,
            "attack_category": rr.attack_category,
            "flags": rr.flags,
            "prompt_hash": self._hash_prompt(prompt),
            "sanitized_preview": sanitized[:120],
            "injection_score": rr.injection_score,
            "adversarial_score": rr.adversarial_score,
            "latency_ms": rr.latency_ms,
        }
        self._write(event)

        if status in ("blocked", "flagged"):
            _pylogger.warning(
                "[%s] %s | score=%.3f",
                event_type.upper(),
                rr.attack_type or "unknown",
                rr.risk_score,
            )

    def log_response(
        self,
        output: str,
        safe_output: str,
        guardrail_result: "GuardrailResult",
    ) -> None:
        """Log the output guardrail decision.

        Only a truncated hash of *output* is recorded. *safe_output* is
        accepted for interface compatibility and is not written to the log.
        """
        is_safe = guardrail_result.is_safe
        event_type = "output_safe" if is_safe else "output_blocked"
        event = {
            "timestamp": self._now(),
            "event_type": event_type,
            "risk_score": guardrail_result.risk_score,
            "flags": guardrail_result.flags,
            "output_hash": self._hash_prompt(output),
            "redacted": not is_safe,
            "latency_ms": guardrail_result.latency_ms,
        }
        self._write(event)

        if not is_safe:
            _pylogger.warning(
                "[OUTPUT_BLOCKED] flags=%s score=%.3f",
                guardrail_result.flags,
                guardrail_result.risk_score,
            )

    def log_raw_event(self, event_type: str, data: dict) -> None:
        """Log an arbitrary structured event.

        Keys in *data* are merged last, so a "timestamp" or "event_type"
        key supplied there overrides the generated value.
        """
        event = {"timestamp": self._now(), "event_type": event_type, **data}
        self._write(event)