petermutwiri committed on
Commit
1703bb3
·
verified ·
1 Parent(s): f87a9e1

Update app/mapper.py

Browse files
Files changed (1) hide show
  1. app/mapper.py +71 -113
app/mapper.py CHANGED
@@ -1,4 +1,4 @@
1
- # app/mapper.py – FIXED WITH INDUSTRY DETECTION
2
  import os
3
  import json
4
  import duckdb
@@ -28,44 +28,39 @@ def map_pandas_to_duck(col: str, series: pd.Series) -> str:
28
  if pd.api.types.is_datetime64_any_dtype(series): return "TIMESTAMP"
29
  return "VARCHAR"
30
 
31
- def safe_str_transform(s: pd.Series) -> pd.Series:
32
- if pd.api.types.is_string_dtype(s):
33
- return s.str.lower().str.strip()
34
- return s
35
-
36
- def sql(conn, stmt: str, *args):
37
- """Centralised parameter binding β†’ no more int-vs-tuple mistakes."""
38
- return conn.execute(stmt, args).fetchall()
39
-
40
- def add_column_if_not_exists(duck: duckdb.DuckDBPyConnection, table: str, col: str, dtype: str) -> None:
41
- existing = {r[0].lower() for r in duck.execute(f"PRAGMA table_info('{table}')").fetchall()}
42
- if col.lower() not in existing:
43
- duck.execute(f"ALTER TABLE {table} ADD COLUMN {col} {dtype}")
44
- print(f"[schema] βž• added {col}:{dtype} to {table}")
45
-
46
- # ---------- INDUSTRY DETECTION INTEGRATION ---------- #
47
- def detect_industry_from_df(df: pd.DataFrame) -> tuple[str, float]:
48
  """
49
- Auto-detect industry based on column patterns using _ALIAS.
50
- Returns: (industry_name, confidence_score)
51
  """
52
- if df.empty:
53
- return "unknown", 0.0
54
 
55
- cols = set(df.columns.str.lower())
56
- scores = {}
 
 
 
 
 
57
 
58
- for industry, aliases in _ALIAS.items():
59
- matches = sum(1 for alias in aliases if any(alias in col for col in cols))
60
- scores[industry] = min(matches / len(aliases), 1.0) if aliases else 0
61
 
62
- # Get best match
63
- best_industry = max(scores, key=scores.get)
64
- confidence = scores[best_industry]
 
 
 
 
 
 
 
65
 
66
- return best_industry, confidence
67
 
68
- # ---------- Alias Memory (no changes) ---------- #
69
  def load_dynamic_aliases() -> None:
70
  if os.path.exists(ALIAS_FILE):
71
  try:
@@ -84,42 +79,12 @@ def save_dynamic_aliases() -> None:
84
  with open(ALIAS_FILE, "w") as f:
85
  json.dump(CANONICAL, f, indent=2)
86
 
