"""
security_logger.py
==================

Structured security event logger.

All attack attempts, flagged inputs, and guardrail violations are
written as JSON-Lines (one JSON object per line) to a rotating log file.
Logs are also emitted to the Python logging framework so they appear in
stdout / application log aggregators.

Log schema per event:

    {
        "timestamp": "<ISO-8601>",
        "event_type": "request_blocked|request_flagged|request_safe|output_blocked",
        "risk_score": 0.91,
        "risk_level": "critical",
        "attack_type": "prompt_injection",
        "attack_category": "system_override",
        "flags": [...],
        "prompt_hash": "<sha256[:16]>",   # never log raw PII
        "sanitized_preview": "first 120 chars of sanitized prompt",
    }
"""
| from __future__ import annotations | |
| import hashlib | |
| import json | |
| import logging | |
| import os | |
| import time | |
| from datetime import datetime, timezone | |
| from logging.handlers import RotatingFileHandler | |
| from typing import TYPE_CHECKING, Optional | |
| if TYPE_CHECKING: | |
| from ai_firewall.guardrails import FirewallDecision | |
| from ai_firewall.output_guardrail import GuardrailResult | |
_pylogger = logging.getLogger("ai_firewall.security_logger")


class SecurityLogger:
    """
    Writes structured JSON-Lines security events to a rotating log file
    and forwards a summary to the Python logging system.

    Parameters
    ----------
    log_dir : str
        Directory where `ai_firewall_security.jsonl` will be written.
    max_bytes : int
        Max log-file size before rotation (default 10 MB).
    backup_count : int
        Number of rotated backup files to keep (default 5).
    """

    def __init__(
        self,
        log_dir: str = ".",
        max_bytes: int = 10 * 1024 * 1024,
        backup_count: int = 5,
    ) -> None:
        os.makedirs(log_dir, exist_ok=True)
        log_path = os.path.join(log_dir, "ai_firewall_security.jsonl")
        handler = RotatingFileHandler(
            log_path, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8"
        )
        handler.setFormatter(logging.Formatter("%(message)s"))  # raw JSON lines
        self._file_logger = logging.getLogger("ai_firewall.events")
        self._file_logger.setLevel(logging.DEBUG)
        # Avoid duplicate handlers if logger already set up
        # (logging.getLogger returns a process-wide singleton per name).
        if not self._file_logger.handlers:
            self._file_logger.addHandler(handler)
        self._file_logger.propagate = False  # don't double-log to root
        _pylogger.info("Security event log → %s", log_path)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    # BUGFIX: these two helpers were defined without `self` and without
    # @staticmethod, so every call site (self._hash_prompt(...),
    # self._now()) raised TypeError at runtime. Declaring them static
    # matches how they are invoked and uses no instance state.
    @staticmethod
    def _hash_prompt(prompt: str) -> str:
        """Return the first 16 hex chars of SHA-256(prompt) — logs never contain raw PII."""
        return hashlib.sha256(prompt.encode()).hexdigest()[:16]

    @staticmethod
    def _now() -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def _write(self, event: dict) -> None:
        """Serialize *event* as a single JSON line and emit it to the file logger."""
        self._file_logger.info(json.dumps(event, ensure_ascii=False))

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def log_request(
        self,
        prompt: str,
        sanitized: str,
        decision: "FirewallDecision",
    ) -> None:
        """Log the input-check decision.

        Parameters
        ----------
        prompt : str
            The raw user prompt (hashed before logging, never stored verbatim).
        sanitized : str
            Sanitized version of the prompt; only the first 120 chars are logged.
        decision : FirewallDecision
            Decision object carrying the risk report for this request.
        """
        rr = decision.risk_report
        status = rr.status.value
        event_type = (
            "request_blocked" if status == "blocked"
            else "request_flagged" if status == "flagged"
            else "request_safe"
        )
        event = {
            "timestamp": self._now(),
            "event_type": event_type,
            "risk_score": rr.risk_score,
            "risk_level": rr.risk_level.value,
            "attack_type": rr.attack_type,
            "attack_category": rr.attack_category,
            "flags": rr.flags,
            "prompt_hash": self._hash_prompt(prompt),
            "sanitized_preview": sanitized[:120],
            "injection_score": rr.injection_score,
            "adversarial_score": rr.adversarial_score,
            "latency_ms": rr.latency_ms,
        }
        self._write(event)
        # Surface blocked/flagged requests in the application log as well.
        if status in ("blocked", "flagged"):
            _pylogger.warning("[%s] %s | score=%.3f", event_type.upper(), rr.attack_type or "unknown", rr.risk_score)

    def log_response(
        self,
        output: str,
        safe_output: str,
        guardrail_result: "GuardrailResult",
    ) -> None:
        """Log the output guardrail decision.

        Parameters
        ----------
        output : str
            The raw model output (hashed before logging, never stored verbatim).
        safe_output : str
            The redacted/approved output (currently not logged; kept for API symmetry).
        guardrail_result : GuardrailResult
            Result object carrying safety verdict, flags, score, and latency.
        """
        event_type = "output_safe" if guardrail_result.is_safe else "output_blocked"
        event = {
            "timestamp": self._now(),
            "event_type": event_type,
            "risk_score": guardrail_result.risk_score,
            "flags": guardrail_result.flags,
            "output_hash": self._hash_prompt(output),
            "redacted": not guardrail_result.is_safe,
            "latency_ms": guardrail_result.latency_ms,
        }
        self._write(event)
        if not guardrail_result.is_safe:
            _pylogger.warning("[OUTPUT_BLOCKED] flags=%s score=%.3f", guardrail_result.flags, guardrail_result.risk_score)

    def log_raw_event(self, event_type: str, data: dict) -> None:
        """Log an arbitrary structured event.

        A timestamp and *event_type* are prepended; *data* keys are merged in
        (and may override neither, since the dict literal lists them first —
        note: a "timestamp" or "event_type" key in *data* WILL override).
        """
        event = {"timestamp": self._now(), "event_type": event_type, **data}
        self._write(event)