# Algoscope — app/database.py
# (deployed to HuggingFace Spaces via GitHub Actions, commit 090c1e7)
"""
SQLite database for storing post classification results.
ARCHITECTURE NOTE (interview talking point):
All persistence is isolated in this file. No other module imports sqlite3
directly. This means swapping SQLite for PostgreSQL or any other store
requires changing only this one file — the rest of the codebase is
completely unaware of how data is stored.
"""
import logging
import os
import sqlite3
import tempfile
from contextlib import closing
from typing import Any
logger = logging.getLogger(__name__)
_temp_dir = os.environ.get("TMPDIR")
if not _temp_dir:
_temp_dir = os.environ.get("TEMP") or os.environ.get("TMP") or "/tmp"
DB_PATH = os.environ.get(
"ALGOSCOPE_DB_PATH",
os.path.join(_temp_dir, "algoscope.db"),
)
_db_initialized = False
def _get_connection() -> sqlite3.Connection:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db() -> None:
"""Create tables if they don't exist. Safe to call multiple times."""
with _get_connection() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
text TEXT NOT NULL,
label TEXT NOT NULL,
score REAL NOT NULL,
platform TEXT NOT NULL,
query_term TEXT NOT NULL DEFAULT '',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
try:
conn.execute("ALTER TABLE posts ADD COLUMN query_term TEXT NOT NULL DEFAULT ''")
except sqlite3.OperationalError:
pass
conn.commit()
def _ensure_init() -> None:
"""Initialize DB once per process, not on every call."""
global _db_initialized
if not _db_initialized:
init_db()
_db_initialized = True
def save_post(
text: str,
label: str,
score: float,
platform: str,
query_term: str = "",
) -> None:
"""Insert a classified post into the posts table."""
_ensure_init()
with _get_connection() as conn:
conn.execute(
"INSERT INTO posts (text, label, score, platform, query_term) VALUES (?, ?, ?, ?, ?)",
(text, label, score, platform, query_term),
)
conn.commit()
def get_recent_posts(limit: int = 100) -> list[dict[str, Any]]:
"""Return the most recent posts as a list of dicts, newest first."""
_ensure_init()
with _get_connection() as conn:
cursor = conn.execute(
"""
SELECT id, text, label, score, platform, query_term, created_at
FROM posts
ORDER BY created_at DESC
LIMIT ?
""",
(limit,),
)
rows = cursor.fetchall()
return [dict(row) for row in rows]
def get_post_count() -> int:
"""Return total number of posts in the DB."""
_ensure_init()
with _get_connection() as conn:
return conn.execute("SELECT COUNT(*) FROM posts").fetchone()[0]
def seed_if_empty() -> None:
"""
If the DB is empty (cold start or HF ephemeral filesystem wipe), fetch
a small batch of real posts from Bluesky and classify them so the
dashboard has data immediately without requiring the user to click FETCH.
WHY this is safe now (it was disabled before):
Previously this ran at module import time, triggering a model download
before uvicorn bound to port 7860, killing the container with no logs.
Now it is called from lifespan() AFTER the server is up and AFTER the
classifier has loaded. A failure here is non-fatal.
WHY 4 queries at limit=32 (not all queries at full limit):
Seeding is best-effort background work. ~30 posts is enough to populate
all dashboard widgets. Seeding all queries would add 10-30s to cold
start time, unacceptable for a free-tier Space that restarts often.
"""
_ensure_init()
count = get_post_count()
if count > 0:
logger.info("seed_if_empty: DB has %d posts, skipping seed", count)
return
logger.info("seed_if_empty: DB is empty, seeding from Bluesky...")
try:
from app.ingestion import ALGOSPEAK_QUERIES, fetch_posts
from app.model import ToxicityClassifier
classifier = ToxicityClassifier()
if classifier._pipeline is None:
logger.warning("seed_if_empty: classifier not ready, skipping seed")
return
seed_queries = ALGOSPEAK_QUERIES[:4]
posts = fetch_posts(query=seed_queries[0], limit=32, queries=seed_queries)
if not posts:
logger.warning("seed_if_empty: no posts returned from Bluesky")
return
texts = [t for t, _ in posts]
timestamps = [ts for _, ts in posts]
predictions = classifier.predict_batch(texts)
for text, ts, pred in zip(texts, timestamps, predictions):
score = float(pred.get("score", 0.0) or 0.0)
label = "toxic" if score >= 0.70 else "non-toxic"
matched = next(
(q for q in seed_queries if q and q.lower() in text.lower()),
seed_queries[0],
)
save_post(text=text, label=label, score=score, platform="bluesky", query_term=matched)
logger.info("seed_if_empty: seeded %d posts", len(texts))
except Exception as exc:
# WHY catch-all: Bluesky credentials may not be set, the network may
# be unavailable, or the model may not have loaded. The app must start
# regardless - the user can always click FETCH manually.
logger.warning("seed_if_empty: failed (non-fatal): %s", exc)