# cuatrolabs-tracker-ms / migrate_tasks.py
# Author: Michael-Antony
# Commit: feat: implement tasks API with GET today and PATCH status endpoints
# Revision: 32eb084
"""
Migration script to create tasks table and indexes.
Run this script to set up the database schema for task tracking.
"""
import asyncio
import asyncpg
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
async def run_migration():
    """Create the ``trans.scm_tasks`` table and its indexes, then print a report.

    Connection settings are read from the environment variables ``DB_HOST``,
    ``DB_PORT``, ``DB_NAME``, ``DB_USER`` and ``DB_PASSWORD`` (defaults:
    ``localhost``, ``5432``, ``cuatrolabs``, ``postgres`` and an empty
    password).

    All DDL uses ``IF NOT EXISTS``, so re-running the script against a database
    that already has the table/indexes does not fail on the existing objects.
    After the DDL the column list and index names are queried back and printed.

    Raises:
        ValueError: if ``DB_PORT`` is not a valid integer (raised before the
            connection attempt, so it is not reported as a migration failure).
        Exception: any error raised while connecting, running the statements,
            or closing the connection is printed and then re-raised.
    """
    # Get database connection details
    db_host = os.getenv("DB_HOST", "localhost")
    db_port = int(os.getenv("DB_PORT", "5432"))
    db_name = os.getenv("DB_NAME", "cuatrolabs")
    db_user = os.getenv("DB_USER", "postgres")
    db_password = os.getenv("DB_PASSWORD", "")

    banner = "=" * 80
    rule = "-" * 80

    print(banner)
    print("TASKS TABLE MIGRATION")
    print(banner)
    print(f"Host: {db_host}")
    print(f"Port: {db_port}")
    print(f"Database: {db_name}")
    print(f"User: {db_user}")
    print(banner)

    # NOTE(review): started_at/completed_at are BIGINT while the other time
    # columns are TIMESTAMP; presumably epoch values -- confirm with the API.
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS trans.scm_tasks (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            merchant_id UUID NOT NULL,
            assigned_to UUID NOT NULL,
            title TEXT NOT NULL,
            description TEXT,
            status TEXT DEFAULT 'not_started',
            latitude DOUBLE PRECISION,
            longitude DOUBLE PRECISION,
            address TEXT,
            scheduled_at TIMESTAMP,
            started_at BIGINT,
            completed_at BIGINT,
            created_at TIMESTAMP DEFAULT now(),
            updated_at TIMESTAMP DEFAULT now(),
            CONSTRAINT chk_status CHECK (status IN ('not_started', 'in_progress', 'completed'))
        )
    """

    # (index name, DDL) pairs, executed in this order.
    index_statements = (
        # Index for assigned_to + scheduled_at lookups
        (
            "idx_scm_tasks_assigned_date",
            """
            CREATE INDEX IF NOT EXISTS idx_scm_tasks_assigned_date
            ON trans.scm_tasks (assigned_to, scheduled_at)
            """,
        ),
        # Index for merchant + status lookups
        (
            "idx_scm_tasks_merchant_status",
            """
            CREATE INDEX IF NOT EXISTS idx_scm_tasks_merchant_status
            ON trans.scm_tasks (merchant_id, status)
            """,
        ),
        # Index for status + scheduled_at
        (
            "idx_scm_tasks_status_scheduled",
            """
            CREATE INDEX IF NOT EXISTS idx_scm_tasks_status_scheduled
            ON trans.scm_tasks (status, scheduled_at)
            """,
        ),
    )

    try:
        # Connect to database
        print("\n[1/3] Connecting to PostgreSQL...")
        conn = await asyncpg.connect(
            host=db_host,
            port=db_port,
            database=db_name,
            user=db_user,
            password=db_password,
        )
        print("✅ Connected successfully")

        # The inner try/finally guarantees the connection is released even when
        # a statement below fails; previously it was closed only on success.
        try:
            # Create tasks table
            print("\n[2/3] Creating trans.scm_tasks table...")
            await conn.execute(create_table_sql)
            print("✅ Table 'trans.scm_tasks' created/verified")

            # Create indexes
            print("\n[3/3] Creating indexes...")
            for index_name, index_sql in index_statements:
                await conn.execute(index_sql)
                print(f"✅ Index '{index_name}' created")

            # Verify table structure
            print("\n[VERIFICATION] Checking table structure...")
            columns = await conn.fetch("""
                SELECT column_name, data_type, is_nullable, column_default
                FROM information_schema.columns
                WHERE table_schema = 'trans' AND table_name = 'scm_tasks'
                ORDER BY ordinal_position
            """)
            print("\nTable Structure:")
            print(rule)
            for col in columns:
                nullable = "NULL" if col['is_nullable'] == 'YES' else "NOT NULL"
                default = f"DEFAULT {col['column_default']}" if col['column_default'] else ""
                print(f" {col['column_name']:<20} {col['data_type']:<20} {nullable:<10} {default}")
            print(rule)

            # Check indexes
            indexes = await conn.fetch("""
                SELECT indexname, indexdef
                FROM pg_indexes
                WHERE schemaname = 'trans' AND tablename = 'scm_tasks'
            """)
            print("\nIndexes:")
            print(rule)
            for index in indexes:
                print(f" {index['indexname']}")
            print(rule)
        finally:
            # Close connection (runs on both the success and the failure path)
            await conn.close()

        print("\n" + banner)
        print("✅ MIGRATION COMPLETED SUCCESSFULLY")
        print(banner)
        print("\nNext steps:")
        print("1. Start the Tracker microservice")
        print("2. Test the tasks endpoint: GET /tracker/tasks/today")
        print("3. Check the API documentation: http://localhost:8003/docs")
        print(banner)
    except Exception as e:
        # Top-level boundary of the script: report, then let the error propagate
        # so the process exits with a traceback and a non-zero status.
        print(f"\n❌ Migration failed: {e}")
        raise
# Script entry point: run the migration only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    migration = run_migration()
    asyncio.run(migration)