import sqlite3 import os import datetime from typing import Dict, Tuple, List, Optional import threading SCHEMA = """ PRAGMA foreign_keys = ON; CREATE TABLE IF NOT EXISTS predictions ( id INTEGER PRIMARY KEY AUTOINCREMENT, ts TEXT NOT NULL, filename TEXT, image_path TEXT, emotion TEXT, confidence REAL ); -- Indexes for better query performance CREATE INDEX IF NOT EXISTS idx_predictions_ts ON predictions(ts DESC); CREATE INDEX IF NOT EXISTS idx_predictions_emotion ON predictions(emotion); CREATE INDEX IF NOT EXISTS idx_predictions_confidence ON predictions(confidence); """ # Connection pool for better performance _db_lock = threading.Lock() _connection_pool: Dict[str, sqlite3.Connection] = {} def get_connection(db_path: str, timeout: int = 10) -> sqlite3.Connection: """ Get a database connection with connection pooling. For SQLite, we use a simple per-thread connection approach. """ thread_id = threading.get_ident() key = f"{db_path}_{thread_id}" with _db_lock: if key not in _connection_pool: conn = sqlite3.connect(db_path, timeout=timeout, check_same_thread=False) # Optimize SQLite settings conn.execute("PRAGMA journal_mode=WAL;") conn.execute("PRAGMA synchronous=NORMAL;") conn.execute("PRAGMA cache_size=10000;") conn.execute("PRAGMA temp_store=MEMORY;") _connection_pool[key] = conn return _connection_pool[key] def init_db(db_path: str): db_dir = os.path.dirname(db_path) if db_dir and not os.path.exists(db_dir): os.makedirs(db_dir, exist_ok=True) conn = sqlite3.connect(db_path, timeout=10) try: conn.execute("PRAGMA journal_mode=WAL;") conn.execute("PRAGMA synchronous=NORMAL;") conn.execute("PRAGMA cache_size=10000;") conn.executescript(SCHEMA) conn.commit() finally: conn.close() def log_prediction(db_path: str, filename: str, emotion: str, confidence: float, image_path: Optional[str] = None): """ Logs a prediction row. This function ensures ts is a string and that values bound to SQLite are primitive types (no functions or callables). Args: db_path: Path to SQLite database filename: Original filename emotion: Detected emotion confidence: Confidence score image_path: Path to stored image file (optional) """ # Defensive conversions try: ts = datetime.datetime.now(datetime.UTC).isoformat() except Exception: # fallback to str(datetime) ts = str(datetime.datetime.utcnow()) if filename is None: filename = "" else: filename = str(filename) if emotion is None: emotion = "" else: emotion = str(emotion) if image_path is None: image_path = "" else: image_path = str(image_path) try: confidence_val = float(confidence or 0.0) except Exception: confidence_val = 0.0 conn = get_connection(db_path) try: cur = conn.cursor() # Check if image_path column exists, if not, add it cur.execute("PRAGMA table_info(predictions)") columns = [row[1] for row in cur.fetchall()] if "image_path" not in columns: # Migrate schema - add image_path column cur.execute("ALTER TABLE predictions ADD COLUMN image_path TEXT") conn.commit() cur.execute( "INSERT INTO predictions (ts, filename, image_path, emotion, confidence) VALUES (?, ?, ?, ?, ?)", (ts, filename, image_path, emotion, confidence_val) ) conn.commit() return cur.lastrowid except Exception: # On error, close connection and retry with new connection with _db_lock: thread_id = threading.get_ident() key = f"{db_path}_{thread_id}" if key in _connection_pool: try: _connection_pool[key].close() except: pass del _connection_pool[key] raise def get_metrics(db_path: str) -> Dict: conn = get_connection(db_path) try: cur = conn.cursor() cur.execute("SELECT COUNT(*) FROM predictions") total = cur.fetchone()[0] or 0 cur.execute("SELECT emotion, COUNT(*) FROM predictions GROUP BY emotion") rows = cur.fetchall() by_label = {r[0]: r[1] for r in rows} return {"total": total, "by_label": by_label} except Exception: with _db_lock: thread_id = threading.get_ident() key = f"{db_path}_{thread_id}" if key in _connection_pool: try: _connection_pool[key].close() except: pass del _connection_pool[key] raise def tail_rows(db_path: str, limit: int = 10, offset: int = 0, emotion_filter: Optional[str] = None, min_confidence: Optional[float] = None, max_confidence: Optional[float] = None, date_from: Optional[str] = None, date_to: Optional[str] = None) -> Tuple: """ Fetch rows from predictions table with filtering and pagination. Returns: List of tuples: (id, ts, filename, image_path, emotion, confidence) or (ts, filename, image_path, emotion, confidence) depending on query """ conn = get_connection(db_path) try: cur = conn.cursor() # Build query with filters query = "SELECT id, ts, filename, image_path, emotion, confidence FROM predictions WHERE 1=1" params = [] if emotion_filter: query += " AND emotion = ?" params.append(emotion_filter) if min_confidence is not None: query += " AND confidence >= ?" params.append(min_confidence) if max_confidence is not None: query += " AND confidence <= ?" params.append(max_confidence) if date_from: query += " AND ts >= ?" params.append(date_from) if date_to: query += " AND ts <= ?" params.append(date_to) query += " ORDER BY id DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) cur.execute(query, params) return cur.fetchall() except Exception: with _db_lock: thread_id = threading.get_ident() key = f"{db_path}_{thread_id}" if key in _connection_pool: try: _connection_pool[key].close() except: pass del _connection_pool[key] raise def delete_prediction(db_path: str, prediction_id: int) -> bool: """ Delete a prediction by ID. Args: db_path: Path to SQLite database prediction_id: ID of prediction to delete Returns: True if deleted, False otherwise """ conn = get_connection(db_path) try: cur = conn.cursor() cur.execute("DELETE FROM predictions WHERE id = ?", (prediction_id,)) conn.commit() return cur.rowcount > 0 except Exception: with _db_lock: thread_id = threading.get_ident() key = f"{db_path}_{thread_id}" if key in _connection_pool: try: _connection_pool[key].close() except: pass del _connection_pool[key] raise def get_total_count(db_path: str, emotion_filter: Optional[str] = None, min_confidence: Optional[float] = None, max_confidence: Optional[float] = None, date_from: Optional[str] = None, date_to: Optional[str] = None) -> int: """Get total count of predictions matching filters.""" conn = get_connection(db_path) try: cur = conn.cursor() query = "SELECT COUNT(*) FROM predictions WHERE 1=1" params = [] if emotion_filter: query += " AND emotion = ?" params.append(emotion_filter) if min_confidence is not None: query += " AND confidence >= ?" params.append(min_confidence) if max_confidence is not None: query += " AND confidence <= ?" params.append(max_confidence) if date_from: query += " AND ts >= ?" params.append(date_from) if date_to: query += " AND ts <= ?" params.append(date_to) cur.execute(query, params) return cur.fetchone()[0] or 0 except Exception: with _db_lock: thread_id = threading.get_ident() key = f"{db_path}_{thread_id}" if key in _connection_pool: try: _connection_pool[key].close() except: pass del _connection_pool[key] raise