ai-textbook-backend / src /scripts /migrate_neon.py
Naveedtechlab's picture
Add full AI Native Textbook project source code
db7c1e8
#!/usr/bin/env python3
"""
Database Migration Script for AI Backend with RAG + Authentication
This script creates all required tables in the Neon Postgres database
"""
import asyncio
import sys
import logging
import os
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import text
# Add the src directory to the path so we can import our modules
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
src_dir = os.path.join(project_root, "src")
sys.path.insert(0, src_dir)
from db.base import Base
from config.settings import settings
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
async def create_database_tables():
"""Create all required database tables"""
logger.info("Starting database migration...")
# Create async engine
engine = create_async_engine(
settings.neon_db_url,
echo=True, # Set to True to log SQL queries during migration
pool_pre_ping=True,
pool_size=5,
max_overflow=10,
)
try:
logger.info("Connecting to database...")
# Create all tables
async with engine.begin() as conn:
logger.info("Creating tables...")
await conn.run_sync(Base.metadata.create_all)
logger.info("Tables created successfully")
# Verify tables exist by checking if we can access them
tables = Base.metadata.tables.keys()
logger.info(f"Created {len(tables)} tables: {list(tables)}")
# Test connection by running a simple query
result = await conn.execute(text("SELECT 1"))
logger.info("Database connection test successful")
logger.info("Database migration completed successfully!")
return True
except Exception as e:
logger.error(f"Database migration failed: {e}")
return False
finally:
# Dispose of the engine to close all connections
await engine.dispose()
async def check_existing_tables():
"""Check if tables already exist in the database"""
engine = create_async_engine(
settings.neon_db_url,
pool_pre_ping=True,
)
try:
async with engine.begin() as conn:
# Get list of existing tables
result = await conn.execute(text("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
"""))
existing_tables = [row[0] for row in result]
logger.info(f"Existing tables in database: {existing_tables}")
# Check which of our models' tables already exist
our_tables = [table.name for table in Base.metadata.tables.values()]
existing_our_tables = [table for table in our_tables if table in existing_tables]
if existing_our_tables:
logger.warning(f"The following tables already exist: {existing_our_tables}")
return existing_our_tables
else:
logger.info("No existing tables from our models found.")
return []
except Exception as e:
logger.error(f"Error checking existing tables: {e}")
return []
finally:
await engine.dispose()
async def run_migration():
"""Main function to run the migration"""
logger.info("Migration script started")
# Check for existing tables
existing_tables = await check_existing_tables()
if existing_tables:
logger.info("Some tables already exist. Migration will continue and handle existing tables gracefully.")
else:
logger.info("No existing tables found. Proceeding with fresh migration.")
# Create tables
success = await create_database_tables()
if success:
logger.info("Migration completed successfully!")
return True
else:
logger.error("Migration failed!")
return False
if __name__ == "__main__":
# Check if we're running in a virtual environment or have the required dependencies
try:
import sqlalchemy
import asyncpg
except ImportError as e:
logger.error(f"Required dependencies not found: {e}")
logger.error("Please make sure you've installed the requirements: pip install -r requirements.txt")
sys.exit(1)
# Run the migration
success = asyncio.run(run_migration())
if success:
logger.info("Migration script completed successfully!")
sys.exit(0)
else:
logger.error("Migration script failed!")
sys.exit(1)