Peter Mutwiri committed on
Commit
409b44f
·
1 Parent(s): b14c25f

entity type detection moved to worker file

Browse files
Files changed (3) hide show
  1. app/mapper.py +37 -5
  2. app/routers/datasources.py +14 -3
  3. app/tasks/worker.py +99 -37
app/mapper.py CHANGED
@@ -78,9 +78,39 @@ def save_dynamic_aliases() -> None:
78
  os.makedirs(os.path.dirname(ALIAS_FILE), exist_ok=True)
79
  with open(ALIAS_FILE, "w") as f:
80
  json.dump(CANONICAL, f, indent=2)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81
 
82
  # ---------- Main Canonify Function (ENTERPRISE-GRADE) ---------- #
83
- def canonify_df(org_id: str, filename: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
84
  """
85
  Enterprise ingestion pipeline:
86
  - Accepts ANY raw data shape
@@ -196,10 +226,12 @@ def canonify_df(org_id: str, filename: str, hours_window: int = 24) -> tuple[pd.
196
  except Exception as e:
197
  print(f"[canonify] Type conversion warning (non-critical): {e}")
198
 
199
- # 6) βœ… Hybrid entity detection (rule-based + LLM fallback)
200
- entity_type, confidence, is_confident = hybrid_detect_entity_type(org_id, df, filename)
201
- print(f"[canonify] 🎯 Entity: {entity_type} ({confidence:.1%} confidence, AI: {not is_confident})")
202
- industry = entity_type
 
 
203
 
204
  # 7) Dynamic schema evolution
205
  os.makedirs("./db", exist_ok=True)
 
78
  os.makedirs(os.path.dirname(ALIAS_FILE), exist_ok=True)
79
  with open(ALIAS_FILE, "w") as f:
80
  json.dump(CANONICAL, f, indent=2)
81
def poll_for_entity(org_id: str, source_id: str, timeout: int = 30) -> dict:
    """
    🎯 BLOCKS until worker writes entity to Redis.
    Fallback to direct detection if Redis/Worker fails.

    Polls ``entity:{org_id}:{source_id}`` every 500 ms for up to *timeout*
    seconds. On timeout (worker crashed, queue backlog) it detects the entity
    synchronously from the org's raw rows and caches that result back into
    Redis so later pollers don't repeat the work.

    Returns a dict containing at least ``entity_type`` and ``confidence``.
    """
    from app.redis_client import redis  # Local import to avoid circular deps

    entity_key = f"entity:{org_id}:{source_id}"
    deadline = time.time() + timeout

    print(f"[poll] ⏳ Waiting for entity detection... (key: {entity_key})")

    while time.time() < deadline:
        data = redis.get(entity_key)
        if data:
            try:
                entity_info = json.loads(data)
            except (json.JSONDecodeError, TypeError):
                # Corrupt payload in Redis — fall through to direct detection
                # instead of crashing the ingestion path.
                break
            print(
                f"[poll] βœ… Received: {entity_info.get('entity_type')} "
                f"({entity_info.get('confidence', 0.0):.2%})"
            )
            return entity_info
        time.sleep(0.5)  # Poll every 500ms (reduces Redis load)

    # ⚠️ EMERGENCY FALLBACK: If worker fails, detect synchronously
    print(f"[poll] ⚠️ TIMEOUT after {timeout}s - running direct detection")
    conn = get_conn(org_id)
    rows = conn.execute("SELECT row_data FROM main.raw_rows LIMIT 500").fetchall()
    parsed = [json.loads(r[0]) for r in rows if r[0]]
    df = pd.DataFrame(parsed)

    # Direct detection (mapper has hybrid_detector imported)
    entity_type, confidence, _ = hybrid_detect_entity_type(org_id, df, f"{source_id}.json")
    result = {"entity_type": entity_type, "confidence": confidence}

    # Cache the fallback result so concurrent/subsequent pollers hit Redis
    # instead of re-running detection. Best-effort: a Redis outage here must
    # not fail the ingestion path that already has a usable result.
    try:
        redis.setex(entity_key, 3600, json.dumps(result))
    except Exception as e:
        print(f"[poll] ⚠️ Could not cache fallback result: {e}")
    return result
111
 
112
  # ---------- Main Canonify Function (ENTERPRISE-GRADE) ---------- #
113
+ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
114
  """
115
  Enterprise ingestion pipeline:
116
  - Accepts ANY raw data shape
 
226
  except Exception as e:
227
  print(f"[canonify] Type conversion warning (non-critical): {e}")
228
 
229
+ # βœ… REPLACE WITH THIS (poll from Redis):
230
+ entity_info = poll_for_entity(org_id, source_id)
231
+ entity_type = entity_info["entity_type"]
232
+ confidence = entity_info["confidence"]
233
+ industry = entity_type
234
+ print(f"[canonify] 🎯 Entity from Redis: {entity_type} ({confidence:.2%})")
235
 
236
  # 7) Dynamic schema evolution
237
  os.makedirs("./db", exist_ok=True)
app/routers/datasources.py CHANGED
@@ -101,9 +101,20 @@ async def create_source_json(
101
  # 1. πŸ’Ύ Store raw data for audit & lineage
102
  bootstrap(orgId, payload.data)
103
  print(f"[api/json] βœ… Raw data stored for org: {orgId}")
104
-
105
- # 2. 🧭 Normalize schema + auto-detect industry (single pass)
106
- #
 
 
 
 
 
 
 
 
 
 
 
107
  df, industry, confidence = canonify_df(orgId, f"{sourceId}.json")
108
  # 3. 🎯 Prepare preview for real-time broadcast
109
 
 
101
  # 1. πŸ’Ύ Store raw data for audit & lineage
102
  bootstrap(orgId, payload.data)
103
  print(f"[api/json] βœ… Raw data stored for org: {orgId}")
104
+ # βœ… NEW: Pass RAW PAYLOAD to worker (not just ID)
105
+ # In your HF datasource router
106
+ task = {
107
+ "id": f"detect_entity_{source_id}_{int(time.time())}", # Unique ID
108
+ "function": "detect_entity", # 🎯 Must match registry
109
+ "args": {
110
+ "org_id": org_id,
111
+ "source_id": source_id,
112
+ "raw_data": payload.data, # Actual data rows
113
+ "filename": f"{source_id}.json"
114
+ }
115
+ }
116
+ redis.lpush("python:task_queue", json.dumps(task))
117
+
118
  df, industry, confidence = canonify_df(orgId, f"{sourceId}.json")
119
  # 3. 🎯 Prepare preview for real-time broadcast
120
 
app/tasks/worker.py CHANGED
@@ -1,13 +1,16 @@
1
- # app/tasks/worker.py – ENTERPRISE GRADE
2
  import json
3
  import time
4
  import signal
5
  import sys
6
  import traceback
7
  from typing import Dict, Any, Callable
 
 
8
  from app.redis_client import redis
9
  from app.service.ai_service import ai_service
10
  from app.deps import get_duckdb
 
11
 
12
  # ── Graceful Shutdown ──────────────────────────────────────────────────────────
13
  def shutdown(signum, frame):
@@ -17,44 +20,107 @@ def shutdown(signum, frame):
17
  signal.signal(signal.SIGINT, shutdown)
18
  signal.signal(signal.SIGTERM, shutdown)
19
 
20
- # ── Task Handler Registry ─────────────────────────────────────────────────────
21
- # All handlers MUST accept org_id as first argument
22
- TASK_HANDLERS: Dict[str, Callable] = {
23
- "detect_entity_type": lambda org_id, **args: ai_service.detect_entity_type(org_id, **args),
24
- "generate_sql": lambda org_id, **args: ai_service.generate_sql(org_id, **args),
25
- "generate_insights": lambda org_id, **args: ai_service.generate_insights(org_id, **args),
26
- "similarity_search": lambda org_id, **args: ai_service.similarity_search(org_id, **args),
 
 
27
 
28
- # Mapper integration
29
- "canonify_df": lambda org_id, **args: canonify_df_with_entity(org_id, **args),
30
- "execute_sql": lambda org_id, **args: execute_org_sql(org_id, **args),
31
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
 
33
- # ── Wrapper for Legacy Functions ──────────────────────────────────────────────
34
  def canonify_df_with_entity(org_id: str, filename: str, hours_window: int = 24):
35
- """Bridge to your existing mapper.canoify_df"""
36
  from app.mapper import canonify_df
37
- # This now uses hybrid detection internally
38
  return canonify_df(org_id, filename, hours_window)
39
 
40
  def execute_org_sql(org_id: str, sql: str):
41
- """Execute SQL for specific org with safety limits"""
42
  conn = get_duckdb(org_id)
43
 
44
- # Security: Only allow SELECT queries
45
  safe_sql = sql.strip().upper()
46
  if not safe_sql.startswith("SELECT"):
47
  raise ValueError("πŸ”’ Only SELECT queries allowed")
48
 
49
- # Add LIMIT 10000 if not present to prevent overload
50
  if "LIMIT" not in safe_sql:
51
  safe_sql += " LIMIT 10000"
52
 
53
  return conn.execute(safe_sql).fetchall()
54
 
55
- # ── Task Processing ────────────────────────────────────────────────────────────
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
  def process_task(task_data: Dict[str, Any]):
57
- """Process a single task with full error handling and logging"""
58
  task_id = task_data.get("id")
59
  function_name = task_data.get("function")
60
  args = task_data.get("args", {})
@@ -68,25 +134,25 @@ def process_task(task_data: Dict[str, Any]):
68
 
69
  org_id = args["org_id"]
70
 
71
- # ── Handler Execution ──────────────────────────────────────────────────────
72
  start_time = time.time()
73
  print(f"πŸ”΅ [{org_id}] Processing {function_name} (task: {task_id})")
74
 
75
  try:
76
  handler = TASK_HANDLERS.get(function_name)
77
  if not handler:
78
- raise ValueError(f"Unknown function: {function_name}")
79
 
80
- # Execute handler (org_id is passed explicitly, rest via **args)
81
  result = handler(org_id, **args)
82
 
83
- # ── Success Response ───────────────────────────────────────────────────
84
  duration = time.time() - start_time
85
  print(f"βœ… [{org_id}] {function_name} completed in {duration:.2f}s")
86
 
87
  redis.setex(
88
  f"python:response:{task_id}",
89
- 3600, # 1 hour TTL
90
  json.dumps({
91
  "status": "success",
92
  "org_id": org_id,
@@ -97,11 +163,11 @@ def process_task(task_data: Dict[str, Any]):
97
  )
98
 
99
  except Exception as e:
100
- # ── Error Response ─────────────────────────────────────────────────────
101
  duration = time.time() - start_time
102
  error_msg = f"{type(e).__name__}: {str(e)}"
103
- print(f"❌ [{org_id}] {function_name} failed after {duration:.2f}s: {error_msg}")
104
- print(traceback.format_exc()) # Full stack trace for debugging
105
 
106
  redis.setex(
107
  f"python:response:{task_id}",
@@ -115,31 +181,27 @@ def process_task(task_data: Dict[str, Any]):
115
  })
116
  )
117
 
118
- # ── Main Worker Loop ────────────────────────────────────────────���──────────────
119
  if __name__ == "__main__":
120
  print("πŸš€ Python worker listening on Redis queue...")
121
  print("Press Ctrl+C to stop")
122
 
123
  while True:
124
  try:
125
- # Blocking pop with timeout (0 = infinite wait)
126
  _, task_json = redis.brpop("python:task_queue", timeout=0)
127
 
128
- # Deserialize with error handling
129
  try:
130
  task_data = json.loads(task_json)
 
131
  except json.JSONDecodeError as e:
132
  print(f"❌ Malformed task JSON: {e}")
133
  continue
134
 
135
- # Process task
136
- process_task(task_data)
137
-
138
  except KeyboardInterrupt:
139
  print("\nShutting down...")
140
  break
141
  except Exception as e:
142
- # Worker-level error (Redis connection, etc.)
143
- print(f"πŸ”΄ Worker error: {e}")
144
  traceback.print_exc()
145
- time.sleep(5) # Longer cooldown for worker errors
 
1
+ # app/tasks/worker.py – ENTERPRISE GRADE (WITH ENTITY DETECTION)
2
  import json
3
  import time
4
  import signal
5
  import sys
6
  import traceback
7
  from typing import Dict, Any, Callable
8
+ import pandas as pd # βœ… Required for entity detection
9
+
10
  from app.redis_client import redis
11
  from app.service.ai_service import ai_service
12
  from app.deps import get_duckdb
13
+ from app.hybrid_entity_detector import hybrid_detect_entity_type # βœ… New import
14
 
15
  # ── Graceful Shutdown ──────────────────────────────────────────────────────────
16
  def shutdown(signum, frame):
 
20
  signal.signal(signal.SIGINT, shutdown)
21
  signal.signal(signal.SIGTERM, shutdown)
22
 
23
+ # ── NEW: Entity Detection Handler ───────────────────────────────────────────────
24
+ def process_detect_entity(org_id: str, **args):
25
+ """
26
+ 🎯 CRITICAL PATH: Receives raw data from Vercel, detects entity, stores in Redis
27
+ **DO NOT MODIFY without architect approval**
28
+ """
29
+ source_id = args["source_id"]
30
+ raw_data = args["raw_data"]
31
+ filename = args.get("filename", f"{source_id}.json")
32
 
33
+ print(f"πŸ”΅ [{org_id}] Entity detection starting for {source_id}")
34
+
35
+ try:
36
+ # 1. Convert raw payload to DataFrame (this is why we pass data directly)
37
+ df = pd.DataFrame(raw_data)
38
+ print(f" πŸ“Š DataFrame created: {len(df)} rows Γ— {len(df.columns)} cols")
39
+
40
+ # 2. Run hybrid detection (rule-based, ~10ms, zero LLM cost)
41
+ entity_type, confidence, _ = hybrid_detect_entity_type(
42
+ org_id, df, filename
43
+ )
44
+ print(f" βœ… Detected: {entity_type} ({confidence:.2%})")
45
+
46
+ # 3. Store in Redis for mapper to poll (HF endpoint is waiting for this)
47
+ entity_key = f"entity:{org_id}:{source_id}"
48
+ redis.setex(
49
+ entity_key,
50
+ 3600, # 1 hour TTL (gives mapper plenty of time)
51
+ json.dumps({
52
+ "entity_type": entity_type,
53
+ "confidence": confidence,
54
+ "detected_at": time.time(),
55
+ "source_id": source_id
56
+ })
57
+ )
58
+ print(f" πŸ’Ύ Stored in Redis: {entity_key}")
59
+
60
+ # 4. Publish event for any real-time subscribers (future-proofing)
61
+ redis.publish(
62
+ f"entity_ready:{org_id}",
63
+ json.dumps({
64
+ "source_id": source_id,
65
+ "entity_type": entity_type,
66
+ "confidence": confidence
67
+ })
68
+ )
69
+ print(f" πŸ“€ Published to entity_ready:{org_id}")
70
+
71
+ # 5. Return result to satisfy worker's response contract
72
+ return {
73
+ "entity_type": entity_type,
74
+ "confidence": confidence,
75
+ "source_id": source_id,
76
+ "status": "stored_in_redis"
77
+ }
78
+
79
+ except Exception as e:
80
+ print(f"❌ Entity detection failed: {e}")
81
+ # CRITICAL: Re-raise so process_task logs it properly
82
+ raise RuntimeError(f"Entity detection failed for {source_id}: {str(e)}")
83
 
84
# ── Legacy Handlers (Keep for backward compatibility) ────────────────────────
def canonify_df_with_entity(org_id: str, filename: str, hours_window: int = 24):
    """⚠️ DEPRECATED: Remove once all ingestion uses detect_entity worker"""
    # Imported lazily to avoid a circular dependency between worker and mapper.
    from app import mapper

    return mapper.canonify_df(org_id, filename, hours_window)
89
 
90
def execute_org_sql(org_id: str, sql: str):
    """Execute a read-only SQL query for a specific org with enterprise guardrails.

    Only SELECT statements are accepted, and a ``LIMIT 10000`` is appended
    when the query has no LIMIT clause, to bound memory usage.

    Raises:
        ValueError: if the statement is not a SELECT.
    """
    # πŸ”’ Security: validate against an upper-cased COPY of the query, but
    # never execute that copy — upper-casing would corrupt string literals
    # inside the SQL (e.g. WHERE name = 'bob' would become 'BOB').
    stripped = sql.strip()
    upper = stripped.upper()
    if not upper.startswith("SELECT"):
        raise ValueError("πŸ”’ Only SELECT queries allowed")

    # πŸ’‘ Safety: Auto-limit to prevent memory overload
    if "LIMIT" not in upper:
        stripped += " LIMIT 10000"

    # Acquire the connection only after validation passes.
    conn = get_duckdb(org_id)
    return conn.execute(stripped).fetchall()
104
 
105
# ── Task Handler Registry ─────────────────────────────────────────────────────
# Maps the queue's "function" field to a callable. Every handler takes org_id
# as its first positional argument; remaining task args arrive as **kwargs.
TASK_HANDLERS: Dict[str, Callable] = {}

# 🎯 NEW: Ingestion-critical path — keep at the top for visibility.
TASK_HANDLERS["detect_entity"] = process_detect_entity

# Legacy AI handlers (keep until fully migrated).
TASK_HANDLERS.update({
    "detect_entity_type": lambda org_id, **args: ai_service.detect_entity_type(org_id, **args),
    "generate_sql": lambda org_id, **args: ai_service.generate_sql(org_id, **args),
    "generate_insights": lambda org_id, **args: ai_service.generate_insights(org_id, **args),
    "similarity_search": lambda org_id, **args: ai_service.similarity_search(org_id, **args),
})

# Legacy mapper handlers (to be deprecated).
TASK_HANDLERS["canonify_df"] = canonify_df_with_entity
TASK_HANDLERS["execute_sql"] = execute_org_sql
120
+
121
+ # ── Task Processing (UNCHANGED – BATTLE TESTED) ───────────────────────────────
122
  def process_task(task_data: Dict[str, Any]):
123
+ """Process single task with full observability and error isolation"""
124
  task_id = task_data.get("id")
125
  function_name = task_data.get("function")
126
  args = task_data.get("args", {})
 
134
 
135
  org_id = args["org_id"]
136
 
137
+ # ── Execution ──────────────────────────────────────────────────────────────
138
  start_time = time.time()
139
  print(f"πŸ”΅ [{org_id}] Processing {function_name} (task: {task_id})")
140
 
141
  try:
142
  handler = TASK_HANDLERS.get(function_name)
143
  if not handler:
144
+ raise ValueError(f"❌ Unknown function: {function_name}")
145
 
146
+ # Execute handler
147
  result = handler(org_id, **args)
148
 
149
+ # ── Success ────────────────────────────────────────────────────────────
150
  duration = time.time() - start_time
151
  print(f"βœ… [{org_id}] {function_name} completed in {duration:.2f}s")
152
 
153
  redis.setex(
154
  f"python:response:{task_id}",
155
+ 3600,
156
  json.dumps({
157
  "status": "success",
158
  "org_id": org_id,
 
163
  )
164
 
165
  except Exception as e:
166
+ # ── Error ──────────────────────────────────────────────────────────────
167
  duration = time.time() - start_time
168
  error_msg = f"{type(e).__name__}: {str(e)}"
169
+ print(f"❌ [{org_id}] {function_name} FAILED after {duration:.2f}s: {error_msg}")
170
+ print(traceback.format_exc()) # Full trace for debugging
171
 
172
  redis.setex(
173
  f"python:response:{task_id}",
 
181
  })
182
  )
183
 
184
# ── Main Worker Loop ──────────────────────────────────────────────────────────
if __name__ == "__main__":
    print("πŸš€ Python worker listening on Redis queue...")
    print("Press Ctrl+C to stop")

    while True:
        try:
            # Blocking pop (0 = infinite wait, no CPU burn)
            _, task_json = redis.brpop("python:task_queue", timeout=0)

            # Deserialize with a NARROW guard: only decoding the envelope
            # belongs in this try block.
            try:
                task_data = json.loads(task_json)
            except json.JSONDecodeError as e:
                print(f"❌ Malformed task JSON: {e}")
                continue

            # Run the task OUTSIDE the decode guard — a JSONDecodeError raised
            # inside a handler must surface as a task failure, not be
            # misreported as "Malformed task JSON" and silently skipped.
            process_task(task_data)

        except KeyboardInterrupt:
            print("\nShutting down...")
            break
        except Exception as e:
            print(f"πŸ”΄ WORKER-LEVEL ERROR (will restart): {e}")
            traceback.print_exc()
            time.sleep(5)  # Cooldown before retry