Peter Mutwiri commited on
Commit
88fd951
Β·
2 Parent(s): ea24fcc14651bf

Merge HF Space state

Browse files
Files changed (8) hide show
  1. Dockerfile +1 -1
  2. README.md +1 -0
  3. app/db.py +85 -77
  4. app/entity_detector.py +80 -0
  5. app/main.py +3 -0
  6. app/mapper.py +176 -128
  7. app/routers/datasources.py +76 -19
  8. requirements.txt +28 -13
Dockerfile CHANGED
@@ -37,4 +37,4 @@ COPY scheduler_loop.py /app/scheduler_loop.py
37
  ENV API_KEYS=dev-analytics-key-123
38
 
39
  # ---- 7. start both services -----------------------------------------------
40
- CMD sh -c "python -m uvicorn app.main:app --host 0.0.0.0 --port 8080 & python /app/scheduler_loop.py"
 
37
  ENV API_KEYS=dev-analytics-key-123
38
 
39
  # ---- 7. start both services -----------------------------------------------
40
+ CMD sh -c "python -m uvicorn app.main:app --host 0.0.0.0 --port 7860 & python /app/scheduler_loop.py"
README.md CHANGED
@@ -5,6 +5,7 @@ colorFrom: blue
5
  colorTo: green
6
  sdk: docker
7
  pinned: false
 
8
  ---
9
 
10
  FastAPI analytics webhook container.
 
5
  colorTo: green
6
  sdk: docker
7
  pinned: false
8
+ port: 8080
9
  ---
10
 
11
  FastAPI analytics webhook container.
app/db.py CHANGED
@@ -1,24 +1,21 @@
1
- import duckdb, os, pathlib, json
2
- from datetime import datetime
 
 
 
3
  from typing import Any, Dict, List
 
4
 
5
  DB_DIR = pathlib.Path("./data/duckdb")
6
  DB_DIR.mkdir(parents=True, exist_ok=True)
7
 
8
-
9
  def get_conn(org_id: str):
10
  """Get or create a DuckDB connection for an organization."""
11
  db_file = DB_DIR / f"{org_id}.duckdb"
12
  return duckdb.connect(str(db_file), read_only=False)
13
 
14
-
15
- # ------------------------------------------------------------
16
- # πŸ”Ή Backward-compatible table for raw JSON ingestion
17
- # ------------------------------------------------------------
18
  def ensure_raw_table(conn):
19
- """
20
- Maintains legacy compatibility for ingestion from webhooks / file uploads.
21
- """
22
  conn.execute("CREATE SCHEMA IF NOT EXISTS main")
23
  conn.execute("""
24
  CREATE TABLE IF NOT EXISTS main.raw_rows(
@@ -27,101 +24,112 @@ def ensure_raw_table(conn):
27
  )
28
  """)
29
 
 
 
 
 
 
 
 
 
 
 
 
30
 
31
- # ------------------------------------------------------------
32
- # πŸ”Ή Flexible dynamic schema table creation
33
- # ------------------------------------------------------------
34
  def ensure_table(conn, table_name: str, sample_record: Dict[str, Any]):
35
  """
36
- Ensures a DuckDB table exists with columns inferred from sample_record.
37
- If new columns appear later, adds them automatically.
38
  """
 
 
 
39
  conn.execute("CREATE SCHEMA IF NOT EXISTS main")
 
 
40
  conn.execute(
41
  f"CREATE TABLE IF NOT EXISTS main.{table_name} ("
42
  "id UUID DEFAULT uuid(), "
43
  "_ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"
44
  )
45
 
46
- if not sample_record:
47
- return
48
-
49
- existing_cols = {r[0] for r in conn.execute(f"PRAGMA table_info('main.{table_name}')").fetchall()}
 
 
 
50
 
 
51
  for col, val in sample_record.items():
52
- if col in existing_cols:
 
 
53
  continue
54
- dtype = infer_duckdb_type(val)
55
- print(f"[db] βž• Adding new column '{col}:{dtype}' to main.{table_name}")
56
- conn.execute(f"ALTER TABLE main.{table_name} ADD COLUMN {col} {dtype}")
57
-
58
-
59
- def infer_duckdb_type(value: Any) -> str:
60
- """Infer a DuckDB-compatible column type from a Python value."""
61
- if isinstance(value, bool):
62
- return "BOOLEAN"
63
- if isinstance(value, int):
64
- return "BIGINT"
65
- if isinstance(value, float):
66
- return "DOUBLE"
67
- if isinstance(value, datetime):
68
- return "TIMESTAMP"
69
- if isinstance(value, (dict, list)):
70
- return "JSON"
71
- return "VARCHAR"
72
-
73
 
74
- # ------------------------------------------------------------
75
- # πŸ”Ή Insert records with auto-schema
76
- # ------------------------------------------------------------
77
  def insert_records(conn, table_name: str, records: List[Dict[str, Any]]):
78
  """
79
- Insert records into the specified table.
80
- Assumes ensure_table() has already been called.
81
  """
82
  if not records:
83
  return
84
 
85
- cols = records[0].keys()
 
86
  placeholders = ", ".join(["?"] * len(cols))
87
  col_list = ", ".join(cols)
 
88
  insert_sql = f"INSERT INTO main.{table_name} ({col_list}) VALUES ({placeholders})"
89
 
