Spaces:
Sleeping
Sleeping
File size: 5,022 Bytes
a2e1879 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# guardrails/pii_output_guard.py
from typing import Generator, Dict, Any, Tuple
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
class PiiOutputGuard:
    """PII guard focused on model *output*, built on Presidio.

    The guard analyzes text with ``AnalyzerEngine`` and either blocks the
    output or replaces the detected spans via ``AnonymizerEngine``.

    Configuration keys read from ``config`` (all optional):

    * ``on_input`` (default ``False``): analyze prompts in ``process_input``.
    * ``on_output`` (default ``True``): guard streams in
      ``process_output_stream``.
    * ``anonymize_entities`` (default ``[]``): forwarded to the analyzer as
      its ``entities`` argument.
    * ``output_action`` (default ``"anonymize"``): ``"block"`` replaces the
      output with a notice; any other value anonymizes.
    * ``stream_holdback_chars`` (default ``DEFAULT_STREAM_HOLDBACK``): number
      of trailing characters a stream keeps unreleased so that an entity
      split across chunks can be recognised before any part of it is emitted.
    """

    # Trailing characters withheld while streaming. Entities longer than this
    # can still be partially emitted before they become detectable.
    DEFAULT_STREAM_HOLDBACK = 64

    def __init__(self, config: Dict[str, Any]):
        """Store ``config`` and create the Presidio analyzer and anonymizer."""
        self.config = config
        self.analyzer = AnalyzerEngine()
        self.anonymizer = AnonymizerEngine()
        print("✅ PII Output Guard initialized.")

    def _analyze(self, text: str) -> Any:
        """Run the analyzer on ``text`` for the configured entity types."""
        return self.analyzer.analyze(
            text=text,
            language="en",
            entities=self.config.get("anonymize_entities", []),
        )

    def _anonymize(self, text: str, analyzer_results: Any) -> str:
        """Return ``text`` with the spans in ``analyzer_results`` anonymized."""
        return self.anonymizer.anonymize(
            text=text,
            analyzer_results=analyzer_results,
        ).text

    @staticmethod
    def _entity_types(analyzer_results: Any) -> str:
        """Return the distinct entity types as a sorted, comma-separated string."""
        # Sorted so that messages are stable from run to run.
        return ", ".join(sorted({res.entity_type for res in analyzer_results}))

    @staticmethod
    def _safe_cut(cut: int, analyzer_results: Any) -> int:
        """Move ``cut`` left until it no longer falls inside a detected span."""
        moved = True
        while moved:  # repeat: pulling back may land inside an overlapping span
            moved = False
            for res in analyzer_results:
                if res.start < cut < res.end:
                    cut = res.start
                    moved = True
        return cut

    def process_input(self, prompt: str) -> Tuple[str, bool]:
        """Optionally report PII found in ``prompt``; never modify or block it.

        Returns:
            ``(prompt, True)`` in every case. When ``on_input`` is enabled and
            the analyzer finds entities, their types are printed as a warning.
        """
        if not self.config.get("on_input", False):
            return prompt, True

        analyzer_results = self._analyze(prompt)
        if analyzer_results:
            print(f" ⚠️ Input contains PII: {self._entity_types(analyzer_results)}")
        return prompt, True  # Don't block input in output-focused guard

    def process_output_stream(
        self, text_stream: Generator[str, None, None]
    ) -> Generator[str, None, None]:
        """Yield ``text_stream`` with PII blocked or anonymized.

        Text is buffered and only released once it is at least
        ``stream_holdback_chars`` characters behind the end of the data seen
        so far, and never in the middle of a detected entity. The remaining
        buffer is analyzed and released when the stream ends. Consequently the
        yielded pieces do not line up with the incoming chunks; only their
        concatenation is meaningful.

        With ``output_action == "block"``, the first detection yields a
        blocked notice and ends the stream (text released earlier is not
        retracted). Any other action anonymizes.
        """
        from itertools import chain

        if not self.config.get("on_output", True):
            yield from text_stream
            return

        action = self.config.get("output_action", "anonymize")
        holdback = max(
            0,
            int(self.config.get("stream_holdback_chars", self.DEFAULT_STREAM_HOLDBACK)),
        )
        end_of_stream = object()
        pending = ""
        pii_reported = False

        # The sentinel triggers one last pass that flushes the held-back tail.
        for chunk in chain(text_stream, (end_of_stream,)):
            final = chunk is end_of_stream
            if not final:
                pending += chunk
            keep = 0 if final else holdback
            if len(pending) <= keep:
                continue  # nothing can be released yet

            # Only the unreleased buffer is analyzed, keeping the work per
            # chunk bounded instead of re-scanning the whole output.
            analyzer_results = self._analyze(pending)
            if analyzer_results:
                pii_types = self._entity_types(analyzer_results)
                if not pii_reported:
                    pii_reported = True
                    # NOTE(review): the leading glyph looks like a mis-decoded
                    # emoji; kept as found because the original is unknown.
                    print(f"\n π PII detected in output: {pii_types}")
                if action == "block":
                    yield f"\n\nπ [OUTPUT BLOCKED: PII detected - {pii_types}]"
                    return

            cut = self._safe_cut(len(pending) - keep, analyzer_results)
            if cut <= 0:
                continue  # an entity starts at the buffer head and is still open

            head = pending[:cut]
            # Spans ending at or before the cut lie fully inside ``head``, so
            # their offsets are valid for it.
            releasable = [res for res in analyzer_results if res.end <= cut]
            yield self._anonymize(head, releasable) if releasable else head
            pending = pending[cut:]

    def process_complete_output(self, text: str) -> Tuple[str, bool]:
        """Process a complete (non-streaming) output text for PII.

        Returns:
            ``(text, True)`` when nothing is detected;
            ``(notice, False)`` when ``output_action`` is ``"block"``;
            ``(anonymized_text, True)`` for every other action.
        """
        analyzer_results = self._analyze(text)
        if not analyzer_results:
            return text, True  # No PII found

        pii_types = self._entity_types(analyzer_results)
        # NOTE(review): the lines below print the raw matched values to
        # stdout; acceptable for testing, worth confirming for production use.
        print("π PII Analysis Results:")
        for result in analyzer_results:
            print(
                f" - {result.entity_type}: '{text[result.start:result.end]}' "
                f"(confidence: {result.score:.2f})"
            )

        if self.config.get("output_action", "anonymize") == "block":
            return f"Output blocked: PII detected ({pii_types}).", False

        # "anonymize" and any unrecognised action both anonymize.
        anonymized_text = self._anonymize(text, analyzer_results)
        print("✅ PII anonymized in output")
        return anonymized_text, True