"""One-shot migration: local SQLite → Neon PostgreSQL.

Usage:
    python backend/scripts/migrate_sqlite_to_pg.py \
        --sqlite-path backend/math_wiki.db \
        --database-url "postgresql://user:pass@host/dbname?sslmode=require"
"""
import argparse
import asyncio
import json
import sqlite3
import sys
from pathlib import Path
async def _register_codecs(conn):
    """Install the pgvector codec on a newly opened asyncpg connection.

    Passed as the pool's ``init`` hook in ``migrate`` so that every pooled
    connection can encode the ``embedding`` parameter sent for ``wiki_units``.
    """
    from pgvector.asyncpg import register_vector

    await register_vector(conn)
def _embed_batch(texts: list[str]) -> list[list[float]]:
    """Return one BGE-M3 dense embedding (a list of floats) per input text.

    The model is instantiated on every call, so pass the whole corpus in a
    single call rather than calling this once per row. Each text is prefixed
    with ``"passage: "`` before encoding.

    NOTE(review): confirm the query-side embedding code uses the same prefix
    convention; otherwise migrated vectors will not match runtime queries.
    """
    from FlagEmbedding import BGEM3FlagModel

    encoder = BGEM3FlagModel("BAAI/bge-m3", use_fp16=False)
    inputs = [f"passage: {text}" for text in texts]
    result = encoder.encode(
        inputs,
        return_dense=True,
        return_sparse=False,
        return_colbert_vecs=False,
    )
    dense = result["dense_vecs"]
    return dense.tolist()
async def migrate(sqlite_path: str, database_url: str) -> None:
    """Copy all known tables from a SQLite file into a PostgreSQL database.

    Tables are copied in this order: ``wiki_units`` (with freshly computed
    embeddings), ``problems``, then the one-to-one tables listed in
    ``simple_tables`` below. Every INSERT uses ``ON CONFLICT ... DO NOTHING``,
    so re-running the script skips rows that were already copied.

    Args:
        sqlite_path: path to the source SQLite database file.
        database_url: asyncpg DSN of the target PostgreSQL database.

    Both the SQLite connection and the Postgres pool are closed on exit,
    including when the pool cannot be created.

    NOTE(review): values such as ``*_at`` timestamps and ``*_json`` columns are
    passed through exactly as SQLite returns them; if the matching Postgres
    columns are not text-typed, asyncpg may reject them — confirm against the
    Postgres schema.
    """
    import asyncpg

    # (table, Postgres INSERT, SQLite row -> positional parameters).
    # SQLite has no native boolean type, so flag columns go through bool().
    simple_tables = [
        (
            "wiki_unit_history",
            "INSERT INTO wiki_unit_history (id,unit_id,version,content,edited_by,reason,edited_at) "
            "VALUES ($1,$2,$3,$4,$5,$6,$7) ON CONFLICT (id) DO NOTHING",
            lambda r: (r["id"], r["unit_id"], r["version"], r["content"],
                       r["edited_by"], r["reason"], r["edited_at"]),
        ),
        (
            "unit_feedback",
            "INSERT INTO unit_feedback (id,unit_id,problem_text,feedback_type,comment,resolved,created_at) "
            "VALUES ($1,$2,$3,$4,$5,$6,$7) ON CONFLICT (id) DO NOTHING",
            lambda r: (r["id"], r["unit_id"], r["problem_text"], r["feedback_type"],
                       r["comment"], bool(r["resolved"]), r["created_at"]),
        ),
        (
            "flagged_solutions",
            "INSERT INTO flagged_solutions (id,problem_text,problem_hash,solver_output,flag_reason,reviewed,flagged_at) "
            "VALUES ($1,$2,$3,$4,$5,$6,$7) ON CONFLICT (id) DO NOTHING",
            lambda r: (r["id"], r["problem_text"], r["problem_hash"], r["solver_output"],
                       r["flag_reason"], bool(r["reviewed"]), r["flagged_at"]),
        ),
        (
            "wiki_drafts",
            "INSERT INTO wiki_drafts (draft_id,source_url,source_text,proposed_units_json,"
            "final_units_json,topic_hint,status,reviewed_by,reviewed_at,created_at) "
            "VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) ON CONFLICT (draft_id) DO NOTHING",
            lambda r: (r["draft_id"], r["source_url"], r["source_text"], r["proposed_units_json"],
                       r["final_units_json"], r["topic_hint"], r["status"],
                       r["reviewed_by"], r["reviewed_at"], r["created_at"]),
        ),
        (
            "solution_logs",
            "INSERT INTO solution_logs (id,problem_text,problem_hash,classified_topic,retrieved_ids,"
            "used_knowledge_ids,solver_confidence,validation_valid,validation_issues,wiki_assisted,created_at) "
            "VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) ON CONFLICT (id) DO NOTHING",
            lambda r: (r["id"], r["problem_text"], r["problem_hash"], r["classified_topic"],
                       r["retrieved_ids"], r["used_knowledge_ids"], r["solver_confidence"],
                       bool(r["validation_valid"]), r["validation_issues"],
                       bool(r["wiki_assisted"]), r["created_at"]),
        ),
        (
            "staged_wiki_units",
            "INSERT INTO staged_wiki_units (staged_id,unit_data,source,source_url,status,"
            "proposed_by,created_at,updated_at) "
            "VALUES ($1,$2,$3,$4,$5,$6,$7,$8) ON CONFLICT (staged_id) DO NOTHING",
            lambda r: (r["staged_id"], r["unit_data"], r["source"], r["source_url"],
                       r["status"], r["proposed_by"], r["created_at"], r["updated_at"]),
        ),
    ]

    sq = sqlite3.connect(sqlite_path)
    sq.row_factory = sqlite3.Row
    try:
        # Created inside the outer try so the SQLite handle is still closed
        # if the Postgres pool cannot be established.
        # statement_cache_size=0: presumably for compatibility with a
        # transaction-pooling endpoint (e.g. Neon's pooler) — confirm.
        pool = await asyncpg.create_pool(
            database_url,
            min_size=1,
            max_size=5,
            statement_cache_size=0,
            init=_register_codecs,
        )
        try:
            await _migrate_wiki_units(sq, pool)
            await _migrate_problems(sq, pool)
            for table, query, row_fn in simple_tables:
                await _migrate_table(sq, pool, table, query, row_fn)
        finally:
            await pool.close()
    finally:
        sq.close()
