File size: 5,022 Bytes
a2e1879
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# guardrails/pii_output_guard.py
from typing import Generator, Dict, Any, Tuple

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine


class PiiOutputGuard:
    """
    A specialized PII guard focused specifically on output processing.

    Uses Presidio's ``AnalyzerEngine`` to detect PII entities and
    ``AnonymizerEngine`` to redact them.  Configuration keys read:

    - ``on_input`` (bool, default ``False``): analyze (never block) input.
    - ``on_output`` (bool, default ``True``): enable output processing.
    - ``anonymize_entities`` (list[str]): entity types to detect; an empty
      list means Presidio's defaults.
    - ``output_action`` (str, default ``"anonymize"``): ``"block"`` replaces
      the whole output; any other value anonymizes in place.
    """

    def __init__(self, config: Dict[str, Any]):
        """Initializes the PiiOutputGuard with a given configuration."""
        self.config = config
        self.analyzer = AnalyzerEngine()
        self.anonymizer = AnonymizerEngine()
        print("βœ… PII Output Guard initialized.")

    def process_input(self, prompt: str) -> Tuple[str, bool]:
        """
        Analyze the input prompt for PII but never block or modify it.

        Returns:
            The unchanged prompt and ``True`` in all cases.  When
            ``on_input`` is enabled and PII is found, a warning is printed.
        """
        if not self.config.get("on_input", False):
            return prompt, True

        # Simple input processing if enabled
        analyzer_results = self.analyzer.analyze(
            text=prompt,
            language="en",
            entities=self.config.get("anonymize_entities", []),
        )

        if analyzer_results:
            pii_types = {res.entity_type for res in analyzer_results}
            print(f"   ⚠️  Input contains PII: {', '.join(pii_types)}")

        return prompt, True  # Don't block input in output-focused guard

    def process_output_stream(
        self, text_stream: Generator[str, None, None]
    ) -> Generator[str, None, None]:
        """
        PII-safe handling of an output stream.

        Buffers the complete stream before emitting anything.  The previous
        implementation yielded raw chunks as they arrived and retro-fitted
        anonymization via string-length diffs, which leaked PII two ways:
        (1) PII split across chunk boundaries was partially emitted before
        detection fired, and text already yielded cannot be retracted;
        (2) when anonymization shortened the text, the length guard failed
        and the raw chunk (containing PII) was yielded verbatim.  It also
        silently dropped output for unknown ``output_action`` values and
        re-analyzed the growing buffer on every chunk (O(n^2)).  Buffering
        once and analyzing once is the only strategy that cannot leak.

        Yields:
            The full output text — unmodified when clean, anonymized or a
            block notice when PII is detected.
        """
        if not self.config.get("on_output", True):
            yield from text_stream
            return

        # PII can span chunk boundaries and yielded text is irrevocable,
        # so accumulate everything before any analysis.
        accumulated_text = "".join(text_stream)
        if not accumulated_text:
            return

        analyzer_results = self.analyzer.analyze(
            text=accumulated_text,
            language="en",
            entities=self.config.get("anonymize_entities", []),
        )

        if not analyzer_results:
            yield accumulated_text
            return

        pii_types = {res.entity_type for res in analyzer_results}
        print(f"\n   πŸ” PII detected in output: {', '.join(pii_types)}")

        action = self.config.get("output_action", "anonymize")
        if action == "block":
            yield f"\n\nπŸ”’ [OUTPUT BLOCKED: PII detected - {', '.join(pii_types)}]"
            return

        # Any non-"block" action falls back to anonymization, consistent
        # with process_complete_output's default.
        anonymized_result = self.anonymizer.anonymize(
            text=accumulated_text,
            analyzer_results=analyzer_results,
        )
        yield anonymized_result.text

    def process_complete_output(self, text: str) -> Tuple[str, bool]:
        """
        Process a complete output text for PII (non-streaming).

        Returns:
            ``(text, True)`` when no PII is found; a block notice and
            ``False`` when ``output_action`` is ``"block"``; otherwise the
            anonymized text and ``True`` (unknown actions default to
            anonymization).
        """
        analyzer_results = self.analyzer.analyze(
            text=text,
            language="en",
            entities=self.config.get("anonymize_entities", []),
        )

        if not analyzer_results:
            return text, True  # No PII found

        pii_types = {res.entity_type for res in analyzer_results}
        print(f"πŸ” PII Analysis Results:")
        for result in analyzer_results:
            print(f"   - {result.entity_type}: '{text[result.start:result.end]}' (confidence: {result.score:.2f})")

        action = self.config.get("output_action", "anonymize")

        if action == "block":
            return f"Output blocked: PII detected ({', '.join(pii_types)}).", False

        # "anonymize" and any unknown action both anonymize; the duplicated
        # default branch in the original is folded into this single path.
        anonymized_result = self.anonymizer.anonymize(
            text=text,
            analyzer_results=analyzer_results,
        )
        print(f"βœ… PII anonymized in output")
        return anonymized_result.text, True