File size: 5,833 Bytes
f4ef3b8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""PII redaction for secondary stores (audit log, query cache, conversation history).



Two strategies:

- Regex-based (always available) — covers email, phone, SSN, credit card,

  IBAN, IPv4, URL with credentials, and common API-key shapes.

- Microsoft Presidio (optional dependency) — invoked when installed; higher

  recall and language-aware NER for names, locations, organizations.



This module never sees plaintext from the LLM — it operates only on text that

is about to be persisted to disk. Live prompts and retrieved contexts remain

unmodified so model quality is not affected.

"""

from __future__ import annotations

import re
from typing import Any

from config.settings import settings
from utils.logging import get_logger

# Module-level logger obtained from the project's logging helper, keyed by
# this module's name.
logger = get_logger(__name__)

# Ordered substitution table: each entry is (compiled pattern, replacement
# token). Ordering is significant — the narrower patterns sit ahead of the
# permissive phone regex at the very end so that they claim their matches
# first. The provider-specific API-key shapes (added 2026-05-26 for BYOK
# mode, P6 production launch) are listed above the legacy generic
# ``[API_KEY]`` rule because some of their prefixes (``gsk_``, ``hf_``,
# ``vcp_``) are not matched by its ``(sk|pk|api|key)`` alternation.
_REGEX_PATTERNS: list[tuple[re.Pattern[str], str]] = [
    (re.compile(source, flags), placeholder)
    for source, flags, placeholder in (
        (r"https?://[^\s/]+:[^\s/]+@[^\s]+", 0, "[URL_WITH_CREDS]"),
        (r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", 0, "[EMAIL]"),
        (r"\b\d{3}-\d{2}-\d{4}\b", 0, "[SSN]"),
        # Candidate card numbers only; callers are expected to Luhn-check a
        # match before masking it.
        (r"\b(?:\d[ -]*?){13,19}\b", 0, "[CC]"),
        (r"\b[A-Z]{2}\d{2}[A-Z0-9]{10,30}\b", 0, "[IBAN]"),
        (r"\b(?:\d{1,3}\.){3}\d{1,3}\b", 0, "[IP]"),
        # ── BYOK key shapes ─────────────────────────────────────────────
        # Anthropic-style ``sk-ant-...`` keys come first; they would also
        # match the generic ``sk-...`` rule further down.
        (r"\bsk-ant-[A-Za-z0-9_-]{20,}\b", 0, "[API_KEY]"),
        (r"\bsk-(?:proj|svcacct)-[A-Za-z0-9_-]{20,}\b", 0, "[API_KEY]"),
        (r"\bgsk_[A-Za-z0-9]{40,}\b", 0, "[API_KEY]"),
        (r"\bhf_[A-Za-z0-9]{30,}\b", 0, "[API_KEY]"),
        (r"\bvcp_[A-Za-z0-9]{20,}\b", 0, "[API_KEY]"),
        # JWT-format database API keys (Qdrant Cloud auth v2): three
        # dot-separated base64url segments, the first two starting ``eyJ``.
        (r"\beyJ[A-Za-z0-9_-]+\.eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\b", 0, "[API_KEY]"),
        # Qdrant Cloud management keys: ``<uuid>|<token>``.
        (
            r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\|[A-Za-z0-9_-]{20,}\b",
            0,
            "[API_KEY]",
        ),
        # Legacy generic rule — still catches ``sk-...`` and ``api_...``
        # shapes from older docs and tests (case-insensitive).
        (r"\b(?:sk|pk|api|key)[-_][A-Za-z0-9_-]{16,}\b", re.IGNORECASE, "[API_KEY]"),
        # Broadest pattern; deliberately last.
        (
            r"\b(?:\+?\d{1,3}[-.\s]?)?\(?\d{2,4}\)?[-.\s]?\d{3,4}[-.\s]?\d{3,4}\b",
            0,
            "[PHONE]",
        ),
    )
]

# Optional Presidio backend for richer detection (names, locations, etc.).
# Start from the "not available" state and only flip the flag once both
# engines have been imported and constructed successfully.
_PRESIDIO_AVAILABLE = False
_analyzer: Any = None
_anonymizer: Any = None
try:
    from presidio_analyzer import AnalyzerEngine  # type: ignore[import-not-found]
    from presidio_anonymizer import AnonymizerEngine  # type: ignore[import-not-found]

    _analyzer = AnalyzerEngine()
    _anonymizer = AnonymizerEngine()
    _PRESIDIO_AVAILABLE = True
except Exception:
    # Import or construction failed; one engine may already have been
    # created, so drop both and stay on the regex-only path.
    _analyzer = _anonymizer = None


def _luhn_valid(num: str) -> bool:
    """Return True if the digits in ``num`` pass the Luhn checksum.

    Used to filter false-positive credit-card matches. Every character that
    is not a decimal digit (spaces, dashes, anything else) is ignored.

    Args:
        num: Candidate card number, possibly containing separators.

    Returns:
        True when ``num`` holds between 13 and 19 decimal digits and their
        Luhn checksum is a multiple of 10; False otherwise.
    """
    # ``str.isdecimal`` rather than ``str.isdigit``: ``isdigit`` also accepts
    # characters such as superscript digits that ``int()`` cannot convert,
    # which would raise ValueError here instead of returning False.
    digits = [int(c) for c in num if c.isdecimal()]
    if not 13 <= len(digits) <= 19:
        return False
    total = 0
    # Walk from the rightmost digit; every second digit is doubled, and a
    # two-digit result is folded back to one digit by subtracting 9.
    for i, d in enumerate(reversed(digits)):
        if i % 2 == 1:
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0


def redact(text: str) -> str:
    """Mask PII-looking substrings in ``text``.

    The regex table is applied first, in list order. Credit-card candidates
    are replaced only when they pass the Luhn check. If Presidio was loaded,
    its analyzer/anonymizer is then run (English) over the regex-masked
    text; any Presidio error is logged at debug level and the regex-only
    result is returned.

    Args:
        text: Arbitrary string that may contain PII.

    Returns:
        The masked string. ``text`` itself is returned unchanged when
        redaction is disabled in settings or when ``text`` is empty.
    """
    if not settings.pii_redaction_enabled:
        return text
    if not text:
        return text

    def _card_or_original(match: re.Match[str]) -> str:
        # Luhn gate keeps phone numbers / arbitrary digit runs from being
        # over-masked as cards.
        candidate = match.group(0)
        return "[CC]" if _luhn_valid(candidate) else candidate

    masked = text
    for regex, placeholder in _REGEX_PATTERNS:
        replacement = _card_or_original if placeholder == "[CC]" else placeholder
        masked = regex.sub(replacement, masked)

    presidio_ready = _PRESIDIO_AVAILABLE and _analyzer is not None and _anonymizer is not None
    if not presidio_ready:
        return masked

    try:
        findings = _analyzer.analyze(text=masked, language="en")
        if findings:
            masked = _anonymizer.anonymize(text=masked, analyzer_results=findings).text
    except Exception as exc:
        logger.debug("presidio_redact_failed", error=str(exc))

    return masked


def redact_dict(data: dict[str, Any], fields: tuple[str, ...] | None = None) -> dict[str, Any]:
    """Redact string values inside a (possibly nested) dict.

    Only ``dict`` and ``list`` containers are traversed; any other value
    (including tuples) is passed through unchanged. Dict keys themselves are
    never redacted.

    Args:
        data: Dict (possibly nested) to redact.
        fields: Keys whose values should be redacted. A key matches at any
            nesting depth, and once a key matches, every string beneath it
            is redacted. When ``None``, every string in the structure is
            redacted.

    Returns:
        A new structure with the selected strings redacted. When redaction
        is disabled in settings, ``data`` itself is returned unmodified.
    """
    if not settings.pii_redaction_enabled:
        return data

    def _scrub(node: Any, *, active: bool) -> Any:
        if isinstance(node, dict):
            cleaned = {}
            for key, item in node.items():
                # Redaction stays on for the whole subtree once switched on.
                child_active = active or (fields is not None and key in fields)
                cleaned[key] = _scrub(item, active=child_active)
            return cleaned
        if isinstance(node, list):
            return [_scrub(item, active=active) for item in node]
        if isinstance(node, str) and active:
            return redact(node)
        return node

    return _scrub(data, active=fields is None)