async def _migrate_wiki_units(sq: sqlite3.Connection, pool) -> None:
    """Copy ``wiki_units`` rows into Postgres, embedding each row's content.

    Embeddings are computed for every SQLite row in one batch (off the event
    loop, in the default executor, because encoding is CPU-bound) and written
    to the ``embedding`` column. Rows whose ``id`` already exists in Postgres
    are left untouched and counted as skipped.

    Raises:
        RuntimeError: if the embedder returns a different number of vectors
            than there are rows (rather than silently dropping rows).
    """
    rows = sq.execute("SELECT * FROM wiki_units").fetchall()
    if not rows:
        print("wiki_units: 0 rows — skipping")
        return
    print(f"wiki_units: computing embeddings for {len(rows)} rows…")
    # get_running_loop() is the documented way to reach the loop from a coroutine.
    loop = asyncio.get_running_loop()
    texts = [r["content"] for r in rows]
    # Run CPU-bound embed in executor
    embeddings = await loop.run_in_executor(None, _embed_batch, texts)
    if len(embeddings) != len(rows):
        # zip() below would silently drop the unmatched rows; fail loudly instead.
        raise RuntimeError(
            f"wiki_units: got {len(embeddings)} embeddings for {len(rows)} rows"
        )
    inserted = skipped = 0
    async with pool.acquire() as conn:
        for row, emb in zip(rows, embeddings):
            result = await conn.execute(
                "INSERT INTO wiki_units "
                "(id,type,topic,subtopic,content,problem_ids,source,source_url,"
                "deleted,version,last_edited_by,created_at,updated_at,embedding) "
                "VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14) "
                "ON CONFLICT (id) DO NOTHING",
                row["id"], row["type"], row["topic"], row["subtopic"], row["content"],
                row["problem_ids"], row["source"] or "manual", row["source_url"],
                bool(row["deleted"]), row["version"], row["last_edited_by"],
                row["created_at"], row["updated_at"], emb,
            )
            # Status tag looks like "INSERT 0 1"; the last field is rows written
            # (0 when ON CONFLICT skipped an existing id).
            if result.split()[-1] == "1":
                inserted += 1
            else:
                skipped += 1
    print(f"wiki_units: {inserted} → Neon, {skipped} skipped (duplicates)")
