Spaces:
Running
Running
Refactor COUNT queries to use COUNT(1) for improved performance and consistency across various scripts and services.
#!/usr/bin/env python3
"""
Migration: Add inventory JSON column and migrate data from MongoDB.
"""
import asyncio
import sys  # NOTE(review): imported but unused in this chunk — confirm before removing
import os
import json
from motor.motor_asyncio import AsyncIOMotorClient
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import text
from dotenv import load_dotenv

# Load environment variables from .env so MONGODB_URI / MONGODB_DB_NAME /
# DATABASE_URL are available before any connection is created.
load_dotenv()
async def _add_inventory_column(engine) -> None:
    """Step 1: add the JSONB ``inventory`` column, its comment, and a GIN index.

    Every statement is idempotent (``IF NOT EXISTS``) so the migration can be
    re-run safely.
    """
    print("π Step 1: Adding inventory column to PostgreSQL...")
    async with engine.begin() as conn:
        # JSONB (not JSON) so the column supports GIN indexing and fast
        # containment / path queries.
        await conn.execute(text("""
            ALTER TABLE trans.catalogue_ref
            ADD COLUMN IF NOT EXISTS inventory JSONB
        """))
        await conn.execute(text("""
            COMMENT ON COLUMN trans.catalogue_ref.inventory IS
            'Multi-level inventory configuration with ncnf, cnf, distributor, and retail levels'
        """))
        await conn.execute(text("""
            CREATE INDEX IF NOT EXISTS idx_catalogue_ref_inventory
            ON trans.catalogue_ref USING GIN (inventory)
        """))
    print("β Inventory column added successfully")


async def _verify_column(engine) -> bool:
    """Step 2: return True if the ``inventory`` column exists, printing its type."""
    async with engine.begin() as conn:
        result = await conn.execute(text("""
            SELECT column_name, data_type, is_nullable
            FROM information_schema.columns
            WHERE table_schema = 'trans'
            AND table_name = 'catalogue_ref'
            AND column_name = 'inventory'
        """))
        column_info = result.fetchone()
    if column_info:
        print(f"β Column verified: {column_info[0]} ({column_info[1]}, nullable: {column_info[2]})")
        return True
    print("β Column not found!")
    return False


async def _fetch_mongo_inventory(db) -> list:
    """Step 3: fetch catalogue_id / inventory / catalogue_code documents.

    Projects only the three fields the migration needs, for the hard-coded
    merchant this script was written for.
    """
    cursor = db.catalogues.find(
        {"merchant_id": "company_cuatro_beauty_ltd"},
        {"catalogue_id": 1, "inventory": 1, "catalogue_code": 1}
    )
    return await cursor.to_list(length=None)


async def _migrate_inventory(engine, mongo_catalogues) -> tuple:
    """Step 4: copy inventory JSON into PostgreSQL, row by row.

    Each UPDATE runs inside a SAVEPOINT (``begin_nested``) so a failure on one
    row rolls back only that row. Without it, PostgreSQL aborts the whole
    surrounding transaction on the first error and every later UPDATE in the
    loop fails with "current transaction is aborted" — the bug in the
    original version of this step.

    Returns ``(updated_count, failed_count)``.
    """
    updated_count = 0
    failed_count = 0
    total = len(mongo_catalogues)
    async with engine.begin() as conn:
        for i, catalogue in enumerate(mongo_catalogues, 1):
            # Bind before the try block so the except clause can always
            # reference it (previously a NameError risk on early failure).
            catalogue_code = catalogue.get("catalogue_code", "unknown")
            try:
                catalogue_id = catalogue.get("catalogue_id")
                inventory_data = catalogue.get("inventory", {})
                if not catalogue_id:
                    print(f" β οΈ Skipping catalogue without ID: {catalogue_code}")
                    continue
                # Serialize to a JSON string; empty inventory becomes SQL NULL.
                inventory_json = json.dumps(inventory_data) if inventory_data else None
                # SAVEPOINT per row: isolates this UPDATE from the batch.
                async with conn.begin_nested():
                    result = await conn.execute(text("""
                        UPDATE trans.catalogue_ref
                        SET inventory = :inventory_json
                        WHERE catalogue_id = :catalogue_id
                    """), {
                        "inventory_json": inventory_json,
                        "catalogue_id": catalogue_id,
                    })
                if result.rowcount > 0:
                    updated_count += 1
                else:
                    print(f" β οΈ No PostgreSQL record found for: {catalogue_code}")
                if i % 10 == 0:
                    print(f" π Processed {i}/{total} catalogues...")
            except Exception as e:
                # Best-effort migration: report and continue with the next row.
                print(f" β Error updating {catalogue_code}: {e}")
                failed_count += 1
    return updated_count, failed_count


