Spaces:
Running
Running
| """ | |
| Utility functions for POS sync operations. | |
| """ | |
| from typing import Dict, Any, List, Optional | |
| import logging | |
| from datetime import datetime | |
| import asyncio | |
| from app.nosql import get_mongo_db | |
| from app.sql import get_postgres_session | |
| from app.sync.sync_service import POSSyncService | |
| logger = logging.getLogger(__name__) | |
| async def migrate_existing_customers(merchant_id: Optional[str] = None, limit: int = 1000): | |
| """ | |
| Migrate existing customers from MongoDB to PostgreSQL. | |
| Args: | |
| merchant_id: Optional merchant ID to filter customers | |
| limit: Maximum number of customers to migrate | |
| """ | |
| try: | |
| # Get MongoDB connection | |
| mongo_db = await get_mongo_db() | |
| customers_collection = mongo_db.pos_customers | |
| # Build query | |
| query = {} | |
| if merchant_id: | |
| query["merchant_id"] = merchant_id | |
| # Get PostgreSQL session | |
| async with get_postgres_session() as pg_session: | |
| sync_service = POSSyncService(pg_session) | |
| # Fetch customers from MongoDB | |
| cursor = customers_collection.find(query).limit(limit) | |
| customers = await cursor.to_list(length=limit) | |
| migrated_count = 0 | |
| error_count = 0 | |
| for customer_doc in customers: | |
| try: | |
| # Convert to CustomerModel | |
| from app.customers.models.model import CustomerModel | |
| customer = CustomerModel(**customer_doc) | |
| # Sync to PostgreSQL | |
| await sync_service.sync_customer_create(customer) | |
| migrated_count += 1 | |
| if migrated_count % 100 == 0: | |
| logger.info(f"Migrated {migrated_count} customers...") | |
| except Exception as e: | |
| logger.error(f"Error migrating customer {customer_doc.get('customer_id')}: {e}") | |
| error_count += 1 | |
| logger.info(f"Customer migration completed: {migrated_count} migrated, {error_count} errors") | |
| return {"migrated": migrated_count, "errors": error_count} | |
| except Exception as e: | |
| logger.error(f"Error in customer migration: {e}") | |
| raise | |
| async def migrate_existing_catalogue_services(merchant_id: Optional[str] = None, limit: int = 1000): | |
| """ | |
| Migrate existing catalogue services from MongoDB to PostgreSQL. | |
| Args: | |
| merchant_id: Optional merchant ID to filter services | |
| limit: Maximum number of services to migrate | |
| """ | |
| try: | |
| # Get MongoDB connection | |
| mongo_db = await get_mongo_db() | |
| services_collection = mongo_db.pos_catalogue_services | |
| # Build query | |
| query = {} | |
| if merchant_id: | |
| query["merchant_id"] = merchant_id | |
| # Get PostgreSQL session | |
| async with get_postgres_session() as pg_session: | |
| sync_service = POSSyncService(pg_session) | |
| # Fetch services from MongoDB | |
| cursor = services_collection.find(query).limit(limit) | |
| services = await cursor.to_list(length=limit) | |
| migrated_count = 0 | |
| error_count = 0 | |
| for service_doc in services: | |
| try: | |
| # Sync to PostgreSQL | |
| await sync_service.sync_catalogue_service_create(service_doc) | |
| migrated_count += 1 | |
| if migrated_count % 100 == 0: | |
| logger.info(f"Migrated {migrated_count} catalogue services...") | |
| except Exception as e: | |
| logger.error(f"Error migrating catalogue service {service_doc.get('_id')}: {e}") | |
| error_count += 1 | |
| logger.info(f"Catalogue service migration completed: {migrated_count} migrated, {error_count} errors") | |
| return {"migrated": migrated_count, "errors": error_count} | |
| except Exception as e: | |
| logger.error(f"Error in catalogue service migration: {e}") | |
| raise | |
| async def migrate_existing_staff(merchant_id: Optional[str] = None, limit: int = 1000): | |
| """ | |
| Migrate existing staff from MongoDB to PostgreSQL. | |
| Args: | |
| merchant_id: Optional merchant ID to filter staff | |
| limit: Maximum number of staff to migrate | |
| """ | |
| try: | |
| # Get MongoDB connection | |
| mongo_db = await get_mongo_db() | |
| staff_collection = mongo_db.pos_staff | |
| # Build query | |
| query = {} | |
| if merchant_id: | |
| query["merchant_id"] = merchant_id | |
| # Get PostgreSQL session | |
| async with get_postgres_session() as pg_session: | |
| sync_service = POSSyncService(pg_session) | |
| # Fetch staff from MongoDB | |
| cursor = staff_collection.find(query).limit(limit) | |
| staff_list = await cursor.to_list(length=limit) | |
| migrated_count = 0 | |
| error_count = 0 | |
| for staff_doc in staff_list: | |
| try: | |
| # Sync to PostgreSQL | |
| await sync_service.sync_staff_create(staff_doc) | |
| migrated_count += 1 | |
| if migrated_count % 100 == 0: | |
| logger.info(f"Migrated {migrated_count} staff...") | |
| except Exception as e: | |
| logger.error(f"Error migrating staff {staff_doc.get('staff_id')}: {e}") | |
| error_count += 1 | |
| logger.info(f"Staff migration completed: {migrated_count} migrated, {error_count} errors") | |
| return {"migrated": migrated_count, "errors": error_count} | |
| except Exception as e: | |
| logger.error(f"Error in staff migration: {e}") | |
| raise | |
| async def verify_sync_consistency(merchant_id: str, entity_type: str = "customers"): | |
| """ | |
| Verify consistency between MongoDB and PostgreSQL data. | |
| Args: | |
| merchant_id: Merchant ID to check | |
| entity_type: Type of entity to check ("customers", "staff", or "catalogue_services") | |
| """ | |
| try: | |
| # Get connections | |
| mongo_db = await get_mongo_db() | |
| async with get_postgres_session() as pg_session: | |
| sync_service = POSSyncService(pg_session) | |
| if entity_type == "customers": | |
| # Get MongoDB customers | |
| mongo_customers = await mongo_db.pos_customers.find({"merchant_id": merchant_id}).to_list(length=None) | |
| mongo_ids = {c["customer_id"] for c in mongo_customers} | |
| # Get PostgreSQL customers | |
| pg_customers = await sync_service.customer_sync.list_customers_by_merchant(merchant_id, limit=10000) | |
| pg_ids = {c.customer_id for c in pg_customers} | |
| # Compare | |
| missing_in_pg = mongo_ids - pg_ids | |
| extra_in_pg = pg_ids - mongo_ids | |
| logger.info(f"Consistency check for customers in merchant {merchant_id}:") | |
| logger.info(f"MongoDB count: {len(mongo_ids)}") | |
| logger.info(f"PostgreSQL count: {len(pg_ids)}") | |
| logger.info(f"Missing in PostgreSQL: {len(missing_in_pg)}") | |
| logger.info(f"Extra in PostgreSQL: {len(extra_in_pg)}") | |
| return { | |
| "mongo_count": len(mongo_ids), | |
| "postgres_count": len(pg_ids), | |
| "missing_in_postgres": list(missing_in_pg), | |
| "extra_in_postgres": list(extra_in_pg) | |
| } | |
| elif entity_type == "staff": | |
| # Get MongoDB staff | |
| mongo_staff = await mongo_db.pos_staff.find({"merchant_id": merchant_id}).to_list(length=None) | |
| mongo_ids = {s["staff_id"] for s in mongo_staff} | |
| # Get PostgreSQL staff | |
| pg_staff = await sync_service.staff_sync.list_staff_by_merchant(merchant_id, limit=10000) | |
| pg_ids = {s.staff_id for s in pg_staff} | |
| # Compare | |
| missing_in_pg = mongo_ids - pg_ids | |
| extra_in_pg = pg_ids - mongo_ids | |
| logger.info(f"Consistency check for staff in merchant {merchant_id}:") | |
| logger.info(f"MongoDB count: {len(mongo_ids)}") | |
| logger.info(f"PostgreSQL count: {len(pg_ids)}") | |
| logger.info(f"Missing in PostgreSQL: {len(missing_in_pg)}") | |
| logger.info(f"Extra in PostgreSQL: {len(extra_in_pg)}") | |
| return { | |
| "mongo_count": len(mongo_ids), | |
| "postgres_count": len(pg_ids), | |
| "missing_in_postgres": list(missing_in_pg), | |
| "extra_in_postgres": list(extra_in_pg) | |
| } | |
| elif entity_type == "catalogue_services": | |
| # Get MongoDB catalogue services | |
| mongo_services = await mongo_db.pos_catalogue_services.find({"merchant_id": merchant_id}).to_list(length=None) | |
| mongo_ids = {s["_id"] for s in mongo_services} | |
| # Get PostgreSQL catalogue services | |
| pg_services = await sync_service.catalogue_service_sync.list_catalogue_services_by_merchant(merchant_id, limit=10000) | |
| pg_ids = {s.service_id for s in pg_services} | |
| # Compare | |
| missing_in_pg = mongo_ids - pg_ids | |
| extra_in_pg = pg_ids - mongo_ids | |
| logger.info(f"Consistency check for catalogue services in merchant {merchant_id}:") | |
| logger.info(f"MongoDB count: {len(mongo_ids)}") | |
| logger.info(f"PostgreSQL count: {len(pg_ids)}") | |
| logger.info(f"Missing in PostgreSQL: {len(missing_in_pg)}") | |
| logger.info(f"Extra in PostgreSQL: {len(extra_in_pg)}") | |
| return { | |
| "mongo_count": len(mongo_ids), | |
| "postgres_count": len(pg_ids), | |
| "missing_in_postgres": list(missing_in_pg), | |
| "extra_in_postgres": list(extra_in_pg) | |
| } | |
| except Exception as e: | |
| logger.error(f"Error in consistency check: {e}") | |
| raise | |
| async def cleanup_orphaned_records(merchant_id: str, entity_type: str = "customers"): | |
| """ | |
| Clean up orphaned records in PostgreSQL that don't exist in MongoDB. | |
| Args: | |
| merchant_id: Merchant ID to clean up | |
| entity_type: Type of entity to clean up ("customers", "staff", or "catalogue_services") | |
| """ | |
| try: | |
| consistency_result = await verify_sync_consistency(merchant_id, entity_type) | |
| extra_in_postgres = consistency_result.get("extra_in_postgres", []) | |
| if not extra_in_postgres: | |
| logger.info(f"No orphaned {entity_type} records found for merchant {merchant_id}") | |
| return {"cleaned": 0} | |
| async with get_postgres_session() as pg_session: | |
| sync_service = POSSyncService(pg_session) | |
| cleaned_count = 0 | |
| for record_id in extra_in_postgres: | |
| try: | |
| if entity_type == "customers": | |
| await sync_service.sync_customer_delete(record_id) | |
| elif entity_type == "staff": | |
| await sync_service.sync_staff_delete(record_id) | |
| elif entity_type == "catalogue_services": | |
| await sync_service.sync_catalogue_service_delete(record_id) | |
| cleaned_count += 1 | |
| except Exception as e: | |
| logger.error(f"Error cleaning up {entity_type} record {record_id}: {e}") | |
| logger.info(f"Cleaned up {cleaned_count} orphaned {entity_type} records") | |
| return {"cleaned": cleaned_count} | |
| except Exception as e: | |
| logger.error(f"Error in cleanup: {e}") | |
| raise | |
| # CLI-style functions for easy testing | |
| if __name__ == "__main__": | |
| import sys | |
| async def main(): | |
| if len(sys.argv) < 2: | |
| print("Usage: python sync/utils.py <command> [args...]") | |
| print("Commands:") | |
| print(" migrate_customers [merchant_id] [limit]") | |
| print(" migrate_staff [merchant_id] [limit]") | |
| print(" migrate_catalogue_services [merchant_id] [limit]") | |
| print(" verify_consistency <merchant_id> [entity_type]") | |
| print(" cleanup_orphaned <merchant_id> [entity_type]") | |
| return | |
| command = sys.argv[1] | |
| if command == "migrate_customers": | |
| merchant_id = sys.argv[2] if len(sys.argv) > 2 else None | |
| limit = int(sys.argv[3]) if len(sys.argv) > 3 else 1000 | |
| result = await migrate_existing_customers(merchant_id, limit) | |
| print(f"Migration result: {result}") | |
| elif command == "migrate_staff": | |
| merchant_id = sys.argv[2] if len(sys.argv) > 2 else None | |
| limit = int(sys.argv[3]) if len(sys.argv) > 3 else 1000 | |
| result = await migrate_existing_staff(merchant_id, limit) | |
| print(f"Migration result: {result}") | |
| elif command == "migrate_catalogue_services": | |
| merchant_id = sys.argv[2] if len(sys.argv) > 2 else None | |
| limit = int(sys.argv[3]) if len(sys.argv) > 3 else 1000 | |
| result = await migrate_existing_catalogue_services(merchant_id, limit) | |
| print(f"Migration result: {result}") | |
| elif command == "verify_consistency": | |
| if len(sys.argv) < 3: | |
| print("Error: merchant_id required") | |
| return | |
| merchant_id = sys.argv[2] | |
| entity_type = sys.argv[3] if len(sys.argv) > 3 else "customers" | |
| result = await verify_sync_consistency(merchant_id, entity_type) | |
| print(f"Consistency check result: {result}") | |
| elif command == "cleanup_orphaned": | |
| if len(sys.argv) < 3: | |
| print("Error: merchant_id required") | |
| return | |
| merchant_id = sys.argv[2] | |
| entity_type = sys.argv[3] if len(sys.argv) > 3 else "customers" | |
| result = await cleanup_orphaned_records(merchant_id, entity_type) | |
| print(f"Cleanup result: {result}") | |
| else: | |
| print(f"Unknown command: {command}") | |
| asyncio.run(main()) |