petermutwiri committed on
Commit
4f1cf86
·
verified ·
1 Parent(s): e41d979

Update app/mapper.py

Browse files
Files changed (1) hide show
  1. app/mapper.py +95 -56
app/mapper.py CHANGED
@@ -113,126 +113,165 @@ def ensure_canonical_table(duck: duckdb.DuckDBPyConnection, df: pd.DataFrame) ->
113
  return table_name
114
 
115
  # ---------- Main Canonify Function (WITH INDUSTRY DETECTION) ---------- #
 
 
116
def canonify_df(org_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
    """Normalize raw rows, evolve the canonical schema, persist, and detect industry.

    Args:
        org_id: Organisation identifier; selects both the raw-row connection
            and the per-org DuckDB file (``./db/{org_id}.duckdb``).
        hours_window: Accepted for interface compatibility; not used in the
            visible body (presumably consumed by callers or a later revision —
            TODO confirm).

    Returns:
        Tuple of (canonical dataframe, industry name, confidence score).
        On any empty/failed stage, returns ``(empty DataFrame, "unknown", 0.0)``.
    """
    load_dynamic_aliases()
    conn = get_conn(org_id)
    ensure_raw_table(conn)

    # 1) Pull raw rows safely; a read failure degrades to "no rows" rather than crashing.
    try:
        rows = conn.execute(
            "SELECT row_data FROM main.raw_rows WHERE row_data IS NOT NULL AND LENGTH(row_data) > 0"
        ).fetchall()
    except Exception as e:
        print(f"[canonify] SQL read error: {e}")
        rows = []

    if not rows:
        print("[canonify] no rows to process")
        return pd.DataFrame(), "unknown", 0.0

    # 2) Parse JSON safely. Payloads may be a bare record, a list of records,
    #    or a dict wrapping records under "rows", "data", or "tables".
    parsed = []
    malformed_count = 0
    for r in rows:
        raw = r[0]
        if not raw or not isinstance(raw, str):
            malformed_count += 1
            continue
        try:
            obj = json.loads(raw)
        except Exception:
            malformed_count += 1
            continue

        if isinstance(obj, dict):
            if "rows" in obj and isinstance(obj["rows"], list):
                parsed.extend(obj["rows"])
            elif "data" in obj and isinstance(obj["data"], list):
                parsed.extend(obj["data"])
            elif "tables" in obj and isinstance(obj["tables"], dict):
                # Flatten a multi-table payload into one record stream.
                for t_rows in obj["tables"].values():
                    if isinstance(t_rows, list):
                        parsed.extend(t_rows)
            else:
                parsed.append(obj)
        elif isinstance(obj, list):
            parsed.extend(obj)
        else:
            malformed_count += 1

    if malformed_count:
        print(f"[canonify] skipped {malformed_count} malformed rows")

    if not parsed:
        print("[canonify] no valid parsed rows")
        return pd.DataFrame(), "unknown", 0.0

    # 3) Build DataFrame and normalize column names.
    raw_df = pd.DataFrame(parsed)
    if raw_df.empty:
        print("[canonify] dataframe empty after parse")
        return pd.DataFrame(), "unknown", 0.0

    raw_df.columns = raw_df.columns.str.lower().str.strip()

    # Map source columns onto canonical names via substring alias match.
    mapping = {}
    for canon, aliases in CANONICAL.items():
        for col in raw_df.columns:
            if any(a in col for a in aliases):
                mapping[col] = canon
                break

    # Learn dynamic aliases. FIX: flatten the known-alias pool once into a set
    # instead of re-running sum(CANONICAL.values(), []) (quadratic) per column.
    known_aliases = {a for aliases in CANONICAL.values() for a in aliases}
    for col in raw_df.columns:
        if col not in known_aliases:
            for canon in CANONICAL.keys():
                if canon in col and col not in CANONICAL[canon]:
                    CANONICAL[canon].append(col)
    save_dynamic_aliases()

    renamed = raw_df.rename(columns=mapping)
    cols = [c for c in CANONICAL.keys() if c in renamed.columns]
    df = renamed[cols].copy() if cols else renamed.copy()

    # 4) Type conversions; coercion errors become NaT/NaN (then 0 for numerics).
    if "timestamp" in df:
        df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    if "expiry_date" in df:
        df["expiry_date"] = pd.to_datetime(df["expiry_date"], errors="coerce").dt.date
    if "promo_flag" in df:
        df["promo_flag"] = df["promo_flag"].astype(str).isin({"1", "true", "t", "yes"})
    for col in ("qty", "total"):
        if col in df:
            df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)

    # 5) Detect industry BEFORE inserting.
    industry, confidence = detect_industry_from_df(df)
    print(f"[canonify] 🎯 Detected industry: {industry} (confidence: {confidence:.2%})")

    # 6) Dynamic schema evolution and persistence.
    os.makedirs("./db", exist_ok=True)
    duck = duckdb.connect(f"./db/{org_id}.duckdb")

    table_name = ensure_canonical_table(duck, df)

    # Safe insert with explicit column list so name/order mismatches surface clearly.
    cols_str = ", ".join(df.columns)
    placeholders = ", ".join(["?"] * len(df.columns))

    try:
        duck.executemany(
            f"INSERT INTO {table_name} ({cols_str}) VALUES ({placeholders})",
            df.values.tolist()
        )
        print(f"[canonify] βœ… Inserted {len(df)} rows into {table_name}")
    except Exception as e:
        print(f"[canonify] ❌ Insert failed: {e}")
        raise

    duck.close()
    print(f"[canonify] βœ… canonical snapshot updated for {org_id}")

    return df, industry, confidence  # βœ… Return all three values
 
