#!/usr/bin/env python3
"""
Migration: Add inventory JSON column and migrate data from MongoDB.

Steps:
  1. Add a JSONB ``inventory`` column (with a comment and a GIN index) to
     ``trans.catalogue_ref``.
  2. Verify the column exists via ``information_schema``.
  3. Read one merchant's catalogue documents from MongoDB.
  4. Copy each document's ``inventory`` into the matching PostgreSQL row.
  5. Print verification counts and sample rows.
  6. Run a sample JSONB query.

Environment variables: ``MONGODB_URI`` and ``DATABASE_URL`` (required),
``MONGODB_DB_NAME`` (optional, defaults to ``cuatrolabs``).
The process exits with status 0 on success and 1 on failure.
"""
import asyncio
import json
import os
import sys
import traceback

from dotenv import load_dotenv
from motor.motor_asyncio import AsyncIOMotorClient
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

# Load environment variables
load_dotenv()

# Merchant whose catalogues are migrated unless run_migration() is told otherwise.
DEFAULT_MERCHANT_ID = "company_cuatro_beauty_ltd"


async def _add_inventory_column(engine):
    """Step 1: add the JSONB column, its comment and a GIN index (all idempotent DDL)."""
    print("šŸ“Š Step 1: Adding inventory column to PostgreSQL...")
    async with engine.begin() as conn:
        # JSONB (not JSON) so the column can carry a GIN index and support @> queries.
        await conn.execute(text("""
            ALTER TABLE trans.catalogue_ref
            ADD COLUMN IF NOT EXISTS inventory JSONB
        """))
        await conn.execute(text("""
            COMMENT ON COLUMN trans.catalogue_ref.inventory IS
            'Multi-level inventory configuration with ncnf, cnf, distributor, and retail levels'
        """))
        await conn.execute(text("""
            CREATE INDEX IF NOT EXISTS idx_catalogue_ref_inventory
            ON trans.catalogue_ref USING GIN (inventory)
        """))
    print("āœ… Inventory column added successfully")


async def _verify_column(engine):
    """Step 2: return True if trans.catalogue_ref.inventory exists, printing its metadata."""
    print("\nšŸ“‹ Step 2: Verifying column structure...")
    async with engine.begin() as conn:
        result = await conn.execute(text("""
            SELECT column_name, data_type, is_nullable
            FROM information_schema.columns
            WHERE table_schema = 'trans'
              AND table_name = 'catalogue_ref'
              AND column_name = 'inventory'
        """))
        column_info = result.fetchone()
    if column_info is None:
        print("āŒ Column not found!")
        return False
    print(f"āœ… Column verified: {column_info[0]} ({column_info[1]}, nullable: {column_info[2]})")
    return True


async def _fetch_mongo_catalogues(db, merchant_id):
    """Step 3: load catalogue_id, catalogue_code and inventory for every catalogue of *merchant_id*."""
    print("\nšŸ“„ Step 3: Fetching inventory data from MongoDB...")
    cursor = db.catalogues.find(
        {"merchant_id": merchant_id},
        {"catalogue_id": 1, "inventory": 1, "catalogue_code": 1},
    )
    catalogues = await cursor.to_list(length=None)
    print(f"āœ… Found {len(catalogues)} catalogues with inventory data")
    return catalogues


async def _migrate_inventory(engine, catalogues):
    """Step 4: write each catalogue's inventory into trans.catalogue_ref.

    Every row is updated inside its own SAVEPOINT. Without it, the first
    failing UPDATE would abort the enclosing PostgreSQL transaction, making
    every later UPDATE (and the final COMMIT) fail as well.

    Returns:
        ``(updated_count, failed_count)``.
    """
    print("\nšŸ”„ Step 4: Migrating inventory data to PostgreSQL...")
    updated_count = 0
    failed_count = 0
    total = len(catalogues)
    async with engine.begin() as conn:
        for i, catalogue in enumerate(catalogues, 1):
            catalogue_id = catalogue.get("catalogue_id")
            catalogue_code = catalogue.get("catalogue_code", "unknown")
            if not catalogue_id:
                print(f" āš ļø Skipping catalogue without ID: {catalogue_code}")
                continue

            try:
                inventory_data = catalogue.get("inventory", {})
                # Missing/empty inventory is stored as SQL NULL rather than '{}'.
                inventory_json = json.dumps(inventory_data) if inventory_data else None
                # Savepoint: a failure here rolls back only this row.
                async with conn.begin_nested():
                    result = await conn.execute(
                        text("""
                            UPDATE trans.catalogue_ref
                            SET inventory = :inventory_json
                            WHERE catalogue_id = :catalogue_id
                        """),
                        {"inventory_json": inventory_json, "catalogue_id": catalogue_id},
                    )
            except Exception as e:
                # Deliberate best effort per row: report, count, continue.
                print(f" āŒ Error updating {catalogue_code}: {e}")
                failed_count += 1
                continue

            if result.rowcount > 0:
                updated_count += 1
            else:
                print(f" āš ļø No PostgreSQL record found for: {catalogue_code}")

            if i % 10 == 0:
                print(f" šŸ“Š Processed {i}/{total} catalogues...")

    print(f"āœ… Migration completed: {updated_count} records updated, {failed_count} failed")
    return updated_count, failed_count


def _as_dict(value):
    """Return a JSONB value as a dict (decoding a JSON string if the driver returned one)."""
    if isinstance(value, str):
        value = json.loads(value)
    return value if isinstance(value, dict) else {}


