shaliz-kong committed on
Commit
1e94ca0
·
1 Parent(s): 5549d4c

added redis emitter

Browse files
Files changed (1) hide show
  1. app/mapper.py +17 -6
app/mapper.py CHANGED
@@ -162,10 +162,14 @@ def _fallback_detection(org_id: str, source_id: str) -> dict:
162
  entity_info = {"entity_type": entity_type, "confidence": confidence}
163
  print(f"[fallback] βœ… Direct detection: {entity_type} ({confidence:.2%})")
164
 
165
- # Cache and write to Redis so next time it works
 
 
 
 
 
166
  cache_key = (org_id, source_id)
167
  _ENTITY_CACHE[cache_key] = entity_info
168
- event_hub.setex(f"entity:{org_id}:{source_id}", 3600, json.dumps(entity_info))
169
 
170
  return entity_info
171
  #poll for industry from redis
@@ -258,15 +262,22 @@ def _fallback_industry_detection(org_id: str, source_id: str) -> dict:
258
  industry_info = {"industry": industry, "confidence": confidence}
259
  print(f"[fallback_industry] βœ… Direct detection: {industry} ({confidence:.2%})")
260
 
261
- # Cache and write to Redis via hub
 
 
 
 
 
262
  cache_key = (org_id, source_id)
263
  _INDUSTRY_CACHE[cache_key] = industry_info
264
- event_hub.setex(f"industry:{org_id}:{source_id}", 3600, json.dumps(industry_info))
265
 
266
  return industry_info
267
 
268
  except Exception as e:
269
  print(f"[fallback_industry] ❌ Failed: {e}")
 
 
 
270
  return {"industry": "UNKNOWN", "confidence": 0.0}
271
 
272
 
@@ -521,8 +532,8 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
521
  event_hub.emit_analytics_trigger(org_id, source_id, {
522
  "type": "kpi_compute",
523
  "entity_type": entity_type,
524
- "industry": industry
525
- })
526
  print(f"[canonify] πŸš€ Triggered analytics for {source_id}")
527
  except Exception as e:
528
  print(f"[canonify] ⚠️ Analytics trigger failed (non-critical): {e}")
 
162
  entity_info = {"entity_type": entity_type, "confidence": confidence}
163
  print(f"[fallback] βœ… Direct detection: {entity_type} ({confidence:.2%})")
164
 
165
+ # βœ… CRITICAL: Write to Redis BEFORE returning
166
+ redis_key = f"entity:{org_id}:{source_id}"
167
+ event_hub.setex(redis_key, 3600, json.dumps(entity_info))
168
+ print(f"[fallback] πŸ’Ύ WRITTEN TO REDIS: {redis_key}")
169
+
170
+ # βœ… Also populate module cache for immediate reuse
171
  cache_key = (org_id, source_id)
172
  _ENTITY_CACHE[cache_key] = entity_info
 
173
 
174
  return entity_info
175
  #poll for industry from redis
 
262
  industry_info = {"industry": industry, "confidence": confidence}
263
  print(f"[fallback_industry] βœ… Direct detection: {industry} ({confidence:.2%})")
264
 
265
+ # βœ… CRITICAL: Write to Redis BEFORE returning
266
+ redis_key = f"industry:{org_id}:{source_id}"
267
+ event_hub.setex(redis_key, 3600, json.dumps(industry_info))
268
+ print(f"[fallback_industry] πŸ’Ύ WRITTEN TO REDIS: {redis_key}")
269
+
270
+ # βœ… Also populate module cache
271
  cache_key = (org_id, source_id)
272
  _INDUSTRY_CACHE[cache_key] = industry_info
 
273
 
274
  return industry_info
275
 
276
  except Exception as e:
277
  print(f"[fallback_industry] ❌ Failed: {e}")
278
+ # βœ… Even on error, write UNKNOWN to Redis so worker doesn't hang
279
+ redis_key = f"industry:{org_id}:{source_id}"
280
+ event_hub.setex(redis_key, 3600, json.dumps({"industry": "UNKNOWN", "confidence": 0.0}))
281
  return {"industry": "UNKNOWN", "confidence": 0.0}
282
 
283
 
 
532
  event_hub.emit_analytics_trigger(org_id, source_id, {
533
  "type": "kpi_compute",
534
  "entity_type": entity_type,
535
+ "industry": industry,
536
+ "rows_inserted": len(df) })
537
  print(f"[canonify] πŸš€ Triggered analytics for {source_id}")
538
  except Exception as e:
539
  print(f"[canonify] ⚠️ Analytics trigger failed (non-critical): {e}")