# cuatrolabs-scm-ms/docs/database/migrations/migration_add_inventory_column.py
# MukeshKapoor25's picture
# Refactor COUNT queries to use COUNT(1) for improved performance and consistency across various scripts and services.
# 0766a99
#!/usr/bin/env python3
"""
Migration: Add inventory JSON column and migrate data from MongoDB.
"""
import asyncio
import sys
import os
import json
from motor.motor_asyncio import AsyncIOMotorClient
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import text
from dotenv import load_dotenv
# Load environment variables (python-dotenv) at import time so that the
# os.getenv() lookups for MONGODB_URI, MONGODB_DB_NAME and DATABASE_URL made
# inside run_migration() can be satisfied from a local .env file.
load_dotenv()
_DEFAULT_MERCHANT_ID = "company_cuatro_beauty_ltd"


async def _add_inventory_column(engine):
    """Add the ``inventory`` JSONB column, its comment and its GIN index.

    The three statements run in one transaction. ``IF NOT EXISTS`` on the
    column and the index (and the overwrite semantics of ``COMMENT ON``)
    make the step safe to re-run.
    """
    async with engine.begin() as conn:
        # JSONB (rather than JSON) so the column can carry a GIN index.
        await conn.execute(text("""
            ALTER TABLE trans.catalogue_ref
            ADD COLUMN IF NOT EXISTS inventory JSONB
        """))
        await conn.execute(text("""
            COMMENT ON COLUMN trans.catalogue_ref.inventory IS
            'Multi-level inventory configuration with ncnf, cnf, distributor, and retail levels'
        """))
        await conn.execute(text("""
            CREATE INDEX IF NOT EXISTS idx_catalogue_ref_inventory
            ON trans.catalogue_ref USING GIN (inventory)
        """))


async def _fetch_inventory_column_info(engine):
    """Return ``(column_name, data_type, is_nullable)`` for the new column, or ``None``."""
    async with engine.begin() as conn:
        result = await conn.execute(text("""
            SELECT column_name, data_type, is_nullable
            FROM information_schema.columns
            WHERE table_schema = 'trans'
            AND table_name = 'catalogue_ref'
            AND column_name = 'inventory'
        """))
        return result.fetchone()


async def _copy_inventory_to_postgres(engine, catalogues):
    """Write each MongoDB catalogue's inventory into ``trans.catalogue_ref``.

    Every UPDATE runs inside its own SAVEPOINT. PostgreSQL aborts the whole
    transaction after any failed statement, so without the savepoint a single
    bad row would make every later UPDATE fail and would roll back the rows
    that had already been counted as updated.

    Returns:
        ``(updated_count, failed_count)``. Catalogues without a
        ``catalogue_id`` and catalogues with no matching PostgreSQL row are
        reported on stdout but counted in neither number.
    """
    update_stmt = text("""
        UPDATE trans.catalogue_ref
        SET inventory = :inventory_json
        WHERE catalogue_id = :catalogue_id
    """)
    total = len(catalogues)
    updated_count = 0
    failed_count = 0

    async with engine.begin() as conn:
        for position, catalogue in enumerate(catalogues, 1):
            catalogue_code = "unknown"
            try:
                catalogue_code = catalogue.get("catalogue_code", "unknown")
                catalogue_id = catalogue.get("catalogue_id")
                if not catalogue_id:
                    print(f" ⚠️ Skipping catalogue without ID: {catalogue_code}")
                    continue

                # An absent or empty inventory document is stored as SQL NULL.
                inventory = catalogue.get("inventory", {})
                inventory_json = json.dumps(inventory) if inventory else None

                # SAVEPOINT: a failure here is rolled back on its own and the
                # surrounding transaction stays usable for the next row.
                async with conn.begin_nested():
                    result = await conn.execute(update_stmt, {
                        "inventory_json": inventory_json,
                        "catalogue_id": catalogue_id,
                    })

                if result.rowcount > 0:
                    updated_count += 1
                else:
                    print(f" ⚠️ No PostgreSQL record found for: {catalogue_code}")

                if position % 10 == 0:
                    print(f" πŸ“Š Processed {position}/{total} catalogues...")
            except Exception as e:
                # Best effort per row: report, count, and carry on.
                print(f" ❌ Error updating {catalogue_code}: {e}")
                failed_count += 1

    return updated_count, failed_count


