"""
AnalyticsWorker v5.0: TCP Redis Pub/Sub + SRE Observability

This is the initiator of all processes - treated as a critical path system.

Changes:
- Added real-time pub/sub events for every operation
- SRE metrics emission for monitoring
- Circuit breaker integration
  (NOTE(review): no circuit breaker is implemented in this module itself;
  presumably it lives in VectorService / event_hub - verify)
- Zero changes to core KPI calculation logic
"""
import asyncio
import json
import logging
import os
import time
import traceback
from asyncio import Lock
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Set

import pandas as pd

from app.core.event_hub import event_hub
from app.db import get_conn
from app.schemas.org_schema import OrgSchema
from app.service.vector_service import VectorService, VectorStoreEventType, VectorMetrics
from app.engine.kpi_calculators.registry import get_kpi_calculator_async
from app.service.embedding_service import EmbeddingService
from app.core.sre_logging import emit_worker_log

# Configure structured logging for SRE tools (Loki, etc.)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)s | [%(name)s] [%(funcName)s] %(message)s'
)
logger = logging.getLogger(__name__)

# Global lock registry: one in-process asyncio.Lock per worker lock key.
_WORKER_LOCKS: Dict[str, Lock] = {}

# Redis stream that carries KPI computation triggers.
_TRIGGER_STREAM = "stream:analytics_triggers"

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected before
# it finishes (see the asyncio.create_task documentation).
_BACKGROUND_TASKS: Set["asyncio.Task[Any]"] = set()

# Prometheus metric objects, created once per process by _get_prometheus_metrics().
_PROM_METRICS: Dict[str, Any] = {}


def _on_background_task_done(task: "asyncio.Task[Any]") -> None:
    """Drop the strong reference to a finished task and log its failure, if any."""
    _BACKGROUND_TASKS.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.error(f"[TASK] Background task {task.get_name()} failed: {exc}")


def _track_background_task(task: "asyncio.Task[Any]") -> "asyncio.Task[Any]":
    """Keep *task* alive until it completes and surface its errors; returns *task*."""
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_on_background_task_done)
    return task


def _get_prometheus_metrics() -> Dict[str, Any]:
    """Return the vector-operation Prometheus metrics, creating them on first use.

    prometheus_client registers each metric in its default registry when the
    metric is constructed, and refuses to register the same name twice, so the
    objects must be built once per process rather than on every callback.
    The import is deferred so prometheus_client is only needed when exporting.
    """
    from prometheus_client import Counter, Histogram

    if "duration" not in _PROM_METRICS:
        _PROM_METRICS["duration"] = Histogram(
            'vector_operation_duration_seconds',
            'Time spent on vector operations',
            ['operation', 'org_id']
        )
    if "cost" not in _PROM_METRICS:
        _PROM_METRICS["cost"] = Counter(
            'vector_operation_cost_usd_total',
            'Total cost of vector operations',
            ['operation', 'org_id', 'redis_type']
        )
    return _PROM_METRICS


