"""SQLite-backed log of API prediction requests.

Each request is stored in the ``requests`` table, per-model daily aggregates
are kept in ``model_stats``, and predictions whose confidence falls below
``CFG.low_confidence_threshold`` are additionally recorded in
``low_confidence_flags`` so they can be reviewed and exported.
"""

import os
import sqlite3
import threading
import uuid
from contextlib import closing
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

from config import CFG

# Serializes all writers in this process; readers do not take the lock.
_WRITE_LOCK = threading.Lock()

# Only this many leading characters of the input text are persisted; the
# original length is stored separately in ``requests.input_length``.
_MAX_STORED_TEXT_CHARS = 500


def _logs_dir() -> str:
    """Return the ``logs`` directory (relative to the CWD), creating it if needed."""
    path = "logs"
    os.makedirs(path, exist_ok=True)
    return path


def _default_db_path() -> str:
    """Return the path of the default database file inside the logs directory."""
    return os.path.join(_logs_dir(), "api_requests.db")


def _connect(db_path: Optional[str] = None) -> sqlite3.Connection:
    """Open a configured connection to the request database.

    Args:
        db_path: Database file to open; the default database is used when
            this is ``None`` or empty.

    Returns:
        A connection with ``sqlite3.Row`` rows, WAL journaling,
        ``synchronous=NORMAL`` and foreign-key enforcement enabled.
        The caller is responsible for closing it.
    """
    conn = sqlite3.connect(
        db_path or _default_db_path(), timeout=30, check_same_thread=False
    )
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute("PRAGMA synchronous=NORMAL;")
        conn.execute("PRAGMA foreign_keys=ON;")
    except Exception:
        # Do not leak the handle when configuration fails (e.g. locked file).
        conn.close()
        raise
    return conn


def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _today_ymd() -> str:
    """Return the current UTC date as ``YYYY-MM-DD``."""
    return datetime.now(timezone.utc).date().isoformat()


def init_db(db_path: Optional[str] = None) -> None:
    """Create the tables and indexes if they do not exist yet.

    Args:
        db_path: Database file to initialize; defaults to the default database.
    """
    with _WRITE_LOCK, closing(_connect(db_path=db_path)) as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS requests (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                request_id TEXT UNIQUE NOT NULL,
                timestamp TEXT NOT NULL,
                model_name TEXT NOT NULL,
                input_text TEXT NOT NULL,
                input_length INTEGER,
                predicted_label TEXT NOT NULL,
                predicted_label_id INTEGER NOT NULL,
                confidence REAL NOT NULL,
                is_low_confidence INTEGER NOT NULL DEFAULT 0,
                latency_ms REAL NOT NULL,
                is_batch INTEGER NOT NULL DEFAULT 0
            );
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS model_stats (
                model_name TEXT NOT NULL,
                date TEXT NOT NULL,
                total_requests INTEGER DEFAULT 0,
                avg_confidence REAL DEFAULT 0.0,
                avg_latency_ms REAL DEFAULT 0.0,
                low_conf_count INTEGER DEFAULT 0,
                PRIMARY KEY (model_name, date)
            );
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS low_confidence_flags (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                request_id TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                input_text TEXT NOT NULL,
                predicted_label TEXT NOT NULL,
                confidence REAL NOT NULL,
                reviewed INTEGER NOT NULL DEFAULT 0,
                review_note TEXT,
                FOREIGN KEY (request_id) REFERENCES requests(request_id)
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_requests_timestamp ON requests(timestamp);"
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_requests_model ON requests(model_name);"
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_flags_reviewed ON low_confidence_flags(reviewed);"
        )
        conn.commit()


def new_request_id() -> str:
    """Return a fresh random UUID4 string to use as a request id."""
    return str(uuid.uuid4())


