"""
Utility functions for POS sync operations.
"""
from typing import Dict, Any, List, Optional
import logging
from datetime import datetime
import asyncio
from app.nosql import get_mongo_db
from app.sql import get_postgres_session
from app.sync.sync_service import POSSyncService
logger = logging.getLogger(__name__)
async def migrate_existing_customers(merchant_id: Optional[str] = None, limit: int = 1000) -> Dict[str, int]:
    """
    Migrate existing customers from MongoDB to PostgreSQL.

    Reads up to ``limit`` documents from the ``pos_customers`` collection,
    builds a ``CustomerModel`` from each one and passes it to
    ``POSSyncService.sync_customer_create``. A failure on a single customer
    is logged and counted; it does not stop the run.

    Args:
        merchant_id: Optional merchant ID to filter customers
        limit: Maximum number of customers to migrate

    Returns:
        Dict with the number of customers synced ("migrated") and the
        number that failed ("errors").

    Raises:
        Exception: any error raised outside the per-customer loop (model
            import, connections, fetching) is logged and re-raised.
    """
    try:
        # Resolve the model once per call instead of once per customer, so a
        # broken import fails fast rather than being reported for every
        # document. The import stays function-local (presumably to avoid a
        # circular import at module load time -- TODO confirm).
        from app.customers.models.model import CustomerModel

        # Get MongoDB connection
        mongo_db = await get_mongo_db()
        customers_collection = mongo_db.pos_customers

        # Build query; an empty query selects customers of every merchant
        query = {}
        if merchant_id:
            query["merchant_id"] = merchant_id

        # Get PostgreSQL session
        async with get_postgres_session() as pg_session:
            sync_service = POSSyncService(pg_session)

            # Fetch customers from MongoDB
            cursor = customers_collection.find(query).limit(limit)
            customers = await cursor.to_list(length=limit)

            migrated_count = 0
            error_count = 0
            for customer_doc in customers:
                try:
                    # Convert to CustomerModel, then sync to PostgreSQL
                    customer = CustomerModel(**customer_doc)
                    await sync_service.sync_customer_create(customer)
                    migrated_count += 1
                    # Progress line every 100 successfully migrated customers
                    if migrated_count % 100 == 0:
                        logger.info(f"Migrated {migrated_count} customers...")
                except Exception as e:
                    # Best effort: record the failure and continue with the next customer
                    logger.error(f"Error migrating customer {customer_doc.get('customer_id')}: {e}")
                    error_count += 1

            logger.info(f"Customer migration completed: {migrated_count} migrated, {error_count} errors")
            return {"migrated": migrated_count, "errors": error_count}
    except Exception as e:
        logger.error(f"Error in customer migration: {e}")
        raise
async def migrate_existing_catalogue_services(merchant_id: Optional[str] = None, limit: int = 1000):
    """
    Copy catalogue services from MongoDB into PostgreSQL.

    Each document read from ``pos_catalogue_services`` is passed as-is to
    ``POSSyncService.sync_catalogue_service_create``. A document that fails
    is logged and counted, and the run continues with the next one.

    Args:
        merchant_id: When given, only services of this merchant are read
        limit: Upper bound on the number of services read from MongoDB

    Returns:
        Dict with the number of services synced ("migrated") and the
        number that failed ("errors").
    """
    # An empty filter selects services of every merchant.
    mongo_filter = {"merchant_id": merchant_id} if merchant_id else {}

    try:
        mongo_db = await get_mongo_db()
        collection = mongo_db.pos_catalogue_services

        async with get_postgres_session() as pg_session:
            sync_service = POSSyncService(pg_session)

            # Load the whole (bounded) batch up front.
            service_docs = await collection.find(mongo_filter).limit(limit).to_list(length=limit)

            migrated_count, error_count = 0, 0
            for service_doc in service_docs:
                try:
                    await sync_service.sync_catalogue_service_create(service_doc)
                    migrated_count += 1
                    # Emit a progress line on every 100th success.
                    if not migrated_count % 100:
                        logger.info(f"Migrated {migrated_count} catalogue services...")
                except Exception as e:
                    logger.error(f"Error migrating catalogue service {service_doc.get('_id')}: {e}")
                    error_count += 1

            logger.info(f"Catalogue service migration completed: {migrated_count} migrated, {error_count} errors")
            summary = {"migrated": migrated_count, "errors": error_count}
            return summary
    except Exception as e:
        # Anything outside the per-document loop aborts the migration.
        logger.error(f"Error in catalogue service migration: {e}")
        raise
