File size: 4,151 Bytes
deff797
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional

from .config import config

class AuditLogger:
    """Dual logging system: redacted for analytics, verbatim for compliance.

    Entries are appended as JSON Lines to one file per UTC day
    (``<YYYY-MM-DD>.jsonl``) under ``redacted_path`` and, when
    ``config.ENABLE_VERBATIM_LOGS`` is truthy, under ``verbatim_path``.
    """

    # Format used for the per-day log file names.
    _DATE_FORMAT = '%Y-%m-%d'

    def __init__(self):
        # Path() accepts both str and Path values, so either kind of config
        # value supports the "/" joins used below.
        self.verbatim_path = Path(config.VERBATIM_LOG_PATH)
        self.redacted_path = Path(config.REDACTED_LOG_PATH)

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime.

        Equivalent to the deprecated ``datetime.utcnow()``; the tzinfo is
        dropped so ``isoformat()`` output keeps its original shape (no
        ``+00:00`` suffix).
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _generate_request_id(self, text: str) -> str:
        """Return a 16-hex-char ID derived from the current time and *text*."""
        timestamp = self._utcnow().isoformat()
        return hashlib.sha256(f"{timestamp}{text}".encode()).hexdigest()[:16]

    def _redact_text(self, text: str) -> str:
        """Return a ``REDACTED_<8 hex chars>`` placeholder for *text*."""
        # Replace with hash to preserve uniqueness while hiding content
        return f"REDACTED_{hashlib.md5(text.encode()).hexdigest()[:8]}"

    def _resolve_log_file(self, base, date) -> Optional[Path]:
        """Return the log file under *base* for *date*, or None if invalid.

        *date* defaults to today's UTC date. Anything that does not parse as
        ``YYYY-MM-DD`` is rejected so a caller-supplied value such as
        ``"../../secret"`` cannot point outside the log directory.
        """
        if date is None:
            day = self._utcnow().strftime(self._DATE_FORMAT)
        else:
            # str() keeps datetime.date arguments working, as the original
            # f-string interpolation did.
            day = str(date)
            try:
                datetime.strptime(day, self._DATE_FORMAT)
            except ValueError:
                return None
        return Path(base) / f"{day}.jsonl"

    @staticmethod
    def _append_jsonl(log_file: Path, entry: Dict[str, Any]) -> None:
        """Append *entry* as one JSON line, creating the directory if needed."""
        log_file.parent.mkdir(parents=True, exist_ok=True)
        with open(log_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(entry) + '\n')

    def log_request(
        self,
        text: str,
        context: str,
        contains_profanity: bool,
        toxicity_level: str,
        safe_text: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Log a profanity check request to both redacted and verbatim logs.

        The redacted entry stores only an MD5 hash and the length of *text*;
        the verbatim entry stores *text* and *safe_text* and is written only
        when ``config.ENABLE_VERBATIM_LOGS`` is truthy.

        Args:
            text: Original text that was checked.
            context: Caller-supplied context label, stored as-is.
            contains_profanity: Result flag, stored as-is.
            toxicity_level: Result level, stored as-is.
            safe_text: Sanitized text, stored only in the verbatim log.
            metadata: Optional extra JSON-serializable data; a falsy value
                is stored as ``{}``.

        Returns:
            request_id: Unique identifier for this request
        """
        request_id = self._generate_request_id(text)

        # Take one clock reading so the entry timestamp and the daily file
        # name always agree, even for a request logged across UTC midnight.
        now = self._utcnow()
        timestamp = now.isoformat()
        file_name = f"{now.strftime(self._DATE_FORMAT)}.jsonl"

        # Redacted log (for analytics)
        redacted_entry = {
            "request_id": request_id,
            "timestamp": timestamp,
            "context": context,
            "contains_profanity": contains_profanity,
            "toxicity_level": toxicity_level,
            "text_hash": hashlib.md5(text.encode()).hexdigest(),
            "text_length": len(text),
            "metadata": metadata or {}
        }
        self._append_jsonl(Path(self.redacted_path) / file_name, redacted_entry)

        # Verbatim log (for compliance/audit), built only when enabled
        if config.ENABLE_VERBATIM_LOGS:
            verbatim_entry = {
                "request_id": request_id,
                "timestamp": timestamp,
                "context": context,
                "original_text": text,
                "safe_text": safe_text,
                "contains_profanity": contains_profanity,
                "toxicity_level": toxicity_level,
                "metadata": metadata or {}
            }
            self._append_jsonl(Path(self.verbatim_path) / file_name, verbatim_entry)

        return request_id

    def get_redacted_logs(self, date: Optional[str] = None) -> list:
        """Retrieve redacted logs for a specific date.

        Args:
            date: Day to read, as ``YYYY-MM-DD``; defaults to today (UTC).

        Returns:
            The parsed entries in file order, or ``[]`` when the date is
            malformed or no log file exists for it.
        """
        log_file = self._resolve_log_file(self.redacted_path, date)
        if log_file is None or not log_file.exists():
            return []

        with open(log_file, 'r', encoding='utf-8') as f:
            # Skip blank lines so stray newlines do not break parsing.
            return [json.loads(line) for line in f if line.strip()]

    def get_verbatim_log(self, request_id: str, date: Optional[str] = None) -> dict:
        """
        Retrieve verbatim log for a specific request (compliance only).
        This should be access-controlled in production.

        Args:
            request_id: ID returned by ``log_request``.
            date: Day to search, as ``YYYY-MM-DD``; defaults to today (UTC).

        Returns:
            The matching entry, or a dict with an ``"error"`` key when
            verbatim logs are disabled, the date is malformed or has no log
            file, or the ID is not present in that day's file.
        """
        if not config.ENABLE_VERBATIM_LOGS:
            return {"error": "Verbatim logs are disabled"}

        log_file = self._resolve_log_file(self.verbatim_path, date)
        # A malformed date cannot name an existing log file, so it is
        # reported with the same error as a missing file.
        if log_file is None or not log_file.exists():
            return {"error": "Log file not found"}

        with open(log_file, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    continue
                entry = json.loads(line)
                if entry['request_id'] == request_id:
                    return entry

        return {"error": "Request ID not found"}

# Singleton instance: constructed once, when this module is first imported.
# AuditLogger.__init__ reads the log paths from `config` at that point.
audit_logger = AuditLogger()