def log_request(
    request_id: str,
    model_name: str,
    input_text: str,
    predicted_label: str,
    predicted_label_id: int,
    confidence: float,
    latency_ms: float,
    is_batch: bool,
    db_path: Optional[str] = None,
) -> None:
    """Record one prediction and update the per-model daily statistics.

    The request row, the optional low-confidence flag and the ``model_stats``
    update are committed together; if any statement fails nothing is
    committed and the exception propagates.

    Args:
        request_id: Unique id of the request (``requests.request_id`` is UNIQUE).
        model_name: Name of the model that produced the prediction.
        input_text: Raw input; only the first 500 characters are stored, the
            full length is stored in ``input_length``.
        predicted_label: Predicted label name.
        predicted_label_id: Predicted label id.
        confidence: Prediction confidence; values below
            ``CFG.low_confidence_threshold`` are flagged for review.
        latency_ms: Inference latency in milliseconds.
        is_batch: Whether the request was part of a batch call.
        db_path: Database file to write to; defaults to the default database.
    """
    # Read the clock once so the row timestamp and the stats bucket always
    # refer to the same UTC day, even for requests logged around midnight.
    now = datetime.now(timezone.utc)
    ts = now.isoformat()
    date = now.date().isoformat()

    original_len = len(input_text)
    stored_text = input_text[:_MAX_STORED_TEXT_CHARS]
    is_low = 1 if float(confidence) < float(CFG.low_confidence_threshold) else 0
    batch_int = 1 if is_batch else 0

    with _WRITE_LOCK, closing(_connect(db_path=db_path)) as conn:
        conn.execute(
            """
            INSERT INTO requests (
                request_id, timestamp, model_name, input_text, input_length,
                predicted_label, predicted_label_id, confidence,
                is_low_confidence, latency_ms, is_batch
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
            """,
            (
                request_id,
                ts,
                model_name,
                stored_text,
                original_len,
                predicted_label,
                int(predicted_label_id),
                float(confidence),
                int(is_low),
                float(latency_ms),
                int(batch_int),
            ),
        )

        if is_low:
            conn.execute(
                """
                INSERT INTO low_confidence_flags (
                    request_id, timestamp, input_text, predicted_label,
                    confidence, reviewed, review_note
                ) VALUES (?, ?, ?, ?, ?, 0, NULL);
                """,
                (request_id, ts, stored_text, predicted_label, float(confidence)),
            )

        row = conn.execute(
            """
            SELECT total_requests, avg_confidence, avg_latency_ms, low_conf_count
            FROM model_stats
            WHERE model_name=? AND date=?;
            """,
            (model_name, date),
        ).fetchone()

        if row is None:
            # First request for this model today: seed the aggregate row.
            conn.execute(
                """
                INSERT INTO model_stats (
                    model_name, date, total_requests, avg_confidence,
                    avg_latency_ms, low_conf_count
                ) VALUES (?, ?, 1, ?, ?, ?);
                """,
                (model_name, date, float(confidence), float(latency_ms), int(is_low)),
            )
        else:
            # Incremental mean: new_avg = (old_avg * n + x) / (n + 1).
            n = int(row["total_requests"])
            new_n = n + 1
            new_avg_conf = (float(row["avg_confidence"]) * n + float(confidence)) / new_n
            new_avg_lat = (float(row["avg_latency_ms"]) * n + float(latency_ms)) / new_n
            new_low = int(row["low_conf_count"]) + int(is_low)
            conn.execute(
                """
                UPDATE model_stats
                SET total_requests=?, avg_confidence=?, avg_latency_ms=?, low_conf_count=?
                WHERE model_name=? AND date=?;
                """,
                (new_n, new_avg_conf, new_avg_lat, new_low, model_name, date),
            )

        conn.commit()


def get_request_history(
    db_path: Optional[str] = None, limit: int = 100, offset: int = 0
) -> List[Dict[str, Any]]:
    """Return logged requests, newest first, as plain dictionaries.

    Args:
        db_path: Database file to read; defaults to the default database.
        limit: Maximum number of rows to return.
        offset: Number of newest rows to skip (for paging).
    """
    with closing(_connect(db_path=db_path)) as conn:
        rows = conn.execute(
            """
            SELECT * FROM requests
            ORDER BY id DESC
            LIMIT ? OFFSET ?;
            """,
            (int(limit), int(offset)),
        ).fetchall()
        return [dict(r) for r in rows]


def get_low_confidence_flags(
    db_path: Optional[str] = None, reviewed: bool = False, limit: int = 50
) -> List[Dict[str, Any]]:
    """Return low-confidence flags with the given review state, newest first.

    Args:
        db_path: Database file to read; defaults to the default database.
        reviewed: Select reviewed (``True``) or unreviewed (``False``) flags.
        limit: Maximum number of rows to return.
    """
    with closing(_connect(db_path=db_path)) as conn:
        rows = conn.execute(
            """
            SELECT * FROM low_confidence_flags
            WHERE reviewed=?
            ORDER BY id DESC
            LIMIT ?;
            """,
            (1 if reviewed else 0, int(limit)),
        ).fetchall()
        return [dict(r) for r in rows]


def mark_reviewed(
    request_id: str, note: Optional[str] = None, db_path: Optional[str] = None
) -> None:
    """Mark every flag of ``request_id`` as reviewed and store the review note.

    Args:
        request_id: Request whose flags should be marked.
        note: Optional reviewer note; ``None`` stores NULL.
        db_path: Database file to write to; defaults to the default database.
    """
    with _WRITE_LOCK, closing(_connect(db_path=db_path)) as conn:
        conn.execute(
            """
            UPDATE low_confidence_flags
            SET reviewed=1, review_note=?
            WHERE request_id=?;
            """,
            (note, request_id),
        )
        conn.commit()


def get_model_leaderboard(
    db_path: Optional[str] = None,
) -> List[Tuple[str, int, float, float]]:
    """Return per-model totals computed from the ``requests`` table.

    Args:
        db_path: Database file to read; defaults to the default database.

    Returns:
        ``(model_name, total_requests, avg_confidence, avg_latency_ms)``
        tuples ordered by request count, busiest model first.
    """
    with closing(_connect(db_path=db_path)) as conn:
        rows = conn.execute(
            """
            SELECT
                model_name,
                COUNT(*) AS total_requests,
                AVG(confidence) AS avg_confidence,
                AVG(latency_ms) AS avg_latency_ms
            FROM requests
            GROUP BY model_name
            ORDER BY total_requests DESC;
            """
        ).fetchall()
        return [
            (
                str(r["model_name"]),
                int(r["total_requests"]),
                float(r["avg_confidence"] or 0.0),
                float(r["avg_latency_ms"] or 0.0),
            )
            for r in rows
        ]


