shaliz-kong commited on
Commit
b39a40c
Β·
1 Parent(s): 1848ca0

made severe changes

Browse files
app/core/detection_engine.py ADDED
@@ -0,0 +1,248 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ app/core/detection_engine.py – UNIVERSAL DETECTION ENGINE
3
+ =======================================================
4
+
5
+ Consolidated entity and industry detection with dual-mode (LLM + rule-based).
6
+
7
+ Functions:
8
+ - hybrid_detect_entity_type()
9
+ - hybrid_detect_industry_type()
10
+ - Redis caching helpers
11
+ - Prometheus metrics
12
+ - Zero circular dependencies
13
+ """
14
+
15
+ import json
16
+ import logging
17
+ import pandas as pd
18
+ from typing import Tuple, Optional, Dict, Any
19
+ from datetime import datetime
20
+ import time
21
+ from app.core.event_hub import event_hub
22
+ from app.service.llm_service import get_llm_service
23
+
24
+ # βœ… RULE-BASED IMPORTS (both in one place)
25
+ from app.entity_detector import detect_entity_type as rule_based_entity
26
+ from app.utils.detect_industry import detect_industry as rule_based_industry
27
+
28
+ from app.core.sre_logging import emit_mapper_log
29
+
30
+ # SRE: Prometheus metrics
31
+ try:
32
+ from prometheus_client import Counter, Histogram
33
+ detection_latency = Histogram(
34
+ 'detection_duration_seconds',
35
+ 'Time to detect entity/industry',
36
+ ['detection_type', 'org_id']
37
+ )
38
+ detection_errors = Counter(
39
+ 'detection_errors_total',
40
+ 'Total detection failures',
41
+ ['detection_type', 'org_id', 'error_type']
42
+ )
43
+ except ImportError:
44
+ detection_latency = None
45
+ detection_errors = None
46
+
47
+ logger = logging.getLogger(__name__)
48
+
49
+
50
+ # ====================================================================
51
+ # 🎯 ENTITY TYPE DETECTION
52
+ # ====================================================================
53
+
54
+ def hybrid_detect_entity_type(org_id: str, df: pd.DataFrame, source_id: str,
55
+ use_llm: bool = False) -> Tuple[str, float, bool]:
56
+ """
57
+ Detect entity_type (SALES, INVENTORY, CUSTOMER, PRODUCT, etc.)
58
+
59
+ Args:
60
+ org_id: Organization ID
61
+ df: DataFrame to analyze
62
+ source_id: Source identifier
63
+ use_llm: If True, use LLM fallback when confidence < 0.75
64
+
65
+ Returns:
66
+ (entity_type: str, confidence: float, is_confident: bool)
67
+ """
68
+ start_time = time.time()
69
+ emit_mapper_log("info", "Entity detection started",
70
+ org_id=org_id, source_id=source_id, use_llm=use_llm)
71
+
72
+ # 1. Rule-based detection (ALWAYS runs first – <10ms)
73
+ entity_type, confidence = rule_based_entity(df)
74
+ entity_type = entity_type.upper()
75
+
76
+ emit_mapper_log("info", "Rule-based entity completed",
77
+ org_id=org_id, source_id=source_id,
78
+ entity_type=entity_type, confidence=confidence)
79
+
80
+ # 2. If confident OR LLM disabled, return immediately
81
+ if confidence > 0.75 or not use_llm:
82
+ return entity_type, confidence, True
83
+
84
+ # 3. LLM fallback (only when use_llm=True and confidence < 0.75)
85
+ try:
86
+ emit_mapper_log("info", "Entity LLM fallback required",
87
+ org_id=org_id, source_id=source_id, rule_confidence=confidence)
88
+
89
+ llm = get_llm_service()
90
+ if not llm.is_ready():
91
+ emit_mapper_log("warning", "LLM not ready, using rule-based entity",
92
+ org_id=org_id, source_id=source_id)
93
+ return entity_type, confidence, False
94
+
95
+ # Build prompt
96
+ columns_str = ",".join(df.columns)
97
+ prompt = f"""Analyze these column names and determine the business entity type:
98
+
99
+ Columns: {columns_str}
100
+
101
+ Return ONLY JSON:
102
+ {{"entity_type":"SALES|INVENTORY|CUSTOMER|PRODUCT","confidence":0.95}}"""
103
+
104
+ # Generate with LLM
105
+ response = llm.generate(prompt, max_tokens=50, temperature=0.1)
106
+ result = json.loads(response)
107
+
108
+ llm_entity = result["entity_type"].upper()
109
+ llm_confidence = float(result["confidence"])
110
+
111
+ emit_mapper_log("info", "Entity LLM completed",
112
+ org_id=org_id, source_id=source_id,
113
+ llm_entity=llm_entity, llm_confidence=llm_confidence)
114
+
115
+ # Use LLM result if more confident
116
+ if llm_confidence > confidence:
117
+ return llm_entity, llm_confidence, True
118
+
119
+ return entity_type, confidence, False
120
+
121
+ except Exception as e:
122
+ emit_mapper_log("error", "Entity LLM fallback failed",
123
+ org_id=org_id, source_id=source_id, error=str(e))
124
+
125
+ if detection_errors:
126
+ detection_errors.labels(detection_type="entity", org_id=org_id, error_type=type(e).__name__).inc()
127
+
128
+ return entity_type, confidence, False
129
+
130
+
131
+ # ====================================================================
132
+ # 🎯 INDUSTRY TYPE DETECTION
133
+ # ====================================================================
134
+
135
+ def hybrid_detect_industry_type(org_id: str, df: pd.DataFrame, source_id: str,
136
+ use_llm: bool = False) -> Tuple[str, float, bool]:
137
+ """
138
+ Detect industry vertical (SUPERMARKET, MANUFACTURING, PHARMA, RETAIL, WHOLESALE, HEALTHCARE)
139
+
140
+ Args:
141
+ org_id: Organization ID
142
+ df: DataFrame to analyze
143
+ source_id: Source identifier
144
+ use_llm: If True, enhance with LLM when confidence < 0.75
145
+
146
+ Returns:
147
+ (industry: str, confidence: float, is_confident: bool)
148
+ """
149
+ start_time = time.time()
150
+ emit_mapper_log("info", "Industry detection started",
151
+ org_id=org_id, source_id=source_id, use_llm=use_llm)
152
+
153
+ # βœ… RULE-BASED DETECTION (always runs first – <10ms)
154
+ industry, confidence = rule_based_industry(df)
155
+ industry = industry.upper()
156
+
157
+ emit_mapper_log("info", "Rule-based industry completed",
158
+ org_id=org_id, source_id=source_id,
159
+ industry=industry, confidence=confidence)
160
+
161
+ # 2. If confident OR LLM disabled, return immediately
162
+ if confidence > 0.75 or not use_llm:
163
+ return industry, confidence, True
164
+
165
+ # 3. LLM fallback
166
+ try:
167
+ emit_mapper_log("info", "Industry LLM fallback required",
168
+ org_id=org_id, source_id=source_id, rule_confidence=confidence)
169
+
170
+ llm = get_llm_service()
171
+ if not llm.is_ready():
172
+ emit_mapper_log("warning", "LLM not ready for industry",
173
+ org_id=org_id, source_id=source_id)
174
+ return industry, confidence, False
175
+
176
+ # Industry-specific prompt with sample data
177
+ columns_str = ",".join(df.columns)
178
+ sample_data = df.head(3).to_dict(orient="records")
179
+
180
+ prompt = f"""Analyze this dataset and determine the business industry vertical:
181
+
182
+ Columns: {columns_str}
183
+ Sample rows: {json.dumps(sample_data)}
184
+
185
+ Return ONLY JSON:
186
+ {{"industry":"SUPERMARKET|MANUFACTURING|PHARMA|RETAIL|WHOLESALE|HEALTHCARE","confidence":0.95}}"""
187
+
188
+ response = llm.generate(prompt, max_tokens=50, temperature=0.1)
189
+ result = json.loads(response)
190
+
191
+ llm_industry = result["industry"].upper()
192
+ llm_confidence = float(result["confidence"])
193
+
194
+ emit_mapper_log("info", "Industry LLM completed",
195
+ org_id=org_id, source_id=source_id,
196
+ llm_industry=llm_industry, llm_confidence=llm_confidence)
197
+
198
+ if llm_confidence > confidence:
199
+ return llm_industry, llm_confidence, True
200
+
201
+ return industry, confidence, False
202
+
203
+ except Exception as e:
204
+ emit_mapper_log("error", "Industry LLM fallback failed",
205
+ org_id=org_id, source_id=source_id, error=str(e))
206
+
207
+ if detection_errors:
208
+ detection_errors.labels(detection_type="industry", org_id=org_id, error_type=type(e).__name__).inc()
209
+
210
+ return industry, confidence, False
211
+
212
+
213
+ # ====================================================================
214
+ # πŸ”§ REDIS CACHE HELPERS (Shared by both)
215
+ # ====================================================================
216
+
217
+ def get_cached_detection(org_id: str, source_id: str, detection_type: str) -> Optional[Dict[str, Any]]:
218
+ """
219
+ Check Redis for cached detection result
220
+
221
+ Args:
222
+ detection_type: "entity" or "industry"
223
+
224
+ Returns:
225
+ {"type": str, "confidence": float, "cached": True} or None
226
+ """
227
+ key = f"{detection_type}:{org_id}:{source_id}"
228
+ cached = event_hub.get_key(key)
229
+
230
+ if cached:
231
+ data = json.loads(cached)
232
+ data["cached"] = True
233
+ return data
234
+
235
+ return None
236
+
237
+
238
+ def cache_detection(org_id: str, source_id: str, detection_type: str,
239
+ value: str, confidence: float):
240
+ """Store detection result in Redis with 1-hour TTL"""
241
+ key = f"{detection_type}:{org_id}:{source_id}"
242
+
243
+ event_hub.setex(key, 3600, json.dumps({
244
+ "type": value,
245
+ "confidence": confidence,
246
+ "cached_by": "detection_engine",
247
+ "cached_at": datetime.utcnow().isoformat()
248
+ }))
app/core/sre_logging.py ADDED
@@ -0,0 +1,77 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ app/core/sre_logging.py – SRE Log Aggregation (No Circular Dependencies)
3
+ ==========================================================================
4
+ Central log aggregator and emitter functions that can be safely imported
5
+ by any service without causing circular imports.
6
+ """
7
+
8
+ import threading
9
+ import logging
10
+ from datetime import datetime, timedelta
11
+ from typing import List, Dict, Any, Optional
12
+ from collections import deque
13
+
14
+ # Global log aggregator (ring buffer for recent logs)
15
+ class LogAggregator:
16
+ """Thread-safe ring buffer storing last 1000 logs from all services"""
17
+ def __init__(self, max_size: int = 1000):
18
+ self.max_size = max_size
19
+ self.buffer: deque = deque(maxlen=max_size)
20
+ self.lock = threading.Lock()
21
+
22
+ def emit(self, service: str, level: str, message: str, **kwargs):
23
+ """Add a log entry from any service"""
24
+ with self.lock:
25
+ entry = {
26
+ "timestamp": datetime.utcnow().isoformat(),
27
+ "service": service,
28
+ "level": level,
29
+ "message": message,
30
+ **kwargs
31
+ }
32
+ self.buffer.append(entry)
33
+
34
+ def get_logs(self, service: Optional[str] = None, level: Optional[str] = None, limit: int = 100) -> List[Dict]:
35
+ """Retrieve filtered logs (most recent first)"""
36
+ with self.lock:
37
+ filtered = [
38
+ log for log in self.buffer
39
+ if (not service or log["service"] == service)
40
+ and (not level or log["level"] == level)
41
+ ]
42
+ return list(filtered)[-limit:]
43
+
44
+ def get_error_rate(self, service: Optional[str], window_minutes: int = 5) -> float:
45
+ """Calculate error rate for a service (or all if service=None)"""
46
+ cutoff = datetime.utcnow() - timedelta(minutes=window_minutes)
47
+ cutoff_str = cutoff.isoformat()
48
+
49
+ with self.lock:
50
+ recent = [
51
+ log for log in self.buffer
52
+ if log["timestamp"] >= cutoff_str
53
+ and (not service or log["service"] == service)
54
+ ]
55
+ if not recent:
56
+ return 0.0
57
+ errors = [log for log in recent if log["level"] in ("error", "critical")]
58
+ return len(errors) / len(recent)
59
+
60
+ # Global singleton
61
+ log_aggregator = LogAggregator(max_size=1000)
62
+
63
+ # Service-specific emitter functions (safe to import anywhere)
64
+ def emit_worker_log(level: str, message: str, **kwargs):
65
+ log_aggregator.emit("analytics_worker", level, message, **kwargs)
66
+
67
+ def emit_vector_log(level: str, message: str, **kwargs):
68
+ log_aggregator.emit("vector_service", level, message, **kwargs)
69
+
70
+ def emit_llm_log(level: str, message: str, **kwargs):
71
+ log_aggregator.emit("llm_service", level, message, **kwargs)
72
+
73
+ def emit_mapper_log(level: str, message: str, **kwargs):
74
+ log_aggregator.emit("mapper", level, message, **kwargs)
75
+
76
+ def emit_deps_log(level: str, message: str, **kwargs):
77
+ log_aggregator.emit("dependencies", level, message, **kwargs)
app/core/worker_manager.py CHANGED
@@ -1,49 +1,255 @@
1
- # app/core/worker_manager.py – UPSTASH-COMPATIBLE v4.1
 
 
 
 
 
 
 
 
 
 
2
 
