""" Worker Manager - Coordinates all sync workers. """ import asyncio from motor.motor_asyncio import AsyncIOMotorClient from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy import text import redis.asyncio as redis from app.core.config import settings from app.core.logging import get_logger, log_service_lifecycle logger = get_logger(__name__) class WorkerManager: """Manages all sync workers.""" def __init__(self): self.mongo_client = None self.mongo_db = None self.pg_engine = None self.redis_client = None self.sync_services = {} async def initialize(self): """Initialize database connections.""" log_service_lifecycle(logger, "connections_init") # MongoDB log_service_lifecycle(logger, "mongodb_connecting", host=settings.MONGODB_URI.split("@")[-1]) self.mongo_client = AsyncIOMotorClient(settings.MONGODB_URI) self.mongo_db = self.mongo_client[settings.MONGODB_DB_NAME] await self.mongo_client.admin.command('ping') log_service_lifecycle(logger, "mongodb_connected", db=settings.MONGODB_DB_NAME) # PostgreSQL pg_url = f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_HOST}:{settings.POSTGRES_PORT}/{settings.POSTGRES_DB}" log_service_lifecycle(logger, "postgres_connecting", host=settings.POSTGRES_HOST, port=settings.POSTGRES_PORT) self.pg_engine = create_async_engine( pg_url, echo=False, pool_pre_ping=True, pool_size=3, max_overflow=2, pool_timeout=30 ) async with self.pg_engine.begin() as conn: await conn.execute(text("SELECT 1")) log_service_lifecycle(logger, "postgres_connected", db=settings.POSTGRES_DB) # Redis log_service_lifecycle(logger, "redis_connecting", host=settings.REDIS_HOST, port=settings.REDIS_PORT) pool = redis.ConnectionPool( host=settings.REDIS_HOST, port=settings.REDIS_PORT, password=settings.REDIS_PASSWORD, db=settings.REDIS_DB, decode_responses=True, max_connections=50, # 2 pods × ~18 workers + headroom for stats/enqueue calls socket_keepalive=True, socket_connect_timeout=5, retry_on_timeout=True, health_check_interval=30, ) self.redis_client = redis.Redis(connection_pool=pool) await self.redis_client.ping() log_service_lifecycle(logger, "redis_connected", max_connections=15) await self._initialize_sync_services() async def _initialize_sync_services(self): """Initialize all sync services.""" from app.sync.catalogues.service import CatalogueSyncService from app.sync.employees.service import EmployeeSyncService from app.sync.merchants.service import MerchantSyncService from app.sync.warehouses.service import WarehouseSyncService from app.sync.uom.service import UOMSyncService log_service_lifecycle(logger, "sync_services_init") self.sync_services = { "catalogues": CatalogueSyncService( mongo_db=self.mongo_db, pg_engine=self.pg_engine, redis_client=self.redis_client, max_queue_size=10000, worker_count=settings.CATALOGUE_WORKERS, max_retries=3, ), "employees": EmployeeSyncService( mongo_db=self.mongo_db, pg_engine=self.pg_engine, redis_client=self.redis_client, max_queue_size=10000, worker_count=settings.EMPLOYEE_WORKERS, max_retries=3, ), "merchants": MerchantSyncService( mongo_db=self.mongo_db, pg_engine=self.pg_engine, redis_client=self.redis_client, max_queue_size=10000, worker_count=settings.MERCHANT_WORKERS, max_retries=3, ), "warehouses": WarehouseSyncService( mongo_db=self.mongo_db, pg_engine=self.pg_engine, redis_client=self.redis_client, max_queue_size=10000, worker_count=settings.WAREHOUSE_WORKERS, max_retries=3, ), "uom": UOMSyncService( mongo_db=self.mongo_db, pg_engine=self.pg_engine, redis_client=self.redis_client, max_queue_size=5000, worker_count=settings.UOM_WORKERS, max_retries=3, ), } log_service_lifecycle(logger, "sync_services_ready", count=len(self.sync_services), services=list(self.sync_services.keys())) async def start_all_workers(self): """Start all sync workers.""" log_service_lifecycle(logger, "workers_starting", count=len(self.sync_services)) for entity_type, service in self.sync_services.items(): try: await service.start_workers() log_service_lifecycle(logger, "workers_started", entity_type=entity_type, worker_count=service.worker_count) except Exception as e: logger.error("Failed to start workers", exc_info=True, extra={"event": "workers_start_error", "entity_type": entity_type, "error": str(e)}) total_workers = sum(s.worker_count for s in self.sync_services.values()) log_service_lifecycle(logger, "all_workers_started", total_workers=total_workers, service_count=len(self.sync_services)) async def stop_all_workers(self): """Stop all sync workers.""" log_service_lifecycle(logger, "workers_stopping") for entity_type, service in self.sync_services.items(): try: await service.stop_workers() log_service_lifecycle(logger, "workers_stopped", entity_type=entity_type) except Exception as e: logger.error("Failed to stop workers", exc_info=True, extra={"event": "workers_stop_error", "entity_type": entity_type, "error": str(e)}) if self.redis_client: await self.redis_client.aclose() log_service_lifecycle(logger, "redis_disconnected") if self.pg_engine: await self.pg_engine.dispose() log_service_lifecycle(logger, "postgres_disconnected") if self.mongo_client: self.mongo_client.close() log_service_lifecycle(logger, "mongodb_disconnected")