class AnalyticsWorker:
    """
    🧠+🚀 Core engine with SRE observability
    - Zero changes to logic, only instrumentation added

    One instance performs one KPI run for a single (org_id, source_id) pair:
    idempotency check -> distributed lock -> load entity/data -> schema mapping
    -> KPI computation -> publish/cache -> mark processed.
    """

    def __init__(self, org_id: str, source_id: str, hours_window: int = 24):
        self.org_id = org_id
        self.source_id = source_id
        self.hours_window = hours_window

        # Core engines (unchanged)
        self.txn_embedder = EmbeddingService()
        self.vector_service = VectorService(org_id)
        self.computed_at: Optional[datetime] = None
        self._entity_type: Optional[str] = None
        # Populated by _load_entity_from_redis when the industry key exists.
        self._industry_info: Optional[Dict[str, Any]] = None

        # Deduplication keys
        self.lock_key = f"worker:lock:{org_id}:{source_id}"
        self.processed_key = f"worker:processed:{org_id}:{source_id}"
        self._process_lock = _WORKER_LOCKS.setdefault(self.lock_key, Lock())
        # True only while THIS instance owns both the Redis lock and the
        # in-process lock; _release_lock must never free a lock held by
        # another worker (e.g. after an idempotency skip or a failed acquire).
        self._lock_held = False

        # 🎯 SRE: Register metrics callback
        # NOTE(review): _on_vector_metrics (cost/latency alerts) is never
        # registered, so its alerts never fire - confirm whether
        # add_metrics_callback supports multiple callbacks before adding it.
        self.vector_service.add_metrics_callback(self._export_to_prometheus)

        # 🎯 Publish worker lifecycle events
        self._publish_worker_event(
            event_type="worker.initialized",
            data={
                "org_id": org_id,
                "source_id": source_id,
                "hours_window": hours_window
            }
        )

    # ====== SRE: Metrics & Event Publishing (NEW) ======

    def _on_vector_metrics(self, metrics: VectorMetrics):
        """Handle metrics from VectorService: warn on high cost / slow operations."""
        # Alert on high cost
        if metrics.cost_usd > 0.01:
            logger.warning(
                f"[SRE_ALERT] High vector cost: ${metrics.cost_usd:.4f} "
                f"for {metrics.vector_count} vectors"
            )

        # Alert on slow operations
        if metrics.duration_ms > 5000:
            logger.warning(
                f"[SRE_ALERT] Slow vector operation: {metrics.operation} "
                f"took {metrics.duration_ms:.2f}ms"
            )

        logger.debug(f"[SRE_METRICS] {metrics}")

    def _publish_worker_event(self, event_type: str, data: Dict[str, Any]):
        """Publish a worker lifecycle event via Redis pub/sub (fire-and-forget).

        The publish runs in a thread on a tracked background task so callers
        never block. It needs a running event loop; without one the event is
        dropped and logged. Failures are logged, never raised.
        """
        try:
            # Raises RuntimeError outside a running loop - checked before the
            # to_thread coroutine is created so no coroutine is leaked.
            loop = asyncio.get_running_loop()
            channel = f"worker:events:{self.org_id}:{self.source_id}"
            payload = {
                "type": event_type,
                "timestamp": datetime.utcnow().isoformat(),
                "data": data
            }
            message = json.dumps(payload)

            # Fire-and-forget to avoid blocking
            _track_background_task(
                loop.create_task(asyncio.to_thread(event_hub.publish, channel, message))
            )
        except Exception as e:
            logger.error(f"[EVENT] Failed to publish {event_type}: {e}")

    def _export_to_prometheus(self, metrics: VectorMetrics):
        """Record vector-operation metrics in the default prometheus_client registry.

        This method does not push anywhere itself; the metrics must be exported
        by whatever scrape endpoint / pushgateway job serves that registry.
        Errors are logged, never raised.
        """
        try:
            prom = _get_prometheus_metrics()

            prom["duration"].labels(
                operation=metrics.operation,
                org_id=metrics.org_id
            ).observe(metrics.duration_ms / 1000)

            prom["cost"].labels(
                operation=metrics.operation,
                org_id=metrics.org_id,
                redis_type="tcp" if metrics.pipeline_used else "upstash"
            ).inc(metrics.cost_usd)
        except Exception as e:
            logger.error(f"[PROMETHEUS] Failed to export: {e}")

    # ====== RUN Method (Core logic unchanged, instrumentation added) ======

    async def run(self) -> Dict[str, Any]:
        """
        🎯 THE ENGINE - Core logic preserved, SRE instrumentation added

        Returns the KPI results dict on success, or {"status": ..., "reason": ...}
        when the run is skipped or fails. Exceptions raised inside the run are
        converted into an error dict. The distributed lock is released in
        ``finally`` only if this run actually acquired it.
        """
        start_time = time.time()
        worker_id = f"{self.org_id}/{self.source_id}"

        # Publish start event
        self._publish_worker_event("worker.run.started", {"worker_id": worker_id})

        try:
            # STEP 0: Idempotency check
            if await self._is_already_processed():
                logger.warning(f"[WORKER] Already processed {worker_id}")
                return {"status": "skipped", "reason": "already_processed"}

            # STEP 1: Lock acquisition
            if not await self._acquire_lock():
                return {"status": "skipped", "reason": "lock_failed"}

            emit_worker_log("info", f"🚀 STARTING {worker_id}", worker_id=worker_id)

            # STEP 2: Load entity info from Redis
            await self._load_entity_from_redis()

            # STEP 3: Load data
            df = await self._load_dataframe()
            if df.empty:
                await self._publish_status("error", "No data")
                return {"status": "error", "reason": "no_data"}

            logger.info(f"[WORKER] 📊 Loaded {len(df)} rows × {len(df.columns)} cols")

            # STEP 4: Schema discovery
            mapping = await self._discover_schema(df)
            if not mapping:
                await self._publish_status("error", "Schema discovery failed")
                return {"status": "error", "reason": "no_schema"}

            logger.info(f"[WORKER] 🔀 Mapping: {list(mapping.items())[:5]}...")

            # STEP 5: Alias columns
            df = self._alias_columns(df, mapping)

            # STEP 6: Start embeddings (non-blocking). Tracked so the task is
            # not garbage-collected if this run exits early on an error.
            embed_task = _track_background_task(asyncio.create_task(
                self._embed_transactions(df.head(1000)),
                name=f"embed-{self.org_id}-{self.source_id}"
            ))

            # STEP 7: Compute KPIs
            industry = await self._get_industry()

            calculator = await get_kpi_calculator_async(
                industry=industry,
                org_id=self.org_id,
                df=df,
                source_id=self.source_id,
                entity_type=self._entity_type
            )

            # compute_all is a coroutine, so it is awaited directly.
            results = await calculator.compute_all()

            # STEP 8: Publish results
            await self._publish(results)

            # STEP 9: Cache results
            await self._cache_results(results)

            # STEP 10: Mark processed
            await self._mark_processed()

            # STEP 11: Wait for embeddings (timeout cancels the embed task)
            try:
                await asyncio.wait_for(embed_task, timeout=30)
                logger.info("[WORKER] ✅ Embeddings completed")
            except asyncio.TimeoutError:
                logger.warning("[WORKER] ⚠️ Embedding timeout, but KPIs published")

            duration = time.time() - start_time
            logger.info(f"[WORKER] 🎯 COMPLETE: {worker_id} in {duration:.2f}s")

            # Publish completion event
            self._publish_worker_event(
                "worker.run.completed",
                {
                    "worker_id": worker_id,
                    "duration_sec": round(duration, 2),
                    "rows_processed": len(df),
                    "entity_type": self._entity_type
                }
            )

            return results

        except Exception as e:
            # Capture the traceback before any await can disturb exception context.
            tb = traceback.format_exc()
            emit_worker_log("error", f"❌ CRITICAL: {e}", error=str(e))
            await self._publish_status("error", str(e))

            # Publish error event
            self._publish_worker_event(
                "worker.run.failed",
                {
                    "worker_id": worker_id,
                    "error": str(e),
                    "traceback": tb
                }
            )

            return {"status": "error", "reason": str(e)}

        finally:
            await self._release_lock()
            self._publish_worker_event("worker.run.finished", {"worker_id": worker_id})

    # ====== Existing methods (bug fixes + SRE logging) ======

    async def _is_already_processed(self) -> bool:
        """Return True if the processed marker key exists; fails open (False) on error."""
        try:
            # Handle both TCP and Upstash Redis
            result = await asyncio.to_thread(event_hub.redis.exists, self.processed_key)
            exists = bool(result) if result is not None else False

            if exists:
                logger.info(f"[IDEMPOTENCY] ✅ Found processed key: {self.processed_key}")

            return exists
        except Exception as e:
            logger.error(f"[IDEMPOTENCY] ❌ Error: {e}")
            # Fail open: if we can't check, assume not processed
            return False

    async def _delete_redis_lock(self):
        """Best-effort delete of this worker's Redis lock key; errors are logged."""
        try:
            await asyncio.to_thread(event_hub.redis.delete, self.lock_key)
        except Exception as e:
            logger.error(f"[LOCK] ❌ Error releasing: {e}")

    async def _acquire_lock(self) -> bool:
        """Acquire distributed lock (TCP Redis + Upstash compatible).

        Takes the Redis lock (SET NX, 5 minute expiry) and then the in-process
        asyncio lock (1s timeout). If the in-process lock cannot be taken, the
        Redis lock just acquired is deleted so it is not orphaned for 5 minutes.
        Returns True only when both locks are held by this instance.
        """
        try:
            # Use SET NX PX for atomic lock (works in both TCP and Upstash)
            lock_acquired = await asyncio.to_thread(
                event_hub.redis.set,
                self.lock_key,
                "1",
                nx=True,  # Only set if not exists
                px=300000  # 5 minute expiry (milliseconds)
            )
        except Exception as e:
            logger.error(f"[LOCK] ❌ Error: {e}")
            return False

        if not lock_acquired:
            logger.warning(f"[LOCK] ❌ Already locked: {self.lock_key}")
            return False

        # Also acquire in-process lock. Lock.acquire() only ever returns True;
        # failure shows up as asyncio.TimeoutError from wait_for.
        try:
            await asyncio.wait_for(self._process_lock.acquire(), timeout=1.0)
        except asyncio.TimeoutError:
            logger.warning(f"[LOCK] ❌ In-process lock busy: {self.lock_key}")
            await self._delete_redis_lock()
            return False
        except Exception as e:
            logger.error(f"[LOCK] ❌ Error: {e}")
            await self._delete_redis_lock()
            return False

        self._lock_held = True
        logger.info(f"[LOCK] ✅ Acquired: {self.lock_key}")
        return True

    async def _release_lock(self):
        """Release both locks, but only if this instance acquired them."""
        if not self._lock_held:
            return
        self._lock_held = False
        try:
            if self._process_lock.locked():
                self._process_lock.release()
            await asyncio.to_thread(event_hub.redis.delete, self.lock_key)
            logger.info(f"[LOCK] 🔓 Released: {self.lock_key}")
        except Exception as e:
            logger.error(f"[LOCK] ❌ Error releasing: {e}")

    async def _mark_processed(self):
        """Set the processed marker key with a 5 minute TTL; errors are logged."""
        try:
            # Mark with 5 minute TTL
            await asyncio.to_thread(
                event_hub.redis.setex,
                self.processed_key,
                300,  # 5 minutes
                "1"
            )
            logger.info(f"[IDEMPOTENCY] ✅ Marked processed: {self.processed_key}")
        except Exception as e:
            logger.error(f"[IDEMPOTENCY] ❌ Error: {e}")

    async def _load_entity_from_redis(self) -> dict:
        """Load entity info from Redis (TCP/Upstash compatible).

        Sets self._entity_type from entity:{org}:{source} (required; raises if
        missing) and self._industry_info from industry:{org}:{source} (optional).
        Returns the decoded entity dict.
        """
        try:
            entity_key = f"entity:{self.org_id}:{self.source_id}"
            data = await asyncio.to_thread(event_hub.get_key, entity_key)

            if not data:
                raise ValueError(f"Entity key not found: {entity_key}")

            entity_info = json.loads(data)
            self._entity_type = entity_info["entity_type"]

            # Load industry
            industry_key = f"industry:{self.org_id}:{self.source_id}"
            industry_data = await asyncio.to_thread(event_hub.get_key, industry_key)

            if industry_data:
                self._industry_info = json.loads(industry_data)
                logger.info(f"[ENTITY] ✅ Loaded: {self._entity_type}, industry={self._industry_info.get('industry')}")
            else:
                logger.warning(f"[ENTITY] ⚠️ Industry not found for {self.org_id}:{self.source_id}")

            return entity_info

        except Exception as e:
            logger.error(f"[ENTITY] ❌ Failed: {e}")
            raise

    async def _load_dataframe(self) -> pd.DataFrame:
        """Load data asynchronously (entity_type must be set)"""
        if not getattr(self, '_entity_type', None):
            raise ValueError("entity_type must be loaded from Redis first")

        return await asyncio.to_thread(self._sync_load_dataframe, self._entity_type)

    def _sync_load_dataframe(self, entity_type: str) -> pd.DataFrame:
        """Synchronous data loader (runs in thread pool).

        Reads main.{entity_type}_canonical: up to 10000 rows inside the time
        window, else up to 1000 most recent rows. Returns an empty DataFrame if
        the table is missing or on any error.
        """
        try:
            conn = get_conn(self.org_id)
            table_name = f"main.{entity_type}_canonical"

            # Verify table exists (parameterized, so entity_type is only
            # interpolated into SQL below after it matched a real table name)
            table_exists = conn.execute(
                "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'main' AND table_name = ?",
                [entity_type + "_canonical"]
            ).fetchone()[0] > 0

            if not table_exists:
                logger.error(f"[LOAD] Table {table_name} does not exist")
                return pd.DataFrame()

            # Load with time window
            cutoff = datetime.now() - timedelta(hours=self.hours_window)
            df = conn.execute(
                f"SELECT * FROM {table_name} WHERE timestamp >= ? ORDER BY timestamp DESC LIMIT 10000",
                [cutoff]
            ).df()

            if not df.empty:
                logger.info(f"[LOAD] 📊 Loaded {len(df)} rows × {len(df.columns)} cols (filtered)")
                return df

            # Fallback
            logger.warning(f"[LOAD] No data in {self.hours_window}h window, returning recent rows")
            df = conn.execute(f"SELECT * FROM {table_name} ORDER BY timestamp DESC LIMIT 1000").df()

            return df

        except Exception as e:
            logger.error(f"[LOAD] ❌ Fatal: {e}", exc_info=True)
            return pd.DataFrame()

    async def _discover_schema(self, df: pd.DataFrame) -> Dict[str, str]:
        """Schema discovery (non-blocking).

        Returns a {semantic_name: actual_column} mapping, cached in Redis for 24h.
        On error falls back to an identity mapping over df's columns.
        """
        try:
            cache_key = f"schema:{self.org_id}:{self._entity_type}:worker_cache"

            # Try cache first
            cached = await asyncio.to_thread(event_hub.get_key, cache_key)
            if cached:
                logger.info("[SCHEMA] ✅ Cache hit")
                return json.loads(cached)

            logger.info("[SCHEMA] 🧠 Cache miss, discovering...")

            def sync_discover():
                schema = OrgSchema(self.org_id, self._entity_type)
                return schema.get_mapping()

            mapping = await asyncio.to_thread(sync_discover)

            if mapping:
                # Cache for 24 hours
                await asyncio.to_thread(
                    event_hub.setex,
                    cache_key,
                    86400,
                    json.dumps(mapping)
                )

            return mapping or {}

        except Exception as e:
            logger.error(f"[SCHEMA] ❌ Error: {e}", exc_info=True)
            # Emergency fallback
            return {col: col for col in df.columns}

    def _alias_columns(self, df: pd.DataFrame, mapping: Dict[str, str]) -> pd.DataFrame:
        """Rename actual columns to their semantic names; returns df unchanged on error."""
        try:
            rename_map = {
                actual: semantic
                for semantic, actual in mapping.items()
                if actual in df.columns
            }

            if rename_map:
                logger.info(f"[ALIAS] 🔀 Renaming {len(rename_map)} columns")
                return df.rename(columns=rename_map)

            return df

        except Exception as e:
            logger.error(f"[ALIAS] ❌ Error: {e}")
            return df

    async def _get_industry(self) -> str:
        """Get industry from Redis; defaults to 'general' if missing or on error."""
        try:
            industry_key = f"industry:{self.org_id}:{self.source_id}"
            data = await asyncio.to_thread(event_hub.get_key, industry_key)

            if data:
                industry_info = json.loads(data)
                industry = industry_info.get("industry", "general")
                logger.info(f"[INDUSTRY] ✅ Loaded: {industry}")
                return industry

            logger.warning(f"[INDUSTRY] ⚠️ Not found, using 'general'")
            return "general"

        except Exception as e:
            logger.error(f"[INDUSTRY] ❌ Error: {e}")
            return "general"

    @staticmethod
    def _timestamp_to_iso(value: Any) -> Optional[str]:
        """ISO-format a row timestamp: None for missing/NaT, str() for values without .isoformat().

        Timestamp columns may arrive as plain strings, which have no .isoformat();
        those are passed through str() instead of raising and aborting the batch.
        """
        if value is None or pd.isna(value):
            return None
        return value.isoformat() if hasattr(value, "isoformat") else str(value)

    async def _embed_transactions(self, df: pd.DataFrame) -> List[List[float]]:
        """Embed transactions (delegates to VectorService).

        Builds one text per row from the optional total/timestamp/category/
        product_id columns, embeds them in one batch and upserts them under the
        "{entity_type}:{org_id}" namespace. Best-effort: errors are logged.
        Always returns [] (vectors are stored, not returned).
        """
        try:
            if df.empty:
                return []

            texts, metadata = [], []
            for idx, row in df.iterrows():
                parts = []
                if 'total' in row and pd.notna(row['total']):
                    parts.append(f"sale:{row['total']}")
                if 'timestamp' in row:
                    parts.append(f"at:{row['timestamp']}")
                if 'category' in row:
                    parts.append(f"cat:{row['category']}")
                if 'product_id' in row:
                    parts.append(f"sku:{row['product_id']}")

                if parts:
                    texts.append(" ".join(parts))
                    metadata.append({
                        "org_id": self.org_id,
                        "source_id": self.source_id,
                        "idx": int(idx),
                        "timestamp": self._timestamp_to_iso(row.get('timestamp')),
                    })

            if not texts:
                return []

            logger.info(f"[EMBED] Generating {len(texts)} embeddings...")

            # Use VectorService (which now has SRE metrics built-in)
            namespace = f"{self._entity_type}:{self.org_id}"
            await self.vector_service.upsert_embeddings(
                embeddings=await self.vector_service.embed_batch(texts),
                metadata=metadata,
                namespace=namespace
            )

            logger.info(f"[EMBED] ✅ Stored {len(texts)} vectors")
            return []

        except Exception as e:
            logger.error(f"[EMBED] ❌ Critical: {e}", exc_info=True)
            return []

    async def _publish(self, results: Dict[str, Any]):
        """Publish KPI results and predictive alerts to Redis in one pipeline.

        Writes kpi_cache:{org}:{source} as {"data", "rows", "timestamp"} (TTL 300s)
        and LPUSHes each alert onto insights:{org}:{source} (TTL 300s).
        Errors are logged, not raised.
        """
        publish_start = time.time()

        try:
            ts = datetime.now().isoformat()
            # Tolerate "predictive"/"alerts" being present but None.
            alerts = (results.get("predictive") or {}).get("alerts") or []

            # Use pipeline
            pipe = event_hub.redis.pipeline()

            # Publish KPI update
            kpi_data = {
                "data": results,
                "rows": results.get("metadata", {}).get("rows_analyzed", 0),
                "timestamp": ts
            }

            pipe.setex(
                f"kpi_cache:{self.org_id}:{self.source_id}",
                300,
                json.dumps(kpi_data)
            )

            # Publish insights
            for alert in alerts:
                pipe.lpush(
                    f"insights:{self.org_id}:{self.source_id}",
                    json.dumps(alert)
                )
                pipe.expire(f"insights:{self.org_id}:{self.source_id}", 300)

            # Execute pipeline
            await asyncio.to_thread(pipe.execute)

            duration_ms = (time.time() - publish_start) * 1000
            logger.info(f"[PUBLISH] 📤 Published in {duration_ms:.2f}ms")

            # SRE event
            self._publish_worker_event(
                "worker.publish.completed",
                {
                    "rows": kpi_data["rows"],
                    "insights": len(alerts),
                    "latency_ms": round(duration_ms, 2)
                }
            )

        except Exception as e:
            logger.error(f"[PUBLISH] ❌ Error: {e}", exc_info=True)

    async def _cache_results(self, results: Dict[str, Any]):
        """Cache raw results under kpi_cache:{org}:{source} for 300s; errors are logged.

        NOTE(review): this writes the same key _publish just wrote, but with
        the raw results instead of the {"data", "rows", "timestamp"} wrapper,
        so the wrapped form is overwritten. Confirm which shape consumers expect.
        """
        try:
            cache_key = f"kpi_cache:{self.org_id}:{self.source_id}"
            await asyncio.to_thread(
                event_hub.setex,
                cache_key,
                300,
                json.dumps(results)
            )
            logger.debug("[CACHE] ✅ Results cached")
        except Exception as e:
            logger.warning(f"[CACHE] ⚠️ Failed: {e}")

    async def _publish_status(self, status: str, message: str = ""):
        """Publish worker status on worker:status:{org}:{source}; errors are logged."""
        try:
            status_data = {
                "status": status,
                "message": message,
                "timestamp": datetime.now().isoformat(),
                "worker_id": f"{self.org_id}:{self.source_id}"
            }

            channel = f"worker:status:{self.org_id}:{self.source_id}"
            await asyncio.to_thread(
                event_hub.publish,
                channel,
                json.dumps(status_data)
            )

            logger.info(f"[STATUS] 📢 {status}: {message}")
        except Exception as e:
            logger.error(f"[STATUS] ❌ Failed: {e}")