90
- values = [tuple(r.get(c) for c in cols) for r in records]
91
- conn.executemany(insert_sql, values)
92
- print(f"[db] βœ… Inserted {len(records)} rows into {table_name}")
93
-
94
-
95
- # ------------------------------------------------------------
96
- # πŸ”Ή Unified bootstrap entrypoint
97
- # ------------------------------------------------------------
 
 
 
 
 
 
 
 
 
 
 
 
98
  def bootstrap(org_id: str, payload: Dict[str, Any]):
99
  """
100
- Main entrypoint for ingestion.
101
- Detects whether the payload contains:
102
- - A single table (list of dicts)
103
- - Multiple named tables (dict of lists)
104
- Also logs the raw payload in main.raw_rows for lineage tracking.
105
  """
106
  conn = get_conn(org_id)
107
- conn.execute("CREATE SCHEMA IF NOT EXISTS main")
108
  ensure_raw_table(conn)
109
 
110
- # Log raw payload for debugging / lineage
111
- conn.execute("INSERT INTO main.raw_rows (row_data) VALUES (?)", (json.dumps(payload),))
112
-
113
- if isinstance(payload, dict) and "tables" in payload:
114
- # multi-table mode
115
- for table_name, rows in payload["tables"].items():
116
- if not rows:
117
- continue
118
- ensure_table(conn, table_name, rows[0])
119
- insert_records(conn, table_name, rows)
120
- elif isinstance(payload, list):
121
- # single-table mode (assume 'sales' as default)
122
- ensure_table(conn, "sales", payload[0])
123
- insert_records(conn, "sales", payload)
124
- else:
125
- print("[db] ⚠️ Unsupported payload shape")
126
-
127
- conn.close()
 
1
+ # app/db.py – BULLETPROOF VERSION
2
+ import os
3
+ import pathlib
4
+ import json
5
+ import duckdb
6
  from typing import Any, Dict, List
7
+ from datetime import datetime
8
 
9
  DB_DIR = pathlib.Path("./data/duckdb")
10
  DB_DIR.mkdir(parents=True, exist_ok=True)
11
 
 
12
  def get_conn(org_id: str):
13
  """Get or create a DuckDB connection for an organization."""
14
  db_file = DB_DIR / f"{org_id}.duckdb"
15
  return duckdb.connect(str(db_file), read_only=False)
16
 
 
 
 
 
17
  def ensure_raw_table(conn):
18
+ """Creates audit trail table for raw JSON data."""
 
 
19
  conn.execute("CREATE SCHEMA IF NOT EXISTS main")
20
  conn.execute("""
21
  CREATE TABLE IF NOT EXISTS main.raw_rows(
 
24
  )
25
  """)
26
 
27
+ def infer_duckdb_type(value: Any) -> str:
28
+ """Infer DuckDB type from Python value."""
29
+ if isinstance(value, bool):
30
+ return "BOOLEAN"
31
+ if isinstance(value, int):
32
+ return "BIGINT"
33
+ if isinstance(value, float):
34
+ return "DOUBLE"
35
+ if isinstance(value, datetime):
36
+ return "TIMESTAMP"
37
+ return "VARCHAR"
38
 
39
+ # βœ… BULLETPROOF: Handles integer column names, None values, etc.
 
 
40
  def ensure_table(conn, table_name: str, sample_record: Dict[str, Any]):
41
  """
42
+ Ensures table exists with columns from sample_record.
43
+ SAFE: Converts int column names to strings, handles missing data.
44
  """
45
+ if not sample_record:
46
+ return
47
+
48
  conn.execute("CREATE SCHEMA IF NOT EXISTS main")
49
+
50
+ # Create base table with UUID and timestamp
51
  conn.execute(
52
  f"CREATE TABLE IF NOT EXISTS main.{table_name} ("
53
  "id UUID DEFAULT uuid(), "
54
  "_ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"
55
  )
56
 
57
+ # βœ… SAFE: Get existing columns (handle int names from DB)
58
+ try:
59
+ existing_cols_raw = conn.execute(f"PRAGMA table_info('main.{table_name}')").fetchall()
60
+ existing_cols = {str(r[0]).lower() for r in existing_cols_raw}
61
+ except Exception as e:
62
+ print(f"[db] ⚠️ Could not get table info: {e}")
63
+ existing_cols = set()
64
 
65
+ # βœ… BULLETPROOF: Add missing columns with safe name handling
66
  for col, val in sample_record.items():
67
+ col_name = str(col).lower().strip() # βœ… FORCE STRING
68
+
69
+ if col_name in existing_cols:
70
  continue
71
+
72
+ # Skip None/empty values which can't infer type
73
+ if val is None:
74
+ print(f"[db] ⚠️ Skipping column {col_name} (None value)")
75
+ continue
76
+
77
+ try:
78
+ dtype = infer_duckdb_type(val)
79
+ print(f"[db] βž• Adding column '{col_name}:{dtype}' to main.{table_name}")
80
+ conn.execute(f"ALTER TABLE main.{table_name} ADD COLUMN {col_name} {dtype}")
81
+ except Exception as e:
82
+ print(f"[db] ❌ Failed to add column {col_name}: {e}")
83
+ # Continue with next column - don't crash
 
 
 
 
 
 
84
 
 
 
 
85
  def insert_records(conn, table_name: str, records: List[Dict[str, Any]]):
86
  """
87
+ Insert records into table with safe column handling.
 
88
  """
89
  if not records:
90
  return
91
 
92
+ # βœ… Get columns from first record (force string names)
93
+ cols = [str(k) for k in records[0].keys()]
94
  placeholders = ", ".join(["?"] * len(cols))
