MukeshKapoor25's picture
feat(sync): Add comprehensive POS data sync infrastructure and seed data management
9fd3989
"""
Utility functions for POS sync operations.
"""
from typing import Dict, Any, List, Optional
import logging
from datetime import datetime
import asyncio
from app.nosql import get_mongo_db
from app.sql import get_postgres_session
from app.sync.sync_service import POSSyncService
# Module-level logger used by every sync utility in this file.
logger = logging.getLogger(__name__)
async def migrate_existing_customers(merchant_id: Optional[str] = None, limit: int = 1000):
    """
    Migrate existing customers from MongoDB to PostgreSQL.

    Reads up to ``limit`` documents from the ``pos_customers`` collection
    (optionally restricted to one merchant), builds a ``CustomerModel`` from
    each and passes it to ``POSSyncService.sync_customer_create``. A failure
    on an individual customer is logged and counted; the loop continues.

    Args:
        merchant_id: Optional merchant ID to filter customers
        limit: Maximum number of customers to migrate

    Returns:
        Dict with ``"migrated"`` and ``"errors"`` counts.

    Raises:
        Exception: Any failure outside the per-customer loop (model import,
            MongoDB connection/query, PostgreSQL session) is logged and
            re-raised.
    """
    try:
        # Kept function-local as in the original (presumably to avoid an import
        # cycle; TODO confirm), but done once before the loop: previously an
        # import failure was swallowed by the per-record handler and reported
        # as one "error" per customer instead of failing the run.
        from app.customers.models.model import CustomerModel

        # Get MongoDB connection
        mongo_db = await get_mongo_db()
        customers_collection = mongo_db.pos_customers

        # Build query
        query: Dict[str, Any] = {}
        if merchant_id:
            query["merchant_id"] = merchant_id

        # Get PostgreSQL session
        async with get_postgres_session() as pg_session:
            sync_service = POSSyncService(pg_session)

            # Fetch customers from MongoDB
            cursor = customers_collection.find(query).limit(limit)
            customers = await cursor.to_list(length=limit)

            migrated_count = 0
            error_count = 0
            for customer_doc in customers:
                try:
                    # Convert the raw document and sync it to PostgreSQL
                    customer = CustomerModel(**customer_doc)
                    await sync_service.sync_customer_create(customer)
                    migrated_count += 1
                    if migrated_count % 100 == 0:
                        logger.info(f"Migrated {migrated_count} customers...")
                except Exception as e:
                    # Best-effort per record: log, count, keep going.
                    logger.error(f"Error migrating customer {customer_doc.get('customer_id')}: {e}")
                    error_count += 1

            logger.info(f"Customer migration completed: {migrated_count} migrated, {error_count} errors")
            return {"migrated": migrated_count, "errors": error_count}
    except Exception as e:
        logger.error(f"Error in customer migration: {e}")
        raise
async def migrate_existing_catalogue_services(merchant_id: Optional[str] = None, limit: int = 1000):
    """
    Copy catalogue services from MongoDB into PostgreSQL.

    At most ``limit`` documents are read from the ``pos_catalogue_services``
    collection (restricted to ``merchant_id`` when one is supplied). Each raw
    document is handed to ``POSSyncService.sync_catalogue_service_create``.
    Failures on individual documents are logged and counted and do not
    abort the run.

    Args:
        merchant_id: Optional merchant ID to filter services
        limit: Maximum number of services to migrate

    Returns:
        Dict with ``"migrated"`` and ``"errors"`` counts.

    Raises:
        Exception: Connection, session or query failures are logged and re-raised.
    """
    try:
        db = await get_mongo_db()
        source = db.pos_catalogue_services
        mongo_filter = {"merchant_id": merchant_id} if merchant_id else {}

        async with get_postgres_session() as session:
            syncer = POSSyncService(session)
            docs = await source.find(mongo_filter).limit(limit).to_list(length=limit)

            ok = 0
            failed = 0
            for doc in docs:
                try:
                    await syncer.sync_catalogue_service_create(doc)
                    ok += 1
                    if ok % 100 == 0:
                        logger.info(f"Migrated {ok} catalogue services...")
                except Exception as exc:
                    logger.error(f"Error migrating catalogue service {doc.get('_id')}: {exc}")
                    failed += 1

            logger.info(f"Catalogue service migration completed: {ok} migrated, {failed} errors")
            return {"migrated": ok, "errors": failed}
    except Exception as exc:
        logger.error(f"Error in catalogue service migration: {exc}")
        raise
