File size: 5,426 Bytes
7c918e8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
"""
security_logger.py
==================
Structured security event logger.

All attack attempts, flagged inputs, and guardrail violations are
written as JSON-Lines (one JSON object per line) to a rotating log file.
Logs are also emitted to the Python logging framework so they appear in
stdout / application log aggregators.

Log schema per event:
  {
    "timestamp": "<ISO-8601>",
    "event_type": "request_blocked|request_flagged|request_safe|output_blocked|output_safe",
    "risk_score": 0.91,
    "risk_level": "critical",
    "attack_type": "prompt_injection",
    "attack_category": "system_override",
    "flags": [...],
    "prompt_hash": "<sha256[:16]>",   # never log raw PII
    "sanitized_preview": "first 120 chars of sanitized prompt",
  }
"""

from __future__ import annotations

import hashlib
import json
import logging
import os
import time
from datetime import datetime, timezone
from logging.handlers import RotatingFileHandler
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from ai_firewall.guardrails import FirewallDecision
    from ai_firewall.output_guardrail import GuardrailResult

# Module logger for the human-readable summaries (log-path notice and
# blocked/flagged warnings). The JSON-Lines event stream itself goes through
# the separate "ai_firewall.events" logger configured in SecurityLogger.
_pylogger = logging.getLogger("ai_firewall.security_logger")


class SecurityLogger:
    """
    Writes structured JSON-Lines security events to a rotating log file
    and forwards a summary to the Python logging system.

    Every instance shares the process-wide ``ai_firewall.events`` logger.
    The rotating file handler is attached only when that logger has no
    handler yet; later instances reuse whatever is already attached, so
    their ``log_dir`` / ``max_bytes`` / ``backup_count`` do not open a
    second file.

    Parameters
    ----------
    log_dir : str
        Directory where `ai_firewall_security.jsonl` will be written.
        Created if it does not exist.
    max_bytes : int
        Max log-file size before rotation (default 10 MB).
    backup_count : int
        Number of rotated backup files to keep (default 5).
    """

    def __init__(
        self,
        log_dir: str = ".",
        max_bytes: int = 10 * 1024 * 1024,
        backup_count: int = 5,
    ) -> None:
        os.makedirs(log_dir, exist_ok=True)
        log_path = os.path.join(log_dir, "ai_firewall_security.jsonl")

        self._file_logger = logging.getLogger("ai_firewall.events")
        self._file_logger.setLevel(logging.DEBUG)

        # Build the handler only when it will actually be attached.
        # RotatingFileHandler opens the file on construction, so creating one
        # and then discarding it (logger already configured) leaks an open
        # file handle per extra SecurityLogger instance.
        if not self._file_logger.handlers:
            handler = RotatingFileHandler(
                log_path, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8"
            )
            handler.setFormatter(logging.Formatter("%(message)s"))  # raw JSON lines
            self._file_logger.addHandler(handler)
        self._file_logger.propagate = False  # don't double-log to root

        _pylogger.info("Security event log → %s", log_path)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _hash_prompt(prompt: str) -> str:
        """Return the first 16 hex characters of the SHA-256 of *prompt*.

        The text is encoded as UTF-8 with ``surrogatepass`` so that input
        containing lone surrogates is hashed instead of raising
        ``UnicodeEncodeError``; valid text hashes exactly as with strict UTF-8.
        """
        digest = hashlib.sha256(prompt.encode("utf-8", "surrogatepass"))
        return digest.hexdigest()[:16]

    @staticmethod
    def _now() -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def _write(self, event: dict) -> None:
        """Serialise *event* to one JSON line and emit it on the event logger.

        Values that ``json`` cannot encode natively are written as ``str(value)``
        so that an unexpected type never raises out of the logging path.
        """
        # default=str is deliberate: losing or crashing on a security event is
        # worse than recording a stringified value.
        line = json.dumps(event, ensure_ascii=False, default=str)
        # With ensure_ascii=False a lone surrogate is emitted verbatim and the
        # UTF-8 file handler would fail to write the record. backslashreplace
        # turns it into "\udXXX", which is a valid JSON escape inside a string;
        # every encodable character passes through unchanged.
        line = line.encode("utf-8", "backslashreplace").decode("utf-8")
        self._file_logger.info(line)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def log_request(
        self,
        prompt: str,
        sanitized: str,
        decision: "FirewallDecision",
    ) -> None:
        """Log the input-check decision.

        Parameters
        ----------
        prompt : str
            Original prompt; only its truncated SHA-256 is recorded.
        sanitized : str
            Sanitized prompt; its first 120 characters are recorded as a preview.
        decision : FirewallDecision
            Decision whose ``risk_report`` supplies status, scores, attack
            classification, flags and latency.
        """
        rr = decision.risk_report
        status = rr.status.value
        # Any status other than "blocked"/"flagged" is recorded as safe.
        event_type = (
            "request_blocked" if status == "blocked"
            else "request_flagged" if status == "flagged"
            else "request_safe"
        )

        event = {
            "timestamp": self._now(),
            "event_type": event_type,
            "risk_score": rr.risk_score,
            "risk_level": rr.risk_level.value,
            "attack_type": rr.attack_type,
            "attack_category": rr.attack_category,
            "flags": rr.flags,
            "prompt_hash": self._hash_prompt(prompt),
            "sanitized_preview": sanitized[:120],
            "injection_score": rr.injection_score,
            "adversarial_score": rr.adversarial_score,
            "latency_ms": rr.latency_ms,
        }
        self._write(event)

        # Surface non-safe decisions in the regular application log as well.
        if status in ("blocked", "flagged"):
            _pylogger.warning(
                "[%s] %s | score=%.3f",
                event_type.upper(),
                rr.attack_type or "unknown",
                rr.risk_score,
            )

    def log_response(
        self,
        output: str,
        safe_output: str,
        guardrail_result: "GuardrailResult",
    ) -> None:
        """Log the output guardrail decision.

        Parameters
        ----------
        output : str
            Raw model output; only its truncated SHA-256 is recorded.
        safe_output : str
            Accepted for interface compatibility; not read by this method.
        guardrail_result : GuardrailResult
            Supplies ``is_safe``, ``risk_score``, ``flags`` and ``latency_ms``.
        """
        event_type = "output_safe" if guardrail_result.is_safe else "output_blocked"
        event = {
            "timestamp": self._now(),
            "event_type": event_type,
            "risk_score": guardrail_result.risk_score,
            "flags": guardrail_result.flags,
            "output_hash": self._hash_prompt(output),
            # Derived from is_safe, not from comparing output with safe_output.
            "redacted": not guardrail_result.is_safe,
            "latency_ms": guardrail_result.latency_ms,
        }
        self._write(event)

        if not guardrail_result.is_safe:
            _pylogger.warning(
                "[OUTPUT_BLOCKED] flags=%s score=%.3f",
                guardrail_result.flags,
                guardrail_result.risk_score,
            )

    def log_raw_event(self, event_type: str, data: dict) -> None:
        """Log an arbitrary structured event.

        The record starts with ``timestamp`` and ``event_type`` and is then
        merged with *data*; a key in *data* with either of those names
        replaces the generated value.
        """
        event = {"timestamp": self._now(), "event_type": event_type, **data}
        self._write(event)