# ==================== WorkerManager (SRE Instrumentation Added) ====================

class WorkerManager:
    """
    🎛️ Manages worker lifecycle with SRE observability

    Polls the trigger stream, spawns at most one AnalyticsWorker task per
    org_id:source_id, and backs off polling when the stream stays empty.
    """

    def __init__(self):
        self.active_workers: Dict[str, asyncio.Task] = {}
        self._shutdown = False

        self.active_interval = float(os.getenv("WORKER_POLL_ACTIVE", "1.0"))
        self.idle_interval = float(os.getenv("WORKER_POLL_IDLE", "30.0"))
        self.consecutive_empty = 0

        # SRE: Track metrics
        self._metrics = {
            "triggers_processed": 0,
            "workers_spawned": 0,
            "workers_failed": 0,
            "total_latency_ms": 0
        }

    async def start_listener(self):
        """🎧 Main listener loop with SRE logging; runs until shutdown() or cancellation."""
        logger.info(
            f"🎧 Worker Manager Started | "
            f"active_interval={self.active_interval}s | "
            f"idle_interval={self.idle_interval}s"
        )

        while not self._shutdown:
            try:
                messages = await self._fetch_pending_triggers()

                if messages:
                    self.consecutive_empty = 0
                    await self._process_batch(messages)
                    interval = self.active_interval
                else:
                    self.consecutive_empty += 1
                    interval = self._get_backoff_interval()

                    if self.consecutive_empty == 5:
                        logger.info(f"[MANAGER] 🛌 Idle mode (poll: {interval}s)")

                await asyncio.sleep(interval)

            except asyncio.CancelledError:
                logger.info("[MANAGER] 🛑 Cancelled")
                break
            except Exception as e:
                logger.error(f"[MANAGER] ❌ Error: {e}", exc_info=True)
                await asyncio.sleep(5)

    async def _fetch_pending_triggers(self) -> List[tuple]:
        """Fetch up to 10 newest triggers as (msg_id, fields) pairs; [] on error."""
        start = time.time()
        try:
            # Run the blocking Redis call off the event loop.
            result = await asyncio.to_thread(
                event_hub.redis.xrevrange,
                _TRIGGER_STREAM,
                count=10
            )

            messages = []
            # Upstash may return a dict keyed by message id; TCP Redis a list.
            if isinstance(result, dict):
                messages = list(result.items()) if result else []
            elif isinstance(result, list):
                messages = result

            # SRE metric
            if messages:
                logger.info(f"[MANAGER] 📥 Fetched {len(messages)} triggers in {(time.time()-start)*1000:.2f}ms")

            return messages

        except Exception as e:
            logger.error(f"[MANAGER] ❌ Fetch failed: {e}")
            return []

    async def _process_batch(self, messages: List[tuple]):
        """Process triggers with SRE tracking.

        Each valid trigger is handed to _handle_trigger and then deleted from
        the stream. Undecodable triggers are deleted too (and counted as
        failures); otherwise they are re-fetched on every poll and, since only
        10 entries are fetched at a time, can starve newer triggers.
        """
        logger.info(f"[MANAGER] Processing {len(messages)} triggers")

        for msg_id, msg_data in messages:
            try:
                payload = json.loads(msg_data.get("message", "{}"))
                if not isinstance(payload, dict):
                    raise ValueError(f"expected a JSON object, got {type(payload).__name__}")
            except (TypeError, ValueError, AttributeError) as e:
                self._metrics["workers_failed"] += 1
                logger.error(f"[MANAGER] ❌ Dropping malformed trigger {msg_id}: {e}")
                try:
                    await asyncio.to_thread(event_hub.redis.xdel, _TRIGGER_STREAM, msg_id)
                except Exception as del_err:
                    logger.error(f"[MANAGER] ❌ Could not delete trigger {msg_id}: {del_err}")
                continue

            try:
                await self._handle_trigger(payload)

                # Delete processed message
                await asyncio.to_thread(event_hub.redis.xdel, _TRIGGER_STREAM, msg_id)
                self._metrics["triggers_processed"] += 1

            except Exception as e:
                logger.error(f"[MANAGER] ❌ Process error: {e}", exc_info=True)
                self._metrics["workers_failed"] += 1

    async def _handle_trigger(self, data: dict):
        """Spawn a worker task for the trigger unless one is already running for it."""
        org_id = data.get("org_id")
        source_id = data.get("source_id")

        if not org_id or not source_id:
            logger.warning(f"[MANAGER] ⚠️ Invalid payload: {data}")
            return

        worker_id = f"{org_id}:{source_id}"

        # Skip if running
        if worker_id in self.active_workers and not self.active_workers[worker_id].done():
            logger.debug(f"[MANAGER] ⏭️ Already running: {worker_id}")
            return

        # Spawn worker
        task = asyncio.create_task(
            self._run_worker(worker_id, org_id, source_id),
            name=f"worker-{worker_id}"
        )
        self.active_workers[worker_id] = task
        self._metrics["workers_spawned"] += 1

        logger.info(f"[MANAGER] 🚀 Spawned: {worker_id}")

    async def _publish_manager_event(self, channel: str, event: Dict[str, Any]):
        """Publish a manager event best-effort; failures are logged, never raised."""
        try:
            await asyncio.to_thread(event_hub.publish, channel, json.dumps(event))
        except Exception as e:
            logger.error(f"[MANAGER] ❌ Failed to publish {event.get('type')}: {e}")

    async def _run_worker(self, worker_id: str, org_id: str, source_id: str):
        """Execute one worker run and publish its outcome on manager:events:{org_id}.

        AnalyticsWorker.run() reports ordinary failures by returning
        {"status": "error" | "skipped", ...} rather than raising, so the
        published status is taken from the returned dict instead of always
        being "success".
        """
        start = time.time()
        channel = f"manager:events:{org_id}"
        try:
            worker = AnalyticsWorker(org_id, source_id)
            results = await worker.run()

            duration_ms = (time.time() - start) * 1000
            self._metrics["total_latency_ms"] += duration_ms

            run_status = results.get("status") if isinstance(results, dict) else None
            status = run_status if run_status in ("error", "skipped") else "success"

            if status == "success":
                logger.info(f"[MANAGER] ✅ Complete: {worker_id} in {duration_ms:.2f}ms")
            else:
                # Not counted in workers_failed: run() returns "error" for
                # expected conditions too (e.g. no_data), and the health check
                # treats that counter as cumulative.
                logger.warning(
                    f"[MANAGER] ⚠️ Finished with status={status}: {worker_id} "
                    f"in {duration_ms:.2f}ms (reason={results.get('reason')})"
                )

            # Publish completion event
            await self._publish_manager_event(channel, {
                "type": "worker.completed",
                "worker_id": worker_id,
                "duration_ms": round(duration_ms, 2),
                "status": status
            })

        except Exception as e:
            self._metrics["workers_failed"] += 1
            logger.error(f"[MANAGER] ❌ Failed: {worker_id} - {e}", exc_info=True)

            # Publish error event
            await self._publish_manager_event(channel, {
                "type": "worker.failed",
                "worker_id": worker_id,
                "error": str(e)
            })
        finally:
            self.active_workers.pop(worker_id, None)

    def _get_backoff_interval(self) -> float:
        """Poll interval: active_interval for <5 empty polls, then doubling up to idle_interval."""
        if self.consecutive_empty < 5:
            return self.active_interval

        interval = min(
            self.idle_interval,
            self.active_interval * (2 ** min(self.consecutive_empty - 5, 5))
        )

        # Log significant backoff changes
        if interval > self.idle_interval * 0.9:
            logger.debug(f"[MANAGER] 📉 Deep sleep: {interval}s")

        return interval

    def get_metrics(self) -> Dict[str, Any]:
        """SRE: Get current metrics snapshot"""
        return {
            **self._metrics,
            "active_workers": len(self.active_workers),
            "consecutive_empty": self.consecutive_empty,
            "backoff_interval": self._get_backoff_interval()
        }

    def shutdown(self):
        """Stop the listener loop after its current iteration and log final metrics.

        Running worker tasks are not cancelled here; callers may await
        ``active_workers`` to drain them.
        """
        self._shutdown = True
        logger.info(f"[MANAGER] 🛑 Shutdown: {len(self.active_workers)} workers active")

        # Log final metrics
        logger.info(f"[MANAGER] 📊 Final metrics: {self.get_metrics()}")


