| | """ |
| | AnalyticsWorker v5.0: TCP Redis Pub/Sub + SRE Observability |
| | |
| | This is the initiator of all processes - treated as a critical path system. |
| | Changes: |
| | - Added real-time pub/sub events for every operation |
| | - SRE metrics emission for monitoring |
| | - Circuit breaker integration |
| | - Zero changes to core KPI calculation logic |
| | """ |
| |
|
| |
|
import asyncio
import json
import logging
import os
import time
import traceback
from asyncio import Lock
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import pandas as pd

from app.core.event_hub import event_hub
from app.core.sre_logging import emit_worker_log
from app.db import get_conn
from app.engine.kpi_calculators.registry import get_kpi_calculator_async
from app.schemas.org_schema import OrgSchema
from app.service.embedding_service import EmbeddingService
from app.service.vector_service import VectorService, VectorStoreEventType, VectorMetrics
| |
|
| | |
| | logging.basicConfig( |
| | level=logging.INFO, |
| | format='%(asctime)s | %(levelname)s | [%(name)s] [%(funcName)s] %(message)s' |
| | ) |
| | logger = logging.getLogger(__name__) |
| |
|
| | |
| | _WORKER_LOCKS: Dict[str, Lock] = {} |
| |
|
| |
|
class AnalyticsWorker:
    """
    Core KPI engine with SRE observability.

    One instance handles a single (org_id, source_id) pass: it loads entity
    metadata from Redis, pulls recent rows from the org's SQL store,
    discovers the column schema, runs the industry-specific KPI calculator,
    publishes results/insights back to Redis, and (best-effort) embeds a
    sample of transactions into the vector store.

    Only instrumentation was added around the core KPI calculation logic.
    """

    # Prometheus collectors must be created exactly once per process:
    # registering the same metric name twice raises ValueError in
    # prometheus_client, which previously made every export after the
    # first one fail silently inside the except handler.
    _prom_metrics: Optional[Dict[str, Any]] = None

    def __init__(self, org_id: str, source_id: str, hours_window: int = 24):
        """
        Args:
            org_id: Tenant identifier.
            source_id: Data-source identifier within the tenant.
            hours_window: Lookback window (hours) for loading rows.
        """
        self.org_id = org_id
        self.source_id = source_id
        self.hours_window = hours_window

        self.txn_embedder = EmbeddingService()
        self.vector_service = VectorService(org_id)

        self.computed_at: Optional[datetime] = None
        self._entity_type: Optional[str] = None

        # Redis keys for the distributed lock and the idempotency marker;
        # the in-process asyncio.Lock is shared per lock_key across instances.
        self.lock_key = f"worker:lock:{org_id}:{source_id}"
        self.processed_key = f"worker:processed:{org_id}:{source_id}"
        self._process_lock = _WORKER_LOCKS.setdefault(self.lock_key, Lock())

        self.vector_service.add_metrics_callback(self._export_to_prometheus)

        self._publish_worker_event(
            event_type="worker.initialized",
            data={
                "org_id": org_id,
                "source_id": source_id,
                "hours_window": hours_window
            }
        )

    def _on_vector_metrics(self, metrics: "VectorMetrics"):
        """Alert on expensive or slow vector operations.

        NOTE(review): this handler is not registered anywhere in this file
        (__init__ registers _export_to_prometheus) — confirm external callers
        before removing.
        """
        if metrics.cost_usd > 0.01:
            logger.warning(
                f"[SRE_ALERT] High vector cost: ${metrics.cost_usd:.4f} "
                f"for {metrics.vector_count} vectors"
            )

        if metrics.duration_ms > 5000:
            logger.warning(
                f"[SRE_ALERT] Slow vector operation: {metrics.operation} "
                f"took {metrics.duration_ms:.2f}ms"
            )

        logger.debug(f"[SRE_METRICS] {metrics}")

    def _publish_worker_event(self, event_type: str, data: Dict[str, Any]):
        """Publish a worker lifecycle event via Redis pub/sub (fire-and-forget).

        create_task requires a running event loop; when constructed from a
        synchronous context the RuntimeError is caught and logged below.
        """
        try:
            channel = f"worker:events:{self.org_id}:{self.source_id}"
            payload = {
                "type": event_type,
                "timestamp": datetime.utcnow().isoformat(),
                "data": data
            }

            asyncio.create_task(
                asyncio.to_thread(
                    event_hub.publish,
                    channel,
                    json.dumps(payload)
                )
            )
        except Exception as e:
            logger.error(f"[EVENT] Failed to publish {event_type}: {e}")

    def _export_to_prometheus(self, metrics: "VectorMetrics"):
        """Record vector-operation metrics on process-wide Prometheus collectors.

        Collectors are created lazily on first use and cached on the class;
        the previous per-call construction raised a duplicate-registration
        error on every call after the first.
        """
        try:
            if AnalyticsWorker._prom_metrics is None:
                from prometheus_client import Counter, Histogram

                AnalyticsWorker._prom_metrics = {
                    "duration": Histogram(
                        'vector_operation_duration_seconds',
                        'Time spent on vector operations',
                        ['operation', 'org_id']
                    ),
                    "cost": Counter(
                        'vector_operation_cost_usd_total',
                        'Total cost of vector operations',
                        ['operation', 'org_id', 'redis_type']
                    ),
                }

            AnalyticsWorker._prom_metrics["duration"].labels(
                operation=metrics.operation,
                org_id=metrics.org_id
            ).observe(metrics.duration_ms / 1000)

            AnalyticsWorker._prom_metrics["cost"].labels(
                operation=metrics.operation,
                org_id=metrics.org_id,
                redis_type="tcp" if metrics.pipeline_used else "upstash"
            ).inc(metrics.cost_usd)

        except Exception as e:
            logger.error(f"[PROMETHEUS] Failed to export: {e}")

    async def run(self) -> Dict[str, Any]:
        """
        Execute one full KPI computation pass.

        Returns:
            The calculator results dict on success, otherwise a
            {"status": "skipped"|"error", "reason": ...} dict.
        """
        start_time = time.time()
        worker_id = f"{self.org_id}/{self.source_id}"

        self._publish_worker_event("worker.run.started", {"worker_id": worker_id})

        try:
            # Idempotency: skip when a recent run already completed.
            if await self._is_already_processed():
                logger.warning(f"[WORKER] Already processed {worker_id}")
                return {"status": "skipped", "reason": "already_processed"}

            if not await self._acquire_lock():
                return {"status": "skipped", "reason": "lock_failed"}

            emit_worker_log("info", f"STARTING {worker_id}", worker_id=worker_id)

            await self._load_entity_from_redis()

            df = await self._load_dataframe()
            if df.empty:
                await self._publish_status("error", "No data")
                return {"status": "error", "reason": "no_data"}

            logger.info(f"[WORKER] Loaded {len(df)} rows x {len(df.columns)} cols")

            mapping = await self._discover_schema(df)
            if not mapping:
                await self._publish_status("error", "Schema discovery failed")
                return {"status": "error", "reason": "no_schema"}

            logger.info(f"[WORKER] Mapping: {list(mapping.items())[:5]}...")

            df = self._alias_columns(df, mapping)

            # Embeddings run concurrently with the KPI calculation and are
            # awaited (with a timeout) only after KPIs are published.
            embed_task = asyncio.create_task(
                self._embed_transactions(df.head(1000)),
                name=f"embed-{self.org_id}-{self.source_id}"
            )

            industry = await self._get_industry()
            calculator = await get_kpi_calculator_async(
                industry=industry,
                org_id=self.org_id,
                df=df,
                source_id=self.source_id,
                entity_type=self._entity_type
            )

            results = await calculator.compute_all()

            await self._publish(results)
            await self._cache_results(results)
            await self._mark_processed()

            # Best-effort: a slow embedding must not fail the run.
            try:
                await asyncio.wait_for(embed_task, timeout=30)
                logger.info("[WORKER] Embeddings completed")
            except asyncio.TimeoutError:
                logger.warning("[WORKER] Embedding timeout, but KPIs published")

            duration = time.time() - start_time
            logger.info(f"[WORKER] COMPLETE: {worker_id} in {duration:.2f}s")

            self._publish_worker_event(
                "worker.run.completed",
                {
                    "worker_id": worker_id,
                    "duration_sec": round(duration, 2),
                    "rows_processed": len(df),
                    "entity_type": self._entity_type
                }
            )

            return results

        except Exception as e:
            emit_worker_log("error", f"CRITICAL: {e}", error=str(e))
            await self._publish_status("error", str(e))

            self._publish_worker_event(
                "worker.run.failed",
                {
                    "worker_id": worker_id,
                    "error": str(e),
                    # was logging.traceback.format_exc(), which depends on
                    # logging's private internal import of traceback
                    "traceback": traceback.format_exc()
                }
            )

            return {"status": "error", "reason": str(e)}

        finally:
            await self._release_lock()
            self._publish_worker_event("worker.run.finished", {"worker_id": worker_id})

    async def _is_already_processed(self) -> bool:
        """Return True when the idempotency marker exists in Redis."""
        try:
            result = await asyncio.to_thread(event_hub.redis.exists, self.processed_key)
            exists = bool(result) if result is not None else False

            if exists:
                logger.info(f"[IDEMPOTENCY] Found processed key: {self.processed_key}")

            return exists
        except Exception as e:
            logger.error(f"[IDEMPOTENCY] Error: {e}")
            # Fail open: a Redis hiccup should not block processing.
            return False

    async def _acquire_lock(self) -> bool:
        """Acquire the distributed Redis lock plus the in-process lock.

        Returns False (and leaves no Redis lock behind) when either step fails.
        """
        try:
            # SET NX with a 5-minute TTL acts as the distributed lock.
            lock_acquired = await asyncio.to_thread(
                event_hub.redis.set,
                self.lock_key,
                "1",
                nx=True,
                px=300000
            )

            if not lock_acquired:
                logger.warning(f"[LOCK] Already locked: {self.lock_key}")
                return False

            try:
                # asyncio.Lock.acquire() only ever returns True; a timeout
                # raises TimeoutError. Previously that exception fell through
                # to the generic handler and leaked the Redis lock for its
                # full TTL — release it explicitly here instead.
                await asyncio.wait_for(self._process_lock.acquire(), timeout=1.0)
            except asyncio.TimeoutError:
                await asyncio.to_thread(event_hub.redis.delete, self.lock_key)
                return False

            logger.info(f"[LOCK] Acquired: {self.lock_key}")
            return True

        except Exception as e:
            logger.error(f"[LOCK] Error: {e}")
            return False

    async def _release_lock(self):
        """Release the in-process lock (if held) and delete the Redis lock."""
        try:
            if self._process_lock.locked():
                self._process_lock.release()

            await asyncio.to_thread(event_hub.redis.delete, self.lock_key)
            logger.info(f"[LOCK] Released: {self.lock_key}")
        except Exception as e:
            logger.error(f"[LOCK] Error releasing: {e}")

    async def _mark_processed(self):
        """Set the idempotency marker with a 5-minute TTL."""
        try:
            await asyncio.to_thread(
                event_hub.redis.setex,
                self.processed_key,
                300,
                "1"
            )
            logger.info(f"[IDEMPOTENCY] Marked processed: {self.processed_key}")
        except Exception as e:
            logger.error(f"[IDEMPOTENCY] Error: {e}")

    async def _load_entity_from_redis(self) -> dict:
        """Load entity info (and, if present, industry info) from Redis.

        Sets self._entity_type; raises when the entity key is missing.
        """
        try:
            entity_key = f"entity:{self.org_id}:{self.source_id}"
            data = await asyncio.to_thread(event_hub.get_key, entity_key)

            if not data:
                raise ValueError(f"Entity key not found: {entity_key}")

            entity_info = json.loads(data)
            self._entity_type = entity_info["entity_type"]

            industry_key = f"industry:{self.org_id}:{self.source_id}"
            industry_data = await asyncio.to_thread(event_hub.get_key, industry_key)

            if industry_data:
                self._industry_info = json.loads(industry_data)
                logger.info(
                    f"[ENTITY] Loaded: {self._entity_type}, "
                    f"industry={self._industry_info.get('industry')}"
                )
            else:
                logger.warning(f"[ENTITY] Industry not found for {self.org_id}:{self.source_id}")

            return entity_info

        except Exception as e:
            logger.error(f"[ENTITY] Failed: {e}")
            raise

    async def _load_dataframe(self) -> pd.DataFrame:
        """Load data asynchronously; requires _entity_type to be set first."""
        if not getattr(self, '_entity_type', None):
            raise ValueError("entity_type must be loaded from Redis first")

        return await asyncio.to_thread(self._sync_load_dataframe, self._entity_type)

    def _sync_load_dataframe(self, entity_type: str) -> pd.DataFrame:
        """Synchronous data loader (runs in a thread pool).

        Returns rows inside the hours_window; falls back to the 1000 most
        recent rows when the window is empty, or an empty frame on error.
        """
        try:
            conn = get_conn(self.org_id)
            # NOTE(review): table_name is interpolated into SQL below. It is
            # only used after an information_schema existence check, but
            # entity_type comes from Redis — confirm it is validated upstream.
            table_name = f"main.{entity_type}_canonical"

            table_exists = conn.execute(
                "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'main' AND table_name = ?",
                [entity_type + "_canonical"]
            ).fetchone()[0] > 0

            if not table_exists:
                logger.error(f"[LOAD] Table {table_name} does not exist")
                return pd.DataFrame()

            cutoff = datetime.now() - timedelta(hours=self.hours_window)
            df = conn.execute(
                f"SELECT * FROM {table_name} WHERE timestamp >= ? ORDER BY timestamp DESC LIMIT 10000",
                [cutoff]
            ).df()

            if not df.empty:
                logger.info(f"[LOAD] Loaded {len(df)} rows x {len(df.columns)} cols (filtered)")
                return df

            logger.warning(f"[LOAD] No data in {self.hours_window}h window, returning recent rows")
            df = conn.execute(f"SELECT * FROM {table_name} ORDER BY timestamp DESC LIMIT 1000").df()

            return df

        except Exception as e:
            logger.error(f"[LOAD] Fatal: {e}", exc_info=True)
            return pd.DataFrame()

    async def _discover_schema(self, df: pd.DataFrame) -> Dict[str, str]:
        """Discover the semantic->actual column mapping, with a 24h Redis cache.

        Falls back to an identity mapping over df.columns on error.
        """
        try:
            cache_key = f"schema:{self.org_id}:{self._entity_type}:worker_cache"

            cached = await asyncio.to_thread(event_hub.get_key, cache_key)
            if cached:
                logger.info("[SCHEMA] Cache hit")
                return json.loads(cached)

            logger.info("[SCHEMA] Cache miss, discovering...")

            def sync_discover():
                schema = OrgSchema(self.org_id, self._entity_type)
                return schema.get_mapping()

            mapping = await asyncio.to_thread(sync_discover)

            if mapping:
                await asyncio.to_thread(
                    event_hub.setex,
                    cache_key,
                    86400,
                    json.dumps(mapping)
                )

            return mapping or {}

        except Exception as e:
            logger.error(f"[SCHEMA] Error: {e}", exc_info=True)
            # Identity mapping lets downstream code proceed with raw names.
            return {col: col for col in df.columns}

    def _alias_columns(self, df: pd.DataFrame, mapping: Dict[str, str]) -> pd.DataFrame:
        """Rename actual columns to their semantic names; df is returned
        unchanged when nothing matches or on error."""
        try:
            rename_map = {
                actual: semantic
                for semantic, actual in mapping.items()
                if actual in df.columns
            }

            if rename_map:
                logger.info(f"[ALIAS] Renaming {len(rename_map)} columns")
                return df.rename(columns=rename_map)

            return df

        except Exception as e:
            logger.error(f"[ALIAS] Error: {e}")
            return df

    async def _get_industry(self) -> str:
        """Get the industry from Redis, defaulting to 'general'."""
        try:
            industry_key = f"industry:{self.org_id}:{self.source_id}"
            data = await asyncio.to_thread(event_hub.get_key, industry_key)

            if data:
                industry_info = json.loads(data)
                industry = industry_info.get("industry", "general")
                logger.info(f"[INDUSTRY] Loaded: {industry}")
                return industry

            logger.warning(f"[INDUSTRY] Not found, using 'general'")
            return "general"

        except Exception as e:
            logger.error(f"[INDUSTRY] Error: {e}")
            return "general"

    async def _embed_transactions(self, df: pd.DataFrame) -> List[List[float]]:
        """Build per-row text summaries and upsert embeddings via VectorService.

        Best-effort: always returns [] (errors are logged, never raised).
        """
        try:
            if df.empty:
                return []

            texts, metadata = [], []
            for idx, row in df.iterrows():
                parts = []
                if 'total' in row and pd.notna(row['total']):
                    parts.append(f"sale:{row['total']}")
                if 'timestamp' in row:
                    parts.append(f"at:{row['timestamp']}")
                if 'category' in row:
                    parts.append(f"cat:{row['category']}")
                if 'product_id' in row:
                    parts.append(f"sku:{row['product_id']}")

                if parts:
                    texts.append(" ".join(parts))
                    # row.get('timestamp', '') previously produced
                    # ''.isoformat() -> AttributeError when the column was
                    # missing; guard explicitly instead.
                    ts_val = row.get('timestamp')
                    metadata.append({
                        "org_id": self.org_id,
                        "source_id": self.source_id,
                        "idx": int(idx),
                        "timestamp": ts_val.isoformat()
                        if ts_val is not None
                        and pd.notna(ts_val)
                        and hasattr(ts_val, "isoformat")
                        else None,
                    })

            if not texts:
                return []

            logger.info(f"[EMBED] Generating {len(texts)} embeddings...")

            namespace = f"{self._entity_type}:{self.org_id}"
            await self.vector_service.upsert_embeddings(
                embeddings=await self.vector_service.embed_batch(texts),
                metadata=metadata,
                namespace=namespace
            )

            logger.info(f"[EMBED] Stored {len(texts)} vectors")
            return []

        except Exception as e:
            logger.error(f"[EMBED] Critical: {e}", exc_info=True)
            return []

    async def _publish(self, results: Dict[str, Any]):
        """Publish KPI cache + insight alerts to Redis in one pipeline."""
        publish_start = time.time()

        try:
            ts = datetime.now().isoformat()

            pipe = event_hub.redis.pipeline()

            kpi_data = {
                "data": results,
                "rows": results.get("metadata", {}).get("rows_analyzed", 0),
                "timestamp": ts
            }

            pipe.setex(
                f"kpi_cache:{self.org_id}:{self.source_id}",
                300,
                json.dumps(kpi_data)
            )

            # Each predictive alert becomes an entry on the insights list.
            for alert in results.get("predictive", {}).get("alerts", []):
                pipe.lpush(
                    f"insights:{self.org_id}:{self.source_id}",
                    json.dumps(alert)
                )
                pipe.expire(f"insights:{self.org_id}:{self.source_id}", 300)

            await asyncio.to_thread(pipe.execute)

            duration_ms = (time.time() - publish_start) * 1000
            logger.info(f"[PUBLISH] Published in {duration_ms:.2f}ms")

            self._publish_worker_event(
                "worker.publish.completed",
                {
                    "rows": kpi_data["rows"],
                    "insights": len(results.get("predictive", {}).get("alerts", [])),
                    "latency_ms": round(duration_ms, 2)
                }
            )

        except Exception as e:
            logger.error(f"[PUBLISH] Error: {e}", exc_info=True)

    async def _cache_results(self, results: Dict[str, Any]):
        """Cache the raw results dict.

        NOTE(review): this overwrites the same kpi_cache key that _publish
        just wrote (with a different payload shape) — confirm which format
        consumers expect.
        """
        try:
            cache_key = f"kpi_cache:{self.org_id}:{self.source_id}"
            await asyncio.to_thread(
                event_hub.setex,
                cache_key,
                300,
                json.dumps(results)
            )
            logger.debug("[CACHE] Results cached")
        except Exception as e:
            logger.warning(f"[CACHE] Failed: {e}")

    async def _publish_status(self, status: str, message: str = ""):
        """Publish worker status via pub/sub (best-effort)."""
        try:
            status_data = {
                "status": status,
                "message": message,
                "timestamp": datetime.now().isoformat(),
                "worker_id": f"{self.org_id}:{self.source_id}"
            }

            channel = f"worker:status:{self.org_id}:{self.source_id}"
            await asyncio.to_thread(
                event_hub.publish,
                channel,
                json.dumps(status_data)
            )

            logger.info(f"[STATUS] {status}: {message}")
        except Exception as e:
            logger.error(f"[STATUS] Failed: {e}")
