Peter Mutwiri commited on
Commit ·
aae0618
1
Parent(s): 98ef563
imported time in mapper
Browse files- app/mapper.py +2 -0
- app/routers/datasources.py +1 -1
- app/tasks/worker.py +5 -3
app/mapper.py
CHANGED
|
@@ -8,6 +8,8 @@ from app.db import get_conn, ensure_raw_table
|
|
| 8 |
from app.utils.detect_industry import _ALIAS, detect_industry
|
| 9 |
# app/mapper.py (add line 1)
|
| 10 |
from app.hybrid_entity_detector import hybrid_detect_entity_type
|
|
|
|
|
|
|
| 11 |
|
| 12 |
# ---------------------- Canonical schema base ---------------------- #
|
| 13 |
CANONICAL = {
|
|
|
|
| 8 |
from app.utils.detect_industry import _ALIAS, detect_industry
|
| 9 |
# app/mapper.py (add line 1)
|
| 10 |
from app.hybrid_entity_detector import hybrid_detect_entity_type
|
| 11 |
+
import time
|
| 12 |
+
|
| 13 |
|
| 14 |
# ---------------------- Canonical schema base ---------------------- #
|
| 15 |
CANONICAL = {
|
app/routers/datasources.py
CHANGED
|
@@ -108,7 +108,7 @@ async def create_source_json(
|
|
| 108 |
# 1. 💾 Store raw data for audit & lineage
|
| 109 |
bootstrap(orgId, payload.data)
|
| 110 |
print(f"[api/json] ✅ Raw data stored for org: {orgId}")
|
| 111 |
-
|
| 112 |
task = {
|
| 113 |
"id": f"detect_entity:{org_id}:{source_id}:{int(datetime.now().timestamp())}",
|
| 114 |
"function": "detect_entity",
|
|
|
|
| 108 |
# 1. 💾 Store raw data for audit & lineage
|
| 109 |
bootstrap(orgId, payload.data)
|
| 110 |
print(f"[api/json] ✅ Raw data stored for org: {orgId}")
|
| 111 |
+
|
| 112 |
task = {
|
| 113 |
"id": f"detect_entity:{org_id}:{source_id}:{int(datetime.now().timestamp())}",
|
| 114 |
"function": "detect_entity",
|
app/tasks/worker.py
CHANGED
|
@@ -31,10 +31,12 @@ def process_detect_entity(org_id: str, **args):
|
|
| 31 |
# 1. ✅ Query DuckDB for latest raw rows (the ones just uploaded)
|
| 32 |
conn = get_duckdb(org_id)
|
| 33 |
rows = conn.execute("""
|
| 34 |
-
|
| 35 |
-
|
| 36 |
-
|
|
|
|
| 37 |
""").fetchall()
|
|
|
|
| 38 |
|
| 39 |
if not rows:
|
| 40 |
raise RuntimeError(f"No raw data found for {source_id}")
|
|
|
|
| 31 |
# 1. ✅ Query DuckDB for latest raw rows (the ones just uploaded)
|
| 32 |
conn = get_duckdb(org_id)
|
| 33 |
rows = conn.execute("""
|
| 34 |
+
SELECT row_data
|
| 35 |
+
FROM main.raw_rows
|
| 36 |
+
WHERE row_data IS NOT NULL
|
| 37 |
+
USING SAMPLE 40
|
| 38 |
""").fetchall()
|
| 39 |
+
|
| 40 |
|
| 41 |
if not rows:
|
| 42 |
raise RuntimeError(f"No raw data found for {source_id}")
|