# ==================== FastAPI Integration ====================

_worker_manager: Optional[WorkerManager] = None


async def get_worker_manager() -> WorkerManager:
    """Return the process-wide WorkerManager, creating it on first call."""
    global _worker_manager
    if _worker_manager is None:
        _worker_manager = WorkerManager()
        logger.info("[SRE] WorkerManager initialized with SRE observability")
    return _worker_manager


async def trigger_kpi_computation(org_id: str, source_id: str) -> Dict[str, Any]:
    """Enqueue a KPI computation trigger on the analytics trigger stream.

    Returns {"status": "triggered", ...} with the enqueue latency, or
    {"status": "error", "message": ...} if the XADD fails. Redis errors are not
    raised: the failure event is published best-effort, since Redis is often
    the very thing that is down when XADD fails.
    """
    try:
        start = time.time()

        message = json.dumps({
            "org_id": org_id,
            "source_id": source_id,
            "type": "kpi_compute",
            "timestamp": datetime.now().isoformat()
        })
        # Run the blocking Redis call off the event loop, like the rest of this module.
        await asyncio.to_thread(
            event_hub.redis.xadd,
            _TRIGGER_STREAM,
            {"message": message}
        )

        duration_ms = (time.time() - start) * 1000

        logger.info(
            f"🎯 Triggered KPI: {org_id}/{source_id} "
            f"(latency: {duration_ms:.2f}ms)"
        )

        return {
            "status": "triggered",
            "org_id": org_id,
            "source_id": source_id,
            "trigger_latency_ms": round(duration_ms, 2)
        }

    except Exception as e:
        logger.error(f"Trigger failed: {e}", exc_info=True)

        # SRE: Publish trigger failure event (best-effort)
        try:
            await asyncio.to_thread(
                event_hub.publish,
                f"trigger:events:{org_id}",
                json.dumps({
                    "type": "trigger.failed",
                    "error": str(e),
                    "source_id": source_id
                })
            )
        except Exception as pub_err:
            logger.error(f"Trigger failure event not published: {pub_err}")

        return {"status": "error", "message": str(e)}


