|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| import os
|
| import json
|
| import logging
|
| from datetime import datetime
|
| from typing import Optional
|
|
|
# Module-level logger; records are emitted under this module's name.
logger = logging.getLogger(name=__name__)


# Connection string, read once at import time. Empty string when the
# SUPABASE_DB_URL environment variable is not set.
_DB_URL = os.getenv("SUPABASE_DB_URL", "")

# Shared connection reused by _get_conn(); None until the first connect.
_conn_cache = None
|
|
|
|
|
def _get_conn():
    """Return a live psycopg2 connection (cached, auto-reconnect).

    A cached connection that is still open is probed with ``SELECT 1`` and
    reused if the probe succeeds. The probe can fail, for example because
    the server dropped the session or an earlier statement left the
    transaction aborted. In that case the stale connection is closed and a
    new one is opened.

    New connections have ``autocommit`` disabled, so callers must commit or
    roll back themselves. With autocommit off, the probe itself runs inside
    the connection's current transaction and leaves one open until the
    caller ends it.

    Raises:
        RuntimeError: if SUPABASE_DB_URL is not set.
        ImportError: if psycopg2 is not installed.
        psycopg2.Error: if a new connection cannot be established.
    """
    global _conn_cache

    if not _DB_URL:
        raise RuntimeError(
            "SUPABASE_DB_URL not set. Add it as a Space secret."
        )

    import psycopg2

    cached = _conn_cache
    if cached is not None and not cached.closed:
        try:
            with cached.cursor() as cur:
                cur.execute("SELECT 1")
            return cached
        except Exception as exc:
            logger.warning(
                "[database] cached connection unusable, reconnecting: %s", exc
            )
            # Close explicitly so the broken session is released now rather
            # than whenever its last reference happens to go away.
            try:
                cached.close()
            except Exception:
                pass  # already broken; nothing more to release

    _conn_cache = None
    conn = psycopg2.connect(_DB_URL, connect_timeout=30)
    conn.autocommit = False
    _conn_cache = conn
    return conn
|
|
|
|
|
def is_available() -> bool:
    """Report whether the database answers a trivial query.

    Returns False without connecting when SUPABASE_DB_URL is unset.
    Otherwise it gets a connection from _get_conn(), runs ``SELECT 1`` on
    it and returns True. Any exception is logged at WARNING level and
    reported as False.
    """
    if not _DB_URL:
        return False
    try:
        _get_conn().cursor().execute("SELECT 1")
    except Exception as e:
        logger.warning(f"[database] not available: {e}")
        return False
    return True
