cuatrolabs-pos-ms / simple_sync.py
MukeshKapoor25's picture
feat(sync): Add comprehensive POS data sync infrastructure and seed data management
9fd3989
#!/usr/bin/env python3
"""
Simple sync script that works without full POS app context.
Directly syncs MongoDB data to PostgreSQL using raw SQL.
"""
import asyncio
import logging
import motor.motor_asyncio
import asyncpg
import os
from datetime import datetime
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
MERCHANT_ID = "company_cuatro_beauty_ltd"
# Database connections from .env
MONGODB_URI = os.getenv("MONGODB_URI")
MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME")
DATABASE_URL = os.getenv("DATABASE_URL")
# Fix PostgreSQL URL for asyncpg (remove +asyncpg suffix)
if DATABASE_URL and "+asyncpg" in DATABASE_URL:
DATABASE_URL = DATABASE_URL.replace("postgresql+asyncpg://", "postgresql://")
logger.info("Fixed PostgreSQL URL for asyncpg compatibility")
async def ensure_postgres_schema():
"""Ensure PostgreSQL schema and tables exist"""
try:
conn = await asyncpg.connect(DATABASE_URL)
# Create schema
await conn.execute("CREATE SCHEMA IF NOT EXISTS pos")
# Create customer_ref table
await conn.execute("""
CREATE TABLE IF NOT EXISTS pos.customer_ref (
customer_id VARCHAR PRIMARY KEY,
merchant_id VARCHAR NOT NULL,
name VARCHAR(150) NOT NULL,
phone VARCHAR(20),
email VARCHAR(255),
notes TEXT,
status VARCHAR(20) NOT NULL DEFAULT 'active',
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)
""")
# Create staff_ref table
await conn.execute("""
CREATE TABLE IF NOT EXISTS pos.staff_ref (
staff_id VARCHAR PRIMARY KEY,
merchant_id VARCHAR NOT NULL,
name VARCHAR(150) NOT NULL,
phone VARCHAR(20),
email VARCHAR(255),
role VARCHAR(50),
specializations TEXT[],
status VARCHAR(20) NOT NULL DEFAULT 'active',
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)
""")
# Create catalogue_service_ref table
await conn.execute("""
CREATE TABLE IF NOT EXISTS pos.catalogue_service_ref (
service_id VARCHAR PRIMARY KEY,
merchant_id VARCHAR NOT NULL,
service_name VARCHAR(200) NOT NULL,
service_code VARCHAR(50) NOT NULL,
category_id VARCHAR,
category_name VARCHAR(100),
description TEXT,
duration_mins INTEGER NOT NULL DEFAULT 30,
price DECIMAL(10,2) NOT NULL DEFAULT 0.00,
currency VARCHAR(3) NOT NULL DEFAULT 'INR',
gst_rate DECIMAL(5,2) NOT NULL DEFAULT 18.00,
status VARCHAR(20) NOT NULL DEFAULT 'active',
sort_order INTEGER NOT NULL DEFAULT 0,
pricing JSONB,
category JSONB,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)
""")
# Create indexes
await conn.execute("CREATE INDEX IF NOT EXISTS idx_customer_ref_merchant_id ON pos.customer_ref(merchant_id)")
await conn.execute("CREATE INDEX IF NOT EXISTS idx_staff_ref_merchant_id ON pos.staff_ref(merchant_id)")
await conn.execute("CREATE INDEX IF NOT EXISTS idx_catalogue_service_ref_merchant_id ON pos.catalogue_service_ref(merchant_id)")
await conn.close()
logger.info("βœ… PostgreSQL schema and tables ensured")
except Exception as e:
logger.error(f"Error ensuring PostgreSQL schema: {e}")
raise
async def sync_customers():
"""Sync customers from MongoDB to PostgreSQL"""
try:
# MongoDB connection
mongo_client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URI)
mongo_db = mongo_client[MONGODB_DB_NAME]
# PostgreSQL connection
pg_conn = await asyncpg.connect(DATABASE_URL)
# Get customers from MongoDB
customers = await mongo_db.pos_customers.find({"merchant_id": MERCHANT_ID}).to_list(length=None)
if not customers:
logger.warning("No customers found to sync")
return 0
synced_count = 0
for customer in customers:
try:
# Insert or update customer in PostgreSQL
await pg_conn.execute("""
INSERT INTO pos.customer_ref (
customer_id, merchant_id, name, phone, email, notes, status, created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (customer_id) DO UPDATE SET
name = EXCLUDED.name,
phone = EXCLUDED.phone,
email = EXCLUDED.email,
notes = EXCLUDED.notes,
status = EXCLUDED.status,
updated_at = EXCLUDED.updated_at
""",
customer.get('customer_id'),
customer.get('merchant_id'),
customer.get('name'),
customer.get('phone'),
customer.get('email'),
customer.get('notes'),
customer.get('status', 'active'),
customer.get('created_at', datetime.utcnow()),
customer.get('updated_at', datetime.utcnow())
)
synced_count += 1
except Exception as e:
logger.error(f"Error syncing customer {customer.get('customer_id')}: {e}")
await pg_conn.close()
mongo_client.close()
logger.info(f"βœ… Synced {synced_count} customers")
return synced_count
except Exception as e:
logger.error(f"Error in customer sync: {e}")
raise
async def sync_staff():
"""Sync staff from MongoDB to PostgreSQL"""
try:
# MongoDB connection
mongo_client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URI)
mongo_db = mongo_client[MONGODB_DB_NAME]
# PostgreSQL connection
pg_conn = await asyncpg.connect(DATABASE_URL)
# Get staff from MongoDB
staff_list = await mongo_db.pos_staff.find({"merchant_id": MERCHANT_ID}).to_list(length=None)
if not staff_list:
logger.warning("No staff found to sync")
return 0
synced_count = 0
for staff in staff_list:
try:
# Insert or update staff in PostgreSQL
await pg_conn.execute("""
INSERT INTO pos.staff_ref (
staff_id, merchant_id, name, phone, email, role, specializations, status, created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (staff_id) DO UPDATE SET
name = EXCLUDED.name,
phone = EXCLUDED.phone,
email = EXCLUDED.email,
role = EXCLUDED.role,
specializations = EXCLUDED.specializations,
status = EXCLUDED.status,
updated_at = EXCLUDED.updated_at
""",
staff.get('_id'),
staff.get('merchant_id'),
staff.get('name'),
staff.get('phone'),
staff.get('email'),
staff.get('role'),
staff.get('specializations', []),
staff.get('status', 'active'),
staff.get('created_at', datetime.utcnow()),
staff.get('updated_at', datetime.utcnow())
)
synced_count += 1
except Exception as e:
logger.error(f"Error syncing staff {staff.get('_id')}: {e}")
await pg_conn.close()
mongo_client.close()
logger.info(f"βœ… Synced {synced_count} staff members")
return synced_count
except Exception as e:
logger.error(f"Error in staff sync: {e}")
raise
async def sync_catalogue_services():
"""Sync catalogue services from MongoDB to PostgreSQL"""
try:
# MongoDB connection
mongo_client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URI)
mongo_db = mongo_client[MONGODB_DB_NAME]
# PostgreSQL connection
pg_conn = await asyncpg.connect(DATABASE_URL)
# Get services from MongoDB
services = await mongo_db.pos_catalogue_services.find({"merchant_id": MERCHANT_ID}).to_list(length=None)
if not services:
logger.warning("No catalogue services found to sync")
return 0
synced_count = 0
for service in services:
try:
# Extract data
pricing = service.get('pricing', {})
category = service.get('category', {})
# Insert or update service in PostgreSQL
await pg_conn.execute("""
INSERT INTO pos.catalogue_service_ref (
service_id, merchant_id, service_name, service_code, category_id, category_name,
description, duration_mins, price, currency, gst_rate, status, sort_order,
pricing, category, created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
ON CONFLICT (service_id) DO UPDATE SET
service_name = EXCLUDED.service_name,
service_code = EXCLUDED.service_code,
category_id = EXCLUDED.category_id,
category_name = EXCLUDED.category_name,
description = EXCLUDED.description,
duration_mins = EXCLUDED.duration_mins,
price = EXCLUDED.price,
currency = EXCLUDED.currency,
gst_rate = EXCLUDED.gst_rate,
status = EXCLUDED.status,
sort_order = EXCLUDED.sort_order,
pricing = EXCLUDED.pricing,
category = EXCLUDED.category,
updated_at = EXCLUDED.updated_at
""",
service.get('_id'),
service.get('merchant_id'),
service.get('name'),
service.get('code'),
category.get('id') if category else None,
category.get('name') if category else None,
service.get('description'),
service.get('duration_mins', 30),
float(pricing.get('price', 0.0)),
pricing.get('currency', 'INR'),
float(pricing.get('gst_rate', 18.0)),
service.get('status', 'active'),
service.get('sort_order', 0),
pricing if pricing else None,
category if category else None,
service.get('created_at', datetime.utcnow()),
service.get('updated_at', datetime.utcnow())
)
synced_count += 1
except Exception as e:
logger.error(f"Error syncing service {service.get('_id')}: {e}")
await pg_conn.close()
mongo_client.close()
logger.info(f"βœ… Synced {synced_count} catalogue services")
return synced_count
except Exception as e:
logger.error(f"Error in catalogue services sync: {e}")
raise
async def main():
"""Main sync function"""
logger.info("=" * 60)
logger.info("SIMPLE POS SYNC TO POSTGRESQL")
logger.info("=" * 60)
logger.info(f"Merchant ID: {MERCHANT_ID}")
logger.info(f"MongoDB: {MONGODB_DB_NAME}")
logger.info(f"PostgreSQL: Connected")
try:
# Ensure PostgreSQL schema
await ensure_postgres_schema()
# Sync all entities
logger.info("\nπŸ“Š Starting sync operations...")
customer_count = await sync_customers()
staff_count = await sync_staff()
service_count = await sync_catalogue_services()
# Summary
logger.info("\n" + "=" * 60)
logger.info("SYNC SUMMARY")
logger.info("=" * 60)
logger.info(f"Customers: {customer_count}")
logger.info(f"Staff: {staff_count}")
logger.info(f"Services: {service_count}")
logger.info(f"Total: {customer_count + staff_count + service_count}")
logger.info("=" * 60)
logger.info("πŸŽ‰ Sync completed successfully!")
return True
except Exception as e:
logger.error(f"❌ Sync failed: {e}")
return False
if __name__ == "__main__":
success = asyncio.run(main())
exit(0 if success else 1)