async def migrate_existing_staff(merchant_id: Optional[str] = None, limit: int = 1000):
    """
    Copy staff records from MongoDB into PostgreSQL.

    Each document read from ``pos_staff`` is passed as-is to
    ``POSSyncService.sync_staff_create``. A document that fails is logged
    and counted, and the run continues with the next one.

    Args:
        merchant_id: When given, only staff of this merchant are read
        limit: Upper bound on the number of staff records read from MongoDB

    Returns:
        Dict with the number of staff records synced ("migrated") and the
        number that failed ("errors").
    """
    # An empty filter selects staff of every merchant.
    mongo_filter = {"merchant_id": merchant_id} if merchant_id else {}

    try:
        mongo_db = await get_mongo_db()
        collection = mongo_db.pos_staff

        async with get_postgres_session() as pg_session:
            sync_service = POSSyncService(pg_session)

            # Load the whole (bounded) batch up front.
            staff_docs = await collection.find(mongo_filter).limit(limit).to_list(length=limit)

            migrated_count, error_count = 0, 0
            for staff_doc in staff_docs:
                try:
                    await sync_service.sync_staff_create(staff_doc)
                    migrated_count += 1
                    # Emit a progress line on every 100th success.
                    if not migrated_count % 100:
                        logger.info(f"Migrated {migrated_count} staff...")
                except Exception as e:
                    logger.error(f"Error migrating staff {staff_doc.get('staff_id')}: {e}")
                    error_count += 1

            logger.info(f"Staff migration completed: {migrated_count} migrated, {error_count} errors")
            summary = {"migrated": migrated_count, "errors": error_count}
            return summary
    except Exception as e:
        # Anything outside the per-document loop aborts the migration.
        logger.error(f"Error in staff migration: {e}")
        raise
_CONSISTENCY_ENTITY_TYPES = ("customers", "staff", "catalogue_services")


def _report_id_differences(label: str, merchant_id: str, mongo_ids: set, pg_ids: set) -> Dict[str, Any]:
    """Log a summary of the two ID sets and return their differences as a report dict."""
    missing_in_pg = mongo_ids - pg_ids
    extra_in_pg = pg_ids - mongo_ids

    logger.info(f"Consistency check for {label} in merchant {merchant_id}:")
    logger.info(f"MongoDB count: {len(mongo_ids)}")
    logger.info(f"PostgreSQL count: {len(pg_ids)}")
    logger.info(f"Missing in PostgreSQL: {len(missing_in_pg)}")
    logger.info(f"Extra in PostgreSQL: {len(extra_in_pg)}")

    return {
        "mongo_count": len(mongo_ids),
        "postgres_count": len(pg_ids),
        "missing_in_postgres": list(missing_in_pg),
        "extra_in_postgres": list(extra_in_pg),
    }


async def verify_sync_consistency(merchant_id: str, entity_type: str = "customers") -> Dict[str, Any]:
    """
    Verify consistency between MongoDB and PostgreSQL data.

    Collects the set of record IDs for one merchant from each store and
    reports which IDs exist on only one side.

    Args:
        merchant_id: Merchant ID to check
        entity_type: Type of entity to check ("customers", "staff", or "catalogue_services")

    Returns:
        Dict with "mongo_count", "postgres_count", "missing_in_postgres"
        (IDs only in MongoDB) and "extra_in_postgres" (IDs only in PostgreSQL).

    Raises:
        ValueError: if ``entity_type`` is not one of the supported values.
        Exception: any error while reading either store is logged and re-raised.
    """
    # Reject unknown entity types before touching either database. Without
    # this check the function would fall through every branch and return
    # None, which callers then fail on with an unrelated AttributeError.
    if entity_type not in _CONSISTENCY_ENTITY_TYPES:
        raise ValueError(
            f"Unsupported entity_type {entity_type!r}; "
            f"expected one of: {', '.join(_CONSISTENCY_ENTITY_TYPES)}"
        )

    try:
        # Get connections
        mongo_db = await get_mongo_db()
        async with get_postgres_session() as pg_session:
            sync_service = POSSyncService(pg_session)
            by_merchant = {"merchant_id": merchant_id}

            # NOTE(review): the PostgreSQL side is fetched with limit=10000 while
            # the MongoDB side is unbounded. Presumably a merchant with more rows
            # than that would show false "missing" entries -- confirm against the
            # list_*_by_merchant implementations.
            if entity_type == "customers":
                label = "customers"
                mongo_docs = await mongo_db.pos_customers.find(by_merchant).to_list(length=None)
                mongo_ids = {doc["customer_id"] for doc in mongo_docs}
                pg_rows = await sync_service.customer_sync.list_customers_by_merchant(merchant_id, limit=10000)
                pg_ids = {row.customer_id for row in pg_rows}
            elif entity_type == "staff":
                label = "staff"
                mongo_docs = await mongo_db.pos_staff.find(by_merchant).to_list(length=None)
                mongo_ids = {doc["staff_id"] for doc in mongo_docs}
                pg_rows = await sync_service.staff_sync.list_staff_by_merchant(merchant_id, limit=10000)
                pg_ids = {row.staff_id for row in pg_rows}
            else:  # "catalogue_services" -- guaranteed by the validation above
                label = "catalogue services"
                mongo_docs = await mongo_db.pos_catalogue_services.find(by_merchant).to_list(length=None)
                # NOTE(review): compares MongoDB "_id" with PostgreSQL "service_id";
                # this assumes both hold the same type of value -- TODO confirm.
                mongo_ids = {doc["_id"] for doc in mongo_docs}
                pg_rows = await sync_service.catalogue_service_sync.list_catalogue_services_by_merchant(merchant_id, limit=10000)
                pg_ids = {row.service_id for row in pg_rows}

            return _report_id_differences(label, merchant_id, mongo_ids, pg_ids)
    except Exception as e:
        logger.error(f"Error in consistency check: {e}")
        raise