| |
|
| |
|
| | |
| |
|
class WorkerManager:
    """
    Manages worker lifecycle with SRE observability.

    Polls the "stream:analytics_triggers" Redis stream, spawns one
    AnalyticsWorker task per (org_id, source_id) trigger (deduplicated
    against currently-running tasks), and backs off the poll interval
    exponentially while the stream stays empty.
    """

    def __init__(self):
        # worker_id -> currently running asyncio.Task
        self.active_workers: Dict[str, asyncio.Task] = {}
        self._shutdown = False
        self.active_interval = float(os.getenv("WORKER_POLL_ACTIVE", "1.0"))
        self.idle_interval = float(os.getenv("WORKER_POLL_IDLE", "30.0"))
        self.consecutive_empty = 0

        # SRE counters surfaced by get_metrics()
        self._metrics = {
            "triggers_processed": 0,
            "workers_spawned": 0,
            "workers_failed": 0,
            "total_latency_ms": 0
        }

    async def start_listener(self):
        """Main polling loop; runs until shutdown() flips the flag."""
        logger.info(
            f"Worker Manager Started | "
            f"active_interval={self.active_interval}s | "
            f"idle_interval={self.idle_interval}s"
        )

        while not self._shutdown:
            try:
                messages = await self._fetch_pending_triggers()

                if messages:
                    self.consecutive_empty = 0
                    await self._process_batch(messages)
                    interval = self.active_interval
                else:
                    self.consecutive_empty += 1
                    interval = self._get_backoff_interval()

                    if self.consecutive_empty == 5:
                        logger.info(f"[MANAGER] Idle mode (poll: {interval}s)")

                await asyncio.sleep(interval)

            except asyncio.CancelledError:
                logger.info("[MANAGER] Cancelled")
                break
            except Exception as e:
                logger.error(f"[MANAGER] Error: {e}", exc_info=True)
                await asyncio.sleep(5)

    async def _fetch_pending_triggers(self) -> List[tuple]:
        """Fetch up to 10 pending triggers; returns [] on error.

        Normalizes the client's return shape (dict or list) to a list of
        (id, fields) tuples.
        """
        start = time.time()

        try:
            # xrevrange is a blocking network call; run it off the event
            # loop like every other Redis call in this module (it was
            # previously invoked synchronously inside this coroutine).
            result = await asyncio.to_thread(
                event_hub.redis.xrevrange,
                "stream:analytics_triggers",
                count=10
            )

            messages = []
            if isinstance(result, dict):
                messages = list(result.items()) if result else []
            elif isinstance(result, list):
                messages = result

            if messages:
                logger.info(
                    f"[MANAGER] Fetched {len(messages)} triggers "
                    f"in {(time.time()-start)*1000:.2f}ms"
                )

            return messages

        except Exception as e:
            logger.error(f"[MANAGER] Fetch failed: {e}")
            return []

    async def _process_batch(self, messages: List[tuple]):
        """Process a batch of triggers, deleting each from the stream
        after it is handled."""
        logger.info(f"[MANAGER] Processing {len(messages)} triggers")

        for msg_id, msg_data in messages:
            try:
                payload = json.loads(msg_data.get("message", "{}"))
                await self._handle_trigger(payload)

                await asyncio.to_thread(event_hub.redis.xdel, "stream:analytics_triggers", msg_id)

                self._metrics["triggers_processed"] += 1

            except Exception as e:
                logger.error(f"[MANAGER] Process error: {e}", exc_info=True)
                self._metrics["workers_failed"] += 1

    async def _handle_trigger(self, data: dict):
        """Spawn a worker task for the trigger unless one is already running."""
        org_id = data.get("org_id")
        source_id = data.get("source_id")

        if not org_id or not source_id:
            logger.warning(f"[MANAGER] Invalid payload: {data}")
            return

        worker_id = f"{org_id}:{source_id}"

        # Dedup: one in-flight worker per (org, source).
        if worker_id in self.active_workers and not self.active_workers[worker_id].done():
            logger.debug(f"[MANAGER] Already running: {worker_id}")
            return

        task = asyncio.create_task(
            self._run_worker(worker_id, org_id, source_id),
            name=f"worker-{worker_id}"
        )
        self.active_workers[worker_id] = task
        self._metrics["workers_spawned"] += 1

        logger.info(f"[MANAGER] Spawned: {worker_id}")

    async def _run_worker(self, worker_id: str, org_id: str, source_id: str):
        """Execute one AnalyticsWorker run and publish a completion/failure
        event; always removes itself from active_workers."""
        start = time.time()

        try:
            worker = AnalyticsWorker(org_id, source_id)
            await worker.run()

            duration_ms = (time.time() - start) * 1000
            self._metrics["total_latency_ms"] += duration_ms

            logger.info(f"[MANAGER] Complete: {worker_id} in {duration_ms:.2f}ms")

            channel = f"manager:events:{org_id}"
            await asyncio.to_thread(
                event_hub.publish,
                channel,
                json.dumps({
                    "type": "worker.completed",
                    "worker_id": worker_id,
                    "duration_ms": round(duration_ms, 2),
                    "status": "success"
                })
            )

        except Exception as e:
            self._metrics["workers_failed"] += 1

            logger.error(f"[MANAGER] Failed: {worker_id} - {e}", exc_info=True)

            channel = f"manager:events:{org_id}"
            await asyncio.to_thread(
                event_hub.publish,
                channel,
                json.dumps({
                    "type": "worker.failed",
                    "worker_id": worker_id,
                    "error": str(e)
                })
            )

        finally:
            self.active_workers.pop(worker_id, None)

    def _get_backoff_interval(self) -> float:
        """Exponential poll backoff: active_interval for the first 5 empty
        polls, then doubling per empty poll, capped at idle_interval."""
        if self.consecutive_empty < 5:
            return self.active_interval

        interval = min(
            self.idle_interval,
            self.active_interval * (2 ** min(self.consecutive_empty - 5, 5))
        )

        if interval > self.idle_interval * 0.9:
            logger.debug(f"[MANAGER] Deep sleep: {interval}s")

        return interval

    def get_metrics(self) -> Dict[str, Any]:
        """SRE: current metrics snapshot (counters + derived state)."""
        return {
            **self._metrics,
            "active_workers": len(self.active_workers),
            "consecutive_empty": self.consecutive_empty,
            "backoff_interval": self._get_backoff_interval()
        }

    def shutdown(self):
        """Signal the listener loop to stop and log final metrics.

        Does not cancel in-flight worker tasks; callers may await them.
        """
        self._shutdown = True
        logger.info(f"[MANAGER] Shutdown: {len(self.active_workers)} workers active")

        logger.info(f"[MANAGER] Final metrics: {self.get_metrics()}")