def get_summary(
    db_path: Optional[str] = None, model_name: Optional[str] = None, days: int = 7
) -> Dict[str, Any]:
    """Summarize the requests logged during the last ``days`` days.

    Args:
        db_path: Database file to read; defaults to the default database.
        model_name: Restrict the summary to this model when given.
        days: Size of the look-back window, counted back from now (UTC).

    Returns:
        A dictionary with ``period_days``, ``total_requests``, ``models_used``,
        ``avg_confidence`` (3 decimals), ``avg_latency_ms`` (2 decimals),
        ``low_confidence_count``, ``low_confidence_rate`` (percentage string
        with two decimals) and ``predictions_by_label`` (label -> count).
    """
    with closing(_connect(db_path=db_path)) as conn:
        # Timestamps are stored as ISO-8601 UTC strings, so the cut-off is
        # compared as a string produced by the same formatter.
        start_ts = (datetime.now(timezone.utc) - timedelta(days=int(days))).isoformat()

        # Only fixed SQL fragments are interpolated below; every value is bound.
        where = "WHERE timestamp >= ?"
        params: List[Any] = [start_ts]
        if model_name:
            where += " AND model_name = ?"
            params.append(model_name)
        bound = tuple(params)

        row = conn.execute(
            f"""
            SELECT
                COUNT(*) AS total_requests,
                AVG(confidence) AS avg_confidence,
                AVG(latency_ms) AS avg_latency_ms,
                SUM(is_low_confidence) AS low_confidence_count
            FROM requests
            {where};
            """,
            bound,
        ).fetchone()

        # The aggregates are NULL when no row matches, hence the ``or`` defaults.
        total_requests = int(row["total_requests"] or 0)
        avg_confidence = float(row["avg_confidence"] or 0.0)
        avg_latency_ms = float(row["avg_latency_ms"] or 0.0)
        low_conf_count = int(row["low_confidence_count"] or 0)
        rate = (low_conf_count / total_requests) * 100.0 if total_requests > 0 else 0.0

        models = conn.execute(
            f"""
            SELECT DISTINCT model_name
            FROM requests
            {where}
            ORDER BY model_name;
            """,
            bound,
        ).fetchall()
        models_used = [str(r["model_name"]) for r in models]

        label_rows = conn.execute(
            f"""
            SELECT predicted_label, COUNT(*) AS c
            FROM requests
            {where}
            GROUP BY predicted_label;
            """,
            bound,
        ).fetchall()
        predictions_by_label = {str(r["predicted_label"]): int(r["c"]) for r in label_rows}

        return {
            "period_days": int(days),
            "total_requests": total_requests,
            "models_used": models_used,
            "avg_confidence": round(avg_confidence, 3),
            "avg_latency_ms": round(avg_latency_ms, 2),
            "low_confidence_count": low_conf_count,
            "low_confidence_rate": f"{rate:.2f}%",
            "predictions_by_label": predictions_by_label,
        }


def export_low_confidence_to_folder(
    output_dir: str = os.path.join("logs", "low_confidence_review"),
    db_path: Optional[str] = None,
) -> Dict[str, Any]:
    """Write each unreviewed low-confidence flag to its own text file.

    Files are named ``<timestamp>_<request_id>.txt`` (colons in the timestamp
    replaced by dashes). A flag whose file already exists is skipped, so
    repeated calls only export new flags. At most 10,000 flags are read per
    call.

    Args:
        output_dir: Destination folder; created if missing.
        db_path: Database file to read; defaults to the default database.

    Returns:
        ``{"exported": <number of files written>, "folder": output_dir}``.
    """
    os.makedirs(output_dir, exist_ok=True)
    flags = get_low_confidence_flags(db_path=db_path, reviewed=False, limit=10_000)

    exported = 0
    for flag in flags:
        request_id = str(flag["request_id"])
        ts = str(flag["timestamp"]).replace(":", "-")
        path = os.path.join(output_dir, f"{ts}_{request_id}.txt")

        content = "\n".join(
            [
                f"request_id: {request_id}",
                f"timestamp: {flag['timestamp']}",
                f"predicted_label: {flag['predicted_label']}",
                f"confidence: {float(flag['confidence']):.4f}",
                "",
                str(flag["input_text"]),
            ]
        )

        try:
            # Exclusive create: never overwrite an export that already exists,
            # without a separate check-then-open step.
            with open(path, "x", encoding="utf-8") as out:
                out.write(content)
        except FileExistsError:
            continue
        exported += 1

    return {"exported": exported, "folder": output_dir}