"""
security_logger.py
==================
Structured security event logger.
All attack attempts, flagged inputs, and guardrail violations are
written as JSON-Lines (one JSON object per line) to a rotating log file.
Logs are also emitted to the Python logging framework so they appear in
stdout / application log aggregators.
Log schema per event:
{
"timestamp": "<ISO-8601>",
"event_type": "request_blocked|request_flagged|request_safe|output_blocked",
"risk_score": 0.91,
"risk_level": "critical",
"attack_type": "prompt_injection",
"attack_category": "system_override",
"flags": [...],
"prompt_hash": "<sha256[:16]>", # never log raw PII
"sanitized_preview": "first 120 chars of sanitized prompt",
}
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import time
from datetime import datetime, timezone
from logging.handlers import RotatingFileHandler
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from ai_firewall.guardrails import FirewallDecision
from ai_firewall.output_guardrail import GuardrailResult
_pylogger = logging.getLogger("ai_firewall.security_logger")


class SecurityLogger:
    """
    Writes structured JSON-Lines security events to a rotating log file
    and forwards a summary to the Python logging system.

    Parameters
    ----------
    log_dir : str
        Directory where ``ai_firewall_security.jsonl`` will be written.
    max_bytes : int
        Max log-file size before rotation (default 10 MB).
    backup_count : int
        Number of rotated backup files to keep (default 5).

    Notes
    -----
    The underlying ``logging`` logger ("ai_firewall.events") is a
    process-wide singleton: the first instance to attach a handler wins,
    and later instances reuse it (including its log path).
    """

    def __init__(
        self,
        log_dir: str = ".",
        max_bytes: int = 10 * 1024 * 1024,
        backup_count: int = 5,
    ) -> None:
        os.makedirs(log_dir, exist_ok=True)
        log_path = os.path.join(log_dir, "ai_firewall_security.jsonl")
        self._file_logger = logging.getLogger("ai_firewall.events")
        self._file_logger.setLevel(logging.DEBUG)
        # Only construct the handler when one is actually needed: creating a
        # RotatingFileHandler opens the file, so building it unconditionally
        # and then discarding it (as soon as the logger is already configured)
        # leaks an open file handle per SecurityLogger instantiation.
        if not self._file_logger.handlers:
            handler = RotatingFileHandler(
                log_path, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8"
            )
            handler.setFormatter(logging.Formatter("%(message)s"))  # raw JSON lines
            self._file_logger.addHandler(handler)
        self._file_logger.propagate = False  # don't double-log to root
        _pylogger.info("Security event log → %s", log_path)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _hash_prompt(prompt: str) -> str:
        """Return the first 16 hex chars of SHA-256(prompt) — a stable,
        privacy-preserving identifier so raw text / PII never hits the log."""
        return hashlib.sha256(prompt.encode("utf-8")).hexdigest()[:16]

    @staticmethod
    def _now() -> str:
        """Current UTC time as an ISO-8601 string (timezone-aware)."""
        return datetime.now(timezone.utc).isoformat()

    def _write(self, event: dict) -> None:
        """Serialize *event* as a single JSON line to the rotating file.

        ``default=str`` is deliberate: event payloads may contain values
        that are not JSON-native (enum members, sets, datetimes), and a
        security logger must degrade to a string representation rather
        than raise ``TypeError`` inside the request path.
        """
        self._file_logger.info(json.dumps(event, ensure_ascii=False, default=str))

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def log_request(
        self,
        prompt: str,
        sanitized: str,
        decision: "FirewallDecision",
    ) -> None:
        """Log the input-check decision.

        Parameters
        ----------
        prompt : str
            The raw user prompt; only its hash is logged, never the text.
        sanitized : str
            Sanitized form of the prompt; first 120 chars are logged.
        decision : FirewallDecision
            Result of the input firewall; its ``risk_report`` supplies the
            scores, flags and attack classification recorded here.
        """
        rr = decision.risk_report
        status = rr.status.value
        event_type = (
            "request_blocked" if status == "blocked"
            else "request_flagged" if status == "flagged"
            else "request_safe"
        )
        event = {
            "timestamp": self._now(),
            "event_type": event_type,
            "risk_score": rr.risk_score,
            "risk_level": rr.risk_level.value,
            "attack_type": rr.attack_type,
            "attack_category": rr.attack_category,
            "flags": rr.flags,
            "prompt_hash": self._hash_prompt(prompt),
            "sanitized_preview": sanitized[:120],
            "injection_score": rr.injection_score,
            "adversarial_score": rr.adversarial_score,
            "latency_ms": rr.latency_ms,
        }
        self._write(event)
        # Surface anything non-safe in the application log as a warning.
        if status in ("blocked", "flagged"):
            _pylogger.warning("[%s] %s | score=%.3f", event_type.upper(), rr.attack_type or "unknown", rr.risk_score)

    def log_response(
        self,
        output: str,
        safe_output: str,
        guardrail_result: "GuardrailResult",
    ) -> None:
        """Log the output guardrail decision.

        Parameters
        ----------
        output : str
            Raw model output; only its hash is logged.
        safe_output : str
            Redacted/safe output (currently not logged; kept for API symmetry).
        guardrail_result : GuardrailResult
            Output-guardrail verdict providing score, flags and latency.
        """
        event_type = "output_safe" if guardrail_result.is_safe else "output_blocked"
        event = {
            "timestamp": self._now(),
            "event_type": event_type,
            "risk_score": guardrail_result.risk_score,
            "flags": guardrail_result.flags,
            "output_hash": self._hash_prompt(output),
            "redacted": not guardrail_result.is_safe,
            "latency_ms": guardrail_result.latency_ms,
        }
        self._write(event)
        if not guardrail_result.is_safe:
            _pylogger.warning("[OUTPUT_BLOCKED] flags=%s score=%.3f", guardrail_result.flags, guardrail_result.risk_score)

    def log_raw_event(self, event_type: str, data: dict) -> None:
        """Log an arbitrary structured event.

        ``data`` keys are merged into the event; a ``timestamp`` and the
        given ``event_type`` are always included (``data`` may override
        neither ordering nor the timestamp key without shadowing it).
        """
        event = {"timestamp": self._now(), "event_type": event_type, **data}
        self._write(event)