"""
SQLite database for storing post classification results.

ARCHITECTURE NOTE (interview talking point):
All persistence is isolated in this file. No other module imports sqlite3
directly. This means swapping SQLite for PostgreSQL or any other store
requires changing only this one file — the rest of the codebase is
completely unaware of how data is stored.
"""

import logging
import os
import sqlite3
import tempfile
from typing import Any

logger = logging.getLogger(__name__)

_temp_dir = os.environ.get("TMPDIR")
if not _temp_dir:
    _temp_dir = os.environ.get("TEMP") or os.environ.get("TMP") or "/tmp"
DB_PATH = os.environ.get(
    "ALGOSCOPE_DB_PATH",
    os.path.join(_temp_dir, "algoscope.db"),
)

_db_initialized = False


def _get_connection() -> sqlite3.Connection:
    """Open a connection to the configured database file.

    Rows are returned as sqlite3.Row so callers get name-based access
    and can convert results to plain dicts.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection


def init_db() -> None:
    """Create tables if they don't exist. Safe to call multiple times.

    BUG FIX: the previous version used ``with _get_connection() as conn:``.
    sqlite3's connection context manager only manages the *transaction* —
    it never closes the connection, so every call leaked one file handle.
    The connection is now closed explicitly in ``finally``; ``with conn:``
    is kept purely for transaction scope (commit on success, rollback on
    error), replacing the manual ``conn.commit()``.
    """
    conn = _get_connection()
    try:
        with conn:  # transaction scope only; does NOT close the connection
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS posts (
                    id          INTEGER PRIMARY KEY AUTOINCREMENT,
                    text        TEXT    NOT NULL,
                    label       TEXT    NOT NULL,
                    score       REAL    NOT NULL,
                    platform    TEXT    NOT NULL,
                    query_term  TEXT    NOT NULL DEFAULT '',
                    created_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
                """
            )
            try:
                # Migration for databases created before query_term existed.
                # On any current schema (including one created just above)
                # this raises "duplicate column name", which is expected.
                conn.execute(
                    "ALTER TABLE posts ADD COLUMN query_term TEXT NOT NULL DEFAULT ''"
                )
            except sqlite3.OperationalError:
                pass
    finally:
        conn.close()


def _ensure_init() -> None:
    """Initialize DB once per process, not on every call."""
    global _db_initialized
    if _db_initialized:
        return
    init_db()
    _db_initialized = True


def save_post(
    text: str,
    label: str,
    score: float,
    platform: str,
    query_term: str = "",
) -> None:
    """Insert a classified post into the posts table.

    BUG FIX: sqlite3's ``with conn:`` only manages the transaction and
    previously leaked one connection handle per call; the connection is
    now closed explicitly in ``finally``. ``with conn:`` replaces the
    manual ``conn.commit()`` (commit on success, rollback on error).
    """
    _ensure_init()
    conn = _get_connection()
    try:
        with conn:  # transaction scope only; does NOT close the connection
            conn.execute(
                "INSERT INTO posts (text, label, score, platform, query_term) VALUES (?, ?, ?, ?, ?)",
                (text, label, score, platform, query_term),
            )
    finally:
        conn.close()


def get_recent_posts(limit: int = 100) -> list[dict[str, Any]]:
    """Return the most recent posts as a list of dicts, newest first.

    BUG FIXES:
    - the connection is now closed explicitly (``with conn:`` never
      closes, so each call previously leaked a handle);
    - ``id DESC`` added as a secondary sort key: CURRENT_TIMESTAMP has
      one-second resolution, so rows inserted within the same second had
      no defined order — the autoincrement id breaks the tie newest-first.
    """
    _ensure_init()
    conn = _get_connection()
    try:
        cursor = conn.execute(
            """
            SELECT id, text, label, score, platform, query_term, created_at
            FROM posts
            ORDER BY created_at DESC, id DESC
            LIMIT ?
            """,
            (limit,),
        )
        rows = cursor.fetchall()
    finally:
        conn.close()
    return [dict(row) for row in rows]


def get_post_count() -> int:
    """Return total number of posts in the DB.

    BUG FIX: close the connection explicitly — the previous version's
    ``with conn:`` only managed the transaction and leaked the handle.
    """
    _ensure_init()
    conn = _get_connection()
    try:
        return conn.execute("SELECT COUNT(*) FROM posts").fetchone()[0]
    finally:
        conn.close()


def seed_if_empty() -> None:
    """
    If the DB is empty (cold start or HF ephemeral filesystem wipe), fetch
    a small batch of real posts from Bluesky and classify them so the
    dashboard has data immediately without requiring the user to click FETCH.

    WHY this is safe now (it was disabled before):
    Previously this ran at module import time, triggering a model download
    before uvicorn bound to port 7860, killing the container with no logs.
    Now it is called from lifespan() AFTER the server is up and AFTER the
    classifier has loaded. A failure here is non-fatal.

    WHY 4 queries at limit=32 (not all queries at full limit):
    Seeding is best-effort background work. ~30 posts is enough to populate
    all dashboard widgets. Seeding all queries would add 10-30s to cold
    start time, unacceptable for a free-tier Space that restarts often.
    """
    _ensure_init()
    count = get_post_count()
    if count > 0:
        logger.info("seed_if_empty: DB has %d posts, skipping seed", count)
        return

    logger.info("seed_if_empty: DB is empty, seeding from Bluesky...")
    try:
        # Deferred imports: pulling in the model module at file import time
        # is exactly the startup-killing behavior described above.
        from app.ingestion import ALGOSPEAK_QUERIES, fetch_posts
        from app.model import ToxicityClassifier

        classifier = ToxicityClassifier()
        # NOTE(review): reaches into a private attribute to check readiness —
        # consider exposing an is_ready() accessor on ToxicityClassifier.
        if classifier._pipeline is None:
            logger.warning("seed_if_empty: classifier not ready, skipping seed")
            return

        seed_queries = ALGOSPEAK_QUERIES[:4]
        posts = fetch_posts(query=seed_queries[0], limit=32, queries=seed_queries)
        if not posts:
            logger.warning("seed_if_empty: no posts returned from Bluesky")
            return

        # posts is a list of (text, timestamp) pairs, but the timestamp is
        # never stored (posts.created_at defaults to insert time), so only
        # the texts matter here. The previous version built a parallel
        # timestamps list and zipped it into the loop without ever using it.
        texts = [t for t, _ in posts]
        predictions = classifier.predict_batch(texts)

        for text, pred in zip(texts, predictions):
            # Defensive: tolerate a missing or None score in a prediction.
            score = float(pred.get("score", 0.0) or 0.0)
            label = "toxic" if score >= 0.70 else "non-toxic"
            # Attribute the post to the first seed query found in its text,
            # falling back to the first query when none matches.
            matched = next(
                (q for q in seed_queries if q and q.lower() in text.lower()),
                seed_queries[0],
            )
            save_post(text=text, label=label, score=score, platform="bluesky", query_term=matched)

        logger.info("seed_if_empty: seeded %d posts", len(texts))
    except Exception as exc:
        # WHY catch-all: Bluesky credentials may not be set, the network may
        # be unavailable, or the model may not have loaded. The app must start
        # regardless - the user can always click FETCH manually.
        logger.warning("seed_if_empty: failed (non-fatal): %s", exc)