petermutwiri committed on
Commit
b6cca77
·
verified ·
1 Parent(s): 3e6d56f

Update app/mapper.py

Browse files
Files changed (1) hide show
  1. app/mapper.py +84 -25
app/mapper.py CHANGED
@@ -121,52 +121,98 @@ def reconcile_latest_schema(duck: duckdb.DuckDBPyConnection) -> None:
121
  print(f"[schema] ✅ reconciled {len(tables)} versions → canonical_latest")
122
 
123
  def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
 
 
 
 
 
124
  load_dynamic_aliases()
125
  conn = get_conn(org_id)
126
  ensure_raw_table(conn)
127
 
128
- # 1️⃣ optional timestamp filter (zero ? params)
129
- cutoff_str = ""
130
- if hours_window > 0:
131
- cutoff = datetime.utcnow() - timedelta(hours=hours_window)
132
- cutoff_str = cutoff.strftime('%Y-%m-%d %H:%M:%S')
133
-
134
- rows = sql(conn, f"""
135
- SELECT row_data
136
- FROM raw_rows
137
- WHERE row_data IS NOT NULL
138
- AND LENGTH(row_data) > 0
139
- {f"AND try_strptime(NULLIF(json_extract(row_data, '$.timestamp'), ''), '%Y-%m-%d %H:%M:%S') >= TIMESTAMP '{cutoff_str}'" if hours_window > 0 else ""}
140
- """)
141
 
142
  if not rows:
143
- print("[canonify] no rows")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
144
  return pd.DataFrame()
145
 
146
- # 2️⃣ normalise
147
- raw = pd.DataFrame([json.loads(r[0]) for r in rows])
148
- raw.columns = safe_str_transform(raw.columns)
 
 
149
 
 
150
  mapping = {}
151
  for canon, aliases in CANONICAL.items():
152
- for col in raw.columns:
153
  if any(a in col for a in aliases):
154
  mapping[col] = canon
155
  break
156
 
157
- # dynamic aliases
158
- for col in raw.columns:
159
  if col not in sum(CANONICAL.values(), []):
160
  for canon in CANONICAL.keys():
161
  if canon in col and col not in CANONICAL[canon]:
162
  CANONICAL[canon].append(col)
163
  save_dynamic_aliases()
164
 
165
- renamed = raw.rename(columns=mapping)
166
  cols = [c for c in CANONICAL.keys() if c in renamed.columns]
167
  df = renamed[cols].copy() if cols else renamed.copy()
168
 
169
- # 3️⃣ datatype fixes
170
  if "timestamp" in df:
171
  df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
172
  if "expiry_date" in df:
@@ -177,15 +223,28 @@ def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
177
  if col in df:
178
  df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
179
 
180
- # 4️⃣ schema versioning & storage
181
  os.makedirs("./db", exist_ok=True)
182
  duck = duckdb.connect(f"./db/{org_id}.duckdb")
183
 
184
  table_name = ensure_schema_version(duck, df)
 
185
  duck.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM df LIMIT 0")
186
- duck.execute(f"INSERT INTO {table_name} SELECT * FROM df")
 
 
 
 
 
 
 
 
 
 
 
 
187
  reconcile_latest_schema(duck)
188
  duck.close()
189
 
190
  print(f"[canonify] ✅ canonical snapshot updated for {org_id}")
191
- return df
 
121
  print(f"[schema] ✅ reconciled {len(tables)} versions → canonical_latest")
122
 
