shaliz-kong committed on
Commit
bcb1320
Β·
1 Parent(s): eb81ac1

check canonify trigger timing; review worker startup and wait logic

Browse files
.gitattributes CHANGED
@@ -1 +1,5 @@
1
- *.duckdb filter=lfs diff=lfs merge=lfs -text
 
 
 
 
 
1
+ # Do not LFS large runtime DBs; keep templates if needed
2
+ *.duckdb -filter -merge -diff -text
3
+
4
+ # If you want templates/fixtures to remain tracked, add an override
5
+ # templates/*.duckdb filter=lfs diff=lfs merge=lfs -text
app/core/event_hub.py CHANGED
@@ -19,7 +19,11 @@ class EventHub:
19
  return self.redis.get(key)
20
 
21
  def setex(self, key: str, ttl: int, value: str):
22
- return self.redis.setex(key, ttl, value)
 
 
 
 
23
 
24
  def exists(self, key: str) -> bool:
25
  return self.redis.exists(key)
@@ -147,6 +151,17 @@ class EventHub:
147
  def keys(self, pattern: str):
148
  return self.redis.keys(pattern)
149
 
 
 
 
 
 
 
 
 
 
 
 
150
 
151
  # Singleton
152
  event_hub = EventHub()
 
19
  return self.redis.get(key)
20
 
21
  def setex(self, key: str, ttl: int, value: str):
22
+ try:
23
+ return self.redis.setex(key, ttl, value)
24
+ except Exception as e:
25
+ logger.error(f"[hub] ❌ setex failed for {key}: {e}", exc_info=True)
26
+ raise
27
 
28
  def exists(self, key: str) -> bool:
29
  return self.redis.exists(key)
 
151
  def keys(self, pattern: str):
152
  return self.redis.keys(pattern)
153
 
154
+ def pipeline(self):
155
+ """Return a redis pipeline-like object if supported by client.
156
+
157
+ Note: Upstash client may not support classic pipelines; callers should
158
+ handle attribute errors and fall back to sequential commands.
159
+ """
160
+ try:
161
+ return self.redis.pipeline()
162
+ except Exception:
163
+ return None
164
+
165
 
166
  # Singleton
167
  event_hub = EventHub()
app/core/worker_manager.py CHANGED
@@ -26,69 +26,65 @@ class WorkerManager:
26
  self.consecutive_empty_polls = 0
27
 
28
  async def start_listener(self):
29
- """🎧 Adaptive polling: fast when busy, slow when idle"""
30
- logger.info(
31
- f"[manager] 🎧 listening (active: {self.poll_interval}s, idle: {self.idle_poll_interval}s)"
32
- )
33
-
 
 
 
 
 
 
 
 
 
 
 
 
34
  while True:
35
  try:
36
- # Poll Redis stream (non-blocking)
37
- result = redis.xread({self.stream_key: self.last_id}, count=10)
38
-
39
- # Check if we got messages
 
 
 
 
40
  has_messages = bool(result and isinstance(result, dict) and result.get(self.stream_key))
41
-
42
- if has_messages:
43
- # Reset idle counter
44
- self.consecutive_empty_polls = 0
45
-
46
- messages = result[self.stream_key]
47
- logger.info(f"[manager] πŸ“₯ received {len(messages)} messages")
48
-
49
- for msg_id, msg_data in messages:
50
- try:
51
- payload = json.loads(msg_data.get("message", "{}"))
52
- org_id = payload.get("org_id")
53
- source_id = payload.get("source_id")
54
-
55
- if org_id and source_id:
56
- logger.info(f"[manager] πŸš€ processing {org_id}:{source_id}")
57
- await self.spawn_worker(org_id, source_id)
58
- self.last_id = msg_id
59
- else:
60
- logger.warning(f"[manager] ⚠️ missing IDs: {payload}")
61
-
62
- except json.JSONDecodeError as e:
63
- logger.error(f"[manager] ❌ JSON error: {e}")
64
- except Exception as e:
65
- logger.error(f"[manager] ❌ message processing error: {e}", exc_info=True)
66
-
67
- # Brief pause after batch, then continue polling fast
68
- await asyncio.sleep(0.1)
69
- continue # Skip to next iteration (stay in active mode)
70
-
71
- # No messages found
72
- self.consecutive_empty_polls += 1
73
-
74
- # Choose sleep duration based on idle state
75
- if self.consecutive_empty_polls > self.idle_threshold:
76
- delay = self.idle_poll_interval
77
- logger.debug(
78
- f"[manager] πŸ’€ idle mode, sleeping {delay}s (empty polls: {self.consecutive_empty_polls})"
79
- )
80
- else:
81
- delay = self.poll_interval
82
- logger.debug(
83
- f"[manager] πŸ’€ active mode, sleeping {delay}s (empty polls: {self.consecutive_empty_polls})"
84
- )
85
-
86
- await asyncio.sleep(delay)
87
-
88
  except Exception as e:
