r"""One-shot migration: local SQLite → Neon PostgreSQL.

Usage:
    python backend/scripts/migrate_sqlite_to_pg.py \
        --sqlite-path backend/math_wiki.db \
        --database-url "postgresql://user:pass@host/dbname?sslmode=require"

Every insert uses ``ON CONFLICT ... DO NOTHING``, so rows whose primary key
already exists in the target database are counted as skipped, not overwritten.
"""
import argparse
import asyncio
import json
import sqlite3
import sys
from pathlib import Path


async def _register_codecs(conn):
    """Register the pgvector codec on a connection (used as the pool ``init`` hook)."""
    # Imported lazily so the module can be imported without pgvector installed.
    import pgvector.asyncpg

    await pgvector.asyncpg.register_vector(conn)


def _embed_batch(texts: list[str]) -> list[list[float]]:
    """Return one dense BAAI/bge-m3 embedding per entry of *texts*.

    Each text is prefixed with ``"passage: "`` before encoding. The model is
    loaded on every call, so callers should pass all texts in a single batch.
    """
    # Imported lazily: loading FlagEmbedding is expensive and only needed here.
    from FlagEmbedding import BGEM3FlagModel

    model = BGEM3FlagModel("BAAI/bge-m3", use_fp16=False)
    prefixed = [f"passage: {t}" for t in texts]
    out = model.encode(
        prefixed,
        return_dense=True,
        return_sparse=False,
        return_colbert_vecs=False,
    )
    return out["dense_vecs"].tolist()


async def migrate(sqlite_path: str, database_url: str) -> None:
    """Copy every known table from the SQLite file into PostgreSQL.

    Args:
        sqlite_path: Path to the source SQLite database file.
        database_url: DSN of the target PostgreSQL database.

    The SQLite connection and the asyncpg pool are both released even when
    pool creation or one of the table migrations fails.
    """
    import asyncpg

    sq = sqlite3.connect(sqlite_path)
    try:
        sq.row_factory = sqlite3.Row
        pool = await asyncpg.create_pool(
            database_url,
            min_size=1,
            max_size=5,
            # NOTE(review): presumably disabled because the target sits behind a
            # connection pooler that cannot keep prepared statements — confirm.
            statement_cache_size=0,
            init=_register_codecs,
        )
        try:
            await _migrate_wiki_units(sq, pool)
            await _migrate_problems(sq, pool)
            await _migrate_table(
                sq, pool, "wiki_unit_history",
                "INSERT INTO wiki_unit_history (id,unit_id,version,content,edited_by,reason,edited_at) "
                "VALUES ($1,$2,$3,$4,$5,$6,$7) ON CONFLICT (id) DO NOTHING",
                lambda r: (r["id"], r["unit_id"], r["version"], r["content"],
                           r["edited_by"], r["reason"], r["edited_at"]),
            )
            await _migrate_table(
                sq, pool, "unit_feedback",
                "INSERT INTO unit_feedback (id,unit_id,problem_text,feedback_type,comment,resolved,created_at) "
                "VALUES ($1,$2,$3,$4,$5,$6,$7) ON CONFLICT (id) DO NOTHING",
                lambda r: (r["id"], r["unit_id"], r["problem_text"], r["feedback_type"],
                           r["comment"], bool(r["resolved"]), r["created_at"]),
            )
            await _migrate_table(
                sq, pool, "flagged_solutions",
                "INSERT INTO flagged_solutions (id,problem_text,problem_hash,solver_output,flag_reason,reviewed,flagged_at) "
                "VALUES ($1,$2,$3,$4,$5,$6,$7) ON CONFLICT (id) DO NOTHING",
                lambda r: (r["id"], r["problem_text"], r["problem_hash"], r["solver_output"],
                           r["flag_reason"], bool(r["reviewed"]), r["flagged_at"]),
            )
            await _migrate_table(
                sq, pool, "wiki_drafts",
                "INSERT INTO wiki_drafts (draft_id,source_url,source_text,proposed_units_json,"
                "final_units_json,topic_hint,status,reviewed_by,reviewed_at,created_at) "
                "VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) ON CONFLICT (draft_id) DO NOTHING",
                lambda r: (r["draft_id"], r["source_url"], r["source_text"],
                           r["proposed_units_json"], r["final_units_json"], r["topic_hint"],
                           r["status"], r["reviewed_by"], r["reviewed_at"], r["created_at"]),
            )
            await _migrate_table(
                sq, pool, "solution_logs",
                "INSERT INTO solution_logs (id,problem_text,problem_hash,classified_topic,retrieved_ids,"
                "used_knowledge_ids,solver_confidence,validation_valid,validation_issues,wiki_assisted,created_at) "
                "VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) ON CONFLICT (id) DO NOTHING",
                lambda r: (r["id"], r["problem_text"], r["problem_hash"], r["classified_topic"],
                           r["retrieved_ids"], r["used_knowledge_ids"], r["solver_confidence"],
                           bool(r["validation_valid"]), r["validation_issues"],
                           bool(r["wiki_assisted"]), r["created_at"]),
            )
            await _migrate_table(
                sq, pool, "staged_wiki_units",
                "INSERT INTO staged_wiki_units (staged_id,unit_data,source,source_url,status,"
                "proposed_by,created_at,updated_at) "
                "VALUES ($1,$2,$3,$4,$5,$6,$7,$8) ON CONFLICT (staged_id) DO NOTHING",
                lambda r: (r["staged_id"], r["unit_data"], r["source"], r["source_url"],
                           r["status"], r["proposed_by"], r["created_at"], r["updated_at"]),
            )
        finally:
            await pool.close()
    finally:
        # Outer finally: runs even if create_pool() itself raised.
        sq.close()


