"""Database access layer: local SQLite, or Turso over its HTTP pipeline API.

When both ``TURSO_DATABASE_URL`` and ``TURSO_AUTH_TOKEN`` are set, ``get_db()``
yields a ``TursoConnection`` exposing the subset of the ``sqlite3.Connection``
interface implemented below; otherwise it yields a real ``sqlite3`` connection.
The schema is created/migrated at import time (see the bottom of the file).
"""

import os
import sqlite3
import json
from contextlib import contextmanager

import httpx

DB_PATH = os.environ.get(
    "DB_PATH",
    os.path.join(os.path.dirname(__file__), "..", "data", "boredcv.db"),
)
TURSO_URL = os.environ.get("TURSO_DATABASE_URL", "")
TURSO_TOKEN = os.environ.get("TURSO_AUTH_TOKEN", "")
USE_TURSO = bool(TURSO_URL and TURSO_TOKEN)


# ---------------------------------------------------------------------------
# Turso HTTP adapter — uses httpx (already a dependency), zero new packages
# ---------------------------------------------------------------------------

def _turso_http_url():
    """Convert libsql:// URL to https:// for the HTTP API."""
    url = TURSO_URL
    if url.startswith("libsql://"):
        url = "https://" + url[len("libsql://"):]
    return url.rstrip("/")


def _turso_encode_arg(value) -> dict:
    """Encode one Python parameter as a typed value object for the pipeline API.

    ``None`` -> null, ``bool``/``int`` -> integer (decimal string), finite
    ``float`` -> float (JSON number), everything else -> text via ``str()``.
    """
    if value is None:
        return {"type": "null"}
    if isinstance(value, bool):
        # bool is an int subclass; str(True) would produce "True", which is
        # not a valid integer literal. Send 1/0 like sqlite3 does.
        return {"type": "integer", "value": str(int(value))}
    if isinstance(value, int):
        return {"type": "integer", "value": str(value)}
    if isinstance(value, float) and float("-inf") < value < float("inf"):
        return {"type": "float", "value": value}
    # Strings, non-finite floats (not representable as a JSON number) and any
    # other type fall back to their text form.
    return {"type": "text", "value": str(value)}


def _turso_raise_on_error(results: list) -> None:
    """Raise a ``sqlite3`` exception for the first failed statement in *results*.

    The pipeline endpoint reports per-statement failures inside the response
    body as ``{"type": "error", "error": {...}}`` entries, so
    ``raise_for_status()`` alone does not surface them.

    Raises:
        sqlite3.IntegrityError: the error code/message mentions a constraint.
        sqlite3.OperationalError: any other statement error.
    """
    for result in results:
        if not isinstance(result, dict) or result.get("type") != "error":
            continue
        error = result.get("error") or {}
        message = error.get("message") or "Turso statement failed"
        code = error.get("code") or ""
        if "constraint" in f"{code} {message}".lower():
            raise sqlite3.IntegrityError(message)
        raise sqlite3.OperationalError(message)


def _turso_execute(stmts: list[dict]) -> list[dict]:
    """Execute statements via Turso HTTP pipeline API.

    Args:
        stmts: dicts with a ``"sql"`` key and an optional ``"args"`` sequence
            of positional parameters.

    Returns:
        The ``results`` list from the response: one entry per statement, in
        order, followed by the entry for the trailing ``close`` request.

    Raises:
        httpx.HTTPStatusError: the HTTP request itself failed.
        sqlite3.Error: a statement in the pipeline reported an error.
    """
    url = f"{_turso_http_url()}/v3/pipeline"
    requests = []
    for s in stmts:
        stmt = {"sql": s["sql"]}
        if s.get("args"):
            stmt["args"] = [_turso_encode_arg(v) for v in s["args"]]
        requests.append({"type": "execute", "stmt": stmt})
    requests.append({"type": "close"})

    resp = httpx.post(
        url,
        json={"requests": requests},
        headers={"Authorization": f"Bearer {TURSO_TOKEN}"},
        timeout=30.0,
    )
    resp.raise_for_status()
    results = resp.json().get("results", [])
    _turso_raise_on_error(results)
    return results


class TursoRow:
    """Dict-like row — supports row["col"], row[0], len(row) and dict(row)."""

    def __init__(self, columns: list[str], values: list):
        # Keep positional values separately so integer indexing and iteration
        # still work when two columns share a name.
        self._values = [self._decode(v) for v in values]
        self._data = dict(zip(columns, self._values))

    @staticmethod
    def _decode(val):
        """Unwrap a typed API value such as {"type": "integer", "value": "42"}."""
        if isinstance(val, dict):
            vtype = val.get("type", "")
            if "value" in val:
                raw = val["value"]
                if vtype == "integer":
                    return int(raw)
                if vtype == "float":
                    return float(raw)
                return raw
            if vtype == "null":
                return None
        return val

    def __getitem__(self, key):
        # Integer keys index by position; anything else is a column name.
        if isinstance(key, int):
            return self._values[key]
        return self._data[key]

    def __len__(self):
        return len(self._values)

    def keys(self):
        return self._data.keys()

    def __iter__(self):
        return iter(self._values)

    def __repr__(self):
        return f"{type(self).__name__}({self._data!r})"


