import json
import hashlib
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional

from .config import config


class AuditLogger:
    """Dual logging system: redacted for analytics, verbatim for compliance.

    Each request is appended as one JSON object per line (JSONL) to a file
    named ``<YYYY-MM-DD>.jsonl`` under ``config.REDACTED_LOG_PATH`` and, when
    ``config.ENABLE_VERBATIM_LOGS`` is truthy, also under
    ``config.VERBATIM_LOG_PATH``. Both paths are joined with ``/`` and are
    therefore expected to be ``pathlib.Path``-like objects.
    """

    # Format of the date component of every log file name.
    _DATE_FORMAT = '%Y-%m-%d'

    def __init__(self):
        self.verbatim_path = config.VERBATIM_LOG_PATH
        self.redacted_path = config.REDACTED_LOG_PATH

    def _generate_request_id(self, text: str) -> str:
        """Generate a request ID from the current UTC time and the text.

        Returns:
            The first 16 hex characters of the SHA-256 digest of the ISO
            timestamp concatenated with ``text``.
        """
        timestamp = datetime.utcnow().isoformat()
        return hashlib.sha256(f"{timestamp}{text}".encode()).hexdigest()[:16]

    def _redact_text(self, text: str) -> str:
        """Redact sensitive content for analytics logs.

        Returns:
            ``REDACTED_`` followed by the first 8 hex characters of the MD5
            digest of ``text``.
        """
        # Replace with hash to preserve uniqueness while hiding content
        return f"REDACTED_{hashlib.md5(text.encode()).hexdigest()[:8]}"

    def _is_valid_date(self, date: str) -> bool:
        """Return True if ``date`` parses with the log file date format."""
        try:
            datetime.strptime(date, self._DATE_FORMAT)
        except (TypeError, ValueError):
            return False
        return True

    @staticmethod
    def _append_entry(log_file, entry: Dict[str, Any]) -> None:
        """Append ``entry`` as one JSON line to ``log_file``."""
        # Create the log directory on first use instead of failing the request.
        log_file.parent.mkdir(parents=True, exist_ok=True)
        with open(log_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(entry) + '\n')

    def log_request(
        self,
        text: str,
        context: str,
        contains_profanity: bool,
        toxicity_level: str,
        safe_text: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Log a profanity check request to both redacted and verbatim logs.

        The redacted entry stores only an MD5 hash and the length of ``text``;
        the verbatim entry stores ``text`` and ``safe_text`` as given and is
        written only when ``config.ENABLE_VERBATIM_LOGS`` is truthy.
        ``metadata`` is copied into both entries unchanged.

        Returns:
            request_id: Unique identifier for this request
        """
        request_id = self._generate_request_id(text)

        # Read the clock once so the entry timestamp and the daily file name
        # always agree, even when the call straddles UTC midnight.
        now = datetime.utcnow()
        timestamp = now.isoformat()
        log_name = f"{now.strftime(self._DATE_FORMAT)}.jsonl"

        entry_metadata = metadata or {}

        # Redacted log (for analytics)
        redacted_entry = {
            "request_id": request_id,
            "timestamp": timestamp,
            "context": context,
            "contains_profanity": contains_profanity,
            "toxicity_level": toxicity_level,
            "text_hash": hashlib.md5(text.encode()).hexdigest(),
            "text_length": len(text),
            "metadata": entry_metadata,
        }

        # Verbatim log (for compliance/audit)
        verbatim_entry = {
            "request_id": request_id,
            "timestamp": timestamp,
            "context": context,
            "original_text": text,
            "safe_text": safe_text,
            "contains_profanity": contains_profanity,
            "toxicity_level": toxicity_level,
            "metadata": entry_metadata,
        }

        # Write redacted log
        self._append_entry(self.redacted_path / log_name, redacted_entry)

        # Write verbatim log (if enabled)
        if config.ENABLE_VERBATIM_LOGS:
            self._append_entry(self.verbatim_path / log_name, verbatim_entry)

        return request_id

    def get_redacted_logs(self, date: Optional[str] = None) -> list:
        """Retrieve redacted logs for a specific date.

        Args:
            date: Day to read, formatted ``YYYY-MM-DD``. Defaults to the
                current UTC date.

        Returns:
            The parsed entries in file order, or an empty list when the file
            does not exist or ``date`` is not a valid ``YYYY-MM-DD`` date.
        """
        if date is None:
            date = datetime.utcnow().strftime(self._DATE_FORMAT)
        elif not self._is_valid_date(date):
            # A malformed date can never name a file written by this logger;
            # rejecting it also stops values such as "../x" escaping the
            # log directory.
            return []

        log_file = self.redacted_path / f"{date}.jsonl"
        if not log_file.exists():
            return []

        with open(log_file, 'r', encoding='utf-8') as f:
            # Blank lines are skipped rather than failing the whole read.
            return [json.loads(line) for line in f if line.strip()]

    def get_verbatim_log(self, request_id: str, date: Optional[str] = None) -> dict:
        """
        Retrieve verbatim log for a specific request (compliance only).
        This should be access-controlled in production.

        Args:
            request_id: Identifier returned by ``log_request``.
            date: Day to search, formatted ``YYYY-MM-DD``. Defaults to the
                current UTC date.

        Returns:
            The first matching entry, or a dict with a single ``"error"`` key
            when verbatim logs are disabled, the file is missing (or ``date``
            is malformed), or no entry has the given ``request_id``.
        """
        if not config.ENABLE_VERBATIM_LOGS:
            return {"error": "Verbatim logs are disabled"}

        if date is None:
            date = datetime.utcnow().strftime(self._DATE_FORMAT)
        elif not self._is_valid_date(date):
            # Treated like a missing file; also blocks path traversal via date.
            return {"error": "Log file not found"}

        log_file = self.verbatim_path / f"{date}.jsonl"
        if not log_file.exists():
            return {"error": "Log file not found"}

        with open(log_file, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    continue
                entry = json.loads(line)
                # Ignore entries that are not objects or lack a request_id
                # instead of raising in the middle of the search.
                if isinstance(entry, dict) and entry.get('request_id') == request_id:
                    return entry

        return {"error": "Request ID not found"}


# Singleton instance
audit_logger = AuditLogger()