Spaces:
Sleeping
Sleeping
| # ============================================================================ | |
| # database.py -- Supabase PostgreSQL + pgvector persistence layer | |
| # ============================================================================ | |
| # | |
| # PURPOSE | |
| # ------- | |
| # Single module that owns ALL database interaction for the workbench. | |
| # Every other module (vectorstore, phase2_agent, phase3_themes, etc.) | |
| # imports from here. No other module should import psycopg2 directly. | |
| # | |
| # CONNECTION | |
| # ---------- | |
| # Reads SUPABASE_DB_URL from environment (set as HF Space secret). | |
| # Uses Session Pooler URL (IPv4 compatible with HuggingFace Spaces). | |
| # | |
# TABLES
# ------
# corpus           -- uploaded sentences + MiniLM embeddings (vector 384)
# codebook         -- Phase 2 codebook (code_name, definition, ...)
# coded_sentences  -- Phase 2 per-sentence codes
# themes           -- Phase 3 candidate themes
# theme_reviews    -- Phase 4 reviewer verdicts
# chats            -- chat exchanges (title, user/bot message, topics JSON)
# papers           -- papers linked to a chat + embeddings (vector 384)
| # | |
| # DESIGN | |
| # ------ | |
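# + The five workbench tables (corpus, codebook, coded_sentences, themes,
#   theme_reviews) have session_id (TEXT) so multiple researchers can share
#   one Supabase project without data collision. The chats and papers
#   tables have no session_id column.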
| # + create_tables() is idempotent -- safe to call on every startup. | |
| # + All functions return plain Python dicts/lists -- no psycopg2 objects | |
| # leak out of this module. | |
# + Graceful degradation: if SUPABASE_DB_URL is not set, the public
#   functions return empty results (0, [] or False) instead of raising;
#   the save/load functions also log the error. The app keeps running.
| # ============================================================================ | |
| import os | |
| import json | |
| import logging | |
| from datetime import datetime | |
| from typing import Optional | |
# Module-wide logger; messages emitted by this module carry a "[database]" prefix.
logger = logging.getLogger(__name__)

# ----------------------------------------------------------------
# Connection
# ----------------------------------------------------------------
# Connection string, read once at import time; empty string when the
# SUPABASE_DB_URL environment variable is unset.
_DB_URL = os.getenv("SUPABASE_DB_URL", "")

# Single connection shared by every _get_conn() caller; None until first use.
_conn_cache = None
def _get_conn():
    """Return a live psycopg2 connection, reusing the cached one when possible.

    The cached connection is probed with ``SELECT 1``. If the probe fails the
    connection is first rolled back (an aborted transaction makes every
    statement fail until rollback) and probed again; only if that also fails
    is it closed and replaced by a fresh connection.

    Returns:
        A psycopg2 connection with ``autocommit`` disabled.

    Raises:
        RuntimeError: if SUPABASE_DB_URL is not set.
        Exception: whatever ``psycopg2.connect`` raises when a new connection
            cannot be opened (a single attempt is made, 10 s timeout).
    """
    global _conn_cache
    if not _DB_URL:
        raise RuntimeError(
            "SUPABASE_DB_URL not set. Add it as a Space secret."
        )

    # Deferred import: the module stays importable where psycopg2 is missing.
    import psycopg2

    def _ping(candidate) -> bool:
        """True if ``SELECT 1`` succeeds on *candidate*; the cursor is closed."""
        try:
            with candidate.cursor() as cur:
                cur.execute("SELECT 1")
            return True
        except Exception:
            return False

    cached = _conn_cache
    if cached is not None and not cached.closed:
        if _ping(cached):
            return cached
        # The probe failed: either the transaction is aborted (recoverable
        # with a rollback) or the link is dead (needs a new connection).
        try:
            cached.rollback()
        except Exception:
            recovered = False
        else:
            recovered = _ping(cached)
        if recovered:
            return cached
        logger.warning("[database] cached connection is unusable; reconnecting")
        _conn_cache = None
        try:
            cached.close()
        except Exception:
            # Best effort: the connection is being discarded anyway.
            pass

    fresh = psycopg2.connect(_DB_URL, connect_timeout=10)
    fresh.autocommit = False
    _conn_cache = fresh
    return fresh
def is_available() -> bool:
    """Report whether the database can be reached.

    Returns:
        False immediately when SUPABASE_DB_URL is unset (nothing is logged);
        otherwise True if a ``SELECT 1`` probe succeeds, or False (with a
        warning logged) if connecting or probing raises.
    """
    if not _DB_URL:
        return False
    try:
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute("SELECT 1")
        # autocommit is off, so the probe opened a transaction; end it so the
        # shared connection is not left idle in transaction.
        conn.rollback()
        return True
    except Exception as e:
        logger.warning(f"[database] not available: {e}")
        return False
| # ---------------------------------------------------------------- | |
| # Schema bootstrap -- call once on startup | |
| # ---------------------------------------------------------------- | |
# Schema bootstrap script executed as one statement batch by create_tables().
# It enables the pgvector extension, creates seven tables (corpus, codebook,
# coded_sentences, themes, theme_reviews, chats, papers) and seven lookup
# indexes. Every statement is guarded by IF NOT EXISTS. The corpus and papers
# tables each carry a 384-dimension `embedding vector` column; papers rows are
# removed with their parent chat (ON DELETE CASCADE).
CREATE_TABLES_SQL = """
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE IF NOT EXISTS corpus (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL DEFAULT 'default',
L1 TEXT,
L2 TEXT,
L3 TEXT,
L4 TEXT,
sentence_id TEXT,
sentence TEXT NOT NULL,
label TEXT,
embedding vector(384),
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS codebook (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL DEFAULT 'default',
code_name TEXT NOT NULL,
definition TEXT,
provenance TEXT,
sentence_count INT DEFAULT 1,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS coded_sentences (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL DEFAULT 'default',
sentence_idx INT,
sentence TEXT,
ai_code_iter1 TEXT,
ai_code_iter2 TEXT,
ai_code_iter3 TEXT,
human_code_iter1 TEXT,
human_code_iter2 TEXT,
human_code_iter3 TEXT,
final_code TEXT,
orientation TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS themes (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL DEFAULT 'default',
theme_id INT,
candidate_theme_name TEXT,
description TEXT,
rationale TEXT,
member_codes TEXT,
code_count INT,
researcher_theme_name TEXT,
researcher_notes TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS theme_reviews (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL DEFAULT 'default',
theme_id INT,
theme_name TEXT,
member_codes TEXT,
code_count INT,
member_sentence_count INT,
within_cohesion FLOAT,
llm_verdict TEXT,
llm_reasoning TEXT,
llm_action_suggestion TEXT,
researcher_verdict TEXT,
researcher_action_notes TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS chats (
id SERIAL PRIMARY KEY,
title TEXT,
user_message TEXT,
bot_message TEXT,
topics_json JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS papers (
id SERIAL PRIMARY KEY,
chat_id INT REFERENCES chats(id) ON DELETE CASCADE,
title TEXT,
abstract TEXT,
doi TEXT,
date_of_publication TEXT,
journal TEXT,
no_of_citations INT,
web_link TEXT,
authors TEXT,
keywords TEXT,
confidence_score FLOAT,
paper_type TEXT,
topic_label TEXT,
embedding vector(384),
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_corpus_session ON corpus(session_id);
CREATE INDEX IF NOT EXISTS idx_codebook_session ON codebook(session_id);
CREATE INDEX IF NOT EXISTS idx_coded_session ON coded_sentences(session_id);
CREATE INDEX IF NOT EXISTS idx_themes_session ON themes(session_id);
CREATE INDEX IF NOT EXISTS idx_reviews_session ON theme_reviews(session_id);
CREATE INDEX IF NOT EXISTS idx_papers_chat ON papers(chat_id);
CREATE INDEX IF NOT EXISTS idx_papers_topic ON papers(topic_label);
"""
def create_tables() -> bool:
    """Create all tables and indexes if they don't exist.

    Runs CREATE_TABLES_SQL in a single transaction. Safe to call on every
    startup because every statement in the script uses IF NOT EXISTS.

    Returns:
        True when the script was executed and committed, False on any error
        (the error is logged and the transaction rolled back).
    """
    conn = None
    try:
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute(CREATE_TABLES_SQL)
        conn.commit()
        logger.info("[database] Tables ready.")
        return True
    except Exception as e:
        logger.error(f"[database] create_tables error: {e}")
        # Roll back the connection that actually failed rather than asking
        # _get_conn() for one again, which may hand back a different object.
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_err:
                logger.warning(
                    f"[database] create_tables rollback failed: {rollback_err}"
                )
        return False
| # ---------------------------------------------------------------- | |
| # Corpus | |
| # ---------------------------------------------------------------- | |
def save_corpus(rows: list[dict], session_id: str = "default") -> int:
    """Replace the stored corpus of a session with *rows*.

    The existing corpus rows for *session_id* are deleted and the new ones
    inserted in the same transaction, so a failure leaves the old corpus
    in place.

    Args:
        rows: dicts with optional keys L1, L2, L3, L4, sentence_id,
            sentence and label; missing keys are stored as "".
        session_id: session whose corpus is replaced.

    Returns:
        Number of rows saved; 0 if *rows* is empty (nothing is touched) or
        if an error occurred (logged, transaction rolled back).
    """
    if not rows:
        return 0

    fields = ("L1", "L2", "L3", "L4", "sentence_id", "sentence", "label")
    conn = None
    try:
        import psycopg2.extras

        params = [
            (session_id, *(r.get(name, "") for name in fields)) for r in rows
        ]
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute("DELETE FROM corpus WHERE session_id = %s", (session_id,))
            psycopg2.extras.execute_batch(
                cur,
                """INSERT INTO corpus (session_id, L1, L2, L3, L4, sentence_id, sentence, label)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                params,
            )
        conn.commit()
        return len(rows)
    except Exception as e:
        logger.error(f"[database] save_corpus error: {e}")
        # Roll back the connection that failed, not a re-fetched one.
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_err:
                logger.warning(
                    f"[database] save_corpus rollback failed: {rollback_err}"
                )
        return 0
def load_corpus(session_id: str = "default") -> list[dict]:
    """Load the corpus rows of a session, in insertion (id) order.

    Returns:
        One dict per row with keys L1, L2, L3, L4, sentence_id, sentence and
        label; an empty list if the session has no rows or on any error
        (the error is logged).
    """
    cols = ["L1", "L2", "L3", "L4", "sentence_id", "sentence", "label"]
    conn = None
    try:
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute(
                "SELECT L1, L2, L3, L4, sentence_id, sentence, label "
                "FROM corpus WHERE session_id = %s ORDER BY id",
                (session_id,),
            )
            fetched = cur.fetchall()
        return [dict(zip(cols, row)) for row in fetched]
    except Exception as e:
        logger.error(f"[database] load_corpus error: {e}")
        return []
    finally:
        # autocommit is off, so even a SELECT opens a transaction. End it
        # here so the shared connection is neither left idle in transaction
        # nor stuck in an aborted state after a failed query.
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_err:
                logger.warning(
                    f"[database] load_corpus rollback failed: {rollback_err}"
                )
| # ---------------------------------------------------------------- | |
| # Corpus embeddings (pgvector) | |
| # ---------------------------------------------------------------- | |
def save_embeddings(sentence_embeddings: list[tuple[str, list[float]]], session_id: str = "default") -> int:
    """Attach embeddings to existing corpus rows of a session.

    Rows are matched on exact sentence text within *session_id*; every corpus
    row with that text receives the embedding.

    Args:
        sentence_embeddings: (sentence_text, embedding) pairs. The embedding
            may be any iterable of numbers (list, numpy array, ...); values
            are converted to float before being serialised.
        session_id: session whose corpus rows are updated.

    Returns:
        Number of pairs submitted (not the number of rows actually updated);
        0 if the input is empty or an error occurred (logged, rolled back).
    """
    if not sentence_embeddings:
        return 0

    conn = None
    try:
        import psycopg2.extras

        # A JSON array of floats is the text form cast by `%s::vector` below.
        params = [
            (json.dumps([float(x) for x in emb]), session_id, sent)
            for sent, emb in sentence_embeddings
        ]
        conn = _get_conn()
        with conn.cursor() as cur:
            psycopg2.extras.execute_batch(
                cur,
                "UPDATE corpus SET embedding = %s::vector WHERE session_id = %s AND sentence = %s",
                params,
            )
        conn.commit()
        return len(sentence_embeddings)
    except Exception as e:
        logger.error(f"[database] save_embeddings error: {e}")
        # Roll back the connection that failed, not a re-fetched one.
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_err:
                logger.warning(
                    f"[database] save_embeddings rollback failed: {rollback_err}"
                )
        return 0
def similarity_search(query_embedding: list[float], session_id: str = "default", top_k: int = 5) -> list[dict]:
    """Find the top_k corpus sentences most similar to a query embedding.

    Uses the pgvector ``<=>`` operator and reports ``1 - distance`` as the
    similarity. Only rows of *session_id* that have an embedding are
    considered.

    Args:
        query_embedding: iterable of numbers (list, numpy array, ...); values
            are converted to float before being serialised.
        session_id: session whose corpus is searched.
        top_k: maximum number of results.

    Returns:
        Dicts with keys sentence, label and similarity (float), closest
        first; an empty list on any error (the error is logged).
    """
    conn = None
    try:
        # Serialise once; the named placeholder is used twice in the query.
        query_vec = json.dumps([float(x) for x in query_embedding])
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute(
                """SELECT sentence, label,
                          1 - (embedding <=> %(q)s::vector) AS similarity
                   FROM corpus
                   WHERE session_id = %(sid)s AND embedding IS NOT NULL
                   ORDER BY embedding <=> %(q)s::vector
                   LIMIT %(k)s""",
                {"q": query_vec, "sid": session_id, "k": top_k},
            )
            hits = cur.fetchall()
        return [
            {"sentence": sentence, "label": label, "similarity": float(score)}
            for sentence, label, score in hits
        ]
    except Exception as e:
        logger.error(f"[database] similarity_search error: {e}")
        return []
    finally:
        # autocommit is off, so the SELECT opened a transaction; end it so the
        # shared connection is not left idle in transaction or aborted.
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_err:
                logger.warning(
                    f"[database] similarity_search rollback failed: {rollback_err}"
                )
| # ---------------------------------------------------------------- | |
| # Phase 2 -- Codebook | |
| # ---------------------------------------------------------------- | |
def save_codebook(codebook_rows: list[dict], session_id: str = "default") -> int:
    """Replace the stored codebook of a session with *codebook_rows*.

    Delete and insert run in one transaction, so a failure leaves the
    previous codebook in place. An empty list clears the session's codebook.

    Args:
        codebook_rows: dicts with optional keys code_name, definition,
            provenance (default "") and sentence_count (default 1, coerced
            with int()).
        session_id: session whose codebook is replaced.

    Returns:
        Number of rows saved, or 0 on error (logged, rolled back).
    """
    conn = None
    try:
        import psycopg2.extras

        # Build parameters first so a malformed row fails before any SQL runs.
        params = [
            (
                session_id,
                r.get("code_name", ""),
                r.get("definition", ""),
                r.get("provenance", ""),
                int(r.get("sentence_count", 1)),
            )
            for r in codebook_rows
        ]
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute("DELETE FROM codebook WHERE session_id = %s", (session_id,))
            psycopg2.extras.execute_batch(
                cur,
                """INSERT INTO codebook (session_id, code_name, definition, provenance, sentence_count)
                   VALUES (%s, %s, %s, %s, %s)""",
                params,
            )
        conn.commit()
        return len(codebook_rows)
    except Exception as e:
        logger.error(f"[database] save_codebook error: {e}")
        # Roll back the connection that failed, not a re-fetched one.
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_err:
                logger.warning(
                    f"[database] save_codebook rollback failed: {rollback_err}"
                )
        return 0
def load_codebook(session_id: str = "default") -> list[dict]:
    """Load the codebook of a session, in insertion (id) order.

    Returns:
        One dict per row with keys code_name, definition, provenance and
        sentence_count; an empty list if nothing is stored or on any error
        (the error is logged).
    """
    cols = ["code_name", "definition", "provenance", "sentence_count"]
    conn = None
    try:
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute(
                "SELECT code_name, definition, provenance, sentence_count "
                "FROM codebook WHERE session_id = %s ORDER BY id",
                (session_id,),
            )
            fetched = cur.fetchall()
        return [dict(zip(cols, row)) for row in fetched]
    except Exception as e:
        logger.error(f"[database] load_codebook error: {e}")
        return []
    finally:
        # autocommit is off, so the SELECT opened a transaction; end it so the
        # shared connection is not left idle in transaction or aborted.
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_err:
                logger.warning(
                    f"[database] load_codebook rollback failed: {rollback_err}"
                )
| # ---------------------------------------------------------------- | |
| # Phase 2 -- Coded sentences | |
| # ---------------------------------------------------------------- | |
def save_coded_sentences(coded_rows: list[dict], session_id: str = "default") -> int:
    """Replace the Phase 2 coded sentences of a session with *coded_rows*.

    Delete and insert run in one transaction, so a failure leaves the
    previous rows in place. Each row's sentence_idx is its position in
    *coded_rows*. An empty list clears the session's coded sentences.

    Args:
        coded_rows: dicts with optional keys sentence, ai_code_iter1..3,
            human_code_iter1..3, final_code (default "") and orientation
            (default "semantic").
        session_id: session whose coded sentences are replaced.

    Returns:
        Number of rows saved, or 0 on error (logged, rolled back).
    """
    text_fields = (
        "sentence",
        "ai_code_iter1", "ai_code_iter2", "ai_code_iter3",
        "human_code_iter1", "human_code_iter2", "human_code_iter3",
        "final_code",
    )
    conn = None
    try:
        import psycopg2.extras

        params = [
            (
                session_id,
                idx,
                *(r.get(name, "") for name in text_fields),
                r.get("orientation", "semantic"),
            )
            for idx, r in enumerate(coded_rows)
        ]
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute(
                "DELETE FROM coded_sentences WHERE session_id = %s", (session_id,)
            )
            psycopg2.extras.execute_batch(
                cur,
                """INSERT INTO coded_sentences
                   (session_id, sentence_idx, sentence,
                    ai_code_iter1, ai_code_iter2, ai_code_iter3,
                    human_code_iter1, human_code_iter2, human_code_iter3,
                    final_code, orientation)
                   VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
                params,
            )
        conn.commit()
        return len(coded_rows)
    except Exception as e:
        logger.error(f"[database] save_coded_sentences error: {e}")
        # Roll back the connection that failed, not a re-fetched one.
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_err:
                logger.warning(
                    f"[database] save_coded_sentences rollback failed: {rollback_err}"
                )
        return 0
def load_coded_sentences(session_id: str = "default") -> list[dict]:
    """Load the Phase 2 coded sentences of a session, ordered by sentence_idx.

    Returns:
        One dict per row with keys sentence_idx, sentence, ai_code_iter1..3,
        human_code_iter1..3, final_code and orientation; an empty list if
        nothing is stored or on any error (the error is logged).
    """
    cols = [
        "sentence_idx", "sentence",
        "ai_code_iter1", "ai_code_iter2", "ai_code_iter3",
        "human_code_iter1", "human_code_iter2", "human_code_iter3",
        "final_code", "orientation",
    ]
    conn = None
    try:
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute(
                """SELECT sentence_idx, sentence,
                          ai_code_iter1, ai_code_iter2, ai_code_iter3,
                          human_code_iter1, human_code_iter2, human_code_iter3,
                          final_code, orientation
                   FROM coded_sentences WHERE session_id = %s ORDER BY sentence_idx""",
                (session_id,),
            )
            fetched = cur.fetchall()
        return [dict(zip(cols, row)) for row in fetched]
    except Exception as e:
        logger.error(f"[database] load_coded_sentences error: {e}")
        return []
    finally:
        # autocommit is off, so the SELECT opened a transaction; end it so the
        # shared connection is not left idle in transaction or aborted.
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_err:
                logger.warning(
                    f"[database] load_coded_sentences rollback failed: {rollback_err}"
                )
| # ---------------------------------------------------------------- | |
| # Phase 3 -- Themes | |
| # ---------------------------------------------------------------- | |
def save_themes(themes_rows: list[dict], session_id: str = "default") -> int:
    """Replace the Phase 3 themes of a session with *themes_rows*.

    Delete and insert run in one transaction, so a failure leaves the
    previous themes in place. An empty list clears the session's themes.

    Args:
        themes_rows: dicts with optional keys theme_id and code_count
            (default 0, coerced with int()) and candidate_theme_name,
            description, rationale, member_codes, researcher_theme_name,
            researcher_notes (default "").
        session_id: session whose themes are replaced.

    Returns:
        Number of rows saved, or 0 on error (logged, rolled back).
    """
    conn = None
    try:
        import psycopg2.extras

        # Build parameters first so a malformed row fails before any SQL runs.
        params = [
            (
                session_id,
                int(r.get("theme_id", 0)),
                r.get("candidate_theme_name", ""),
                r.get("description", ""),
                r.get("rationale", ""),
                r.get("member_codes", ""),
                int(r.get("code_count", 0)),
                r.get("researcher_theme_name", ""),
                r.get("researcher_notes", ""),
            )
            for r in themes_rows
        ]
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute("DELETE FROM themes WHERE session_id = %s", (session_id,))
            psycopg2.extras.execute_batch(
                cur,
                """INSERT INTO themes
                   (session_id, theme_id, candidate_theme_name, description,
                    rationale, member_codes, code_count,
                    researcher_theme_name, researcher_notes)
                   VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
                params,
            )
        conn.commit()
        return len(themes_rows)
    except Exception as e:
        logger.error(f"[database] save_themes error: {e}")
        # Roll back the connection that failed, not a re-fetched one.
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_err:
                logger.warning(
                    f"[database] save_themes rollback failed: {rollback_err}"
                )
        return 0
def load_themes(session_id: str = "default") -> list[dict]:
    """Load the Phase 3 themes of a session, ordered by theme_id.

    Returns:
        One dict per row with keys theme_id, candidate_theme_name,
        description, rationale, member_codes, code_count,
        researcher_theme_name and researcher_notes; an empty list if nothing
        is stored or on any error (the error is logged).
    """
    cols = [
        "theme_id", "candidate_theme_name", "description", "rationale",
        "member_codes", "code_count", "researcher_theme_name", "researcher_notes",
    ]
    conn = None
    try:
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute(
                """SELECT theme_id, candidate_theme_name, description, rationale,
                          member_codes, code_count, researcher_theme_name, researcher_notes
                   FROM themes WHERE session_id = %s ORDER BY theme_id""",
                (session_id,),
            )
            fetched = cur.fetchall()
        return [dict(zip(cols, row)) for row in fetched]
    except Exception as e:
        logger.error(f"[database] load_themes error: {e}")
        return []
    finally:
        # autocommit is off, so the SELECT opened a transaction; end it so the
        # shared connection is not left idle in transaction or aborted.
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_err:
                logger.warning(
                    f"[database] load_themes rollback failed: {rollback_err}"
                )
| # ---------------------------------------------------------------- | |
| # Phase 4 -- Theme reviews | |
| # ---------------------------------------------------------------- | |
def save_theme_reviews(review_rows: list[dict], session_id: str = "default") -> int:
    """Replace the Phase 4 theme reviews of a session with *review_rows*.

    Delete and insert run in one transaction, so a failure leaves the
    previous reviews in place. An empty list clears the session's reviews.

    Args:
        review_rows: dicts with optional keys theme_id, code_count,
            member_sentence_count (default 0, coerced with int()),
            within_cohesion (default 0.0, coerced with float()) and
            theme_name, member_codes, llm_verdict, llm_reasoning,
            llm_action_suggestion, researcher_verdict,
            researcher_action_notes (default "").
        session_id: session whose reviews are replaced.

    Returns:
        Number of rows saved, or 0 on error (logged, rolled back).
    """
    conn = None
    try:
        import psycopg2.extras

        # Build parameters first so a malformed row fails before any SQL runs.
        params = [
            (
                session_id,
                int(r.get("theme_id", 0)),
                r.get("theme_name", ""),
                r.get("member_codes", ""),
                int(r.get("code_count", 0)),
                int(r.get("member_sentence_count", 0)),
                float(r.get("within_cohesion", 0.0)),
                r.get("llm_verdict", ""),
                r.get("llm_reasoning", ""),
                r.get("llm_action_suggestion", ""),
                r.get("researcher_verdict", ""),
                r.get("researcher_action_notes", ""),
            )
            for r in review_rows
        ]
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute(
                "DELETE FROM theme_reviews WHERE session_id = %s", (session_id,)
            )
            psycopg2.extras.execute_batch(
                cur,
                """INSERT INTO theme_reviews
                   (session_id, theme_id, theme_name, member_codes, code_count,
                    member_sentence_count, within_cohesion,
                    llm_verdict, llm_reasoning, llm_action_suggestion,
                    researcher_verdict, researcher_action_notes)
                   VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
                params,
            )
        conn.commit()
        return len(review_rows)
    except Exception as e:
        logger.error(f"[database] save_theme_reviews error: {e}")
        # Roll back the connection that failed, not a re-fetched one.
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_err:
                logger.warning(
                    f"[database] save_theme_reviews rollback failed: {rollback_err}"
                )
        return 0
def load_theme_reviews(session_id: str = "default") -> list[dict]:
    """Load the Phase 4 theme reviews of a session, ordered by theme_id.

    Returns:
        One dict per row with keys theme_id, theme_name, member_codes,
        code_count, member_sentence_count, within_cohesion, llm_verdict,
        llm_reasoning, llm_action_suggestion, researcher_verdict and
        researcher_action_notes; an empty list if nothing is stored or on
        any error (the error is logged).
    """
    cols = [
        "theme_id", "theme_name", "member_codes", "code_count",
        "member_sentence_count", "within_cohesion",
        "llm_verdict", "llm_reasoning", "llm_action_suggestion",
        "researcher_verdict", "researcher_action_notes",
    ]
    conn = None
    try:
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute(
                """SELECT theme_id, theme_name, member_codes, code_count,
                          member_sentence_count, within_cohesion,
                          llm_verdict, llm_reasoning, llm_action_suggestion,
                          researcher_verdict, researcher_action_notes
                   FROM theme_reviews WHERE session_id = %s ORDER BY theme_id""",
                (session_id,),
            )
            fetched = cur.fetchall()
        return [dict(zip(cols, row)) for row in fetched]
    except Exception as e:
        logger.error(f"[database] load_theme_reviews error: {e}")
        return []
    finally:
        # autocommit is off, so the SELECT opened a transaction; end it so the
        # shared connection is not left idle in transaction or aborted.
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_err:
                logger.warning(
                    f"[database] load_theme_reviews rollback failed: {rollback_err}"
                )
| # ---------------------------------------------------------------- | |
| # Startup check | |
| # ---------------------------------------------------------------- | |
def startup_check() -> dict:
    """Probe the database at app startup and summarise the outcome.

    Returns:
        Dict with keys db_available (bool), tables_created (bool) and error
        (None, or the message of an exception raised by the checks). Table
        creation is attempted only when the database is reachable.
    """
    report = dict(db_available=False, tables_created=False, error=None)
    try:
        reachable = is_available()
        report["db_available"] = reachable
        if reachable:
            report["tables_created"] = create_tables()
    except Exception as exc:
        # Never let a startup probe crash the app; surface the message instead.
        report["error"] = str(exc)
    return report