|
|
|
|
|
|
|
|
|
|
|
# Schema DDL, executed as a single script by create_tables().
#
# Every statement uses IF NOT EXISTS, so re-running it only creates what is
# missing. Tables that already exist are left exactly as they are; this
# script never adds or alters columns on them.
#
# The first statement enables the pgvector extension ("vector"). Both
# corpus.embedding and papers.embedding are declared as vector(384).
#
# Tables:
#   corpus, codebook, coded_sentences, themes, theme_reviews
#       Each has a session_id column (default 'default') and a session_id
#       index.
#   chats, papers
#       papers.chat_id references chats(id) with ON DELETE CASCADE, so
#       deleting a chat deletes its papers.
CREATE_TABLES_SQL = """
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE IF NOT EXISTS corpus (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL DEFAULT 'default',
L1 TEXT,
L2 TEXT,
L3 TEXT,
L4 TEXT,
sentence_id TEXT,
sentence TEXT NOT NULL,
label TEXT,
embedding vector(384),
created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS codebook (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL DEFAULT 'default',
code_name TEXT NOT NULL,
definition TEXT,
provenance TEXT,
sentence_count INT DEFAULT 1,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS coded_sentences (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL DEFAULT 'default',
sentence_idx INT,
sentence TEXT,
ai_code_iter1 TEXT,
ai_code_iter2 TEXT,
ai_code_iter3 TEXT,
human_code_iter1 TEXT,
human_code_iter2 TEXT,
human_code_iter3 TEXT,
final_code TEXT,
orientation TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS themes (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL DEFAULT 'default',
theme_id INT,
candidate_theme_name TEXT,
description TEXT,
rationale TEXT,
member_codes TEXT,
code_count INT,
researcher_theme_name TEXT,
researcher_notes TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS theme_reviews (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL DEFAULT 'default',
theme_id INT,
theme_name TEXT,
member_codes TEXT,
code_count INT,
member_sentence_count INT,
within_cohesion FLOAT,
llm_verdict TEXT,
llm_reasoning TEXT,
llm_action_suggestion TEXT,
researcher_verdict TEXT,
researcher_action_notes TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS chats (
id SERIAL PRIMARY KEY,
title TEXT,
user_message TEXT,
bot_message TEXT,
topics_json JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS papers (
id SERIAL PRIMARY KEY,
chat_id INT REFERENCES chats(id) ON DELETE CASCADE,
title TEXT,
abstract TEXT,
doi TEXT,
date_of_publication TEXT,
journal TEXT,
no_of_citations INT,
web_link TEXT,
authors TEXT,
keywords TEXT,
confidence_score FLOAT,
paper_type TEXT,
topic_label TEXT,
embedding vector(384),
created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_corpus_session ON corpus(session_id);
CREATE INDEX IF NOT EXISTS idx_codebook_session ON codebook(session_id);
CREATE INDEX IF NOT EXISTS idx_coded_session ON coded_sentences(session_id);
CREATE INDEX IF NOT EXISTS idx_themes_session ON themes(session_id);
CREATE INDEX IF NOT EXISTS idx_reviews_session ON theme_reviews(session_id);
CREATE INDEX IF NOT EXISTS idx_papers_chat ON papers(chat_id);
CREATE INDEX IF NOT EXISTS idx_papers_topic ON papers(topic_label);
"""
|
|
|
|
|
def create_tables() -> bool:
    """Create all tables if they don't exist. Safe to call on every startup.

    Executes CREATE_TABLES_SQL, whose statements all use IF NOT EXISTS, and
    commits.

    Returns:
        True on success. False on any failure: the error is logged and the
        transaction on the connection that was used is rolled back (best
        effort).
    """
    conn = None
    try:
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute(CREATE_TABLES_SQL)
        conn.commit()
        logger.info("[database] Tables ready.")
        return True
    except Exception as e:
        logger.error(f"[database] create_tables error: {e}")
        # Roll back the connection that actually failed. Calling
        # _get_conn().rollback() would first run its SELECT 1 probe; that
        # probe fails inside an aborted transaction and swaps in a fresh
        # connection, so the rollback would land on the wrong connection.
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass  # connection unusable; the next _get_conn() reconnects
        return False
