# SheildSense_API_SDK / ai_firewall / security_logger.py
"""
security_logger.py
==================
Structured security event logger.
All attack attempts, flagged inputs, and guardrail violations are
written as JSON-Lines (one JSON object per line) to a rotating log file.
Logs are also emitted to the Python logging framework so they appear in
stdout / application log aggregators.
Log schema per event:
{
"timestamp": "<ISO-8601>",
"event_type": "request_blocked|request_flagged|request_safe|output_blocked|output_safe",
"risk_score": 0.91,
"risk_level": "critical",
"attack_type": "prompt_injection",
"attack_category": "system_override",
"flags": [...],
"prompt_hash": "<sha256[:16]>", # never log raw PII
"sanitized_preview": "first 120 chars of sanitized prompt",
}
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import time
from datetime import datetime, timezone
from logging.handlers import RotatingFileHandler
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from ai_firewall.guardrails import FirewallDecision
from ai_firewall.output_guardrail import GuardrailResult
_pylogger = logging.getLogger("ai_firewall.security_logger")


class SecurityLogger:
    """
    Writes structured JSON-Lines security events to a rotating log file
    and forwards a summary to the Python logging system.

    The underlying ``"ai_firewall.events"`` logger is process-wide, so all
    ``SecurityLogger`` instances share one file handler.

    Parameters
    ----------
    log_dir : str
        Directory where ``ai_firewall_security.jsonl`` will be written.
    max_bytes : int
        Max log-file size before rotation (default 10 MB).
    backup_count : int
        Number of rotated backup files to keep (default 5).
    """

    def __init__(
        self,
        log_dir: str = ".",
        max_bytes: int = 10 * 1024 * 1024,
        backup_count: int = 5,
    ) -> None:
        os.makedirs(log_dir, exist_ok=True)
        log_path = os.path.join(log_dir, "ai_firewall_security.jsonl")
        self._file_logger = logging.getLogger("ai_firewall.events")
        self._file_logger.setLevel(logging.DEBUG)
        self._file_logger.propagate = False  # don't double-log to root
        # Create the handler only when the shared logger has none yet.
        # (Building the RotatingFileHandler unconditionally — as before —
        # opened the log file on every instantiation and leaked the file
        # descriptor whenever the handler was then not attached.)
        if not self._file_logger.handlers:
            handler = RotatingFileHandler(
                log_path, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8"
            )
            handler.setFormatter(logging.Formatter("%(message)s"))  # raw JSON lines
            self._file_logger.addHandler(handler)
        _pylogger.info("Security event log → %s", log_path)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _hash_prompt(prompt: str) -> str:
        """Return the first 16 hex chars of the SHA-256 of *prompt*.

        Used so raw prompts/outputs (potential PII) are never logged.
        """
        return hashlib.sha256(prompt.encode()).hexdigest()[:16]

    @staticmethod
    def _now() -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def _write(self, event: dict) -> None:
        """Serialize *event* as one JSON line and emit it to the file logger."""
        self._file_logger.info(json.dumps(event, ensure_ascii=False))

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def log_request(
        self,
        prompt: str,
        sanitized: str,
        decision: "FirewallDecision",
    ) -> None:
        """Log the input-check decision.

        Parameters
        ----------
        prompt : str
            Raw user prompt (only its hash is stored — never raw PII).
        sanitized : str
            Sanitized prompt; only the first 120 chars are previewed.
        decision : FirewallDecision
            Decision object whose ``risk_report`` supplies scores and flags.
        """
        rr = decision.risk_report
        status = rr.status.value
        event_type = (
            "request_blocked" if status == "blocked"
            else "request_flagged" if status == "flagged"
            else "request_safe"
        )
        event = {
            "timestamp": self._now(),
            "event_type": event_type,
            "risk_score": rr.risk_score,
            "risk_level": rr.risk_level.value,
            "attack_type": rr.attack_type,
            "attack_category": rr.attack_category,
            "flags": rr.flags,
            "prompt_hash": self._hash_prompt(prompt),
            "sanitized_preview": sanitized[:120],
            "injection_score": rr.injection_score,
            "adversarial_score": rr.adversarial_score,
            "latency_ms": rr.latency_ms,
        }
        self._write(event)
        # Surface non-safe requests in the application log as well.
        if status in ("blocked", "flagged"):
            _pylogger.warning("[%s] %s | score=%.3f", event_type.upper(), rr.attack_type or "unknown", rr.risk_score)

    def log_response(
        self,
        output: str,
        safe_output: str,
        guardrail_result: "GuardrailResult",
    ) -> None:
        """Log the output guardrail decision.

        Parameters
        ----------
        output : str
            Raw model output (only its hash is stored).
        safe_output : str
            Redacted/sanitized output (currently not logged).
        guardrail_result : GuardrailResult
            Result object with ``is_safe``, ``risk_score``, ``flags``,
            and ``latency_ms``.
        """
        event_type = "output_safe" if guardrail_result.is_safe else "output_blocked"
        event = {
            "timestamp": self._now(),
            "event_type": event_type,
            "risk_score": guardrail_result.risk_score,
            "flags": guardrail_result.flags,
            "output_hash": self._hash_prompt(output),
            "redacted": not guardrail_result.is_safe,
            "latency_ms": guardrail_result.latency_ms,
        }
        self._write(event)
        if not guardrail_result.is_safe:
            _pylogger.warning("[OUTPUT_BLOCKED] flags=%s score=%.3f", guardrail_result.flags, guardrail_result.risk_score)

    def log_raw_event(self, event_type: str, data: dict) -> None:
        """Log an arbitrary structured event.

        ``data`` keys are merged into the event dict; a ``timestamp`` and
        the given ``event_type`` are always included (and may be overridden
        by identically-named keys in ``data``).
        """
        event = {"timestamp": self._now(), "event_type": event_type, **data}
        self._write(event)