File size: 9,174 Bytes
1e4fc28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287

import sqlite3
import os
import datetime
from typing import Dict, Tuple, List, Optional
import threading

# DDL script run by init_db() through executescript(). It creates the
# `predictions` table (id, ts, filename, image_path, emotion, confidence)
# and three indexes on ts, emotion and confidence. Every CREATE statement
# uses IF NOT EXISTS, so running the script against an existing database
# does not fail. No foreign keys are declared in this schema.
SCHEMA = """
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS predictions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    ts TEXT NOT NULL,
    filename TEXT,
    image_path TEXT,
    emotion TEXT,
    confidence REAL
);

-- Indexes for better query performance
CREATE INDEX IF NOT EXISTS idx_predictions_ts ON predictions(ts DESC);
CREATE INDEX IF NOT EXISTS idx_predictions_emotion ON predictions(emotion);
CREATE INDEX IF NOT EXISTS idx_predictions_confidence ON predictions(confidence);
"""

# Cache of open connections, keyed by "<db_path>_<thread id>".
# _db_lock guards every read and write of the cache.
_db_lock = threading.Lock()
_connection_pool: Dict[str, sqlite3.Connection] = {}


def get_connection(db_path: str, timeout: int = 10) -> sqlite3.Connection:
    """
    Return the calling thread's cached connection to ``db_path``.

    The first call made by a thread for a given path opens the connection,
    applies the tuning pragmas and stores it in the cache; later calls from
    the same thread return that same object. ``timeout`` is only used when a
    new connection has to be opened.
    """
    pool_key = f"{db_path}_{threading.get_ident()}"

    with _db_lock:
        cached = _connection_pool.get(pool_key)
        if cached is not None:
            return cached

        fresh = sqlite3.connect(db_path, timeout=timeout, check_same_thread=False)
        # Apply the tuning pragmas before publishing the connection.
        for pragma in (
            "PRAGMA journal_mode=WAL;",
            "PRAGMA synchronous=NORMAL;",
            "PRAGMA cache_size=10000;",
            "PRAGMA temp_store=MEMORY;",
        ):
            fresh.execute(pragma)
        _connection_pool[pool_key] = fresh
        return fresh


def init_db(db_path: str):
    """
    Prepare the database at ``db_path`` by running the SCHEMA script.

    The parent directory is created when the path has one and it does not
    exist yet. A dedicated connection (not a pooled one) is opened for the
    setup and is always closed before returning.
    """
    parent_dir = os.path.dirname(db_path)
    if parent_dir:
        if not os.path.exists(parent_dir):
            os.makedirs(parent_dir, exist_ok=True)

    setup_conn = sqlite3.connect(db_path, timeout=10)
    try:
        for pragma in (
            "PRAGMA journal_mode=WAL;",
            "PRAGMA synchronous=NORMAL;",
            "PRAGMA cache_size=10000;",
        ):
            setup_conn.execute(pragma)
        setup_conn.executescript(SCHEMA)
        setup_conn.commit()
    finally:
        setup_conn.close()