|
|
|
|
|
|
|
|
|
|
|
def save_corpus(rows: list[dict], session_id: str = "default") -> int:
    """
    Save corpus sentences to the database, replacing the session's corpus.

    Existing corpus rows for ``session_id`` are deleted and ``rows`` are
    inserted in the same transaction, which is then committed. If ``rows``
    is empty, the function returns 0 immediately and the existing corpus is
    kept.

    Args:
        rows: dicts with optional keys L1, L2, L3, L4, sentence_id,
            sentence and label. Missing keys are stored as empty strings.
        session_id: session whose corpus is replaced.

    Returns:
        Number of rows saved. Returns 0 on failure; the error is logged and
        the transaction is rolled back, so the DELETE does not take effect.
    """
    if not rows:
        return 0
    conn = None
    try:
        conn = _get_conn()
        import psycopg2.extras

        with conn.cursor() as cur:
            cur.execute("DELETE FROM corpus WHERE session_id = %s", (session_id,))
            psycopg2.extras.execute_batch(
                cur,
                """INSERT INTO corpus (session_id, L1, L2, L3, L4, sentence_id, sentence, label)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                [
                    (
                        session_id,
                        r.get("L1", ""),
                        r.get("L2", ""),
                        r.get("L3", ""),
                        r.get("L4", ""),
                        r.get("sentence_id", ""),
                        r.get("sentence", ""),
                        r.get("label", ""),
                    )
                    for r in rows
                ],
            )
        conn.commit()
        return len(rows)
    except Exception as e:
        logger.error(f"[database] save_corpus error: {e}")
        # Roll back the connection that failed. _get_conn().rollback() would
        # first run its SELECT 1 probe, which fails in an aborted transaction
        # and swaps in a fresh connection, so the rollback would miss.
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass  # connection unusable; the next _get_conn() reconnects
        return 0
|
|
|
|
|
def load_corpus(session_id: str = "default") -> list[dict]:
    """Load the corpus rows stored for ``session_id``, ordered by id.

    Returns:
        A list of dicts with keys L1, L2, L3, L4, sentence_id, sentence and
        label. Returns an empty list on any error; the error is logged and
        the connection's transaction is rolled back (best effort).
    """
    cols = ["L1", "L2", "L3", "L4", "sentence_id", "sentence", "label"]
    conn = None
    try:
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute(
                "SELECT L1, L2, L3, L4, sentence_id, sentence, label "
                "FROM corpus WHERE session_id = %s ORDER BY id",
                (session_id,),
            )
            return [dict(zip(cols, row)) for row in cur.fetchall()]
    except Exception as e:
        logger.error(f"[database] load_corpus error: {e}")
        # A failed statement leaves the transaction aborted. Without this
        # rollback the next _get_conn() probe fails and forces a reconnect.
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass
        return []
|
|
|
|
|
|
|
|
|
|
|
def save_embeddings(sentence_embeddings: list[tuple[str, list[float]]], session_id: str = "default") -> int:
    """
    Save sentence embeddings to the corpus table.

    For each ``(sentence_text, embedding)`` pair, every corpus row of the
    session whose ``sentence`` equals ``sentence_text`` gets its
    ``embedding`` set. The embedding can be any iterable of numbers, such as
    a list of floats or a 1-D numpy array; each element is converted with
    ``float()``.

    Returns:
        The number of pairs submitted, which is not necessarily the number
        of rows matched. Returns 0 for empty input, or on failure; failures
        are logged and rolled back.
    """
    if not sentence_embeddings:
        return 0
    conn = None
    try:
        conn = _get_conn()
        import psycopg2.extras

        # float() lets numpy arrays and numpy scalar values through;
        # json.dumps raises TypeError on e.g. numpy.float32.
        params = [
            (json.dumps([float(x) for x in emb]), session_id, sent)
            for sent, emb in sentence_embeddings
        ]
        with conn.cursor() as cur:
            psycopg2.extras.execute_batch(
                cur,
                "UPDATE corpus SET embedding = %s::vector WHERE session_id = %s AND sentence = %s",
                params,
            )
        conn.commit()
        return len(sentence_embeddings)
    except Exception as e:
        logger.error(f"[database] save_embeddings error: {e}")
        # Roll back the connection that failed. _get_conn().rollback() would
        # first run its SELECT 1 probe, which fails in an aborted transaction
        # and swaps in a fresh connection, so the rollback would miss.
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass  # connection unusable; the next _get_conn() reconnects
        return 0
|
|
|
|
|
def similarity_search(query_embedding: list[float], session_id: str = "default", top_k: int = 5) -> list[dict]:
    """
    Find the top_k corpus sentences most similar to ``query_embedding``.

    Uses pgvector's cosine-distance operator ``<=>``. Rows without an
    embedding are skipped, and results are ordered from nearest to
    farthest. ``query_embedding`` can be any iterable of numbers, such as a
    list or a 1-D numpy array; each element is converted with ``float()``.

    Returns:
        A list of dicts with keys sentence, label and similarity, where
        similarity is ``1 - cosine distance``. Returns an empty list on any
        error; the error is logged and the transaction rolled back.
    """
    conn = None
    try:
        conn = _get_conn()
        # Serialize once; the same vector literal is bound twice below.
        vec_literal = json.dumps([float(x) for x in query_embedding])
        with conn.cursor() as cur:
            cur.execute(
                """SELECT sentence, label,
                1 - (embedding <=> %s::vector) AS similarity
                FROM corpus
                WHERE session_id = %s AND embedding IS NOT NULL
                ORDER BY embedding <=> %s::vector
                LIMIT %s""",
                (vec_literal, session_id, vec_literal, top_k),
            )
            rows = cur.fetchall()
        return [
            {"sentence": row[0], "label": row[1], "similarity": float(row[2])}
            for row in rows
        ]
    except Exception as e:
        logger.error(f"[database] similarity_search error: {e}")
        # A failed statement leaves the transaction aborted. Without this
        # rollback the next _get_conn() probe fails and forces a reconnect.
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass
        return []
|
|
|
|
|
|
|
|
|
|
|
def save_codebook(codebook_rows: list[dict], session_id: str = "default") -> int:
    """Save the full codebook for a session, replacing the existing one.

    The session's codebook rows are deleted and ``codebook_rows`` inserted
    in one transaction, which is then committed. An empty ``codebook_rows``
    therefore clears the session's codebook.

    Args:
        codebook_rows: dicts with optional keys code_name, definition and
            provenance (missing keys become ""), plus sentence_count
            (missing becomes 1; the value is converted with int()).
        session_id: session whose codebook is replaced.

    Returns:
        Number of rows saved, or 0 on failure (logged and rolled back).
    """
    conn = None
    try:
        conn = _get_conn()
        import psycopg2.extras

        with conn.cursor() as cur:
            cur.execute("DELETE FROM codebook WHERE session_id = %s", (session_id,))
            psycopg2.extras.execute_batch(
                cur,
                """INSERT INTO codebook (session_id, code_name, definition, provenance, sentence_count)
                VALUES (%s, %s, %s, %s, %s)""",
                [
                    (
                        session_id,
                        r.get("code_name", ""),
                        r.get("definition", ""),
                        r.get("provenance", ""),
                        int(r.get("sentence_count", 1)),
                    )
                    for r in codebook_rows
                ],
            )
        conn.commit()
        return len(codebook_rows)
    except Exception as e:
        logger.error(f"[database] save_codebook error: {e}")
        # Roll back the connection that failed. _get_conn().rollback() would
        # first run its SELECT 1 probe, which fails in an aborted transaction
        # and swaps in a fresh connection, so the rollback would miss.
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass  # connection unusable; the next _get_conn() reconnects
        return 0
|
|
|
|
|
def load_codebook(session_id: str = "default") -> list[dict]:
    """Load the codebook rows stored for ``session_id``, ordered by id.

    Returns:
        A list of dicts with keys code_name, definition, provenance and
        sentence_count. Returns an empty list on any error; the error is
        logged and the connection's transaction is rolled back (best effort).
    """
    cols = ["code_name", "definition", "provenance", "sentence_count"]
    conn = None
    try:
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute(
                "SELECT code_name, definition, provenance, sentence_count "
                "FROM codebook WHERE session_id = %s ORDER BY id",
                (session_id,),
            )
            return [dict(zip(cols, row)) for row in cur.fetchall()]
    except Exception as e:
        logger.error(f"[database] load_codebook error: {e}")
        # A failed statement leaves the transaction aborted. Without this
        # rollback the next _get_conn() probe fails and forces a reconnect.
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass
        return []
|
|
|
|
|
|
|
|
|
|
|
def save_coded_sentences(coded_rows: list[dict], session_id: str = "default") -> int:
    """Save Phase 2 coded sentences, replacing the session's existing rows.

    The session's coded_sentences rows are deleted and ``coded_rows``
    inserted in one transaction, which is then committed.

    Args:
        coded_rows: dicts with optional keys sentence, ai_code_iter1-3,
            human_code_iter1-3 and final_code (missing keys become ""), plus
            orientation (missing becomes "semantic"). Each row's
            ``sentence_idx`` is its position in ``coded_rows``; any
            "sentence_idx" key in the dict is ignored.
        session_id: session whose coded sentences are replaced.

    Returns:
        Number of rows saved, or 0 on failure (logged and rolled back).
    """
    conn = None
    try:
        conn = _get_conn()
        import psycopg2.extras

        with conn.cursor() as cur:
            cur.execute("DELETE FROM coded_sentences WHERE session_id = %s", (session_id,))
            psycopg2.extras.execute_batch(
                cur,
                """INSERT INTO coded_sentences
                (session_id, sentence_idx, sentence,
                ai_code_iter1, ai_code_iter2, ai_code_iter3,
                human_code_iter1, human_code_iter2, human_code_iter3,
                final_code, orientation)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
                [
                    (
                        session_id,
                        i,
                        r.get("sentence", ""),
                        r.get("ai_code_iter1", ""),
                        r.get("ai_code_iter2", ""),
                        r.get("ai_code_iter3", ""),
                        r.get("human_code_iter1", ""),
                        r.get("human_code_iter2", ""),
                        r.get("human_code_iter3", ""),
                        r.get("final_code", ""),
                        r.get("orientation", "semantic"),
                    )
                    for i, r in enumerate(coded_rows)
                ],
            )
        conn.commit()
        return len(coded_rows)
    except Exception as e:
        logger.error(f"[database] save_coded_sentences error: {e}")
        # Roll back the connection that failed. _get_conn().rollback() would
        # first run its SELECT 1 probe, which fails in an aborted transaction
        # and swaps in a fresh connection, so the rollback would miss.
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass  # connection unusable; the next _get_conn() reconnects
        return 0
