Spaces:
Running
Running
| """ | |
data/db.py — CSV persistence (via pandas) for feelin'
| """ | |
| from __future__ import annotations | |
| import pandas as pd | |
| import uuid | |
| import os | |
| from pathlib import Path | |
# ── Paths ────────────────────────────────────────────────────────────────────
# Root directory under which every persisted file and folder lives.
BASE_DIR = Path("/data")

# CSV files that hold the post and podcast records.
POSTS_CSV = BASE_DIR.joinpath("posts.csv")
PODCASTS_CSV = BASE_DIR.joinpath("podcasts.csv")

# Asset directories, all direct children of BASE_DIR.
SFX_DIR = BASE_DIR.joinpath("sfx")
POD_DIR = BASE_DIR.joinpath("podcasts")
SEG_DIR = BASE_DIR.joinpath("segments")

# Create the asset directories at import time; existing ones are left alone.
for d in (SFX_DIR, POD_DIR, SEG_DIR):
    d.mkdir(parents=True, exist_ok=True)
# ── Schema defaults ──────────────────────────────────────────────────────────
# Column name -> Python type for the posts CSV. Key order is the column order
# of frames built from this schema.
POSTS_COLS = {
    # identity and payload
    "id": str,
    "text": str,
    "created_at": str,
    # brag detection
    "is_brag": int,
    "brag_roast": str,
    "emotion": str,
    # one score column per category
    "score_hilarious": float,
    "score_tragic": float,
    "score_unhinged": float,
    "score_awkward": float,
    "score_chaotic": float,
    # summary of the scoring pass
    "top_category": str,
    "top_score": float,
    "commentary": str,
}

# Column name -> Python type for the podcasts CSV.
PODCASTS_COLS = {
    "id": str,
    "category": str,
    "script": str,
    "audio_path": str,
    "created_at": str,
    "post_count": int,
}
# ── Helpers ──────────────────────────────────────────────────────────────────
def _now() -> str:
    """Return the current UTC time as a ``YYYY-MM-DD HH:MM:SS`` string."""
    from datetime import datetime, timezone

    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.strftime("%Y-%m-%d %H:%M:%S")
def _load(path: Path, cols: dict) -> pd.DataFrame:
    """Load the CSV at *path* into a DataFrame that follows the *cols* schema.

    Args:
        path: CSV file to read.
        cols: Mapping of column name to Python type (``str``, ``int`` or
            ``float`` in this module's schemas).

    Returns:
        A DataFrame containing every column named in *cols* (columns that
        are only present in the file are kept as strings). When the file is
        missing or has no content at all, an empty DataFrame with the
        schema's columns is returned.

    Per-type handling of cells:
        * ``str``   - text is kept verbatim; missing cells become ``""``.
        * ``int``   - unparseable or blank cells become ``0``.
        * ``float`` - unparseable or blank cells become ``NaN``.
    """
    empty = pd.DataFrame({c: pd.Series(dtype=t) for c, t in cols.items()})
    if not path.exists():
        return empty

    try:
        # Read every cell as a raw string. keep_default_na=False stops pandas
        # from turning user text such as "NA", "null" or "nan" into NaN.
        df = pd.read_csv(path, dtype=str, keep_default_na=False)
    except pd.errors.EmptyDataError:
        # The file exists but has no header line (e.g. zero bytes).
        return empty

    for col, typ in cols.items():
        if col not in df.columns:
            df[col] = None

        if typ is int:
            # Coerce cell by cell so one bad value cannot leave the whole
            # column as strings.
            numeric = pd.to_numeric(df[col], errors="coerce").fillna(0)
            try:
                df[col] = numeric.astype(int)
            except (ValueError, OverflowError):
                # Non-finite or out-of-range values: keep the float column.
                df[col] = numeric
        elif typ is float:
            df[col] = pd.to_numeric(df[col], errors="coerce").astype(float)
        elif typ is str:
            # fillna first: astype(str) alone would produce literal "nan".
            df[col] = df[col].fillna("").astype(str)
        else:
            # Unknown schema type: best-effort cast, leave column as read.
            try:
                df[col] = df[col].astype(typ)
            except (ValueError, TypeError):
                pass
    return df
def _save(df: pd.DataFrame, path: Path):
    """Write *df* to *path* as CSV (without the index), replacing any old file.

    The data is first written to a sibling ``<name>.tmp`` file and then
    renamed over *path*, so an interrupted write cannot leave a truncated
    CSV in place of the previous contents.

    Args:
        df: Frame to persist.
        path: Destination CSV file.
    """
    # Same directory as the target so the rename stays on one filesystem.
    tmp_path = path.with_name(path.name + ".tmp")
    df.to_csv(tmp_path, index=False)
    os.replace(tmp_path, path)
