petermutwiri committed on
Commit
8b47729
·
verified ·
1 Parent(s): 085e727

Update app/mapper.py

Browse files
Files changed (1) hide show
  1. app/mapper.py +88 -100
app/mapper.py CHANGED
@@ -1,11 +1,11 @@
1
- # mapper.py – production-hardened
2
  import os
3
  import json
4
  import duckdb
5
  import pandas as pd
6
  from datetime import datetime, timedelta
7
  from app.db import get_conn, ensure_raw_table
8
- from app.utils.detect_industry import _ALIAS
9
 
10
  # ---------------------- Canonical schema base ---------------------- #
11
  CANONICAL = {
@@ -18,6 +18,7 @@ CANONICAL = {
18
  "promo_flag": ["promo", "promotion", "is_promo", "discount_code"],
19
  "expiry_date":["expiry_date", "best_before", "use_by", "expiration"],
20
  }
 
21
  ALIAS_FILE = "./db/alias_memory.json"
22
 
23
  def map_pandas_to_duck(col: str, series: pd.Series) -> str:
@@ -26,7 +27,7 @@ def map_pandas_to_duck(col: str, series: pd.Series) -> str:
26
  if pd.api.types.is_float_dtype(series): return "DOUBLE"
27
  if pd.api.types.is_datetime64_any_dtype(series): return "TIMESTAMP"
28
  return "VARCHAR"
29
- # ---------- helpers ---------- #
30
  def safe_str_transform(s: pd.Series) -> pd.Series:
31
  if pd.api.types.is_string_dtype(s):
32
  return s.str.lower().str.strip()
@@ -42,8 +43,29 @@ def add_column_if_not_exists(duck: duckdb.DuckDBPyConnection, table: str, col: s
42
  duck.execute(f"ALTER TABLE {table} ADD COLUMN {col} {dtype}")
43
  print(f"[schema] βž• added {col}:{dtype} to {table}")
44
 
45
-
46
- # ---------- alias memory ---------- #
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
  def load_dynamic_aliases() -> None:
48
  if os.path.exists(ALIAS_FILE):
49
  try:
@@ -62,75 +84,45 @@ def save_dynamic_aliases() -> None:
62
  with open(ALIAS_FILE, "w") as f:
63
  json.dump(CANONICAL, f, indent=2)
64
 
65
- # ---------- schema versioning ---------- #
66
- def ensure_schema_version(duck: duckdb.DuckDBPyConnection, df: pd.DataFrame) -> str:
67
- duck.execute("CREATE SCHEMA IF NOT EXISTS main")
68
-
69
- # versioning metadata
70
- duck.execute("""
71
- CREATE TABLE IF NOT EXISTS main.schema_versions (
72
- version INTEGER PRIMARY KEY,
73
- columns JSON,
74
- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
 
 
 
75
  )
76
  """)
77
-
78
- new_signature = sorted(df.columns.tolist())
79
- latest = duck.execute(
80
- "SELECT version, columns FROM main.schema_versions ORDER BY version DESC LIMIT 1"
81
- ).fetchone()
82
-
83
- if latest is None:
84
- version = 1
85
- else:
86
- latest_cols = sorted(json.loads(latest[1]))
87
- if latest_cols == new_signature:
88
- # βœ… schema unchanged β†’ reuse existing table
89
- return f"main.canonical_v{latest[0]}"
90
- version = latest[0] + 1
91
-
92
- # βœ… record new schema version
93
- duck.execute(
94
- "INSERT INTO main.schema_versions (version, columns) VALUES (?, ?)",
95
- (version, json.dumps(new_signature))
96
- )
97
-
98
- table = f"main.canonical_v{version}"
99
-
100
- # βœ… create new table with ALL columns directly (safe)
101
- col_defs = []
102
  for col in df.columns:
103
- dtype = map_pandas_to_duck(col, df[col])
104
- col_defs.append(f"{col} {dtype}")
105
-
106
- duck.execute(f"CREATE TABLE {table} ({', '.join(col_defs)})")
107
-
108
- print(f"[schema] βœ… created {table}")
109
- return table
110
-
111
-
112
- def reconcile_latest_schema(duck: duckdb.DuckDBPyConnection) -> None:
113
- tables = [r[0] for r in duck.execute("""
114
- SELECT table_name FROM information_schema.tables
115
- WHERE table_name LIKE 'canonical_v%'
116
- """).fetchall()]
117
- if not tables:
118
- return
119
- union_query = " UNION ALL ".join([f"SELECT * FROM {t}" for t in tables])
120
- duck.execute("CREATE OR REPLACE TABLE main.canonical_latest AS " + union_query)
121
- print(f"[schema] βœ… reconciled {len(tables)} versions β†’ canonical_latest")
122
-
123
- def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
124
  """
125
- Normalize, version, and persist canonical data snapshot for org_id.
126
- This version pulls raw_rows as raw strings and parses JSON in Python so
127
- malformed raw_rows don't crash the pipeline.
128
  """
129
  load_dynamic_aliases()
130
  conn = get_conn(org_id)
131
  ensure_raw_table(conn)
132
 
133
- # 1) pull raw strings from DB (no JSON parsing in SQL)
134
  try:
135
  rows = conn.execute("SELECT row_data FROM main.raw_rows WHERE row_data IS NOT NULL AND LENGTH(row_data) > 0").fetchall()
136
  except Exception as e:
@@ -139,9 +131,9 @@ def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
139
 
140
  if not rows:
141
  print("[canonify] no rows to process")
142
- return pd.DataFrame()
143
 
144
- # 2) parse json strings safely in Python, skip bad ones
145
  parsed = []
146
  malformed_count = 0
147
  for r in rows:
@@ -152,45 +144,38 @@ def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
152
  try:
153
  obj = json.loads(raw)
154
  except Exception:
155
- # Maybe raw is a single-object (not list) or legacy shape;
156
- # attempt best-effort: ignore empty or malformed
157
  malformed_count += 1
158
  continue
159
 
160
- # If this is a wrapper like {"rows": [...]} or {"data": [...]} or {"tables": {...}}
161
  if isinstance(obj, dict):
162
- # prefer list under rows, data, or fallback to tables flatten
163
  if "rows" in obj and isinstance(obj["rows"], list):
164
  parsed.extend(obj["rows"])
165
  elif "data" in obj and isinstance(obj["data"], list):
166
  parsed.extend(obj["data"])
167
  elif "tables" in obj and isinstance(obj["tables"], dict):
168
- # flatten: append all rows from all tables (optional)
169
  for t_rows in obj["tables"].values():
170
  if isinstance(t_rows, list):
171
  parsed.extend(t_rows)
172
  else:
173
- # maybe the dict itself represents a single record
174
  parsed.append(obj)
175
  elif isinstance(obj, list):
176
  parsed.extend(obj)
177
  else:
178
- # unknown shape β€” skip
179
  malformed_count += 1
180
  continue
181
 
182
  if malformed_count:
183
- print(f"[canonify] skipped {malformed_count} malformed/unsupported raw_rows")
184
 
185
  if not parsed:
186
- print("[canonify] no valid parsed rows after filtering")
187
- return pd.DataFrame()
188
 
189
- # 3) build DataFrame and normalize column names
190
  raw_df = pd.DataFrame(parsed)
191
  if raw_df.empty:
192
  print("[canonify] dataframe empty after parse")
193
- return pd.DataFrame()
194
 
195
  raw_df.columns = raw_df.columns.str.lower().str.strip()
196
  mapping = {}
@@ -200,7 +185,7 @@ def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
200
  mapping[col] = canon
201
  break
202
 
203
- # learn dynamic aliases
204
  for col in raw_df.columns:
205
  if col not in sum(CANONICAL.values(), []):
206
  for canon in CANONICAL.keys():
@@ -212,7 +197,7 @@ def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
212
  cols = [c for c in CANONICAL.keys() if c in renamed.columns]
213
  df = renamed[cols].copy() if cols else renamed.copy()
214
 
215
- # datatype conversions
216
  if "timestamp" in df:
217
  df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
218
  if "expiry_date" in df:
@@ -223,28 +208,31 @@ def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
223
  if col in df:
224
  df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
225
 
226
- # 4) persist canonical snapshot (use safe schema-versioning)
 
 
 
 
227
  os.makedirs("./db", exist_ok=True)
228
  duck = duckdb.connect(f"./db/{org_id}.duckdb")
229
-
230
- table_name = ensure_schema_version(duck, df)
231
- # create table if not exists with the columns of df
232
- duck.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM df LIMIT 0")
233
- # if table created above has no columns (rare), fallback to explicit column creation handled in ensure_schema_version
 
 
234
  try:
235
- duck.execute(f"INSERT INTO {table_name} SELECT * FROM df")
 
 
 
 
236
  except Exception as e:
237
- print(f"[canonify] insert error, retrying with explicit column checks: {e}")
238
- # ensure columns exist individually
239
- existing_cols = {r[0].lower() for r in duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()}
240
- for col in df.columns:
241
- if col.lower() not in existing_cols:
242
- dtype = map_pandas_to_duck(col, df[col])
243
- duck.execute(f"ALTER TABLE {table_name} ADD COLUMN {col} {dtype}")
244
- duck.execute(f"INSERT INTO {table_name} SELECT * FROM df")
245
-
246
- reconcile_latest_schema(duck)
247
- duck.close()
248
 
 
249
  print(f"[canonify] βœ… canonical snapshot updated for {org_id}")
250
- return df
 
 
1
+ # app/mapper.py – FIXED WITH INDUSTRY DETECTION
2
  import os
3
  import json
4
  import duckdb
5
  import pandas as pd
6
  from datetime import datetime, timedelta
7
  from app.db import get_conn, ensure_raw_table
8
+ from app.utils.detect_industry import _ALIAS # βœ… RESTORED
9
 
10
  # ---------------------- Canonical schema base ---------------------- #
11
  CANONICAL = {
 
18
  "promo_flag": ["promo", "promotion", "is_promo", "discount_code"],
19
  "expiry_date":["expiry_date", "best_before", "use_by", "expiration"],
20
  }
21
+
22
  ALIAS_FILE = "./db/alias_memory.json"
23
 
24
  def map_pandas_to_duck(col: str, series: pd.Series) -> str:
 
27
  if pd.api.types.is_float_dtype(series): return "DOUBLE"
28
  if pd.api.types.is_datetime64_any_dtype(series): return "TIMESTAMP"
29
  return "VARCHAR"
30
+
31
  def safe_str_transform(s: pd.Series) -> pd.Series:
32
  if pd.api.types.is_string_dtype(s):
33
  return s.str.lower().str.strip()
 
43
  duck.execute(f"ALTER TABLE {table} ADD COLUMN {col} {dtype}")
44
  print(f"[schema] βž• added {col}:{dtype} to {table}")
45
 
46
# ---------- INDUSTRY DETECTION INTEGRATION ---------- #
def detect_industry_from_df(df: pd.DataFrame) -> tuple[str, float]:
    """Auto-detect the industry of a dataset from its column names.

    Each entry in ``_ALIAS`` is assumed to map an industry name to a list
    of alias substrings (that is how it is consumed here — confirm against
    ``app.utils.detect_industry``). An industry's score is the fraction of
    its aliases that occur as a substring of at least one lower-cased
    column name, capped at 1.0.

    Args:
        df: Dataframe whose column names are inspected.

    Returns:
        ``(industry_name, confidence)`` with confidence in [0.0, 1.0];
        ``("unknown", 0.0)`` for an empty dataframe or when no industry
        aliases are configured.
    """
    if df.empty:
        return "unknown", 0.0

    cols = set(df.columns.str.lower())
    scores: dict[str, float] = {}

    for industry, aliases in _ALIAS.items():
        # Substring match so e.g. column "unit_price" matches alias "price".
        matches = sum(1 for alias in aliases if any(alias in col for col in cols))
        scores[industry] = min(matches / len(aliases), 1.0) if aliases else 0.0

    # BUG FIX: max() on an empty dict raises ValueError — guard the case
    # where _ALIAS defines no industries at all.
    if not scores:
        return "unknown", 0.0

    best_industry = max(scores, key=scores.get)
    confidence = scores[best_industry]

    return best_industry, confidence
67
+
68
+ # ---------- Alias Memory (no changes) ---------- #
69
  def load_dynamic_aliases() -> None:
70
  if os.path.exists(ALIAS_FILE):
71
  try:
 
84
  with open(ALIAS_FILE, "w") as f:
85
  json.dump(CANONICAL, f, indent=2)
86
 
87
+ # ---------- Dynamic Schema Evolution ---------- #
88
+ def ensure_canonical_table(duck: duckdb.DuckDBPyConnection, df: pd.DataFrame) -> str:
89
+ """
90
+ Single canonical table that evolves dynamically.
91
+ Adds missing columns on-the-fly without creating new versions.
92
+ """
93
+ table_name = "main.canonical"
94
+
95
+ # Ensure base table exists
96
+ duck.execute(f"""
97
+ CREATE TABLE IF NOT EXISTS {table_name} (
98
+ id UUID DEFAULT uuid(),
99
+ _ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
100
  )
101
  """)
102
+
103
+ # Get existing columns
104
+ existing_cols = {r[0].lower() for r in duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()}
105
+
106
+ # Add missing columns dynamically
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
107
  for col in df.columns:
108
+ if col.lower() not in existing_cols:
109
+ dtype = map_pandas_to_duck(col, df[col])
110
+ print(f"[mapper] βž• Adding new column '{col}:{dtype}' to {table_name}")
111
+ duck.execute(f"ALTER TABLE {table_name} ADD COLUMN {col} {dtype}")
112
+
113
+ return table_name
114
+
115
+ # ---------- Main Canonify Function (WITH INDUSTRY DETECTION) ---------- #
116
+ def canonify_df(org_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
 
 
 
 
 
 
 
 
 
 
 
 
117
  """
118
+ Normalize, evolve schema dynamically, persist to canonical table, AND detect industry.
119
+ Returns: (dataframe, industry_name, confidence_score)
 
120
  """
121
  load_dynamic_aliases()
122
  conn = get_conn(org_id)
123
  ensure_raw_table(conn)
124
 
125
+ # 1) Pull raw rows safely
126
  try:
127
  rows = conn.execute("SELECT row_data FROM main.raw_rows WHERE row_data IS NOT NULL AND LENGTH(row_data) > 0").fetchall()
128
  except Exception as e:
 
131
 
132
  if not rows:
133
  print("[canonify] no rows to process")
134
+ return pd.DataFrame(), "unknown", 0.0
135
 
136
+ # 2) Parse JSON safely
137
  parsed = []
138
  malformed_count = 0
139
  for r in rows:
 
144
  try:
145
  obj = json.loads(raw)
146
  except Exception:
 
 
147
  malformed_count += 1
148
  continue
149
 
 
150
  if isinstance(obj, dict):
 
151
  if "rows" in obj and isinstance(obj["rows"], list):
152
  parsed.extend(obj["rows"])
153
  elif "data" in obj and isinstance(obj["data"], list):
154
  parsed.extend(obj["data"])
155
  elif "tables" in obj and isinstance(obj["tables"], dict):
 
156
  for t_rows in obj["tables"].values():
157
  if isinstance(t_rows, list):
158
  parsed.extend(t_rows)
159
  else:
 
160
  parsed.append(obj)
161
  elif isinstance(obj, list):
162
  parsed.extend(obj)
163
  else:
 
164
  malformed_count += 1
165
  continue
166
 
167
  if malformed_count:
168
+ print(f"[canonify] skipped {malformed_count} malformed rows")
169
 
170
  if not parsed:
171
+ print("[canonify] no valid parsed rows")
172
+ return pd.DataFrame(), "unknown", 0.0
173
 
174
+ # 3) Build DataFrame and normalize
175
  raw_df = pd.DataFrame(parsed)
176
  if raw_df.empty:
177
  print("[canonify] dataframe empty after parse")
178
+ return pd.DataFrame(), "unknown", 0.0
179
 
180
  raw_df.columns = raw_df.columns.str.lower().str.strip()
181
  mapping = {}
 
185
  mapping[col] = canon
186
  break
187
 
188
+ # Learn dynamic aliases
189
  for col in raw_df.columns:
190
  if col not in sum(CANONICAL.values(), []):
191
  for canon in CANONICAL.keys():
 
197
  cols = [c for c in CANONICAL.keys() if c in renamed.columns]
198
  df = renamed[cols].copy() if cols else renamed.copy()
199
 
200
+ # 4) Type conversions
201
  if "timestamp" in df:
202
  df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
203
  if "expiry_date" in df:
 
208
  if col in df:
209
  df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
210
 
211
+ # 5) βœ… DETECT INDUSTRY BEFORE INSERTING
212
+ industry, confidence = detect_industry_from_df(df)
213
+ print(f"[canonify] 🎯 Detected industry: {industry} (confidence: {confidence:.2%})")
214
+
215
+ # 6) Dynamic schema evolution
216
  os.makedirs("./db", exist_ok=True)
217
  duck = duckdb.connect(f"./db/{org_id}.duckdb")
218
+
219
+ table_name = ensure_canonical_table(duck, df)
220
+
221
+ # βœ… Safe insert with explicit column matching
222
+ cols_str = ", ".join(df.columns)
223
+ placeholders = ", ".join(["?"] * len(df.columns))
224
+
225
  try:
226
+ duck.executemany(
227
+ f"INSERT INTO {table_name} ({cols_str}) VALUES ({placeholders})",
228
+ df.values.tolist()
229
+ )
230
+ print(f"[canonify] βœ… Inserted {len(df)} rows into {table_name}")
231
  except Exception as e:
232
+ print(f"[canonify] ❌ Insert failed: {e}")
233
+ raise
 
 
 
 
 
 
 
 
 
234
 
235
+ duck.close()
236
  print(f"[canonify] βœ… canonical snapshot updated for {org_id}")
237
+
238
+ return df, industry, confidence # βœ… Return all three values