accujuris-api / scripts /migrate_mongo_to_postgres.py
arnavam's picture
Initial HuggingFace deployment
cd3078d
import asyncio
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
from bson import ObjectId
from pymongo import MongoClient
# This script lives one directory below the backend root. Put that root at the
# front of sys.path (once) so that "app" imports resolve when the script is
# executed directly rather than as part of the package.
BACKEND_ROOT = Path(__file__).resolve().parent.parent
if str(BACKEND_ROOT) not in sys.path:
    sys.path.insert(0, str(BACKEND_ROOT))
from app.database import close_db, connect_db, get_db
# Names of the MongoDB collections to copy, in the order they are processed.
# Each name is also used as the attribute looked up on the target database
# object, so the two sides must agree on these identifiers.
COLLECTIONS = (
    "users",
    "documents",
    "verifications",
    "refresh_tokens",
    "password_reset_tokens",
    "revoked_access_tokens",
    "notifications",
    "document_versions",
)
def _normalize(value: Any) -> Any:
    """Return a copy of *value* with every ``ObjectId`` replaced by its string form.

    Dicts and lists are rebuilt recursively (dict keys are kept as-is, only
    the values are normalized). Every other value, including ``datetime``
    instances, is returned unchanged.
    """
    if isinstance(value, dict):
        return {field: _normalize(inner) for field, inner in value.items()}
    if isinstance(value, list):
        return [_normalize(element) for element in value]
    if isinstance(value, ObjectId):
        return str(value)
    # datetimes and all remaining scalars pass through untouched.
    return value
async def migrate() -> None:
    """Copy every document of each collection in ``COLLECTIONS`` to the target DB.

    The source MongoDB deployment is taken from the ``MONGODB_URI``
    environment variable. The database name comes from ``MONGODB_DB_NAME``,
    then ``DB_NAME``, and falls back to ``"accujuris"``.

    For each collection, every source document is passed through
    ``_normalize``, its ``_id`` is stored as a string, and it is inserted
    with ``insert_one`` into the attribute of ``get_db()`` that carries the
    collection's name. A row count is printed once per collection.

    Both connections are always released: the target connection via
    ``close_db()`` and the MongoDB client via ``close()``, even when
    connecting to the target or closing it fails.

    Raises:
        RuntimeError: If ``MONGODB_URI`` is unset or empty.
    """
    mongodb_uri = os.getenv("MONGODB_URI")
    if not mongodb_uri:
        raise RuntimeError("Set MONGODB_URI before running this migration script.")
    db_name = os.getenv("MONGODB_DB_NAME") or os.getenv("DB_NAME") or "accujuris"

    mongo_client = MongoClient(mongodb_uri)
    try:
        source_db = mongo_client[db_name]
        # connect_db() sits inside the outer try so that a failure to reach
        # the target database does not leak the MongoDB client.
        await connect_db()
        try:
            target_db = get_db()
            for name in COLLECTIONS:
                source_collection = source_db[name]
                target_collection = getattr(target_db, name)
                moved = 0
                # pymongo's cursor is synchronous and blocks the event loop
                # while fetching; acceptable for a one-off script.
                # NOTE(review): rows are inserted unconditionally, so re-running
                # the script presumably duplicates or conflicts - confirm
                # against the target's insert_one semantics.
                for source_doc in source_collection.find({}):
                    normalized = _normalize(source_doc)
                    # Force a string primary key even when _id is not an ObjectId.
                    normalized["_id"] = str(source_doc["_id"])
                    await target_collection.insert_one(normalized)
                    moved += 1
                print(f"{name}: migrated {moved} rows")
        finally:
            await close_db()
    finally:
        # Runs even if close_db() raises, so the client is never left open.
        mongo_client.close()
# Script entry point: run the migration on a fresh event loop when this file
# is executed directly (not when it is imported).
if __name__ == "__main__":
    migration = migrate()
    asyncio.run(migration)