async def _migrate_problems(sq: sqlite3.Connection, pool) -> None:
    """Copy ``problems`` rows from SQLite into Postgres.

    Some columns may be absent from older SQLite files (e.g. ``figure_svg``
    was added by a later migration). Each such optional column is read only
    when ``PRAGMA table_info`` lists it; otherwise a fallback is sent
    (``None``, or ``"svg"`` for ``figure_type``). Rows whose ``problem_id``
    already exists in Postgres are left untouched and counted as skipped.
    """
    present = {info[1] for info in sq.execute("PRAGMA table_info(problems)").fetchall()}

    def optional(row, column, fallback=None):
        # Read *column* only when the SQLite table actually has it.
        return row[column] if column in present else fallback

    rows = sq.execute("SELECT * FROM problems").fetchall()
    if not rows:
        print("problems: 0 rows — skipping")
        return

    sql = (
        "INSERT INTO problems "
        "(problem_id,problem_text,choices,correct_answer,topic,subtopic,"
        "difficulty,problem_type,figure_svg,problem_hash,figure_type) "
        "VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) "
        "ON CONFLICT (problem_id) DO NOTHING"
    )
    counts = {"inserted": 0, "skipped": 0}
    async with pool.acquire() as conn:
        for row in rows:
            status = await conn.execute(
                sql,
                row["problem_id"],
                row["problem_text"],
                optional(row, "choices"),
                optional(row, "correct_answer"),
                row["topic"],
                row["subtopic"],
                row["difficulty"],
                row["problem_type"],
                optional(row, "figure_svg"),
                optional(row, "problem_hash"),
                optional(row, "figure_type", "svg"),
            )
            # The last field of the status tag is the number of rows written.
            outcome = "inserted" if status.split()[-1] == "1" else "skipped"
            counts[outcome] += 1
    print(f"problems: {counts['inserted']} → Neon, {counts['skipped']} skipped (duplicates)")
async def _migrate_table(sq, pool, table, query, row_fn) -> None:
    """Copy every row of SQLite *table* into Postgres using *query*.

    Args:
        sq: open SQLite connection.
        pool: asyncpg pool; one connection is acquired for the whole table.
        table: table (or view) name in SQLite; also used in progress messages.
        query: parameterised Postgres INSERT. The callers in this script use
            ``ON CONFLICT ... DO NOTHING``, so an already-present row reports
            zero rows written and is tallied as skipped.
        row_fn: maps one SQLite row to the positional parameters for *query*.

    A table missing from the SQLite file is reported and skipped. Any other
    SQLite error (locked or unreadable database, etc.) propagates instead of
    being misreported as a missing table.
    """
    # Look the table up explicitly instead of catching OperationalError from
    # the SELECT: that exception also covers locked/unreadable databases, which
    # must not be silently skipped mid-migration. NOCASE mirrors SQLite's
    # case-insensitive identifier resolution.
    found = sq.execute(
        "SELECT 1 FROM sqlite_master "
        "WHERE type IN ('table', 'view') AND name = ? COLLATE NOCASE",
        (table,),
    ).fetchone()
    if found is None:
        print(f"{table}: table not found in SQLite — skipping")
        return
    # Identifiers cannot be bound as parameters; quote the verified name.
    quoted = '"' + table.replace('"', '""') + '"'
    rows = sq.execute(f"SELECT * FROM {quoted}").fetchall()
    if not rows:
        print(f"{table}: 0 rows — skipping")
        return
    inserted = skipped = 0
    async with pool.acquire() as conn:
        for row in rows:
            # Status tag looks like "INSERT 0 1"; the last field is rows written.
            result = await conn.execute(query, *row_fn(row))
            if result.split()[-1] == "1":
                inserted += 1
            else:
                skipped += 1
    print(f"{table}: {inserted} → Neon, {skipped} skipped (duplicates)")
def main():
    """Parse CLI arguments and run the SQLite → PostgreSQL migration.

    Exits with status 1 when ``--sqlite-path`` is not an existing regular
    file. The pre-check matters because ``sqlite3.connect`` creates a new,
    empty database at a missing path, which would silently migrate nothing.
    """
    parser = argparse.ArgumentParser(description="Migrate math_wiki SQLite → Neon PostgreSQL")
    parser.add_argument("--sqlite-path", required=True, help="Path to math_wiki.db")
    parser.add_argument("--database-url", required=True, help="Neon PostgreSQL DSN")
    args = parser.parse_args()
    # is_file() rather than exists(): a directory path must be rejected here
    # instead of failing later when SQLite tries to open it.
    if not Path(args.sqlite_path).is_file():
        print(f"ERROR: SQLite file not found: {args.sqlite_path}", file=sys.stderr)
        sys.exit(1)
    asyncio.run(migrate(args.sqlite_path, args.database_url))
    print("Migration complete.")


if __name__ == "__main__":
    main()
|