# ai-agent-app/backend/scripts/migrate_sqlite_to_pg.py
# MinhTai — deploy: 98604cf (ae227b2)
"""One-shot migration: local SQLite → Neon PostgreSQL.
Usage:
python backend/scripts/migrate_sqlite_to_pg.py \
--sqlite-path backend/math_wiki.db \
--database-url "postgresql://user:pass@host/dbname?sslmode=require"
"""
import argparse
import asyncio
import json
import sqlite3
import sys
from pathlib import Path
async def _register_codecs(conn):
    """Pool ``init`` hook: register pgvector's asyncpg codec on ``conn``.

    asyncpg invokes this once for every new connection the pool opens, so each
    pooled connection can bind values for pgvector ``vector`` columns.
    """
    from pgvector.asyncpg import register_vector

    await register_vector(conn)
def _embed_batch(texts: list[str]) -> list[list[float]]:
    """Return one dense BGE-M3 embedding per text, in the same order as ``texts``.

    The model is loaded on every call (full precision, ``use_fp16=False``) and
    only dense vectors are requested; sparse and ColBERT outputs are disabled.
    Each text is prefixed with ``"passage: "`` before encoding.
    """
    from FlagEmbedding import BGEM3FlagModel

    encoder = BGEM3FlagModel("BAAI/bge-m3", use_fp16=False)
    # NOTE(review): presumably the query/serving side uses a matching prefix
    # convention — keep the two in sync.
    passages = [f"passage: {text}" for text in texts]
    encoded = encoder.encode(
        passages,
        return_dense=True,
        return_sparse=False,
        return_colbert_vecs=False,
    )
    dense = encoded["dense_vecs"]
    return dense.tolist()
async def migrate(sqlite_path: str, database_url: str) -> None:
    """Copy every supported table from the SQLite file into PostgreSQL.

    ``wiki_units`` (with freshly computed embeddings) and ``problems`` are
    copied first, then the remaining tables in a fixed order. Every INSERT
    uses ``ON CONFLICT ... DO NOTHING``, so re-running the script skips rows
    that already exist instead of failing on them.

    Args:
        sqlite_path: Path to the source SQLite database file.
        database_url: PostgreSQL DSN passed to ``asyncpg.create_pool``.
    """
    import contextlib

    import asyncpg

    # (table, INSERT statement, SQLite row -> positional query args).
    # SQLite stores booleans as 0/1 integers, hence the bool() conversions.
    # NOTE(review): timestamp values are forwarded exactly as SQLite returns
    # them; asyncpg only accepts date/datetime objects for timestamp-typed
    # parameters — confirm the target columns are text, or parse them first.
    plain_tables = [
        (
            "wiki_unit_history",
            "INSERT INTO wiki_unit_history (id,unit_id,version,content,edited_by,reason,edited_at) "
            "VALUES ($1,$2,$3,$4,$5,$6,$7) ON CONFLICT (id) DO NOTHING",
            lambda r: (r["id"], r["unit_id"], r["version"], r["content"],
                       r["edited_by"], r["reason"], r["edited_at"]),
        ),
        (
            "unit_feedback",
            "INSERT INTO unit_feedback (id,unit_id,problem_text,feedback_type,comment,resolved,created_at) "
            "VALUES ($1,$2,$3,$4,$5,$6,$7) ON CONFLICT (id) DO NOTHING",
            lambda r: (r["id"], r["unit_id"], r["problem_text"], r["feedback_type"],
                       r["comment"], bool(r["resolved"]), r["created_at"]),
        ),
        (
            "flagged_solutions",
            "INSERT INTO flagged_solutions (id,problem_text,problem_hash,solver_output,flag_reason,reviewed,flagged_at) "
            "VALUES ($1,$2,$3,$4,$5,$6,$7) ON CONFLICT (id) DO NOTHING",
            lambda r: (r["id"], r["problem_text"], r["problem_hash"], r["solver_output"],
                       r["flag_reason"], bool(r["reviewed"]), r["flagged_at"]),
        ),
        (
            "wiki_drafts",
            "INSERT INTO wiki_drafts (draft_id,source_url,source_text,proposed_units_json,"
            "final_units_json,topic_hint,status,reviewed_by,reviewed_at,created_at) "
            "VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) ON CONFLICT (draft_id) DO NOTHING",
            lambda r: (r["draft_id"], r["source_url"], r["source_text"], r["proposed_units_json"],
                       r["final_units_json"], r["topic_hint"], r["status"],
                       r["reviewed_by"], r["reviewed_at"], r["created_at"]),
        ),
        (
            "solution_logs",
            "INSERT INTO solution_logs (id,problem_text,problem_hash,classified_topic,retrieved_ids,"
            "used_knowledge_ids,solver_confidence,validation_valid,validation_issues,wiki_assisted,created_at) "
            "VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) ON CONFLICT (id) DO NOTHING",
            lambda r: (r["id"], r["problem_text"], r["problem_hash"], r["classified_topic"],
                       r["retrieved_ids"], r["used_knowledge_ids"], r["solver_confidence"],
                       bool(r["validation_valid"]), r["validation_issues"],
                       bool(r["wiki_assisted"]), r["created_at"]),
        ),
        (
            "staged_wiki_units",
            "INSERT INTO staged_wiki_units (staged_id,unit_data,source,source_url,status,"
            "proposed_by,created_at,updated_at) "
            "VALUES ($1,$2,$3,$4,$5,$6,$7,$8) ON CONFLICT (staged_id) DO NOTHING",
            lambda r: (r["staged_id"], r["unit_data"], r["source"], r["source_url"],
                       r["status"], r["proposed_by"], r["created_at"], r["updated_at"]),
        ),
    ]

    # contextlib.closing is required: sqlite3.Connection's own context manager
    # only commits/rolls back, it never closes the connection.
    with contextlib.closing(sqlite3.connect(sqlite_path)) as sq:
        sq.row_factory = sqlite3.Row  # rows are indexed by column name below
        # The pool is opened inside the SQLite context so the SQLite handle is
        # still closed if pool creation fails (bad DSN, network error, ...).
        async with asyncpg.create_pool(
            database_url,
            min_size=1,
            max_size=5,
            # Presumably for Neon's PgBouncer-based pooler, which is not
            # compatible with asyncpg's prepared-statement cache — confirm.
            statement_cache_size=0,
            init=_register_codecs,
        ) as pool:
            await _migrate_wiki_units(sq, pool)
            await _migrate_problems(sq, pool)
            for table, query, row_fn in plain_tables:
                await _migrate_table(sq, pool, table, query, row_fn)