87
- # ---------- Dynamic Schema Evolution ---------- #
88
- def ensure_canonical_table(duck: duckdb.DuckDBPyConnection, df: pd.DataFrame) -> str:
89
- """
90
- Single canonical table that evolves dynamically.
91
- Adds missing columns on-the-fly without creating new versions.
92
- """
93
- table_name = "main.canonical"
94
-
95
- # Ensure base table exists
96
- duck.execute(f"""
97
- CREATE TABLE IF NOT EXISTS {table_name} (
98
- id UUID DEFAULT uuid(),
99
- _ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
100
- )
101
- """)
102
-
103
- # Get existing columns
104
- existing_cols = {r[0].lower() for r in duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()}
105
-
106
- # Add missing columns dynamically
107
- for col in df.columns:
108
- if col.lower() not in existing_cols:
109
- dtype = map_pandas_to_duck(col, df[col])
110
- print(f"[mapper] βž• Adding new column '{col}:{dtype}' to {table_name}")
111
- duck.execute(f"ALTER TABLE {table_name} ADD COLUMN {col} {dtype}")
112
-
113
- return table_name
114
-
115
- # ---------- Main Canonify Function (WITH INDUSTRY DETECTION) ---------- #
116
- # app/mapper.py - FIX with bulletproof error handling
117
-
118
  def canonify_df(org_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
119
  """
120
- Enterprise-grade normalization:
121
- - Pulls raw audit data
122
- - Safely parses JSON
123
  - Auto-detects industry
124
  - Dynamically evolves schema
125
  - Returns (df, industry, confidence)
@@ -128,7 +93,7 @@ def canonify_df(org_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str,
128
  conn = get_conn(org_id)
129
  ensure_raw_table(conn)
130
 
131
- # βœ… SAFE: Handle both string and parsed objects from Redis
132
  try:
133
  rows = conn.execute("""
134
  SELECT row_data FROM main.raw_rows
@@ -143,42 +108,39 @@ def canonify_df(org_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str,
143
  print("[canonify] no audit rows found")
144
  return pd.DataFrame(), "unknown", 0.0
145
 
146
- # βœ… SAFE: Parse JSON with type checking
147
  parsed = []
148
  malformed_count = 0
149
 
150
  for r in rows:
151
  raw = r[0]
152
 
153
- # Handle both string and parsed object
154
  if not raw:
155
  malformed_count += 1
156
  continue
157
 
158
  try:
159
- # If it's already parsed (object), use it directly
160
  if isinstance(raw, (dict, list)):
161
  obj = raw
162
  else:
163
- # If it's a string, parse it
164
  obj = json.loads(str(raw))
165
  except Exception:
166
  malformed_count += 1
167
  continue
168
 
169
- # βœ… SAFE: Extract data from various payload shapes
170
  if isinstance(obj, dict):
171
  if "rows" in obj and isinstance(obj["rows"], list):
172
  parsed.extend(obj["rows"])
173
  elif "data" in obj and isinstance(obj["data"], list):
174
  parsed.extend(obj["data"])
175
  elif "tables" in obj and isinstance(obj["tables"], dict):
176
- # Flatten multi-table into single list for canonical
177
  for table_rows in obj["tables"].values():
178
  if isinstance(table_rows, list):
179
  parsed.extend(table_rows)
180
  else:
181
- # Single record dict
182
  parsed.append(obj)
183
  elif isinstance(obj, list):
184
  parsed.extend(obj)
@@ -186,38 +148,33 @@ def canonify_df(org_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str,
186
  malformed_count += 1
187
 
188
  if malformed_count:
189
- print(f"[canonify] skipped {malformed_count} malformed audit rows")
190
 
191
  if not parsed:
192
  print("[canonify] no valid data after parsing")
193
  return pd.DataFrame(), "unknown", 0.0
194
 
195
- # βœ… Create DataFrame with SAFE column names
196
  df = pd.DataFrame(parsed)
197
-
198
- # Handle empty DataFrame
199
- if df.empty:
200
- print("[canonify] DataFrame is empty")
201
- return pd.DataFrame(), "unknown", 0.0
202
 
203
- # βœ… FIX: Ensure all column names are strings
204
- df.columns = df.columns.astype(str).str.lower().str.strip()
205
 
206
- # βœ… Map columns to canonical names
207
  mapping = {}
208
  for canon, aliases in CANONICAL.items():
209
  for col in df.columns:
210
- # SAFE alias matching
211
  if any(str(alias).lower() in str(col).lower() for alias in aliases):
212
  mapping[col] = canon
213
  break
214
 
215
- # βœ… Learn new aliases dynamically
216
  for col in df.columns:
217
- if col not in [str(a).lower() for sublist in CANONICAL.values() for a in sublist]:
218
- for canon in CANONICAL.keys():
219
- if str(canon).lower() in str(col).lower() and col not in CANONICAL[canon]:
220
- CANONICAL[canon].append(col)
221
 
222
  save_dynamic_aliases()
223
 
@@ -225,7 +182,7 @@ def canonify_df(org_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str,
225
  cols = [c for c in CANONICAL.keys() if c in renamed.columns]
226
  df = renamed[cols].copy() if cols else renamed.copy()
227
 
228
- # βœ… Type conversions with error handling
229
  try:
230
  if "timestamp" in df:
231
  df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
@@ -237,41 +194,42 @@ def canonify_df(org_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str,
237
  if col in df:
238
  df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
239
  except Exception as e:
240
- print(f"[canonify] Type conversion warning: {e}")
241
 
242
- # βœ… FIX: Industry detection with actual column matching
243
  industry, confidence = detect_industry(df)
244
-
245
- # βœ… Dynamic schema evolution (NO MORE versioning)
 
246
  os.makedirs("./db", exist_ok=True)
247
  duck = duckdb.connect(f"./db/{org_id}.duckdb")
248
 
249
  table_name = ensure_canonical_table(duck, df)
250
 
251
- # βœ… SAFE: Explicit column matching to avoid order issues
252
  if not df.empty:
253
- # Get actual columns from table
254
  table_info = duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
255
  table_cols = [r[0] for r in table_info]
256
 
257
- # Reorder df to match table
258
- df = df.reindex(columns=[c for c in table_cols if c in df.columns], fill_value=None)
259
-
260
- # Insert
261
- cols_str = ", ".join(df.columns)
262
- placeholders = ", ".join(["?"] * len(df.columns))
263
 
264
- try:
265
- duck.executemany(
266
- f"INSERT INTO {table_name} ({cols_str}) VALUES ({placeholders})",
267
- df.values.tolist()
268
- )
269
- print(f"[canonify] βœ… Inserted {len(df)} rows into {table_name}")
270
- except Exception as e:
271
- print(f"[canonify] ❌ Insert failed: {e}")
272
- raise
 
 
 
 
273
 
274
  duck.close()
275
- print(f"[canonify] βœ… Complete for {org_id}: {len(df)} rows, {industry} ({confidence:.1%})")
276
 
277
  return df, industry, confidence
 
1
+ # app/mapper.py – BULLETPROOF VERSION
2
  import os
3
  import json
4
  import duckdb
 
28
  if pd.api.types.is_datetime64_any_dtype(series): return "TIMESTAMP"
29
  return "VARCHAR"
30
 
31
# ---------- Dynamic Schema Evolution ---------- #
def _quote_ident(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier, escaping embedded quotes."""
    return '"' + name.replace('"', '""') + '"'

def ensure_canonical_table(duck: duckdb.DuckDBPyConnection, df: pd.DataFrame) -> str:
    """
    Ensure the single canonical table exists and evolve its schema in place.

    Creates ``main.canonical`` if missing, then adds one column per DataFrame
    column that the table does not yet have. Column names are forced to
    lower-cased, stripped strings (handles int/None column labels) and are
    quoted before being spliced into DDL so odd characters cannot break the
    statement.

    Parameters
    ----------
    duck : duckdb.DuckDBPyConnection
        Open connection to the per-org DuckDB database.
    df : pd.DataFrame
        Incoming data whose columns drive the schema evolution.

    Returns
    -------
    str
        The canonical table name (``"main.canonical"``).
    """
    table_name = "main.canonical"

    # Base table: surrogate key + ingestion timestamp. Idempotent.
    duck.execute(f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            id UUID DEFAULT uuid(),
            _ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    # Existing columns, lower-cased for case-insensitive comparison.
    existing_cols = {
        row[0].lower()
        for row in duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
    }

    for col in df.columns:
        col_name = str(col).lower().strip()  # force a safe string name
        if col_name in existing_cols:
            continue
        try:
            dtype = map_pandas_to_duck(col_name, df[col])
            print(f"[mapper] ➕ Adding column '{col_name}:{dtype}'")
            # Quote the identifier: column names come from untrusted input
            # and may contain spaces, quotes, or SQL keywords.
            duck.execute(
                f"ALTER TABLE {table_name} ADD COLUMN {_quote_ident(col_name)} {dtype}"
            )
            # Track the addition so duplicate lower-cased names aren't re-added.
            existing_cols.add(col_name)
        except Exception as e:
            # Best effort: one bad column must not abort the whole ingestion.
            print(f"[mapper] ⚠️ Skipping column {col_name}: {e}")

    return table_name
62
 
63
+ # ---------- Alias Memory ---------- #
64
  def load_dynamic_aliases() -> None:
65
  if os.path.exists(ALIAS_FILE):
66
  try:
 
79
  with open(ALIAS_FILE, "w") as f:
80
  json.dump(CANONICAL, f, indent=2)
81
 
82
+ # ---------- Main Canonify Function (ENTERPRISE-GRADE) ---------- #
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
83
  def canonify_df(org_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
84
  """
85
+ Enterprise ingestion pipeline:
86
+ - Accepts ANY raw data shape
87
+ - Forces safe column names (handles int, None, etc.)
88
  - Auto-detects industry
89
  - Dynamically evolves schema
90
  - Returns (df, industry, confidence)
 
93
  conn = get_conn(org_id)
94
  ensure_raw_table(conn)
95
 
96
+ # 1) Pull raw audit data
97
  try:
98
  rows = conn.execute("""
99
  SELECT row_data FROM main.raw_rows
 
108
  print("[canonify] no audit rows found")
109
  return pd.DataFrame(), "unknown", 0.0
110
 
111
+ # 2) Parse JSON safely (handles both string and parsed objects)
112
  parsed = []
113
  malformed_count = 0
114
 
115
  for r in rows:
116
  raw = r[0]
117
 
 
118
  if not raw:
119
  malformed_count += 1
120
  continue
121
 
122
  try:
123
+ # βœ… Handle pre-parsed objects from Redis
124
  if isinstance(raw, (dict, list)):
125
  obj = raw
126
  else:
127
+ # βœ… Parse string JSON
128
  obj = json.loads(str(raw))
129
  except Exception:
130
  malformed_count += 1
131
  continue
132
 
133
+ # βœ… Extract rows from various payload formats
134
  if isinstance(obj, dict):
135
  if "rows" in obj and isinstance(obj["rows"], list):
136
  parsed.extend(obj["rows"])
137
  elif "data" in obj and isinstance(obj["data"], list):
138
  parsed.extend(obj["data"])
139
  elif "tables" in obj and isinstance(obj["tables"], dict):
 
140
  for table_rows in obj["tables"].values():
141
  if isinstance(table_rows, list):
142
  parsed.extend(table_rows)
143
  else:
 
144
  parsed.append(obj)
145
  elif isinstance(obj, list):
146
  parsed.extend(obj)
 
148
  malformed_count += 1
149
 
150
  if malformed_count:
151
+ print(f"[canonify] skipped {malformed_count} malformed rows")
152
 
153
  if not parsed:
154
  print("[canonify] no valid data after parsing")
155
  return pd.DataFrame(), "unknown", 0.0
156
 
157
+ # 3) βœ… BULLETPROOF: Force all column names to strings
158
  df = pd.DataFrame(parsed)
159
+ df.columns = [str(col).lower().strip() for col in df.columns]
 
 
 
 
160
 
161
+ # βœ… Remove duplicate columns (can happen with messy data)
162
+ df = df.loc[:, ~df.columns.duplicated()]
163
 
164
+ # 4) Map to canonical schema
165
  mapping = {}
166
  for canon, aliases in CANONICAL.items():
167
  for col in df.columns:
168
+ # βœ… SAFE: Ensure aliases are strings
169
  if any(str(alias).lower() in str(col).lower() for alias in aliases):
170
  mapping[col] = canon
171
  break
172
 
173
+ # βœ… Learn new aliases
174
  for col in df.columns:
175
+ for canon in CANONICAL.keys():
176
+ if str(canon).lower() in str(col).lower() and col not in CANONICAL[canon]:
177
+ CANONICAL[canon].append(col)
 
178
 
179
  save_dynamic_aliases()
180
 
 
182
  cols = [c for c in CANONICAL.keys() if c in renamed.columns]
183
  df = renamed[cols].copy() if cols else renamed.copy()
184
 
185
+ # 5) Type conversions (best effort)
186
  try:
187
  if "timestamp" in df:
188
  df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
 
194
  if col in df:
195
  df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
196
  except Exception as e:
197
+ print(f"[canonify] Type conversion warning (non-critical): {e}")
198
 
199
+ # 6) βœ… Industry detection
200
  industry, confidence = detect_industry(df)
201
+ print(f"[canonify] 🎯 Industry: {industry} ({confidence:.1%} confidence)")
202
+
203
+ # 7) Dynamic schema evolution
204
  os.makedirs("./db", exist_ok=True)
205
  duck = duckdb.connect(f"./db/{org_id}.duckdb")
206
 
207
  table_name = ensure_canonical_table(duck, df)
208
 
209
+ # βœ… SAFE INSERT: Match columns explicitly
210
  if not df.empty:
211
+ # Get current table columns
212
  table_info = duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
213
  table_cols = [r[0] for r in table_info]
214
 
215
+ # Only insert columns that exist in table
216
+ df_to_insert = df[[col for col in df.columns if col in table_cols]]
 
 
 
 
217
 
218
+ if not df_to_insert.empty:
219
+ cols_str = ", ".join(df_to_insert.columns)
220
+ placeholders = ", ".join(["?"] * len(df_to_insert.columns))
221
+
222
+ try:
223
+ duck.executemany(
224
+ f"INSERT INTO {table_name} ({cols_str}) VALUES ({placeholders})",
225
+ df_to_insert.values.tolist()
226
+ )
227
+ print(f"[canonify] βœ… Inserted {len(df_to_insert)} rows")
228
+ except Exception as e:
229
+ print(f"[canonify] ❌ Insert failed: {e}")
230
+ # Continue anyway - data quality issues shouldn't crash pipeline
231
 
232
  duck.close()
233
+ print(f"[canonify] βœ… Pipeline complete for {org_id}")
234
 
235
  return df, industry, confidence