async def migrate_existing_staff(merchant_id: Optional[str] = None, limit: int = 1000):
    """
    Copy staff records from MongoDB into PostgreSQL.

    Reads no more than ``limit`` documents from the ``pos_staff`` collection
    (only those of ``merchant_id`` when it is given). Each raw document is
    forwarded to ``POSSyncService.sync_staff_create``. A failing document is
    logged and counted, and the loop carries on with the next one.

    Args:
        merchant_id: Optional merchant ID to filter staff
        limit: Maximum number of staff to migrate

    Returns:
        Dict with ``"migrated"`` and ``"errors"`` counts.

    Raises:
        Exception: Connection, session or query failures are logged and re-raised.
    """
    try:
        database = await get_mongo_db()
        staff_coll = database.pos_staff

        selector = {}
        if merchant_id:
            selector.update(merchant_id=merchant_id)

        async with get_postgres_session() as session:
            service = POSSyncService(session)
            records = await staff_coll.find(selector).limit(limit).to_list(length=limit)

            counts = {"migrated": 0, "errors": 0}
            for record in records:
                try:
                    await service.sync_staff_create(record)
                    counts["migrated"] += 1
                    if counts["migrated"] % 100 == 0:
                        logger.info(f"Migrated {counts['migrated']} staff...")
                except Exception as err:
                    logger.error(f"Error migrating staff {record.get('staff_id')}: {err}")
                    counts["errors"] += 1

            logger.info(
                f"Staff migration completed: {counts['migrated']} migrated, {counts['errors']} errors"
            )
            return counts
    except Exception as err:
        logger.error(f"Error in staff migration: {err}")
        raise
# Per-entity settings for verify_sync_consistency:
# entity_type -> (Mongo collection attribute, Mongo ID field,
#                 PostgreSQL lister, PostgreSQL ID attribute, log label)
_CONSISTENCY_SPECS: Dict[str, tuple] = {
    "customers": (
        "pos_customers",
        "customer_id",
        lambda svc, mid, lim: svc.customer_sync.list_customers_by_merchant(mid, limit=lim),
        "customer_id",
        "customers",
    ),
    "staff": (
        "pos_staff",
        "staff_id",
        lambda svc, mid, lim: svc.staff_sync.list_staff_by_merchant(mid, limit=lim),
        "staff_id",
        "staff",
    ),
    "catalogue_services": (
        "pos_catalogue_services",
        "_id",
        lambda svc, mid, lim: svc.catalogue_service_sync.list_catalogue_services_by_merchant(mid, limit=lim),
        "service_id",
        "catalogue services",
    ),
}