async def _migrate_wiki_units(sq: sqlite3.Connection, pool) -> None:
    """Copy ``wiki_units`` into PostgreSQL, embedding each unit's ``content``.

    Embeddings are computed from scratch with ``_embed_batch`` (a single batch
    for the whole table) and written to the ``embedding`` column. A NULL
    ``source`` is replaced by ``"manual"``. Rows whose ``id`` already exists in
    PostgreSQL are counted as skipped.

    Raises:
        RuntimeError: if the embedder returns a different number of vectors
            than there are rows (``zip`` would otherwise silently drop rows).
    """
    rows = sq.execute("SELECT * FROM wiki_units").fetchall()
    if not rows:
        print("wiki_units: 0 rows — skipping")
        return
    print(f"wiki_units: computing embeddings for {len(rows)} rows…")
    # NOTE(review): NULL content is not filtered here — confirm the column is NOT NULL.
    texts = [r["content"] for r in rows]
    # Embedding is CPU-bound and synchronous: run it in the default executor so
    # the event loop is not blocked. get_running_loop() is the supported way to
    # reach the loop from inside a coroutine.
    loop = asyncio.get_running_loop()
    embeddings = await loop.run_in_executor(None, _embed_batch, texts)
    if len(embeddings) != len(rows):
        raise RuntimeError(
            f"wiki_units: got {len(embeddings)} embeddings for {len(rows)} rows"
        )
    inserted = skipped = 0
    async with pool.acquire() as conn:
        for row, emb in zip(rows, embeddings):
            result = await conn.execute(
                "INSERT INTO wiki_units "
                "(id,type,topic,subtopic,content,problem_ids,source,source_url,"
                "deleted,version,last_edited_by,created_at,updated_at,embedding) "
                "VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14) "
                "ON CONFLICT (id) DO NOTHING",
                row["id"], row["type"], row["topic"], row["subtopic"], row["content"],
                row["problem_ids"], row["source"] or "manual", row["source_url"],
                bool(row["deleted"]), row["version"], row["last_edited_by"],
                row["created_at"], row["updated_at"], emb,
            )
            # Command tag is "INSERT 0 <n>"; n is 0 when ON CONFLICT skipped the row.
            if result.split()[-1] == "1":
                inserted += 1
            else:
                skipped += 1
    print(f"wiki_units: {inserted} → Neon, {skipped} skipped (duplicates)")