# ==================== MAIN.PY Integration ====================
"""
# Add to app/main.py:
import asyncio
import os

from fastapi.responses import JSONResponse

from app.tasks.analytics_worker import get_worker_manager
# NOTE: `continuous_kpi_refresh` (used below) is NOT defined in
# app.tasks.analytics_worker; import it from wherever it is implemented
# before enabling ENABLE_AUTO_REFRESH.

@app.on_event("startup")
async def start_workers():
    manager = await get_worker_manager()

    # Start worker manager listener
    asyncio.create_task(
        manager.start_listener(),
        name="worker-manager-listener"
    )

    # Optional: Start background refresh
    if os.getenv("ENABLE_AUTO_REFRESH", "0") == "1":
        asyncio.create_task(
            continuous_kpi_refresh(manager),
            name="background-refresh"
        )

    logger.info("✅ SRE-observable worker system started")

@app.on_event("shutdown")
async def stop_workers():
    manager = await get_worker_manager()
    manager.shutdown()

    # Wait for active workers to complete
    tasks = [t for t in manager.active_workers.values()]
    if tasks:
        await asyncio.gather(*tasks, return_exceptions=True)

    logger.info("🛑 Workers gracefully shut down")

# Health check endpoint for SRE monitoring
@app.get("/health/workers")
async def health_check():
    manager = await get_worker_manager()
    metrics = manager.get_metrics()

    # Alert if too many failures (workers_failed is cumulative, never reset)
    if metrics["workers_failed"] > 10:
        return JSONResponse(
            status_code=503,
            content={"status": "unhealthy", "metrics": metrics}
        )

    return {
        "status": "healthy",
        "active_workers": metrics["active_workers"],
        "triggers_processed": metrics["triggers_processed"],
        # Approximation: latency accumulates per finished worker, while
        # triggers_processed counts consumed triggers.
        "avg_latency_ms": (
            metrics["total_latency_ms"] / metrics["triggers_processed"]
            if metrics["triggers_processed"] > 0 else 0
        )
    }
"""