def log_prediction(db_path: str, filename: str, emotion: str, confidence: float, image_path: Optional[str] = None) -> Optional[int]:
    """
    Insert one prediction row and return its row id.

    Every bound value is coerced to a primitive type first: ``None`` text
    fields become ``""``, other text values go through ``str()``, and a
    confidence that is falsy or cannot be converted to ``float`` is stored
    as ``0.0``. The timestamp is the current UTC time as an ISO-8601 string
    with an explicit ``+00:00`` offset.

    If the ``predictions`` table has no ``image_path`` column, the column is
    added before the insert.

    Args:
        db_path: Path to SQLite database
        filename: Original filename
        emotion: Detected emotion
        confidence: Confidence score
        image_path: Path to stored image file (optional)

    Returns:
        The cursor's ``lastrowid`` for the inserted row.

    Raises:
        Exception: Any error raised while talking to SQLite is re-raised
            after the calling thread's cached connection has been closed
            and removed from the pool.
    """
    # timezone.utc is available on every Python 3 version (datetime.UTC is
    # 3.11+ only), so the stored format is the same on every interpreter and
    # string comparisons against ts stay meaningful.
    ts = datetime.datetime.now(datetime.timezone.utc).isoformat()

    filename = "" if filename is None else str(filename)
    emotion = "" if emotion is None else str(emotion)
    image_path = "" if image_path is None else str(image_path)

    try:
        confidence_val = float(confidence or 0.0)
    except Exception:
        # Deliberately lenient: an unusable confidence is logged as 0.0.
        confidence_val = 0.0

    conn = get_connection(db_path)
    try:
        cur = conn.cursor()
        # Check if image_path column exists, if not, add it
        cur.execute("PRAGMA table_info(predictions)")
        columns = [row[1] for row in cur.fetchall()]

        if "image_path" not in columns:
            # Migrate schema - add image_path column
            cur.execute("ALTER TABLE predictions ADD COLUMN image_path TEXT")
            conn.commit()

        cur.execute(
            "INSERT INTO predictions (ts, filename, image_path, emotion, confidence) VALUES (?, ?, ?, ?, ?)",
            (ts, filename, image_path, emotion, confidence_val)
        )
        conn.commit()
        return cur.lastrowid
    except Exception:
        # Evict this thread's cached connection so the next call opens a
        # fresh one, then propagate the error (no retry happens here).
        with _db_lock:
            stale = _connection_pool.pop(f"{db_path}_{threading.get_ident()}", None)
            if stale is not None:
                try:
                    stale.close()
                except Exception:
                    # Best-effort close; the original error is what matters.
                    pass
        raise

def get_metrics(db_path: str) -> Dict:
    """
    Return how many predictions are stored, overall and per emotion.

    Both figures are derived from a single GROUP BY query, so ``total``
    always equals the sum of the ``by_label`` counts even when other
    connections are writing at the same time.

    Args:
        db_path: Path to SQLite database

    Returns:
        ``{"total": <int>, "by_label": {<emotion>: <count>, ...}}``.
        For an empty table this is ``{"total": 0, "by_label": {}}``.

    Raises:
        Exception: Any error raised while querying is re-raised after the
            calling thread's cached connection has been closed and removed
            from the pool.
    """
    conn = get_connection(db_path)
    try:
        cur = conn.cursor()
        cur.execute("SELECT emotion, COUNT(*) FROM predictions GROUP BY emotion")
        rows = cur.fetchall()
        by_label = {label: count for label, count in rows}
        # Sum over the rows (not the dict) so the total covers every group.
        total = sum(count for _, count in rows)
        return {"total": total, "by_label": by_label}
    except Exception:
        # Evict this thread's cached connection so the next call reconnects.
        with _db_lock:
            stale = _connection_pool.pop(f"{db_path}_{threading.get_ident()}", None)
            if stale is not None:
                try:
                    stale.close()
                except Exception:
                    # Best-effort close; the original error is what matters.
                    pass
        raise

def tail_rows(db_path: str, limit: int = 10, offset: int = 0, emotion_filter: Optional[str] = None, 
              min_confidence: Optional[float] = None, max_confidence: Optional[float] = None,
              date_from: Optional[str] = None, date_to: Optional[str] = None) -> List[Tuple]:
    """
    Fetch rows from predictions table with filtering and pagination.

    Rows are returned newest first (``ORDER BY id DESC``). A filter is only
    applied when its argument is given: ``emotion_filter``, ``date_from``
    and ``date_to`` are skipped when falsy, the confidence bounds when
    ``None``. All values are bound as query parameters.

    Args:
        db_path: Path to SQLite database
        limit: Maximum number of rows to return
        offset: Number of matching rows to skip
        emotion_filter: Exact emotion label to match
        min_confidence: Inclusive lower bound on confidence
        max_confidence: Inclusive upper bound on confidence
        date_from: Inclusive lower bound compared against the stored ``ts``
            text (presumably an ISO-8601 string; verify against callers)
        date_to: Inclusive upper bound compared against the stored ``ts`` text

    Returns:
        List of tuples: (id, ts, filename, image_path, emotion, confidence)

    Raises:
        Exception: Any error raised while querying is re-raised after the
            calling thread's cached connection has been closed and removed
            from the pool.
    """
    conn = get_connection(db_path)
    try:
        cur = conn.cursor()

        # Build query with filters; "WHERE 1=1" lets every filter be " AND ...".
        query = "SELECT id, ts, filename, image_path, emotion, confidence FROM predictions WHERE 1=1"
        params = []

        if emotion_filter:
            query += " AND emotion = ?"
            params.append(emotion_filter)

        if min_confidence is not None:
            query += " AND confidence >= ?"
            params.append(min_confidence)

        if max_confidence is not None:
            query += " AND confidence <= ?"
            params.append(max_confidence)

        if date_from:
            query += " AND ts >= ?"
            params.append(date_from)

        if date_to:
            query += " AND ts <= ?"
            params.append(date_to)

        query += " ORDER BY id DESC LIMIT ? OFFSET ?"
        params.extend([limit, offset])

        cur.execute(query, params)
        return cur.fetchall()
    except Exception:
        # Evict this thread's cached connection so the next call reconnects.
        with _db_lock:
            stale = _connection_pool.pop(f"{db_path}_{threading.get_ident()}", None)
            if stale is not None:
                try:
                    stale.close()
                except Exception:
                    # Best-effort close; the original error is what matters.
                    pass
        raise


