# Hugging Face Spaces page residue (kept for provenance, commented out so the file parses):
#   guardrails-final / guardrails / pii_output_guard.py
#   author: zazaman
#   commit a2e1879: "Add multilingual translation support with Qwen3-0.6B-GGUF
#   and optimize for Hugging Face Spaces deployment"
#   raw / history / blame — 5.02 kB
# guardrails/pii_output_guard.py
from typing import Generator, Dict, Any, Tuple
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
class PiiOutputGuard:
    """
    A specialized PII guard focused specifically on output processing.

    Uses Presidio's ``AnalyzerEngine`` to detect PII entities and
    ``AnonymizerEngine`` to redact them. Behaviour is driven by ``config``:

    - ``on_input`` (bool, default False): analyze prompts in
      ``process_input``; detection is only logged, input is never blocked.
    - ``on_output`` (bool, default True): guard output streams.
    - ``anonymize_entities`` (list[str]): entity types to detect. An empty
      or missing list means "all supported entities" (Presidio interprets
      an explicit empty list as "match nothing", which would silently
      disable detection).
    - ``output_action`` (str, default "anonymize"): "block" stops the
      output; anything else anonymizes.
    """

    def __init__(self, config: Dict[str, Any]):
        """Initializes the PiiOutputGuard with a given configuration."""
        self.config = config
        self.analyzer = AnalyzerEngine()
        self.anonymizer = AnonymizerEngine()
        print("βœ… PII Output Guard initialized.")

    def _entity_filter(self):
        """Entity filter for Presidio: None (= all entities) when unset/empty."""
        return self.config.get("anonymize_entities") or None

    def process_input(self, prompt: str) -> Tuple[str, bool]:
        """
        Minimal input processing — this guard is output-focused.

        Detected PII is logged but the prompt is returned unmodified and
        never blocked. Returns ``(prompt, True)`` unconditionally.
        """
        if not self.config.get("on_input", False):
            return prompt, True
        analyzer_results = self.analyzer.analyze(
            text=prompt,
            language="en",
            entities=self._entity_filter(),
        )
        if analyzer_results:
            pii_types = {res.entity_type for res in analyzer_results}
            print(f" ⚠️ Input contains PII: {', '.join(pii_types)}")
        return prompt, True  # output-focused guard never blocks input

    def process_output_stream(
        self, text_stream: Generator[str, None, None]
    ) -> Generator[str, None, None]:
        """
        Incrementally guard a streamed LLM response.

        The stream is accumulated and the full text re-analyzed after every
        chunk, so PII spanning chunk boundaries is still caught. Depending
        on ``config["output_action"]``:

        - "block": emit a blocked notice and stop the stream;
        - anything else (default "anonymize"): anonymize the accumulated
          text and yield only the not-yet-emitted suffix, tracked by an
          emitted-character counter (anonymization placeholders change the
          text length, so raw-offset slicing would be wrong).

        Note: characters already yielded cannot be retracted, so the prefix
        of a PII span emitted before detection may partially leak — an
        inherent limit of streaming redaction.
        """
        if not self.config.get("on_output", True):
            yield from text_stream
            return

        accumulated = ""   # raw model output so far (never overwritten)
        emitted = 0        # chars of guarded output already yielded
        announced = False  # detection notice printed once

        for chunk in text_stream:
            accumulated += chunk
            analyzer_results = self.analyzer.analyze(
                text=accumulated,
                language="en",
                entities=self._entity_filter(),
            )

            if analyzer_results:
                pii_types = {res.entity_type for res in analyzer_results}
                if not announced:
                    announced = True
                    print(f"\n πŸ” PII detected in output: {', '.join(pii_types)}")

                if self.config.get("output_action", "anonymize") == "block":
                    yield f"\n\nπŸ”’ [OUTPUT BLOCKED: PII detected - {', '.join(pii_types)}]"
                    return

                # Default — and fallback for unknown actions, consistent
                # with process_complete_output: redact the full raw text
                # and yield only what hasn't been emitted yet. Never fall
                # back to yielding the raw chunk once PII is present.
                redacted = self.anonymizer.anonymize(
                    text=accumulated,
                    analyzer_results=analyzer_results,
                ).text
                if len(redacted) > emitted:
                    yield redacted[emitted:]
                    emitted = len(redacted)
            else:
                # No PII anywhere yet: pass the new raw text straight through.
                if len(accumulated) > emitted:
                    yield accumulated[emitted:]
                    emitted = len(accumulated)

    def process_complete_output(self, text: str) -> Tuple[str, bool]:
        """
        Process a complete output text for PII (non-streaming).

        Returns ``(processed_text, allowed)``:

        - no PII found: ``(text, True)``;
        - action "block": ``(blocked message, False)``;
        - otherwise, including unknown actions: ``(anonymized text, True)``.
        """
        analyzer_results = self.analyzer.analyze(
            text=text,
            language="en",
            entities=self._entity_filter(),
        )
        if not analyzer_results:
            return text, True  # No PII found

        pii_types = {res.entity_type for res in analyzer_results}
        print("πŸ” PII Analysis Results:")
        for result in analyzer_results:
            print(f" - {result.entity_type}: '{text[result.start:result.end]}' (confidence: {result.score:.2f})")

        action = self.config.get("output_action", "anonymize")
        if action == "block":
            return f"Output blocked: PII detected ({', '.join(pii_types)}).", False

        # "anonymize" and any unknown action both anonymize.
        anonymized_result = self.anonymizer.anonymize(
            text=text,
            analyzer_results=analyzer_results,
        )
        print("βœ… PII anonymized in output")
        return anonymized_result.text, True