Peter Mutwiri committed on
Commit
98106cb
Β·
1 Parent(s): 0faa253

added detect industry

Browse files
app/hybrid_industry_detector.py ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/hybrid_industry_detector.py
2
+ import logging
3
+ import pandas as pd
4
+ from typing import Tuple, Dict
5
+ from app.utils.detect_industry import detect_industry as rule_based_detect
6
+ from app.service.ai_service import ai_service
7
+
8
+ logger = logging.getLogger(__name__)
9
+
10
def hybrid_detect_industry_type(org_id: str, df: pd.DataFrame, filename: str = "") -> Tuple[str, float, bool]:
    """
    Detects BUSINESS VERTICAL (SUPERMARKET/MANUFACTURING/PHARMA/RETAIL/WHOLESALE/HEALTHCARE)

    Returns: (industry, confidence, is_confident)
    """
    # Cheap rule-based pass from utils (<10ms, zero LLM cost); the label is
    # upper-cased so downstream comparisons are uniform.
    detected, score = rule_based_detect(df)
    detected = detected.upper()

    logger.info(f"[hybrid_industry] RULE-BASED ONLY: {detected} ({score:.2f})")

    # NOTE(review): a future LLM fallback could fire when score < 0.75;
    # until then the rule-based answer is treated as authoritative, which
    # is why is_confident is hard-coded to True below.
    return detected, score, True
app/mapper.py CHANGED
@@ -82,7 +82,7 @@ def save_dynamic_aliases() -> None:
82
  json.dump(CANONICAL, f, indent=2)
83
  # βœ… Module-level cache: (org_id, source_id) -> entity_info
84
  _ENTITY_CACHE = {}
85
-
86
  def poll_for_entity(org_id: str, source_id: str, timeout: int = 30) -> dict:
87
  """
88
  🎯 Capped at 2 Redis calls (immediate + after 5s sleep).
@@ -155,7 +155,33 @@ def _fallback_detection(org_id: str, source_id: str) -> dict:
155
  redis.setex(f"entity:{org_id}:{source_id}", 3600, json.dumps(entity_info))
156
 
157
  return entity_info
 
 
 
 
 
 
158
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159
  # ---------- Main Canonify Function (ENTERPRISE-GRADE) ---------- #
160
  def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
161
  """
@@ -273,12 +299,16 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
273
  except Exception as e:
274
  print(f"[canonify] Type conversion warning (non-critical): {e}")
275
 
276
- # βœ… REPLACE WITH THIS (poll from Redis):
277
  entity_info = poll_for_entity(org_id, source_id)
278
  entity_type = entity_info["entity_type"]
279
- confidence = entity_info["confidence"]
280
- industry = entity_type
281
- print(f"[canonify] 🎯 Entity from Redis: {entity_type} ({confidence:.2%})")
 
 
 
 
282
 
283
  # 7) Dynamic schema evolution
284
  os.makedirs("./db", exist_ok=True)
@@ -313,4 +343,4 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
313
  duck.close()
314
  print(f"[canonify] βœ… Pipeline complete for {org_id}")
315
 
316
- return df, industry, confidence
 
82
  json.dump(CANONICAL, f, indent=2)
83
  # βœ… Module-level cache: (org_id, source_id) -> entity_info
84
  _ENTITY_CACHE = {}
85
+ _INDUSTRY_CACHE = {} # NEW
86
  def poll_for_entity(org_id: str, source_id: str, timeout: int = 30) -> dict:
87
  """
88
  🎯 Capped at 2 Redis calls (immediate + after 5s sleep).
 
155
  redis.setex(f"entity:{org_id}:{source_id}", 3600, json.dumps(entity_info))
156
 
157
  return entity_info
158
# ---------- Industry polling (worker writes, API reads) ---------- #
# NOTE: _ENTITY_CACHE and _INDUSTRY_CACHE are initialised ONCE near the top
# of this module. The duplicate `_ENTITY_CACHE = {}` / `_INDUSTRY_CACHE = {}`
# assignments that used to sit here were removed: re-binding the names at
# this point in the module silently wiped every entry cached before it.
165
def poll_for_industry(org_id: str, source_id: str, timeout: int = 10) -> dict:
    """Return the detected INDUSTRY for the user dashboard.

    Polls Redis (key ``industry:{org_id}:{source_id}``) for the result
    written by the worker, with an in-process cache in front of it so a
    repeated (org_id, source_id) costs zero Redis round-trips.

    Args:
        org_id: Organisation identifier.
        source_id: Data-source identifier.
        timeout: Total polling budget in seconds (attempts are spaced 5s
            apart, so the default of 10 yields two attempts).

    Returns:
        dict with ``industry`` and ``confidence`` keys; falls back to
        ``{"industry": "UNKNOWN", "confidence": 0.0}`` when nothing shows
        up within the budget. Only successful lookups are cached.
    """
    cache_key = (org_id, source_id)

    # Serve from the in-process cache first.
    if cache_key in _INDUSTRY_CACHE:
        print(f"[poll_industry] πŸ’Ύ CACHE HIT: {cache_key}")
        return _INDUSTRY_CACHE[cache_key]

    key = f"industry:{org_id}:{source_id}"

    # Derive the attempt count from `timeout` (the original hard-coded two
    # attempts and slept once more AFTER the final miss, wasting 5 seconds).
    poll_interval = 5.0
    attempts = max(1, int(timeout // poll_interval))
    for attempt in range(attempts):
        data = redis.get(key)
        if data:
            info = json.loads(data)
            _INDUSTRY_CACHE[cache_key] = info  # cache successes only
            return info
        if attempt < attempts - 1:  # no pointless sleep after the last try
            time.sleep(poll_interval)

    return {"industry": "UNKNOWN", "confidence": 0.0}
185
  # ---------- Main Canonify Function (ENTERPRISE-GRADE) ---------- #
186
  def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
187
  """
 
