YashVardhan-coder
Initial clean deploy
b8c035c
Raw
History Blame Contribute Delete
2.74 kB
import sqlite3
import os
import json
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any
DB_DIR = os.path.dirname(__file__)
DB_PATH = os.path.join(DB_DIR, "logs.db")
def get_connection():
"""Returns a connection to the SQLite database."""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db():
"""Initializes the audit logs table schema."""
os.makedirs(DB_DIR, exist_ok=True)
with get_connection() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS audit_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
prompt TEXT,
response_text TEXT,
safe INTEGER NOT NULL,
risk_score REAL NOT NULL,
category TEXT NOT NULL,
matched_rule TEXT NOT NULL,
details TEXT NOT NULL
)
""")
conn.commit()
def log_event(
prompt: str,
response_text: str,
safe: bool,
risk_score: float,
category: str,
matched_rule: str,
details: Dict[str, Any]
):
"""Inserts a new validation log entry."""
# Calculate UTC and IST (Indian Standard Time) timestamps
utc_now = datetime.now(timezone.utc)
ist = timezone(timedelta(hours=5, minutes=30))
ist_now = utc_now.astimezone(ist)
# Format: YYYY-MM-DDTHH:MM:SS.mmmZ (YYYY-MM-DD HH:MM:SS AM/PM IST)
timestamp_str = f"{utc_now.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]}Z ({ist_now.strftime('%Y-%m-%d %I:%M:%S %p')} IST)"
with get_connection() as conn:
conn.execute(
"""
INSERT INTO audit_logs (timestamp, prompt, response_text, safe, risk_score, category, matched_rule, details)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
timestamp_str,
prompt,
response_text,
1 if safe else 0,
risk_score,
category,
matched_rule,
json.dumps(details)
)
)
conn.commit()
def get_logs(limit: int = 50) -> List[Dict[str, Any]]:
"""Retrieves recent logs for audit verification."""
with get_connection() as conn:
cursor = conn.execute(
"SELECT * FROM audit_logs ORDER BY id DESC LIMIT ?", (limit,)
)
rows = cursor.fetchall()
logs = []
for row in rows:
log_dict = dict(row)
log_dict["safe"] = bool(log_dict["safe"])
try:
log_dict["details"] = json.loads(log_dict["details"])
except Exception:
pass
logs.append(log_dict)
return logs