|
|
|
|
|
def load_coded_sentences(session_id: str = "default") -> list[dict]:
    """Load Phase 2 coded sentences for a session, ordered by sentence_idx.

    Returns:
        A list of dicts with keys sentence_idx, sentence, ai_code_iter1-3,
        human_code_iter1-3, final_code and orientation. Returns an empty
        list on any error; the error is logged and the connection's
        transaction is rolled back (best effort).
    """
    cols = [
        "sentence_idx", "sentence",
        "ai_code_iter1", "ai_code_iter2", "ai_code_iter3",
        "human_code_iter1", "human_code_iter2", "human_code_iter3",
        "final_code", "orientation",
    ]
    conn = None
    try:
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute(
                """SELECT sentence_idx, sentence,
                ai_code_iter1, ai_code_iter2, ai_code_iter3,
                human_code_iter1, human_code_iter2, human_code_iter3,
                final_code, orientation
                FROM coded_sentences WHERE session_id = %s ORDER BY sentence_idx""",
                (session_id,),
            )
            return [dict(zip(cols, row)) for row in cur.fetchall()]
    except Exception as e:
        logger.error(f"[database] load_coded_sentences error: {e}")
        # A failed statement leaves the transaction aborted. Without this
        # rollback the next _get_conn() probe fails and forces a reconnect.
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass
        return []