async def _fetch_migration_results(engine, merchant_id):
    """Return ``(records_with_inventory, samples)`` for ``merchant_id``.

    ``samples`` holds up to three ``(catalogue_code, catalogue_name,
    inventory)`` rows ordered by catalogue code.
    """
    params = {"merchant_id": merchant_id}
    async with engine.begin() as conn:
        result = await conn.execute(text("""
            SELECT COUNT(1)
            FROM trans.catalogue_ref
            WHERE inventory IS NOT NULL
            AND :merchant_id = ANY(merchant_id)
        """), params)
        records_with_inventory = result.scalar()

        result = await conn.execute(text("""
            SELECT catalogue_code, catalogue_name, inventory
            FROM trans.catalogue_ref
            WHERE inventory IS NOT NULL
            AND :merchant_id = ANY(merchant_id)
            ORDER BY catalogue_code
            LIMIT 3
        """), params)
        samples = result.fetchall()
    return records_with_inventory, samples


def _print_inventory_sample(code, name, inventory):
    """Print a short, human-readable digest of one migrated inventory value.

    Tolerates a JSON string instead of a decoded dict, and a missing or
    non-dict ``levels`` entry, so that a display problem cannot turn an
    already-committed migration into a reported failure.
    """
    if isinstance(inventory, str):
        inventory = json.loads(inventory)
    if not isinstance(inventory, dict):
        inventory = {}

    levels_map = inventory.get("levels")
    if not isinstance(levels_map, dict):
        levels_map = {}
    levels = list(levels_map)

    print(f" πŸ“„ {code}: {name}")
    print(f" Unit: {inventory.get('unit', 'N/A')}")
    print(f" Levels: {levels}")
    if levels:
        # Show the first level only, as a representative example.
        first_level = levels[0]
        level_data = levels_map[first_level]
        if not isinstance(level_data, dict):
            level_data = {}
        print(f" {first_level}: Reorder {level_data.get('reorder_level', 0)}, Max {level_data.get('max_stock_level', 0)}")


async def _run_json_query_check(engine, merchant_id):
    """Run a sample ``inventory->>'unit' = 'PCS'`` query and return up to three rows."""
    async with engine.begin() as conn:
        result = await conn.execute(text("""
            SELECT catalogue_code, inventory->>'unit' as unit
            FROM trans.catalogue_ref
            WHERE inventory->>'unit' = 'PCS'
            AND :merchant_id = ANY(merchant_id)
            LIMIT 3
        """), {"merchant_id": merchant_id})
        return result.fetchall()


