Spaces:
Running
Running
| import os | |
| import sqlite3 | |
| import threading | |
| import uuid | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from config import CFG | |
| _WRITE_LOCK = threading.Lock() | |
| def _logs_dir() -> str: | |
| path = os.path.join("logs") | |
| os.makedirs(path, exist_ok=True) | |
| return path | |
| def _default_db_path() -> str: | |
| return os.path.join(_logs_dir(), "api_requests.db") | |
| def _connect(db_path: Optional[str] = None) -> sqlite3.Connection: | |
| conn = sqlite3.connect(db_path or _default_db_path(), timeout=30, check_same_thread=False) | |
| conn.row_factory = sqlite3.Row | |
| conn.execute("PRAGMA journal_mode=WAL;") | |
| conn.execute("PRAGMA synchronous=NORMAL;") | |
| conn.execute("PRAGMA foreign_keys=ON;") | |
| return conn | |
| def _now_iso() -> str: | |
| return datetime.now(timezone.utc).isoformat() | |
| def _today_ymd() -> str: | |
| return datetime.now(timezone.utc).date().isoformat() | |
| def init_db(db_path: Optional[str] = None) -> None: | |
| with _WRITE_LOCK: | |
| conn = _connect(db_path=db_path) | |
| try: | |
| conn.execute( | |
| """ | |
| CREATE TABLE IF NOT EXISTS requests ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| request_id TEXT UNIQUE NOT NULL, | |
| timestamp TEXT NOT NULL, | |
| model_name TEXT NOT NULL, | |
| input_text TEXT NOT NULL, | |
| input_length INTEGER, | |
| predicted_label TEXT NOT NULL, | |
| predicted_label_id INTEGER NOT NULL, | |
| confidence REAL NOT NULL, | |
| is_low_confidence INTEGER NOT NULL DEFAULT 0, | |
| latency_ms REAL NOT NULL, | |
| is_batch INTEGER NOT NULL DEFAULT 0 | |
| ); | |
| """ | |
| ) | |
| conn.execute( | |
| """ | |
| CREATE TABLE IF NOT EXISTS model_stats ( | |
| model_name TEXT NOT NULL, | |
| date TEXT NOT NULL, | |
| total_requests INTEGER DEFAULT 0, | |
| avg_confidence REAL DEFAULT 0.0, | |
| avg_latency_ms REAL DEFAULT 0.0, | |
| low_conf_count INTEGER DEFAULT 0, | |
| PRIMARY KEY (model_name, date) | |
| ); | |
| """ | |
| ) | |
| conn.execute( | |
| """ | |
| CREATE TABLE IF NOT EXISTS low_confidence_flags ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| request_id TEXT NOT NULL, | |
| timestamp TEXT NOT NULL, | |
| input_text TEXT NOT NULL, | |
| predicted_label TEXT NOT NULL, | |
| confidence REAL NOT NULL, | |
| reviewed INTEGER NOT NULL DEFAULT 0, | |
| review_note TEXT, | |
| FOREIGN KEY (request_id) REFERENCES requests(request_id) | |
| ); | |
| """ | |
| ) | |
| conn.execute( | |
| "CREATE INDEX IF NOT EXISTS idx_requests_timestamp ON requests(timestamp);" | |
| ) | |
| conn.execute( | |
| "CREATE INDEX IF NOT EXISTS idx_requests_model ON requests(model_name);" | |
| ) | |
| conn.execute( | |
| "CREATE INDEX IF NOT EXISTS idx_flags_reviewed ON low_confidence_flags(reviewed);" | |
| ) | |
| conn.commit() | |
| finally: | |
| conn.close() | |
| def new_request_id() -> str: | |
| return str(uuid.uuid4()) | |
| def log_request( | |
| request_id: str, | |
| model_name: str, | |
| input_text: str, | |
| predicted_label: str, | |
| predicted_label_id: int, | |
| confidence: float, | |
| latency_ms: float, | |
| is_batch: bool, | |
| db_path: Optional[str] = None, | |
| ) -> None: | |
| ts = _now_iso() | |
| original_len = len(input_text) | |
| stored_text = input_text[:500] | |
| is_low = 1 if float(confidence) < float(CFG.low_confidence_threshold) else 0 | |
| batch_int = 1 if is_batch else 0 | |
| with _WRITE_LOCK: | |
| conn = _connect(db_path=db_path) | |
| try: | |
| conn.execute( | |
| """ | |
| INSERT INTO requests ( | |
| request_id, timestamp, model_name, input_text, input_length, | |
| predicted_label, predicted_label_id, confidence, is_low_confidence, | |
| latency_ms, is_batch | |
| ) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); | |
| """, | |
| ( | |
| request_id, | |
| ts, | |
| model_name, | |
| stored_text, | |
| original_len, | |
| predicted_label, | |
| int(predicted_label_id), | |
| float(confidence), | |
| int(is_low), | |
| float(latency_ms), | |
| int(batch_int), | |
| ), | |
| ) | |
| if is_low: | |
| conn.execute( | |
| """ | |
| INSERT INTO low_confidence_flags ( | |
| request_id, timestamp, input_text, predicted_label, confidence, reviewed, review_note | |
| ) | |
| VALUES (?, ?, ?, ?, ?, 0, NULL); | |
| """, | |
| (request_id, ts, stored_text, predicted_label, float(confidence)), | |
| ) | |
| date = _today_ymd() | |
| row = conn.execute( | |
| """ | |
| SELECT total_requests, avg_confidence, avg_latency_ms, low_conf_count | |
| FROM model_stats | |
| WHERE model_name=? AND date=?; | |
| """, | |
| (model_name, date), | |
| ).fetchone() | |
| if row is None: | |
| conn.execute( | |
| """ | |
| INSERT INTO model_stats ( | |
| model_name, date, total_requests, avg_confidence, avg_latency_ms, low_conf_count | |
| ) | |
| VALUES (?, ?, 1, ?, ?, ?); | |
| """, | |
| (model_name, date, float(confidence), float(latency_ms), int(is_low)), | |
| ) | |
| else: | |
| n = int(row["total_requests"]) | |
| new_n = n + 1 | |
| new_avg_conf = (float(row["avg_confidence"]) * n + float(confidence)) / new_n | |
| new_avg_lat = (float(row["avg_latency_ms"]) * n + float(latency_ms)) / new_n | |
| new_low = int(row["low_conf_count"]) + int(is_low) | |
| conn.execute( | |
| """ | |
| UPDATE model_stats | |
| SET total_requests=?, avg_confidence=?, avg_latency_ms=?, low_conf_count=? | |
| WHERE model_name=? AND date=?; | |
| """, | |
| (new_n, new_avg_conf, new_avg_lat, new_low, model_name, date), | |
| ) | |
| conn.commit() | |
| finally: | |
| conn.close() | |
| def get_request_history( | |
| db_path: Optional[str] = None, limit: int = 100, offset: int = 0 | |
| ) -> List[Dict[str, Any]]: | |
| conn = _connect(db_path=db_path) | |
| try: | |
| rows = conn.execute( | |
| """ | |
| SELECT * | |
| FROM requests | |
| ORDER BY id DESC | |
| LIMIT ? OFFSET ?; | |
| """, | |
| (int(limit), int(offset)), | |
| ).fetchall() | |
| return [dict(r) for r in rows] | |
| finally: | |
| conn.close() | |
| def get_low_confidence_flags( | |
| db_path: Optional[str] = None, reviewed: bool = False, limit: int = 50 | |
| ) -> List[Dict[str, Any]]: | |
| conn = _connect(db_path=db_path) | |
| try: | |
| rows = conn.execute( | |
| """ | |
| SELECT * | |
| FROM low_confidence_flags | |
| WHERE reviewed=? | |
| ORDER BY id DESC | |
| LIMIT ?; | |
| """, | |
| (1 if reviewed else 0, int(limit)), | |
| ).fetchall() | |
| return [dict(r) for r in rows] | |
| finally: | |
| conn.close() | |
| def mark_reviewed(request_id: str, note: Optional[str] = None, db_path: Optional[str] = None) -> None: | |
| with _WRITE_LOCK: | |
| conn = _connect(db_path=db_path) | |
| try: | |
| conn.execute( | |
| """ | |
| UPDATE low_confidence_flags | |
| SET reviewed=1, review_note=? | |
| WHERE request_id=?; | |
| """, | |
| (note, request_id), | |
| ) | |
| conn.commit() | |
| finally: | |
| conn.close() | |
| def get_model_leaderboard(db_path: Optional[str] = None) -> List[Tuple[str, int, float, float]]: | |
| conn = _connect(db_path=db_path) | |
| try: | |
| rows = conn.execute( | |
| """ | |
| SELECT | |
| model_name, | |
| COUNT(*) AS total_requests, | |
| AVG(confidence) AS avg_confidence, | |
| AVG(latency_ms) AS avg_latency_ms | |
| FROM requests | |
| GROUP BY model_name | |
| ORDER BY total_requests DESC; | |
| """ | |
| ).fetchall() | |
| return [ | |
| ( | |
| str(r["model_name"]), | |
| int(r["total_requests"]), | |
| float(r["avg_confidence"] or 0.0), | |
| float(r["avg_latency_ms"] or 0.0), | |
| ) | |
| for r in rows | |
| ] | |
| finally: | |
| conn.close() | |
| def get_summary( | |
| db_path: Optional[str] = None, model_name: Optional[str] = None, days: int = 7 | |
| ) -> Dict[str, Any]: | |
| conn = _connect(db_path=db_path) | |
| try: | |
| start_ts = (datetime.now(timezone.utc) - timedelta(days=int(days))).isoformat() | |
| params: List[Any] = [start_ts] | |
| where = "WHERE timestamp >= ?" | |
| if model_name: | |
| where += " AND model_name = ?" | |
| params.append(model_name) | |
| row = conn.execute( | |
| f""" | |
| SELECT | |
| COUNT(*) AS total_requests, | |
| AVG(confidence) AS avg_confidence, | |
| AVG(latency_ms) AS avg_latency_ms, | |
| SUM(is_low_confidence) AS low_confidence_count | |
| FROM requests | |
| {where}; | |
| """, | |
| tuple(params), | |
| ).fetchone() | |
| total_requests = int(row["total_requests"] or 0) | |
| avg_confidence = float(row["avg_confidence"] or 0.0) | |
| avg_latency_ms = float(row["avg_latency_ms"] or 0.0) | |
| low_conf_count = int(row["low_confidence_count"] or 0) | |
| rate = (low_conf_count / total_requests) * 100.0 if total_requests > 0 else 0.0 | |
| params2: List[Any] = list(params) | |
| where2 = where | |
| models = conn.execute( | |
| f""" | |
| SELECT DISTINCT model_name | |
| FROM requests | |
| {where2} | |
| ORDER BY model_name; | |
| """, | |
| tuple(params2), | |
| ).fetchall() | |
| models_used = [str(r["model_name"]) for r in models] | |
| label_rows = conn.execute( | |
| f""" | |
| SELECT predicted_label, COUNT(*) AS c | |
| FROM requests | |
| {where2} | |
| GROUP BY predicted_label; | |
| """, | |
| tuple(params2), | |
| ).fetchall() | |
| predictions_by_label = {str(r["predicted_label"]): int(r["c"]) for r in label_rows} | |
| return { | |
| "period_days": int(days), | |
| "total_requests": total_requests, | |
| "models_used": models_used, | |
| "avg_confidence": round(avg_confidence, 3), | |
| "avg_latency_ms": round(avg_latency_ms, 2), | |
| "low_confidence_count": low_conf_count, | |
| "low_confidence_rate": f"{rate:.2f}%", | |
| "predictions_by_label": predictions_by_label, | |
| } | |
| finally: | |
| conn.close() | |
| def export_low_confidence_to_folder( | |
| output_dir: str = os.path.join("logs", "low_confidence_review"), | |
| ) -> Dict[str, Any]: | |
| os.makedirs(output_dir, exist_ok=True) | |
| flags = get_low_confidence_flags(reviewed=False, limit=10_000) | |
| exported = 0 | |
| for f in flags: | |
| request_id = str(f["request_id"]) | |
| ts = str(f["timestamp"]).replace(":", "-") | |
| filename = f"{ts}_{request_id}.txt" | |
| path = os.path.join(output_dir, filename) | |
| if os.path.exists(path): | |
| continue | |
| content = "\n".join( | |
| [ | |
| f"request_id: {request_id}", | |
| f"timestamp: {f['timestamp']}", | |
| f"predicted_label: {f['predicted_label']}", | |
| f"confidence: {float(f['confidence']):.4f}", | |
| "", | |
| str(f["input_text"]), | |
| ] | |
| ) | |
| with open(path, "w", encoding="utf-8") as out: | |
| out.write(content) | |
| exported += 1 | |
| return {"exported": exported, "folder": output_dir} | |