class TursoResult:
    """Wraps Turso HTTP response to match sqlite3.Cursor interface.

    Attributes set in ``__init__``:
        lastrowid: ``last_insert_rowid`` from the response as an ``int``, or
            ``None`` when the response carries none.
        rowcount: ``affected_row_count`` from the response, or ``-1`` when the
            response carries none.
    """

    def __init__(self, result: dict):
        resp = result.get("response", {}).get("result", {})
        cols_raw = resp.get("cols", [])
        self._columns = [c.get("name", "") if isinstance(c, dict) else c for c in cols_raw]
        self._rows = [TursoRow(self._columns, r) for r in resp.get("rows", [])]
        # The API transports the rowid as a decimal string; expose an int so
        # callers see the same type in both backends.
        raw_rowid = resp.get("last_insert_rowid")
        self.lastrowid = int(raw_rowid) if raw_rowid is not None else None
        self.rowcount = resp.get("affected_row_count", -1)

    def fetchall(self):
        """Return every row of the result as a list of ``TursoRow``."""
        return self._rows

    def fetchone(self):
        """Return the first row, or ``None`` if the result is empty.

        Unlike a real cursor this does not advance: repeated calls return the
        same row.
        """
        return self._rows[0] if self._rows else None


class TursoConnection:
    """Sync connection wrapping Turso HTTP API — matches sqlite3.Connection interface."""

    def execute(self, sql: str, params=None):
        """Run one statement with optional positional *params*."""
        args = list(params) if params else []
        results = _turso_execute([{"sql": sql, "args": args}])
        return TursoResult(results[0]) if results else TursoResult({})

    def executescript(self, sql: str):
        """Run a ``;``-separated script and return the last statement's result.

        The script is split naively on ``;``, so a semicolon inside a string
        literal would break it; the schema script in this file has none.
        """
        statements = [s.strip() for s in sql.split(";") if s.strip()]
        if not statements:
            return TursoResult({})
        results = _turso_execute([{"sql": s} for s in statements])
        # results also holds the reply to the trailing "close" request, so
        # index by statement count rather than taking results[-1].
        if len(results) >= len(statements):
            return TursoResult(results[len(statements) - 1])
        return TursoResult({})

    def commit(self):
        pass  # Turso auto-commits

    def close(self):
        pass  # HTTP — no persistent connection


# ---------------------------------------------------------------------------
# DB path helper (SQLite only)
# ---------------------------------------------------------------------------

def _get_local_db_path():
    """Return the SQLite file path, creating its parent directory if needed.

    ``/data/boredcv.db`` is used whenever a ``/data`` directory exists;
    otherwise ``DB_PATH``.
    """
    if os.path.isdir("/data"):
        return "/data/boredcv.db"
    parent = os.path.dirname(DB_PATH)
    # A bare filename (or ":memory:") has no directory part, and
    # os.makedirs("") raises FileNotFoundError.
    if parent:
        os.makedirs(parent, exist_ok=True)
    return DB_PATH


# ---------------------------------------------------------------------------
# Context manager
# ---------------------------------------------------------------------------

@contextmanager
def get_db():
    """Yield a database connection and close it on exit.

    Turso: yields a ``TursoConnection``; nothing is committed here.
    SQLite: yields a ``sqlite3.Connection`` with ``sqlite3.Row`` rows, WAL
    journaling and foreign keys enabled; commits only if the ``with`` body
    finishes without raising.
    """
    if USE_TURSO:
        conn = TursoConnection()
        try:
            yield conn
        finally:
            conn.close()
    else:
        conn = sqlite3.connect(_get_local_db_path())
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
        try:
            yield conn
            conn.commit()
        finally:
            conn.close()


# ---------------------------------------------------------------------------
# Schema init
# ---------------------------------------------------------------------------