# ── Init (no-op for pandas, just ensures dirs exist) ─────────────────────────
def init_db():
    """Ensure the sfx, podcast and segment directories exist.

    Missing parent directories are created as well; directories that already
    exist are left untouched.
    """
    for directory in (SFX_DIR, POD_DIR, SEG_DIR):
        directory.mkdir(parents=True, exist_ok=True)
# ── Posts ────────────────────────────────────────────────────────────────────
def insert_post(text: str) -> str:
    """Append a new, unscored post to the posts CSV and return its id.

    The row gets a fresh UUID4 id, the current UTC timestamp, ``is_brag`` of
    0 and all score columns set to 0.0; every other schema column is None.

    Args:
        text: Body of the post.

    Returns:
        The generated post id.
    """
    new_id = str(uuid.uuid4())
    posts = _load(POSTS_CSV, POSTS_COLS)

    # Start from "every schema column is None", then fill in the known values.
    row = dict.fromkeys(POSTS_COLS)
    row["id"] = new_id
    row["text"] = text
    row["created_at"] = _now()
    row["is_brag"] = 0
    zeroed_columns = (
        "score_hilarious",
        "score_tragic",
        "score_unhinged",
        "score_awkward",
        "score_chaotic",
        "top_score",
    )
    for column in zeroed_columns:
        row[column] = 0.0

    posts = pd.concat([posts, pd.DataFrame([row])], ignore_index=True)
    _save(posts, POSTS_CSV)
    return new_id
def flag_brag(post_id: str, roast: str):
    """Mark the post with id *post_id* as a brag and store its roast text.

    Sets ``is_brag`` to 1 and ``brag_roast`` to *roast* on every matching row,
    then rewrites the posts CSV. The file is rewritten even when no row
    matches.

    Args:
        post_id: Id of the post to flag.
        roast: Roast text to attach to the post.
    """
    posts = _load(POSTS_CSV, POSTS_COLS)
    is_target = posts["id"] == post_id

    posts.loc[is_target, "is_brag"] = 1
    posts.loc[is_target, "brag_roast"] = roast

    _save(posts, POSTS_CSV)
def update_post_scores(post_id: str, scores: dict, top_category: str, top_score: float, commentary: str):
    """Store the scoring results for the post with id *post_id*.

    Does nothing (no read, no write) when *scores* is empty or None.

    Args:
        post_id: Id of the post to update.
        scores: Mapping of category name to score; categories that are not
            present are stored as 0.
        top_category: Value for the ``top_category`` column.
        top_score: Value for the ``top_score`` column.
        commentary: Value for the ``commentary`` column.
    """
    if not scores:
        return

    posts = _load(POSTS_CSV, POSTS_COLS)
    is_target = posts["id"] == post_id

    # Per-category score columns.
    for category in ("hilarious", "tragic", "unhinged", "awkward", "chaotic"):
        posts.loc[is_target, f"score_{category}"] = scores.get(category, 0)

    # Summary columns.
    summary = {
        "top_category": top_category,
        "top_score": top_score,
        "commentary": commentary,
    }
    for column, value in summary.items():
        posts.loc[is_target, column] = value

    _save(posts, POSTS_CSV)
def get_leaderboard(category: str, limit: int = 10) -> list[dict]:
    """Return the highest-scoring non-brag posts for *category*.

    Only posts with ``is_brag == 0`` and a score above 0 in the category are
    considered. Results are sorted by that score, highest first.

    Args:
        category: Category name; its column is ``score_<category>``.
        limit: Maximum number of posts to return.

    Returns:
        One dict per post. The category's score is exposed under the key
        ``"score"``; because that column is renamed, it does not also appear
        under its ``score_<category>`` name. The other score columns,
        ``id``, ``text``, ``top_category``, ``commentary`` and ``created_at``
        are included when present.
    """
    posts = _load(POSTS_CSV, POSTS_COLS)
    score_col = f"score_{category}"

    # Values may still be strings after loading; force them to numbers.
    posts[score_col] = pd.to_numeric(posts[score_col], errors="coerce").fillna(0)
    posts["is_brag"] = pd.to_numeric(posts["is_brag"], errors="coerce").fillna(0)

    eligible = posts[(posts["is_brag"] == 0) & (posts[score_col] > 0)]
    top = eligible.sort_values(score_col, ascending=False).head(limit).copy()

    # Rename before selecting so the frame never has two "score" columns.
    top = top.rename(columns={score_col: "score"})

    wanted = (
        "id",
        "text",
        "score",
        "score_hilarious",
        "score_tragic",
        "score_unhinged",
        "score_awkward",
        "score_chaotic",
        "top_category",
        "commentary",
        "created_at",
    )
    present = [name for name in wanted if name in top.columns]
    return top[present].to_dict(orient="records")
