"""
Migration script to create tasks table and indexes.

Run this script to set up the database schema for task tracking.
"""
import asyncio
import os

import asyncpg
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

_BANNER = "=" * 80
_RULE = "-" * 80

# NOTE(review): started_at / completed_at are BIGINT while scheduled_at,
# created_at and updated_at are TIMESTAMP -- presumably the service stores
# epoch values in the BIGINT columns; confirm before changing either type.
_CREATE_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS trans.scm_tasks (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        merchant_id UUID NOT NULL,
        assigned_to UUID NOT NULL,
        title TEXT NOT NULL,
        description TEXT,
        status TEXT DEFAULT 'not_started',
        latitude DOUBLE PRECISION,
        longitude DOUBLE PRECISION,
        address TEXT,
        scheduled_at TIMESTAMP,
        started_at BIGINT,
        completed_at BIGINT,
        created_at TIMESTAMP DEFAULT now(),
        updated_at TIMESTAMP DEFAULT now(),
        CONSTRAINT chk_status CHECK (status IN ('not_started', 'in_progress', 'completed'))
    )
"""

# (index name, DDL) pairs, created in this order.
_INDEXES = (
    # Index for assigned_to + scheduled_at lookups
    (
        "idx_scm_tasks_assigned_date",
        """
        CREATE INDEX IF NOT EXISTS idx_scm_tasks_assigned_date
        ON trans.scm_tasks (assigned_to, scheduled_at)
        """,
    ),
    # Index for merchant + status lookups
    (
        "idx_scm_tasks_merchant_status",
        """
        CREATE INDEX IF NOT EXISTS idx_scm_tasks_merchant_status
        ON trans.scm_tasks (merchant_id, status)
        """,
    ),
    # Index for status + scheduled_at
    (
        "idx_scm_tasks_status_scheduled",
        """
        CREATE INDEX IF NOT EXISTS idx_scm_tasks_status_scheduled
        ON trans.scm_tasks (status, scheduled_at)
        """,
    ),
)


async def _print_table_structure(conn: "asyncpg.Connection") -> None:
    """Print the columns of trans.scm_tasks as reported by information_schema."""
    columns = await conn.fetch("""
        SELECT column_name, data_type, is_nullable, column_default
        FROM information_schema.columns
        WHERE table_schema = 'trans' AND table_name = 'scm_tasks'
        ORDER BY ordinal_position
    """)

    print("\nTable Structure:")
    print(_RULE)
    for col in columns:
        nullable = "NULL" if col['is_nullable'] == 'YES' else "NOT NULL"
        default = f"DEFAULT {col['column_default']}" if col['column_default'] else ""
        print(f"  {col['column_name']:<20} {col['data_type']:<20} {nullable:<10} {default}")
    print(_RULE)


async def _print_indexes(conn: "asyncpg.Connection") -> None:
    """Print the names of all indexes pg_indexes lists for trans.scm_tasks."""
    indexes = await conn.fetch("""
        SELECT indexname, indexdef
        FROM pg_indexes
        WHERE schemaname = 'trans' AND tablename = 'scm_tasks'
    """)

    print("\nIndexes:")
    print(_RULE)
    for index in indexes:
        print(f"  {index['indexname']}")
    print(_RULE)


async def run_migration() -> None:
    """Create the trans.scm_tasks table and its indexes, then print a report.

    Connection settings are read from the DB_HOST, DB_PORT, DB_NAME, DB_USER
    and DB_PASSWORD environment variables (defaults: localhost, 5432,
    cuatrolabs, postgres, empty password).

    All DDL uses IF NOT EXISTS, so re-running the script does not fail on
    objects that are already present. The ``trans`` schema itself is not
    created here and must already exist.

    Raises:
        ValueError: If DB_PORT is not an integer.
        Exception: Any connection or SQL error is reported on stdout and
            re-raised unchanged.
    """
    # Get database connection details
    db_host = os.getenv("DB_HOST", "localhost")
    db_port = int(os.getenv("DB_PORT", "5432"))
    db_name = os.getenv("DB_NAME", "cuatrolabs")
    db_user = os.getenv("DB_USER", "postgres")
    db_password = os.getenv("DB_PASSWORD", "")

    print(_BANNER)
    print("TASKS TABLE MIGRATION")
    print(_BANNER)
    print(f"Host: {db_host}")
    print(f"Port: {db_port}")
    print(f"Database: {db_name}")
    print(f"User: {db_user}")
    print(_BANNER)

    try:
        # Connect to database
        print("\n[1/3] Connecting to PostgreSQL...")
        conn = await asyncpg.connect(
            host=db_host,
            port=db_port,
            database=db_name,
            user=db_user,
            password=db_password,
        )
        print("✅ Connected successfully")

        # Everything below runs with an open connection; the finally clause
        # guarantees it is closed even when a statement fails part-way.
        try:
            # Create tasks table
            print("\n[2/3] Creating trans.scm_tasks table...")
            await conn.execute(_CREATE_TABLE_SQL)
            print("✅ Table 'trans.scm_tasks' created/verified")

            # Create indexes
            print("\n[3/3] Creating indexes...")
            for index_name, index_sql in _INDEXES:
                await conn.execute(index_sql)
                print(f"✅ Index '{index_name}' created")

            # Verify table structure
            print("\n[VERIFICATION] Checking table structure...")
            await _print_table_structure(conn)

            # Check indexes
            await _print_indexes(conn)
        finally:
            # Close connection
            await conn.close()

        print("\n" + _BANNER)
        print("✅ MIGRATION COMPLETED SUCCESSFULLY")
        print(_BANNER)
        print("\nNext steps:")
        print("1. Start the Tracker microservice")
        print("2. Test the tasks endpoint: GET /tracker/tasks/today")
        print("3. Check the API documentation: http://localhost:8003/docs")
        print(_BANNER)

    except Exception as e:
        # Top-level boundary: report the failure, then let it propagate so
        # the process exits with a traceback and a non-zero status.
        print(f"\n❌ Migration failed: {e}")
        raise


if __name__ == "__main__":
    asyncio.run(run_migration())