299
  except Exception as e:
300
  print(f"[canonify] Type conversion warning (non-critical): {e}")
301
 
302
+ # Get ENTITY (internal)
303
  entity_info = poll_for_entity(org_id, source_id)
304
  entity_type = entity_info["entity_type"]
305
+ entity_confidence = entity_info.get("confidence", 0.0) # Not used for frontend
306
+
307
+ # Get INDUSTRY (user-facing)
308
+ industry_info = poll_for_industry(org_id, source_id)
309
+ industry = industry_info["industry"]
310
+ industry_confidence = industry_info["confidence"]
311
+ print(f"[canonify] Industry: {industry} ({industry_confidence:.2%}), Entity: {entity_type}")
312
 
313
  # 7) Dynamic schema evolution
314
  os.makedirs("./db", exist_ok=True)
 
343
  duck.close()
344
  print(f"[canonify] βœ… Pipeline complete for {org_id}")
345
 
346
+ return df, industry, industry_confidence
app/routers/datasources.py CHANGED
@@ -109,17 +109,13 @@ async def create_source_json(
109
  bootstrap(orgId, payload.data)
110
  print(f"[api/json] βœ… Raw data stored for org: {orgId}")
111
 
112
- task = {
113
- "id": f"detect_entity:{org_id}:{source_id}:{int(datetime.now().timestamp())}",
114
- "function": "detect_entity",
115
- "args": {
116
- "org_id": org_id,
117
- "source_id": source_id
118
- # No raw_data - worker queries DB
119
- }
120
  }
121
- redis.lpush("python:task_queue", json.dumps(task))
122
- print(f"[api/json] πŸš€ Task queued: {task['id']}")
123
 
124
  df, industry, confidence = canonify_df(org_id, source_id)
125
  # 3. 🎯 Prepare preview for real-time broadcast
 
109
  bootstrap(orgId, payload.data)
110
  print(f"[api/json] βœ… Raw data stored for org: {orgId}")
111
 
112
+ industry_task = {
113
+ "id": f"detect_industry:{org_id}:{source_id}:{int(datetime.now().timestamp())}",
114
+ "function": "detect_industry",
115
+ "args": {"org_id": org_id, "source_id": source_id}
 
 
 
 
116
  }
117
+ redis.lpush("python:task_queue", json.dumps(industry_task))
118
+ # Entity will be auto-queued by process_detect_industry()
119
 
120
  df, industry, confidence = canonify_df(org_id, source_id)
121
  # 3. 🎯 Prepare preview for real-time broadcast
app/tasks/worker.py CHANGED
@@ -10,7 +10,8 @@ import pandas as pd # βœ… Required for entity detection
10
  from app.redis_client import redis
11
  from app.service.ai_service import ai_service
12
  from app.deps import get_duckdb
13
- from app.hybrid_entity_detector import hybrid_detect_entity_type # βœ… New import
 
14
 
15
  # ── Graceful Shutdown ──────────────────────────────────────────────────────────
16
  def shutdown(signum, frame):
@@ -20,6 +21,13 @@ def shutdown(signum, frame):
20
  signal.signal(signal.SIGINT, shutdown)
21
  signal.signal(signal.SIGTERM, shutdown)
22
 
 
 
 
 
 
 
 
23
  # ── NEW: Entity Detection Handler ───────────────────────────────────────────────
24
  def process_detect_entity(org_id: str, **args):
25
  """🎯 Queries DuckDB for raw data instead of receiving payload"""
@@ -88,6 +96,53 @@ def process_detect_entity(org_id: str, **args):
88
  # CRITICAL: Re-raise so process_task logs it properly
89
  raise RuntimeError(f"Entity detection failed for {source_id}: {str(e)}")
90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
91
  # ── Legacy Handlers (Keep for backward compatibility) ────────────────────────
92
  def canonify_df_with_entity(org_id: str, filename: str, hours_window: int = 24):
93
  """⚠️ DEPRECATED: Remove once all ingestion uses detect_entity worker"""
 
10
  from app.redis_client import redis
11
  from app.service.ai_service import ai_service
12
  from app.deps import get_duckdb
13
+ from app.hybrid_entity_detector import hybrid_detect_entity_type
14
+ from app.industry_detector import detect_industry_type as rule_based_detect
15
 
16
  # ── Graceful Shutdown ──────────────────────────────────────────────────────────
17
  def shutdown(signum, frame):
 
21
  signal.signal(signal.SIGINT, shutdown)
22
  signal.signal(signal.SIGTERM, shutdown)
23
 
24
# Dispatch table: task "function" name -> handler.
# The values are thin lambdas so handler names are resolved lazily at CALL
# time. A plain `{"detect_entity": process_detect_entity}` here raises
# NameError at import, because the handler functions are defined further
# down in this module.
TASK_HANDLERS = {
    "detect_industry": lambda *a, **kw: process_detect_industry(*a, **kw),
    "detect_entity": lambda *a, **kw: process_detect_entity(*a, **kw),
    "precompute_kpis": lambda *a, **kw: process_precompute_kpis(*a, **kw),
    # ...
}
31
  # ── NEW: Entity Detection Handler ───────────────────────────────────────────────
32
  def process_detect_entity(org_id: str, **args):
33
  """🎯 Queries DuckDB for raw data instead of receiving payload"""
 
96
  # CRITICAL: Re-raise so process_task logs it properly
97
  raise RuntimeError(f"Entity detection failed for {source_id}: {str(e)}")
98
 
99
def process_detect_industry(org_id: str, **args):
    """
    🎯 Detects INDUSTRY (business vertical) only — it does NOT touch entity
    detection, but it auto-queues the detect_entity task once the industry
    result has been written.

    Reads up to 100 raw rows from the org's DuckDB, runs the hybrid
    detector, and stores the result in Redis under
    ``industry:{org_id}:{source_id}`` with a 1-hour TTL. On any failure an
    UNKNOWN/0.0 placeholder is written (so dashboard pollers don't hang)
    and the exception is re-raised for the task loop to log.
    """
    # Local import: this module's top-level imports only bring in the
    # rule-based detector (`detect_industry_type as rule_based_detect`);
    # without this line the call below raised NameError at runtime.
    from app.hybrid_industry_detector import hybrid_detect_industry_type

    source_id = args["source_id"]

    print(f"πŸ”΄πŸ”΄πŸ”΄ [WORKER] INDUSTRY detection for {org_id}/{source_id}")

    try:
        # A 100-row sample of the raw data is enough for detection.
        conn = get_duckdb(org_id)
        rows = conn.execute("SELECT row_data FROM main.raw_rows LIMIT 100").fetchall()

        if not rows:
            raise RuntimeError("No raw data")

        parsed = [json.loads(r[0]) for r in rows if r[0]]
        df = pd.DataFrame(parsed)

        # Hybrid detector (decoupled from entity detection).
        industry, confidence, _ = hybrid_detect_industry_type(org_id, df, source_id)

        # Write to the dedicated Redis key that poll_for_industry reads.
        redis.setex(f"industry:{org_id}:{source_id}", 3600, json.dumps({
            "industry": industry,
            "confidence": confidence
        }))

        print(f"βœ… [WORKER] INDUSTRY written: {industry} ({confidence:.2%})")

        # Auto-queue entity detection (separate, independent task).
        entity_task = {
            "id": f"detect_entity:{org_id}:{source_id}:{int(time.time())}",
            "function": "detect_entity",
            "args": {"org_id": org_id, "source_id": source_id}
        }
        redis.lpush("python:task_queue", json.dumps(entity_task))

    except Exception as e:
        # Best-effort placeholder so pollers get a terminal value, then
        # re-raise so the task loop records the failure.
        print(f"❌ [WORKER] Industry detection CRASHED: {e}")
        redis.setex(f"industry:{org_id}:{source_id}", 3600, json.dumps({
            "industry": "UNKNOWN",
            "confidence": 0.0
        }))
        raise
145
+
146
  # ── Legacy Handlers (Keep for backward compatibility) ────────────────────────
147
  def canonify_df_with_entity(org_id: str, filename: str, hours_window: int = 24):
148
  """⚠️ DEPRECATED: Remove once all ingestion uses detect_entity worker"""