def get_top_posts_for_podcast(category: str, limit: int = 10, *, min_score: float = 5.0) -> list[dict]:
    """Return the best non-brag posts in *category* for podcast generation.

    Only posts with ``is_brag == 0`` and a category score of at least
    *min_score* qualify. Results are sorted by that score, highest first.

    Args:
        category: Category name; its column is ``score_<category>``.
        limit: Maximum number of posts to return.
        min_score: Inclusive lower bound on the category score. Defaults to
            5.0, the threshold that was previously hard-coded.

    Returns:
        One dict per post with the keys ``id``, ``text``, ``score`` (the
        category score) and ``commentary``.
    """
    posts = _load(POSTS_CSV, POSTS_COLS)
    score_col = f"score_{category}"

    # Values may still be strings after loading; force them to numbers.
    posts[score_col] = pd.to_numeric(posts[score_col], errors="coerce").fillna(0)
    posts["is_brag"] = pd.to_numeric(posts["is_brag"], errors="coerce").fillna(0)

    qualifies = (posts["is_brag"] == 0) & (posts[score_col] >= min_score)
    top = posts[qualifies].sort_values(score_col, ascending=False).head(limit)

    selected = top[["id", "text", score_col, "commentary"]]
    return selected.rename(columns={score_col: "score"}).to_dict(orient="records")
def get_stats() -> dict:
    """Return overall counts for the posts CSV.

    Returns:
        A dict with:
          * ``total_posts``  - number of rows,
          * ``brags_caught`` - rows with ``is_brag == 1``,
          * ``categories``   - per category, the number of non-brag rows
            whose score in that category is above 0.
        All counts are 0 when there are no posts.
    """
    categories = ("hilarious", "tragic", "unhinged", "awkward", "chaotic")
    posts = _load(POSTS_CSV, POSTS_COLS)

    if posts.empty:
        return {
            "total_posts": 0,
            "brags_caught": 0,
            "categories": dict.fromkeys(categories, 0),
        }

    # Values may still be strings after loading; force them to numbers.
    posts["is_brag"] = pd.to_numeric(posts["is_brag"], errors="coerce").fillna(0)
    not_brag = posts["is_brag"] == 0

    per_category = {}
    for name in categories:
        column = f"score_{name}"
        posts[column] = pd.to_numeric(posts[column], errors="coerce").fillna(0)
        per_category[name] = int((not_brag & (posts[column] > 0)).sum())

    brag_count = int((posts["is_brag"] == 1).sum())
    return {
        "total_posts": len(posts),
        "brags_caught": brag_count,
        "categories": per_category,
    }
# ── Podcasts ─────────────────────────────────────────────────────────────────
def save_podcast(category: str, script: str, audio_path: str, post_count: int) -> str:
    """Append a podcast record to the podcasts CSV and return its id.

    The record gets a fresh UUID4 id and the current UTC timestamp.

    Args:
        category: Value for the ``category`` column.
        script: Value for the ``script`` column.
        audio_path: Value for the ``audio_path`` column.
        post_count: Value for the ``post_count`` column.

    Returns:
        The generated podcast id.
    """
    new_id = str(uuid.uuid4())
    podcasts = _load(PODCASTS_CSV, PODCASTS_COLS)

    record = {
        "id": new_id,
        "category": category,
        "script": script,
        "audio_path": audio_path,
        "created_at": _now(),
        "post_count": post_count,
    }
    record_frame = pd.DataFrame([record])

    podcasts = pd.concat([podcasts, record_frame], ignore_index=True)
    _save(podcasts, PODCASTS_CSV)
    return new_id
def get_latest_podcast(category: str) -> dict | None:
    """Return the most recent podcast record for *category*, if any.

    "Most recent" is the row with the greatest ``created_at`` value among
    the rows whose ``category`` equals *category*.

    Args:
        category: Category to look up.

    Returns:
        The matching row as a dict, or None when the category has no podcasts.
    """
    podcasts = _load(PODCASTS_CSV, PODCASTS_COLS)
    matches = podcasts[podcasts["category"] == category]

    if matches.empty:
        return None

    newest_first = matches.sort_values("created_at", ascending=False)
    return newest_first.iloc[0].to_dict()