""" Utility functions for POS sync operations. """ from typing import Dict, Any, List, Optional import logging from datetime import datetime import asyncio from app.nosql import get_mongo_db from app.sql import get_postgres_session from app.sync.sync_service import POSSyncService logger = logging.getLogger(__name__) async def migrate_existing_customers(merchant_id: Optional[str] = None, limit: int = 1000): """ Migrate existing customers from MongoDB to PostgreSQL. Args: merchant_id: Optional merchant ID to filter customers limit: Maximum number of customers to migrate """ try: # Get MongoDB connection mongo_db = await get_mongo_db() customers_collection = mongo_db.pos_customers # Build query query = {} if merchant_id: query["merchant_id"] = merchant_id # Get PostgreSQL session async with get_postgres_session() as pg_session: sync_service = POSSyncService(pg_session) # Fetch customers from MongoDB cursor = customers_collection.find(query).limit(limit) customers = await cursor.to_list(length=limit) migrated_count = 0 error_count = 0 for customer_doc in customers: try: # Convert to CustomerModel from app.customers.models.model import CustomerModel customer = CustomerModel(**customer_doc) # Sync to PostgreSQL await sync_service.sync_customer_create(customer) migrated_count += 1 if migrated_count % 100 == 0: logger.info(f"Migrated {migrated_count} customers...") except Exception as e: logger.error(f"Error migrating customer {customer_doc.get('customer_id')}: {e}") error_count += 1 logger.info(f"Customer migration completed: {migrated_count} migrated, {error_count} errors") return {"migrated": migrated_count, "errors": error_count} except Exception as e: logger.error(f"Error in customer migration: {e}") raise async def migrate_existing_catalogue_services(merchant_id: Optional[str] = None, limit: int = 1000): """ Migrate existing catalogue services from MongoDB to PostgreSQL. Args: merchant_id: Optional merchant ID to filter services limit: Maximum number of services to migrate """ try: # Get MongoDB connection mongo_db = await get_mongo_db() services_collection = mongo_db.pos_catalogue_services # Build query query = {} if merchant_id: query["merchant_id"] = merchant_id # Get PostgreSQL session async with get_postgres_session() as pg_session: sync_service = POSSyncService(pg_session) # Fetch services from MongoDB cursor = services_collection.find(query).limit(limit) services = await cursor.to_list(length=limit) migrated_count = 0 error_count = 0 for service_doc in services: try: # Sync to PostgreSQL await sync_service.sync_catalogue_service_create(service_doc) migrated_count += 1 if migrated_count % 100 == 0: logger.info(f"Migrated {migrated_count} catalogue services...") except Exception as e: logger.error(f"Error migrating catalogue service {service_doc.get('_id')}: {e}") error_count += 1 logger.info(f"Catalogue service migration completed: {migrated_count} migrated, {error_count} errors") return {"migrated": migrated_count, "errors": error_count} except Exception as e: logger.error(f"Error in catalogue service migration: {e}") raise async def migrate_existing_staff(merchant_id: Optional[str] = None, limit: int = 1000): """ Migrate existing staff from MongoDB to PostgreSQL. Args: merchant_id: Optional merchant ID to filter staff limit: Maximum number of staff to migrate """ try: # Get MongoDB connection mongo_db = await get_mongo_db() staff_collection = mongo_db.pos_staff # Build query query = {} if merchant_id: query["merchant_id"] = merchant_id # Get PostgreSQL session async with get_postgres_session() as pg_session: sync_service = POSSyncService(pg_session) # Fetch staff from MongoDB cursor = staff_collection.find(query).limit(limit) staff_list = await cursor.to_list(length=limit) migrated_count = 0 error_count = 0 for staff_doc in staff_list: try: # Sync to PostgreSQL await sync_service.sync_staff_create(staff_doc) migrated_count += 1 if migrated_count % 100 == 0: logger.info(f"Migrated {migrated_count} staff...") except Exception as e: logger.error(f"Error migrating staff {staff_doc.get('staff_id')}: {e}") error_count += 1 logger.info(f"Staff migration completed: {migrated_count} migrated, {error_count} errors") return {"migrated": migrated_count, "errors": error_count} except Exception as e: logger.error(f"Error in staff migration: {e}") raise async def verify_sync_consistency(merchant_id: str, entity_type: str = "customers"): """ Verify consistency between MongoDB and PostgreSQL data. Args: merchant_id: Merchant ID to check entity_type: Type of entity to check ("customers", "staff", or "catalogue_services") """ try: # Get connections mongo_db = await get_mongo_db() async with get_postgres_session() as pg_session: sync_service = POSSyncService(pg_session) if entity_type == "customers": # Get MongoDB customers mongo_customers = await mongo_db.pos_customers.find({"merchant_id": merchant_id}).to_list(length=None) mongo_ids = {c["customer_id"] for c in mongo_customers} # Get PostgreSQL customers pg_customers = await sync_service.customer_sync.list_customers_by_merchant(merchant_id, limit=10000) pg_ids = {c.customer_id for c in pg_customers} # Compare missing_in_pg = mongo_ids - pg_ids extra_in_pg = pg_ids - mongo_ids logger.info(f"Consistency check for customers in merchant {merchant_id}:") logger.info(f"MongoDB count: {len(mongo_ids)}") logger.info(f"PostgreSQL count: {len(pg_ids)}") logger.info(f"Missing in PostgreSQL: {len(missing_in_pg)}") logger.info(f"Extra in PostgreSQL: {len(extra_in_pg)}") return { "mongo_count": len(mongo_ids), "postgres_count": len(pg_ids), "missing_in_postgres": list(missing_in_pg), "extra_in_postgres": list(extra_in_pg) } elif entity_type == "staff": # Get MongoDB staff mongo_staff = await mongo_db.pos_staff.find({"merchant_id": merchant_id}).to_list(length=None) mongo_ids = {s["staff_id"] for s in mongo_staff} # Get PostgreSQL staff pg_staff = await sync_service.staff_sync.list_staff_by_merchant(merchant_id, limit=10000) pg_ids = {s.staff_id for s in pg_staff} # Compare missing_in_pg = mongo_ids - pg_ids extra_in_pg = pg_ids - mongo_ids logger.info(f"Consistency check for staff in merchant {merchant_id}:") logger.info(f"MongoDB count: {len(mongo_ids)}") logger.info(f"PostgreSQL count: {len(pg_ids)}") logger.info(f"Missing in PostgreSQL: {len(missing_in_pg)}") logger.info(f"Extra in PostgreSQL: {len(extra_in_pg)}") return { "mongo_count": len(mongo_ids), "postgres_count": len(pg_ids), "missing_in_postgres": list(missing_in_pg), "extra_in_postgres": list(extra_in_pg) } elif entity_type == "catalogue_services": # Get MongoDB catalogue services mongo_services = await mongo_db.pos_catalogue_services.find({"merchant_id": merchant_id}).to_list(length=None) mongo_ids = {s["_id"] for s in mongo_services} # Get PostgreSQL catalogue services pg_services = await sync_service.catalogue_service_sync.list_catalogue_services_by_merchant(merchant_id, limit=10000) pg_ids = {s.service_id for s in pg_services} # Compare missing_in_pg = mongo_ids - pg_ids extra_in_pg = pg_ids - mongo_ids logger.info(f"Consistency check for catalogue services in merchant {merchant_id}:") logger.info(f"MongoDB count: {len(mongo_ids)}") logger.info(f"PostgreSQL count: {len(pg_ids)}") logger.info(f"Missing in PostgreSQL: {len(missing_in_pg)}") logger.info(f"Extra in PostgreSQL: {len(extra_in_pg)}") return { "mongo_count": len(mongo_ids), "postgres_count": len(pg_ids), "missing_in_postgres": list(missing_in_pg), "extra_in_postgres": list(extra_in_pg) } except Exception as e: logger.error(f"Error in consistency check: {e}") raise async def cleanup_orphaned_records(merchant_id: str, entity_type: str = "customers"): """ Clean up orphaned records in PostgreSQL that don't exist in MongoDB. Args: merchant_id: Merchant ID to clean up entity_type: Type of entity to clean up ("customers", "staff", or "catalogue_services") """ try: consistency_result = await verify_sync_consistency(merchant_id, entity_type) extra_in_postgres = consistency_result.get("extra_in_postgres", []) if not extra_in_postgres: logger.info(f"No orphaned {entity_type} records found for merchant {merchant_id}") return {"cleaned": 0} async with get_postgres_session() as pg_session: sync_service = POSSyncService(pg_session) cleaned_count = 0 for record_id in extra_in_postgres: try: if entity_type == "customers": await sync_service.sync_customer_delete(record_id) elif entity_type == "staff": await sync_service.sync_staff_delete(record_id) elif entity_type == "catalogue_services": await sync_service.sync_catalogue_service_delete(record_id) cleaned_count += 1 except Exception as e: logger.error(f"Error cleaning up {entity_type} record {record_id}: {e}") logger.info(f"Cleaned up {cleaned_count} orphaned {entity_type} records") return {"cleaned": cleaned_count} except Exception as e: logger.error(f"Error in cleanup: {e}") raise # CLI-style functions for easy testing if __name__ == "__main__": import sys async def main(): if len(sys.argv) < 2: print("Usage: python sync/utils.py [args...]") print("Commands:") print(" migrate_customers [merchant_id] [limit]") print(" migrate_staff [merchant_id] [limit]") print(" migrate_catalogue_services [merchant_id] [limit]") print(" verify_consistency [entity_type]") print(" cleanup_orphaned [entity_type]") return command = sys.argv[1] if command == "migrate_customers": merchant_id = sys.argv[2] if len(sys.argv) > 2 else None limit = int(sys.argv[3]) if len(sys.argv) > 3 else 1000 result = await migrate_existing_customers(merchant_id, limit) print(f"Migration result: {result}") elif command == "migrate_staff": merchant_id = sys.argv[2] if len(sys.argv) > 2 else None limit = int(sys.argv[3]) if len(sys.argv) > 3 else 1000 result = await migrate_existing_staff(merchant_id, limit) print(f"Migration result: {result}") elif command == "migrate_catalogue_services": merchant_id = sys.argv[2] if len(sys.argv) > 2 else None limit = int(sys.argv[3]) if len(sys.argv) > 3 else 1000 result = await migrate_existing_catalogue_services(merchant_id, limit) print(f"Migration result: {result}") elif command == "verify_consistency": if len(sys.argv) < 3: print("Error: merchant_id required") return merchant_id = sys.argv[2] entity_type = sys.argv[3] if len(sys.argv) > 3 else "customers" result = await verify_sync_consistency(merchant_id, entity_type) print(f"Consistency check result: {result}") elif command == "cleanup_orphaned": if len(sys.argv) < 3: print("Error: merchant_id required") return merchant_id = sys.argv[2] entity_type = sys.argv[3] if len(sys.argv) > 3 else "customers" result = await cleanup_orphaned_records(merchant_id, entity_type) print(f"Cleanup result: {result}") else: print(f"Unknown command: {command}") asyncio.run(main())