async def _insert_rows(pool, label: str, query: str, param_sets) -> None:
    """Run *query* once per parameter tuple and print an inserted/skipped summary.

    Args:
        pool: asyncpg pool to take a single connection from.
        label: Table name used as the prefix of the summary line.
        query: Parameterised INSERT statement.
        param_sets: Iterable of positional-argument tuples, one per row.
    """
    inserted = skipped = 0
    async with pool.acquire() as conn:
        for params in param_sets:
            status = await conn.execute(query, *params)
            # execute() returns the command status string; its last field is
            # compared against "1" to tell an insert from a conflict no-op.
            if status.split()[-1] == "1":
                inserted += 1
            else:
                skipped += 1
    print(f"{label}: {inserted} → Neon, {skipped} skipped (duplicates)")


async def _migrate_wiki_units(sq: sqlite3.Connection, pool) -> None:
    """Copy ``wiki_units`` rows, computing an embedding for each row's content."""
    rows = sq.execute("SELECT * FROM wiki_units").fetchall()
    if not rows:
        print("wiki_units: 0 rows — skipping")
        return

    print(f"wiki_units: computing embeddings for {len(rows)} rows…")
    # get_running_loop() is the supported way to reach the loop from a coroutine.
    loop = asyncio.get_running_loop()
    texts = [r["content"] for r in rows]
    # Embedding is CPU-bound; run it in the default executor so the event loop stays free.
    embeddings = await loop.run_in_executor(None, _embed_batch, texts)

    param_sets = (
        (
            row["id"], row["type"], row["topic"], row["subtopic"],
            row["content"], row["problem_ids"],
            row["source"] or "manual",  # empty/NULL source falls back to "manual"
            row["source_url"],
            bool(row["deleted"]), row["version"], row["last_edited_by"],
            row["created_at"], row["updated_at"],
            emb,
        )
        for row, emb in zip(rows, embeddings)
    )
    await _insert_rows(
        pool,
        "wiki_units",
        "INSERT INTO wiki_units "
        "(id,type,topic,subtopic,content,problem_ids,source,source_url,"
        "deleted,version,last_edited_by,created_at,updated_at,embedding) "
        "VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14) "
        "ON CONFLICT (id) DO NOTHING",
        param_sets,
    )


async def _migrate_problems(sq: sqlite3.Connection, pool) -> None:
    """Copy ``problems`` rows, tolerating columns that are absent in SQLite."""
    # Some columns were added by later SQLite migrations (e.g. figure_svg), so
    # look up which ones this database actually has before reading them.
    cols = {r[1] for r in sq.execute("PRAGMA table_info(problems)").fetchall()}
    rows = sq.execute("SELECT * FROM problems").fetchall()
    if not rows:
        print("problems: 0 rows — skipping")
        return

    def optional(row, name, default=None):
        """Return row[name] when the column exists, else *default*."""
        return row[name] if name in cols else default

    param_sets = (
        (
            row["problem_id"], row["problem_text"],
            optional(row, "choices"),
            optional(row, "correct_answer"),
            row["topic"], row["subtopic"], row["difficulty"], row["problem_type"],
            optional(row, "figure_svg"),
            optional(row, "problem_hash"),
            optional(row, "figure_type", "svg"),
        )
        for row in rows
    )
    await _insert_rows(
        pool,
        "problems",
        "INSERT INTO problems "
        "(problem_id,problem_text,choices,correct_answer,topic,subtopic,"
        "difficulty,problem_type,figure_svg,problem_hash,figure_type) "
        "VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) "
        "ON CONFLICT (problem_id) DO NOTHING",
        param_sets,
    )


async def _migrate_table(sq, pool, table, query, row_fn) -> None:
    """Copy every row of *table* using *query*; skip the table if SQLite lacks it.

    Args:
        sq: Open SQLite connection whose rows support access by column name.
        pool: asyncpg pool for the target database.
        table: Source table name (interpolated into the SELECT; internal constant only).
        query: Parameterised INSERT statement for the target table.
        row_fn: Maps one SQLite row to the tuple of positional query arguments.

    Raises:
        sqlite3.OperationalError: For any SQLite failure other than a missing table.
    """
    try:
        rows = sq.execute(f"SELECT * FROM {table}").fetchall()
    except sqlite3.OperationalError as exc:
        # Only a missing table is an expected, skippable condition; a locked or
        # corrupt database must not be reported as "table not found".
        if "no such table" not in str(exc):
            raise
        print(f"{table}: table not found in SQLite — skipping")
        return
    if not rows:
        print(f"{table}: 0 rows — skipping")
        return

    await _insert_rows(pool, table, query, (row_fn(row) for row in rows))


def main():
    """Parse command-line arguments and run the migration.

    Exits with status 1 when ``--sqlite-path`` is not an existing regular file.
    """
    parser = argparse.ArgumentParser(description="Migrate math_wiki SQLite → Neon PostgreSQL")
    parser.add_argument("--sqlite-path", required=True, help="Path to math_wiki.db")
    parser.add_argument("--database-url", required=True, help="Neon PostgreSQL DSN")
    args = parser.parse_args()

    # is_file() rather than exists(): a directory "exists" but cannot be opened
    # as a database, and sqlite3.connect() would create a missing file silently.
    if not Path(args.sqlite_path).is_file():
        print(f"ERROR: SQLite file not found: {args.sqlite_path}", file=sys.stderr)
        sys.exit(1)

    asyncio.run(migrate(args.sqlite_path, args.database_url))
    print("Migration complete.")


if __name__ == "__main__":
    main()