Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Simple sync script that works without full POS app context. | |
| Directly syncs MongoDB data to PostgreSQL using raw SQL. | |
| """ | |
| import asyncio | |
| import logging | |
| import motor.motor_asyncio | |
| import asyncpg | |
| import os | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
# Load environment variables from a local .env file (if present) before any
# os.getenv() call below reads them.
load_dotenv()

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Merchant whose documents are synced; every MongoDB query filters on it.
MERCHANT_ID = "company_cuatro_beauty_ltd"

# Database connections from .env
MONGODB_URI = os.getenv("MONGODB_URI")
MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME")
DATABASE_URL = os.getenv("DATABASE_URL")

# asyncpg.connect() does not understand the SQLAlchemy-style
# "postgresql+asyncpg://" scheme, so strip the driver suffix. The check is
# anchored to the scheme prefix so that the log line is only emitted when the
# URL was actually rewritten, and so that a "+asyncpg" substring elsewhere in
# the URL (e.g. in a password) is never touched.
_SQLALCHEMY_ASYNCPG_PREFIX = "postgresql+asyncpg://"
if DATABASE_URL and DATABASE_URL.startswith(_SQLALCHEMY_ASYNCPG_PREFIX):
    DATABASE_URL = "postgresql://" + DATABASE_URL[len(_SQLALCHEMY_ASYNCPG_PREFIX):]
    logger.info("Fixed PostgreSQL URL for asyncpg compatibility")
async def ensure_postgres_schema():
    """Create the ``pos`` schema, its reference tables and indexes if missing.

    Opens a single asyncpg connection to ``DATABASE_URL`` and issues
    ``CREATE ... IF NOT EXISTS`` statements for the schema, the three
    ``*_ref`` tables and one ``merchant_id`` index per table.

    Raises:
        Exception: Any connection or DDL error is logged and re-raised.
    """
    try:
        conn = await asyncpg.connect(DATABASE_URL)
        try:
            # Create schema
            await conn.execute("CREATE SCHEMA IF NOT EXISTS pos")

            # Create customer_ref table
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS pos.customer_ref (
                    customer_id VARCHAR PRIMARY KEY,
                    merchant_id VARCHAR NOT NULL,
                    name VARCHAR(150) NOT NULL,
                    phone VARCHAR(20),
                    email VARCHAR(255),
                    notes TEXT,
                    status VARCHAR(20) NOT NULL DEFAULT 'active',
                    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Create staff_ref table
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS pos.staff_ref (
                    staff_id VARCHAR PRIMARY KEY,
                    merchant_id VARCHAR NOT NULL,
                    name VARCHAR(150) NOT NULL,
                    phone VARCHAR(20),
                    email VARCHAR(255),
                    role VARCHAR(50),
                    specializations TEXT[],
                    status VARCHAR(20) NOT NULL DEFAULT 'active',
                    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Create catalogue_service_ref table
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS pos.catalogue_service_ref (
                    service_id VARCHAR PRIMARY KEY,
                    merchant_id VARCHAR NOT NULL,
                    service_name VARCHAR(200) NOT NULL,
                    service_code VARCHAR(50) NOT NULL,
                    category_id VARCHAR,
                    category_name VARCHAR(100),
                    description TEXT,
                    duration_mins INTEGER NOT NULL DEFAULT 30,
                    price DECIMAL(10,2) NOT NULL DEFAULT 0.00,
                    currency VARCHAR(3) NOT NULL DEFAULT 'INR',
                    gst_rate DECIMAL(5,2) NOT NULL DEFAULT 18.00,
                    status VARCHAR(20) NOT NULL DEFAULT 'active',
                    sort_order INTEGER NOT NULL DEFAULT 0,
                    pricing JSONB,
                    category JSONB,
                    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Create indexes
            await conn.execute("CREATE INDEX IF NOT EXISTS idx_customer_ref_merchant_id ON pos.customer_ref(merchant_id)")
            await conn.execute("CREATE INDEX IF NOT EXISTS idx_staff_ref_merchant_id ON pos.staff_ref(merchant_id)")
            await conn.execute("CREATE INDEX IF NOT EXISTS idx_catalogue_service_ref_merchant_id ON pos.catalogue_service_ref(merchant_id)")
        finally:
            # Release the connection even when a DDL statement fails;
            # previously it was only closed on the success path.
            await conn.close()

        logger.info("β PostgreSQL schema and tables ensured")
    except Exception as e:
        logger.error(f"Error ensuring PostgreSQL schema: {e}")
        raise
async def sync_customers():
    """Sync customers from MongoDB to PostgreSQL.

    Reads every ``pos_customers`` document whose ``merchant_id`` equals
    ``MERCHANT_ID`` and upserts it into ``pos.customer_ref`` keyed on
    ``customer_id``. A failure on one document is logged and skipped so the
    remaining documents are still processed.

    Returns:
        int: Number of customers upserted without error (0 when MongoDB
        returned no matching documents).

    Raises:
        Exception: Connection or query-level failures are logged and re-raised.
    """
    try:
        # MongoDB connection
        mongo_client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URI)
        try:
            mongo_db = mongo_client[MONGODB_DB_NAME]

            # PostgreSQL connection
            pg_conn = await asyncpg.connect(DATABASE_URL)
            try:
                # Get customers from MongoDB
                customers = await mongo_db.pos_customers.find({"merchant_id": MERCHANT_ID}).to_list(length=None)
                if not customers:
                    logger.warning("No customers found to sync")
                    return 0

                synced_count = 0
                for customer in customers:
                    try:
                        # Insert or update customer in PostgreSQL
                        await pg_conn.execute("""
                            INSERT INTO pos.customer_ref (
                                customer_id, merchant_id, name, phone, email, notes, status, created_at, updated_at
                            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                            ON CONFLICT (customer_id) DO UPDATE SET
                                name = EXCLUDED.name,
                                phone = EXCLUDED.phone,
                                email = EXCLUDED.email,
                                notes = EXCLUDED.notes,
                                status = EXCLUDED.status,
                                updated_at = EXCLUDED.updated_at
                            """,
                            customer.get('customer_id'),
                            customer.get('merchant_id'),
                            customer.get('name'),
                            customer.get('phone'),
                            customer.get('email'),
                            customer.get('notes'),
                            customer.get('status', 'active'),
                            # Naive UTC on purpose: the target columns are
                            # TIMESTAMP (without time zone).
                            customer.get('created_at', datetime.utcnow()),
                            customer.get('updated_at', datetime.utcnow())
                        )
                        synced_count += 1
                    except Exception as e:
                        # Best effort: one bad document must not abort the run.
                        logger.error(f"Error syncing customer {customer.get('customer_id')}: {e}")
            finally:
                # Runs on the early "nothing to sync" return and on errors too,
                # both of which previously leaked the connection.
                await pg_conn.close()
        finally:
            mongo_client.close()

        logger.info(f"β Synced {synced_count} customers")
        return synced_count
    except Exception as e:
        logger.error(f"Error in customer sync: {e}")
        raise
async def sync_staff():
    """Sync staff from MongoDB to PostgreSQL.

    Reads every ``pos_staff`` document whose ``merchant_id`` equals
    ``MERCHANT_ID`` and upserts it into ``pos.staff_ref``, using the
    document's ``_id`` as ``staff_id``. A failure on one document is logged
    and skipped so the remaining documents are still processed.

    Returns:
        int: Number of staff members upserted without error (0 when MongoDB
        returned no matching documents).

    Raises:
        Exception: Connection or query-level failures are logged and re-raised.
    """
    try:
        # MongoDB connection
        mongo_client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URI)
        try:
            mongo_db = mongo_client[MONGODB_DB_NAME]

            # PostgreSQL connection
            pg_conn = await asyncpg.connect(DATABASE_URL)
            try:
                # Get staff from MongoDB
                staff_list = await mongo_db.pos_staff.find({"merchant_id": MERCHANT_ID}).to_list(length=None)
                if not staff_list:
                    logger.warning("No staff found to sync")
                    return 0

                synced_count = 0
                for staff in staff_list:
                    try:
                        # staff_id is VARCHAR and asyncpg only accepts str for
                        # text parameters, so a non-string `_id` (e.g. a BSON
                        # ObjectId, MongoDB's default) must be stringified.
                        raw_id = staff.get('_id')
                        staff_id = str(raw_id) if raw_id is not None else None

                        # Insert or update staff in PostgreSQL
                        await pg_conn.execute("""
                            INSERT INTO pos.staff_ref (
                                staff_id, merchant_id, name, phone, email, role, specializations, status, created_at, updated_at
                            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                            ON CONFLICT (staff_id) DO UPDATE SET
                                name = EXCLUDED.name,
                                phone = EXCLUDED.phone,
                                email = EXCLUDED.email,
                                role = EXCLUDED.role,
                                specializations = EXCLUDED.specializations,
                                status = EXCLUDED.status,
                                updated_at = EXCLUDED.updated_at
                            """,
                            staff_id,
                            staff.get('merchant_id'),
                            staff.get('name'),
                            staff.get('phone'),
                            staff.get('email'),
                            staff.get('role'),
                            staff.get('specializations', []),
                            staff.get('status', 'active'),
                            # Naive UTC on purpose: the target columns are
                            # TIMESTAMP (without time zone).
                            staff.get('created_at', datetime.utcnow()),
                            staff.get('updated_at', datetime.utcnow())
                        )
                        synced_count += 1
                    except Exception as e:
                        # Best effort: one bad document must not abort the run.
                        logger.error(f"Error syncing staff {staff.get('_id')}: {e}")
            finally:
                # Runs on the early "nothing to sync" return and on errors too,
                # both of which previously leaked the connection.
                await pg_conn.close()
        finally:
            mongo_client.close()

        logger.info(f"β Synced {synced_count} staff members")
        return synced_count
    except Exception as e:
        logger.error(f"Error in staff sync: {e}")
        raise
async def sync_catalogue_services():
    """Sync catalogue services from MongoDB to PostgreSQL.

    Reads every ``pos_catalogue_services`` document whose ``merchant_id``
    equals ``MERCHANT_ID`` and upserts it into ``pos.catalogue_service_ref``,
    using the document's ``_id`` as ``service_id``. Price, currency and GST
    rate are taken from the nested ``pricing`` sub-document; category id and
    name from the nested ``category`` sub-document. Both sub-documents are
    also stored whole in the JSONB columns. A failure on one document is
    logged and skipped so the remaining documents are still processed.

    Returns:
        int: Number of services upserted without error (0 when MongoDB
        returned no matching documents).

    Raises:
        Exception: Connection or query-level failures are logged and re-raised.
    """
    # Local import: json is not imported at module level in this file.
    import json

    try:
        # MongoDB connection
        mongo_client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URI)
        try:
            mongo_db = mongo_client[MONGODB_DB_NAME]

            # PostgreSQL connection
            pg_conn = await asyncpg.connect(DATABASE_URL)
            try:
                # Get services from MongoDB
                services = await mongo_db.pos_catalogue_services.find({"merchant_id": MERCHANT_ID}).to_list(length=None)
                if not services:
                    logger.warning("No catalogue services found to sync")
                    return 0

                synced_count = 0
                for service in services:
                    try:
                        # `or {}` also covers a key that is present but null,
                        # which `.get(key, {})` would return as None.
                        pricing = service.get('pricing') or {}
                        category = service.get('category') or {}

                        # service_id is VARCHAR and asyncpg only accepts str
                        # for text parameters, so a non-string `_id` (e.g. a
                        # BSON ObjectId) must be stringified.
                        raw_id = service.get('_id')
                        service_id = str(raw_id) if raw_id is not None else None

                        # Without a custom type codec asyncpg expects JSONB
                        # parameters as JSON text, not dicts. default=str is a
                        # deliberate fallback for BSON values json cannot
                        # encode natively (datetimes, ObjectIds, ...).
                        pricing_json = json.dumps(pricing, default=str) if pricing else None
                        category_json = json.dumps(category, default=str) if category else None

                        # Insert or update service in PostgreSQL
                        await pg_conn.execute("""
                            INSERT INTO pos.catalogue_service_ref (
                                service_id, merchant_id, service_name, service_code, category_id, category_name,
                                description, duration_mins, price, currency, gst_rate, status, sort_order,
                                pricing, category, created_at, updated_at
                            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
                            ON CONFLICT (service_id) DO UPDATE SET
                                service_name = EXCLUDED.service_name,
                                service_code = EXCLUDED.service_code,
                                category_id = EXCLUDED.category_id,
                                category_name = EXCLUDED.category_name,
                                description = EXCLUDED.description,
                                duration_mins = EXCLUDED.duration_mins,
                                price = EXCLUDED.price,
                                currency = EXCLUDED.currency,
                                gst_rate = EXCLUDED.gst_rate,
                                status = EXCLUDED.status,
                                sort_order = EXCLUDED.sort_order,
                                pricing = EXCLUDED.pricing,
                                category = EXCLUDED.category,
                                updated_at = EXCLUDED.updated_at
                            """,
                            service_id,
                            service.get('merchant_id'),
                            service.get('name'),
                            service.get('code'),
                            category.get('id') if category else None,
                            category.get('name') if category else None,
                            service.get('description'),
                            service.get('duration_mins', 30),
                            float(pricing.get('price', 0.0)),
                            pricing.get('currency', 'INR'),
                            float(pricing.get('gst_rate', 18.0)),
                            service.get('status', 'active'),
                            service.get('sort_order', 0),
                            pricing_json,
                            category_json,
                            # Naive UTC on purpose: the target columns are
                            # TIMESTAMP (without time zone).
                            service.get('created_at', datetime.utcnow()),
                            service.get('updated_at', datetime.utcnow())
                        )
                        synced_count += 1
                    except Exception as e:
                        # Best effort: one bad document must not abort the run.
                        logger.error(f"Error syncing service {service.get('_id')}: {e}")
            finally:
                # Runs on the early "nothing to sync" return and on errors too,
                # both of which previously leaked the connection.
                await pg_conn.close()
        finally:
            mongo_client.close()

        logger.info(f"β Synced {synced_count} catalogue services")
        return synced_count
    except Exception as e:
        logger.error(f"Error in catalogue services sync: {e}")
        raise
async def main():
    """Run the full sync: ensure the schema, then sync each entity type.

    Customers, staff and catalogue services are synced sequentially, in that
    order, followed by a summary of the per-entity counts.

    Returns:
        bool: True when every step completed without raising, False when any
        step raised (the error is logged with its traceback).
    """
    # NOTE(review): the leading 'β' / 'π' characters in the log messages look
    # like mis-encoded emoji; they are left untouched here — confirm the
    # intended glyphs before changing them.
    banner = "=" * 60
    logger.info(banner)
    logger.info("SIMPLE POS SYNC TO POSTGRESQL")
    logger.info(banner)
    logger.info(f"Merchant ID: {MERCHANT_ID}")
    logger.info(f"MongoDB: {MONGODB_DB_NAME}")
    # No connection has been attempted yet, so report whether a URL is
    # configured instead of unconditionally claiming "Connected".
    logger.info(f"PostgreSQL: {'configured' if DATABASE_URL else 'DATABASE_URL not set'}")

    try:
        # Ensure PostgreSQL schema
        await ensure_postgres_schema()

        # Sync all entities
        logger.info("\nπ Starting sync operations...")
        customer_count = await sync_customers()
        staff_count = await sync_staff()
        service_count = await sync_catalogue_services()

        # Summary
        logger.info("\n" + banner)
        logger.info("SYNC SUMMARY")
        logger.info(banner)
        logger.info(f"Customers: {customer_count}")
        logger.info(f"Staff: {staff_count}")
        logger.info(f"Services: {service_count}")
        logger.info(f"Total: {customer_count + staff_count + service_count}")
        logger.info(banner)
        logger.info("π Sync completed successfully!")
        return True
    except Exception as e:
        # Top-level boundary: keep the traceback so the failing step can be
        # identified, then signal failure to the caller via the return value.
        logger.exception(f"β Sync failed: {e}")
        return False
if __name__ == "__main__":
    # Script entry point: the process exit status mirrors main()'s result
    # (0 on success, 1 on failure).
    success = asyncio.run(main())
    # The bare exit() helper is injected by the `site` module for interactive
    # use and is not guaranteed to exist (e.g. `python -S`); raising
    # SystemExit is the equivalent that always works.
    raise SystemExit(0 if success else 1)