Peter Mutwiri committed on
Commit
fba0bec
Β·
1 Parent(s): aa9f7a3

industry detection last refactor

Browse files
Files changed (2) hide show
  1. app/mapper.py +87 -6
  2. app/tasks/worker.py +1 -1
app/mapper.py CHANGED
@@ -156,11 +156,93 @@ def _fallback_detection(org_id: str, source_id: str) -> dict:
156
 
157
  return entity_info
158
  #poll for industry from redis
159
- # app/mapper.py
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
160
 
161
- # βœ… Add this at top
162
- _ENTITY_CACHE = {}
163
- _INDUSTRY_CACHE = {} # NEW
164
 
165
  def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
166
  """
@@ -205,8 +287,7 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
205
 
206
  # Load dynamic aliases before mapping
207
  load_dynamic_aliases()
208
-
209
- # 1️⃣ FETCH RAW AUDIT DATA
210
  # 1️⃣ FETCH RAW AUDIT DATA
211
  with get_conn(org_id) as conn:
212
  ensure_raw_table(conn)
 
156
 
157
  return entity_info
158
  #poll for industry from redis
159
def poll_for_industry(org_id: str, source_id: str, timeout: int = 10) -> dict:
    """
    🎯 Polls Redis for industry detection result (user-facing dashboard label).

    Capped at 2 Redis calls: one immediate attempt, then one retry after
    sleeping for half of ``timeout`` seconds (5s at the default).  An
    in-memory cache (``_INDUSTRY_CACHE``) prevents re-polling the same
    (org_id, source_id) pair within this process.

    Args:
        org_id: Organisation identifier used to namespace the Redis key.
        source_id: Data-source identifier used to namespace the Redis key.
        timeout: Total polling budget in seconds; the single retry waits
            ``timeout / 2`` seconds (previously this parameter was ignored
            and the wait was hard-coded to 5s).

    Returns:
        dict: {"industry": str, "confidence": float}
    """
    cache_key = (org_id, source_id)

    # 1. Check the in-process cache FIRST — avoids any Redis round trip.
    if cache_key in _INDUSTRY_CACHE:
        print(f"[poll_industry] πŸ’Ύ CACHE HIT: {cache_key}")
        return _INDUSTRY_CACHE[cache_key]

    industry_key = f"industry:{org_id}:{source_id}"
    print(f"[poll_industry] ⏳ Polling for key: {industry_key}")

    # 2. First attempt (immediate).
    data = redis.get(industry_key)
    if data:
        industry_info = json.loads(data)
        _INDUSTRY_CACHE[cache_key] = industry_info
        print(f"[poll_industry] βœ… SUCCESS on first attempt: {industry_info['industry']}")
        return industry_info

    # 3. Wait half the budget to give the worker time to publish the result.
    #    timeout/2 keeps the original 5s behaviour at the default timeout=10.
    wait = timeout / 2
    print(f"[poll_industry] πŸ”„ First check failed, sleeping 5s...")
    time.sleep(wait)

    # 4. Second attempt (final).
    data = redis.get(industry_key)
    if data:
        industry_info = json.loads(data)
        _INDUSTRY_CACHE[cache_key] = industry_info
        print(f"[poll_industry] βœ… SUCCESS on second attempt: {industry_info['industry']}")
        return industry_info

    # 5. Emergency fallback — the worker never wrote the key (likely dead),
    #    so run detection synchronously in this process.
    print(f"[poll_industry] ⚠️ Both attempts failed - using direct detection")
    return _fallback_industry_detection(org_id, source_id)
201
+ #fallback industry detection
202
def _fallback_industry_detection(org_id: str, source_id: str) -> dict:
    """
    Emergency: run industry detection directly from DuckDB data.

    Used only when the worker failed to publish a result to Redis.  Samples
    up to 100 raw rows, runs the hybrid detector on them, then writes the
    result to both the in-process cache and Redis (1h TTL) for recovery.

    Args:
        org_id: Organisation identifier (selects the DuckDB connection).
        source_id: Data-source identifier (used for cache/Redis keys).

    Returns:
        dict: {"industry": str, "confidence": float}; on any failure the
        sentinel {"industry": "UNKNOWN", "confidence": 0.0}.
    """
    print(f"[fallback_industry] 🚨 Running fallback for {org_id}/{source_id}")

    try:
        # Use the context manager so the connection is always released —
        # matches canonify_df's `with get_conn(org_id) as conn:` usage and
        # fixes the leak from the bare `conn = get_conn(org_id)` form.
        with get_conn(org_id) as conn:
            rows = conn.execute("""
                SELECT row_data
                FROM main.raw_rows
                WHERE row_data IS NOT NULL
                USING SAMPLE 100
            """).fetchall()

        if not rows:
            print(f"[fallback_industry] ❌ No data found, returning UNKNOWN")
            industry_info = {"industry": "UNKNOWN", "confidence": 0.0}
        else:
            parsed = [json.loads(r[0]) for r in rows if r[0]]
            df = pd.DataFrame(parsed)

            # Local import keeps module load light and avoids import cycles.
            from app.hybrid_industry_detector import hybrid_detect_industry_type

            # The detector returns 3 values: (industry, confidence, is_confident).
            industry, confidence, _ = hybrid_detect_industry_type(org_id, df, source_id)
            industry_info = {"industry": industry, "confidence": confidence}
            print(f"[fallback_industry] βœ… Direct detection: {industry} ({confidence:.2%})")

        # Cache in-process and publish to Redis (1h TTL) so later callers
        # — including other processes — skip the fallback path.
        cache_key = (org_id, source_id)
        _INDUSTRY_CACHE[cache_key] = industry_info
        redis.setex(f"industry:{org_id}:{source_id}", 3600, json.dumps(industry_info))

        return industry_info

    except Exception as e:
        # Best-effort by design: never let the fallback crash the caller.
        print(f"[fallback_industry] ❌ Failed: {e}")
        return {"industry": "UNKNOWN", "confidence": 0.0}
244
+
245
 
 
 
 
246
 
247
  def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
248
  """
 
287
 
288
  # Load dynamic aliases before mapping
289
  load_dynamic_aliases()
290
+
 
291
  # 1️⃣ FETCH RAW AUDIT DATA
292
  with get_conn(org_id) as conn:
293
  ensure_raw_table(conn)
app/tasks/worker.py CHANGED
@@ -11,7 +11,7 @@ from app.redis_client import redis
11
  from app.service.ai_service import ai_service
12
  from app.deps import get_duckdb
13
  from app.hybrid_entity_detector import hybrid_detect_entity_type
14
- from app.industry_detector import detect_industry_type as rule_based_detect
15
 
16
  # ── Graceful Shutdown ──────────────────────────────────────────────────────────
17
  def shutdown(signum, frame):
 
11
  from app.service.ai_service import ai_service
12
  from app.deps import get_duckdb
13
  from app.hybrid_entity_detector import hybrid_detect_entity_type
14
+ from app.utils.detect_industry import detect_industry as rule_based_detect
15
 
16
  # ── Graceful Shutdown ──────────────────────────────────────────────────────────
17
  def shutdown(signum, frame):