"""Supabase REST client (via httpx) for blind-test storage. We call PostgREST endpoints directly to avoid pulling the full `supabase` Python SDK (which drags `realtime` → `websockets>=13`, conflicting with gradio-client 1.3's `websockets<13` pin). Env vars: SUPABASE_URL — project URL (e.g. https://xxx.supabase.co) SUPABASE_KEY — anon or service_role key If either is missing, all writes become no-ops. """ from __future__ import annotations import os from dataclasses import dataclass from typing import Any import httpx _URL = os.getenv("SUPABASE_URL", "").strip().rstrip("/") _KEY = os.getenv("SUPABASE_KEY", "").strip() _last_error: str | None = None print( f"[blindtest.db] httpx-rest" f" | SUPABASE_URL={'set' if _URL else 'MISSING'}" f" | SUPABASE_KEY={'set' if _KEY else 'MISSING'}" ) def is_configured() -> bool: return bool(_URL and _KEY) def last_error() -> str | None: return _last_error def _record_error(where: str, exc: Exception) -> None: global _last_error _last_error = f"{where}: {exc}" print(f"[blindtest.db] {_last_error}") def _headers(prefer_return: bool = True) -> dict[str, str]: h = { "apikey": _KEY, "Authorization": f"Bearer {_KEY}", "Content-Type": "application/json", } if prefer_return: h["Prefer"] = "return=representation" return h def _post(table: str, row: dict) -> list[dict]: url = f"{_URL}/rest/v1/{table}" with httpx.Client(timeout=10.0) as c: resp = c.post(url, headers=_headers(True), json=row) if resp.status_code >= 300: raise RuntimeError(f"{resp.status_code} {resp.text}") return resp.json() if resp.content else [] def _get(table: str, params: dict[str, str] | None = None) -> list[dict]: url = f"{_URL}/rest/v1/{table}" with httpx.Client(timeout=10.0) as c: resp = c.get(url, headers=_headers(False), params=params or {}) if resp.status_code >= 300: raise RuntimeError(f"{resp.status_code} {resp.text}") return resp.json() if resp.content else [] @dataclass class RunRecord: run_id: int | None pipeline_key: str output: str processing_time_s: float @dataclass class TaskRecord: task_id: int | None article_id: int | None slot_a_run: RunRecord slot_b_run: RunRecord def save_article(source_text: str) -> int | None: if not is_configured(): return None try: data = _post("articles", {"source_text": source_text}) return data[0]["id"] if data else None except Exception as exc: _record_error("save_article", exc) return None def save_pipeline_run( article_id: int | None, pipeline_key: str, prompt_key: str, model: str, output: str, processing_time_s: float, ) -> int | None: if not is_configured() or article_id is None: return None try: data = _post( "pipeline_runs", { "article_id": article_id, "pipeline_key": pipeline_key, "prompt_key": prompt_key, "model": model, "output": output, "processing_time_s": processing_time_s, }, ) return data[0]["id"] if data else None except Exception as exc: _record_error("save_pipeline_run", exc) return None def save_task( article_id: int | None, run_a_id: int | None, run_b_id: int | None, ) -> int | None: if not is_configured() or article_id is None: return None try: data = _post( "tasks", {"article_id": article_id, "run_a_id": run_a_id, "run_b_id": run_b_id}, ) return data[0]["id"] if data else None except Exception as exc: _record_error("save_task", exc) return None def save_rating(pipeline_run_id: int | None, rating: str, comment: str) -> bool: """Save Good / Not Bad / Critical rating for a single pipeline run.""" if not is_configured() or pipeline_run_id is None: return False try: _post( "ratings", { "pipeline_run_id": pipeline_run_id, "rating": rating, "comment": comment or None, }, ) return True except Exception as exc: _record_error("save_rating", exc) return False def fetch_rating_counts() -> dict[str, int]: if not is_configured(): return {} try: rows = _get("ratings", {"select": "rating"}) counts: dict[str, int] = {} for row in rows: counts[row["rating"]] = counts.get(row["rating"], 0) + 1 return counts except Exception as exc: _record_error("fetch_rating_counts", exc) return {} def save_vote(task_id: int | None, choice: str, comment: str) -> bool: if not is_configured() or task_id is None: return False try: _post( "votes", {"task_id": task_id, "choice": choice, "comment": comment or None}, ) return True except Exception as exc: _record_error("save_vote", exc) return False def fetch_summary() -> list[dict]: if not is_configured(): return [] try: return _get("vote_summary", {"select": "*"}) except Exception as exc: _record_error("fetch_summary", exc) return [] def fetch_vote_counts() -> dict[str, int]: if not is_configured(): return {} try: rows = _get("votes", {"select": "choice"}) counts: dict[str, int] = {} for row in rows: counts[row["choice"]] = counts.get(row["choice"], 0) + 1 return counts except Exception as exc: _record_error("fetch_vote_counts", exc) return {}