import sqlite3 import os import json from datetime import datetime, timezone, timedelta from typing import List, Dict, Any DB_DIR = os.path.dirname(__file__) DB_PATH = os.path.join(DB_DIR, "logs.db") def get_connection(): """Returns a connection to the SQLite database.""" conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db(): """Initializes the audit logs table schema.""" os.makedirs(DB_DIR, exist_ok=True) with get_connection() as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS audit_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, prompt TEXT, response_text TEXT, safe INTEGER NOT NULL, risk_score REAL NOT NULL, category TEXT NOT NULL, matched_rule TEXT NOT NULL, details TEXT NOT NULL ) """) conn.commit() def log_event( prompt: str, response_text: str, safe: bool, risk_score: float, category: str, matched_rule: str, details: Dict[str, Any] ): """Inserts a new validation log entry.""" # Calculate UTC and IST (Indian Standard Time) timestamps utc_now = datetime.now(timezone.utc) ist = timezone(timedelta(hours=5, minutes=30)) ist_now = utc_now.astimezone(ist) # Format: YYYY-MM-DDTHH:MM:SS.mmmZ (YYYY-MM-DD HH:MM:SS AM/PM IST) timestamp_str = f"{utc_now.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]}Z ({ist_now.strftime('%Y-%m-%d %I:%M:%S %p')} IST)" with get_connection() as conn: conn.execute( """ INSERT INTO audit_logs (timestamp, prompt, response_text, safe, risk_score, category, matched_rule, details) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( timestamp_str, prompt, response_text, 1 if safe else 0, risk_score, category, matched_rule, json.dumps(details) ) ) conn.commit() def get_logs(limit: int = 50) -> List[Dict[str, Any]]: """Retrieves recent logs for audit verification.""" with get_connection() as conn: cursor = conn.execute( "SELECT * FROM audit_logs ORDER BY id DESC LIMIT ?", (limit,) ) rows = cursor.fetchall() logs = [] for row in rows: log_dict = dict(row) log_dict["safe"] = bool(log_dict["safe"]) try: log_dict["details"] = json.loads(log_dict["details"]) except Exception: pass logs.append(log_dict) return logs