async def _verify_migration(engine, merchant_id):
    """Step 5: count rows with inventory for *merchant_id*, print up to 3 samples, return the count."""
    print("\nšŸ” Step 5: Verifying migration results...")
    params = {"merchant": merchant_id}
    async with engine.begin() as conn:
        result = await conn.execute(text("""
            SELECT COUNT(1)
            FROM trans.catalogue_ref
            WHERE inventory IS NOT NULL
              AND :merchant = ANY(merchant_id)
        """), params)
        records_with_inventory = result.scalar()

        result = await conn.execute(text("""
            SELECT catalogue_code, catalogue_name, inventory
            FROM trans.catalogue_ref
            WHERE inventory IS NOT NULL
              AND :merchant = ANY(merchant_id)
            ORDER BY catalogue_code
            LIMIT 3
        """), params)
        samples = result.fetchall()

    print(f"šŸ“Š Records with inventory data: {records_with_inventory}")
    if samples:
        print("\nšŸ“‹ Sample inventory data:")
        for code, name, raw_inventory in samples:
            inventory_data = _as_dict(raw_inventory)
            levels_map = inventory_data.get("levels")
            if not isinstance(levels_map, dict):
                levels_map = {}
            levels = list(levels_map)
            print(f" šŸ“„ {code}: {name}")
            print(f" Unit: {inventory_data.get('unit', 'N/A')}")
            print(f" Levels: {levels}")
            if levels:
                # Show the first level's thresholds as a representative sample.
                first_level = levels[0]
                level_data = levels_map[first_level]
                if not isinstance(level_data, dict):
                    level_data = {}
                print(f" {first_level}: Reorder {level_data.get('reorder_level', 0)}, Max {level_data.get('max_stock_level', 0)}")
    return records_with_inventory


async def _check_json_query(engine, merchant_id):
    """Step 6: run a sample JSONB containment query for unit 'PCS' and print the matches."""
    print("\n⚔ Step 6: Testing JSON query performance...")
    async with engine.begin() as conn:
        # The default jsonb_ops GIN index serves @> (containment); the equivalent
        # filter inventory->>'unit' = 'PCS' matches the same rows but cannot use it.
        result = await conn.execute(text("""
            SELECT catalogue_code, inventory->>'unit' AS unit
            FROM trans.catalogue_ref
            WHERE inventory @> CAST('{"unit": "PCS"}' AS jsonb)
              AND :merchant = ANY(merchant_id)
            LIMIT 3
        """), {"merchant": merchant_id})
        rows = result.fetchall()
    print(f"āœ… JSON query test: Found {len(rows)} records with unit 'PCS'")
    for code, unit in rows:
        print(f" šŸ“¦ {code}: Unit = {unit}")


def _print_summary(updated_count, records_with_inventory):
    """Print the closing banner, summary counters and benefits list."""
    print("\n" + "=" * 60)
    print("āœ… Migration Completed Successfully!")
    print("=" * 60)
    print("\nšŸ“Š Summary:")
    print(" • Column added: inventory (JSONB)")
    print(f" • Records updated: {updated_count}")
    print(f" • Records with inventory: {records_with_inventory}")
    print(" • Index created: idx_catalogue_ref_inventory (GIN)")
    print(" • JSON queries: Working āœ…")
    print("\nšŸŽÆ Benefits:")
    print(" • Multi-level inventory data now in PostgreSQL")
    print(" • Fast JSON queries with GIN index")
    print(" • Consistent data across MongoDB and PostgreSQL")
    print(" • Ready for inventory management features")


async def run_migration(merchant_id=DEFAULT_MERCHANT_ID):
    """Add inventory column and migrate data from MongoDB.

    Args:
        merchant_id: Merchant whose catalogues are read from MongoDB and whose
            PostgreSQL rows are verified.

    Returns:
        True if every step ran. Individual row failures in step 4 are reported
        but do not fail the run. Returns False if required configuration is
        missing, the column cannot be verified, or any step raises.
    """
    print("\nšŸ”„ Migration: Adding Inventory Column to PostgreSQL")
    print("=" * 60)

    mongo_uri = os.getenv("MONGODB_URI")
    database_url = os.getenv("DATABASE_URL")
    # Fail fast: an unset MONGODB_URI would silently target localhost, and an
    # unset DATABASE_URL would crash create_async_engine with an unclear error.
    missing = [
        name
        for name, value in (("MONGODB_URI", mongo_uri), ("DATABASE_URL", database_url))
        if not value
    ]
    if missing:
        print(f"\nāŒ Migration failed: missing environment variable(s): {', '.join(missing)}")
        return False

    mongo_client = None
    engine = None
    try:
        # Setup connections inside the try so the finally block always cleans up.
        mongo_client = AsyncIOMotorClient(mongo_uri)
        db = mongo_client[os.getenv("MONGODB_DB_NAME", "cuatrolabs")]
        engine = create_async_engine(database_url)

        await _add_inventory_column(engine)
        if not await _verify_column(engine):
            return False
        catalogues = await _fetch_mongo_catalogues(db, merchant_id)
        updated_count, _failed_count = await _migrate_inventory(engine, catalogues)
        records_with_inventory = await _verify_migration(engine, merchant_id)
        await _check_json_query(engine, merchant_id)
        _print_summary(updated_count, records_with_inventory)
        return True
    except Exception as e:
        # Top-level boundary of the script: report with traceback and signal failure.
        print(f"\nāŒ Migration failed: {e}")
        traceback.print_exc()
        return False
    finally:
        if mongo_client is not None:
            mongo_client.close()
        if engine is not None:
            await engine.dispose()


if __name__ == "__main__":
    # Non-zero exit status so callers (CI, deploy tooling) can detect failure.
    sys.exit(0 if asyncio.run(run_migration()) else 1)