Peter Mutwiri committed on
Commit
98ef563
Β·
1 Parent(s): 85cf44d

changed datasource to call worker to query stored raw data for entity classification

Browse files
Files changed (2) hide show
  1. app/routers/datasources.py +5 -6
  2. app/tasks/worker.py +19 -14
app/routers/datasources.py CHANGED
@@ -109,18 +109,17 @@ async def create_source_json(
109
  bootstrap(orgId, payload.data)
110
  print(f"[api/json] βœ… Raw data stored for org: {orgId}")
111
  # Pass RAW PAYLOAD to worker (not just ID)
112
- timestamp = int(datetime.now().timestamp())
113
  task = {
114
- "id": f"detect_entity:{org_id}:{source_id}:{timestamp}", # Unique ID
115
- "function": "detect_entity", # 🎯 Must match registry
116
  "args": {
117
  "org_id": org_id,
118
- "source_id": source_id,
119
- "raw_data": payload.data, # Actual data rows
120
- "filename": f"{source_id}.json"
121
  }
122
  }
123
  redis.lpush("python:task_queue", json.dumps(task))
 
124
 
125
  df, industry, confidence = canonify_df(org_id, source_id)
126
  # 3. 🎯 Prepare preview for real-time broadcast
 
109
  bootstrap(orgId, payload.data)
110
  print(f"[api/json] βœ… Raw data stored for org: {orgId}")
111
  # Pass RAW PAYLOAD to worker (not just ID)
 
112
  task = {
113
+ "id": f"detect_entity:{org_id}:{source_id}:{int(datetime.now().timestamp())}",
114
+ "function": "detect_entity",
115
  "args": {
116
  "org_id": org_id,
117
+ "source_id": source_id
118
+ # No raw_data - worker queries DB
 
119
  }
120
  }
121
  redis.lpush("python:task_queue", json.dumps(task))
122
+ print(f"[api/json] πŸš€ Task queued: {task['id']}")
123
 
124
  df, industry, confidence = canonify_df(org_id, source_id)
125
  # 3. 🎯 Prepare preview for real-time broadcast
app/tasks/worker.py CHANGED
@@ -22,25 +22,30 @@ signal.signal(signal.SIGTERM, shutdown)
22
 
23
  # ── NEW: Entity Detection Handler ───────────────────────────────────────────────
24
  def process_detect_entity(org_id: str, **args):
25
- """
26
- 🎯 CRITICAL PATH: Receives raw data from Vercel, detects entity, stores in Redis
27
- **DO NOT MODIFY without architect approval**
28
- """
29
  source_id = args["source_id"]
30
- raw_data = args["raw_data"]
31
- filename = args.get("filename", f"{source_id}.json")
32
 
33
- print(f"πŸ”΅ [{org_id}] Entity detection starting for {source_id}")
34
 
35
  try:
36
- # 1. Convert raw payload to DataFrame (this is why we pass data directly)
37
- df = pd.DataFrame(raw_data)
38
- print(f" πŸ“Š DataFrame created: {len(df)} rows Γ— {len(df.columns)} cols")
 
 
 
 
39
 
40
- # 2. Run hybrid detection (rule-based, ~10ms, zero LLM cost)
41
- entity_type, confidence, _ = hybrid_detect_entity_type(
42
- org_id, df, filename
43
- )
 
 
 
 
 
 
44
  print(f" βœ… Detected: {entity_type} ({confidence:.2%})")
45
 
46
  # 3. Store in Redis for mapper to poll (HF endpoint is waiting for this)
 
22
 
23
  # ── NEW: Entity Detection Handler ───────────────────────────────────────────────
24
  def process_detect_entity(org_id: str, **args):
25
+ """🎯 Queries DuckDB for raw data instead of receiving payload"""
 
 
 
26
  source_id = args["source_id"]
 
 
27
 
28
+ print(f"πŸ”΅ [{org_id}] Entity detection STARTED for {source_id}")
29
 
30
  try:
31
+ # 1. βœ… Query DuckDB for latest raw rows (the ones just uploaded)
32
+ conn = get_duckdb(org_id)
33
+ rows = conn.execute("""
34
+ SELECT row_data FROM main.raw_rows
35
+ WHERE row_data IS NOT NULL
36
+ ORDER BY _ingested_at DESC LIMIT 500
37
+ """).fetchall()
38
 
39
+ if not rows:
40
+ raise RuntimeError(f"No raw data found for {source_id}")
41
+
42
+ # 2. Parse into DataFrame
43
+ parsed = [json.loads(r[0]) for r in rows if r[0]]
44
+ df = pd.DataFrame(parsed)
45
+ print(f" πŸ“Š DataFrame: {len(df)} rows Γ— {len(df.columns)} cols")
46
+
47
+ # 3. Detect entity (rest unchanged)
48
+ entity_type, confidence, _ = hybrid_detect_entity_type(org_id, df, f"{source_id}.json")
49
  print(f" βœ… Detected: {entity_type} ({confidence:.2%})")
50
 
51
  # 3. Store in Redis for mapper to poll (HF endpoint is waiting for this)