async def cleanup_orphaned_records(merchant_id: str, entity_type: str = "customers") -> Dict[str, int]:
    """
    Clean up orphaned records in PostgreSQL that don't exist in MongoDB.

    Runs ``verify_sync_consistency`` and deletes, through ``POSSyncService``,
    every ID it reports under "extra_in_postgres". A failure on a single
    record is logged and skipped; it does not stop the run.

    Args:
        merchant_id: Merchant ID to clean up
        entity_type: Type of entity to clean up ("customers", "staff", or "catalogue_services")

    Returns:
        ``{"cleaned": <number of records deleted>}``

    Raises:
        ValueError: if ``entity_type`` is not one of the supported values.
        Exception: any error outside the per-record loop is logged and re-raised.
    """
    # Validate before doing any work: an unknown entity type has no matching
    # delete operation, so nothing could be cleaned up for it.
    supported_types = ("customers", "staff", "catalogue_services")
    if entity_type not in supported_types:
        raise ValueError(
            f"Unsupported entity_type {entity_type!r}; "
            f"expected one of: {', '.join(supported_types)}"
        )

    try:
        consistency_result = await verify_sync_consistency(merchant_id, entity_type)
        extra_in_postgres = consistency_result.get("extra_in_postgres", [])

        if not extra_in_postgres:
            logger.info(f"No orphaned {entity_type} records found for merchant {merchant_id}")
            return {"cleaned": 0}

        async with get_postgres_session() as pg_session:
            sync_service = POSSyncService(pg_session)

            # Pick the delete coroutine once; it is the same for every record.
            if entity_type == "customers":
                delete_record = sync_service.sync_customer_delete
            elif entity_type == "staff":
                delete_record = sync_service.sync_staff_delete
            else:  # "catalogue_services" -- guaranteed by the validation above
                delete_record = sync_service.sync_catalogue_service_delete

            cleaned_count = 0
            for record_id in extra_in_postgres:
                try:
                    await delete_record(record_id)
                    cleaned_count += 1
                except Exception as e:
                    # Best effort: keep going so one bad record does not block the rest
                    logger.error(f"Error cleaning up {entity_type} record {record_id}: {e}")

            logger.info(f"Cleaned up {cleaned_count} orphaned {entity_type} records")
            return {"cleaned": cleaned_count}
    except Exception as e:
        logger.error(f"Error in cleanup: {e}")
        raise
# Command-line entry point for running the helpers above by hand
if __name__ == "__main__":
    import sys

    async def main():
        """Dispatch the command named in ``sys.argv[1]`` and print its result."""
        argv = sys.argv

        # No command given: show usage and stop.
        if len(argv) < 2:
            usage_lines = (
                "Usage: python sync/utils.py <command> [args...]",
                "Commands:",
                " migrate_customers [merchant_id] [limit]",
                " migrate_staff [merchant_id] [limit]",
                " migrate_catalogue_services [merchant_id] [limit]",
                " verify_consistency <merchant_id> [entity_type]",
                " cleanup_orphaned <merchant_id> [entity_type]",
            )
            for usage_line in usage_lines:
                print(usage_line)
            return

        command = argv[1]

        # Migration commands share the same optional [merchant_id] [limit] arguments.
        migrations = {
            "migrate_customers": migrate_existing_customers,
            "migrate_staff": migrate_existing_staff,
            "migrate_catalogue_services": migrate_existing_catalogue_services,
        }
        if command in migrations:
            merchant_id = argv[2] if len(argv) > 2 else None
            limit = int(argv[3]) if len(argv) > 3 else 1000
            result = await migrations[command](merchant_id, limit)
            print(f"Migration result: {result}")
            return

        if command not in ("verify_consistency", "cleanup_orphaned"):
            print(f"Unknown command: {command}")
            return

        # The remaining commands require a merchant and take an optional entity type.
        if len(argv) < 3:
            print("Error: merchant_id required")
            return
        merchant_id = argv[2]
        entity_type = argv[3] if len(argv) > 3 else "customers"

        if command == "verify_consistency":
            result = await verify_sync_consistency(merchant_id, entity_type)
            print(f"Consistency check result: {result}")
        else:
            result = await cleanup_orphaned_records(merchant_id, entity_type)
            print(f"Cleanup result: {result}")

    asyncio.run(main())