async def verify_sync_consistency(merchant_id: str, entity_type: str = "customers", pg_limit: int = 10000):
    """
    Verify consistency between MongoDB and PostgreSQL data.

    Collects the identifiers of one entity type for ``merchant_id`` from both
    stores and reports which IDs exist on only one side.

    Args:
        merchant_id: Merchant ID to check
        entity_type: Type of entity to check ("customers", "staff", or "catalogue_services")
        pg_limit: Maximum number of PostgreSQL rows to fetch (the previously
            hard-coded 10000). If the merchant has more rows than this,
            ``missing_in_postgres`` may contain false positives; a warning is
            logged when the limit is reached.

    Returns:
        Dict with ``mongo_count``, ``postgres_count``, ``missing_in_postgres``
        and ``extra_in_postgres``. IDs are compared and returned as sorted
        lists of strings.

    Raises:
        ValueError: If ``entity_type`` is not a supported value (previously the
            function silently returned ``None``).
    """
    spec = _CONSISTENCY_SPECS.get(entity_type)
    if spec is None:
        raise ValueError(
            f"Unsupported entity_type {entity_type!r}; "
            f"expected one of {sorted(_CONSISTENCY_SPECS)}"
        )
    collection_name, mongo_id_field, list_pg_records, pg_id_attr, label = spec

    try:
        # Get connections
        mongo_db = await get_mongo_db()
        async with get_postgres_session() as pg_session:
            sync_service = POSSyncService(pg_session)

            # Only the ID field is compared, so project it instead of loading
            # every full document for the merchant into memory.
            mongo_docs = await getattr(mongo_db, collection_name).find(
                {"merchant_id": merchant_id}, {mongo_id_field: 1}
            ).to_list(length=None)
            # Normalise IDs to str on both sides. Mongo may hold an ObjectId
            # (MongoDB's default type for "_id") while PostgreSQL may return a
            # str or UUID. Mixed types never compare equal and would make every
            # row look missing/extra, and cleanup would then delete them all.
            # Documents lacking the ID field are skipped rather than aborting
            # the whole check with a KeyError.
            mongo_ids = {
                str(doc[mongo_id_field])
                for doc in mongo_docs
                if doc.get(mongo_id_field) is not None
            }

            pg_records = list(await list_pg_records(sync_service, merchant_id, pg_limit))
            pg_ids = {str(getattr(record, pg_id_attr)) for record in pg_records}
            if len(pg_records) >= pg_limit:
                logger.warning(
                    f"PostgreSQL {label} for merchant {merchant_id} reached pg_limit={pg_limit}; "
                    "'missing_in_postgres' may contain false positives"
                )

            # Compare
            missing_in_pg = mongo_ids - pg_ids
            extra_in_pg = pg_ids - mongo_ids

            logger.info(f"Consistency check for {label} in merchant {merchant_id}:")
            logger.info(f"MongoDB count: {len(mongo_ids)}")
            logger.info(f"PostgreSQL count: {len(pg_ids)}")
            logger.info(f"Missing in PostgreSQL: {len(missing_in_pg)}")
            logger.info(f"Extra in PostgreSQL: {len(extra_in_pg)}")

            return {
                "mongo_count": len(mongo_ids),
                "postgres_count": len(pg_ids),
                "missing_in_postgres": sorted(missing_in_pg),
                "extra_in_postgres": sorted(extra_in_pg),
            }
    except Exception as e:
        logger.error(f"Error in consistency check: {e}")
        raise
