feelin / data /db.py
ArkenB's picture
Update data/db.py
9ad4ee8 verified
Raw
History Blame Contribute Delete
7.06 kB
"""
data/db.py β€” SQLite persistence for feelin'
"""
from __future__ import annotations
import pandas as pd
import uuid
import os
from pathlib import Path
# ── Paths ────────────────────────────────────────────────────────────────────
BASE_DIR = Path("/data")
POSTS_CSV = BASE_DIR / "posts.csv"
PODCASTS_CSV = BASE_DIR / "podcasts.csv"
SFX_DIR = BASE_DIR / "sfx"
POD_DIR = BASE_DIR / "podcasts"
SEG_DIR = BASE_DIR / "segments"
for d in [SFX_DIR, POD_DIR, SEG_DIR]:
d.mkdir(parents=True, exist_ok=True)
# ── Schema defaults ───────────────────────────────────────────────────────────
POSTS_COLS = {
"id": str, "text": str, "created_at": str,
"is_brag": int, "brag_roast": str, "emotion": str,
"score_hilarious": float, "score_tragic": float,
"score_unhinged": float, "score_awkward": float, "score_chaotic": float,
"top_category": str, "top_score": float, "commentary": str,
}
PODCASTS_COLS = {
"id": str, "category": str, "script": str,
"audio_path": str, "created_at": str, "post_count": int,
}
# ── Helpers ───────────────────────────────────────────────────────────────────
def _now() -> str:
from datetime import datetime, timezone
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
def _load(path: Path, cols: dict) -> pd.DataFrame:
"""Load CSV or return empty DataFrame with correct schema."""
if path.exists():
df = pd.read_csv(path, dtype=str) # read all as str first
# cast to proper types, fill missing cols
for col, typ in cols.items():
if col not in df.columns:
df[col] = None
try:
df[col] = df[col].astype(typ)
except (ValueError, TypeError):
pass
return df
return pd.DataFrame({c: pd.Series(dtype=t) for c, t in cols.items()})
def _save(df: pd.DataFrame, path: Path):
df.to_csv(path, index=False)
# ── Init (no-op for pandas, just ensures dirs exist) ─────────────────────────
def init_db():
for d in [SFX_DIR, POD_DIR, SEG_DIR]:
d.mkdir(parents=True, exist_ok=True)
# ── Posts ─────────────────────────────────────────────────────────────────────
def insert_post(text: str) -> str:
post_id = str(uuid.uuid4())
df = _load(POSTS_CSV, POSTS_COLS)
new_row = {c: None for c in POSTS_COLS}
new_row.update({"id": post_id, "text": text, "created_at": _now(),
"is_brag": 0, "score_hilarious": 0.0, "score_tragic": 0.0,
"score_unhinged": 0.0, "score_awkward": 0.0, "score_chaotic": 0.0,
"top_score": 0.0})
df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
_save(df, POSTS_CSV)
return post_id
def flag_brag(post_id: str, roast: str):
df = _load(POSTS_CSV, POSTS_COLS)
mask = df["id"] == post_id
df.loc[mask, "is_brag"] = 1
df.loc[mask, "brag_roast"] = roast
_save(df, POSTS_CSV)
def update_post_scores(post_id: str, scores: dict, top_category: str, top_score: float, commentary: str):
if not scores:
return
df = _load(POSTS_CSV, POSTS_COLS)
mask = df["id"] == post_id
for cat in ["hilarious", "tragic", "unhinged", "awkward", "chaotic"]:
df.loc[mask, f"score_{cat}"] = scores.get(cat, 0)
df.loc[mask, "top_category"] = top_category
df.loc[mask, "top_score"] = top_score
df.loc[mask, "commentary"] = commentary
_save(df, POSTS_CSV)
def get_leaderboard(category: str, limit: int = 10) -> list[dict]:
df = _load(POSTS_CSV, POSTS_COLS)
col = f"score_{category}"
df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
df["is_brag"] = pd.to_numeric(df["is_brag"], errors="coerce").fillna(0)
result = (
df[(df["is_brag"] == 0) & (df[col] > 0)]
.sort_values(col, ascending=False)
.head(limit)
.copy()
)
# Rename BEFORE selecting to avoid duplicate "score" column
result = result.rename(columns={col: "score"})
keep = ["id", "text", "score",
"score_hilarious", "score_tragic", "score_unhinged",
"score_awkward", "score_chaotic",
"top_category", "commentary", "created_at"]
keep = [c for c in keep if c in result.columns]
return result[keep].to_dict(orient="records")
def get_top_posts_for_podcast(category: str, limit: int = 10) -> list[dict]:
df = _load(POSTS_CSV, POSTS_COLS)
col = f"score_{category}"
df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
df["is_brag"] = pd.to_numeric(df["is_brag"], errors="coerce").fillna(0)
result = (
df[(df["is_brag"] == 0) & (df[col] >= 5.0)]
.sort_values(col, ascending=False)
.head(limit)
)
return result[["id", "text", col, "commentary"]].rename(columns={col: "score"}).to_dict(orient="records")
def get_stats() -> dict:
df = _load(POSTS_CSV, POSTS_COLS)
if df.empty:
return {"total_posts": 0, "brags_caught": 0,
"categories": {c: 0 for c in ["hilarious","tragic","unhinged","awkward","chaotic"]}}
df["is_brag"] = pd.to_numeric(df["is_brag"], errors="coerce").fillna(0)
cats = {}
for cat in ["hilarious", "tragic", "unhinged", "awkward", "chaotic"]:
col = f"score_{cat}"
df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
cats[cat] = int(((df["is_brag"] == 0) & (df[col] > 0)).sum())
return {
"total_posts": len(df),
"brags_caught": int((df["is_brag"] == 1).sum()),
"categories": cats,
}
# ── Podcasts ──────────────────────────────────────────────────────────────────
def save_podcast(category: str, script: str, audio_path: str, post_count: int) -> str:
pod_id = str(uuid.uuid4())
df = _load(PODCASTS_CSV, PODCASTS_COLS)
new_row = {"id": pod_id, "category": category, "script": script,
"audio_path": audio_path, "created_at": _now(), "post_count": post_count}
df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
_save(df, PODCASTS_CSV)
return pod_id
def get_latest_podcast(category: str) -> dict | None:
df = _load(PODCASTS_CSV, PODCASTS_COLS)
filtered = df[df["category"] == category]
if filtered.empty:
return None
return filtered.sort_values("created_at", ascending=False).iloc[0].to_dict()