shaliz-kong committed on
Commit
5549d4c
·
1 Parent(s): 98c2088

Added fallback entity and industry data to be streamed to Redis

Browse files
Files changed (4) hide show
  1. .gitignore +3 -0
  2. app/main.py +15 -0
  3. app/mapper.py +28 -2
  4. app/tasks/analytics_worker.py +19 -2
.gitignore CHANGED
@@ -2,3 +2,6 @@ node_modules
2
  client-nextjs/googlecalendar.json
3
  .env.local
4
  analytics-service/.env.analytics
 
 
 
 
2
  client-nextjs/googlecalendar.json
3
  .env.local
4
  analytics-service/.env.analytics
5
+ analytics-data/duckdb/*.duckdb
6
+ analytics-data/duckdb/*.wal
7
+ analytics-data/duckdb/*
app/main.py CHANGED
@@ -329,6 +329,21 @@ def debug_stream(
329
  "entity_data": event_hub.get_key(entity_key),
330
  "industry_data": event_hub.get_key(industry_key),
331
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
332
  # ─── Root Endpoint ─────────────────────────────────────────────────────────────
333
  @app.get("/", tags=["root"])
334
  def read_root():
 
329
  "entity_data": event_hub.get_key(entity_key),
330
  "industry_data": event_hub.get_key(industry_key),
331
  }
332
@app.post("/api/v1/cache/clear")
def clear_cache(org_id: str, source_id: str, api_key: str = Depends(verify_api_key)):
    """Clear the in-process entity/industry caches for one (org, source) pair.

    Forces the next poll_for_entity / poll_for_industry call to bypass any
    stale cached detection result and read fresh data.

    Args:
        org_id: Organisation identifier.
        source_id: Data-source identifier within the organisation.
        api_key: Injected by FastAPI; request is rejected by
            verify_api_key before this body runs.

    Returns:
        dict: {"status": "cleared", "cache_key": "<(org_id, source_id)>"}.
    """
    cache_key = (org_id, source_id)

    # Imported lazily to avoid a circular import between main and mapper.
    from app.mapper import _ENTITY_CACHE, _INDUSTRY_CACHE

    # pop(..., None) removes the entry if present and is a no-op otherwise —
    # no need for a membership check followed by `del`.
    _ENTITY_CACHE.pop(cache_key, None)
    _INDUSTRY_CACHE.pop(cache_key, None)

    return {"status": "cleared", "cache_key": str(cache_key)}
346
+
347
  # ─── Root Endpoint ─────────────────────────────────────────────────────────────
348
  @app.get("/", tags=["root"])
349
  def read_root():
app/mapper.py CHANGED
@@ -119,7 +119,20 @@ def poll_for_entity(org_id: str, source_id: str, timeout: int = 30) -> dict:
119
 
120
  # 5. Emergency fallback (worker is dead)
121
  print("[poll] ⚠️ Both attempts failed - using direct detection")
122
- return _fallback_detection(org_id, source_id)
 
 
 
 
 
 
 
 
 
 
 
 
 
123
 
124
 
125
  def _fallback_detection(org_id: str, source_id: str) -> dict:
@@ -197,7 +210,20 @@ def poll_for_industry(org_id: str, source_id: str, timeout: int = 10) -> dict:
197
 
198
  # 5. Emergency fallback (worker is dead)
199
  print("[poll_industry] ⚠️ Both attempts failed - using direct detection")
200
- return _fallback_industry_detection(org_id, source_id)
 
 
 
 
 
 
 
 
 
 
 
 
 
201
  #fallback industry detection
202
  def _fallback_industry_detection(org_id: str, source_id: str) -> dict:
203
  """
 
119
 
120
  # 5. Emergency fallback (worker is dead)
121
  print("[poll] ⚠️ Both attempts failed - using direct detection")
122
+ entity_info = _fallback_detection(org_id, source_id)
123
+
124
+ # 🎯 NEW: Force write to Redis (ensure it's there)
125
+ event_hub.setex(
126
+ f"entity:{org_id}:{source_id}",
127
+ 3600,
128
+ json.dumps(entity_info)
129
+ )
130
+
131
+ # 🎯 NEW: Clear stale cache so next read is fresh
132
+ if (org_id, source_id) in _ENTITY_CACHE:
133
+ del _ENTITY_CACHE[(org_id, source_id)]
134
+
135
+ return entity_info
136
 
137
 
138
  def _fallback_detection(org_id: str, source_id: str) -> dict:
 
210
 
211
  # 5. Emergency fallback (worker is dead)
212
  print("[poll_industry] ⚠️ Both attempts failed - using direct detection")
213
+ industry_info = _fallback_industry_detection(org_id, source_id)
214
+
215
+ # 🎯 NEW: Force write to Redis (ensure it's there)
216
+ event_hub.setex(
217
+ f"industry:{org_id}:{source_id}",
218
+ 3600,
219
+ json.dumps(industry_info)
220
+ )
221
+
222
+ # 🎯 NEW: Clear stale cache so next read is fresh
223
+ if (org_id, source_id) in _INDUSTRY_CACHE:
224
+ del _INDUSTRY_CACHE[(org_id, source_id)]
225
+
226
+ return industry_info
227
  #fallback industry detection
228
  def _fallback_industry_detection(org_id: str, source_id: str) -> dict:
229
  """
app/tasks/analytics_worker.py CHANGED
@@ -54,7 +54,8 @@ class AnalyticsWorker:
54
  """
55
  start_time = datetime.now()
56
  logger.info(f"\n[WORKER] πŸš€ STARTING {self.org_id}/{self.source_id}")
57
-
 
58
  try:
59
  # 1️⃣ LOAD DATA (handles missing tables)
60
  df = await self._load_dataframe()
@@ -304,7 +305,23 @@ class AnalyticsWorker:
304
  return "supermarket"
305
  except:
306
  return "supermarket"
307
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
308
  # Change _publish() method to use streams
309
  async def _publish(self, results: Dict[str, Any]):
310
  try:
 
54
  """
55
  start_time = datetime.now()
56
  logger.info(f"\n[WORKER] πŸš€ STARTING {self.org_id}/{self.source_id}")
57
+ # 🎯 NEW: Wait for entity/industry keys to exist
58
+ await self._wait_for_entity_and_industry()
59
  try:
60
  # 1️⃣ LOAD DATA (handles missing tables)
61
  df = await self._load_dataframe()
 
305
  return "supermarket"
306
  except:
307
  return "supermarket"
308
+ async def _wait_for_entity_and_industry(self):
309
+ """Block until entity and industry are detected (max 30s)"""
310
+ max_wait = 30
311
+ start = time.time()
312
+
313
+ while (time.time() - start) < max_wait:
314
+ entity_key = f"entity:{self.org_id}:{self.source_id}"
315
+ industry_key = f"industry:{self.org_id}:{self.source_id}"
316
+
317
+ if event_hub.exists(entity_key) and event_hub.exists(industry_key):
318
+ logger.info(f"[WORKER] βœ… Entity & industry keys found")
319
+ return
320
+
321
+ logger.info(f"[WORKER] ⏳ Waiting for entity/industry keys...")
322
+ await asyncio.sleep(2)
323
+
324
+ logger.warning(f"[WORKER] ⚠️ Timeout waiting for keys, proceeding anyway")
325
  # Change _publish() method to use streams
326
  async def _publish(self, results: Dict[str, Any]):
327
  try: