""" data/db.py — SQLite persistence for feelin' """ from __future__ import annotations import pandas as pd import uuid import os from pathlib import Path # ── Paths ──────────────────────────────────────────────────────────────────── BASE_DIR = Path("/data") POSTS_CSV = BASE_DIR / "posts.csv" PODCASTS_CSV = BASE_DIR / "podcasts.csv" SFX_DIR = BASE_DIR / "sfx" POD_DIR = BASE_DIR / "podcasts" SEG_DIR = BASE_DIR / "segments" for d in [SFX_DIR, POD_DIR, SEG_DIR]: d.mkdir(parents=True, exist_ok=True) # ── Schema defaults ─────────────────────────────────────────────────────────── POSTS_COLS = { "id": str, "text": str, "created_at": str, "is_brag": int, "brag_roast": str, "emotion": str, "score_hilarious": float, "score_tragic": float, "score_unhinged": float, "score_awkward": float, "score_chaotic": float, "top_category": str, "top_score": float, "commentary": str, } PODCASTS_COLS = { "id": str, "category": str, "script": str, "audio_path": str, "created_at": str, "post_count": int, } # ── Helpers ─────────────────────────────────────────────────────────────────── def _now() -> str: from datetime import datetime, timezone return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") def _load(path: Path, cols: dict) -> pd.DataFrame: """Load CSV or return empty DataFrame with correct schema.""" if path.exists(): df = pd.read_csv(path, dtype=str) # read all as str first # cast to proper types, fill missing cols for col, typ in cols.items(): if col not in df.columns: df[col] = None try: df[col] = df[col].astype(typ) except (ValueError, TypeError): pass return df return pd.DataFrame({c: pd.Series(dtype=t) for c, t in cols.items()}) def _save(df: pd.DataFrame, path: Path): df.to_csv(path, index=False) # ── Init (no-op for pandas, just ensures dirs exist) ───────────────────────── def init_db(): for d in [SFX_DIR, POD_DIR, SEG_DIR]: d.mkdir(parents=True, exist_ok=True) # ── Posts ───────────────────────────────────────────────────────────────────── def insert_post(text: str) -> str: post_id = str(uuid.uuid4()) df = _load(POSTS_CSV, POSTS_COLS) new_row = {c: None for c in POSTS_COLS} new_row.update({"id": post_id, "text": text, "created_at": _now(), "is_brag": 0, "score_hilarious": 0.0, "score_tragic": 0.0, "score_unhinged": 0.0, "score_awkward": 0.0, "score_chaotic": 0.0, "top_score": 0.0}) df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True) _save(df, POSTS_CSV) return post_id def flag_brag(post_id: str, roast: str): df = _load(POSTS_CSV, POSTS_COLS) mask = df["id"] == post_id df.loc[mask, "is_brag"] = 1 df.loc[mask, "brag_roast"] = roast _save(df, POSTS_CSV) def update_post_scores(post_id: str, scores: dict, top_category: str, top_score: float, commentary: str): if not scores: return df = _load(POSTS_CSV, POSTS_COLS) mask = df["id"] == post_id for cat in ["hilarious", "tragic", "unhinged", "awkward", "chaotic"]: df.loc[mask, f"score_{cat}"] = scores.get(cat, 0) df.loc[mask, "top_category"] = top_category df.loc[mask, "top_score"] = top_score df.loc[mask, "commentary"] = commentary _save(df, POSTS_CSV) def get_leaderboard(category: str, limit: int = 10) -> list[dict]: df = _load(POSTS_CSV, POSTS_COLS) col = f"score_{category}" df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0) df["is_brag"] = pd.to_numeric(df["is_brag"], errors="coerce").fillna(0) result = ( df[(df["is_brag"] == 0) & (df[col] > 0)] .sort_values(col, ascending=False) .head(limit) .copy() ) # Rename BEFORE selecting to avoid duplicate "score" column result = result.rename(columns={col: "score"}) keep = ["id", "text", "score", "score_hilarious", "score_tragic", "score_unhinged", "score_awkward", "score_chaotic", "top_category", "commentary", "created_at"] keep = [c for c in keep if c in result.columns] return result[keep].to_dict(orient="records") def get_top_posts_for_podcast(category: str, limit: int = 10) -> list[dict]: df = _load(POSTS_CSV, POSTS_COLS) col = f"score_{category}" df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0) df["is_brag"] = pd.to_numeric(df["is_brag"], errors="coerce").fillna(0) result = ( df[(df["is_brag"] == 0) & (df[col] >= 5.0)] .sort_values(col, ascending=False) .head(limit) ) return result[["id", "text", col, "commentary"]].rename(columns={col: "score"}).to_dict(orient="records") def get_stats() -> dict: df = _load(POSTS_CSV, POSTS_COLS) if df.empty: return {"total_posts": 0, "brags_caught": 0, "categories": {c: 0 for c in ["hilarious","tragic","unhinged","awkward","chaotic"]}} df["is_brag"] = pd.to_numeric(df["is_brag"], errors="coerce").fillna(0) cats = {} for cat in ["hilarious", "tragic", "unhinged", "awkward", "chaotic"]: col = f"score_{cat}" df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0) cats[cat] = int(((df["is_brag"] == 0) & (df[col] > 0)).sum()) return { "total_posts": len(df), "brags_caught": int((df["is_brag"] == 1).sum()), "categories": cats, } # ── Podcasts ────────────────────────────────────────────────────────────────── def save_podcast(category: str, script: str, audio_path: str, post_count: int) -> str: pod_id = str(uuid.uuid4()) df = _load(PODCASTS_CSV, PODCASTS_COLS) new_row = {"id": pod_id, "category": category, "script": script, "audio_path": audio_path, "created_at": _now(), "post_count": post_count} df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True) _save(df, PODCASTS_CSV) return pod_id def get_latest_podcast(category: str) -> dict | None: df = _load(PODCASTS_CSV, PODCASTS_COLS) filtered = df[df["category"] == category] if filtered.empty: return None return filtered.sort_values("created_at", ascending=False).iloc[0].to_dict()