95
  col_list = ", ".join(cols)
96
+
97
  insert_sql = f"INSERT INTO main.{table_name} ({col_list}) VALUES ({placeholders})"
98
 
99
+ # Prepare values (handle missing keys)
100
+ values = []
101
+ for record in records:
102
+ row = []
103
+ for col in cols:
104
+ val = record.get(col)
105
+ # Convert dict/list to JSON string for DuckDB
106
+ if isinstance(val, (dict, list)):
107
+ val = json.dumps(val)
108
+ row.append(val)
109
+ values.append(tuple(row))
110
+
111
+ try:
112
+ conn.executemany(insert_sql, values)
113
+ print(f"[db] βœ… Inserted {len(records)} rows into {table_name}")
114
+ except Exception as e:
115
+ print(f"[db] ❌ Insert failed: {e}")
116
+ raise
117
+
118
+ # βœ… PURE AUDIT FUNCTION - Does NOT create tables
119
  def bootstrap(org_id: str, payload: Dict[str, Any]):
120
  """
121
+ **ENTERPRISE-GRADE**: Only stores raw JSON for audit.
122
+ Does NOT create any tables. Schema evolution is canonify_df's job.
 
 
 
123
  """
124
  conn = get_conn(org_id)
 
125
  ensure_raw_table(conn)
126
 
127
+ try:
128
+ raw_json = json.dumps(payload) if not isinstance(payload, str) else payload
129
+ if raw_json and raw_json not in ("null", "[]", "{}"):
130
+ conn.execute("INSERT INTO main.raw_rows (row_data) VALUES (?)", (raw_json,))
131
+ print(f"[bootstrap] βœ… Audit stored: {len(raw_json)} bytes")
132
+ except Exception as e:
133
+ print(f"[bootstrap] ⚠️ Audit failed: {e}")
134
+
135
+ conn.close()
 
 
 
 
 
 
 
 
 
app/entity_detector.py ADDED
@@ -0,0 +1,80 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/entity_detector.py
2
+ import pandas as pd
3
+ from typing import Tuple
4
+
5
+ # Entity-specific canonical schemas
6
+ ENTITY_SCHEMAS = {
7
+ "sales": {
8
+ "indicators": ["timestamp", "total", "amount", "qty", "quantity", "sale_date", "transaction_id"],
9
+ "required_matches": 2,
10
+ "aliases": {
11
+ "timestamp": ["timestamp", "date", "sale_date", "created_at", "transaction_time"],
12
+ "product_id": ["sku", "barcode", "plu", "product_id", "item_code"],
13
+ "qty": ["qty", "quantity", "units", "pieces", "item_count"],
14
+ "total": ["total", "amount", "line_total", "sales_amount", "price"],
15
+ "store_id": ["store_id", "branch", "location", "outlet_id", "branch_code"],
16
+ }
17
+ },
18
+ "inventory": {
19
+ "indicators": ["stock", "quantity_on_hand", "reorder", "inventory", "current_stock", "warehouse_qty"],
20
+ "required_matches": 2,
21
+ "aliases": {
22
+ "product_id": ["sku", "barcode", "plu", "product_id", "item_code"],
23
+ "current_stock": ["stock", "quantity_on_hand", "qty_available", "current_quantity"],
24
+ "reorder_point": ["reorder_level", "min_stock", "reorder_point", "threshold"],
25
+ "supplier_id": ["supplier", "supplier_id", "vendor", "vendor_code"],
26
+ "last_stock_date": ["last_stock_date", "last_receipt", "last_updated"],
27
+ }
28
+ },
29
+ "customer": {
30
+ "indicators": ["customer_id", "email", "phone", "customer_name", "client_id", "loyalty_number"],
31
+ "required_matches": 2,
32
+ "aliases": {
33
+ "customer_id": ["customer_id", "client_id", "member_id", "loyalty_number", "phone"],
34
+ "full_name": ["customer_name", "full_name", "name", "client_name"],
35
+ "email": ["email", "email_address", "e_mail"],
36
+ "phone": ["phone", "phone_number", "mobile", "contact"],
37
+ }
38
+ },
39
+ "product": {
40
+ "indicators": ["product_name", "product_id", "sku", "category", "price", "cost", "unit_of_measure"],
41
+ "required_matches": 2,
42
+ "aliases": {
43
+ "product_id": ["sku", "barcode", "plu", "product_id", "item_code"],
44
+ "product_name": ["product_name", "name", "description", "item_name"],
45
+ "category": ["category", "department", "cat", "family", "classification"],
46
+ "unit_price": ["price", "unit_price", "selling_price", "retail_price"],
47
+ "cost_price": ["cost", "cost_price", "purchase_price", "wholesale_price"],
48
+ }
49
+ }
50
+ }
51
+
52
+ def detect_entity_type(df: pd.DataFrame) -> Tuple[str, float]:
53
+ """
54
+ AUTO-DETECT entity type from DataFrame columns.
55
+ Returns: (entity_type, confidence_score)
56
+ """
57
+ columns = {str(col).lower().strip() for col in df.columns}
58
+
59
+ scores = {}
60
+ for entity_type, config in ENTITY_SCHEMAS.items():
61
+ # Count matches between DataFrame columns and entity indicators
62
+ matches = sum(
63
+ 1 for indicator in config["indicators"]
64
+ if any(indicator in col for col in columns)
65
+ )
66
+
67
+ # Calculate confidence (0.0 to 1.0)
68
+ confidence = min(matches / config["required_matches"], 1.0)
69
+ scores[entity_type] = confidence
70
+
71
+ # Return best match if confident enough
72
+ if scores:
73
+ best_entity = max(scores, key=scores.get)
74
+ confidence = scores[best_entity]
75
+
76
+ if confidence > 0.3: # 30% threshold
77
+ return best_entity, confidence
78
+
79
+ # Default to sales if uncertain (most common)
80
+ return "sales", 0.0
app/main.py CHANGED
@@ -21,6 +21,9 @@ app = FastAPI(
21
  lifespan=lifespan
22
  )