def init_db():
    """Create missing tables and indexes, then run the in-place migrations."""
    with get_db() as conn:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS users (
                id TEXT PRIMARY KEY,
                email TEXT NOT NULL,
                provider TEXT NOT NULL,
                marketing_consent INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS knowledge (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL REFERENCES users(id),
                company TEXT NOT NULL,
                company_context TEXT DEFAULT '',
                title TEXT NOT NULL,
                dates TEXT DEFAULT '',
                description TEXT DEFAULT '',
                facts TEXT DEFAULT '{}',
                best_bullets TEXT DEFAULT '[]',
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS projects (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL REFERENCES users(id),
                name TEXT NOT NULL,
                offer_title TEXT DEFAULT '',
                offer_url TEXT DEFAULT '',
                offer_data TEXT DEFAULT '{}',
                profile_data TEXT DEFAULT '{}',
                gap_analysis TEXT DEFAULT '{}',
                cv_data TEXT DEFAULT '{}',
                messages TEXT DEFAULT '[]',
                match_score INTEGER DEFAULT 0,
                template TEXT DEFAULT 'clean',
                tone TEXT DEFAULT 'startup',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS facts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT NOT NULL REFERENCES users(id),
                knowledge_id INTEGER REFERENCES knowledge(id),
                key TEXT NOT NULL,
                value TEXT NOT NULL,
                source_project_id INTEGER REFERENCES projects(id),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS rate_limits (
                ip TEXT NOT NULL,
                timestamp REAL NOT NULL
            );
            CREATE TABLE IF NOT EXISTS daily_budget (
                day TEXT PRIMARY KEY,
                tokens_used INTEGER DEFAULT 0
            );
            CREATE INDEX IF NOT EXISTS idx_rate_limits_ip ON rate_limits(ip, timestamp);
            CREATE INDEX IF NOT EXISTS idx_knowledge_user ON knowledge(user_id);
            CREATE INDEX IF NOT EXISTS idx_projects_user ON projects(user_id);
            CREATE INDEX IF NOT EXISTS idx_facts_user ON facts(user_id);
            CREATE INDEX IF NOT EXISTS idx_facts_knowledge ON facts(knowledge_id)
        """)

        # Migration: add marketing_consent column if missing
        try:
            conn.execute("ALTER TABLE users ADD COLUMN marketing_consent INTEGER DEFAULT 0")
        except Exception:
            pass  # Column already exists

        # Migration: add password_hash column for email/password auth.
        # NULL means "no local password" — those users only auth via OAuth.
        try:
            conn.execute("ALTER TABLE users ADD COLUMN password_hash TEXT DEFAULT NULL")
        except Exception:
            pass  # Column already exists

    _migrate_namespace_user_ids()


def _migrate_namespace_user_ids():
    """Rewrite users.id from raw email to ``provider:email``.

    Why: the old schema used the email as primary key, which means a GitHub
    login with username ``foo@bar.com`` would land on the same row as the
    Google account for ``foo@bar.com`` — full account takeover via OAuth
    collision. Namespacing by provider closes that.

    Idempotent — uses ``LIKE '%:%'`` to detect already-migrated rows. Safe to
    call on every boot. Inserts new user row first, repoints all FK children,
    then deletes the old row, so no FK constraint is ever violated
    mid-migration.

    Expects the ``password_hash`` column to exist (``init_db`` adds it before
    calling this) and carries it over to the new row.
    """
    with get_db() as conn:
        try:
            row = conn.execute(
                "SELECT COUNT(*) as cnt FROM users WHERE id NOT LIKE '%:%'"
            ).fetchone()
            count = row["cnt"] if row else 0
            if count == 0:
                return
        except Exception:
            return  # users table missing — schema init will handle on next call

        rows = conn.execute(
            "SELECT id, email, provider, marketing_consent, password_hash, created_at "
            "FROM users WHERE id NOT LIKE '%:%'"
        ).fetchall()

        for r in rows:
            old_id = r["id"]
            provider = r["provider"] or ""
            if not provider:
                # No provider — can't safely namespace. Skip and let manual
                # cleanup handle it.
                continue
            new_id = f"{provider}:{old_id}"

            # 1. Create new user row before any FK repoint, so children always
            #    have a valid parent to point at. password_hash is copied too;
            #    otherwise deleting the old row would drop a stored password.
            conn.execute(
                "INSERT OR IGNORE INTO users "
                "(id, email, provider, marketing_consent, password_hash, created_at) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                (
                    new_id,
                    r["email"],
                    provider,
                    r["marketing_consent"],
                    r["password_hash"],
                    r["created_at"],
                ),
            )

            # 2. Repoint FK children.
            conn.execute("UPDATE knowledge SET user_id = ? WHERE user_id = ?", (new_id, old_id))
            conn.execute("UPDATE projects SET user_id = ? WHERE user_id = ?", (new_id, old_id))
            conn.execute("UPDATE facts SET user_id = ? WHERE user_id = ?", (new_id, old_id))
            # snapshots has user_id but no FK constraint — repoint anyway for consistency.
            try:
                conn.execute("UPDATE snapshots SET user_id = ? WHERE user_id = ?", (new_id, old_id))
            except Exception:
                pass  # snapshots table may not exist yet

            # 3. Drop the old row.
            conn.execute("DELETE FROM users WHERE id = ?", (old_id,))


# Initialize on import
init_db()