async def cleanup_orphaned_records(merchant_id: str, entity_type: str = "customers"):
    """
    Clean up orphaned records in PostgreSQL that don't exist in MongoDB.

    Runs ``verify_sync_consistency`` for the merchant and then deletes every ID
    reported in ``extra_in_postgres`` through the matching
    ``POSSyncService.sync_*_delete`` method. A failed deletion is logged and
    counted; the remaining records are still processed.

    NOTE(review): this is destructive and trusts the MongoDB read. If that read
    returns nothing for the merchant, every PostgreSQL row of this entity type
    for the merchant is treated as orphaned.

    Args:
        merchant_id: Merchant ID to clean up
        entity_type: Type of entity to clean up ("customers", "staff", or "catalogue_services")

    Returns:
        Dict with ``"cleaned"`` (records deleted) and ``"errors"`` (deletions
        that raised) counts.

    Raises:
        ValueError: If ``entity_type`` is not supported. Previously an unknown
            type either crashed inside the consistency check or was counted
            as "cleaned" without deleting anything.
    """
    # entity_type -> name of the POSSyncService delete method to call
    delete_method_names = {
        "customers": "sync_customer_delete",
        "staff": "sync_staff_delete",
        "catalogue_services": "sync_catalogue_service_delete",
    }
    method_name = delete_method_names.get(entity_type)
    if method_name is None:
        raise ValueError(
            f"Unsupported entity_type {entity_type!r}; "
            f"expected one of {sorted(delete_method_names)}"
        )

    try:
        consistency_result = await verify_sync_consistency(merchant_id, entity_type)
        extra_in_postgres = consistency_result.get("extra_in_postgres", [])
        if not extra_in_postgres:
            logger.info(f"No orphaned {entity_type} records found for merchant {merchant_id}")
            return {"cleaned": 0, "errors": 0}

        async with get_postgres_session() as pg_session:
            sync_service = POSSyncService(pg_session)
            delete_record = getattr(sync_service, method_name)

            cleaned_count = 0
            error_count = 0
            for record_id in extra_in_postgres:
                try:
                    await delete_record(record_id)
                    cleaned_count += 1
                except Exception as e:
                    # Best-effort per record: log, count, continue.
                    logger.error(f"Error cleaning up {entity_type} record {record_id}: {e}")
                    error_count += 1

            logger.info(f"Cleaned up {cleaned_count} orphaned {entity_type} records")
            if error_count:
                logger.warning(f"Failed to clean up {error_count} orphaned {entity_type} records")
            return {"cleaned": cleaned_count, "errors": error_count}
    except Exception as e:
        logger.error(f"Error in cleanup: {e}")
        raise
# CLI-style functions for easy testing
if __name__ == "__main__":
    import sys

    # Entity types accepted by verify_consistency / cleanup_orphaned.
    _ENTITY_TYPES = ("customers", "staff", "catalogue_services")

    def _print_usage() -> None:
        """Print the command summary for this CLI."""
        print("Usage: python sync/utils.py <command> [args...]")
        print("Commands:")
        print(" migrate_customers [merchant_id] [limit]")
        print(" migrate_staff [merchant_id] [limit]")
        print(" migrate_catalogue_services [merchant_id] [limit]")
        print(" verify_consistency <merchant_id> [entity_type]")
        print(" cleanup_orphaned <merchant_id> [entity_type]")

    async def main() -> int:
        """Dispatch the command named in ``sys.argv`` and return a process exit code.

        Returns 0 on success and 1 on usage errors (missing or invalid
        arguments, unknown command), so shell callers can detect failures.
        """
        if len(sys.argv) < 2:
            _print_usage()
            return 1

        command = sys.argv[1]
        migrations = {
            "migrate_customers": migrate_existing_customers,
            "migrate_staff": migrate_existing_staff,
            "migrate_catalogue_services": migrate_existing_catalogue_services,
        }
        # command -> (coroutine function, result label printed on success)
        merchant_commands = {
            "verify_consistency": (verify_sync_consistency, "Consistency check result"),
            "cleanup_orphaned": (cleanup_orphaned_records, "Cleanup result"),
        }

        if command in migrations:
            merchant_id = sys.argv[2] if len(sys.argv) > 2 else None
            try:
                limit = int(sys.argv[3]) if len(sys.argv) > 3 else 1000
            except ValueError:
                # Report bad input instead of crashing with a traceback.
                print(f"Error: limit must be an integer, got {sys.argv[3]!r}")
                return 1
            result = await migrations[command](merchant_id, limit)
            print(f"Migration result: {result}")
            return 0

        if command in merchant_commands:
            if len(sys.argv) < 3:
                print("Error: merchant_id required")
                return 1
            merchant_id = sys.argv[2]
            entity_type = sys.argv[3] if len(sys.argv) > 3 else "customers"
            if entity_type not in _ENTITY_TYPES:
                print(f"Error: entity_type must be one of {', '.join(_ENTITY_TYPES)}")
                return 1
            handler, label = merchant_commands[command]
            result = await handler(merchant_id, entity_type)
            print(f"{label}: {result}")
            return 0

        print(f"Unknown command: {command}")
        _print_usage()
        return 1

    sys.exit(asyncio.run(main()))