89
- logger.error(f"[manager] ❌ polling error: {e}", exc_info=True)
90
- await asyncio.sleep(5) # Back off on error
91
- self.consecutive_empty_polls = 0 # Reset after error
92
 
93
  async def spawn_worker(self, org_id: str, source_id: str):
94
  """Spawn worker with distributed lock"""
 
26
  self.consecutive_empty_polls = 0
27
 
28
  async def start_listener(self):
29
+ """🎧 Stream-based listener: block on Redis stream reads to avoid polling
30
+
31
+ NOTE: The original adaptive polling loop is left commented below for
32
+ reference. This implementation uses blocking XREAD (if supported) so
33
+ workers are only spawned when a readiness message is available.
34
+ """
35
+ logger.info(f"[manager] 🎧 stream listener (blocking read) on {self.stream_key}")
36
+
37
+ # --- Original polling loop (commented) ---
38
+ # (kept for reference during refactor)
39
+ # logger.info(
40
+ # f"[manager] 🎧 listening (active: {self.poll_interval}s, idle: {self.idle_poll_interval}s)"
41
+ # )
42
+ # while True:
43
+ # ...
44
+
45
+ # New: blocking xread loop
46
  while True:
47
  try:
48
+ try:
49
+ # Block for up to 5s waiting for new messages; fall back
50
+ # to non-blocking if 'block' not supported by client
51
+ result = redis.xread({self.stream_key: self.last_id}, count=10, block=5000)
52
+ except TypeError:
53
+ # Client may not accept block kwarg; try without block
54
+ result = redis.xread({self.stream_key: self.last_id}, count=10)
55
+
56
  has_messages = bool(result and isinstance(result, dict) and result.get(self.stream_key))
57
+
58
+ if not has_messages:
59
+ # No messages within block window; continue (loop will block again)
60
+ continue
61
+
62
+ # We have messages β€” process them
63
+ self.consecutive_empty_polls = 0
64
+ messages = result[self.stream_key]
65
+ logger.info(f"[manager] πŸ“₯ received {len(messages)} messages")
66
+
67
+ for msg_id, msg_data in messages:
68
+ try:
69
+ payload = json.loads(msg_data.get("message", "{}"))
70
+ org_id = payload.get("org_id")
71
+ source_id = payload.get("source_id")
72
+
73
+ if org_id and source_id:
74
+ logger.info(f"[manager] πŸš€ processing {org_id}:{source_id}")
75
+ await self.spawn_worker(org_id, source_id)
76
+ self.last_id = msg_id
77
+ else:
78
+ logger.warning(f"[manager] ⚠️ missing IDs: {payload}")
79
+
80
+ except json.JSONDecodeError as e:
81
+ logger.error(f"[manager] ❌ JSON error: {e}")
82
+ except Exception as e:
83
+ logger.error(f"[manager] ❌ message processing error: {e}", exc_info=True)
84
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
85
  except Exception as e:
86
+ logger.error(f"[manager] ❌ streaming error: {e}", exc_info=True)
87
+ await asyncio.sleep(2)
 
88
 
89
  async def spawn_worker(self, org_id: str, source_id: str):
90
  """Spawn worker with distributed lock"""
app/mapper.py CHANGED
@@ -119,59 +119,130 @@ def poll_for_entity(org_id: str, source_id: str, timeout: int = 30) -> dict:
119
 
120
  # 5. Emergency fallback (worker is dead)
121
  print("[poll] ⚠️ Both attempts failed - using direct detection")
122
- entity_info = _fallback_detection(org_id, source_id)
123
-
124
- # 🎯 NEW: Force write to Redis (ensure it's there)
125
- event_hub.setex(
126
- f"entity:{org_id}:{source_id}",
127
- 3600,
128
- json.dumps(entity_info)
129
- )
130
-
131
- # 🎯 NEW: Clear stale cache so next read is fresh
132
- if (org_id, source_id) in _ENTITY_CACHE:
133
- del _ENTITY_CACHE[(org_id, source_id)]
134
-
135
  return entity_info
136
 
137
 
 
 
138
  def _fallback_detection(org_id: str, source_id: str) -> dict:
 
 
 
 
 
 
 
 
 
 
 
 
 