|
|
|
|
|
|
|
|
|
|
|
def save_themes(themes_rows: list[dict], session_id: str = "default") -> int:
    """Save Phase 3 themes, replacing the session's existing themes.

    The session's themes rows are deleted and ``themes_rows`` inserted in
    one transaction, which is then committed.

    Args:
        themes_rows: dicts with optional keys candidate_theme_name,
            description, rationale, member_codes, researcher_theme_name and
            researcher_notes (missing keys become ""), plus theme_id and
            code_count (missing keys become 0; values are converted with
            int()).
        session_id: session whose themes are replaced.

    Returns:
        Number of rows saved, or 0 on failure (logged and rolled back).
    """
    conn = None
    try:
        conn = _get_conn()
        import psycopg2.extras

        with conn.cursor() as cur:
            cur.execute("DELETE FROM themes WHERE session_id = %s", (session_id,))
            psycopg2.extras.execute_batch(
                cur,
                """INSERT INTO themes
                (session_id, theme_id, candidate_theme_name, description,
                rationale, member_codes, code_count,
                researcher_theme_name, researcher_notes)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
                [
                    (
                        session_id,
                        int(r.get("theme_id", 0)),
                        r.get("candidate_theme_name", ""),
                        r.get("description", ""),
                        r.get("rationale", ""),
                        r.get("member_codes", ""),
                        int(r.get("code_count", 0)),
                        r.get("researcher_theme_name", ""),
                        r.get("researcher_notes", ""),
                    )
                    for r in themes_rows
                ],
            )
        conn.commit()
        return len(themes_rows)
    except Exception as e:
        logger.error(f"[database] save_themes error: {e}")
        # Roll back the connection that failed. _get_conn().rollback() would
        # first run its SELECT 1 probe, which fails in an aborted transaction
        # and swaps in a fresh connection, so the rollback would miss.
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass  # connection unusable; the next _get_conn() reconnects
        return 0
|
|
|
|
|
def load_themes(session_id: str = "default") -> list[dict]:
    """Load Phase 3 themes for a session, ordered by theme_id.

    Returns:
        A list of dicts with keys theme_id, candidate_theme_name,
        description, rationale, member_codes, code_count,
        researcher_theme_name and researcher_notes. Returns an empty list on
        any error; the error is logged and the connection's transaction is
        rolled back (best effort).
    """
    cols = [
        "theme_id", "candidate_theme_name", "description", "rationale",
        "member_codes", "code_count", "researcher_theme_name", "researcher_notes",
    ]
    conn = None
    try:
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute(
                """SELECT theme_id, candidate_theme_name, description, rationale,
                member_codes, code_count, researcher_theme_name, researcher_notes
                FROM themes WHERE session_id = %s ORDER BY theme_id""",
                (session_id,),
            )
            return [dict(zip(cols, row)) for row in cur.fetchall()]
    except Exception as e:
        logger.error(f"[database] load_themes error: {e}")
        # A failed statement leaves the transaction aborted. Without this
        # rollback the next _get_conn() probe fails and forces a reconnect.
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass
        return []
|
|
|
|
|
|
|
|
|
|
|
def save_theme_reviews(review_rows: list[dict], session_id: str = "default") -> int:
    """Save Phase 4 theme reviews, replacing the session's existing reviews.

    The session's theme_reviews rows are deleted and ``review_rows``
    inserted in one transaction, which is then committed.

    Args:
        review_rows: dicts whose values are stored as follows.
            theme_id, code_count, member_sentence_count: converted with
                int(); missing keys become 0.
            within_cohesion: converted with float(); a missing key becomes
                0.0.
            theme_name, member_codes, llm_verdict, llm_reasoning,
                llm_action_suggestion, researcher_verdict,
                researcher_action_notes: stored as-is; missing keys
                become "".
        session_id: session whose reviews are replaced.

    Returns:
        Number of rows saved, or 0 on failure (logged and rolled back).
    """
    conn = None
    try:
        conn = _get_conn()
        import psycopg2.extras

        with conn.cursor() as cur:
            cur.execute("DELETE FROM theme_reviews WHERE session_id = %s", (session_id,))
            psycopg2.extras.execute_batch(
                cur,
                """INSERT INTO theme_reviews
                (session_id, theme_id, theme_name, member_codes, code_count,
                member_sentence_count, within_cohesion,
                llm_verdict, llm_reasoning, llm_action_suggestion,
                researcher_verdict, researcher_action_notes)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
                [
                    (
                        session_id,
                        int(r.get("theme_id", 0)),
                        r.get("theme_name", ""),
                        r.get("member_codes", ""),
                        int(r.get("code_count", 0)),
                        int(r.get("member_sentence_count", 0)),
                        float(r.get("within_cohesion", 0.0)),
                        r.get("llm_verdict", ""),
                        r.get("llm_reasoning", ""),
                        r.get("llm_action_suggestion", ""),
                        r.get("researcher_verdict", ""),
                        r.get("researcher_action_notes", ""),
                    )
                    for r in review_rows
                ],
            )
        conn.commit()
        return len(review_rows)
    except Exception as e:
        logger.error(f"[database] save_theme_reviews error: {e}")
        # Roll back the connection that failed. _get_conn().rollback() would
        # first run its SELECT 1 probe, which fails in an aborted transaction
        # and swaps in a fresh connection, so the rollback would miss.
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass  # connection unusable; the next _get_conn() reconnects
        return 0