113
  return table_name
114
 
115
  # ---------- Main Canonify Function (WITH INDUSTRY DETECTION) ---------- #
116
+ # app/mapper.py - FIX with bulletproof error handling
117
+
118
def canonify_df(org_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
    """
    Enterprise-grade normalization:
    - Pulls raw audit data
    - Safely parses JSON
    - Auto-detects industry
    - Dynamically evolves schema
    - Returns (df, industry, confidence)

    Args:
        org_id: Organisation identifier; selects both the raw-row connection
            and the per-org DuckDB file (``./db/{org_id}.duckdb``).
        hours_window: Accepted for interface compatibility; not used in the
            visible body — TODO confirm callers.

    Returns:
        Tuple of (canonical dataframe, industry name, confidence score).
        On any empty/failed stage, returns ``(empty DataFrame, "unknown", 0.0)``.
    """
    load_dynamic_aliases()
    conn = get_conn(org_id)
    ensure_raw_table(conn)

    # βœ… SAFE: Handle both string and parsed objects from Redis.
    # CAST guards against non-text row_data storage; failure degrades to [].
    try:
        rows = conn.execute("""
            SELECT row_data FROM main.raw_rows
            WHERE row_data IS NOT NULL
              AND LENGTH(CAST(row_data AS TEXT)) > 0
        """).fetchall()
    except Exception as e:
        print(f"[canonify] SQL read error: {e}")
        rows = []

    if not rows:
        print("[canonify] no audit rows found")
        return pd.DataFrame(), "unknown", 0.0

    # βœ… SAFE: Parse JSON with type checking.
    parsed = []
    malformed_count = 0

    for r in rows:
        raw = r[0]

        if not raw:
            malformed_count += 1
            continue

        try:
            # Already-parsed objects pass through; anything else is decoded as JSON text.
            if isinstance(raw, (dict, list)):
                obj = raw
            else:
                obj = json.loads(str(raw))
        except Exception:
            malformed_count += 1
            continue

        # βœ… SAFE: Extract data from various payload shapes
        # (bare record, list of records, or "rows"/"data"/"tables" wrappers).
        if isinstance(obj, dict):
            if "rows" in obj and isinstance(obj["rows"], list):
                parsed.extend(obj["rows"])
            elif "data" in obj and isinstance(obj["data"], list):
                parsed.extend(obj["data"])
            elif "tables" in obj and isinstance(obj["tables"], dict):
                # Flatten multi-table into single list for canonical
                for table_rows in obj["tables"].values():
                    if isinstance(table_rows, list):
                        parsed.extend(table_rows)
            else:
                # Single record dict
                parsed.append(obj)
        elif isinstance(obj, list):
            parsed.extend(obj)
        else:
            malformed_count += 1

    if malformed_count:
        print(f"[canonify] skipped {malformed_count} malformed audit rows")

    if not parsed:
        print("[canonify] no valid data after parsing")
        return pd.DataFrame(), "unknown", 0.0

    # βœ… Create DataFrame with SAFE column names.
    df = pd.DataFrame(parsed)
    if df.empty:
        print("[canonify] DataFrame is empty")
        return pd.DataFrame(), "unknown", 0.0

    # Ensure all column names are strings before the str accessor methods.
    df.columns = df.columns.astype(str).str.lower().str.strip()

    # βœ… Map columns to canonical names via case-insensitive substring alias match.
    mapping = {}
    for canon, aliases in CANONICAL.items():
        for col in df.columns:
            if any(str(alias).lower() in str(col).lower() for alias in aliases):
                mapping[col] = canon
                break

    # βœ… Learn new aliases dynamically.
    # FIX: flatten the known-alias pool once into a set instead of rebuilding
    # the lowered list on every column iteration (was O(columns * aliases)).
    known_aliases = {str(a).lower() for aliases in CANONICAL.values() for a in aliases}
    for col in df.columns:
        if col not in known_aliases:
            for canon in CANONICAL.keys():
                if str(canon).lower() in str(col).lower() and col not in CANONICAL[canon]:
                    CANONICAL[canon].append(col)

    save_dynamic_aliases()

    renamed = df.rename(columns=mapping)
    cols = [c for c in CANONICAL.keys() if c in renamed.columns]
    df = renamed[cols].copy() if cols else renamed.copy()

    # βœ… Type conversions with error handling; coercion failures become NaT/NaN
    # (then 0 for numerics), a wholesale failure is logged and skipped.
    try:
        if "timestamp" in df:
            df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
        if "expiry_date" in df:
            df["expiry_date"] = pd.to_datetime(df["expiry_date"], errors="coerce").dt.date
        if "promo_flag" in df:
            df["promo_flag"] = df["promo_flag"].astype(str).isin({"1", "true", "t", "yes"})
        for col in ("qty", "total"):
            if col in df:
                df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
    except Exception as e:
        print(f"[canonify] Type conversion warning: {e}")

    # βœ… Industry detection with actual column matching.
    industry, confidence = detect_industry(df)

    # βœ… Dynamic schema evolution (NO MORE versioning).
    os.makedirs("./db", exist_ok=True)
    duck = duckdb.connect(f"./db/{org_id}.duckdb")

    table_name = ensure_canonical_table(duck, df)

    # βœ… SAFE: Explicit column matching to avoid order issues.
    if not df.empty:
        # Get actual columns from table.
        table_info = duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
        # FIX: PRAGMA table_info rows are (cid, name, type, ...); the column NAME
        # is index 1, not index 0. Using r[0] yielded integer cids, so the
        # reindex intersection dropped every column and the INSERT was malformed.
        table_cols = [r[1] for r in table_info]

        # Reorder df to match table; columns absent from the table are dropped.
        df = df.reindex(columns=[c for c in table_cols if c in df.columns], fill_value=None)

        # Insert with an explicit column list.
        cols_str = ", ".join(df.columns)
        placeholders = ", ".join(["?"] * len(df.columns))

        try:
            duck.executemany(
                f"INSERT INTO {table_name} ({cols_str}) VALUES ({placeholders})",
                df.values.tolist()
            )
            print(f"[canonify] βœ… Inserted {len(df)} rows into {table_name}")
        except Exception as e:
            print(f"[canonify] ❌ Insert failed: {e}")
            raise

    duck.close()
    print(f"[canonify] βœ… Complete for {org_id}: {len(df)} rows, {industry} ({confidence:.1%})")

    return df, industry, confidence