#!/usr/bin/env python3 """ Migration script to add pricing_levels JSON column to catalogue_ref table. This migration adds the pricing_levels column to store enhanced pricing data as JSON in PostgreSQL for better performance and flexibility. Usage: python migration_add_pricing_levels_column.py """ import asyncio import asyncpg from insightfy_utils.logging import get_logger from app.core.config import settings logger = get_logger(__name__) async def add_pricing_levels_column(): """Add pricing_levels JSON column to trans.catalogue_ref table.""" # Use global settings instance try: # Connect to PostgreSQL conn = await asyncpg.connect( host=settings.POSTGRES_HOST, port=settings.POSTGRES_PORT, user=settings.POSTGRES_USER, password=settings.POSTGRES_PASSWORD, database=settings.POSTGRES_DB ) logger.info("Connected to PostgreSQL database") # Check if pricing_levels column already exists check_query = """ SELECT 1 FROM information_schema.columns WHERE table_schema = 'trans' AND table_name = 'catalogue_ref' AND column_name = 'pricing_levels' LIMIT 1 """ result = await conn.fetchval(check_query) if result: logger.info("pricing_levels column already exists in trans.catalogue_ref") return # Add pricing_levels JSON column alter_query = """ ALTER TABLE trans.catalogue_ref ADD COLUMN pricing_levels JSON """ await conn.execute(alter_query) logger.info("Successfully added pricing_levels JSON column to trans.catalogue_ref") # Create index on pricing_levels for better query performance index_queries = [ "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_catalogue_ref_pricing_levels_currency ON trans.catalogue_ref USING GIN ((pricing_levels->>'currency'))", "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_catalogue_ref_pricing_levels_mrp ON trans.catalogue_ref USING BTREE (CAST(pricing_levels->>'mrp' AS NUMERIC))", ] for index_query in index_queries: try: await conn.execute(index_query) logger.info(f"Created index: {index_query.split('idx_')[1].split(' ')[0]}") except Exception as e: logger.warning(f"Failed to create index: {e}") logger.info("Migration completed successfully") except Exception as e: logger.error(f"Migration failed: {e}") raise finally: if 'conn' in locals(): await conn.close() logger.info("Database connection closed") async def main(): """Main migration function.""" logger.info("Starting pricing_levels column migration") await add_pricing_levels_column() logger.info("Migration completed") if __name__ == "__main__": asyncio.run(main())