23
 
 
 
 
24
  # ---------- Socket.IO Mount ----------
25
  app.mount("/socket.io", socket.socket_app)
26
 
 
21
  lifespan=lifespan
22
  )
23
 
24
+ @app.get("/")
25
+ def read_root():
26
+ return {"status": "ok", "service": "analytics-engine"}
27
  # ---------- Socket.IO Mount ----------
28
  app.mount("/socket.io", socket.socket_app)
29
 
app/mapper.py CHANGED
@@ -1,8 +1,11 @@
1
- import os, json, duckdb, pandas as pd
 
 
 
 
2
  from datetime import datetime, timedelta
3
  from app.db import get_conn, ensure_raw_table
4
- from app.utils.detect_industry import _ALIAS
5
-
6
 
7
  # ---------------------- Canonical schema base ---------------------- #
8
  CANONICAL = {
@@ -18,171 +21,216 @@ CANONICAL = {
18
 
19
  ALIAS_FILE = "./db/alias_memory.json"
20
 
21
- def safe_str_transform(series: pd.Series) -> pd.Series:
22
- """Apply .str.lower() & .str.strip() only if dtype is object/string."""
23
- if pd.api.types.is_string_dtype(series):
24
- return series.str.lower().str.strip()
25
- return series
26
- # ---------------------- Alias memory helpers ---------------------- #
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
  def load_dynamic_aliases() -> None:
28
- """Load learned aliases and merge into CANONICAL."""
29
  if os.path.exists(ALIAS_FILE):
30
  try:
31
  with open(ALIAS_FILE) as f:
32
  dynamic_aliases = json.load(f)
33
  for k, v in dynamic_aliases.items():
34
  if k in CANONICAL:
35
- for alias in v:
36
- if alias not in CANONICAL[k]:
37
- CANONICAL[k].append(alias)
38
  else:
39
  CANONICAL[k] = v
40
  except Exception as e:
41
  print(f"[mapper] ⚠️ failed to load alias memory: {e}")
42
 
43
-
44
  def save_dynamic_aliases() -> None:
45
- """Persist learned aliases for next runs."""
46
  os.makedirs(os.path.dirname(ALIAS_FILE), exist_ok=True)
47
  with open(ALIAS_FILE, "w") as f:
48
  json.dump(CANONICAL, f, indent=2)
49
 
50
-
51
- # ---------------------- Schema versioning helpers ---------------------- #
52
- def ensure_schema_version(duck, df: pd.DataFrame) -> str:
53
  """
54
- Ensure schema versioning and track evolution.
55
- Returns the active canonical table name (e.g., main.canonical_v2).
56
- """
57
- duck.execute("CREATE SCHEMA IF NOT EXISTS main")
58
- duck.execute("""
59
- CREATE TABLE IF NOT EXISTS main.schema_versions (
60
- version INTEGER PRIMARY KEY,
61
- columns JSON,
62
- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
63
- )
64
- """)
65
-
66
- latest = duck.execute("SELECT * FROM main.schema_versions ORDER BY version DESC LIMIT 1").fetchone()
67
- new_signature = sorted(df.columns.tolist())
68
-
69
- if latest:
70
- latest_cols = sorted(json.loads(latest[1]))
71
- if latest_cols == new_signature:
72
- return f"main.canonical_v{latest[0]}"
73
- else:
74
- new_version = latest[0] + 1
75
- duck.execute("INSERT INTO main.schema_versions (version, columns) VALUES (?, ?)",
76
- (new_version, json.dumps(new_signature)))
77
- print(f"[schema] β†’ new version detected: canonical_v{new_version}")
78
- return f"main.canonical_v{new_version}"
79
- else:
80
- duck.execute("INSERT INTO main.schema_versions (version, columns) VALUES (?, ?)",
81
- (1, json.dumps(new_signature)))
82
- print("[schema] β†’ initialized canonical_v1")
83
- return "main.canonical_v1"
84
-
85
-
86
- def reconcile_latest_schema(duck):
87
- """
88
- Merge all canonical_v* tables into main.canonical_latest
89
- preserving new columns and filling missing values with NULL.
90
- """
91
- tables = [r[0] for r in duck.execute("""
92
- SELECT table_name FROM information_schema.tables
93
- WHERE table_name LIKE 'canonical_v%'
94
- """).fetchall()]
95
- if not tables:
96
- return
97
-
98
- union_query = " UNION ALL ".join([f"SELECT * FROM {t}" for t in tables])
99
- duck.execute("CREATE OR REPLACE TABLE main.canonical_latest AS " + union_query)
100
- print(f"[schema] βœ… reconciled {len(tables)} schema versions β†’ canonical_latest")
101
-
102
-
103
- # ---------------------- Canonify core logic ---------------------- #
104
- def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
105
- """
106
- Normalize, version, and persist canonical data snapshot for org_id.
107
  """
108
  load_dynamic_aliases()
109
  conn = get_conn(org_id)
110
  ensure_raw_table(conn)
111
 
112
- # --------------------------
113
- # ⏱ Safe timestamp filtering
114
- # --------------------------
115
  try:
116
- # Compute cutoff in Python to avoid parameter placeholders inside INTERVAL
117
- cutoff = datetime.now() - timedelta(hours=hours_window)
118
- cutoff_str = cutoff.strftime("%Y-%m-%d %H:%M:%S")
119
- rows = conn.execute(
120
- f"""
121
- SELECT row_data
122
- FROM raw_rows
123
- WHERE strptime(json_extract(row_data, '$.timestamp'), '%Y-%m-%d %H:%M:%S')
124
- >= TIMESTAMP '{cutoff_str}'
125
- """
126
- ).fetchall()
127
  except Exception as e:
128
- print(f"[canonify] ⚠️ fallback to all rows due to timestamp parse error: {e}")
129
- rows = conn.execute("SELECT row_data FROM raw_rows").fetchall()
130
 
131
  if not rows:
132
- print("[canonify] no rows to process")
133
- return pd.DataFrame()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
134
 
135
- # --------------------------
136
- # 🧩 DataFrame normalization
137
- # --------------------------
138
- raw = pd.DataFrame([json.loads(r[0]) for r in rows])
139
- raw.columns = safe_str_transform(raw.columns)
140
 
141
- # Flexible alias mapping
 
 
 
142
  mapping = {}
143
  for canon, aliases in CANONICAL.items():
144
- for col in raw.columns:
145
- if any(a in col for a in aliases):
 
146
  mapping[col] = canon
147
  break
148
 
149
- # οΏ½οΏ½οΏ½οΏ½ Learn new aliases dynamically
150
- for col in raw.columns:
151
- if col not in sum(CANONICAL.values(), []):
152
- for canon in CANONICAL.keys():
153
- if canon in col and col not in CANONICAL[canon]:
154
- CANONICAL[canon].append(col)
155
  save_dynamic_aliases()
156
 
157
- # Apply canonical renaming
158
- renamed = raw.rename(columns=mapping)
159
  cols = [c for c in CANONICAL.keys() if c in renamed.columns]
160
  df = renamed[cols].copy() if cols else renamed.copy()
161
 
162
- # πŸ”’ Normalize datatypes
163
- if "timestamp" in df:
164
- df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
165
- if "expiry_date" in df:
166
- df["expiry_date"] = pd.to_datetime(df["expiry_date"], errors="coerce").dt.date
167
- if "promo_flag" in df:
168
- df["promo_flag"] = df["promo_flag"].astype(str).isin({"1", "true", "t", "yes"})
169
- for col in ("qty", "total"):
170
- if col in df:
171
- df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
172
-
173
- # --------------------------
174
- # πŸͺ£ Schema versioning + storage
175
- # --------------------------
 
 
 
 
 
176
  os.makedirs("./db", exist_ok=True)
177
  duck = duckdb.connect(f"./db/{org_id}.duckdb")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
178
 
179
- table_name = ensure_schema_version(duck, df)
180
- duck.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM df LIMIT 0")
181
- duck.execute(f"INSERT INTO {table_name} SELECT * FROM df")
182
-
183
- # 🧩 Always refresh canonical_latest for unified analytics
184
- reconcile_latest_schema(duck)
185
  duck.close()
186
-
187
- print(f"[canonify] βœ… canonical snapshot updated for {org_id}")
188
- return df
 
1
+ # app/mapper.py – BULLETPROOF VERSION
2
+ import os
3
+ import json
4
+ import duckdb
5
+ import pandas as pd
6
  from datetime import datetime, timedelta
7
  from app.db import get_conn, ensure_raw_table
8
+ from app.utils.detect_industry import _ALIAS, detect_industry
 
9
 
10
  # ---------------------- Canonical schema base ---------------------- #
11
  CANONICAL = {
 
21
 
22
  ALIAS_FILE = "./db/alias_memory.json"
23
 
24
+ def map_pandas_to_duck(col: str, series: pd.Series) -> str:
25
+ if pd.api.types.is_bool_dtype(series): return "BOOLEAN"
26
+ if pd.api.types.is_integer_dtype(series): return "BIGINT"
27
+ if pd.api.types.is_float_dtype(series): return "DOUBLE"
28
+ if pd.api.types.is_datetime64_any_dtype(series): return "TIMESTAMP"
29
+ return "VARCHAR"
30
+
31
+ # ---------- INDUSTRY DETECTION (uses centralized detect_industry) ---------- #
32
+ def ensure_canonical_table(duck: duckdb.DuckDBPyConnection, df: pd.DataFrame) -> str:
33
+ """
34
+ Creates single canonical table and adds missing columns dynamically.
35
+ BULLETPROOF: Handles int column names, missing columns, race conditions.
36
+ """
37
+ table_name = "main.canonical"
38
+
39
+ # Create base table if doesn't exist
40
+ duck.execute(f"""
41
+ CREATE TABLE IF NOT EXISTS {table_name} (
42
+ id UUID DEFAULT uuid(),
43
+ _ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
44
+ )
45
+ """)
46
+
47
+ # Get existing columns (lowercase for comparison)
48
+ existing_cols_raw = duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
49
+ existing_cols = {str(r[0]).lower() for r in existing_cols_raw}
50
+
51
+ # βœ… BULLETPROOF: Add missing columns with safe name handling
52
+ for col in df.columns:
53
+ col_name = str(col).lower().strip() # βœ… FORCE STRING
54
+ if col_name not in existing_cols:
55
+ try:
56
+ dtype = map_pandas_to_duck(col_name, df[col])
57
+ print(f"[mapper] βž• Adding column '{col_name}:{dtype}'")
58
+ duck.execute(f"ALTER TABLE {table_name} ADD COLUMN {col_name} {dtype}")
59
+ except Exception as e:
60
+ print(f"[mapper] ⚠️ Skipping column {col_name}: {e}")
61
+
62
+ return table_name
63
+
64
+ # ---------- Alias Memory ---------- #
65
  def load_dynamic_aliases() -> None:
 
66
  if os.path.exists(ALIAS_FILE):
67
  try:
68
  with open(ALIAS_FILE) as f:
69
  dynamic_aliases = json.load(f)
70
  for k, v in dynamic_aliases.items():
71
  if k in CANONICAL:
72
+ CANONICAL[k].extend([a for a in v if a not in CANONICAL[k]])
 
 
73
  else:
74
  CANONICAL[k] = v
75
  except Exception as e:
76
  print(f"[mapper] ⚠️ failed to load alias memory: {e}")
77
 
 
78
  def save_dynamic_aliases() -> None:
 
79
  os.makedirs(os.path.dirname(ALIAS_FILE), exist_ok=True)
80
  with open(ALIAS_FILE, "w") as f:
81
  json.dump(CANONICAL, f, indent=2)
82
 
83
+ # ---------- Main Canonify Function (ENTERPRISE-GRADE) ---------- #
84
+ def canonify_df(org_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
 
85
  """
86
+ Enterprise ingestion pipeline:
87
+ - Accepts ANY raw data shape
88
+ - Forces safe column names (handles int, None, etc.)
89
+ - Auto-detects industry
90
+ - Dynamically evolves schema
91
+ - Returns (df, industry, confidence)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92
  """
93
  load_dynamic_aliases()
94
  conn = get_conn(org_id)
95
  ensure_raw_table(conn)
96
 
97
+ # 1) Pull raw audit data
 
 
98
  try:
99
+ rows = conn.execute("""
100
+ SELECT row_data FROM main.raw_rows
101
+ WHERE row_data IS NOT NULL
102
+ AND LENGTH(CAST(row_data AS TEXT)) > 0
103
+ """).fetchall()
 
 
 
 
 
 
104
  except Exception as e:
105
+ print(f"[canonify] SQL read error: {e}")
106
+ rows = []
107
 
108
  if not rows:
109
+ print("[canonify] no audit rows found")
110
+ return pd.DataFrame(), "unknown", 0.0
111
+
112
+ # 2) Parse JSON safely (handles both string and parsed objects)
113
+ parsed = []
114
+ malformed_count = 0
115
+
116
+ for r in rows:
117
+ raw = r[0]
118
+
119
+ if not raw:
120
+ malformed_count += 1
121
+ continue
122
+
123
+ try:
124
+ # βœ… Handle pre-parsed objects from Redis
125
+ if isinstance(raw, (dict, list)):
126
+ obj = raw
127
+ else:
128
+ # βœ… Parse string JSON
129
+ obj = json.loads(str(raw))
130
+ except Exception:
131
+ malformed_count += 1
132
+ continue
133
+
134
+ # βœ… Extract rows from various payload formats
135
+ if isinstance(obj, dict):
136
+ if "rows" in obj and isinstance(obj["rows"], list):
137
+ parsed.extend(obj["rows"])
138
+ elif "data" in obj and isinstance(obj["data"], list):
139
+ parsed.extend(obj["data"])
140
+ elif "tables" in obj and isinstance(obj["tables"], dict):
141
+ for table_rows in obj["tables"].values():
142
+ if isinstance(table_rows, list):
143
+ parsed.extend(table_rows)
144
+ else:
145
+ parsed.append(obj)
146
+ elif isinstance(obj, list):
147
+ parsed.extend(obj)
148
+ else:
149
+ malformed_count += 1
150
+
151
+ if malformed_count:
152
+ print(f"[canonify] skipped {malformed_count} malformed rows")
153
+
154
+ if not parsed:
155
+ print("[canonify] no valid data after parsing")
156
+ return pd.DataFrame(), "unknown", 0.0
157
 
158
+ # 3) βœ… BULLETPROOF: Force all column names to strings
159
+ df = pd.DataFrame(parsed)
160
+ df.columns = [str(col).lower().strip() for col in df.columns]
 
 
161
 
162
+ # βœ… Remove duplicate columns (can happen with messy data)
163
+ df = df.loc[:, ~df.columns.duplicated()]
164
+
165
+ # 4) Map to canonical schema
166
  mapping = {}
167
  for canon, aliases in CANONICAL.items():
168
+ for col in df.columns:
169
+ # βœ… SAFE: Ensure aliases are strings
170
+ if any(str(alias).lower() in str(col).lower() for alias in aliases):
171
  mapping[col] = canon
172
  break
173
 
174
+ # βœ… Learn new aliases
175
+ for col in df.columns:
176
+ for canon in CANONICAL.keys():
177
+ if str(canon).lower() in str(col).lower() and col not in CANONICAL[canon]:
178
+ CANONICAL[canon].append(col)
179
+
180
  save_dynamic_aliases()
181
 
182
+ renamed = df.rename(columns=mapping)
 
183
  cols = [c for c in CANONICAL.keys() if c in renamed.columns]
184
  df = renamed[cols].copy() if cols else renamed.copy()
185
 
186
+ # 5) Type conversions (best effort)
187
+ try:
188
+ if "timestamp" in df:
189
+ df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
190
+ if "expiry_date" in df:
191
+ df["expiry_date"] = pd.to_datetime(df["expiry_date"], errors="coerce").dt.date
192
+ if "promo_flag" in df:
193
+ df["promo_flag"] = df["promo_flag"].astype(str).isin({"1", "true", "t", "yes"})
194
+ for col in ("qty", "total"):
195
+ if col in df:
196
+ df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
197
+ except Exception as e:
198
+ print(f"[canonify] Type conversion warning (non-critical): {e}")
199
+
200
+ # 6) βœ… Industry detection
201
+ industry, confidence = detect_industry(df)
202
+ print(f"[canonify] 🎯 Industry: {industry} ({confidence:.1%} confidence)")
203
+
204
+ # 7) Dynamic schema evolution
205
  os.makedirs("./db", exist_ok=True)
206
  duck = duckdb.connect(f"./db/{org_id}.duckdb")
207
+
208
+ table_name = ensure_canonical_table(duck, df)
209
+
210
+ # βœ… SAFE INSERT: Match columns explicitly
211
+ if not df.empty:
212
+ # Get current table columns
213
+ table_info = duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
214
+ table_cols = [str(r[0]) for r in table_info] # βœ… FORCE STRING
215
+
216
+ # Only insert columns that exist in table
217
+ df_to_insert = df[[col for col in df.columns if col in table_cols]]
218
+
219
+ if not df_to_insert.empty:
220
+ cols_str = ", ".join(df_to_insert.columns)
221
+ placeholders = ", ".join(["?"] * len(df_to_insert.columns))
222
+
223
+ try:
224
+ duck.executemany(
225
+ f"INSERT INTO {table_name} ({cols_str}) VALUES ({placeholders})",
226
+ df_to_insert.values.tolist()
227
+ )
228
+ print(f"[canonify] βœ… Inserted {len(df_to_insert)} rows")
229
+ except Exception as e:
230
+ print(f"[canonify] ❌ Insert failed: {e}")
231
+ # Continue anyway - data quality issues shouldn't crash pipeline
232
 
 
 
 
 
 
 
233
  duck.close()
234
+ print(f"[canonify] βœ… Pipeline complete for {org_id}")
235
+
236
+ return df, industry, confidence
app/routers/datasources.py CHANGED
@@ -69,10 +69,11 @@ async def create_source(
69
  # =======================================================================
70
  # 2️⃣ SMART JSON ENDPOINT – fully schema-agnostic and multi-table aware
71
  # =======================================================================
 
 
72
  class JsonPayload(BaseModel):
73
  config: Dict[str, Any]
74
- data: Union[List[Any], Dict[str, Any]] # flexible: list or { "tables": {...} }
75
-
76
 
77
  @router.post("/datasources/json")
78
  async def create_source_json(
@@ -83,35 +84,91 @@ async def create_source_json(
83
  _: str = Depends(verify_key),
84
  ):
85
  """
86
- Accepts structured JSON (list or multi-table dict) from n8n, Render jobs, or APIs.
87
- Automatically evolves schemas, stores data, detects industry, and broadcasts live rows.
 
 
 
 
88
  """
89
  try:
 
90
  if not payload or not payload.data:
91
- raise HTTPException(status_code=400, detail="Missing payload data")
 
 
 
92
 
93
- # πŸ’Ύ Flexible insertion – handles one or multiple tables
94
  bootstrap(orgId, payload.data)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
95
 
96
- # 🧭 Canonical normalization (only if β€œsales” or compatible table exists)
97
- df = canonify_df(orgId)
98
- industry, confidence = detect_industry(df)
99
-
100
- # 🎯 Preview last few normalized rows
101
- rows = df.head(3).to_dict("records") if not df.empty else []
102
- await sio.emit("datasource:new-rows", {"rows": rows}, room=orgId)
103
-
104
  return JSONResponse(
 
105
  content={
106
  "id": sourceId,
107
  "status": "processed",
108
  "industry": industry,
109
- "confidence": confidence,
110
- "recentRows": rows,
111
- "message": "βœ… Data ingested successfully",
 
 
 
112
  }
113
  )
114
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
115
  except Exception as e:
116
- print(f"[datasources/json] ❌ ingestion error: {e}")
117
- raise HTTPException(status_code=500, detail=str(e))
 
 
 
 
69
  # =======================================================================
70
  # 2️⃣ SMART JSON ENDPOINT – fully schema-agnostic and multi-table aware
71
  # =======================================================================
72
+ # app/routers/datasources.py
73
+
74
  class JsonPayload(BaseModel):
75
  config: Dict[str, Any]
76
+ data: Union[List[Any], Dict[str, Any]] # Flexible: list or { "tables": {...} }
 
77
 
78
  @router.post("/datasources/json")
79
  async def create_source_json(
 
84
  _: str = Depends(verify_key),
85
  ):
86
  """
87
+ Enterprise ingestion endpoint:
88
+ - Stores raw audit trail
89
+ - Normalizes to canonical schema
90
+ - Auto-detects industry
91
+ - Broadcasts real-time updates
92
+ - Returns comprehensive metadata
93
  """
94
  try:
95
+ # βœ… Validate payload
96
  if not payload or not payload.data:
97
+ raise HTTPException(
98
+ status_code=400,
99
+ detail="Missing payload.data. Expected list or dict."
100
+ )
101
 
102
+ # 1. πŸ’Ύ Store raw data for audit & lineage
103
  bootstrap(orgId, payload.data)
104
+ print(f"[api/json] βœ… Raw data stored for org: {orgId}")
105
+
106
+ # 2. 🧭 Normalize schema + auto-detect industry (single pass)
107
+ # Returns: (normalized_df, industry_name, confidence_score)
108
+ df, industry, confidence = canonify_df(orgId)
109
+ print(f"[api/json] 🎯 Industry detected: {industry} ({confidence:.1%})")
110
+
111
+ # 3. 🎯 Prepare preview for real-time broadcast
112
+
113
+ # Convert DataFrame to JSON-safe format
114
+ preview_df = df.head(3).copy()
115
+ for col in preview_df.columns:
116
+ if pd.api.types.is_datetime64_any_dtype(preview_df[col]):
117
+ preview_df[col] = preview_df[col].dt.strftime('%Y-%m-%d %H:%M:%S')
118
+ elif pd.api.types.is_timedelta64_dtype(preview_df[col]):
119
+ preview_df[col] = preview_df[col].astype(str)
120
+
121
+ preview_rows = preview_df.to_dict("records") if not preview_df.empty else []
122
+
123
+ # 4. πŸ“‘ Broadcast to connected dashboards
124
+ await sio.emit(
125
+ "datasource:new-rows",
126
+ {
127
+ "rows": preview_rows,
128
+ "industry": industry,
129
+ "confidence": confidence,
130
+ "totalRows": len(df),
131
+ "datasourceId": sourceId,
132
+ },
133
+ room=orgId
134
+ )
135
 
136
+ # 5. βœ… Return comprehensive response
 
 
 
 
 
 
 
137
  return JSONResponse(
138
+ status_code=200,
139
  content={
140
  "id": sourceId,
141
  "status": "processed",
142
  "industry": industry,
143
+ "confidence": round(confidence, 4),
144
+ "recentRows": preview_rows,
145
+ "message": "βœ… Data ingested and normalized successfully",
146
+ "rowsProcessed": len(df),
147
+ "schemaColumns": list(df.columns) if not df.empty else [],
148
+ "processingTimeMs": 0, # You can add timing if needed
149
  }
150
  )
151
 
152
+ except HTTPException:
153
+ raise # Re-raise FastAPI errors as-is
154
+
155
+ except pd.errors.EmptyDataError:
156
+ print(f"[api/json] ⚠️ Empty data for org: {orgId}")
157
+ return JSONResponse(
158
+ status_code=200, # Not an error - just no data
159
+ content={
160
+ "id": sourceId,
161
+ "status": "no_data",
162
+ "industry": "unknown",
163
+ "confidence": 0.0,
164
+ "message": "⚠️ No valid data rows found",
165
+ "rowsProcessed": 0,
166
+ }
167
+ )
168
+
169
  except Exception as e:
170
+ print(f"[api/json] ❌ Unexpected error: {e}")
171
+ raise HTTPException(
172
+ status_code=500,
173
+ detail=f"Ingestion pipeline failed: {str(e)}"
174
+ )
requirements.txt CHANGED
@@ -1,23 +1,38 @@
1
- # Analytics Service dependencies
2
- apscheduler>=3.10
3
- pyarrow>=15.0
4
- redis>=5.0
5
- pandas>=2.2
6
  fastapi>=0.111
7
  uvicorn[standard]>=0.29
8
- prophet==1.1.5
9
- numpy>=1.24
10
- scikit-learn>=1.3
 
 
 
11
  scipy>=1.10
 
12
  statsmodels>=0.14
13
  networkx>=3.0
14
- sqlalchemy[asyncio]>=2.0
15
- asyncpg>=0.29 # async postgres driver
16
- numpy<2.0
 
 
 
 
 
 
 
 
 
 
17
  requests>=2.31
18
- huggingface_hub>=0.20.0
19
  aiohttp>=3.9.0
20
  httpx>=0.27.0
 
 
 
21
  python-multipart==0.0.6
22
  pycryptodome==3.20.0
23
- python-socketio[asyncio]>=5.11.0
 
 
 
 
1
+ # Core API
 
 
 
 
2
  fastapi>=0.111
3
  uvicorn[standard]>=0.29
4
+
5
+ # Data Processing & Analytics
6
+ duckdb>=0.10.3
7
+ pandas>=2.2
8
+ pyarrow>=15.0
9
+ numpy>=1.24,<2.0
10
  scipy>=1.10
11
+ scikit-learn>=1.3
12
  statsmodels>=0.14
13
  networkx>=3.0
14
+ prophet>=1.1.5
15
+
16
+ # Local LLM (Free GPU)
17
+ torch==2.2.0
18
+ transformers==4.40.0
19
+ accelerate==0.28.0
20
+ sentence-transformers==2.7.0
21
+
22
+ # Redis Bridge (Upstash)
23
+ redis==5.0.0
24
+ qstash>=2.0.0,<3.0.0 # <-- ADDED VERSION PIN
25
+
26
+ # HTTP Clients
27
  requests>=2.31
 
28
  aiohttp>=3.9.0
29
  httpx>=0.27.0
30
+
31
+ # Utilities
32
+ huggingface_hub>=0.20.0
33
  python-multipart==0.0.6
34
  pycryptodome==3.20.0
35
+ python-socketio[asyncio]>=5.11.0
36
+ asyncpg>=0.29
37
+ apscheduler>=3.10
38
+ sqlalchemy[asyncio]>=2.0