|
|
|
|
|
def load_theme_reviews(session_id: str = "default") -> list[dict]:
    """Load Phase 4 theme reviews for a session, ordered by theme_id.

    Returns:
        A list of dicts with keys theme_id, theme_name, member_codes,
        code_count, member_sentence_count, within_cohesion, llm_verdict,
        llm_reasoning, llm_action_suggestion, researcher_verdict and
        researcher_action_notes. Returns an empty list on any error; the
        error is logged and the connection's transaction is rolled back
        (best effort).
    """
    cols = [
        "theme_id", "theme_name", "member_codes", "code_count",
        "member_sentence_count", "within_cohesion",
        "llm_verdict", "llm_reasoning", "llm_action_suggestion",
        "researcher_verdict", "researcher_action_notes",
    ]
    conn = None
    try:
        conn = _get_conn()
        with conn.cursor() as cur:
            cur.execute(
                """SELECT theme_id, theme_name, member_codes, code_count,
                member_sentence_count, within_cohesion,
                llm_verdict, llm_reasoning, llm_action_suggestion,
                researcher_verdict, researcher_action_notes
                FROM theme_reviews WHERE session_id = %s ORDER BY theme_id""",
                (session_id,),
            )
            return [dict(zip(cols, row)) for row in cur.fetchall()]
    except Exception as e:
        logger.error(f"[database] load_theme_reviews error: {e}")
        # A failed statement leaves the transaction aborted. Without this
        # rollback the next _get_conn() probe fails and forces a reconnect.
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass
        return []
|
|
|
|
|
|
|
|
|
|
|
def startup_check() -> dict:
    """Probe the database at app startup and report the outcome for the UI.

    Returns a dict with three keys:
        db_available: result of is_available().
        tables_created: result of create_tables(). It is only attempted
            when the database is available; otherwise the value is False.
        error: str() of any exception raised by those calls, else None.
    """
    status = dict(db_available=False, tables_created=False, error=None)
    try:
        reachable = is_available()
        status["db_available"] = reachable
        if reachable:
            status["tables_created"] = create_tables()
    except Exception as exc:
        status["error"] = str(exc)
    return status
|
|
|