shaliz-kong commited on
Commit
7ff6c79
Β·
1 Parent(s): 2863e39

loaded enity and industry from redis hub

Browse files
Files changed (1) hide show
  1. app/tasks/analytics_worker.py +23 -64
app/tasks/analytics_worker.py CHANGED
@@ -80,8 +80,8 @@ class AnalyticsWorker:
80
  try:
81
  logger.info(f"\n[WORKER] πŸš€ STARTING {worker_id}")
82
 
83
- # 🎯 STEP 2: Wait for entity/industry keys (exponential backoff)
84
- await self._wait_for_entity_and_industry()
85
 
86
  # 🎯 STEP 3: Load data with retry logic
87
  df = await self._load_dataframe()
@@ -289,75 +289,34 @@ class AnalyticsWorker:
289
  except:
290
  pass
291
 
292
- def _get_entity_info(self) -> Optional[Dict[str, Any]]:
293
- """Get entity info from Redis with cache invalidation"""
294
  try:
295
- from app.mapper import _ENTITY_CACHE
296
- cache_key = (self.org_id, self.source_id)
297
-
298
  entity_key = f"entity:{self.org_id}:{self.source_id}"
299
- data = event_hub.get_key(entity_key)
300
-
301
- if data:
302
- entity_info = json.loads(data)
303
- _ENTITY_CACHE[cache_key] = entity_info
304
- return entity_info
305
-
306
- _ENTITY_CACHE.pop(cache_key, None)
307
- return None
308
-
309
- except Exception as e:
310
- logger.error(f"[ENTITY] Error: {e}")
311
- return None
312
-
313
- # ==================== ENTITY/INDUSTRY WAITING ====================
314
-
315
- async def _wait_for_entity_and_industry(self):
316
- """Wait for entity and industry keys with exponential backoff"""
317
- MAX_WAIT = 30.0
318
- INITIAL_DELAY = 0.5
319
- MAX_DELAY = 5.0
320
 
321
- entity_key = f"entity:{self.org_id}:{self.source_id}"
322
- industry_key = f"industry:{self.org_id}:{self.source_id}"
 
 
 
323
 
324
- delay = INITIAL_DELAY
325
- start = time.time()
 
326
 
327
- while (time.time() - start) < MAX_WAIT:
328
- try:
329
- # Use EXISTS (HTTP-safe)
330
- ent_exists = event_hub.redis.exists(entity_key)
331
- ind_exists = event_hub.redis.exists(industry_key)
332
-
333
- if ent_exists and ind_exists:
334
- # Update cache
335
- from app.mapper import _ENTITY_CACHE, _INDUSTRY_CACHE
336
- cache_key = (self.org_id, self.source_id)
337
-
338
- if cache_key not in _ENTITY_CACHE:
339
- data = event_hub.get_key(entity_key)
340
- if data:
341
- _ENTITY_CACHE[cache_key] = json.loads(data)
342
-
343
- if cache_key not in _INDUSTRY_CACHE:
344
- data = event_hub.get_key(industry_key)
345
- if data:
346
- _INDUSTRY_CACHE[cache_key] = json.loads(data)
347
-
348
- logger.info("[WORKER] βœ… Entity & industry keys found and validated")
349
- return
350
-
351
- logger.info(f"[WORKER] ⏳ Waiting for keys (entity={ent_exists}, industry={ind_exists})...")
352
-
353
- except Exception as e:
354
- logger.debug(f"[WORKER] Redis check error: {e}")
355
 
356
- await asyncio.sleep(delay)
357
- delay = min(delay * 1.5, MAX_DELAY)
358
 
359
- logger.warning(f"[WORKER] ⚠️ Timeout waiting for keys after {MAX_WAIT}s, proceeding anyway")
360
-
 
 
 
 
361
  # ==================== SCHEMA & EMBEDDING ====================
362
 
363
  # app/tasks/analytics_worker.py - Replace your _discover_schema method
 
80
  try:
81
  logger.info(f"\n[WORKER] πŸš€ STARTING {worker_id}")
82
 
83
+ # βœ… STEP 2: INSTANT Redis read (no waiting, no polling)
84
+ entity_info = await self._load_entity_from_redis()
85
 
86
  # 🎯 STEP 3: Load data with retry logic
87
  df = await self._load_dataframe()
 
289
  except:
290
  pass
291
 
292
+ async def _load_entity_from_redis(self) -> dict:
293
+ """Instantly load entity/industry from Redis (source of truth)"""
294
  try:
295
+ # Read entity from Redis (written by mapper)
 
 
296
  entity_key = f"entity:{self.org_id}:{self.source_id}"
297
+ entity_data = await asyncio.to_thread(event_hub.get_key, entity_key)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
298
 
299
+ if not entity_data:
300
+ raise ValueError(f"Entity key not found: {entity_key}")
301
+
302
+ entity_info = json.loads(entity_data)
303
+ self._entity_type = entity_info["entity_type"]
304
 
305
+ # Read industry from Redis
306
+ industry_key = f"industry:{self.org_id}:{self.source_id}"
307
+ industry_data = await asyncio.to_thread(event_hub.get_key, industry_key)
308
 
309
+ if not industry_data:
310
+ raise ValueError(f"Industry key not found: {industry_key}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
311
 
312
+ self._industry_info = json.loads(industry_data)
 
313
 
314
+ logger.info(f"[WORKER] βœ… Loaded entity={self._entity_type}, industry={self._industry_info['industry']} from Redis")
315
+ return entity_info
316
+
317
+ except Exception as e:
318
+ logger.error(f"[WORKER] ❌ Failed to load from Redis: {e}")
319
+ raise
320
  # ==================== SCHEMA & EMBEDDING ====================
321
 
322
  # app/tasks/analytics_worker.py - Replace your _discover_schema method