139
  """
140
- Emergency: Detect entity directly from DuckDB.
141
- Writes result to Redis and cache for recovery.
142
- """
143
- print(f"[fallback] 🚨 Running fallback for {org_id}/{source_id}")
144
-
145
- conn = get_conn(org_id)
146
- rows = conn.execute("""
 
 
147
  SELECT row_data
148
  FROM main.raw_rows
149
  WHERE row_data IS NOT NULL
150
  USING SAMPLE 100
151
  """).fetchall()
152
-
153
- if not rows:
154
- print("[fallback] ❌ No data found, returning UNKNOWN")
155
- entity_info = {"entity_type": "UNKNOWN", "confidence": 0.0}
156
- else:
157
- parsed = [json.loads(r[0]) for r in rows if r[0]]
158
- df = pd.DataFrame(parsed)
159
-
160
- # This is fast (<50ms) with rule-based detection
161
- entity_type, confidence, _ = hybrid_detect_entity_type(org_id, df, f"{source_id}.json")
162
- entity_info = {"entity_type": entity_type, "confidence": confidence}
163
- print(f"[fallback] βœ… Direct detection: {entity_type} ({confidence:.2%})")
164
-
165
- # βœ… CRITICAL: Write to Redis BEFORE returning
166
- redis_key = f"entity:{org_id}:{source_id}"
167
- event_hub.setex(redis_key, 3600, json.dumps(entity_info))
168
- print(f"[fallback] πŸ’Ύ WRITTEN TO REDIS: {redis_key}")
169
-
170
- # βœ… Also populate module cache for immediate reuse
171
- cache_key = (org_id, source_id)
172
- _ENTITY_CACHE[cache_key] = entity_info
173
-
174
- return entity_info
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
175
  #poll for industry from redis
176
  def poll_for_industry(org_id: str, source_id: str, timeout: int = 10) -> dict:
177
  """
@@ -526,16 +597,53 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
526
 
527
  # After line: print(f"[canonify] βœ… Pipeline complete in {duration_ms:.2f}ms")
528
  if not df.empty:
529
- # At the end of the canonify pipeline
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
530
  try:
531
- # Use central event hub to trigger analytics (pubsub + stream)
532
  event_hub.emit_analytics_trigger(org_id, source_id, {
533
  "type": "kpi_compute",
534
  "entity_type": entity_type,
535
  "industry": industry,
536
- "rows_inserted": len(df) })
 
537
  print(f"[canonify] πŸš€ Triggered analytics for {source_id}")
538
  except Exception as e:
539
  print(f"[canonify] ⚠️ Analytics trigger failed (non-critical): {e}")
 
 
540
 
541
  return df, industry, industry_confidence
 
119
 
120
  # 5. Emergency fallback (worker is dead)
121
  print("[poll] ⚠️ Both attempts failed - using direct detection")
122
+
123
+ # Use the combined fallback so we only hit DuckDB once and write both
124
+ # entity AND industry keys atomically when possible.
125
+ entity_info, industry_info = _fallback_combined(org_id, source_id)
126
+
127
+ # Invalidate local cache entry so that subsequent callers read Redis first
128
+ _ENTITY_CACHE.pop((org_id, source_id), None)
129
+
 
 
 
 
 
130
  return entity_info
131
 
132
 
133
def _fallback_combined(org_id: str, source_id: str) -> tuple[dict, dict]:
    """Single DuckDB sample producing both entity AND industry detections.

    Guarantees:
      - Always attempts to write ``entity:{org_id}:{source_id}`` and
        ``industry:{org_id}:{source_id}`` to Redis (failures are logged,
        never raised to the caller).
      - Invalidates the module-level caches so subsequent readers re-check
        Redis instead of serving a possibly stale in-process value.
      - Runs both detectors concurrently to reduce fallback latency.

    Returns:
        (entity_info, industry_info) dicts; UNKNOWN placeholders on failure.
    """
    print(f"[fallback_combined] 🚨 Running combined fallback for {org_id}/{source_id}")

    # Defaults returned when sampling or detection fails entirely.
    entity_info = {"entity_type": "UNKNOWN", "confidence": 0.0}
    industry_info = {"industry": "UNKNOWN", "confidence": 0.0}

    try:
        conn = get_conn(org_id)
        rows = conn.execute("""
            SELECT row_data
            FROM main.raw_rows
            WHERE row_data IS NOT NULL
            USING SAMPLE 100
        """).fetchall()

        if rows:
            parsed = [json.loads(r[0]) for r in rows if r[0]]
            df = pd.DataFrame(parsed)
            # Normalize headers so both detectors see consistent column names.
            df.columns = [str(col).lower().strip() for col in df.columns]

            from concurrent.futures import ThreadPoolExecutor

            def run_entity():
                # Detector returns (label, confidence, extra); degrade to
                # UNKNOWN instead of failing the whole fallback.
                try:
                    return hybrid_detect_entity_type(org_id, df, f"{source_id}.json")
                except Exception as e:
                    print(f"[fallback_combined] ❌ entity detection failed: {e}")
                    return ("UNKNOWN", 0.0, False)

            def run_industry():
                try:
                    from app.hybrid_industry_detector import hybrid_detect_industry_type
                    return hybrid_detect_industry_type(org_id, df, source_id)
                except Exception as e:
                    print(f"[fallback_combined] ❌ industry detection failed: {e}")
                    return ("UNKNOWN", 0.0, False)

            # Run both detectors concurrently (thread pool for CPU/IO-bound work).
            with ThreadPoolExecutor(max_workers=2) as ex:
                ent_f = ex.submit(run_entity)
                ind_f = ex.submit(run_industry)
                ent_res = ent_f.result()
                ind_res = ind_f.result()

            entity_info = {"entity_type": ent_res[0], "confidence": ent_res[1]}
            industry_info = {"industry": ind_res[0], "confidence": ind_res[1]}

            print(f"[fallback_combined] βœ… Entity: {entity_info['entity_type']} ({entity_info['confidence']:.2%})")
            print(f"[fallback_combined] βœ… Industry: {industry_info['industry']} ({industry_info['confidence']:.2%})")

    except Exception as e:
        print(f"[fallback_combined] ❌ Combined fallback failed: {e}")

    # Persist to Redis; prefer a pipeline (single round trip) when available.
    e_key = f"entity:{org_id}:{source_id}"
    i_key = f"industry:{org_id}:{source_id}"

    def _write_sequential():
        # Shared fallback path: two independent SETEX calls. Factored out of
        # the two branches that previously duplicated it.
        event_hub.setex(e_key, 3600, json.dumps(entity_info))
        event_hub.setex(i_key, 3600, json.dumps(industry_info))

    try:
        pipe = event_hub.pipeline()
        if pipe is not None:
            try:
                pipe.setex(e_key, 3600, json.dumps(entity_info))
                pipe.setex(i_key, 3600, json.dumps(industry_info))
                try:
                    # Structured readiness nudge for the per-source stream.
                    msg = json.dumps({
                        "org_id": org_id,
                        "source_id": source_id,
                        "status": "ready"
                    })
                    pipe.xadd(event_hub.stream_key(org_id, source_id), {"message": msg})
                except Exception:
                    # Nudge is best-effort; the key writes still matter.
                    pass
                pipe.execute()
            except Exception as e:
                print(f"[fallback_combined] ❌ Pipeline execute failed: {e}")
                _write_sequential()
        else:
            # Pipeline not available; do sequential writes.
            _write_sequential()
    except Exception as e:
        print(f"[fallback_combined] ❌ Redis write failed: {e}")

    # BUGFIX: the previous version populated both caches and then immediately
    # popped the same keys — a no-op that only opened a brief stale-read
    # window. Plain invalidation is the intended effect: force readers back
    # to Redis for the freshly written values.
    _ENTITY_CACHE.pop((org_id, source_id), None)
    _INDUSTRY_CACHE.pop((org_id, source_id), None)

    return entity_info, industry_info
246
  #poll for industry from redis
247
  def poll_for_industry(org_id: str, source_id: str, timeout: int = 10) -> dict:
248
  """
 
597
 
598
  # After line: print(f"[canonify] βœ… Pipeline complete in {duration_ms:.2f}ms")
599
  if not df.empty:
600
+ # At the end of the canonify pipeline: ensure Redis keys for entity/industry
601
+ # are present (defensive) and nudge workers via stream AFTER commit.
602
+ try:
603
+ e_key = f"entity:{org_id}:{source_id}"
604
+ i_key = f"industry:{org_id}:{source_id}"
605
+ entity_payload = json.dumps({"entity_type": entity_type, "confidence": 1.0})
606
+ industry_payload = json.dumps({"industry": industry, "confidence": industry_confidence})
607
+
608
+ pipe = event_hub.pipeline()
609
+ if pipe is not None:
610
+ try:
611
+ pipe.setex(e_key, 3600, entity_payload)
612
+ pipe.setex(i_key, 3600, industry_payload)
613
+ # per-source readiness nudge
614
+ msg = json.dumps({"org_id": org_id, "source_id": source_id, "status": "ready"})
615
+ pipe.xadd(event_hub.stream_key(org_id, source_id), {"message": msg})
616
+ pipe.execute()
617
+ except Exception as e:
618
+ print(f"[canonify] ⚠️ Pipeline nudge failed: {e}")
619
+ # Fallback to sequential writes
620
+ try:
621
+ event_hub.setex(e_key, 3600, entity_payload)
622
+ event_hub.setex(i_key, 3600, industry_payload)
623
+ except Exception as re:
624
+ print(f"[canonify] ❌ Redis setex fallback failed: {re}")
625
+ else:
626
+ # Pipeline not available; write sequentially
627
+ try:
628
+ event_hub.setex(e_key, 3600, entity_payload)
629
+ event_hub.setex(i_key, 3600, industry_payload)
630
+ msg = json.dumps({"org_id": org_id, "source_id": source_id, "status": "ready"})
631
+ event_hub.redis.xadd(event_hub.stream_key(org_id, source_id), {"message": msg})
632
+ except Exception as e:
633
+ print(f"[canonify] ❌ Redis nudge failed: {e}")
634
+
635
+ # Emit central trigger for worker manager
636
  try:
 
637
  event_hub.emit_analytics_trigger(org_id, source_id, {
638
  "type": "kpi_compute",
639
  "entity_type": entity_type,
640
  "industry": industry,
641
+ "rows_inserted": len(df)
642
+ })
643
  print(f"[canonify] πŸš€ Triggered analytics for {source_id}")
644
  except Exception as e:
645
  print(f"[canonify] ⚠️ Analytics trigger failed (non-critical): {e}")
646
+ except Exception as e:
647
+ print(f"[canonify] ⚠️ Finalization nudge failed: {e}")
648
 
649
  return df, industry, industry_confidence
app/tasks/analytics_worker.py CHANGED
@@ -313,14 +313,34 @@ class AnalyticsWorker:
313
  while (time.time() - start) < max_wait:
314
  entity_key = f"entity:{self.org_id}:{self.source_id}"
315
  industry_key = f"industry:{self.org_id}:{self.source_id}"
316
-
317
- if event_hub.exists(entity_key) and event_hub.exists(industry_key):
318
- logger.info(f"[WORKER] βœ… Entity & industry keys found")
319
- return
320
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
321
  logger.info(f"[WORKER] ⏳ Waiting for entity/industry keys...")
322
  await asyncio.sleep(2)
323
-
324
  logger.warning(f"[WORKER] ⚠️ Timeout waiting for keys, proceeding anyway")
325
  # Change _publish() method to use streams
326
  async def _publish(self, results: Dict[str, Any]):
 
313
  while (time.time() - start) < max_wait:
314
  entity_key = f"entity:{self.org_id}:{self.source_id}"
315
  industry_key = f"industry:{self.org_id}:{self.source_id}"
316
+
317
+ try:
318
+ # If in-memory cache exists but Redis does not, invalidate cache
319
+ from app.mapper import _ENTITY_CACHE, _INDUSTRY_CACHE
320
+
321
+ cache_ent = _ENTITY_CACHE.get((self.org_id, self.source_id))
322
+ cache_ind = _INDUSTRY_CACHE.get((self.org_id, self.source_id))
323
+
324
+ ent_exists = event_hub.exists(entity_key)
325
+ ind_exists = event_hub.exists(industry_key)
326
+
327
+ if cache_ent and not ent_exists:
328
+ _ENTITY_CACHE.pop((self.org_id, self.source_id), None)
329
+ logger.debug(f"[WORKER] Cleared stale _ENTITY_CACHE for {self.org_id}/{self.source_id}")
330
+ if cache_ind and not ind_exists:
331
+ _INDUSTRY_CACHE.pop((self.org_id, self.source_id), None)
332
+ logger.debug(f"[WORKER] Cleared stale _INDUSTRY_CACHE for {self.org_id}/{self.source_id}")
333
+
334
+ if ent_exists and ind_exists:
335
+ logger.info(f"[WORKER] βœ… Entity & industry keys found")
336
+ return
337
+
338
+ except Exception as e:
339
+ logger.debug(f"[WORKER] Redis/cache check error: {e}")
340
+
341
  logger.info(f"[WORKER] ⏳ Waiting for entity/industry keys...")
342
  await asyncio.sleep(2)
343
+
344
  logger.warning(f"[WORKER] ⚠️ Timeout waiting for keys, proceeding anyway")
345
  # Change _publish() method to use streams
346
  async def _publish(self, results: Dict[str, Any]):