nexa-classify-api / database.py
Prototype6239's picture
Upload folder using huggingface_hub
a229747 verified
Raw
History Blame Contribute Delete
12.4 kB
import os
import sqlite3
import threading
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple
from config import CFG
# Module-level lock held by the write paths in this file (init_db,
# log_request, mark_reviewed) so that only one thread at a time runs them.
_WRITE_LOCK = threading.Lock()
def _logs_dir() -> str:
    """Return the relative ``logs`` directory path, creating it if needed."""
    logs_path = os.path.join("logs")
    if not os.path.isdir(logs_path):
        os.makedirs(logs_path, exist_ok=True)
    return logs_path
def _default_db_path() -> str:
    """Return the path of ``api_requests.db`` inside the logs directory."""
    logs_dir = _logs_dir()
    return os.path.join(logs_dir, "api_requests.db")
def _connect(db_path: Optional[str] = None) -> sqlite3.Connection:
    """Open a SQLite connection configured the way this module expects.

    Args:
        db_path: Database file to open. A falsy value falls back to
            ``_default_db_path()``.

    Returns:
        A connection whose rows are ``sqlite3.Row`` objects, opened with a
        30 second busy timeout and ``check_same_thread=False``, after
        issuing the ``journal_mode=WAL``, ``synchronous=NORMAL`` and
        ``foreign_keys=ON`` PRAGMAs.

    Raises:
        sqlite3.Error: If the database cannot be opened or a PRAGMA fails.
    """
    conn = sqlite3.connect(
        db_path or _default_db_path(), timeout=30, check_same_thread=False
    )
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute("PRAGMA synchronous=NORMAL;")
        conn.execute("PRAGMA foreign_keys=ON;")
    except Exception:
        # The caller never receives the handle when configuration fails, so
        # close it here instead of leaking it.
        conn.close()
        raise
    return conn
