Spaces:
Runtime error
Runtime error
| """ | |
| Worker Manager - Coordinates all sync workers. | |
| """ | |
| import asyncio | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| from sqlalchemy.ext.asyncio import create_async_engine | |
| from sqlalchemy import text | |
| import redis.asyncio as redis | |
| from app.core.config import settings | |
| from app.core.logging import get_logger, log_service_lifecycle | |
| logger = get_logger(__name__) | |
class WorkerManager:
    """Own the shared datastore connections and coordinate all sync services.

    Intended call order: ``initialize()`` to open the connections and build
    the services, ``start_all_workers()`` to launch them, and
    ``stop_all_workers()`` to shut everything down again.
    """

    def __init__(self) -> None:
        """Create an empty manager; nothing is connected until ``initialize()``."""
        self.mongo_client = None
        self.mongo_db = None
        self.pg_engine = None
        self.redis_client = None
        # Maps entity type (e.g. "catalogues") to its sync service instance.
        self.sync_services = {}

    async def initialize(self) -> None:
        """Open and verify the MongoDB, PostgreSQL and Redis connections.

        Every backend is probed immediately after the client is created
        (``ping`` / ``SELECT 1``), then the sync services are built on top of
        the shared connections. Driver exceptions are not caught here and
        propagate to the caller.
        """
        log_service_lifecycle(logger, "connections_init")

        # MongoDB
        # Only the part after the last "@" is logged, which keeps any
        # userinfo embedded in the URI out of the log line.
        log_service_lifecycle(
            logger, "mongodb_connecting", host=settings.MONGODB_URI.split("@")[-1]
        )
        self.mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
        self.mongo_db = self.mongo_client[settings.MONGODB_DB_NAME]
        await self.mongo_client.admin.command('ping')
        log_service_lifecycle(logger, "mongodb_connected", db=settings.MONGODB_DB_NAME)

        # PostgreSQL
        # NOTE(review): user and password are interpolated verbatim, so values
        # containing URL-reserved characters ("@", "/", ":") must already be
        # percent-encoded in settings - confirm against deployment config.
        pg_url = f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_HOST}:{settings.POSTGRES_PORT}/{settings.POSTGRES_DB}"
        log_service_lifecycle(
            logger, "postgres_connecting",
            host=settings.POSTGRES_HOST, port=settings.POSTGRES_PORT,
        )
        self.pg_engine = create_async_engine(
            pg_url, echo=False, pool_pre_ping=True,
            pool_size=3, max_overflow=2, pool_timeout=30
        )
        async with self.pg_engine.begin() as conn:
            await conn.execute(text("SELECT 1"))
        log_service_lifecycle(logger, "postgres_connected", db=settings.POSTGRES_DB)

        # Redis
        log_service_lifecycle(
            logger, "redis_connecting",
            host=settings.REDIS_HOST, port=settings.REDIS_PORT,
        )
        # 2 pods × ~18 workers + headroom for stats/enqueue calls.
        # Kept in one place so the pool and the log line cannot drift apart.
        redis_max_connections = 50
        pool = redis.ConnectionPool(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            password=settings.REDIS_PASSWORD,
            db=settings.REDIS_DB,
            decode_responses=True,
            max_connections=redis_max_connections,
            socket_keepalive=True,
            socket_connect_timeout=5,
            retry_on_timeout=True,
            health_check_interval=30,
        )
        self.redis_client = redis.Redis(connection_pool=pool)
        await self.redis_client.ping()
        log_service_lifecycle(
            logger, "redis_connected", max_connections=redis_max_connections
        )

        await self._initialize_sync_services()

    async def _initialize_sync_services(self) -> None:
        """Build one sync service per entity type on the shared connections."""
        # Imported here rather than at module level, as in the original
        # (presumably to avoid import cycles or import-time cost - confirm).
        from app.sync.catalogues.service import CatalogueSyncService
        from app.sync.employees.service import EmployeeSyncService
        from app.sync.merchants.service import MerchantSyncService
        from app.sync.warehouses.service import WarehouseSyncService
        from app.sync.uom.service import UOMSyncService

        log_service_lifecycle(logger, "sync_services_init")

        # Arguments that are identical for every service.
        shared = {
            "mongo_db": self.mongo_db,
            "pg_engine": self.pg_engine,
            "redis_client": self.redis_client,
            "max_retries": 3,
        }
        self.sync_services = {
            "catalogues": CatalogueSyncService(
                max_queue_size=10000, worker_count=settings.CATALOGUE_WORKERS, **shared
            ),
            "employees": EmployeeSyncService(
                max_queue_size=10000, worker_count=settings.EMPLOYEE_WORKERS, **shared
            ),
            "merchants": MerchantSyncService(
                max_queue_size=10000, worker_count=settings.MERCHANT_WORKERS, **shared
            ),
            "warehouses": WarehouseSyncService(
                max_queue_size=10000, worker_count=settings.WAREHOUSE_WORKERS, **shared
            ),
            "uom": UOMSyncService(
                max_queue_size=5000, worker_count=settings.UOM_WORKERS, **shared
            ),
        }
        log_service_lifecycle(
            logger, "sync_services_ready",
            count=len(self.sync_services),
            services=list(self.sync_services.keys()),
        )

    async def start_all_workers(self) -> None:
        """Start the workers of every registered sync service.

        Startup is best-effort: a service that fails to start is logged and
        skipped so the remaining services still come up. The final summary
        event counts only the services that actually started.
        """
        log_service_lifecycle(logger, "workers_starting", count=len(self.sync_services))

        started = []
        for entity_type, service in self.sync_services.items():
            try:
                await service.start_workers()
                log_service_lifecycle(
                    logger, "workers_started",
                    entity_type=entity_type, worker_count=service.worker_count,
                )
            except Exception as e:
                logger.error(
                    "Failed to start workers", exc_info=True,
                    extra={"event": "workers_start_error", "entity_type": entity_type, "error": str(e)},
                )
            else:
                started.append(service)

        # Services that failed to start are excluded so the totals are not inflated.
        total_workers = sum(s.worker_count for s in started)
        log_service_lifecycle(
            logger, "all_workers_started",
            total_workers=total_workers, service_count=len(started),
        )

    async def stop_all_workers(self) -> None:
        """Stop every sync service, then close the Redis, PostgreSQL and MongoDB connections.

        Shutdown is best-effort: each step is isolated, so a failure while
        stopping one service or closing one connection is logged and does not
        prevent the remaining resources from being released.
        """
        log_service_lifecycle(logger, "workers_stopping")

        for entity_type, service in self.sync_services.items():
            try:
                await service.stop_workers()
                log_service_lifecycle(logger, "workers_stopped", entity_type=entity_type)
            except Exception as e:
                logger.error(
                    "Failed to stop workers", exc_info=True,
                    extra={"event": "workers_stop_error", "entity_type": entity_type, "error": str(e)},
                )

        if self.redis_client is not None:
            try:
                # The client was built from an explicitly passed pool, so
                # redis-py only disconnects that pool when asked to.
                await self.redis_client.aclose(close_connection_pool=True)
                log_service_lifecycle(logger, "redis_disconnected")
            except Exception as e:
                logger.error(
                    "Failed to close Redis connection", exc_info=True,
                    extra={"event": "redis_disconnect_error", "error": str(e)},
                )

        if self.pg_engine is not None:
            try:
                await self.pg_engine.dispose()
                log_service_lifecycle(logger, "postgres_disconnected")
            except Exception as e:
                logger.error(
                    "Failed to dispose PostgreSQL engine", exc_info=True,
                    extra={"event": "postgres_disconnect_error", "error": str(e)},
                )

        if self.mongo_client is not None:
            try:
                self.mongo_client.close()
                log_service_lifecycle(logger, "mongodb_disconnected")
            except Exception as e:
                logger.error(
                    "Failed to close MongoDB client", exc_info=True,
                    extra={"event": "mongodb_disconnect_error", "error": str(e)},
                )