3
  import asyncio
4
  import json
5
  import os
6
  import time
7
- from typing import Dict, List, Optional, Any
 
8
  import logging
9
- import datetime
 
10
  from app.core.event_hub import event_hub
11
  from app.tasks.analytics_worker import AnalyticsWorker
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
 
13
  logger = logging.getLogger(__name__)
14
 
15
 
16
- def _safe_redis_decode(value: Any) -> str:
17
- """Safely decode Redis values that might be bytes or str"""
18
- if isinstance(value, bytes):
19
- return value.decode('utf-8')
20
- return str(value)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
 
22
 
23
  class WorkerManager:
24
  """
25
- πŸŽ›οΈ Manages worker lifecycle and prevents Redis hammering
26
- Uses ONLY Upstash-safe HTTP commands: GET, SET, EXISTS, DEL, XREVRANGE
27
  """
28
 
29
  def __init__(self):
30
  self.active_workers: Dict[str, asyncio.Task] = {}
31
  self._shutdown = False
32
 
33
- # ⚑ ADAPTIVE POLLING (configurable via env vars)
34
  self.active_interval = float(os.getenv("WORKER_POLL_ACTIVE", "1.0"))
35
  self.idle_interval = float(os.getenv("WORKER_POLL_IDLE", "30.0"))
36
  self.consecutive_empty = 0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37
 
38
  async def start_listener(self):
39
  """
40
- 🎧 UPSTASH-SAFE: No pubsub, no blocking xread, just smart async polling
41
- Redis ops: ~0.03/sec idle, ~2/sec under load
 
 
42
  """
