# prof-demo / src / utils / logger.py
# sbicy's picture
# Upload 17 files
# deff797 verified
import hashlib
import json
import secrets
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional

from .config import config
class AuditLogger:
    """Dual logging system: redacted for analytics, verbatim for compliance.

    Every request is appended as one JSON object per line to a per-day file
    named ``YYYY-MM-DD.jsonl`` (UTC date) under ``config.REDACTED_LOG_PATH``
    and, when ``config.ENABLE_VERBATIM_LOGS`` is truthy, also under
    ``config.VERBATIM_LOG_PATH``.
    """

    # strftime/strptime pattern used for the per-day log file names.
    _DATE_FORMAT = "%Y-%m-%d"

    def __init__(self):
        # Only the locations are recorded here; directories are created
        # lazily on the first write so construction has no side effects.
        self.verbatim_path = config.VERBATIM_LOG_PATH
        self.redacted_path = config.REDACTED_LOG_PATH

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Replaces ``datetime.utcnow()``, which is deprecated since Python
        3.12. The tzinfo is stripped so ``isoformat()`` keeps producing
        timestamps without a ``+00:00`` suffix, i.e. the same format the
        log files already contain.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _to_bytes(text: str) -> bytes:
        """Encode *text* as UTF-8 for hashing.

        ``surrogatepass`` keeps lone surrogates in the input from raising
        ``UnicodeEncodeError``; for well-formed text the bytes (and thus the
        hashes) are identical to a plain ``text.encode()``.
        """
        return text.encode("utf-8", errors="surrogatepass")

    @staticmethod
    def _append_line(log_file, line: str) -> None:
        """Append *line* to *log_file*, creating its directory if needed."""
        log_file.parent.mkdir(parents=True, exist_ok=True)
        with open(log_file, 'a', encoding='utf-8') as f:
            f.write(line)

    def _log_file_for(self, base_path, date):
        """Return the day file under *base_path* for *date*.

        Args:
            base_path: Directory holding the per-day ``.jsonl`` files.
            date: Day in ``YYYY-MM-DD`` form, or ``None`` for today (UTC).

        Returns:
            The file path, or ``None`` when *date* does not parse as a date.
            Rejecting such values stops strings like ``"../x"`` or absolute
            paths from addressing files outside *base_path*.
        """
        if date is None:
            day = self._utc_now().strftime(self._DATE_FORMAT)
        else:
            day = str(date)
            try:
                datetime.strptime(day, self._DATE_FORMAT)
            except ValueError:
                return None
        return base_path / f"{day}.jsonl"

    def _generate_request_id(self, text: str) -> str:
        """Generate a unique request ID (16 lowercase hex characters)."""
        timestamp = self._utc_now().isoformat()
        # The random nonce keeps IDs distinct when the same text is logged
        # twice within one clock tick; timestamp + text alone would collide.
        nonce = secrets.token_hex(8)
        digest = hashlib.sha256(self._to_bytes(f"{timestamp}{text}{nonce}"))
        return digest.hexdigest()[:16]

    def _redact_text(self, text: str) -> str:
        """Redact sensitive content for analytics logs."""
        # Replace with hash to preserve uniqueness while hiding content
        return f"REDACTED_{hashlib.md5(self._to_bytes(text)).hexdigest()[:8]}"

    def log_request(
        self,
        text: str,
        context: str,
        contains_profanity: bool,
        toxicity_level: str,
        safe_text: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Log a profanity check request to both redacted and verbatim logs.

        The redacted entry stores only a hash and the length of ``text``;
        the verbatim entry stores ``text`` and ``safe_text`` themselves and
        is written only when ``config.ENABLE_VERBATIM_LOGS`` is truthy.

        Args:
            text: Original text that was checked.
            context: Context value copied into both entries.
            contains_profanity: Check result copied into both entries.
            toxicity_level: Toxicity label copied into both entries.
            safe_text: Text stored as ``safe_text`` in the verbatim entry.
            metadata: Optional extra JSON-serializable data; ``{}`` if falsy.

        Returns:
            request_id: Unique identifier for this request

        Raises:
            TypeError: If an entry contains a value ``json.dumps`` cannot
                serialize. Nothing is written to either log in that case.
        """
        request_id = self._generate_request_id(text)
        # One clock reading drives the timestamp and both file names, so an
        # entry written around midnight cannot end up in mismatched files.
        now = self._utc_now()
        timestamp = now.isoformat()
        log_name = f"{now.strftime(self._DATE_FORMAT)}.jsonl"
        details = metadata or {}

        # Redacted log (for analytics)
        # NOTE(review): an unsalted MD5 of short text can be recovered by
        # hashing candidate strings. Kept as-is because changing it would
        # alter the log format; consider a keyed hash.
        redacted_entry = {
            "request_id": request_id,
            "timestamp": timestamp,
            "context": context,
            "contains_profanity": contains_profanity,
            "toxicity_level": toxicity_level,
            "text_hash": hashlib.md5(self._to_bytes(text)).hexdigest(),
            "text_length": len(text),
            "metadata": details
        }

        # Verbatim log (for compliance/audit)
        verbatim_entry = {
            "request_id": request_id,
            "timestamp": timestamp,
            "context": context,
            "original_text": text,
            "safe_text": safe_text,
            "contains_profanity": contains_profanity,
            "toxicity_level": toxicity_level,
            "metadata": details
        }

        # Serialize before touching any file so a serialization error cannot
        # leave a redacted entry without its verbatim counterpart.
        verbatim_enabled = config.ENABLE_VERBATIM_LOGS
        redacted_line = json.dumps(redacted_entry) + '\n'
        verbatim_line = json.dumps(verbatim_entry) + '\n' if verbatim_enabled else None

        # Write redacted log
        self._append_line(self.redacted_path / log_name, redacted_line)

        # Write verbatim log (if enabled)
        if verbatim_enabled:
            self._append_line(self.verbatim_path / log_name, verbatim_line)

        return request_id

    def get_redacted_logs(self, date: Optional[str] = None) -> list:
        """Retrieve redacted logs for a specific date.

        Args:
            date: Day in ``YYYY-MM-DD`` form; defaults to today (UTC).

        Returns:
            The parsed entries in file order, or ``[]`` when the date is not
            a valid ``YYYY-MM-DD`` value or no log file exists for it.
        """
        log_file = self._log_file_for(self.redacted_path, date)
        if log_file is None or not log_file.exists():
            return []

        with open(log_file, 'r', encoding='utf-8') as f:
            # Blank lines carry no entry and would make json.loads raise.
            return [json.loads(line) for line in f if line.strip()]

    def get_verbatim_log(self, request_id: str, date: Optional[str] = None) -> dict:
        """
        Retrieve verbatim log for a specific request (compliance only).
        This should be access-controlled in production.

        Args:
            request_id: Identifier returned by ``log_request``.
            date: Day in ``YYYY-MM-DD`` form; defaults to today (UTC).

        Returns:
            The first entry whose ``request_id`` matches, otherwise a dict
            with a single ``"error"`` key describing why nothing was found.
        """
        if not config.ENABLE_VERBATIM_LOGS:
            return {"error": "Verbatim logs are disabled"}

        log_file = self._log_file_for(self.verbatim_path, date)
        # An invalid date is reported like a missing file: no such log exists.
        if log_file is None or not log_file.exists():
            return {"error": "Log file not found"}

        with open(log_file, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    continue
                entry = json.loads(line)
                # .get() so an entry without the key is skipped, not fatal.
                if entry.get('request_id') == request_id:
                    return entry

        return {"error": "Request ID not found"}
# Singleton instance: a module-level AuditLogger built when this module is
# imported, so every importer of `audit_logger` shares the same object.
audit_logger = AuditLogger()