# Source: cuatrolabs-sync-worker-ms/app/sync/worker_manager.py
# Last commit 5db0a21 (MukeshKapoor25):
#   fix: limit SQLAlchemy pool size to prevent connection exhaustion
#
"""
Worker Manager - Coordinates all sync workers.
"""
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import text
import redis.asyncio as redis
from app.core.config import settings
from app.core.logging import get_logger, log_service_lifecycle
# Module-level logger, named after this module and obtained from the project's logging helper.
logger = get_logger(__name__)
class WorkerManager:
    """Manages all sync workers.

    Owns one MongoDB client, one PostgreSQL engine and one Redis client, and
    passes those same objects to every per-entity sync service it creates.
    Typical lifecycle: ``initialize()`` -> ``start_all_workers()`` ->
    ``stop_all_workers()``.
    """

    def __init__(self):
        """Create an unconnected manager; call ``initialize()`` before use."""
        self.mongo_client = None   # AsyncIOMotorClient, set by initialize()
        self.mongo_db = None       # database handle selected from mongo_client
        self.pg_engine = None      # SQLAlchemy async engine
        self.redis_client = None   # redis.asyncio.Redis bound to a bounded pool
        self.sync_services = {}    # entity type name -> sync service instance

    async def initialize(self) -> None:
        """Initialize database connections and build the sync services.

        Connects to MongoDB, PostgreSQL and Redis in that order and checks each
        connection with a round trip (``ping`` / ``SELECT 1``) before moving
        on. Any connection error propagates to the caller.
        """
        log_service_lifecycle(logger, "connections_init")

        # MongoDB
        # Log only the text after the last "@", which drops any user:password
        # prefix from the URI.
        log_service_lifecycle(logger, "mongodb_connecting", host=settings.MONGODB_URI.split("@")[-1])
        self.mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
        self.mongo_db = self.mongo_client[settings.MONGODB_DB_NAME]
        await self.mongo_client.admin.command('ping')
        log_service_lifecycle(logger, "mongodb_connected", db=settings.MONGODB_DB_NAME)

        # PostgreSQL
        # NOTE(review): user and password are interpolated as-is. SQLAlchemy
        # string URLs need URL-reserved characters (e.g. "@", "/") in the
        # password percent-encoded -- confirm settings supply it that way.
        pg_url = f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_HOST}:{settings.POSTGRES_PORT}/{settings.POSTGRES_DB}"
        log_service_lifecycle(logger, "postgres_connecting", host=settings.POSTGRES_HOST, port=settings.POSTGRES_PORT)
        # Small pool on purpose: at most pool_size + max_overflow = 5
        # connections from this engine, which every sync service shares.
        self.pg_engine = create_async_engine(
            pg_url, echo=False, pool_pre_ping=True,
            pool_size=3, max_overflow=2, pool_timeout=30
        )
        async with self.pg_engine.begin() as conn:
            await conn.execute(text("SELECT 1"))
        log_service_lifecycle(logger, "postgres_connected", db=settings.POSTGRES_DB)

        # Redis
        # Single source of truth for the pool limit so the value logged below
        # always matches what the pool was actually configured with.
        redis_max_connections = 50  # 2 pods × ~18 workers + headroom for stats/enqueue calls
        log_service_lifecycle(logger, "redis_connecting", host=settings.REDIS_HOST, port=settings.REDIS_PORT)
        pool = redis.ConnectionPool(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            password=settings.REDIS_PASSWORD,
            db=settings.REDIS_DB,
            decode_responses=True,
            max_connections=redis_max_connections,
            socket_keepalive=True,
            socket_connect_timeout=5,
            retry_on_timeout=True,
            health_check_interval=30,
        )
        self.redis_client = redis.Redis(connection_pool=pool)
        await self.redis_client.ping()
        log_service_lifecycle(logger, "redis_connected", max_connections=redis_max_connections)

        await self._initialize_sync_services()

    async def _initialize_sync_services(self) -> None:
        """Initialize all sync services.

        Builds one service per entity type and stores them in
        ``self.sync_services`` keyed by entity type name. Every service gets
        the same Mongo database, PostgreSQL engine and Redis client.
        """
        # Imported inside the method rather than at module top -- presumably to
        # avoid import cycles or defer loading; confirm before moving them.
        from app.sync.catalogues.service import CatalogueSyncService
        from app.sync.employees.service import EmployeeSyncService
        from app.sync.merchants.service import MerchantSyncService
        from app.sync.warehouses.service import WarehouseSyncService
        from app.sync.uom.service import UOMSyncService

        log_service_lifecycle(logger, "sync_services_init")

        # Arguments that are identical for every service.
        shared = {
            "mongo_db": self.mongo_db,
            "pg_engine": self.pg_engine,
            "redis_client": self.redis_client,
            "max_retries": 3,
        }
        self.sync_services = {
            "catalogues": CatalogueSyncService(
                max_queue_size=10000, worker_count=settings.CATALOGUE_WORKERS, **shared,
            ),
            "employees": EmployeeSyncService(
                max_queue_size=10000, worker_count=settings.EMPLOYEE_WORKERS, **shared,
            ),
            "merchants": MerchantSyncService(
                max_queue_size=10000, worker_count=settings.MERCHANT_WORKERS, **shared,
            ),
            "warehouses": WarehouseSyncService(
                max_queue_size=10000, worker_count=settings.WAREHOUSE_WORKERS, **shared,
            ),
            "uom": UOMSyncService(
                max_queue_size=5000, worker_count=settings.UOM_WORKERS, **shared,
            ),
        }
        log_service_lifecycle(logger, "sync_services_ready", count=len(self.sync_services),
                              services=list(self.sync_services.keys()))

    async def start_all_workers(self) -> None:
        """Start all sync workers.

        A service that fails to start is logged and skipped so the remaining
        services still start. The ``total_workers`` value in the final log
        counts only the services that started successfully.
        """
        log_service_lifecycle(logger, "workers_starting", count=len(self.sync_services))

        started_workers = 0
        for entity_type, service in self.sync_services.items():
            try:
                await service.start_workers()
                worker_count = service.worker_count
                started_workers += worker_count
                log_service_lifecycle(logger, "workers_started", entity_type=entity_type,
                                      worker_count=worker_count)
            except Exception as e:
                # Best-effort: one broken service must not block the others.
                logger.error("Failed to start workers", exc_info=True,
                             extra={"event": "workers_start_error", "entity_type": entity_type, "error": str(e)})

        log_service_lifecycle(logger, "all_workers_started", total_workers=started_workers,
                              service_count=len(self.sync_services))

    async def stop_all_workers(self) -> None:
        """Stop all sync workers and close the Redis, PostgreSQL and MongoDB connections.

        Every step is best-effort: a failure is logged and the remaining
        services and connections are still shut down, so one failing close
        cannot leak the other connections.
        """
        log_service_lifecycle(logger, "workers_stopping")

        for entity_type, service in self.sync_services.items():
            try:
                await service.stop_workers()
                log_service_lifecycle(logger, "workers_stopped", entity_type=entity_type)
            except Exception as e:
                logger.error("Failed to stop workers", exc_info=True,
                             extra={"event": "workers_stop_error", "entity_type": entity_type, "error": str(e)})

        # Connections are closed only after the workers that use them have
        # been asked to stop.
        if self.redis_client:
            try:
                await self.redis_client.aclose()
                log_service_lifecycle(logger, "redis_disconnected")
            except Exception as e:
                logger.error("Failed to close Redis connection", exc_info=True,
                             extra={"event": "redis_disconnect_error", "error": str(e)})

        if self.pg_engine:
            try:
                await self.pg_engine.dispose()
                log_service_lifecycle(logger, "postgres_disconnected")
            except Exception as e:
                logger.error("Failed to dispose PostgreSQL engine", exc_info=True,
                             extra={"event": "postgres_disconnect_error", "error": str(e)})

        if self.mongo_client:
            try:
                self.mongo_client.close()  # Motor's close() is synchronous
                log_service_lifecycle(logger, "mongodb_disconnected")
            except Exception as e:
                logger.error("Failed to close MongoDB client", exc_info=True,
                             extra={"event": "mongodb_disconnect_error", "error": str(e)})