| |
|
| |
|
| | |
| |
|
# Process-wide singleton; created lazily by get_worker_manager().
_worker_manager: Optional["WorkerManager"] = None


async def get_worker_manager() -> "WorkerManager":
    """Return the process-wide WorkerManager, creating it on first use."""
    global _worker_manager
    if _worker_manager is not None:
        return _worker_manager
    _worker_manager = WorkerManager()
    logger.info("[SRE] WorkerManager initialized with SRE observability")
    return _worker_manager
| |
|
| |
|
async def trigger_kpi_computation(org_id: str, source_id: str) -> Dict[str, Any]:
    """Enqueue a KPI computation trigger onto the Redis stream.

    Args:
        org_id: Tenant identifier.
        source_id: Data-source identifier.

    Returns:
        {"status": "triggered", ...} with the enqueue latency on success,
        {"status": "error", "message": ...} on failure (after publishing a
        best-effort trigger.failed event).
    """
    try:
        start = time.time()

        # xadd is a blocking network call; run it off the event loop
        # (it was previously invoked synchronously inside this coroutine,
        # unlike every other Redis call in this module).
        await asyncio.to_thread(
            event_hub.redis.xadd,
            "stream:analytics_triggers",
            {
                "message": json.dumps({
                    "org_id": org_id,
                    "source_id": source_id,
                    "type": "kpi_compute",
                    "timestamp": datetime.now().isoformat()
                })
            }
        )

        duration_ms = (time.time() - start) * 1000

        logger.info(
            f"Triggered KPI: {org_id}/{source_id} "
            f"(latency: {duration_ms:.2f}ms)"
        )

        return {
            "status": "triggered",
            "org_id": org_id,
            "source_id": source_id,
            "trigger_latency_ms": round(duration_ms, 2)
        }

    except Exception as e:
        logger.error(f"Trigger failed: {e}", exc_info=True)

        await asyncio.to_thread(
            event_hub.publish,
            f"trigger:events:{org_id}",
            json.dumps({
                "type": "trigger.failed",
                "error": str(e),
                "source_id": source_id
            })
        )

        return {"status": "error", "message": str(e)}
