File size: 6,487 Bytes
6391dfd
 
 
 
 
 
 
 
 
 
f2bfacd
6391dfd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f2bfacd
 
6391dfd
f2bfacd
6391dfd
 
 
f2bfacd
 
6391dfd
 
f2bfacd
5db0a21
 
 
 
6391dfd
 
f2bfacd
 
 
 
e86dee7
6391dfd
 
 
 
e86dee7
0708ac4
e86dee7
 
 
f2bfacd
6391dfd
e86dee7
6391dfd
f2bfacd
 
6391dfd
 
 
 
 
 
 
 
 
f2bfacd
 
 
6391dfd
 
f2bfacd
 
6391dfd
 
f2bfacd
 
6391dfd
 
f2bfacd
 
6391dfd
 
f2bfacd
 
6391dfd
 
f2bfacd
 
 
6391dfd
f2bfacd
 
 
6391dfd
 
 
f2bfacd
 
6391dfd
 
 
f2bfacd
 
6391dfd
f2bfacd
 
 
6391dfd
f2bfacd
 
 
6391dfd
 
f2bfacd
 
6391dfd
 
 
f2bfacd
6391dfd
f2bfacd
 
 
6391dfd
 
f2bfacd
 
6391dfd
 
f2bfacd
 
6391dfd
 
f2bfacd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""
Worker Manager - Coordinates all sync workers.
"""
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import text
import redis.asyncio as redis

from app.core.config import settings
from app.core.logging import get_logger, log_service_lifecycle

logger = get_logger(__name__)


class WorkerManager:
    """Manages all sync workers."""
    
    def __init__(self):
        self.mongo_client = None
        self.mongo_db = None
        self.pg_engine = None
        self.redis_client = None
        self.sync_services = {}
    
    async def initialize(self):
        """Initialize database connections."""
        log_service_lifecycle(logger, "connections_init")

        # MongoDB
        log_service_lifecycle(logger, "mongodb_connecting", host=settings.MONGODB_URI.split("@")[-1])
        self.mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
        self.mongo_db = self.mongo_client[settings.MONGODB_DB_NAME]
        await self.mongo_client.admin.command('ping')
        log_service_lifecycle(logger, "mongodb_connected", db=settings.MONGODB_DB_NAME)

        # PostgreSQL
        pg_url = f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_HOST}:{settings.POSTGRES_PORT}/{settings.POSTGRES_DB}"
        log_service_lifecycle(logger, "postgres_connecting", host=settings.POSTGRES_HOST, port=settings.POSTGRES_PORT)
        self.pg_engine = create_async_engine(
            pg_url, echo=False, pool_pre_ping=True,
            pool_size=3, max_overflow=2, pool_timeout=30
        )
        async with self.pg_engine.begin() as conn:
            await conn.execute(text("SELECT 1"))
        log_service_lifecycle(logger, "postgres_connected", db=settings.POSTGRES_DB)

        # Redis
        log_service_lifecycle(logger, "redis_connecting", host=settings.REDIS_HOST, port=settings.REDIS_PORT)
        pool = redis.ConnectionPool(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            password=settings.REDIS_PASSWORD,
            db=settings.REDIS_DB,
            decode_responses=True,
            max_connections=50,  # 2 pods × ~18 workers + headroom for stats/enqueue calls
            socket_keepalive=True,
            socket_connect_timeout=5,
            retry_on_timeout=True,
            health_check_interval=30,
        )
        self.redis_client = redis.Redis(connection_pool=pool)
        await self.redis_client.ping()
        log_service_lifecycle(logger, "redis_connected", max_connections=15)

        await self._initialize_sync_services()
    
    async def _initialize_sync_services(self):
        """Initialize all sync services."""
        from app.sync.catalogues.service import CatalogueSyncService
        from app.sync.employees.service import EmployeeSyncService
        from app.sync.merchants.service import MerchantSyncService
        from app.sync.warehouses.service import WarehouseSyncService
        from app.sync.uom.service import UOMSyncService

        log_service_lifecycle(logger, "sync_services_init")

        self.sync_services = {
            "catalogues": CatalogueSyncService(
                mongo_db=self.mongo_db, pg_engine=self.pg_engine, redis_client=self.redis_client,
                max_queue_size=10000, worker_count=settings.CATALOGUE_WORKERS, max_retries=3,
            ),
            "employees": EmployeeSyncService(
                mongo_db=self.mongo_db, pg_engine=self.pg_engine, redis_client=self.redis_client,
                max_queue_size=10000, worker_count=settings.EMPLOYEE_WORKERS, max_retries=3,
            ),
            "merchants": MerchantSyncService(
                mongo_db=self.mongo_db, pg_engine=self.pg_engine, redis_client=self.redis_client,
                max_queue_size=10000, worker_count=settings.MERCHANT_WORKERS, max_retries=3,
            ),
            "warehouses": WarehouseSyncService(
                mongo_db=self.mongo_db, pg_engine=self.pg_engine, redis_client=self.redis_client,
                max_queue_size=10000, worker_count=settings.WAREHOUSE_WORKERS, max_retries=3,
            ),
            "uom": UOMSyncService(
                mongo_db=self.mongo_db, pg_engine=self.pg_engine, redis_client=self.redis_client,
                max_queue_size=5000, worker_count=settings.UOM_WORKERS, max_retries=3,
            ),
        }

        log_service_lifecycle(logger, "sync_services_ready", count=len(self.sync_services),
                              services=list(self.sync_services.keys()))
    
    async def start_all_workers(self):
        """Start all sync workers."""
        log_service_lifecycle(logger, "workers_starting", count=len(self.sync_services))

        for entity_type, service in self.sync_services.items():
            try:
                await service.start_workers()
                log_service_lifecycle(logger, "workers_started", entity_type=entity_type,
                                      worker_count=service.worker_count)
            except Exception as e:
                logger.error("Failed to start workers", exc_info=True,
                             extra={"event": "workers_start_error", "entity_type": entity_type, "error": str(e)})

        total_workers = sum(s.worker_count for s in self.sync_services.values())
        log_service_lifecycle(logger, "all_workers_started", total_workers=total_workers,
                              service_count=len(self.sync_services))

    async def stop_all_workers(self):
        """Stop all sync workers."""
        log_service_lifecycle(logger, "workers_stopping")

        for entity_type, service in self.sync_services.items():
            try:
                await service.stop_workers()
                log_service_lifecycle(logger, "workers_stopped", entity_type=entity_type)
            except Exception as e:
                logger.error("Failed to stop workers", exc_info=True,
                             extra={"event": "workers_stop_error", "entity_type": entity_type, "error": str(e)})

        if self.redis_client:
            await self.redis_client.aclose()
            log_service_lifecycle(logger, "redis_disconnected")

        if self.pg_engine:
            await self.pg_engine.dispose()
            log_service_lifecycle(logger, "postgres_disconnected")

        if self.mongo_client:
            self.mongo_client.close()
            log_service_lifecycle(logger, "mongodb_disconnected")