LeomordKaly's picture
deploy: phase 3 BYOK backend (Dockerfile.hf, FastAPI on 7860)
f4ef3b8 verified
"""PII redaction for secondary stores (audit log, query cache, conversation history).
Two strategies:
- Regex-based (always available) — covers email, phone, SSN, credit card,
IBAN, IPv4, URL with credentials.
- Microsoft Presidio (optional dependency) — invoked when installed; higher
recall and language-aware NER for names, locations, organizations.
This module never sees plaintext from the LLM — it operates only on text that
is about to be persisted to disk. Live prompts and retrieved contexts remain
unmodified so model quality is not affected.
"""
from __future__ import annotations
import re
from typing import Any
from config.settings import settings
from utils.logging import get_logger
logger = get_logger(__name__)
# Ordered table of (compiled pattern, replacement token) pairs. ``redact``
# applies the entries top to bottom, each one to the output of the previous.
#
# Order matters — most specific patterns first so they win against the
# broader phone regex. Provider-specific API-key shapes (added 2026-05-26
# for BYOK mode) live ABOVE the generic ``[API_KEY]`` rule because their
# prefixes are not catchable by the legacy ``(sk|pk|api|key)`` alternation.
_REGEX_PATTERNS: list[tuple[re.Pattern[str], str]] = [
    # URL with embedded ``user:password@`` credentials.
    (re.compile(r"https?://[^\s/]+:[^\s/]+@[^\s]+"), "[URL_WITH_CREDS]"),
    (re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"), "[EMAIL]"),
    # US Social Security number in ``ddd-dd-dddd`` form.
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[SSN]"),
    # 13-19 digits, optionally separated by spaces or hyphens. ``redact``
    # special-cases the ``[CC]`` token and only masks Luhn-valid matches.
    (re.compile(r"\b(?:\d[ -]*?){13,19}\b"), "[CC]"),  # Luhn-validated in redact()
    (re.compile(r"\b[A-Z]{2}\d{2}[A-Z0-9]{10,30}\b"), "[IBAN]"),
    # Dotted-quad IPv4 shape; octet ranges are not checked.
    (re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b"), "[IP]"),
    # -- BYOK key shapes (P6 production launch) ---------------------------
    # Anthropic must come BEFORE OpenAI because ``sk-ant-...`` also matches
    # the generic ``sk-...`` rule below.
    (re.compile(r"\bsk-ant-[A-Za-z0-9_-]{20,}\b"), "[API_KEY]"),
    (re.compile(r"\bsk-(?:proj|svcacct)-[A-Za-z0-9_-]{20,}\b"), "[API_KEY]"),
    (re.compile(r"\bgsk_[A-Za-z0-9]{40,}\b"), "[API_KEY]"),
    (re.compile(r"\bhf_[A-Za-z0-9]{30,}\b"), "[API_KEY]"),
    (re.compile(r"\bvcp_[A-Za-z0-9]{20,}\b"), "[API_KEY]"),
    # JWT-format database API keys (Qdrant Cloud auth v2). Three dot-separated
    # base64url segments — the pattern requires the first two (header and
    # payload) to start with ``eyJ``.
    (re.compile(r"\beyJ[A-Za-z0-9_-]+\.eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\b"), "[API_KEY]"),
    # Qdrant Cloud management keys: ``<uuid>|<token>``.
    (
        re.compile(
            r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\|[A-Za-z0-9_-]{20,}\b"
        ),
        "[API_KEY]",
    ),
    # Legacy generic — keeps catching ``sk-...`` and ``api_...`` shapes from
    # older docs and tests.
    (re.compile(r"\b(?:sk|pk|api|key)[-_][A-Za-z0-9_-]{16,}\b", re.IGNORECASE), "[API_KEY]"),
    # Broadest rule, kept last so the more specific patterns above win.
    (re.compile(r"\b(?:\+?\d{1,3}[-.\s]?)?\(?\d{2,4}\)?[-.\s]?\d{3,4}[-.\s]?\d{3,4}\b"), "[PHONE]"),
]
# Optional Presidio backend for richer detection (names, locations, etc.).
# Start from the "not available" state and upgrade only once both engines
# have been imported and constructed; any failure leaves the module on the
# regex-only path.
_PRESIDIO_AVAILABLE = False
_analyzer: Any = None
_anonymizer: Any = None
try:
    from presidio_analyzer import AnalyzerEngine  # type: ignore[import-not-found]
    from presidio_anonymizer import AnonymizerEngine  # type: ignore[import-not-found]

    # Build both engines before binding either, so a failure in the second
    # constructor never leaves a half-initialised pair behind.
    _analyzer, _anonymizer = AnalyzerEngine(), AnonymizerEngine()
    _PRESIDIO_AVAILABLE = True
except Exception:
    _analyzer = _anonymizer = None
def _luhn_valid(num: str) -> bool:
    """Check a candidate card number against the Luhn checksum.

    Non-digit characters (spaces, hyphens, ...) are ignored. Used to filter
    false-positive credit-card matches.

    Args:
        num: Candidate string, possibly containing separators.

    Returns:
        ``True`` when the string holds 13 to 19 digits whose Luhn sum is a
        multiple of 10, ``False`` otherwise.
    """
    numerals = [int(ch) for ch in num if ch.isdigit()]
    if len(numerals) < 13 or len(numerals) > 19:
        return False

    from_right = numerals[::-1]
    # Positions 0, 2, 4, ... counted from the right are taken as-is.
    plain_sum = sum(from_right[0::2])
    # Positions 1, 3, 5, ... are doubled; a two-digit product (digit >= 5)
    # is folded back to a single digit by subtracting 9.
    doubled_sum = sum(2 * n - 9 if n > 4 else 2 * n for n in from_right[1::2])
    return (plain_sum + doubled_sum) % 10 == 0
def redact(text: str) -> str:
    """Mask PII tokens found in ``text``.

    Every entry of ``_REGEX_PATTERNS`` is applied in order; when Presidio
    was initialised successfully, its analyzer/anonymizer pair then runs
    over the regex-masked result.

    Args:
        text: Arbitrary string that may contain PII.

    Returns:
        Redacted copy of the text. If redaction is disabled via settings,
        or ``text`` is empty, the input is returned unchanged.
    """
    if not (settings.pii_redaction_enabled and text):
        return text

    def _mask_card(match: re.Match[str]) -> str:
        # Only Luhn-valid digit runs are masked, to avoid over-masking
        # phone numbers / arbitrary digits.
        candidate = match.group(0)
        return "[CC]" if _luhn_valid(candidate) else candidate

    masked = text
    for regex, placeholder in _REGEX_PATTERNS:
        replacement = _mask_card if placeholder == "[CC]" else placeholder
        masked = regex.sub(replacement, masked)

    presidio_ready = _PRESIDIO_AVAILABLE and _analyzer is not None and _anonymizer is not None
    if not presidio_ready:
        return masked

    # Best effort: a Presidio failure is logged and the regex-only result
    # is returned.
    try:
        findings = _analyzer.analyze(text=masked, language="en")
        if findings:
            masked = _anonymizer.anonymize(text=masked, analyzer_results=findings).text
    except Exception as exc:
        logger.debug("presidio_redact_failed", error=str(exc))
    return masked
def redact_dict(data: dict[str, Any], fields: tuple[str, ...] | None = None) -> dict[str, Any]:
    """Recursively redact string values in a dict.

    Dicts, lists and tuples are walked; dict keys themselves are never
    redacted, and values of any other type are passed through untouched.

    Args:
        data: Dict (possibly nested) to redact.
        fields: If given, only values stored under a key named in ``fields``
            are redacted. The key may appear at any nesting depth, and
            every string beneath a matching key is redacted. If ``None``,
            every string in the structure is redacted.

    Returns:
        Redacted structure built from fresh dicts, lists and tuples. If
        redaction is disabled via settings, ``data`` itself is returned
        unchanged (not a copy).
    """
    if not settings.pii_redaction_enabled:
        return data

    def _walk(value: Any, *, force: bool) -> Any:
        if isinstance(value, str):
            return redact(value) if force else value
        if isinstance(value, dict):
            # Once a key matches, everything below it stays forced.
            return {
                k: _walk(v, force=force or (fields is not None and k in fields))
                for k, v in value.items()
            }
        if isinstance(value, list):
            return [_walk(v, force=force) for v in value]
        if isinstance(value, tuple):
            # Tuples can carry strings too; skipping them would let PII
            # through to the persisted store.
            items = [_walk(v, force=force) for v in value]
            if hasattr(value, "_fields"):
                # namedtuple constructors take positional fields.
                return type(value)(*items)
            return tuple(items)
        return value

    return _walk(data, force=fields is None)