async def _verify_results(engine) -> int:
    """Step 5: count migrated rows and print a few sample payloads.

    Returns the number of rows that now carry inventory data, for the summary.
    """
    async with engine.begin() as conn:
        result = await conn.execute(text("""
            SELECT COUNT(1)
            FROM trans.catalogue_ref
            WHERE inventory IS NOT NULL
            AND 'company_cuatro_beauty_ltd' = ANY(merchant_id)
        """))
        records_with_inventory = result.scalar()
        result = await conn.execute(text("""
            SELECT catalogue_code, catalogue_name, inventory
            FROM trans.catalogue_ref
            WHERE inventory IS NOT NULL
            AND 'company_cuatro_beauty_ltd' = ANY(merchant_id)
            ORDER BY catalogue_code
            LIMIT 3
        """))
        samples = result.fetchall()
    print(f"π Records with inventory data: {records_with_inventory}")
    if samples:
        print(f"\nπ Sample inventory data:")
        for sample in samples:
            # PostgreSQL JSONB comes back as a dict, not a JSON string.
            inventory_data = sample[2] if sample[2] else {}
            levels = list(inventory_data.get("levels", {}).keys()) if inventory_data else []
            print(f" π {sample[0]}: {sample[1]}")
            print(f" Unit: {inventory_data.get('unit', 'N/A')}")
            print(f" Levels: {levels}")
            if levels:
                # Spot-check one level's thresholds.
                first_level = levels[0]
                level_data = inventory_data["levels"][first_level]
                print(f" {first_level}: Reorder {level_data.get('reorder_level', 0)}, Max {level_data.get('max_stock_level', 0)}")
    return records_with_inventory


async def _test_json_queries(engine) -> None:
    """Step 6: smoke-test a ``->>`` JSON path query against the new column."""
    async with engine.begin() as conn:
        result = await conn.execute(text("""
            SELECT catalogue_code, inventory->>'unit' as unit
            FROM trans.catalogue_ref
            WHERE inventory->>'unit' = 'PCS'
            AND 'company_cuatro_beauty_ltd' = ANY(merchant_id)
            LIMIT 3
        """))
        json_query_results = result.fetchall()
    print(f"β JSON query test: Found {len(json_query_results)} records with unit 'PCS'")
    for result_row in json_query_results:
        print(f" π¦ {result_row[0]}: Unit = {result_row[1]}")


def _print_summary(updated_count, records_with_inventory) -> None:
    """Print the final human-readable migration report."""
    print(f"\n" + "=" * 60)
    print("β Migration Completed Successfully!")
    print("=" * 60)
    print(f"\nπ Summary:")
    # Column type corrected to JSONB (the DDL above creates JSONB, not JSON).
    print(f" β’ Column added: inventory (JSONB)")
    print(f" β’ Records updated: {updated_count}")
    print(f" β’ Records with inventory: {records_with_inventory}")
    print(f" β’ Index created: idx_catalogue_ref_inventory (GIN)")
    print(f" β’ JSON queries: Working β ")
    print(f"\nπ― Benefits:")
    print(f" β’ Multi-level inventory data now in PostgreSQL")
    print(f" β’ Fast JSON queries with GIN index")
    print(f" β’ Consistent data across MongoDB and PostgreSQL")
    print(f" β’ Ready for inventory management features")


async def run_migration():
    """Add inventory column and migrate data from MongoDB.

    Orchestrates the six migration steps; both database connections are
    always released in the ``finally`` block, even when a step fails.
    """
    print("\nπ Migration: Adding Inventory Column to PostgreSQL")
    print("=" * 60)
    # Setup connections from environment (loaded via load_dotenv() at import).
    mongo_client = AsyncIOMotorClient(os.getenv("MONGODB_URI"))
    db = mongo_client[os.getenv("MONGODB_DB_NAME", "cuatrolabs")]
    engine = create_async_engine(os.getenv("DATABASE_URL"))
    try:
        await _add_inventory_column(engine)
        print("\nπ Step 2: Verifying column structure...")
        if not await _verify_column(engine):
            # Column missing after DDL: nothing sensible to migrate into.
            return
        print("\nπ₯ Step 3: Fetching inventory data from MongoDB...")
        mongo_catalogues = await _fetch_mongo_inventory(db)
        print(f"β Found {len(mongo_catalogues)} catalogues with inventory data")
        print("\nπ Step 4: Migrating inventory data to PostgreSQL...")
        updated_count, failed_count = await _migrate_inventory(engine, mongo_catalogues)
        print(f"β Migration completed: {updated_count} records updated, {failed_count} failed")
        print("\nπ Step 5: Verifying migration results...")
        records_with_inventory = await _verify_results(engine)
        print(f"\nβ‘ Step 6: Testing JSON query performance...")
        await _test_json_queries(engine)
        _print_summary(updated_count, records_with_inventory)
    except Exception as e:
        print(f"\nβ Migration failed: {e}")
        import traceback
        traceback.print_exc()
    finally:
        mongo_client.close()
        await engine.dispose()
# Script entry point: run the async migration in a fresh event loop.
if __name__ == "__main__":
    asyncio.run(run_migration())