|
|
import hashlib
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional

from .config import config
|
|
|
|
|
class AuditLogger:
    """Dual logging system: redacted for analytics, verbatim for compliance.

    Each request is appended as one JSON object per line to a daily
    ``YYYY-MM-DD.jsonl`` file. The redacted log is always written and
    stores only a hash and the length of the text; the verbatim log
    stores the original and sanitised text and is written only when
    ``config.ENABLE_VERBATIM_LOGS`` is truthy.
    """

    # strftime/strptime pattern shared by every daily log file name.
    _DATE_FORMAT = '%Y-%m-%d'

    def __init__(self):
        # Normalise to Path so the "/" joins below also work when the
        # config holds plain strings.
        self.verbatim_path = Path(config.VERBATIM_LOG_PATH)
        self.redacted_path = Path(config.REDACTED_LOG_PATH)

    def _generate_request_id(self, text: str) -> str:
        """Return a 16-hex-character ID derived from the current UTC time and ``text``."""
        timestamp = datetime.utcnow().isoformat()
        return hashlib.sha256(f"{timestamp}{text}".encode()).hexdigest()[:16]

    def _redact_text(self, text: str) -> str:
        """Return a ``REDACTED_<8 hex chars>`` placeholder for ``text``.

        The suffix is the first 8 hex digits of the MD5 digest of the
        UTF-8 encoded text, so equal inputs map to equal placeholders.
        """
        # NOTE(review): an unsalted hash of short or guessable text can be
        # reversed by guessing; confirm this is acceptable for analytics.
        return f"REDACTED_{hashlib.md5(text.encode()).hexdigest()[:8]}"

    @classmethod
    def _log_file_for(cls, directory: Path, date: str) -> Optional[Path]:
        """Return ``directory/<date>.jsonl``, or None if ``date`` is not YYYY-MM-DD.

        Validating the date keeps caller-supplied values such as
        ``"../other"`` from addressing files outside the log directory.
        """
        try:
            datetime.strptime(date, cls._DATE_FORMAT)
        except (TypeError, ValueError):
            return None
        return directory / f"{date}.jsonl"

    @staticmethod
    def _append_entry(directory: Path, day: str, entry: Dict[str, Any]) -> None:
        """Append ``entry`` as one JSON line to ``directory/<day>.jsonl``."""
        # Create the directory on demand so the first write cannot fail
        # just because the log folder has not been provisioned yet.
        directory.mkdir(parents=True, exist_ok=True)
        with open(directory / f"{day}.jsonl", 'a', encoding='utf-8') as f:
            f.write(json.dumps(entry) + '\n')

    def log_request(
        self,
        text: str,
        context: str,
        contains_profanity: bool,
        toxicity_level: str,
        safe_text: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Log a profanity check request to both redacted and verbatim logs.

        Args:
            text: Original text that was checked.
            context: Context label stored with the entry.
            contains_profanity: Result flag stored with the entry.
            toxicity_level: Toxicity label stored with the entry.
            safe_text: Sanitised text; stored in the verbatim log only.
            metadata: Optional extra fields; ``None`` is stored as ``{}``.

        Returns:
            request_id: Unique identifier for this request
        """
        request_id = self._generate_request_id(text)

        # Read the clock once so the entry timestamp and the daily file
        # name cannot disagree when a request straddles midnight.
        # utcnow() is kept so the stored timestamp format (naive ISO-8601,
        # no UTC offset) stays exactly as before.
        now = datetime.utcnow()
        timestamp = now.isoformat()
        day = now.strftime(self._DATE_FORMAT)
        extra = metadata or {}

        # Analytics entry: never contains the text itself, only its MD5
        # hash and its length.
        redacted_entry = {
            "request_id": request_id,
            "timestamp": timestamp,
            "context": context,
            "contains_profanity": contains_profanity,
            "toxicity_level": toxicity_level,
            "text_hash": hashlib.md5(text.encode()).hexdigest(),
            "text_length": len(text),
            "metadata": extra
        }
        self._append_entry(self.redacted_path, day, redacted_entry)

        if config.ENABLE_VERBATIM_LOGS:
            # Compliance entry: keeps both the original and sanitised text.
            verbatim_entry = {
                "request_id": request_id,
                "timestamp": timestamp,
                "context": context,
                "original_text": text,
                "safe_text": safe_text,
                "contains_profanity": contains_profanity,
                "toxicity_level": toxicity_level,
                "metadata": extra
            }
            self._append_entry(self.verbatim_path, day, verbatim_entry)

        return request_id

    def get_redacted_logs(self, date: Optional[str] = None) -> list:
        """Retrieve redacted logs for a specific date.

        Args:
            date: Day to read as ``YYYY-MM-DD``; defaults to the current
                UTC date.

        Returns:
            The parsed entries of that day's file, or ``[]`` when the date
            is malformed or no file exists for it.
        """
        if date is None:
            date = datetime.utcnow().strftime(self._DATE_FORMAT)

        log_file = self._log_file_for(self.redacted_path, date)
        if log_file is None or not log_file.exists():
            return []

        with open(log_file, 'r', encoding='utf-8') as f:
            # Skip blank lines so stray newlines do not break parsing.
            return [json.loads(line) for line in f if line.strip()]

    def get_verbatim_log(self, request_id: str, date: Optional[str] = None) -> dict:
        """
        Retrieve verbatim log for a specific request (compliance only).
        This should be access-controlled in production.

        Args:
            request_id: Identifier returned by ``log_request``.
            date: Day to search as ``YYYY-MM-DD``; defaults to the current
                UTC date.

        Returns:
            The matching entry, or a dict with a single ``"error"`` key
            when verbatim logs are disabled, the day's file is missing
            (a malformed date is treated the same way), or no entry has
            that request ID.
        """
        if not config.ENABLE_VERBATIM_LOGS:
            return {"error": "Verbatim logs are disabled"}

        if date is None:
            date = datetime.utcnow().strftime(self._DATE_FORMAT)

        log_file = self._log_file_for(self.verbatim_path, date)
        if log_file is None or not log_file.exists():
            return {"error": "Log file not found"}

        with open(log_file, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    continue
                entry = json.loads(line)
                # .get() tolerates entries that lack a request_id field.
                if entry.get('request_id') == request_id:
                    return entry

        return {"error": "Request ID not found"}
|
|
|
|
|
|
|
|
# Shared module-level logger instance; constructing it reads the log paths
# from config.
audit_logger = AuditLogger()
|
|
|