DocAgentSystem / ops /guardrails.py
RamsesCamas's picture
Initial clean commit for HF Space deployment
d0d2f42
"""
Clase 16 — Guardrails de seguridad para el DocOps Agent.
Tres capas complementarias:
1. InputGuardrail — bloquea prompt injection y mensajes abusivos.
2. OutputGuardrail — redacta PII (emails, teléfonos MX, RFC, CURP, tarjetas).
3. ToolGuardrail — clasifica tools por riesgo y gestiona rate limits.
Todo en regex propio (sin Presidio): para una clase en vivo los estudiantes
necesitan ver los patrones directamente, no una caja negra.
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import Literal
# ─────────────────────────────────────────────────────────────────
# Resultado común
# ─────────────────────────────────────────────────────────────────
@dataclass
class GuardrailResult:
blocked: bool
reason: str | None = None
scrubbed_text: str | None = None
# ─────────────────────────────────────────────────────────────────
# 1. Input guardrail
# ─────────────────────────────────────────────────────────────────
class InputGuardrail:
"""Valida mensajes de entrada del usuario antes de invocar al agente."""
MAX_LENGTH = 4000
INJECTION_PATTERNS = [
r"ignore\s+(the\s+)?previous\s+instructions?",
r"ignore\s+(all\s+)?prior\s+instructions?",
r"disregard\s+(all|the)\s+(above|prior|previous)",
r"you\s+are\s+now\s+",
r"system\s+prompt",
r"reveal\s+your\s+instructions?",
r"forget\s+everything",
r"override\s+your\s+rules",
]
def __init__(self, max_length: int = MAX_LENGTH):
self.max_length = max_length
self._compiled = [
re.compile(p, re.IGNORECASE) for p in self.INJECTION_PATTERNS
]
def check(self, user_message: str) -> GuardrailResult:
if not isinstance(user_message, str) or not user_message.strip():
return GuardrailResult(
blocked=True, reason="Mensaje vacío o inválido."
)
if len(user_message) > self.max_length:
return GuardrailResult(
blocked=True,
reason=(
f"Mensaje demasiado largo "
f"({len(user_message)} > {self.max_length} caracteres)."
),
)
for pattern in self._compiled:
if pattern.search(user_message):
return GuardrailResult(
blocked=True,
reason=(
"Posible prompt injection detectado "
f"(patrón: '{pattern.pattern}')."
),
)
return GuardrailResult(blocked=False)
# ─────────────────────────────────────────────────────────────────
# 2. Output guardrail (PII scrubbing)
# ─────────────────────────────────────────────────────────────────
class OutputGuardrail:
"""Redacta PII de las respuestas antes de mostrarlas al usuario."""
# Orden IMPORTA — tarjetas primero, luego teléfonos (ambos son dígitos).
PATTERNS: list[tuple[str, str, re.Pattern]] = [
(
"EMAIL",
"[EMAIL]",
re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
),
(
"CARD",
"[CARD]",
# 13-19 dígitos con separadores opcionales (espacios o guiones).
re.compile(r"\b(?:\d[ -]?){12,18}\d\b"),
),
(
"CURP",
"[CURP]",
re.compile(
r"\b[A-Z][AEIOUX][A-Z]{2}\d{2}"
r"(?:0[1-9]|1[0-2])(?:0[1-9]|[12]\d|3[01])"
r"[HM][A-Z]{2}[B-DF-HJ-NP-TV-Z]{3}[0-9A-Z]\d\b",
re.IGNORECASE,
),
),
(
"RFC",
"[RFC]",
re.compile(
r"\b[A-ZÑ&]{3,4}\d{6}(?:[A-Z0-9]{3})?\b",
re.IGNORECASE,
),
),
(
"PHONE",
"[PHONE]",
# Teléfonos MX: opcional +52, 10 dígitos con separadores opcionales.
re.compile(
r"(?:\+?52[\s-]?)?(?:\d[\s-]?){9}\d"
),
),
]
def scrub(self, response: str) -> GuardrailResult:
if not isinstance(response, str):
return GuardrailResult(
blocked=False, scrubbed_text=str(response)
)
scrubbed = response
detected: list[str] = []
for name, replacement, pattern in self.PATTERNS:
new_text, count = pattern.subn(replacement, scrubbed)
if count > 0:
detected.append(f"{name}×{count}")
scrubbed = new_text
reason = ", ".join(detected) if detected else None
return GuardrailResult(
blocked=False,
reason=reason,
scrubbed_text=scrubbed,
)
# ─────────────────────────────────────────────────────────────────
# 3. Tool guardrail (clasificación por riesgo)
# ─────────────────────────────────────────────────────────────────
RiskLevel = Literal["read", "write_reversible", "write_destructive"]
TOOL_RISK_LEVELS: dict[str, RiskLevel] = {
# Lectura: sin side effects.
"search_docs": "read",
"retrieve": "read",
"vector_search": "read",
"list_documents": "read",
# Escritura reversible: puedes deshacer con otra llamada.
"create_ticket": "write_reversible",
"send_email_draft": "write_reversible",
"tag_document": "write_reversible",
"update_status": "write_reversible",
# Destructivo: requiere aprobación humana.
"delete_document": "write_destructive",
"execute_sql": "write_destructive",
"shell_exec": "write_destructive",
"deploy_production": "write_destructive",
}
RATE_LIMITS: dict[RiskLevel, int] = {
"read": 60,
"write_reversible": 20,
"write_destructive": 2,
}
class ToolGuardrail:
"""Decide qué tools necesitan HITL y cuántas calls/min permite cada una."""
UNKNOWN_RISK: RiskLevel = "write_destructive"
def __init__(
self,
risk_levels: dict[str, RiskLevel] | None = None,
rate_limits: dict[RiskLevel, int] | None = None,
):
self.risk_levels = risk_levels or TOOL_RISK_LEVELS
self.rate_limits = rate_limits or RATE_LIMITS
def risk_of(self, tool_name: str) -> RiskLevel:
return self.risk_levels.get(tool_name, self.UNKNOWN_RISK)
def require_approval(self, tool_name: str) -> bool:
return self.risk_of(tool_name) == "write_destructive"
def rate_limit_for(self, tool_name: str) -> int:
return self.rate_limits[self.risk_of(tool_name)]