Spaces:
Sleeping
Sleeping
| import sqlite3 | |
| import os | |
| import json | |
| from datetime import datetime, timezone, timedelta | |
| from typing import List, Dict, Any | |
# Directory that contains this module. The path is made absolute first so that
# DB_DIR can never be the empty string (which os.makedirs rejects) when
# __file__ happens to be a bare relative filename.
DB_DIR = os.path.dirname(os.path.abspath(__file__))
# Full path of the SQLite file that holds the audit log.
DB_PATH = os.path.join(DB_DIR, "logs.db")
def get_connection():
    """Open the SQLite database at ``DB_PATH`` and return the connection.

    The connection's ``row_factory`` is set to ``sqlite3.Row``, so fetched
    rows can be read by column name as well as by position. The caller is
    responsible for closing the returned connection.
    """
    connection = sqlite3.connect(DB_PATH)
    # sqlite3.Row gives mapping-style access (row["column"]) to result rows.
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the ``audit_logs`` table if it does not already exist.

    Ensures the directory named by ``DB_DIR`` exists, then runs a
    ``CREATE TABLE IF NOT EXISTS`` statement, so calling this more than once
    is harmless. The connection is always closed before returning.
    """
    # os.makedirs("") raises FileNotFoundError; an empty DB_DIR means "the
    # current directory", which always exists, so fall back to ".".
    os.makedirs(DB_DIR or ".", exist_ok=True)

    conn = get_connection()
    try:
        # ``with conn`` only scopes the transaction (commit on success,
        # rollback on error); it does NOT close the connection, hence the
        # explicit close() in the ``finally`` clause below.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS audit_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    prompt TEXT,
                    response_text TEXT,
                    safe INTEGER NOT NULL,
                    risk_score REAL NOT NULL,
                    category TEXT NOT NULL,
                    matched_rule TEXT NOT NULL,
                    details TEXT NOT NULL
                )
            """)
    finally:
        conn.close()
def log_event(
    prompt: str,
    response_text: str,
    safe: bool,
    risk_score: float,
    category: str,
    matched_rule: str,
    details: Dict[str, Any],
) -> None:
    """Insert one validation result into the ``audit_logs`` table.

    Args:
        prompt: Text stored in the ``prompt`` column.
        response_text: Text stored in the ``response_text`` column.
        safe: Verdict flag; stored as the integer 1 (truthy) or 0 (falsy).
        risk_score: Value stored in the ``risk_score`` column.
        category: Value stored in the ``category`` column.
        matched_rule: Value stored in the ``matched_rule`` column.
        details: Extra data, serialized with ``json.dumps`` before storage,
            so it must be JSON-serializable.

    The ``timestamp`` column receives a single string holding the current
    time twice: UTC with millisecond precision, followed by the same instant
    in IST (UTC+05:30) in 12-hour form, e.g.
    ``2024-01-02T03:04:05.678Z (2024-01-02 08:34:05 AM IST)``.
    """
    # One instant, rendered in two zones so both strings describe the same moment.
    utc_now = datetime.now(timezone.utc)
    ist_now = utc_now.astimezone(timezone(timedelta(hours=5, minutes=30)))

    # %f yields microseconds (6 digits); dropping the last 3 leaves milliseconds.
    utc_part = utc_now.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]
    ist_part = ist_now.strftime('%Y-%m-%d %I:%M:%S %p')
    timestamp_str = f"{utc_part}Z ({ist_part} IST)"

    # Serialize before opening the connection so a non-serializable ``details``
    # fails without touching the database.
    row = (
        timestamp_str,
        prompt,
        response_text,
        1 if safe else 0,
        risk_score,
        category,
        matched_rule,
        json.dumps(details),
    )

    conn = get_connection()
    try:
        # ``with conn`` commits on success and rolls back on error, but does
        # not close the connection; close() in ``finally`` releases it.
        with conn:
            conn.execute(
                """
                INSERT INTO audit_logs (timestamp, prompt, response_text, safe, risk_score, category, matched_rule, details)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                row,
            )
    finally:
        conn.close()
def get_logs(limit: int = 50) -> List[Dict[str, Any]]:
    """Return the most recent audit log entries, newest first.

    Args:
        limit: Maximum number of rows to fetch (passed to SQL ``LIMIT``).

    Returns:
        A list of dicts, one per row, ordered by descending ``id``. In each
        dict ``safe`` is converted to ``bool`` and ``details`` is decoded from
        JSON; if ``details`` cannot be decoded, the stored value is returned
        unchanged.
    """
    conn = get_connection()
    try:
        rows = conn.execute(
            "SELECT * FROM audit_logs ORDER BY id DESC LIMIT ?", (limit,)
        ).fetchall()
    finally:
        # The sqlite3 connection context manager never closes the connection,
        # so release it explicitly; the fetched rows remain usable afterwards.
        conn.close()

    logs = []
    for row in rows:
        log_dict = dict(row)
        log_dict["safe"] = bool(log_dict["safe"])
        try:
            log_dict["details"] = json.loads(log_dict["details"])
        except (TypeError, ValueError):
            # Best effort: json.loads raises ValueError (JSONDecodeError) for
            # malformed text and TypeError for non-string values; keep the
            # raw stored value in either case instead of failing the read.
            pass
        logs.append(log_dict)
    return logs