Spaces:
Sleeping
Sleeping
| """ | |
| Clase 16 — Guardrails de seguridad para el DocOps Agent. | |
| Tres capas complementarias: | |
| 1. InputGuardrail — bloquea prompt injection y mensajes abusivos. | |
| 2. OutputGuardrail — redacta PII (emails, teléfonos MX, RFC, CURP, tarjetas). | |
| 3. ToolGuardrail — clasifica tools por riesgo y gestiona rate limits. | |
| Todo en regex propio (sin Presidio): para una clase en vivo los estudiantes | |
| necesitan ver los patrones directamente, no una caja negra. | |
| """ | |
| from __future__ import annotations | |
| import re | |
| from dataclasses import dataclass | |
| from typing import Literal | |
| # ───────────────────────────────────────────────────────────────── | |
| # Resultado común | |
| # ───────────────────────────────────────────────────────────────── | |
| class GuardrailResult: | |
| blocked: bool | |
| reason: str | None = None | |
| scrubbed_text: str | None = None | |
| # ───────────────────────────────────────────────────────────────── | |
| # 1. Input guardrail | |
| # ───────────────────────────────────────────────────────────────── | |
| class InputGuardrail: | |
| """Valida mensajes de entrada del usuario antes de invocar al agente.""" | |
| MAX_LENGTH = 4000 | |
| INJECTION_PATTERNS = [ | |
| r"ignore\s+(the\s+)?previous\s+instructions?", | |
| r"ignore\s+(all\s+)?prior\s+instructions?", | |
| r"disregard\s+(all|the)\s+(above|prior|previous)", | |
| r"you\s+are\s+now\s+", | |
| r"system\s+prompt", | |
| r"reveal\s+your\s+instructions?", | |
| r"forget\s+everything", | |
| r"override\s+your\s+rules", | |
| ] | |
| def __init__(self, max_length: int = MAX_LENGTH): | |
| self.max_length = max_length | |
| self._compiled = [ | |
| re.compile(p, re.IGNORECASE) for p in self.INJECTION_PATTERNS | |
| ] | |
| def check(self, user_message: str) -> GuardrailResult: | |
| if not isinstance(user_message, str) or not user_message.strip(): | |
| return GuardrailResult( | |
| blocked=True, reason="Mensaje vacío o inválido." | |
| ) | |
| if len(user_message) > self.max_length: | |
| return GuardrailResult( | |
| blocked=True, | |
| reason=( | |
| f"Mensaje demasiado largo " | |
| f"({len(user_message)} > {self.max_length} caracteres)." | |
| ), | |
| ) | |
| for pattern in self._compiled: | |
| if pattern.search(user_message): | |
| return GuardrailResult( | |
| blocked=True, | |
| reason=( | |
| "Posible prompt injection detectado " | |
| f"(patrón: '{pattern.pattern}')." | |
| ), | |
| ) | |
| return GuardrailResult(blocked=False) | |
| # ───────────────────────────────────────────────────────────────── | |
| # 2. Output guardrail (PII scrubbing) | |
| # ───────────────────────────────────────────────────────────────── | |
| class OutputGuardrail: | |
| """Redacta PII de las respuestas antes de mostrarlas al usuario.""" | |
| # Orden IMPORTA — tarjetas primero, luego teléfonos (ambos son dígitos). | |
| PATTERNS: list[tuple[str, str, re.Pattern]] = [ | |
| ( | |
| "EMAIL", | |
| "[EMAIL]", | |
| re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"), | |
| ), | |
| ( | |
| "CARD", | |
| "[CARD]", | |
| # 13-19 dígitos con separadores opcionales (espacios o guiones). | |
| re.compile(r"\b(?:\d[ -]?){12,18}\d\b"), | |
| ), | |
| ( | |
| "CURP", | |
| "[CURP]", | |
| re.compile( | |
| r"\b[A-Z][AEIOUX][A-Z]{2}\d{2}" | |
| r"(?:0[1-9]|1[0-2])(?:0[1-9]|[12]\d|3[01])" | |
| r"[HM][A-Z]{2}[B-DF-HJ-NP-TV-Z]{3}[0-9A-Z]\d\b", | |
| re.IGNORECASE, | |
| ), | |
| ), | |
| ( | |
| "RFC", | |
| "[RFC]", | |
| re.compile( | |
| r"\b[A-ZÑ&]{3,4}\d{6}(?:[A-Z0-9]{3})?\b", | |
| re.IGNORECASE, | |
| ), | |
| ), | |
| ( | |
| "PHONE", | |
| "[PHONE]", | |
| # Teléfonos MX: opcional +52, 10 dígitos con separadores opcionales. | |
| re.compile( | |
| r"(?:\+?52[\s-]?)?(?:\d[\s-]?){9}\d" | |
| ), | |
| ), | |
| ] | |
| def scrub(self, response: str) -> GuardrailResult: | |
| if not isinstance(response, str): | |
| return GuardrailResult( | |
| blocked=False, scrubbed_text=str(response) | |
| ) | |
| scrubbed = response | |
| detected: list[str] = [] | |
| for name, replacement, pattern in self.PATTERNS: | |
| new_text, count = pattern.subn(replacement, scrubbed) | |
| if count > 0: | |
| detected.append(f"{name}×{count}") | |
| scrubbed = new_text | |
| reason = ", ".join(detected) if detected else None | |
| return GuardrailResult( | |
| blocked=False, | |
| reason=reason, | |
| scrubbed_text=scrubbed, | |
| ) | |
| # ───────────────────────────────────────────────────────────────── | |
| # 3. Tool guardrail (clasificación por riesgo) | |
| # ───────────────────────────────────────────────────────────────── | |
| RiskLevel = Literal["read", "write_reversible", "write_destructive"] | |
| TOOL_RISK_LEVELS: dict[str, RiskLevel] = { | |
| # Lectura: sin side effects. | |
| "search_docs": "read", | |
| "retrieve": "read", | |
| "vector_search": "read", | |
| "list_documents": "read", | |
| # Escritura reversible: puedes deshacer con otra llamada. | |
| "create_ticket": "write_reversible", | |
| "send_email_draft": "write_reversible", | |
| "tag_document": "write_reversible", | |
| "update_status": "write_reversible", | |
| # Destructivo: requiere aprobación humana. | |
| "delete_document": "write_destructive", | |
| "execute_sql": "write_destructive", | |
| "shell_exec": "write_destructive", | |
| "deploy_production": "write_destructive", | |
| } | |
| RATE_LIMITS: dict[RiskLevel, int] = { | |
| "read": 60, | |
| "write_reversible": 20, | |
| "write_destructive": 2, | |
| } | |
| class ToolGuardrail: | |
| """Decide qué tools necesitan HITL y cuántas calls/min permite cada una.""" | |
| UNKNOWN_RISK: RiskLevel = "write_destructive" | |
| def __init__( | |
| self, | |
| risk_levels: dict[str, RiskLevel] | None = None, | |
| rate_limits: dict[RiskLevel, int] | None = None, | |
| ): | |
| self.risk_levels = risk_levels or TOOL_RISK_LEVELS | |
| self.rate_limits = rate_limits or RATE_LIMITS | |
| def risk_of(self, tool_name: str) -> RiskLevel: | |
| return self.risk_levels.get(tool_name, self.UNKNOWN_RISK) | |
| def require_approval(self, tool_name: str) -> bool: | |
| return self.risk_of(tool_name) == "write_destructive" | |
| def rate_limit_for(self, tool_name: str) -> int: | |
| return self.rate_limits[self.risk_of(tool_name)] | |