#!/usr/bin/env python3 """ Simple sync script that works without full POS app context. Directly syncs MongoDB data to PostgreSQL using raw SQL. """ import asyncio import logging import motor.motor_asyncio import asyncpg import os from datetime import datetime from dotenv import load_dotenv # Load environment variables load_dotenv() # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) MERCHANT_ID = "company_cuatro_beauty_ltd" # Database connections from .env MONGODB_URI = os.getenv("MONGODB_URI") MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME") DATABASE_URL = os.getenv("DATABASE_URL") # Fix PostgreSQL URL for asyncpg (remove +asyncpg suffix) if DATABASE_URL and "+asyncpg" in DATABASE_URL: DATABASE_URL = DATABASE_URL.replace("postgresql+asyncpg://", "postgresql://") logger.info("Fixed PostgreSQL URL for asyncpg compatibility") async def ensure_postgres_schema(): """Ensure PostgreSQL schema and tables exist""" try: conn = await asyncpg.connect(DATABASE_URL) # Create schema await conn.execute("CREATE SCHEMA IF NOT EXISTS pos") # Create customer_ref table await conn.execute(""" CREATE TABLE IF NOT EXISTS pos.customer_ref ( customer_id VARCHAR PRIMARY KEY, merchant_id VARCHAR NOT NULL, name VARCHAR(150) NOT NULL, phone VARCHAR(20), email VARCHAR(255), notes TEXT, status VARCHAR(20) NOT NULL DEFAULT 'active', created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ) """) # Create staff_ref table await conn.execute(""" CREATE TABLE IF NOT EXISTS pos.staff_ref ( staff_id VARCHAR PRIMARY KEY, merchant_id VARCHAR NOT NULL, name VARCHAR(150) NOT NULL, phone VARCHAR(20), email VARCHAR(255), role VARCHAR(50), specializations TEXT[], status VARCHAR(20) NOT NULL DEFAULT 'active', created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ) """) # Create catalogue_service_ref table await conn.execute(""" CREATE TABLE IF NOT EXISTS pos.catalogue_service_ref ( service_id VARCHAR PRIMARY KEY, merchant_id VARCHAR NOT NULL, service_name VARCHAR(200) NOT NULL, service_code VARCHAR(50) NOT NULL, category_id VARCHAR, category_name VARCHAR(100), description TEXT, duration_mins INTEGER NOT NULL DEFAULT 30, price DECIMAL(10,2) NOT NULL DEFAULT 0.00, currency VARCHAR(3) NOT NULL DEFAULT 'INR', gst_rate DECIMAL(5,2) NOT NULL DEFAULT 18.00, status VARCHAR(20) NOT NULL DEFAULT 'active', sort_order INTEGER NOT NULL DEFAULT 0, pricing JSONB, category JSONB, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ) """) # Create indexes await conn.execute("CREATE INDEX IF NOT EXISTS idx_customer_ref_merchant_id ON pos.customer_ref(merchant_id)") await conn.execute("CREATE INDEX IF NOT EXISTS idx_staff_ref_merchant_id ON pos.staff_ref(merchant_id)") await conn.execute("CREATE INDEX IF NOT EXISTS idx_catalogue_service_ref_merchant_id ON pos.catalogue_service_ref(merchant_id)") await conn.close() logger.info("āœ… PostgreSQL schema and tables ensured") except Exception as e: logger.error(f"Error ensuring PostgreSQL schema: {e}") raise async def sync_customers(): """Sync customers from MongoDB to PostgreSQL""" try: # MongoDB connection mongo_client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URI) mongo_db = mongo_client[MONGODB_DB_NAME] # PostgreSQL connection pg_conn = await asyncpg.connect(DATABASE_URL) # Get customers from MongoDB customers = await mongo_db.pos_customers.find({"merchant_id": MERCHANT_ID}).to_list(length=None) if not customers: logger.warning("No customers found to sync") return 0 synced_count = 0 for customer in customers: try: # Insert or update customer in PostgreSQL await pg_conn.execute(""" INSERT INTO pos.customer_ref ( customer_id, merchant_id, name, phone, email, notes, status, created_at, updated_at ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (customer_id) DO UPDATE SET name = EXCLUDED.name, phone = EXCLUDED.phone, email = EXCLUDED.email, notes = EXCLUDED.notes, status = EXCLUDED.status, updated_at = EXCLUDED.updated_at """, customer.get('customer_id'), customer.get('merchant_id'), customer.get('name'), customer.get('phone'), customer.get('email'), customer.get('notes'), customer.get('status', 'active'), customer.get('created_at', datetime.utcnow()), customer.get('updated_at', datetime.utcnow()) ) synced_count += 1 except Exception as e: logger.error(f"Error syncing customer {customer.get('customer_id')}: {e}") await pg_conn.close() mongo_client.close() logger.info(f"āœ… Synced {synced_count} customers") return synced_count except Exception as e: logger.error(f"Error in customer sync: {e}") raise async def sync_staff(): """Sync staff from MongoDB to PostgreSQL""" try: # MongoDB connection mongo_client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URI) mongo_db = mongo_client[MONGODB_DB_NAME] # PostgreSQL connection pg_conn = await asyncpg.connect(DATABASE_URL) # Get staff from MongoDB staff_list = await mongo_db.pos_staff.find({"merchant_id": MERCHANT_ID}).to_list(length=None) if not staff_list: logger.warning("No staff found to sync") return 0 synced_count = 0 for staff in staff_list: try: # Insert or update staff in PostgreSQL await pg_conn.execute(""" INSERT INTO pos.staff_ref ( staff_id, merchant_id, name, phone, email, role, specializations, status, created_at, updated_at ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (staff_id) DO UPDATE SET name = EXCLUDED.name, phone = EXCLUDED.phone, email = EXCLUDED.email, role = EXCLUDED.role, specializations = EXCLUDED.specializations, status = EXCLUDED.status, updated_at = EXCLUDED.updated_at """, staff.get('_id'), staff.get('merchant_id'), staff.get('name'), staff.get('phone'), staff.get('email'), staff.get('role'), staff.get('specializations', []), staff.get('status', 'active'), staff.get('created_at', datetime.utcnow()), staff.get('updated_at', datetime.utcnow()) ) synced_count += 1 except Exception as e: logger.error(f"Error syncing staff {staff.get('_id')}: {e}") await pg_conn.close() mongo_client.close() logger.info(f"āœ… Synced {synced_count} staff members") return synced_count except Exception as e: logger.error(f"Error in staff sync: {e}") raise async def sync_catalogue_services(): """Sync catalogue services from MongoDB to PostgreSQL""" try: # MongoDB connection mongo_client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URI) mongo_db = mongo_client[MONGODB_DB_NAME] # PostgreSQL connection pg_conn = await asyncpg.connect(DATABASE_URL) # Get services from MongoDB services = await mongo_db.pos_catalogue_services.find({"merchant_id": MERCHANT_ID}).to_list(length=None) if not services: logger.warning("No catalogue services found to sync") return 0 synced_count = 0 for service in services: try: # Extract data pricing = service.get('pricing', {}) category = service.get('category', {}) # Insert or update service in PostgreSQL await pg_conn.execute(""" INSERT INTO pos.catalogue_service_ref ( service_id, merchant_id, service_name, service_code, category_id, category_name, description, duration_mins, price, currency, gst_rate, status, sort_order, pricing, category, created_at, updated_at ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (service_id) DO UPDATE SET service_name = EXCLUDED.service_name, service_code = EXCLUDED.service_code, category_id = EXCLUDED.category_id, category_name = EXCLUDED.category_name, description = EXCLUDED.description, duration_mins = EXCLUDED.duration_mins, price = EXCLUDED.price, currency = EXCLUDED.currency, gst_rate = EXCLUDED.gst_rate, status = EXCLUDED.status, sort_order = EXCLUDED.sort_order, pricing = EXCLUDED.pricing, category = EXCLUDED.category, updated_at = EXCLUDED.updated_at """, service.get('_id'), service.get('merchant_id'), service.get('name'), service.get('code'), category.get('id') if category else None, category.get('name') if category else None, service.get('description'), service.get('duration_mins', 30), float(pricing.get('price', 0.0)), pricing.get('currency', 'INR'), float(pricing.get('gst_rate', 18.0)), service.get('status', 'active'), service.get('sort_order', 0), pricing if pricing else None, category if category else None, service.get('created_at', datetime.utcnow()), service.get('updated_at', datetime.utcnow()) ) synced_count += 1 except Exception as e: logger.error(f"Error syncing service {service.get('_id')}: {e}") await pg_conn.close() mongo_client.close() logger.info(f"āœ… Synced {synced_count} catalogue services") return synced_count except Exception as e: logger.error(f"Error in catalogue services sync: {e}") raise async def main(): """Main sync function""" logger.info("=" * 60) logger.info("SIMPLE POS SYNC TO POSTGRESQL") logger.info("=" * 60) logger.info(f"Merchant ID: {MERCHANT_ID}") logger.info(f"MongoDB: {MONGODB_DB_NAME}") logger.info(f"PostgreSQL: Connected") try: # Ensure PostgreSQL schema await ensure_postgres_schema() # Sync all entities logger.info("\nšŸ“Š Starting sync operations...") customer_count = await sync_customers() staff_count = await sync_staff() service_count = await sync_catalogue_services() # Summary logger.info("\n" + "=" * 60) logger.info("SYNC SUMMARY") logger.info("=" * 60) logger.info(f"Customers: {customer_count}") logger.info(f"Staff: {staff_count}") logger.info(f"Services: {service_count}") logger.info(f"Total: {customer_count + staff_count + service_count}") logger.info("=" * 60) logger.info("šŸŽ‰ Sync completed successfully!") return True except Exception as e: logger.error(f"āŒ Sync failed: {e}") return False if __name__ == "__main__": success = asyncio.run(main()) exit(0 if success else 1)