async def run_migration(merchant_id: str = _DEFAULT_MERCHANT_ID) -> bool:
    """Add the inventory JSONB column and copy inventory data from MongoDB.

    Steps: (1) add column, comment and GIN index, (2) verify the column,
    (3) read the merchant's catalogues from MongoDB, (4) copy each inventory
    document into ``trans.catalogue_ref``, (5) count and sample the migrated
    rows, (6) run a sample JSON query. Progress is printed to stdout.

    Connection settings come from the ``MONGODB_URI``, ``MONGODB_DB_NAME``
    (default ``"cuatrolabs"``) and ``DATABASE_URL`` environment variables.

    Args:
        merchant_id: Merchant whose catalogues are migrated and verified.

    Returns:
        ``True`` when every step ran to completion (individual row failures
        are only counted and printed), ``False`` when ``DATABASE_URL`` is
        missing, the column could not be verified, or a step raised.
    """
    print("\nπŸ”„ Migration: Adding Inventory Column to PostgreSQL")
    print("=" * 60)

    # Check before opening any connection so nothing needs cleaning up.
    database_url = os.getenv("DATABASE_URL")
    if not database_url:
        print("\n❌ Migration failed: DATABASE_URL environment variable is not set")
        return False

    mongo_client = AsyncIOMotorClient(os.getenv("MONGODB_URI"))
    engine = None
    try:
        db = mongo_client[os.getenv("MONGODB_DB_NAME", "cuatrolabs")]
        # Created inside the try so an invalid URL still closes the Mongo client.
        engine = create_async_engine(database_url)

        # Step 1: Add inventory column to PostgreSQL
        print("πŸ“Š Step 1: Adding inventory column to PostgreSQL...")
        await _add_inventory_column(engine)
        print("βœ… Inventory column added successfully")

        # Step 2: Verify column exists
        print("\nπŸ“‹ Step 2: Verifying column structure...")
        column_info = await _fetch_inventory_column_info(engine)
        if not column_info:
            print("❌ Column not found!")
            return False
        print(f"βœ… Column verified: {column_info[0]} ({column_info[1]}, nullable: {column_info[2]})")

        # Step 3: Get inventory data from MongoDB
        print("\nπŸ“₯ Step 3: Fetching inventory data from MongoDB...")
        cursor = db.catalogues.find(
            {"merchant_id": merchant_id},
            {"catalogue_id": 1, "inventory": 1, "catalogue_code": 1},
        )
        mongo_catalogues = await cursor.to_list(length=None)
        print(f"βœ… Found {len(mongo_catalogues)} catalogues with inventory data")

        # Step 4: Migrate inventory data to PostgreSQL
        print("\nπŸ”„ Step 4: Migrating inventory data to PostgreSQL...")
        updated_count, failed_count = await _copy_inventory_to_postgres(
            engine, mongo_catalogues
        )
        print(f"βœ… Migration completed: {updated_count} records updated, {failed_count} failed")

        # Step 5: Verify migration results
        print("\nπŸ” Step 5: Verifying migration results...")
        records_with_inventory, samples = await _fetch_migration_results(
            engine, merchant_id
        )
        print(f"πŸ“Š Records with inventory data: {records_with_inventory}")
        if samples:
            print(f"\nπŸ“‹ Sample inventory data:")
            for sample in samples:
                _print_inventory_sample(sample[0], sample[1], sample[2])

        # Step 6: Performance verification
        print(f"\n⚑ Step 6: Testing JSON query performance...")
        json_query_results = await _run_json_query_check(engine, merchant_id)
        print(f"βœ… JSON query test: Found {len(json_query_results)} records with unit 'PCS'")
        for result_row in json_query_results:
            print(f" πŸ“¦ {result_row[0]}: Unit = {result_row[1]}")

        print(f"\n" + "=" * 60)
        print("βœ… Migration Completed Successfully!")
        print("=" * 60)
        print(f"\nπŸ“Š Summary:")
        print(f" β€’ Column added: inventory (JSON)")
        print(f" β€’ Records updated: {updated_count}")
        print(f" β€’ Records with inventory: {records_with_inventory}")
        print(f" β€’ Index created: idx_catalogue_ref_inventory (GIN)")
        print(f" β€’ JSON queries: Working βœ…")
        print(f"\n🎯 Benefits:")
        print(f" β€’ Multi-level inventory data now in PostgreSQL")
        print(f" β€’ Fast JSON queries with GIN index")
        print(f" β€’ Consistent data across MongoDB and PostgreSQL")
        print(f" β€’ Ready for inventory management features")
        return True
    except Exception as e:
        # Top-level boundary of the script: report and signal failure.
        print(f"\n❌ Migration failed: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        mongo_client.close()
        if engine is not None:
            await engine.dispose()
if __name__ == "__main__":
    # Script entry point. Exit with status 1 only when run_migration()
    # explicitly returns False; any other result (including None) keeps the
    # default zero exit status, so callers such as CI can detect a reported
    # failure without changing behaviour for a function that returns nothing.
    if asyncio.run(run_migration()) is False:
        sys.exit(1)