| |
|
| |
|
| | |
| |
|
| | """ |
| | # Add to app/main.py: |
| | |
| | from app.tasks.analytics_worker import get_worker_manager, continuous_kpi_refresh |
| | import asyncio |
| | |
| | @app.on_event("startup") |
| | async def start_workers(): |
| | manager = await get_worker_manager() |
| | |
| | # Start worker manager listener |
| | asyncio.create_task( |
| | manager.start_listener(), |
| | name="worker-manager-listener" |
| | ) |
| | |
| | # Optional: Start background refresh |
| | if os.getenv("ENABLE_AUTO_REFRESH", "0") == "1": |
| | asyncio.create_task( |
| | continuous_kpi_refresh(manager), |
| | name="background-refresh" |
| | ) |
| | |
| | logger.info("β
SRE-observable worker system started") |
| | |
| | @app.on_event("shutdown") |
| | async def stop_workers(): |
| | manager = await get_worker_manager() |
| | manager.shutdown() |
| | |
| | # Wait for active workers to complete |
| | tasks = [t for t in manager.active_workers.values()] |
| | if tasks: |
| | await asyncio.gather(*tasks, return_exceptions=True) |
| | |
| | logger.info("π Workers gracefully shut down") |
| | |
| | # Health check endpoint for SRE monitoring |
| | @app.get("/health/workers") |
| | async def health_check(): |
| | manager = await get_worker_manager() |
| | metrics = manager.get_metrics() |
| | |
| | # Alert if too many failures |
| | if metrics["workers_failed"] > 10: |
| | return JSONResponse( |
| | status_code=503, |
| | content={"status": "unhealthy", "metrics": metrics} |
| | ) |
| | |
| | return { |
| | "status": "healthy", |
| | "active_workers": metrics["active_workers"], |
| | "triggers_processed": metrics["triggers_processed"], |
| | "avg_latency_ms": ( |
| | metrics["total_latency_ms"] / metrics["triggers_processed"] |
| | if metrics["triggers_processed"] > 0 else 0 |
| | ) |
| | } |
| | """ |