def _now_iso() -> str:
    """Return the current UTC time formatted with ``datetime.isoformat``."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def _today_ymd() -> str:
    """Return the current UTC calendar date as ``YYYY-MM-DD``."""
    utc_today = datetime.now(tz=timezone.utc).date()
    return utc_today.isoformat()
def init_db(db_path: Optional[str] = None) -> None:
    """Create the tables and indexes used by this module when they are missing.

    Runs, under the module write lock, the ``CREATE ... IF NOT EXISTS``
    statements for the ``requests``, ``model_stats`` and
    ``low_confidence_flags`` tables and their three indexes, then commits.

    Args:
        db_path: Database file to initialise; forwarded to ``_connect``.
    """
    # Executed in order: tables first, then the indexes that refer to them.
    schema_statements = (
        """
        CREATE TABLE IF NOT EXISTS requests (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        request_id TEXT UNIQUE NOT NULL,
        timestamp TEXT NOT NULL,
        model_name TEXT NOT NULL,
        input_text TEXT NOT NULL,
        input_length INTEGER,
        predicted_label TEXT NOT NULL,
        predicted_label_id INTEGER NOT NULL,
        confidence REAL NOT NULL,
        is_low_confidence INTEGER NOT NULL DEFAULT 0,
        latency_ms REAL NOT NULL,
        is_batch INTEGER NOT NULL DEFAULT 0
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS model_stats (
        model_name TEXT NOT NULL,
        date TEXT NOT NULL,
        total_requests INTEGER DEFAULT 0,
        avg_confidence REAL DEFAULT 0.0,
        avg_latency_ms REAL DEFAULT 0.0,
        low_conf_count INTEGER DEFAULT 0,
        PRIMARY KEY (model_name, date)
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS low_confidence_flags (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        request_id TEXT NOT NULL,
        timestamp TEXT NOT NULL,
        input_text TEXT NOT NULL,
        predicted_label TEXT NOT NULL,
        confidence REAL NOT NULL,
        reviewed INTEGER NOT NULL DEFAULT 0,
        review_note TEXT,
        FOREIGN KEY (request_id) REFERENCES requests(request_id)
        );
        """,
        "CREATE INDEX IF NOT EXISTS idx_requests_timestamp ON requests(timestamp);",
        "CREATE INDEX IF NOT EXISTS idx_requests_model ON requests(model_name);",
        "CREATE INDEX IF NOT EXISTS idx_flags_reviewed ON low_confidence_flags(reviewed);",
    )
    with _WRITE_LOCK:
        conn = _connect(db_path=db_path)
        try:
            for statement in schema_statements:
                conn.execute(statement)
            conn.commit()
        finally:
            conn.close()
def new_request_id() -> str:
    """Return a freshly generated version-4 UUID in its string form."""
    fresh_id = uuid.uuid4()
    return str(fresh_id)
def _record_model_stats(
    conn: sqlite3.Connection,
    model_name: str,
    date: str,
    confidence: float,
    latency_ms: float,
    is_low: int,
) -> None:
    """Fold one request into the ``model_stats`` row for (model_name, date)."""
    row = conn.execute(
        """
        SELECT total_requests, avg_confidence, avg_latency_ms, low_conf_count
        FROM model_stats
        WHERE model_name=? AND date=?;
        """,
        (model_name, date),
    ).fetchone()
    if row is None:
        # First request for this model on this date: seed the row.
        conn.execute(
            """
            INSERT INTO model_stats (
            model_name, date, total_requests, avg_confidence, avg_latency_ms, low_conf_count
            )
            VALUES (?, ?, 1, ?, ?, ?);
            """,
            (model_name, date, float(confidence), float(latency_ms), int(is_low)),
        )
        return

    # Incremental mean: new_avg = (old_avg * n + x) / (n + 1).
    n = int(row["total_requests"])
    new_n = n + 1
    new_avg_conf = (float(row["avg_confidence"]) * n + float(confidence)) / new_n
    new_avg_lat = (float(row["avg_latency_ms"]) * n + float(latency_ms)) / new_n
    new_low = int(row["low_conf_count"]) + int(is_low)
    conn.execute(
        """
        UPDATE model_stats
        SET total_requests=?, avg_confidence=?, avg_latency_ms=?, low_conf_count=?
        WHERE model_name=? AND date=?;
        """,
        (new_n, new_avg_conf, new_avg_lat, new_low, model_name, date),
    )


def log_request(
    request_id: str,
    model_name: str,
    input_text: str,
    predicted_label: str,
    predicted_label_id: int,
    confidence: float,
    latency_ms: float,
    is_batch: bool,
    db_path: Optional[str] = None,
) -> None:
    """Persist one prediction and update the tables derived from it.

    Inserts a row into ``requests``; when ``confidence`` is below
    ``CFG.low_confidence_threshold`` also inserts a ``low_confidence_flags``
    row; then folds the request into the per-model, per-day running averages
    in ``model_stats``. Everything is written under the module write lock and
    committed with a single ``commit``.

    Args:
        request_id: Identifier stored in the ``request_id`` column.
        model_name: Name of the model that produced the prediction.
        input_text: Classified text. Only the first 500 characters are
            stored; ``input_length`` records the full original length.
        predicted_label: Predicted label name.
        predicted_label_id: Predicted label id (coerced with ``int``).
        confidence: Prediction confidence (coerced with ``float``).
        latency_ms: Request latency in milliseconds (coerced with ``float``).
        is_batch: Whether the request was part of a batch; stored as 0/1.
        db_path: Database file to write to; forwarded to ``_connect``.

    Raises:
        sqlite3.Error: If any statement fails.
    """
    # Read the clock once so the stored timestamp and the model_stats date
    # bucket always refer to the same UTC day, even around midnight.
    now = datetime.now(timezone.utc)
    ts = now.isoformat()
    stats_date = now.date().isoformat()

    original_len = len(input_text)
    stored_text = input_text[:500]
    is_low = 1 if float(confidence) < float(CFG.low_confidence_threshold) else 0
    batch_int = 1 if is_batch else 0

    with _WRITE_LOCK:
        conn = _connect(db_path=db_path)
        try:
            conn.execute(
                """
                INSERT INTO requests (
                request_id, timestamp, model_name, input_text, input_length,
                predicted_label, predicted_label_id, confidence, is_low_confidence,
                latency_ms, is_batch
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
                """,
                (
                    request_id,
                    ts,
                    model_name,
                    stored_text,
                    original_len,
                    predicted_label,
                    int(predicted_label_id),
                    float(confidence),
                    int(is_low),
                    float(latency_ms),
                    int(batch_int),
                ),
            )
            if is_low:
                conn.execute(
                    """
                    INSERT INTO low_confidence_flags (
                    request_id, timestamp, input_text, predicted_label, confidence, reviewed, review_note
                    )
                    VALUES (?, ?, ?, ?, ?, 0, NULL);
                    """,
                    (request_id, ts, stored_text, predicted_label, float(confidence)),
                )
            _record_model_stats(
                conn, model_name, stats_date, confidence, latency_ms, is_low
            )
            conn.commit()
        finally:
            conn.close()
def get_request_history(
    db_path: Optional[str] = None, limit: int = 100, offset: int = 0
) -> List[Dict[str, Any]]:
    """Return one page of logged requests ordered by descending row id.

    Args:
        db_path: Database file to read; forwarded to ``_connect``.
        limit: Value bound to the query's ``LIMIT`` (coerced with ``int``).
        offset: Value bound to the query's ``OFFSET`` (coerced with ``int``).

    Returns:
        One plain ``dict`` per row of ``requests``, keyed by column name.
    """
    query = """
        SELECT *
        FROM requests
        ORDER BY id DESC
        LIMIT ? OFFSET ?;
        """
    conn = _connect(db_path=db_path)
    try:
        cursor = conn.execute(query, (int(limit), int(offset)))
        history: List[Dict[str, Any]] = []
        for record in cursor.fetchall():
            history.append(dict(record))
        return history
    finally:
        conn.close()
def get_low_confidence_flags(
    db_path: Optional[str] = None, reviewed: bool = False, limit: int = 50
) -> List[Dict[str, Any]]:
    """Return low-confidence flags filtered by review state.

    Args:
        db_path: Database file to read; forwarded to ``_connect``.
        reviewed: Select rows whose ``reviewed`` column is 1 when truthy,
            0 otherwise.
        limit: Value bound to the query's ``LIMIT`` (coerced with ``int``).

    Returns:
        One plain ``dict`` per matching ``low_confidence_flags`` row, ordered
        by descending row id.
    """
    query = """
        SELECT *
        FROM low_confidence_flags
        WHERE reviewed=?
        ORDER BY id DESC
        LIMIT ?;
        """
    conn = _connect(db_path=db_path)
    try:
        if reviewed:
            reviewed_flag = 1
        else:
            reviewed_flag = 0
        cursor = conn.execute(query, (reviewed_flag, int(limit)))
        return list(map(dict, cursor.fetchall()))
    finally:
        conn.close()
def mark_reviewed(request_id: str, note: Optional[str] = None, db_path: Optional[str] = None) -> None:
    """Mark the low-confidence flags of a request as reviewed.

    Sets ``reviewed=1`` and overwrites ``review_note`` on every
    ``low_confidence_flags`` row with the given ``request_id``, under the
    module write lock.

    Args:
        request_id: Request whose flag rows are updated.
        note: Review note to store; ``None`` stores NULL.
        db_path: Database file to write to; forwarded to ``_connect``.
    """
    update_sql = """
        UPDATE low_confidence_flags
        SET reviewed=1, review_note=?
        WHERE request_id=?;
        """
    bindings = (note, request_id)
    with _WRITE_LOCK:
        conn = _connect(db_path=db_path)
        try:
            conn.execute(update_sql, bindings)
            conn.commit()
        finally:
            conn.close()
def get_model_leaderboard(db_path: Optional[str] = None) -> List[Tuple[str, int, float, float]]:
    """Return per-model aggregates over all logged requests.

    Args:
        db_path: Database file to read; forwarded to ``_connect``.

    Returns:
        Tuples of ``(model_name, total_requests, avg_confidence,
        avg_latency_ms)``, ordered by request count, highest first. Falsy
        averages are reported as ``0.0``.
    """
    query = """
        SELECT
        model_name,
        COUNT(*) AS total_requests,
        AVG(confidence) AS avg_confidence,
        AVG(latency_ms) AS avg_latency_ms
        FROM requests
        GROUP BY model_name
        ORDER BY total_requests DESC;
        """
    conn = _connect(db_path=db_path)
    try:
        leaderboard: List[Tuple[str, int, float, float]] = []
        for entry in conn.execute(query).fetchall():
            name = str(entry["model_name"])
            count = int(entry["total_requests"])
            mean_confidence = float(entry["avg_confidence"] or 0.0)
            mean_latency = float(entry["avg_latency_ms"] or 0.0)
            leaderboard.append((name, count, mean_confidence, mean_latency))
        return leaderboard
    finally:
        conn.close()
def get_summary(
    db_path: Optional[str] = None, model_name: Optional[str] = None, days: int = 7
) -> Dict[str, Any]:
    """Summarise requests logged during the last ``days`` days.

    Args:
        db_path: Database file to read; forwarded to ``_connect``.
        model_name: When truthy, restrict the summary to this model.
        days: Size of the look-back window, counted back from the current
            UTC time (coerced with ``int``).

    Returns:
        A dict with the keys ``period_days``, ``total_requests``,
        ``models_used``, ``avg_confidence`` (rounded to 3 places),
        ``avg_latency_ms`` (rounded to 2 places), ``low_confidence_count``,
        ``low_confidence_rate`` (a percentage string such as ``"12.50%"``)
        and ``predictions_by_label`` (label -> count).
    """
    conn = _connect(db_path=db_path)
    try:
        window_start = datetime.now(timezone.utc) - timedelta(days=int(days))

        # Only fixed SQL fragments are joined here; every value is bound
        # through a placeholder.
        conditions = ["timestamp >= ?"]
        bindings: List[Any] = [window_start.isoformat()]
        if model_name:
            conditions.append("model_name = ?")
            bindings.append(model_name)
        where = "WHERE " + " AND ".join(conditions)
        args = tuple(bindings)

        totals = conn.execute(
            f"""
            SELECT
            COUNT(*) AS total_requests,
            AVG(confidence) AS avg_confidence,
            AVG(latency_ms) AS avg_latency_ms,
            SUM(is_low_confidence) AS low_confidence_count
            FROM requests
            {where};
            """,
            args,
        ).fetchone()
        total_requests = int(totals["total_requests"] or 0)
        avg_confidence = float(totals["avg_confidence"] or 0.0)
        avg_latency_ms = float(totals["avg_latency_ms"] or 0.0)
        low_conf_count = int(totals["low_confidence_count"] or 0)
        if total_requests > 0:
            rate = (low_conf_count / total_requests) * 100.0
        else:
            rate = 0.0

        model_rows = conn.execute(
            f"""
            SELECT DISTINCT model_name
            FROM requests
            {where}
            ORDER BY model_name;
            """,
            args,
        ).fetchall()
        models_used = []
        for model_row in model_rows:
            models_used.append(str(model_row["model_name"]))

        label_rows = conn.execute(
            f"""
            SELECT predicted_label, COUNT(*) AS c
            FROM requests
            {where}
            GROUP BY predicted_label;
            """,
            args,
        ).fetchall()
        predictions_by_label = {}
        for label_row in label_rows:
            predictions_by_label[str(label_row["predicted_label"])] = int(label_row["c"])

        summary: Dict[str, Any] = {
            "period_days": int(days),
            "total_requests": total_requests,
            "models_used": models_used,
            "avg_confidence": round(avg_confidence, 3),
            "avg_latency_ms": round(avg_latency_ms, 2),
            "low_confidence_count": low_conf_count,
            "low_confidence_rate": f"{rate:.2f}%",
            "predictions_by_label": predictions_by_label,
        }
        return summary
    finally:
        conn.close()
def _filename_safe(part: str) -> str:
    """Replace every character that is not alphanumeric or one of ``._+-`` with ``_``."""
    return "".join(ch if ch.isalnum() or ch in "._+-" else "_" for ch in part)


def export_low_confidence_to_folder(
    output_dir: str = os.path.join("logs", "low_confidence_review"),
    db_path: Optional[str] = None,
) -> Dict[str, Any]:
    """Write each unreviewed low-confidence flag to its own text file.

    Reads up to 10,000 unreviewed flags and writes one
    ``<timestamp>_<request_id>.txt`` file per flag into ``output_dir``
    (colons in the timestamp become ``-``). A flag whose file already exists
    is skipped and not counted.

    Args:
        output_dir: Destination folder; created if missing.
        db_path: Database file to read the flags from; ``None`` keeps the
            default database.

    Returns:
        ``{"exported": <number of files written>, "folder": output_dir}``.
    """
    os.makedirs(output_dir, exist_ok=True)
    flags = get_low_confidence_flags(db_path=db_path, reviewed=False, limit=10_000)
    exported = 0
    for flag in flags:
        request_id = str(flag["request_id"])
        ts = str(flag["timestamp"]).replace(":", "-")
        # Both parts come from stored data; strip path separators and other
        # unusual characters so the file can only land inside output_dir.
        # UUID request ids and ISO timestamps pass through unchanged.
        filename = f"{_filename_safe(ts)}_{_filename_safe(request_id)}.txt"
        path = os.path.join(output_dir, filename)
        content = "\n".join(
            [
                f"request_id: {request_id}",
                f"timestamp: {flag['timestamp']}",
                f"predicted_label: {flag['predicted_label']}",
                f"confidence: {float(flag['confidence']):.4f}",
                "",
                str(flag["input_text"]),
            ]
        )
        try:
            # Mode "x" creates the file only if it does not exist yet, which
            # removes the gap between an existence check and the write.
            with open(path, "x", encoding="utf-8") as out:
                out.write(content)
        except FileExistsError:
            continue
        exported += 1
    return {"exported": exported, "folder": output_dir}