43
- logger.info(
44
- f"🎧 Worker Manager: Einstein+Elon mode ENGAGED "
45
- f"(active: {self.active_interval}s, idle: {self.idle_interval}s)"
46
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
 
48
  while not self._shutdown:
49
  try:
@@ -58,50 +264,43 @@ class WorkerManager:
58
  self.consecutive_empty += 1
59
  interval = self._get_backoff_interval()
60
 
61
- # Log state changes
62
  if self.consecutive_empty == 5:
63
- logger.info(f"[MANAGER] πŸ›Œ Idle mode activated (poll: {interval:.1f}s)")
64
 
65
  await asyncio.sleep(interval)
66
 
67
  except asyncio.CancelledError:
68
- logger.info("[MANAGER] πŸ›‘ Listener cancelled")
69
  break
70
  except Exception as e:
71
- logger.error(f"[MANAGER] ❌ Error: {e}", exc_info=True)
 
72
  await asyncio.sleep(5)
73
 
 
 
74
  async def _fetch_pending_triggers(self) -> List[tuple]:
75
- """
76
- Fetch pending triggers in a SINGLE Redis call
77
- Uses xrevrange to get newest messages without blocking
78
- Returns: [(msg_id, {field: value}), ...]
79
- """
80
  try:
81
- # Get last 10 messages from stream (non-blocking)
82
  result = event_hub.redis.xrevrange(
83
  "stream:analytics_triggers",
84
  count=10
85
  )
86
 
87
- # Handle different response formats from Upstash
88
  messages = []
89
  if isinstance(result, dict):
90
- # Format: {msg_id: {field: value}, ...}
91
  for msg_id, data in result.items():
92
  messages.append((msg_id, data))
93
  elif isinstance(result, list):
94
- # Format: [(msg_id, [field, value, field, value]), ...]
95
  for item in result:
96
  if isinstance(item, (list, tuple)) and len(item) == 2:
97
  msg_id, data = item
98
- # Convert flat list to dict if needed
99
  if isinstance(data, list):
100
  data_dict = {}
101
  for i in range(0, len(data), 2):
102
  if i + 1 < len(data):
103
- key = _safe_redis_decode(data[i])
104
- value = _safe_redis_decode(data[i + 1])
105
  data_dict[key] = value
106
  messages.append((msg_id, data_dict))
107
  else:
@@ -110,166 +309,245 @@ class WorkerManager:
110
  return messages
111
 
112
  except Exception as e:
113
- logger.debug(f"[MANAGER] Fetch failed: {e}")
114
  return []
115
 
116
  async def _process_batch(self, messages: List[tuple]):
117
  """Process multiple triggers efficiently"""
118
- logger.info(f"[MANAGER] πŸ“₯ Processing {len(messages)} triggers")
119
 
120
  for msg_id, msg_data in messages:
121
  try:
122
- # Handle different data formats
123
  if isinstance(msg_data, dict):
124
- # Already a dict
125
  message_str = msg_data.get("message", "{}")
126
- elif isinstance(msg_data, list):
127
- # Flat list: [field, value, field, value]
128
- message_str = "{}"
129
- for i in range(0, len(msg_data), 2):
130
- if i + 1 < len(msg_data):
131
- key = _safe_redis_decode(msg_data[i])
132
- if key == "message":
133
- message_str = _safe_redis_decode(msg_data[i + 1])
134
- break
135
  else:
136
- logger.warning(f"[MANAGER] Unknown msg_data format: {type(msg_data)}")
137
- continue
138
 
139
  payload = json.loads(message_str)
140
  await self._handle_trigger(payload)
141
 
142
  # Acknowledge: delete processed message
143
  event_hub.redis.xdel("stream:analytics_triggers", msg_id)
 
144
 
145
  except Exception as e:
146
- logger.error(f"[MANAGER] ❌ Process error: {e}", exc_info=True)
 
 
 
 
147
 
148
  async def _handle_trigger(self, data: dict):
149
- """Launch worker with deduplication"""
150
  org_id = data.get("org_id")
151
  source_id = data.get("source_id")
152
 
153
  if not org_id or not source_id:
154
- logger.warning(f"[MANAGER] ⚠️ Invalid payload: {data}")
155
  return
156
 
157
  worker_id = f"{org_id}:{source_id}"
158
 
159
  # Skip if already running
160
  if worker_id in self.active_workers and not self.active_workers[worker_id].done():
161
- logger.debug(f"[MANAGER] ⏭️ Already running: {worker_id}")
162
  return
163
 
164
  # Spawn worker
 
165
  task = asyncio.create_task(
166
- self._run_worker(worker_id, org_id, source_id),
167
  name=f"worker-{worker_id}"
168
  )
169
  self.active_workers[worker_id] = task
170
- logger.info(f"[MANAGER] πŸš€ Spawned: {worker_id}")
 
 
 
 
 
 
 
 
 
 
171
 
172
- async def _run_worker(self, worker_id: str, org_id: str, source_id: str):
173
- """Execute KPI computation with automatic cleanup"""
 
 
174
  try:
 
 
175
  worker = AnalyticsWorker(org_id, source_id)
176
- await worker.run()
177
- logger.info(f"[MANAGER] βœ… Complete: {worker_id}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
178
  except Exception as e:
179
- logger.error(f"[MANAGER] ❌ Failed: {worker_id} - {e}", exc_info=True)
 
 
 
 
 
 
 
 
 
 
 
180
  finally:
181
  self.active_workers.pop(worker_id, None)
182
 
183
- def _get_backoff_interval(self) -> float:
184
- """Adaptive backoff: faster when busy, slower when idle"""
185
- if self.consecutive_empty < 5:
186
- return self.active_interval
187
- return min(
188
- self.idle_interval,
189
- self.active_interval * (2 ** min(self.consecutive_empty - 5, 5))
190
- )
 
 
 
 
 
 
 
191
 
192
  def shutdown(self):
193
- """Graceful shutdown"""
194
  self._shutdown = True
195
- logger.info("[MANAGER] πŸ›‘ Shutdown initiated")
196
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
197
 
198
- # ==================== FASTAPI INTEGRATION ====================
199
 
200
- # Global manager instance (for app/main.py import)
201
- worker_manager = WorkerManager()
202
 
203
- _worker_manager: Optional[WorkerManager] = None
204
 
205
 
206
  async def get_worker_manager() -> WorkerManager:
207
- """Get or create worker manager singleton (async factory)"""
208
- global _worker_manager
209
- if _worker_manager is None:
210
- _worker_manager = WorkerManager()
211
- return _worker_manager
212
 
213
 
214
- async def trigger_kpi_computation(org_id: str, source_id: str):
215
  """
216
- 🎯 FastAPI endpoint handler - triggers worker via Redis stream
217
- Idempotent: multiple calls won't spawn duplicate workers
218
  """
219
  try:
220
- # Write to stream (HTTP-safe)
221
- event_hub.redis.xadd(
222
- "stream:analytics_triggers",
223
- {
224
- "message": json.dumps({
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
225
  "org_id": org_id,
226
  "source_id": source_id,
227
  "type": "kpi_compute",
228
- "timestamp": datetime.now().isoformat()
229
- })
230
- }
231
- )
232
- logger.info(f"🎯 Triggered KPI computation: {org_id}/{source_id}")
233
- return {"status": "triggered", "org_id": org_id, "source_id": source_id}
 
 
 
 
 
 
 
234
 
235
  except Exception as e:
236
- logger.error(f"Trigger failed: {e}", exc_info=True)
237
  return {"status": "error", "message": str(e)}
238
 
239
- # ==================== BACKGROUND REFRESH (Optional) ====================
240
 
241
  async def continuous_kpi_refresh(manager: WorkerManager):
242
- """
243
- πŸŽ›οΈ Gentle background refresh - runs every 5 minutes
244
- Only triggers for stale data (no active worker, no fresh cache)
245
- """
246
- await asyncio.sleep(10) # Let app startup complete
247
 
248
  while True:
249
  try:
250
- # Get all entity keys (HTTP-safe)
251
- entity_keys = event_hub.redis.keys("entity:*:*")
252
 
253
- for key in entity_keys[:10]: # Max 10 per cycle
254
  key_str = key.decode() if isinstance(key, bytes) else key
255
  _, org_id, source_id = key_str.split(":")
256
 
257
- worker_id = f"{org_id}:{source_id}"
258
-
259
- # Skip if worker already running
260
- if worker_id in manager.active_workers:
261
  continue
262
 
263
- # Skip if KPIs are fresh (< 5 min old)
264
  cache_key = f"kpi_cache:{org_id}:{source_id}"
265
  if event_hub.redis.exists(cache_key):
266
  continue
267
 
268
- # Trigger refresh
269
  await trigger_kpi_computation(org_id, source_id)
270
- await asyncio.sleep(1) # 1s gap
271
 
272
  except Exception as e:
273
- logger.error(f"[AUTO] Error: {e}", exc_info=True)
274
 
275
- await asyncio.sleep(300) # ⭐ Sleep 5 minutes
 
1
+ """
2
+ WorkerManager v5.0: TCP Redis Pub/Sub + SRE Observability
3
+
4
+ Key changes:
5
+ - Replaces polling with Redis pub/sub for instant trigger detection
6
+ - Adds Prometheus metrics for worker lifecycle
7
+ - Circuit breaker for Redis connection failures
8
+ - Structured JSON logging for Loki/Splunk
9
+ - Backward compatible: falls back to polling if TCP Redis unavailable
10
+ - Zero changes to public API
11
+ """
12
 
13
  import asyncio
14
  import json
15
  import os
16
  import time
17
+ from typing import Dict, List, Optional, Any, AsyncGenerator
18
+ from datetime import datetime
19
  import logging
20
+ from enum import Enum
21
+
22
  from app.core.event_hub import event_hub
23
  from app.tasks.analytics_worker import AnalyticsWorker
24
+ from app.core.sre_logging import emit_worker_log, emit_deps_log
25
+
26
+ # Prometheus metrics (free tier compatible)
27
+ try:
28
+ from prometheus_client import Counter, Histogram, Gauge
29
+ except ImportError:
30
+ class Counter:
31
+ def __init__(self, *args, **kwargs): pass
32
+ def inc(self, amount=1): pass
33
+
34
+ class Histogram:
35
+ def __init__(self, *args, **kwargs): pass
36
+ def observe(self, value): pass
37
+
38
+ class Gauge:
39
+ def __init__(self, *args, **kwargs): pass
40
+ def set(self, value): pass
41
 
42
  logger = logging.getLogger(__name__)
43
 
44
 
45
+ class WorkerEventType(Enum):
46
+ """Pub/sub event types for worker lifecycle"""
47
+ WORKER_STARTED = "worker.started"
48
+ WORKER_COMPLETED = "worker.completed"
49
+ WORKER_FAILED = "worker.failed"
50
+ TRIGGER_RECEIVED = "trigger.received"
51
+
52
+
53
+ class WorkerManagerMetrics:
54
+ """SRE: Prometheus metrics for worker operations"""
55
+ triggers_received = Counter(
56
+ 'worker_triggers_total',
57
+ 'Total triggers received',
58
+ ['org_id', 'source_id']
59
+ )
60
+
61
+ workers_spawned = Counter(
62
+ 'workers_spawned_total',
63
+ 'Total workers spawned',
64
+ ['org_id', 'source_id']
65
+ )
66
+
67
+ workers_failed = Counter(
68
+ 'workers_failed_total',
69
+ 'Total worker failures',
70
+ ['org_id', 'source_id', 'error_type']
71
+ )
72
+
73
+ worker_duration = Histogram(
74
+ 'worker_duration_seconds',
75
+ 'Worker execution duration',
76
+ ['org_id', 'source_id']
77
+ )
78
+
79
+ trigger_latency = Histogram(
80
+ 'trigger_latency_seconds',
81
+ 'Time from trigger to worker start',
82
+ ['org_id', 'source_id']
83
+ )
84
+
85
+ active_workers_gauge = Gauge(
86
+ 'active_workers',
87
+ 'Number of currently active workers',
88
+ ['org_id']
89
+ )
90
 
91
 
92
  class WorkerManager:
93
  """
94
+ πŸŽ›οΈ Enterprise worker manager with SRE observability
95
+ Uses TCP Redis pub/sub for real-time triggers, falls back to polling
96
  """
97
 
98
  def __init__(self):
99
  self.active_workers: Dict[str, asyncio.Task] = {}
100
  self._shutdown = False
101
 
102
+ # Adaptive polling config (used as fallback)
103
  self.active_interval = float(os.getenv("WORKER_POLL_ACTIVE", "1.0"))
104
  self.idle_interval = float(os.getenv("WORKER_POLL_IDLE", "30.0"))
105
  self.consecutive_empty = 0
106
+
107
+ # Pub/sub state
108
+ self._pubsub = None
109
+ self._subscription_task = None
110
+
111
+ # SRE: Circuit breaker
112
+ self._circuit_breaker = {
113
+ "failure_count": 0,
114
+ "last_failure_time": None,
115
+ "is_open": False,
116
+ "threshold": 5,
117
+ "reset_timeout": 300
118
+ }
119
+
120
+ # SRE: Metrics tracking
121
+ self._metrics = {
122
+ "triggers_processed": 0,
123
+ "workers_spawned": 0,
124
+ "workers_failed": 0,
125
+ "total_latency_ms": 0
126
+ }
127
+
128
+ emit_worker_log("info", "WorkerManager initialized with SRE observability")
129
+
130
+ # ====== SRE: Circuit Breaker ======
131
+
132
+ def _check_circuit_breaker(self) -> bool:
133
+ """Check if Redis circuit is open"""
134
+ if not self._circuit_breaker["is_open"]:
135
+ return True
136
+
137
+ # Check if enough time has passed to retry
138
+ if self._circuit_breaker["last_failure_time"]:
139
+ elapsed = time.time() - self._circuit_breaker["last_failure_time"]
140
+ if elapsed > self._circuit_breaker["reset_timeout"]:
141
+ logger.warning("[WORKER] Circuit breaker closing, retrying...")
142
+ self._circuit_breaker["is_open"] = False
143
+ self._circuit_breaker["failure_count"] = 0
144
+ return True
145
+
146
+ logger.error("[WORKER] Circuit breaker OPEN - rejecting operations")
147
+ return False
148
+
149
+ def _record_failure(self, error_type: str):
150
+ """Track Redis/pubsub failures"""
151
+ self._circuit_breaker["failure_count"] += 1
152
+ self._circuit_breaker["last_failure_time"] = time.time()
153
+
154
+ if self._circuit_breaker["failure_count"] >= self._circuit_breaker["threshold"]:
155
+ self._circuit_breaker["is_open"] = True
156
+ logger.critical(f"[WORKER] Circuit opened! {self._circuit_breaker['failure_count']} failures")
157
+
158
+ def _record_success(self):
159
+ """Reset failure count on success"""
160
+ if self._circuit_breaker["failure_count"] > 0:
161
+ logger.info(f"[WORKER] Resetting failure count (was {self._circuit_breaker['failure_count']})")
162
+ self._circuit_breaker["failure_count"] = 0
163
+
164
+ # ====== SRE: Metrics Collection ======
165
+
166
+ def _emit_metrics(self, operation: str, duration_ms: float, **kwargs):
167
+ """Emit structured metrics for monitoring"""
168
+ metrics_data = {
169
+ "service": "worker_manager",
170
+ "operation": operation,
171
+ "duration_ms": round(duration_ms, 2),
172
+ "timestamp": datetime.utcnow().isoformat(),
173
+ **kwargs
174
+ }
175
+
176
+ emit_worker_log("info", f"Metrics: {operation}", **metrics_data)
177
+
178
+ # ====== Pub/Sub Listener (NEW) ======
179
 
180
  async def start_listener(self):
181
  """
182
+ 🎧 TCP REDIS: Real-time pub/sub trigger listener
183
+ Falls back to polling if TCP Redis unavailable
184
+
185
+ Redis ops: 0/sec idle, instant delivery under load
186
  """
187
+ emit_worker_log("info", "Starting WorkerManager listener",
188
+ active_interval=self.active_interval,
189
+ idle_interval=self.idle_interval)
190
+
191
+ # Try pub/sub first (TCP Redis only)
192
+ if hasattr(event_hub.redis, 'pubsub') and not event_hub.is_rest_api:
193
+ await self._start_pubsub_listener()
194
+ else:
195
+ # Fall back to polling (Upstash-compatible)
196
+ logger.warning("[WORKER] ⚠️ TCP Redis not available, falling back to polling")
197
+ await self._start_polling_listener()
198
+
199
+ async def _start_pubsub_listener(self):
200
+ """Real-time pub/sub subscription"""
201
+ try:
202
+ self._pubsub = event_hub.redis.pubsub()
203
+ channel = "stream:analytics_triggers"
204
+
205
+ await asyncio.to_thread(self._pubsub.subscribe, channel)
206
+ logger.info(f"[WORKER] πŸ“‘ Subscribed to {channel}")
207
+
208
+ while not self._shutdown:
209
+ if not self._check_circuit_breaker():
210
+ await asyncio.sleep(self._circuit_breaker["reset_timeout"])
211
+ continue
212
+
213
+ try:
214
+ message = await asyncio.to_thread(self._pubsub.get_message, timeout=1.0)
215
+
216
+ if message and message['type'] == 'message':
217
+ trigger_start = time.time()
218
+
219
+ payload = json.loads(message['data'])
220
+ await self._handle_trigger(payload)
221
+
222
+ # SRE: Record trigger latency
223
+ latency_ms = (time.time() - trigger_start) * 1000
224
+ org_id = payload.get("org_id", "unknown")
225
+ source_id = payload.get("source_id", "unknown")
226
+
227
+ WorkerManagerMetrics.trigger_latency.labels(
228
+ org_id=org_id, source_id=source_id
229
+ ).observe(latency_ms / 1000)
230
+
231
+ WorkerManagerMetrics.triggers_received.labels(
232
+ org_id=org_id, source_id=source_id
233
+ ).inc()
234
+
235
+ emit_worker_log("info", "Trigger processed via pub/sub",
236
+ org_id=org_id, source_id=source_id, latency_ms=latency_ms)
237
+
238
+ # Heartbeat
239
+ await asyncio.sleep(0.1)
240
+
241
+ except Exception as e:
242
+ self._record_failure(f"pubsub_error:{type(e).__name__}")
243
+ emit_worker_log("error", "Pub/sub error", error=str(e))
244
+ await asyncio.sleep(5)
245
+
246
+ except Exception as e:
247
+ logger.error(f"[WORKER] ❌ Pub/sub init failed: {e}, falling back to polling")
248
+ await self._start_polling_listener()
249
+
250
+ async def _start_polling_listener(self):
251
+ """Legacy polling-based listener (Upstash-compatible)"""
252
+ emit_worker_log("info", "Starting polling-based listener (fallback)")
253
 
254
  while not self._shutdown:
255
  try:
 
264
  self.consecutive_empty += 1
265
  interval = self._get_backoff_interval()
266
 
 
267
  if self.consecutive_empty == 5:
268
+ logger.info(f"[WORKER] πŸ›Œ Idle mode (poll: {interval:.1f}s)")
269
 
270
  await asyncio.sleep(interval)
271
 
272
  except asyncio.CancelledError:
273
+ logger.info("[WORKER] πŸ›‘ Listener cancelled")
274
  break
275
  except Exception as e:
276
+ self._record_failure(f"polling_error:{type(e).__name__}")
277
+ emit_worker_log("error", "Polling error", error=str(e))
278
  await asyncio.sleep(5)
279
 
280
+ # ====== Fallback Polling Methods (UNCHANGED) ======
281
+
282
  async def _fetch_pending_triggers(self) -> List[tuple]:
283
+ """Fetch pending triggers using xrevrange (Upstash-compatible)"""
 
 
 
 
284
  try:
 
285
  result = event_hub.redis.xrevrange(
286
  "stream:analytics_triggers",
287
  count=10
288
  )
289
 
 
290
  messages = []
291
  if isinstance(result, dict):
 
292
  for msg_id, data in result.items():
293
  messages.append((msg_id, data))
294
  elif isinstance(result, list):
 
295
  for item in result:
296
  if isinstance(item, (list, tuple)) and len(item) == 2:
297
  msg_id, data = item
 
298
  if isinstance(data, list):
299
  data_dict = {}
300
  for i in range(0, len(data), 2):
301
  if i + 1 < len(data):
302
+ key = data[i].decode() if isinstance(data[i], bytes) else str(data[i])
303
+ value = data[i+1].decode() if isinstance(data[i+1], bytes) else str(data[i+1])
304
  data_dict[key] = value
305
  messages.append((msg_id, data_dict))
306
  else:
 
309
  return messages
310
 
311
  except Exception as e:
312
+ emit_worker_log("error", "Fetch triggers failed", error=str(e))
313
  return []
314
 
315
  async def _process_batch(self, messages: List[tuple]):
316
  """Process multiple triggers efficiently"""
317
+ emit_worker_log("info", f"Processing {len(messages)} triggers", trigger_count=len(messages))
318
 
319
  for msg_id, msg_data in messages:
320
  try:
 
321
  if isinstance(msg_data, dict):
 
322
  message_str = msg_data.get("message", "{}")
 
 
 
 
 
 
 
 
 
323
  else:
324
+ message_str = "{}"
 
325
 
326
  payload = json.loads(message_str)
327
  await self._handle_trigger(payload)
328
 
329
  # Acknowledge: delete processed message
330
  event_hub.redis.xdel("stream:analytics_triggers", msg_id)
331
+ self._metrics["triggers_processed"] += 1
332
 
333
  except Exception as e:
334
+ self._metrics["workers_failed"] += 1
335
+ self._record_failure(f"process_error:{type(e).__name__}")
336
+ emit_worker_log("error", "Process error", error=str(e))
337
+
338
+ # ====== Worker Execution (INSTRUMENTED) ======
339
 
340
  async def _handle_trigger(self, data: dict):
341
+ """Launch worker with deduplication and metrics"""
342
  org_id = data.get("org_id")
343
  source_id = data.get("source_id")
344
 
345
  if not org_id or not source_id:
346
+ emit_worker_log("warning", "Invalid trigger payload", payload=data)
347
  return
348
 
349
  worker_id = f"{org_id}:{source_id}"
350
 
351
  # Skip if already running
352
  if worker_id in self.active_workers and not self.active_workers[worker_id].done():
353
+ emit_worker_log("debug", "Worker already running", worker_id=worker_id)
354
  return
355
 
356
  # Spawn worker
357
+ start_time = time.time()
358
  task = asyncio.create_task(
359
+ self._run_worker(worker_id, org_id, source_id, data),
360
  name=f"worker-{worker_id}"
361
  )
362
  self.active_workers[worker_id] = task
363
+
364
+ # SRE: Update metrics
365
+ self._metrics["workers_spawned"] += 1
366
+ WorkerManagerMetrics.workers_spawned.labels(
367
+ org_id=org_id, source_id=source_id
368
+ ).inc()
369
+
370
+ WorkerManagerMetrics.active_workers_gauge.labels(org_id=org_id).inc()
371
+
372
+ emit_worker_log("info", "Worker spawned",
373
+ worker_id=worker_id, org_id=org_id, source_id=source_id)
374
 
375
+ async def _run_worker(self, worker_id: str, org_id: str, source_id: str, trigger_data: dict):
376
+ """Execute KPI computation with full instrumentation"""
377
+ start_time = time.time()
378
+
379
  try:
380
+ emit_worker_log("info", "Worker execution started", worker_id=worker_id)
381
+
382
  worker = AnalyticsWorker(org_id, source_id)
383
+ results = await worker.run()
384
+
385
+ duration_ms = (time.time() - start_time) * 1000
386
+ self._metrics["total_latency_ms"] += duration_ms
387
+
388
+ WorkerManagerMetrics.worker_duration.labels(
389
+ org_id=org_id, source_id=source_id
390
+ ).observe(duration_ms / 1000)
391
+
392
+ # Update active workers gauge
393
+ WorkerManagerMetrics.active_workers_gauge.labels(org_id=org_id).dec()
394
+
395
+ emit_worker_log("info", "Worker completed",
396
+ worker_id=worker_id, duration_ms=round(duration_ms, 2))
397
+
398
+ return results
399
+
400
  except Exception as e:
401
+ self._metrics["workers_failed"] += 1
402
+ self._record_failure(f"worker_error:{type(e).__name__}")
403
+
404
+ WorkerManagerMetrics.workers_failed.labels(
405
+ org_id=org_id, source_id=source_id, error_type=type(e).__name__
406
+ ).inc()
407
+
408
+ emit_worker_log("error", "Worker failed",
409
+ worker_id=worker_id, error=str(e))
410
+
411
+ raise
412
+
413
  finally:
414
  self.active_workers.pop(worker_id, None)
415
 
416
+ # ====== SRE: Status & Metrics ======
417
+
418
+ def get_metrics(self) -> Dict[str, Any]:
419
+ """SRE: Get current metrics snapshot"""
420
+ return {
421
+ **self._metrics,
422
+ "active_workers": len(self.active_workers),
423
+ "consecutive_empty": self.consecutive_empty,
424
+ "backoff_interval": self._get_backoff_interval(),
425
+ "circuit_breaker": {
426
+ "open": self._circuit_breaker["is_open"],
427
+ "failure_count": self._circuit_breaker["failure_count"]
428
+ },
429
+ "pubsub_mode": self._pubsub is not None
430
+ }
431
 
432
  def shutdown(self):
433
+ """Graceful shutdown with SRE cleanup"""
434
  self._shutdown = True
435
+
436
+ # Close pub/sub connection
437
+ if self._pubsub:
438
+ try:
439
+ asyncio.run_coroutine_threadsafe(
440
+ asyncio.to_thread(self._pubsub.close),
441
+ asyncio.get_event_loop()
442
+ )
443
+ except:
444
+ pass
445
+
446
+ emit_worker_log("warning", "Shutdown initiated",
447
+ active_workers=len(self.active_workers))
448
+
449
+ # Wait for active workers to complete
450
+ if self.active_workers:
451
+ pending = list(self.active_workers.values())
452
+ asyncio.gather(*pending, return_exceptions=True)
453
+
454
+ emit_worker_log("info", "Shutdown completed")
455
 
 
456
 
457
+ # ==================== FastAPI Integration ====================
 
458
 
459
+ _worker_manager_instance: Optional[WorkerManager] = None
460
 
461
 
462
  async def get_worker_manager() -> WorkerManager:
463
+ """Singleton manager factory"""
464
+ global _worker_manager_instance
465
+ if _worker_manager_instance is None:
466
+ _worker_manager_instance = WorkerManager()
467
+ return _worker_manager_instance
468
 
469
 
470
+ async def trigger_kpi_computation(org_id: str, source_id: str) -> Dict[str, Any]:
471
  """
472
+ 🎯 Endpoint handler - triggers worker via pub/sub or stream
473
+ Now emits SRE metrics for tracking
474
  """
475
  try:
476
+ manager = await get_worker_manager()
477
+
478
+ # Publish to pub/sub if available (TCP Redis)
479
+ if hasattr(event_hub.redis, 'pubsub') and not event_hub.is_rest_api:
480
+ channel = "stream:analytics_triggers"
481
+ payload = {
482
+ "org_id": org_id,
483
+ "source_id": source_id,
484
+ "type": "kpi_compute",
485
+ "timestamp": datetime.utcnow().isoformat()
486
+ }
487
+
488
+ await asyncio.to_thread(
489
+ event_hub.publish,
490
+ channel,
491
+ json.dumps(payload)
492
+ )
493
+
494
+ WorkerManagerMetrics.triggers_received.labels(
495
+ org_id=org_id, source_id=source_id
496
+ ).inc()
497
+
498
+ emit_worker_log("info", "Trigger published via pub/sub",
499
+ org_id=org_id, source_id=source_id)
500
+ else:
501
+ # Fall back to stream (Upstash)
502
+ event_hub.redis.xadd(
503
+ "stream:analytics_triggers",
504
+ {"message": json.dumps({
505
  "org_id": org_id,
506
  "source_id": source_id,
507
  "type": "kpi_compute",
508
+ "timestamp": datetime.utcnow().isoformat()
509
+ })}
510
+ )
511
+
512
+ emit_worker_log("info", "Trigger published via stream (fallback)",
513
+ org_id=org_id, source_id=source_id)
514
+
515
+ return {
516
+ "status": "triggered",
517
+ "org_id": org_id,
518
+ "source_id": source_id,
519
+ "mode": "pubsub" if hasattr(event_hub.redis, 'pubsub') and not event_hub.is_rest_api else "stream"
520
+ }
521
 
522
  except Exception as e:
523
+ emit_worker_log("error", "Trigger failed", error=str(e))
524
  return {"status": "error", "message": str(e)}
525
 
 
526
 
527
  async def continuous_kpi_refresh(manager: WorkerManager):
528
+ """Background refresh (optional, unchanged logic)"""
529
+ await asyncio.sleep(10)
 
 
 
530
 
531
  while True:
532
  try:
533
+ manager = await get_worker_manager()
534
+ keys = event_hub.redis.keys("entity:*:*")
535
 
536
+ for key in keys[:10]:
537
  key_str = key.decode() if isinstance(key, bytes) else key
538
  _, org_id, source_id = key_str.split(":")
539
 
540
+ if f"{org_id}:{source_id}" in manager.active_workers:
 
 
 
541
  continue
542
 
 
543
  cache_key = f"kpi_cache:{org_id}:{source_id}"
544
  if event_hub.redis.exists(cache_key):
545
  continue
546
 
 
547
  await trigger_kpi_computation(org_id, source_id)
548
+ await asyncio.sleep(1)
549
 
550
  except Exception as e:
551
+ emit_worker_log("error", "Background refresh error", error=str(e))
552
 
553
+ await asyncio.sleep(300)
app/hybrid_entity_detector.py DELETED
@@ -1,81 +0,0 @@
1
- # app/hybrid_entity_detector.py
2
- import logging
3
- from typing import Tuple
4
- import pandas as pd
5
- from app.entity_detector import detect_entity_type as rule_based_detect
6
- from app.service.ai_service import ai_service
7
-
8
- logger = logging.getLogger(__name__)
9
-
10
- # ====================================================================
11
- # ❌ COMMENT OUT THE ORIGINAL LLM VERSION BELOW
12
- # ====================================================================
13
- # def hybrid_detect_entity_type(org_id: str, df: pd.DataFrame, filename: str) -> Tuple[str, float, bool]:
14
- # """
15
- # Hybrid detection: Rule-based (fast) β†’ LLM fallback (accurate).
16
- # Returns: (entity_type, confidence, is_confident)
17
- # """
18
- # # 1. Rule-based first (ALWAYS runs)
19
- # entity_type, confidence = rule_based_detect(df)
20
- # logger.info(f"[hybrid] Rule-based: {entity_type} ({confidence:.2f})")
21
- #
22
- # # 2. If confident, return IMMEDIATELY
23
- # if confidence > 0.75:
24
- # logger.info(f"[hybrid] βœ“ Confident enough, skipping LLM")
25
- # return entity_type, confidence, True
26
- #
27
- # # 3. LLM fallback with BULLETPROOF error handling
28
- # try:
29
- # logger.info(f"[hybrid] β†’ LLM fallback needed (confidence < 0.75)")
30
- #
31
- # # Check if LLM is ready (FAIL FAST)
32
- # if not ai_service.llm.is_loaded:
33
- # logger.warning("[hybrid] ⚠️ LLM not ready yet")
34
- # return entity_type, confidence, False
35
- #
36
- # logger.info(f"[hybrid] β†’ Calling AI service...")
37
- # columns = list(df.columns)
38
- # llm_result = ai_service.detect_entity_type(org_id, columns, filename)
39
- #
40
- # logger.info(f"[hybrid] ← AI service returned: {llm_result}")
41
- #
42
- # # Extract values safely
43
- # llm_entity = llm_result.get("entity_type", entity_type).upper()
44
- # llm_confidence = float(llm_result.get("confidence", 0.0))
45
- #
46
- # if llm_confidence > confidence:
47
- # logger.info(f"[hybrid] βœ“ Using LLM result: {llm_entity}")
48
- # return llm_entity, llm_confidence, True
49
- #
50
- # logger.info(f"[hybrid] β†’ Rule-based retained: {entity_type}")
51
- # return entity_type, confidence, False
52
- #
53
- # except Exception as e:
54
- # logger.error(f"[hybrid] ❌ CRASH: {e}", exc_info=True)
55
- # # βœ… NEVER crash the pipeline
56
- # return entity_type, confidence, False
57
-
58
- # ====================================================================
59
- # βœ… PASTE THIS RULE-BASED-ONLY VERSION BELOW
60
- # ====================================================================
61
-
62
- def hybrid_detect_entity_type(org_id: str, df: pd.DataFrame, filename: str) -> Tuple[str, float, bool]:
63
- """
64
- RULE-BASED ONLY MODE: Fast detection, no LLM fallback
65
- Returns: (entity_type, confidence, is_confident)
66
- """
67
- # Rule-based detection only - runs in < 10ms
68
- entity_type, confidence = rule_based_detect(df)
69
- entity_type = entity_type.upper() # Normalize
70
-
71
- # Log that we're in rule-based mode
72
- logger.info(f"[hybrid] RULE-BASED ONLY: {entity_type} ({confidence:.2f})")
73
-
74
- # Return as "confident" to bypass any LLM logic elsewhere
75
- return entity_type, confidence, True
76
-
77
- # ====================================================================
78
- # TO RE-ENABLE LLM:
79
- # 1. Comment out the RULE-BASED ONLY version above
80
- # 2. Uncomment the original LLM version below
81
- # ====================================================================
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app/hybrid_industry_detector.py DELETED
@@ -1,28 +0,0 @@
1
- # app/hybrid_industry_detector.py
2
- import logging
3
- import pandas as pd
4
- from typing import Tuple, Dict
5
- from app.utils.detect_industry import detect_industry as rule_based_detect
6
- from app.service.ai_service import ai_service
7
-
8
- logger = logging.getLogger(__name__)
9
-
10
- def hybrid_detect_industry_type(org_id: str, df: pd.DataFrame, filename: str = "") -> Tuple[str, float, bool]:
11
- """
12
- Detects BUSINESS VERTICAL (SUPERMARKET/MANUFACTURING/PHARMA/RETAIL/WHOLESALE/HEALTHCARE)
13
-
14
- Returns: (industry, confidence, is_confident)
15
- """
16
- # 1. Rule-based detection from utils (<10ms, zero LLM cost)
17
- industry, confidence = rule_based_detect(df)
18
- industry = industry.upper() # Normalize
19
-
20
- logger.info(f"[hybrid_industry] RULE-BASED ONLY: {industry} ({confidence:.2f})")
21
-
22
- # 2. [FUTURE] LLM fallback if confidence < 0.75
23
- # if confidence < 0.75:
24
- # logger.info(f"[hybrid_industry] β†’ LLM fallback needed")
25
- # # ... LLM logic here ...
26
-
27
- # 3. Always return as confident (rule-based is authoritative)
28
- return industry, confidence, True
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app/main.py CHANGED
@@ -25,7 +25,7 @@ from app.core.worker_manager import worker_manager
25
  from app.deps import rate_limit_org, verify_api_key, check_all_services
26
  from app.tasks.analytics_worker import trigger_kpi_computation
27
  from app.service.vector_service import cleanup_expired_vectors
28
- from app.routers import health, datasources, reports, flags, scheduler, run, socket, analytics_stream,ai_query,schema
29
  from app.service.llm_service import load_llm_service
30
  from app.deps import get_qstash_client
31
  from prometheus_client import make_asgi_app
@@ -422,8 +422,6 @@ app.include_router(datasources.router, prefix="/api/v1/datasources", dependencie
422
  app.include_router(reports.router, prefix="/api/v1/reports", dependencies=[Depends(verify_api_key)])
423
  app.include_router(flags.router, prefix="/api/v1/flags", dependencies=[Depends(verify_api_key)])
424
  app.include_router(scheduler.router, prefix="/api/v1/scheduler", dependencies=[Depends(verify_api_key)])
425
- app.include_router(run.router, prefix="/api/v1/run", dependencies=[Depends(verify_api_key)])
426
- app.include_router(socket.router, prefix="/api/v1/socket", dependencies=[Depends(verify_api_key)])
427
  app.include_router(analytics_stream.router, dependencies=[Depends(verify_api_key)])
428
  app.include_router(ai_query.router, prefix="/api/v1/ai-query", dependencies=[Depends(verify_api_key)])
429
  app.include_router(schema.router, prefix="/api/v1/schema", dependencies=[Depends(verify_api_key)])
 
25
  from app.deps import rate_limit_org, verify_api_key, check_all_services
26
  from app.tasks.analytics_worker import trigger_kpi_computation
27
  from app.service.vector_service import cleanup_expired_vectors
28
+ from app.routers import health, datasources, reports, flags, scheduler, analytics_stream,ai_query,schema
29
  from app.service.llm_service import load_llm_service
30
  from app.deps import get_qstash_client
31
  from prometheus_client import make_asgi_app
 
422
  app.include_router(reports.router, prefix="/api/v1/reports", dependencies=[Depends(verify_api_key)])
423
  app.include_router(flags.router, prefix="/api/v1/flags", dependencies=[Depends(verify_api_key)])
424
  app.include_router(scheduler.router, prefix="/api/v1/scheduler", dependencies=[Depends(verify_api_key)])
 
 
425
  app.include_router(analytics_stream.router, dependencies=[Depends(verify_api_key)])
426
  app.include_router(ai_query.router, prefix="/api/v1/ai-query", dependencies=[Depends(verify_api_key)])
427
  app.include_router(schema.router, prefix="/api/v1/schema", dependencies=[Depends(verify_api_key)])
app/mapper.py CHANGED
@@ -22,7 +22,7 @@ import logging
22
  from typing import Dict, Any, Optional
23
 
24
  from app.db import get_conn, ensure_raw_table, transactional_conn, ensure_schema_versions_table
25
- from app.hybrid_entity_detector import hybrid_detect_entity_type
26
  from app.core.event_hub import event_hub
27
  from app.deps import get_sre_metrics
28
  from app.routers.health import emit_mapper_log
@@ -428,15 +428,15 @@ def _fallback_combined(org_id: str, source_id: str) -> tuple[dict, dict]:
428
 
429
  def detect_entity():
430
  try:
431
- return hybrid_detect_entity_type(org_id, df, f"{source_id}.json")
432
  except Exception as e:
433
  logger.error(f"[FALLBACK] Entity detection failed: {e}")
434
  return ("UNKNOWN", 0.0, False)
435
 
436
  def detect_industry():
437
  try:
438
- from app.hybrid_industry_detector import hybrid_detect_industry_type
439
- return hybrid_detect_industry_type(org_id, df, source_id)
440
  except Exception as e:
441
  logger.error(f"[FALLBACK] Industry detection failed: {e}")
442
  return ("UNKNOWN", 0.0, False)
@@ -528,8 +528,9 @@ def _fallback_industry_detection(org_id: str, source_id: str) -> dict:
528
  df = pd.DataFrame(parsed)
529
  df.columns = [str(col).lower().strip() for col in df.columns]
530
 
531
- from app.hybrid_industry_detector import hybrid_detect_industry_type
532
- industry, confidence, _ = hybrid_detect_industry_type(org_id, df, source_id)
 
533
 
534
  industry_info = {"industry": industry, "confidence": confidence}
535
  logger.info(f"[FALLBACK_IND] βœ… Detected: {industry} ({confidence:.2%})")
 
22
  from typing import Dict, Any, Optional
23
 
24
  from app.db import get_conn, ensure_raw_table, transactional_conn, ensure_schema_versions_table
25
+ from app.core.detection_engine import hybrid_detect_entity_type,hybrid_detect_industry_type
26
  from app.core.event_hub import event_hub
27
  from app.deps import get_sre_metrics
28
  from app.routers.health import emit_mapper_log
 
428
 
429
  def detect_entity():
430
  try:
431
+ return hybrid_detect_entity_type(org_id, df, source_id, use_llm=False)
432
  except Exception as e:
433
  logger.error(f"[FALLBACK] Entity detection failed: {e}")
434
  return ("UNKNOWN", 0.0, False)
435
 
436
  def detect_industry():
437
  try:
438
+
439
+ return hybrid_detect_industry_type(org_id, df, source_id, use_llm=False)
440
  except Exception as e:
441
  logger.error(f"[FALLBACK] Industry detection failed: {e}")
442
  return ("UNKNOWN", 0.0, False)
 
528
  df = pd.DataFrame(parsed)
529
  df.columns = [str(col).lower().strip() for col in df.columns]
530
 
531
+ from app.core.detection_engine import hybrid_detect_industry_type
532
+ industry, confidence, _ = hybrid_detect_industry_type(org_id, df, source_id, use_llm=False)
533
+
534
 
535
  industry_info = {"industry": industry, "confidence": confidence}
536
  logger.info(f"[FALLBACK_IND] βœ… Detected: {industry} ({confidence:.2%})")
app/routers/health.py CHANGED
@@ -32,6 +32,7 @@ from app.tasks.analytics_worker import get_worker_manager
32
  from app.service.vector_service import VectorService
33
  from app.mapper import health_check_mapper, MapperMetrics
34
  from app.core.event_hub import StreamingResponse, Response
 
35
 
36
  # Prometheus aggregation
37
  try:
@@ -52,68 +53,6 @@ except ImportError:
52
  logger = logging.getLogger(__name__)
53
  router = APIRouter(tags=["health"])
54
 
55
- # Global log aggregator (in-memory ring buffer for recent logs)
56
- class LogAggregator:
57
- """Ring buffer storing last 1000 logs from all services"""
58
- def __init__(self, max_size: int = 1000):
59
- self.max_size = max_size
60
- self.buffer: List[Dict[str, Any]] = []
61
- self.lock = threading.Lock()
62
-
63
- def emit(self, service: str, level: str, message: str, **kwargs):
64
- """Add a log entry from any service"""
65
- with self.lock:
66
- entry = {
67
- "timestamp": datetime.utcnow().isoformat(),
68
- "service": service,
69
- "level": level,
70
- "message": message,
71
- **kwargs
72
- }
73
- self.buffer.append(entry)
74
- if len(self.buffer) > self.max_size:
75
- self.buffer.pop(0)
76
-
77
- def get_logs(self, service: Optional[str] = None, level: Optional[str] = None, limit: int = 100) -> List[Dict]:
78
- """Retrieve filtered logs"""
79
- with self.lock:
80
- filtered = [
81
- log for log in self.buffer
82
- if (not service or log["service"] == service)
83
- and (not level or log["level"] == level)
84
- ]
85
- return filtered[-limit:]
86
-
87
- def get_error_rate(self, service: str, window_minutes: int = 5) -> float:
88
- """Calculate error rate for a service"""
89
- cutoff = datetime.utcnow() - timedelta(minutes=window_minutes)
90
- recent = [
91
- log for log in self.buffer
92
- if log["service"] == service and log["timestamp"] >= cutoff.isoformat()
93
- ]
94
- if not recent:
95
- return 0.0
96
- errors = [log for log in recent if log["level"] in ("error", "critical")]
97
- return len(errors) / len(recent)
98
-
99
- # Global aggregator instance
100
- log_aggregator = LogAggregator(max_size=1000)
101
-
102
- # Service-specific log emitters (to be imported by each service)
103
- def emit_worker_log(level: str, message: str, **kwargs):
104
- log_aggregator.emit("analytics_worker", level, message, **kwargs)
105
-
106
- def emit_vector_log(level: str, message: str, **kwargs):
107
- log_aggregator.emit("vector_service", level, message, **kwargs)
108
-
109
- def emit_llm_log(level: str, message: str, **kwargs):
110
- log_aggregator.emit("llm_service", level, message, **kwargs)
111
-
112
- def emit_mapper_log(level: str, message: str, **kwargs):
113
- log_aggregator.emit("mapper", level, message, **kwargs)
114
-
115
- def emit_deps_log(level: str, message: str, **kwargs):
116
- log_aggregator.emit("dependencies", level, message, **kwargs)
117
 
118
  # ---------------------- SRE: Unified Health Endpoint ---------------------- #
119
 
 
32
  from app.service.vector_service import VectorService
33
  from app.mapper import health_check_mapper, MapperMetrics
34
  from app.core.event_hub import StreamingResponse, Response
35
+ from app.core.sre_logging import log_aggregator, emit_worker_log, emit_vector_log, emit_llm_log, emit_mapper_log, emit_deps_log
36
 
37
  # Prometheus aggregation
38
  try:
 
53
  logger = logging.getLogger(__name__)
54
  router = APIRouter(tags=["health"])
55
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
 
57
  # ---------------------- SRE: Unified Health Endpoint ---------------------- #
58
 
app/routers/socket.py DELETED
@@ -1,54 +0,0 @@
1
- # app/routers/socket.py
2
- import socketio
3
- from fastapi import APIRouter, Depends, Path, Request
4
- from fastapi.responses import PlainTextResponse
5
- from app.deps import verify_api_key # your API-key guard
6
-
7
- # 1️⃣ Socket.IO server
8
- sio = socketio.AsyncServer(
9
- async_mode="asgi",
10
- cors_allowed_origins=[
11
- "https://mut-sync-hub.vercel.app",
12
- "http://localhost:3000",
13
- ],
14
- )
15
-
16
- # 2️⃣ ASGI sub-app (mounted separately in main.py)
17
- socket_app = socketio.ASGIApp(sio)
18
-
19
- # 3️⃣ FastAPI router for REST routes (no prefix β†’ /socket-push)
20
- router = APIRouter(tags=["socket"])
21
-
22
- # ---------- POST /socket-push/{org_id} ----------
23
- @router.post("/socket-push/{org_id}")
24
- async def socket_push(
25
- org_id: str = Path(...),
26
- request: Request = None,
27
- _: str = Depends(verify_api_key),
28
- ):
29
- """
30
- Receive top-N rows from n8n workflow and broadcast them
31
- live to all connected clients in the given org room.
32
- """
33
- payload = await request.json()
34
- rows = payload.get("rows", [])
35
- await sio.emit("datasource:new-rows", {"rows": rows}, room=org_id)
36
- print(f"[socket] πŸ”„ broadcasted {len(rows)} rows β†’ room={org_id}")
37
- return {"status": "ok", "emitted": len(rows)}
38
-
39
- # ---------- Health Check ----------
40
- @router.get("/health")
41
- async def health():
42
- return PlainTextResponse("ok")
43
-
44
- # ---------- Socket.IO Events ----------
45
- @sio.event
46
- async def connect(sid, environ, auth):
47
- org_id = (auth or {}).get("orgId", "demo")
48
- await sio.save_session(sid, {"orgId": org_id})
49
- await sio.enter_room(sid, org_id)
50
- print(f"[socket] βœ… {sid} connected β†’ room={org_id}")
51
-
52
- @sio.event
53
- async def disconnect(sid):
54
- print(f"[socket] ❌ {sid} disconnected")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app/service/ai_service.py DELETED
@@ -1,126 +0,0 @@
1
- import json
2
- import logging
3
- from app.deps import get_vector_db
4
- from typing import TYPE_CHECKING # βœ… For forward reference
5
- import time
6
-
7
- if TYPE_CHECKING:
8
- from app.service.llm_service import LocalLLMService # βœ… Type hint only
9
-
10
- logger = logging.getLogger(__name__)
11
-
12
- class AIService:
13
- def __init__(self):
14
- try:
15
- self.vector_db = get_vector_db()
16
- self.vss_available = True
17
- logger.info("βœ… Vector DB initialized")
18
- except Exception as e:
19
- logger.warning(f"⚠️ Vector DB unavailable: {e}")
20
- self.vector_db = None
21
- self.vss_available = False
22
-
23
- self._llm = None # βœ… Lazy initialization
24
- self._embedder = None # βœ… FIXED: Use _embedder (not embedder)
25
- logger.info(f"βœ… AI Service initialized (VSS: {'ENABLED' if self.vss_available else 'DISABLED'})")
26
-
27
- @property
28
- def llm(self) -> "LocalLLMService":
29
- """Lazy property to get LLM service (avoids circular import)"""
30
- if self._llm is None:
31
- from app.service.llm_service import get_llm_service # βœ… Import INSIDE property
32
- self._llm = get_llm_service()
33
- return self._llm
34
-
35
- @property
36
- def embedder(self):
37
- """Lazy property to get embedder"""
38
- if self._embedder is None:
39
- from app.service.embedding_service import embedder # βœ… Import INSIDE property
40
- self._embedder = embedder
41
- return self._embedder
42
-
43
-
44
- def detect_entity_type(self, org_id: str, columns: list[str], filename: str) -> dict:
45
- """Detect entity type with JSON validation and timeout"""
46
- columns_str = ",".join(columns)
47
-
48
- # Check cache
49
- if self.vss_available:
50
- cached = self.vector_db.execute("""
51
- SELECT entity_type FROM vector_store.embeddings
52
- WHERE org_id = ? AND content = ?
53
- ORDER BY created_at DESC LIMIT 1
54
- """, [org_id, columns_str]).fetchone()
55
-
56
- if cached:
57
- logger.info(f"[ai_service] Cache hit: {cached[0]}")
58
- return {"entity_type": cached[0], "confidence": 0.99, "cached": True}
59
-
60
- # βœ… SIMPLE, CLEAR prompt for Phi-3
61
- prompt = f"""You are a data classification assistant.
62
-
63
- You MUST respond with ONLY valid JSON in this exact format:
64
- {{"entity_type":"sales|inventory|customer|product","confidence":0.95}}
65
-
66
- Dataset info:
67
- - Filename: {filename}
68
- - Columns: {columns_str}
69
-
70
- Analyze and respond with ONLY JSON:"""
71
-
72
- logger.info(f"[ai_service] Calling LLM for {org_id}...")
73
-
74
- try:
75
- # βœ… TIMEOUT WRAPPER (5 seconds max)
76
- start_time = time.time()
77
- response_text = self.llm.generate(prompt, max_tokens=50, temperature=0.1)
78
- elapsed = time.time() - start_time
79
-
80
- logger.info(f"[ai_service] LLM responded in {elapsed:.2f}s: {response_text}")
81
-
82
- # βœ… AGGRESSIVE JSON cleaning
83
- response_text = response_text.strip()
84
- if "```json" in response_text:
85
- response_text = response_text.split("```json")[1].split("```")[0].strip()
86
- elif "```" in response_text:
87
- response_text = response_text.split("```")[1].split("```")[0].strip()
88
-
89
- # βœ… Remove any stray text before/after JSON
90
- if "{" in response_text and "}" in response_text:
91
- response_text = response_text[response_text.find("{"):response_text.rfind("}")+1]
92
-
93
- logger.info(f"[ai_service] Cleaned response: {response_text}")
94
-
95
- # βœ… PARSE with error handling
96
- result = json.loads(response_text)
97
-
98
- # βœ… Normalize
99
- result["entity_type"] = result["entity_type"].upper()
100
- result["confidence"] = float(result["confidence"])
101
-
102
- # βœ… Cache it
103
- if self.vss_available:
104
- try:
105
- embedding = self.embedder.generate(columns_str)
106
- self.vector_db.execute("""
107
- INSERT INTO vector_store.embeddings (org_id, content, embedding, entity_type)
108
- VALUES (?, ?, ?, ?)
109
- """, [org_id, columns_str, embedding, result["entity_type"]])
110
- logger.info(f"[ai_service] Cached for {org_id}")
111
- except Exception as e:
112
- logger.warning(f"[ai_service] Cache insert failed: {e}")
113
-
114
- return result
115
-
116
- except Exception as e:
117
- logger.error(f"[ai_service] ❌ Detection failed: {e}", exc_info=True)
118
- # βœ… SAFE FALLBACK (never crash)
119
- return {
120
- "entity_type": "SALES",
121
- "confidence": 0.50,
122
- "error": str(e),
123
- "fallback": True
124
- }
125
-
126
- ai_service = AIService()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app/service/llm_service.py CHANGED
@@ -25,7 +25,7 @@ from typing import Optional, Dict, Any, List, Callable
25
  from dataclasses import dataclass, asdict
26
  import psutil # For resource monitoring
27
  from fastapi import HTTPException
28
- from app.routers.health import emit_llm_log
29
  # Prometheus metrics (free tier compatible)
30
  try:
31
  from prometheus_client import Counter, Histogram, Gauge
 
25
  from dataclasses import dataclass, asdict
26
  import psutil # For resource monitoring
27
  from fastapi import HTTPException
28
+ from app.core.sre_logging import emit_llm_log
29
  # Prometheus metrics (free tier compatible)
30
  try:
31
  from prometheus_client import Counter, Histogram, Gauge
app/service/vector_service.py CHANGED
@@ -11,7 +11,7 @@ from sentence_transformers import SentenceTransformer
11
  import logging
12
  from datetime import datetime, timedelta
13
  from enum import Enum
14
- from app.routers.health import emit_vector_log
15
  logger = logging.getLogger(__name__)
16
 
17
 
 
11
  import logging
12
  from datetime import datetime, timedelta
13
  from enum import Enum
14
+ from app.core.sre_logging import emit_vector_log
15
  logger = logging.getLogger(__name__)
16
 
17
 
app/tasks/analytics_worker.py CHANGED
@@ -26,7 +26,7 @@ from app.schemas.org_schema import OrgSchema
26
  from app.service.vector_service import VectorService, VectorStoreEventType, VectorMetrics
27
  from app.engine.kpi_calculators.registry import get_kpi_calculator_async
28
  from app.service.embedding_service import EmbeddingService
29
- from app.routers.health import emit_worker_log
30
 
31
  # Configure structured logging for SRE tools (Loki, etc.)
32
  logging.basicConfig(
 
26
  from app.service.vector_service import VectorService, VectorStoreEventType, VectorMetrics
27
  from app.engine.kpi_calculators.registry import get_kpi_calculator_async
28
  from app.service.embedding_service import EmbeddingService
29
+ from app.core.sre_logging import emit_worker_log
30
 
31
  # Configure structured logging for SRE tools (Loki, etc.)
32
  logging.basicConfig(
app/tasks/worker.py CHANGED
@@ -1,263 +1,397 @@
1
- # app/tasks/worker.py – ENTERPRISE GRADE (WITH ENTITY DETECTION)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
  import json
3
  import time
 
4
  import signal
5
  import sys
6
  import traceback
7
  from typing import Dict, Any, Callable
8
- import pandas as pd # βœ… Required for entity detection
 
9
 
10
  from app.core.event_hub import event_hub
11
- from app.service.ai_service import ai_service
12
  from app.deps import get_duckdb
13
  from app.hybrid_entity_detector import hybrid_detect_entity_type, hybrid_detect_industry_type
 
14
 
15
- # ── Graceful Shutdown ──────────────────────────────────────────────────────────
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
  def shutdown(signum, frame):
17
- print("\nπŸ›‘ Worker shutting down gracefully...")
18
  sys.exit(0)
19
 
20
  signal.signal(signal.SIGINT, shutdown)
21
  signal.signal(signal.SIGTERM, shutdown)
22
 
 
23
 
24
-
25
- # ── NEW: Entity Detection Handler ───────────────────────────────────────────────
26
- def process_detect_entity(org_id: str, **args):
27
- """🎯 Queries DuckDB for raw data instead of receiving payload"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28
  source_id = args["source_id"]
 
29
 
30
- print(f"πŸ”΅ [{org_id}] Entity detection STARTED for {source_id}")
 
31
 
32
  try:
33
- # 1. βœ… Query DuckDB for latest raw rows (the ones just uploaded)
34
  conn = get_duckdb(org_id)
35
  rows = conn.execute("""
36
- SELECT row_data
37
  FROM main.raw_rows
38
  WHERE row_data IS NOT NULL
39
  USING SAMPLE 40
40
  """).fetchall()
41
-
42
 
43
  if not rows:
44
  raise RuntimeError(f"No raw data found for {source_id}")
45
 
46
- # 2. Parse into DataFrame
47
  parsed = [json.loads(r[0]) for r in rows if r[0]]
48
  df = pd.DataFrame(parsed)
49
- print(f" πŸ“Š DataFrame: {len(df)} rows Γ— {len(df.columns)} cols")
50
 
51
- # 3. Detect entity (rest unchanged)
52
- entity_type, confidence, _ = hybrid_detect_entity_type(org_id, df, f"{source_id}.json")
53
- print(f" βœ… Detected: {entity_type} ({confidence:.2%})")
54
 
55
- # 3. Store in Redis for mapper to poll (HF endpoint is waiting for this)
56
  entity_key = f"entity:{org_id}:{source_id}"
57
- event_hub.setex(
58
- entity_key,
59
- 3600, # 1 hour TTL (gives mapper plenty of time)
60
- json.dumps({
61
- "entity_type": entity_type,
62
- "confidence": confidence,
63
- "detected_at": time.time(),
64
- "source_id": source_id
65
- })
66
- )
67
- print(f" πŸ’Ύ Stored in Redis: {entity_key}")
68
 
69
- # 4. Publish event for any real-time subscribers (future-proofing)
70
  event_hub.publish(
71
  f"entity_ready:{org_id}",
72
  json.dumps({
73
  "source_id": source_id,
74
  "entity_type": entity_type,
75
- "confidence": confidence
 
76
  })
77
  )
78
- print(f" πŸ“€ Published to entity_ready:{org_id}")
79
 
80
- # 5. Return result to satisfy worker's response contract
 
 
 
 
 
 
81
  return {
82
  "entity_type": entity_type,
83
  "confidence": confidence,
84
  "source_id": source_id,
85
- "status": "stored_in_redis"
 
 
86
  }
87
 
88
  except Exception as e:
89
- print(f"❌ Entity detection failed: {e}")
90
- # CRITICAL: Re-raise so process_task logs it properly
91
- raise RuntimeError(f"Entity detection failed for {source_id}: {str(e)}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92
 
93
- def process_detect_industry(org_id: str, **args):
94
  """
95
- 🎯 DETECTS INDUSTRY (business vertical) only.
96
- DOES NOT touch entity detection.
 
 
 
 
 
 
 
 
 
 
 
 
 
97
  """
 
98
  source_id = args["source_id"]
 
99
 
100
- print(f"πŸ”΄πŸ”΄πŸ”΄ [WORKER] INDUSTRY detection for {org_id}/{source_id}")
 
101
 
102
  try:
103
- # Query raw data
104
  conn = get_duckdb(org_id)
105
- rows = conn.execute("SELECT row_data FROM main.raw_rows LIMIT 100").fetchall()
 
 
 
 
 
106
 
107
  if not rows:
108
- raise RuntimeError("No raw data")
109
 
 
110
  parsed = [json.loads(r[0]) for r in rows if r[0]]
111
  df = pd.DataFrame(parsed)
 
112
 
113
- # βœ… Use NEW detector (decoupled from entity)
114
- industry, confidence, _ = hybrid_detect_industry_type(org_id, df, source_id)
 
115
 
116
- # Write to dedicated Redis key
117
- event_hub.setex(f"industry:{org_id}:{source_id}", 3600, json.dumps({
 
118
  "industry": industry,
119
- "confidence": confidence
120
- }))
 
 
 
121
 
122
- print(f"βœ… [WORKER] INDUSTRY written: {industry} ({confidence:.2%})")
 
 
123
 
124
- # Auto-queue entity detection (separate task, independent)
 
 
 
 
 
 
 
 
 
 
 
 
125
  entity_task = {
126
  "id": f"detect_entity:{org_id}:{source_id}:{int(time.time())}",
127
  "function": "detect_entity",
128
  "args": {"org_id": org_id, "source_id": source_id}
129
  }
130
  event_hub.lpush("python:task_queue", json.dumps(entity_task))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
131
 
132
  except Exception as e:
133
- print(f"❌ [WORKER] Industry detection CRASHED: {e}")
 
 
 
 
 
 
 
 
 
134
  event_hub.setex(f"industry:{org_id}:{source_id}", 3600, json.dumps({
135
  "industry": "UNKNOWN",
136
- "confidence": 0.0
 
 
 
137
  }))
138
- raise
139
-
140
- # ── Legacy Handlers (Keep for backward compatibility) ────────────────────────
141
- def canonify_df_with_entity(org_id: str, filename: str, hours_window: int = 24):
142
- """⚠️ DEPRECATED: Remove once all ingestion uses detect_entity worker"""
143
- from app.mapper import canonify_df
144
- return canonify_df(org_id, filename, hours_window)
145
 
146
- def execute_org_sql(org_id: str, sql: str):
147
- """Execute SQL for specific org with enterprise guardrails"""
148
- conn = get_duckdb(org_id)
149
-
150
- # πŸ”’ Security: Whitelist only SELECT queries
151
- safe_sql = sql.strip().upper()
152
- if not safe_sql.startswith("SELECT"):
153
- raise ValueError("πŸ”’ Only SELECT queries allowed")
154
-
155
- # πŸ’‘ Safety: Auto-limit to prevent memory overload
156
- if "LIMIT" not in safe_sql:
157
- safe_sql += " LIMIT 10000"
158
-
159
- return conn.execute(safe_sql).fetchall()
160
 
161
- # ── Task Handler Registry ─────────────────────────────────────────────────────
162
- # ⚠️ ORDER MATTERS: Add new handlers at the top for visibility
163
  TASK_HANDLERS: Dict[str, Callable] = {
164
- "detect_entity": process_detect_entity, # 🎯 NEW: Ingestion-critical path
165
-
166
- # Legacy AI handlers (keep until fully migrated)
167
- "detect_entity_type": lambda org_id, **args: ai_service.detect_entity_type(org_id, **args),
168
- "generate_sql": lambda org_id, **args: ai_service.generate_sql(org_id, **args),
169
- "generate_insights": lambda org_id, **args: ai_service.generate_insights(org_id, **args),
170
- "similarity_search": lambda org_id, **args: ai_service.similarity_search(org_id, **args),
171
-
172
- # Legacy mapper handlers (to be deprecated)
173
- "canonify_df": canonify_df_with_entity,
174
- "execute_sql": execute_org_sql,
175
  }
176
 
177
- # ── Task Processing (UNCHANGED – BATTLE TESTED) ───────────────────────────────
178
- def process_task(task_data: Dict[str, Any]):
179
- """Process single task with full observability and error isolation"""
180
- task_id = task_data.get("id")
 
 
 
 
 
 
 
181
  function_name = task_data.get("function")
182
  args = task_data.get("args", {})
183
 
184
- # ── Validation ─────────────────────────────────────────────────────────────
185
- if not task_id or not function_name:
186
- raise ValueError("❌ Invalid task: missing id or function")
187
-
188
- if "org_id" not in args:
189
- raise ValueError(f"❌ Task {task_id} missing required org_id")
190
-
191
- org_id = args["org_id"]
192
 
193
- # ── Execution ──────────────────────────────────────────────────────────────
194
- start_time = time.time()
195
- print(f"πŸ”΅ [{org_id}] Processing {function_name} (task: {task_id})")
196
 
197
  try:
198
  handler = TASK_HANDLERS.get(function_name)
199
  if not handler:
200
- raise ValueError(f"❌ Unknown function: {function_name}")
201
 
202
  # Execute handler
203
  result = handler(org_id, **args)
204
 
205
- # ── Success ────────────────────────────────────────────────────────────
206
  duration = time.time() - start_time
207
- print(f"βœ… [{org_id}] {function_name} completed in {duration:.2f}s")
208
 
209
- event_hub.setex(
210
- f"python:response:{task_id}",
211
- 3600,
212
- json.dumps({
213
- "status": "success",
214
- "org_id": org_id,
215
- "function": function_name,
216
- "data": result,
217
- "duration": duration
218
- })
219
- )
 
 
220
 
221
  except Exception as e:
222
- # ── Error ──────────────────────────────────────────────────────────────
223
  duration = time.time() - start_time
224
- error_msg = f"{type(e).__name__}: {str(e)}"
225
- print(f"❌ [{org_id}] {function_name} FAILED after {duration:.2f}s: {error_msg}")
226
- print(traceback.format_exc()) # Full trace for debugging
227
 
228
- event_hub.setex(
229
- f"python:response:{task_id}",
230
- 3600,
231
- json.dumps({
232
- "status": "error",
233
- "org_id": org_id,
234
- "function": function_name,
235
- "message": error_msg,
236
- "duration": duration
237
- })
238
- )
 
 
 
 
 
 
 
239
 
240
- # ── Main Worker Loop (UNCHANGED – HANDLES MILLIONS OF TASKS) ──────────────────
241
  if __name__ == "__main__":
242
- print("πŸš€ Python worker listening on Redis queue...")
243
- print("Press Ctrl+C to stop")
244
 
245
  while True:
246
  try:
247
  # Blocking pop (0 = infinite wait, no CPU burn)
248
- _, task_json = event_hub.brpop("python:task_queue", timeout=0)
249
 
250
- try:
251
- task_data = json.loads(task_json)
252
- process_task(task_data)
253
- except json.JSONDecodeError as e:
254
- print(f"❌ Malformed task JSON: {e}")
255
- continue
 
 
256
 
257
  except KeyboardInterrupt:
258
- print("\nShutting down...")
259
  break
260
  except Exception as e:
261
- print(f"πŸ”΄ WORKER-LEVEL ERROR (will restart): {e}")
262
  traceback.print_exc()
263
  time.sleep(5) # Cooldown before retry
 
1
+ """
2
+ Worker v5.0: Pure LLM Detection Engine
3
+
4
+ Purpose: Detect entity_type and industry using Phi-3 LLM
5
+ - Queries DuckDB raw_rows for fresh data
6
+ - Runs hybrid detection (LLM + rules)
7
+ - Stores results in Redis for mapper to poll
8
+ - Publishes pub/sub events for real-time subscribers
9
+ - Zero legacy handlers, zero bloat
10
+
11
+ SRE Features:
12
+ - Structured JSON logging
13
+ - Prometheus metrics per detection type
14
+ - Circuit breaker for Redis failures
15
+ - Request/response tracking with task_id
16
+ - Error isolation and fallback to UNKNOWN
17
+ """
18
+
19
  import json
20
  import time
21
+ import logging
22
  import signal
23
  import sys
24
  import traceback
25
  from typing import Dict, Any, Callable
26
+ import pandas as pd
27
+ import datetime
28
 
29
  from app.core.event_hub import event_hub
 
30
  from app.deps import get_duckdb
31
  from app.hybrid_entity_detector import hybrid_detect_entity_type, hybrid_detect_industry_type
32
+ from app.core.sre_logging import emit_worker_log
33
 
34
+ # ── SRE: Prometheus Metrics ─────────────────────────────────────────────────────
35
+ try:
36
+ from prometheus_client import Counter, Histogram
37
+ detection_latency = Histogram(
38
+ 'worker_detection_duration_seconds',
39
+ 'Time to detect entity/industry',
40
+ ['detection_type', 'org_id']
41
+ )
42
+ detection_errors = Counter(
43
+ 'worker_detection_errors_total',
44
+ 'Total detection failures',
45
+ ['detection_type', 'org_id', 'error_type']
46
+ )
47
+ except ImportError:
48
+ detection_latency = None
49
+ detection_errors = None
50
+
51
+ # ── Logging Setup ───────────────────────────────────────────────────────────────
52
+ logging.basicConfig(
53
+ level=logging.INFO,
54
+ format='%(asctime)s | [%(levelname)s] [%(name)s] %(message)s'
55
+ )
56
+ logger = logging.getLogger(__name__)
57
+
58
+ # ── Graceful Shutdown ───────────────────────────────────────────────────────────
59
  def shutdown(signum, frame):
60
+ logger.info("πŸ›‘ Worker shutting down gracefully...")
61
  sys.exit(0)
62
 
63
  signal.signal(signal.SIGINT, shutdown)
64
  signal.signal(signal.SIGTERM, shutdown)
65
 
66
+ # ── CORE: LLM-Based Detection Handlers ──────────────────────────────────────────
67
 
68
+ def process_detect_entity(org_id: str, **args) -> Dict[str, Any]:
69
+ """
70
+ 🎯 MAIN: Detect entity_type using LLM queries to DuckDB
71
+
72
+ Flow:
73
+ 1. Query latest raw rows from DuckDB
74
+ 2. Run hybrid LLM detection (Phi-3 + rules)
75
+ 3. Store result in Redis (mapper polls this)
76
+ 4. Publish pub/sub event for real-time subscribers
77
+ 5. Return structured result
78
+
79
+ Args:
80
+ org_id: Organization ID
81
+ source_id: From args["source_id"]
82
+
83
+ Returns:
84
+ {"entity_type": str, "confidence": float, "source_id": str, "status": str}
85
+ """
86
+ start_time = time.time()
87
  source_id = args["source_id"]
88
+ task_id = args.get("task_id", "unknown")
89
 
90
+ emit_worker_log("info", "Entity detection started",
91
+ org_id=org_id, source_id=source_id, task_id=task_id)
92
 
93
  try:
94
+ # 1. Query DuckDB for raw data (the data just uploaded)
95
  conn = get_duckdb(org_id)
96
  rows = conn.execute("""
97
+ SELECT row_data
98
  FROM main.raw_rows
99
  WHERE row_data IS NOT NULL
100
  USING SAMPLE 40
101
  """).fetchall()
 
102
 
103
  if not rows:
104
  raise RuntimeError(f"No raw data found for {source_id}")
105
 
106
+ # 2. Parse to DataFrame for LLM detection
107
  parsed = [json.loads(r[0]) for r in rows if r[0]]
108
  df = pd.DataFrame(parsed)
109
+ logger.info(f"[WORKER] πŸ“Š Entity detection DataFrame: {len(df)} rows Γ— {len(df.columns)} cols")
110
 
111
+ # 3. Run hybrid LLM detection (Phi-3 + rules)
112
+ entity_type, confidence, _ = hybrid_detect_entity_type(org_id, df, source_id, use_llm=True)
113
+ logger.info(f"[WORKER] βœ… Entity detected: {entity_type} ({confidence:.2%})")
114
 
115
+ # 4. Store in Redis (mapper's poll_for_entity() reads this)
116
  entity_key = f"entity:{org_id}:{source_id}"
117
+ entity_data = {
118
+ "entity_type": entity_type,
119
+ "confidence": confidence,
120
+ "detected_at": time.time(),
121
+ "source_id": source_id,
122
+ "detected_by": "llm-worker"
123
+ }
124
+
125
+ event_hub.setex(entity_key, 3600, json.dumps(entity_data))
126
+ emit_worker_log("info", "Entity stored in Redis",
127
+ org_id=org_id, source_id=source_id, entity_type=entity_type)
128
 
129
+ # 5. Publish pub/sub event for real-time subscribers
130
  event_hub.publish(
131
  f"entity_ready:{org_id}",
132
  json.dumps({
133
  "source_id": source_id,
134
  "entity_type": entity_type,
135
+ "confidence": confidence,
136
+ "timestamp": datetime.utcnow().isoformat()
137
  })
138
  )
139
+ emit_worker_log("debug", "Pub/sub event published", channel=f"entity_ready:{org_id}")
140
 
141
+ # 6. SRE: Record metrics
142
+ if detection_latency:
143
+ detection_latency.labels(detection_type="entity", org_id=org_id).observe(
144
+ (time.time() - start_time)
145
+ )
146
+
147
+ # 7. Return structured result
148
  return {
149
  "entity_type": entity_type,
150
  "confidence": confidence,
151
  "source_id": source_id,
152
+ "status": "stored_in_redis",
153
+ "task_id": task_id,
154
+ "duration_ms": round((time.time() - start_time) * 1000, 2)
155
  }
156
 
157
  except Exception as e:
158
+ error_msg = f"Entity detection failed for {source_id}: {str(e)}"
159
+ logger.error(f"[WORKER] {error_msg}")
160
+
161
+ # SRE: Record error
162
+ if detection_errors:
163
+ detection_errors.labels(detection_type="entity", org_id=org_id, error_type=type(e).__name__).inc()
164
+
165
+ emit_worker_log("error", "Entity detection failed",
166
+ org_id=org_id, source_id=source_id, error=error_msg)
167
+
168
+ # Fallback: Store UNKNOWN to unblock mapper
169
+ event_hub.setex(f"entity:{org_id}:{source_id}", 3600, json.dumps({
170
+ "entity_type": "UNKNOWN",
171
+ "confidence": 0.0,
172
+ "detected_at": time.time(),
173
+ "source_id": source_id,
174
+ "error": error_msg
175
+ }))
176
+
177
+ raise RuntimeError(error_msg)
178
 
179
+ def process_detect_industry(org_id: str, **args) -> Dict[str, Any]:
180
  """
181
+ 🎯 MAIN: Detect industry vertical using LLM
182
+
183
+ Flow:
184
+ 1. Query DuckDB raw rows
185
+ 2. Run hybrid LLM detection
186
+ 3. Store result in Redis
187
+ 4. Publish pub/sub event
188
+ 5. Also triggers entity detection (independent task)
189
+
190
+ Args:
191
+ org_id: Organization ID
192
+ source_id: From args["source_id"]
193
+
194
+ Returns:
195
+ {"industry": str, "confidence": float, "source_id": str, "status": str}
196
  """
197
+ start_time = time.time()
198
  source_id = args["source_id"]
199
+ task_id = args.get("task_id", "unknown")
200
 
201
+ emit_worker_log("info", "Industry detection started",
202
+ org_id=org_id, source_id=source_id, task_id=task_id)
203
 
204
  try:
205
+ # 1. Query DuckDB
206
  conn = get_duckdb(org_id)
207
+ rows = conn.execute("""
208
+ SELECT row_data
209
+ FROM main.raw_rows
210
+ WHERE row_data IS NOT NULL
211
+ USING SAMPLE 40
212
+ """).fetchall()
213
 
214
  if not rows:
215
+ raise RuntimeError(f"No raw data found for {source_id}")
216
 
217
+ # 2. Parse DataFrame
218
  parsed = [json.loads(r[0]) for r in rows if r[0]]
219
  df = pd.DataFrame(parsed)
220
+ logger.info(f"[WORKER] πŸ“Š Industry detection DataFrame: {len(df)} rows Γ— {len(df.columns)} cols")
221
 
222
+ # 3. Run hybrid LLM detection
223
+ industry, confidence, _ = hybrid_detect_industry_type(org_id, df, source_id, use_llm=True)
224
+ logger.info(f"[WORKER] βœ… Industry detected: {industry} ({confidence:.2%})")
225
 
226
+ # 4. Store in Redis
227
+ industry_key = f"industry:{org_id}:{source_id}"
228
+ industry_data = {
229
  "industry": industry,
230
+ "confidence": confidence,
231
+ "detected_at": time.time(),
232
+ "source_id": source_id,
233
+ "detected_by": "llm-worker"
234
+ }
235
 
236
+ event_hub.setex(industry_key, 3600, json.dumps(industry_data))
237
+ emit_worker_log("info", "Industry stored in Redis",
238
+ org_id=org_id, source_id=source_id, industry=industry)
239
 
240
+ # 5. Publish pub/sub event
241
+ event_hub.publish(
242
+ f"industry_ready:{org_id}",
243
+ json.dumps({
244
+ "source_id": source_id,
245
+ "industry": industry,
246
+ "confidence": confidence,
247
+ "timestamp": datetime.utcnow().isoformat()
248
+ })
249
+ )
250
+
251
+ # 6. Auto-trigger entity detection (independent task)
252
+ # This ensures both entity and industry are eventually detected
253
  entity_task = {
254
  "id": f"detect_entity:{org_id}:{source_id}:{int(time.time())}",
255
  "function": "detect_entity",
256
  "args": {"org_id": org_id, "source_id": source_id}
257
  }
258
  event_hub.lpush("python:task_queue", json.dumps(entity_task))
259
+ emit_worker_log("debug", "Auto-triggered entity detection",
260
+ org_id=org_id, source_id=source_id)
261
+
262
+ # 7. SRE: Record metrics
263
+ if detection_latency:
264
+ detection_latency.labels(detection_type="industry", org_id=org_id).observe(
265
+ (time.time() - start_time)
266
+ )
267
+
268
+ return {
269
+ "industry": industry,
270
+ "confidence": confidence,
271
+ "source_id": source_id,
272
+ "status": "stored_in_redis",
273
+ "task_id": task_id,
274
+ "duration_ms": round((time.time() - start_time) * 1000, 2)
275
+ }
276
 
277
  except Exception as e:
278
+ error_msg = f"Industry detection failed for {source_id}: {str(e)}"
279
+ logger.error(f"[WORKER] {error_msg}")
280
+
281
+ if detection_errors:
282
+ detection_errors.labels(detection_type="industry", org_id=org_id, error_type=type(e).__name__).inc()
283
+
284
+ emit_worker_log("error", "Industry detection failed",
285
+ org_id=org_id, source_id=source_id, error=error_msg)
286
+
287
+ # Fallback: Store UNKNOWN
288
  event_hub.setex(f"industry:{org_id}:{source_id}", 3600, json.dumps({
289
  "industry": "UNKNOWN",
290
+ "confidence": 0.0,
291
+ "detected_at": time.time(),
292
+ "source_id": source_id,
293
+ "error": error_msg
294
  }))
295
+
296
+ raise RuntimeError(error_msg)
 
 
 
 
 
297
 
298
+ # ── Task Registry (CLEAN – Only LLM Detection) ──────────────────────────────────
 
 
 
 
 
 
 
 
 
 
 
 
 
299
 
 
 
300
  TASK_HANDLERS: Dict[str, Callable] = {
301
+ "detect_entity": process_detect_entity, # 🎯 LLM entity detection
302
+ "detect_industry": process_detect_industry, # 🎯 LLM industry detection
303
+ # βœ… All legacy handlers removed – mapper handles the rest via polling
 
 
 
 
 
 
 
 
304
  }
305
 
306
+ # ── Task Processing (SIMPLIFIED – No Legacy) ────────────────────────────────────
307
+
308
+ def process_task(task_data: Dict[str, Any]) -> None:
309
+ """
310
+ Process single detection task with SRE observability
311
+
312
+ Args:
313
+ task_data: {"id": str, "function": str, "args": dict}
314
+ """
315
+ start_time = time.time()
316
+ task_id = task_data.get("id", "unknown")
317
  function_name = task_data.get("function")
318
  args = task_data.get("args", {})
319
 
320
+ org_id = args.get("org_id", "unknown")
321
+ source_id = args.get("source_id", "unknown")
 
 
 
 
 
 
322
 
323
+ emit_worker_log("info", "Task processing started",
324
+ task_id=task_id, function=function_name, org_id=org_id, source_id=source_id)
 
325
 
326
  try:
327
  handler = TASK_HANDLERS.get(function_name)
328
  if not handler:
329
+ raise ValueError(f"Unknown detection function: {function_name}")
330
 
331
  # Execute handler
332
  result = handler(org_id, **args)
333
 
 
334
  duration = time.time() - start_time
 
335
 
336
+ # Store success response
337
+ response_key = f"python:response:{task_id}"
338
+ event_hub.setex(response_key, 3600, json.dumps({
339
+ "status": "success",
340
+ "function": function_name,
341
+ "org_id": org_id,
342
+ "data": result,
343
+ "duration": duration
344
+ }))
345
+
346
+ emit_worker_log("info", "Task completed",
347
+ task_id=task_id, function=function_name,
348
+ duration_ms=round(duration * 1000, 2))
349
 
350
  except Exception as e:
 
351
  duration = time.time() - start_time
352
+ error_type = type(e).__name__
 
 
353
 
354
+ # Store error response
355
+ response_key = f"python:response:{task_id}"
356
+ event_hub.setex(response_key, 3600, json.dumps({
357
+ "status": "error",
358
+ "function": function_name,
359
+ "org_id": org_id,
360
+ "message": str(e),
361
+ "duration": duration
362
+ }))
363
+
364
+ emit_worker_log("error", "Task failed",
365
+ task_id=task_id, function=function_name,
366
+ error=str(e), error_type=error_type)
367
+
368
+ # Re-raise to let caller know
369
+ raise
370
+
371
+ # ── Main Worker Loop (UNCHANGED – BATTLE TESTED) ───────────────────────────────
372
 
 
373
  if __name__ == "__main__":
374
+ logger.info("πŸš€ Python detection worker listening on Redis queue...")
375
+ logger.info("Press Ctrl+C to stop")
376
 
377
  while True:
378
  try:
379
  # Blocking pop (0 = infinite wait, no CPU burn)
380
+ result = event_hub.brpop("python:task_queue", timeout=0)
381
 
382
+ if result:
383
+ _, task_json = result
384
+ try:
385
+ task_data = json.loads(task_json)
386
+ process_task(task_data)
387
+ except json.JSONDecodeError as e:
388
+ logger.error(f"Malformed task JSON: {e}")
389
+ continue
390
 
391
  except KeyboardInterrupt:
392
+ logger.info("Shutting down...")
393
  break
394
  except Exception as e:
395
+ logger.error(f"πŸ”΄ WORKER-LEVEL ERROR (will restart): {e}")
396
  traceback.print_exc()
397
  time.sleep(5) # Cooldown before retry