cuatrolabs-scm-ms / docs /database /migrations /migration_add_pricing_levels_column.py
MukeshKapoor25's picture
refactor(database): reorganize database scripts and examples into docs directory
f24ee1d
#!/usr/bin/env python3
"""
Migration script to add the pricing_levels JSON column to the trans.catalogue_ref table.

This migration adds the pricing_levels column to store enhanced pricing data
as JSON in PostgreSQL for better performance and flexibility, and then attempts
to create expression indexes on its 'currency' and 'mrp' keys.

Usage:
python migration_add_pricing_levels_column.py
"""
import asyncio
import asyncpg
from insightfy_utils.logging import get_logger
from app.core.config import settings
logger = get_logger(__name__)
async def add_pricing_levels_column():
    """Add the pricing_levels JSON column and its indexes to trans.catalogue_ref.

    Connects to PostgreSQL using the POSTGRES_* values on the global
    ``settings`` object, adds the ``pricing_levels`` column when
    ``information_schema.columns`` does not list it yet, and then ensures the
    expression indexes on the ``currency`` and ``mrp`` keys exist.

    The function is safe to re-run: the column step is skipped when the column
    is already present, and the index statements use ``IF NOT EXISTS``.

    Raises:
        Exception: Any error from connecting, from the existence check, or
            from the ``ALTER TABLE`` is logged and re-raised. A failure while
            creating an individual index is only logged as a warning and does
            not abort the migration.
    """
    # Bound before the try block so the finally clause can tell whether a
    # connection was ever opened (connect() itself may raise).
    conn = None
    try:
        # Connect to PostgreSQL
        conn = await asyncpg.connect(
            host=settings.POSTGRES_HOST,
            port=settings.POSTGRES_PORT,
            user=settings.POSTGRES_USER,
            password=settings.POSTGRES_PASSWORD,
            database=settings.POSTGRES_DB,
        )
        logger.info("Connected to PostgreSQL database")

        # Check if pricing_levels column already exists
        check_query = """
            SELECT 1
            FROM information_schema.columns
            WHERE table_schema = 'trans'
            AND table_name = 'catalogue_ref'
            AND column_name = 'pricing_levels'
            LIMIT 1
        """
        column_exists = await conn.fetchval(check_query)

        if column_exists:
            # Do not return here: an earlier run may have added the column
            # but failed to build the indexes, so they are still ensured below.
            logger.info("pricing_levels column already exists in trans.catalogue_ref")
        else:
            # Add pricing_levels JSON column
            alter_query = """
                ALTER TABLE trans.catalogue_ref
                ADD COLUMN pricing_levels JSON
            """
            await conn.execute(alter_query)
            logger.info("Successfully added pricing_levels JSON column to trans.catalogue_ref")

        # Expression indexes for better query performance. Both use BTREE:
        # ->> yields text, and PostgreSQL has no default GIN operator class
        # for text, so a GIN index on that expression is rejected.
        index_queries = [
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_catalogue_ref_pricing_levels_currency ON trans.catalogue_ref USING BTREE ((pricing_levels->>'currency'))",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_catalogue_ref_pricing_levels_mrp ON trans.catalogue_ref USING BTREE (CAST(pricing_levels->>'mrp' AS NUMERIC))",
        ]

        # CONCURRENTLY cannot run inside a transaction block, so every
        # statement is sent on its own rather than batched.
        for index_query in index_queries:
            try:
                await conn.execute(index_query)
                logger.info(f"Created index: {index_query.split('idx_')[1].split(' ')[0]}")
            except Exception as e:
                # Best effort: a missing index degrades performance only.
                logger.warning(f"Failed to create index: {e}")

        logger.info("Migration completed successfully")
    except Exception as e:
        logger.error(f"Migration failed: {e}")
        raise
    finally:
        if conn is not None:
            await conn.close()
            logger.info("Database connection closed")
async def main():
    """Run the pricing_levels column migration, logging its start and end."""
    logger.info("Starting pricing_levels column migration")
    # Create the coroutine first, then drive it to completion; any exception
    # it raises propagates to the caller and the final log line is skipped.
    migration = add_pricing_levels_column()
    await migration
    logger.info("Migration completed")
if __name__ == "__main__":
    # Runs only when the file is executed as a script; importing the module
    # does not start the migration.
    asyncio.run(main())