123
def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
    """
    Normalize, version, and persist a canonical data snapshot for ``org_id``.

    Pulls ``raw_rows`` from the org's DB as raw JSON strings and parses them
    in Python (so a single malformed row cannot crash the pipeline), maps raw
    column names onto the ``CANONICAL`` schema (learning new aliases on the
    fly), coerces datatypes, and appends the result into a schema-versioned
    DuckDB table under ``./db/{org_id}.duckdb``.

    Parameters
    ----------
    org_id : str
        Organisation identifier; selects the source connection and the target
        DuckDB file.
    hours_window : int, default 24
        Kept for interface compatibility. NOTE(review): this version reads
        *all* raw rows — the timestamp cutoff is not applied; confirm whether
        the time filter should be reinstated.

    Returns
    -------
    pd.DataFrame
        The canonical frame that was persisted; an empty frame when there is
        nothing valid to process.
    """
    load_dynamic_aliases()
    conn = get_conn(org_id)
    ensure_raw_table(conn)

    # 1) Pull raw strings from the DB — no JSON parsing in SQL, so malformed
    #    payloads are handled (and counted) in Python instead of failing the query.
    try:
        rows = conn.execute(
            "SELECT row_data FROM main.raw_rows "
            "WHERE row_data IS NOT NULL AND LENGTH(row_data) > 0"
        ).fetchall()
    except Exception as e:
        print(f"[canonify] SQL read error: {e}")
        rows = []

    if not rows:
        print("[canonify] no rows to process")
        return pd.DataFrame()

    # 2) Parse JSON strings defensively; skip (and count) anything unusable.
    parsed = []
    malformed_count = 0
    for r in rows:
        raw = r[0]
        if not raw or not isinstance(raw, str):
            malformed_count += 1
            continue
        try:
            obj = json.loads(raw)
        except Exception:
            # Legacy / malformed payloads are best-effort: skip them.
            malformed_count += 1
            continue

        # Unwrap common envelope shapes: {"rows": [...]}, {"data": [...]},
        # {"tables": {name: [...]}}; any other dict is treated as one record.
        if isinstance(obj, dict):
            if isinstance(obj.get("rows"), list):
                parsed.extend(obj["rows"])
            elif isinstance(obj.get("data"), list):
                parsed.extend(obj["data"])
            elif isinstance(obj.get("tables"), dict):
                for t_rows in obj["tables"].values():
                    if isinstance(t_rows, list):
                        parsed.extend(t_rows)
            else:
                parsed.append(obj)
        elif isinstance(obj, list):
            parsed.extend(obj)
        else:
            # Scalar / unknown shape — skip.
            malformed_count += 1
            continue

    if malformed_count:
        print(f"[canonify] skipped {malformed_count} malformed/unsupported raw_rows")

    if not parsed:
        print("[canonify] no valid parsed rows after filtering")
        return pd.DataFrame()

    # 3) Build the DataFrame and normalise column names.
    raw_df = pd.DataFrame(parsed)
    if raw_df.empty:
        print("[canonify] dataframe empty after parse")
        return pd.DataFrame()

    # FIX: cast to str before the .str accessor — non-string keys in the
    # parsed JSON would otherwise raise AttributeError on .lower().
    raw_df.columns = raw_df.columns.astype(str).str.lower().str.strip()

    mapping = {}
    for canon, aliases in CANONICAL.items():
        # NOTE(review): the inner `break` maps only the FIRST column matching
        # each canonical name — preserved as-is; confirm that is intentional.
        for col in raw_df.columns:
            if any(a in col for a in aliases):
                mapping[col] = canon
                break

    # Learn dynamic aliases for unmapped columns that embed a canonical name.
    known_aliases = set(sum(CANONICAL.values(), []))  # hoisted: O(1) membership
    for col in raw_df.columns:
        if col not in known_aliases:
            for canon in CANONICAL.keys():
                if canon in col and col not in CANONICAL[canon]:
                    CANONICAL[canon].append(col)
                    save_dynamic_aliases()

    renamed = raw_df.rename(columns=mapping)
    cols = [c for c in CANONICAL.keys() if c in renamed.columns]
    df = renamed[cols].copy() if cols else renamed.copy()

    # Datatype conversions — invalid values become NaT / 0 instead of raising.
    if "timestamp" in df:
        df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    if "expiry_date" in df:
        df["expiry_date"] = pd.to_datetime(df["expiry_date"], errors="coerce")
    # NOTE(review): the numeric-column list below is elided in the diff view;
    # reconstructed conservatively — confirm against the full file.
    for col in ("quantity", "price"):
        if col in df:
            df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)

    # 4) Persist the canonical snapshot into a schema-versioned DuckDB table.
    os.makedirs("./db", exist_ok=True)
    duck = duckdb.connect(f"./db/{org_id}.duckdb")

    table_name = ensure_schema_version(duck, df)
    # Create the table (empty) with df's columns if it does not exist yet;
    # DuckDB resolves `df` via replacement scan of the local variable.
    duck.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM df LIMIT 0")
    try:
        # FIX: insert by explicit column list — a positional INSERT breaks as
        # soon as the target table has extra or reordered columns.
        col_list = ", ".join(f'"{c}"' for c in df.columns)
        duck.execute(f"INSERT INTO {table_name} ({col_list}) SELECT * FROM df")
    except Exception as e:
        print(f"[canonify] insert error, retrying with explicit column checks: {e}")
        # FIX: PRAGMA table_info rows are (cid, name, type, notnull, ...), so
        # the column NAME is r[1]; r[0] is the integer cid (no .lower()).
        existing_cols = {
            r[1].lower()
            for r in duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
        }
        for col in df.columns:
            if col.lower() not in existing_cols:
                dtype = map_pandas_to_duck(col, df[col])
                duck.execute(f'ALTER TABLE {table_name} ADD COLUMN "{col}" {dtype}')
        col_list = ", ".join(f'"{c}"' for c in df.columns)
        duck.execute(f"INSERT INTO {table_name} ({col_list}) SELECT * FROM df")

    reconcile_latest_schema(duck)
    duck.close()

    print(f"[canonify] ✅ canonical snapshot updated for {org_id}")
    return df