def delete_prediction(db_path: str, prediction_id: int) -> bool:
    """
    Delete a prediction by ID.

    Args:
        db_path: Path to SQLite database
        prediction_id: ID of prediction to delete

    Returns:
        True if a row was deleted, False if no row had that ID

    Raises:
        Exception: Any error raised while deleting is re-raised after the
            calling thread's cached connection has been closed and removed
            from the pool.
    """
    conn = get_connection(db_path)
    try:
        cur = conn.cursor()
        cur.execute("DELETE FROM predictions WHERE id = ?", (prediction_id,))
        conn.commit()
        return cur.rowcount > 0
    except Exception:
        # Evict this thread's cached connection so the next call reconnects.
        with _db_lock:
            stale = _connection_pool.pop(f"{db_path}_{threading.get_ident()}", None)
            if stale is not None:
                try:
                    stale.close()
                except Exception:
                    # Best-effort close; the original error is what matters.
                    pass
        raise


def get_total_count(db_path: str, emotion_filter: Optional[str] = None,
                   min_confidence: Optional[float] = None, max_confidence: Optional[float] = None,
                   date_from: Optional[str] = None, date_to: Optional[str] = None) -> int:
    """
    Get total count of predictions matching filters.

    A filter is only applied when its argument is given: ``emotion_filter``,
    ``date_from`` and ``date_to`` are skipped when falsy, the confidence
    bounds when ``None``. All values are bound as query parameters.

    Args:
        db_path: Path to SQLite database
        emotion_filter: Exact emotion label to match
        min_confidence: Inclusive lower bound on confidence
        max_confidence: Inclusive upper bound on confidence
        date_from: Inclusive lower bound compared against the stored ``ts`` text
        date_to: Inclusive upper bound compared against the stored ``ts`` text

    Returns:
        Number of matching rows (0 when nothing matches)

    Raises:
        Exception: Any error raised while querying is re-raised after the
            calling thread's cached connection has been closed and removed
            from the pool.
    """
    conn = get_connection(db_path)
    try:
        cur = conn.cursor()

        # "WHERE 1=1" lets every optional filter be appended as " AND ...".
        query = "SELECT COUNT(*) FROM predictions WHERE 1=1"
        params = []

        if emotion_filter:
            query += " AND emotion = ?"
            params.append(emotion_filter)

        if min_confidence is not None:
            query += " AND confidence >= ?"
            params.append(min_confidence)

        if max_confidence is not None:
            query += " AND confidence <= ?"
            params.append(max_confidence)

        if date_from:
            query += " AND ts >= ?"
            params.append(date_from)

        if date_to:
            query += " AND ts <= ?"
            params.append(date_to)

        cur.execute(query, params)
        return cur.fetchone()[0] or 0
    except Exception:
        # Evict this thread's cached connection so the next call reconnects.
        with _db_lock:
            stale = _connection_pool.pop(f"{db_path}_{threading.get_ident()}", None)
            if stale is not None:
                try:
                    stale.close()
                except Exception:
                    # Best-effort close; the original error is what matters.
                    pass
        raise