async def _migrate_problems(sq: sqlite3.Connection, pool) -> None:
    """Copy the ``problems`` table into PostgreSQL.

    Older SQLite files may lack the later-added columns ``choices``,
    ``correct_answer``, ``figure_svg``, ``problem_hash`` and ``figure_type``.
    When one is absent its value is sent as NULL (``figure_type`` as ``"svg"``).
    A missing ``problems`` table is reported and skipped, the same way
    ``_migrate_table`` treats missing tables, instead of aborting the run.
    """
    # PRAGMA table_info yields one row per column (index 1 is the column name)
    # and no rows at all when the table does not exist.
    cols = {info[1] for info in sq.execute("PRAGMA table_info(problems)").fetchall()}
    if not cols:
        print("problems: table not found in SQLite — skipping")
        return
    rows = sq.execute("SELECT * FROM problems").fetchall()
    if not rows:
        print("problems: 0 rows — skipping")
        return

    def optional(row, column, default=None):
        """Return ``row[column]``, or ``default`` when the column is absent."""
        return row[column] if column in cols else default

    inserted = skipped = 0
    async with pool.acquire() as conn:
        for row in rows:
            result = await conn.execute(
                "INSERT INTO problems "
                "(problem_id,problem_text,choices,correct_answer,topic,subtopic,"
                "difficulty,problem_type,figure_svg,problem_hash,figure_type) "
                "VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) "
                "ON CONFLICT (problem_id) DO NOTHING",
                row["problem_id"], row["problem_text"],
                optional(row, "choices"),
                optional(row, "correct_answer"),
                row["topic"], row["subtopic"], row["difficulty"], row["problem_type"],
                optional(row, "figure_svg"),
                optional(row, "problem_hash"),
                # NOTE(review): a present-but-NULL figure_type is sent as NULL,
                # not "svg" — confirm the PostgreSQL column accepts NULL.
                optional(row, "figure_type", "svg"),
            )
            # Command tag is "INSERT 0 <n>"; n is 0 when ON CONFLICT skipped the row.
            if result.split()[-1] == "1":
                inserted += 1
            else:
                skipped += 1
    print(f"problems: {inserted} → Neon, {skipped} skipped (duplicates)")
async def _migrate_table(sq, pool, table, query, row_fn) -> None:
    """Copy every row of SQLite ``table`` into PostgreSQL.

    Args:
        sq: Open SQLite connection; ``row_fn`` receives its rows as returned by
            ``fetchall()`` (``migrate`` sets ``row_factory = sqlite3.Row``).
        pool: asyncpg-style pool; one connection is acquired for the whole copy.
        table: Source table name, also used in progress messages.
        query: Parameterised INSERT with ``$n`` placeholders. Rows for which the
            statement reports 0 affected rows (e.g. ``ON CONFLICT DO NOTHING``)
            are counted as skipped.
        row_fn: Maps one SQLite row to the positional arguments for ``query``.

    A table (or view) absent from the SQLite file is reported and skipped; any
    other SQLite error propagates instead of being mistaken for a missing table.
    """
    # Look the table up explicitly instead of catching OperationalError around
    # the SELECT: that exception also covers "database is locked", I/O errors,
    # broken views, etc., which must not be reported as a harmless skip.
    # COLLATE NOCASE mirrors SQLite's case-insensitive identifier matching.
    found = sq.execute(
        "SELECT 1 FROM sqlite_master WHERE type IN ('table', 'view') "
        "AND name = ? COLLATE NOCASE",
        (table,),
    ).fetchone()
    if found is None:
        print(f"{table}: table not found in SQLite — skipping")
        return
    # Quote the identifier instead of splicing the raw name into the SQL.
    quoted = '"' + table.replace('"', '""') + '"'
    rows = sq.execute(f"SELECT * FROM {quoted}").fetchall()
    if not rows:
        print(f"{table}: 0 rows — skipping")
        return
    inserted = skipped = 0
    async with pool.acquire() as conn:
        for row in rows:
            result = await conn.execute(query, *row_fn(row))
            # Command tag is "INSERT 0 <n>"; n is 0 when ON CONFLICT skipped the row.
            if result.split()[-1] == "1":
                inserted += 1
            else:
                skipped += 1
    print(f"{table}: {inserted} → Neon, {skipped} skipped (duplicates)")
def main(argv: "list[str] | None" = None) -> None:
    """Command-line entry point: parse arguments and run ``migrate``.

    Args:
        argv: Arguments to parse. ``None`` (the default) means ``sys.argv[1:]``,
            as with ``argparse.ArgumentParser.parse_args``.

    Exits with status 1 if ``--sqlite-path`` is not an existing regular file.
    """
    parser = argparse.ArgumentParser(description="Migrate math_wiki SQLite → Neon PostgreSQL")
    parser.add_argument("--sqlite-path", required=True, help="Path to math_wiki.db")
    parser.add_argument("--database-url", required=True, help="Neon PostgreSQL DSN")
    args = parser.parse_args(argv)
    # sqlite3.connect() silently creates a new, empty database for a missing
    # path, so validate up front. is_file() also rejects directories, which
    # Path.exists() would have let through.
    if not Path(args.sqlite_path).is_file():
        print(f"ERROR: SQLite file not found: {args.sqlite_path}", file=sys.stderr)
        sys.exit(1)
    asyncio.run(migrate(args.sqlite_path, args.database_url))
    print("Migration complete.")


if __name__ == "__main__":
    main()