petermutwiri committed on
Commit
b3e7f14
·
verified ·
1 Parent(s): 2d13919

Update app/mapper.py

Browse files
Files changed (1) hide show
  1. app/mapper.py +46 -76
app/mapper.py CHANGED
@@ -1,9 +1,12 @@
1
- import os, json, duckdb, pandas as pd
 
 
 
 
2
  from datetime import datetime
3
  from app.db import get_conn, ensure_raw_table
4
  from app.utils.detect_industry import _ALIAS
5
 
6
-
7
  # ---------------------- Canonical schema base ---------------------- #
8
  CANONICAL = {
9
  "timestamp": ["timestamp", "date", "sale_date", "created_at"],
@@ -15,45 +18,44 @@ CANONICAL = {
15
  "promo_flag": ["promo", "promotion", "is_promo", "discount_code"],
16
  "expiry_date":["expiry_date", "best_before", "use_by", "expiration"],
17
  }
18
-
19
  ALIAS_FILE = "./db/alias_memory.json"
20
 
21
- def safe_str_transform(series: pd.Series) -> pd.Series:
22
- """Apply .str.lower() & .str.strip() only if dtype is object/string."""
23
- if pd.api.types.is_string_dtype(series):
24
- return series.str.lower().str.strip()
25
- return series
26
- # ---------------------- Alias memory helpers ---------------------- #
 
 
 
 
 
 
 
 
 
 
27
  def load_dynamic_aliases() -> None:
28
- """Load learned aliases and merge into CANONICAL."""
29
  if os.path.exists(ALIAS_FILE):
30
  try:
31
  with open(ALIAS_FILE) as f:
32
  dynamic_aliases = json.load(f)
33
  for k, v in dynamic_aliases.items():
34
  if k in CANONICAL:
35
- for alias in v:
36
- if alias not in CANONICAL[k]:
37
- CANONICAL[k].append(alias)
38
  else:
39
  CANONICAL[k] = v
40
  except Exception as e:
41
  print(f"[mapper] ⚠️ failed to load alias memory: {e}")
42
 
43
-
44
  def save_dynamic_aliases() -> None:
45
- """Persist learned aliases for next runs."""
46
  os.makedirs(os.path.dirname(ALIAS_FILE), exist_ok=True)
47
  with open(ALIAS_FILE, "w") as f:
48
  json.dump(CANONICAL, f, indent=2)
49
 
50
-
51
- # ---------------------- Schema versioning helpers ---------------------- #
52
- def ensure_schema_version(duck, df: pd.DataFrame) -> str:
53
- """
54
- Ensure schema versioning and track evolution.
55
- Returns the active canonical table name (e.g., main.canonical_v2).
56
- """
57
  duck.execute("CREATE SCHEMA IF NOT EXISTS main")
58
  duck.execute("""
59
  CREATE TABLE IF NOT EXISTS main.schema_versions (
@@ -62,7 +64,6 @@ def ensure_schema_version(duck, df: pd.DataFrame) -> str:
62
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
63
  )
64
  """)
65
-
66
  latest = duck.execute("SELECT * FROM main.schema_versions ORDER BY version DESC LIMIT 1").fetchone()
67
  new_signature = sorted(df.columns.tolist())
68
 
@@ -70,81 +71,55 @@ def ensure_schema_version(duck, df: pd.DataFrame) -> str:
70
  latest_cols = sorted(json.loads(latest[1]))
71
  if latest_cols == new_signature:
72
  return f"main.canonical_v{latest[0]}"
73
- else:
74
- new_version = latest[0] + 1
75
- duck.execute("INSERT INTO main.schema_versions (version, columns) VALUES (?, ?)",
76
- (new_version, json.dumps(new_signature)))
77
- print(f"[schema] → new version detected: canonical_v{new_version}")
78
- return f"main.canonical_v{new_version}"
79
  else:
80
- duck.execute("INSERT INTO main.schema_versions (version, columns) VALUES (?, ?)",
81
- (1, json.dumps(new_signature)))
82
- print("[schema] → initialized canonical_v1")
83
- return "main.canonical_v1"
84
-
85
 
86
- def reconcile_latest_schema(duck):
87
- """
88
- Merge all canonical_v* tables into main.canonical_latest
89
- preserving new columns and filling missing values with NULL.
90
- """
91
  tables = [r[0] for r in duck.execute("""
92
  SELECT table_name FROM information_schema.tables
93
  WHERE table_name LIKE 'canonical_v%'
94
  """).fetchall()]
95
  if not tables:
96
  return
97
-
98
  union_query = " UNION ALL ".join([f"SELECT * FROM {t}" for t in tables])
99
  duck.execute("CREATE OR REPLACE TABLE main.canonical_latest AS " + union_query)
100
- print(f"[schema] ✅ reconciled {len(tables)} schema versions → canonical_latest")
101
-
102
 
103
  # ---------------------- Canonify core logic ---------------------- #
104
  def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
105
- """
106
- Normalize, version, and persist canonical data snapshot for org_id.
107
- """
108
  load_dynamic_aliases()
109
  conn = get_conn(org_id)
110
  ensure_raw_table(conn)
111
 
112
- # --------------------------
113
- # ⏱ Safe timestamp filtering
114
- # --------------------------
115
- try:
116
- rows = conn.execute(
117
- """
118
- SELECT row_data
119
- FROM raw_rows
120
- WHERE strptime(json_extract(row_data, '$.timestamp'), '%Y-%m-%d %H:%M:%S')
121
- >= now() - INTERVAL ? HOUR
122
- """,
123
- (hours_window,)
124
- ).fetchall()
125
- except Exception as e:
126
- print(f"[canonify] ⚠️ fallback to all rows due to timestamp parse error: {e}")
127
- rows = conn.execute("SELECT row_data FROM raw_rows").fetchall()
128
 
129
  if not rows:
130
- print("[canonify] no rows to process")
131
  return pd.DataFrame()
132
 
133
- # --------------------------
134
- # 🧩 DataFrame normalization
135
- # --------------------------
136
  raw = pd.DataFrame([json.loads(r[0]) for r in rows])
137
  raw.columns = safe_str_transform(raw.columns)
138
 
139
- # Flexible alias mapping
140
  mapping = {}
141
  for canon, aliases in CANONICAL.items():
142
  for col in raw.columns:
143
  if any(a in col for a in aliases):
144
  mapping[col] = canon
145
  break
146
-
147
- # 🧠 Learn new aliases dynamically
148
  for col in raw.columns:
149
  if col not in sum(CANONICAL.values(), []):
150
  for canon in CANONICAL.keys():
@@ -152,12 +127,11 @@ def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
152
  CANONICAL[canon].append(col)
153
  save_dynamic_aliases()
154
 
155
- # Apply canonical renaming
156
  renamed = raw.rename(columns=mapping)
157
  cols = [c for c in CANONICAL.keys() if c in renamed.columns]
158
  df = renamed[cols].copy() if cols else renamed.copy()
159
 
160
- # 🔢 Normalize datatypes
161
  if "timestamp" in df:
162
  df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
163
  if "expiry_date" in df:
@@ -168,19 +142,15 @@ def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
168
  if col in df:
169
  df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
170
 
171
- # --------------------------
172
- # 🪣 Schema versioning + storage
173
- # --------------------------
174
  os.makedirs("./db", exist_ok=True)
175
  duck = duckdb.connect(f"./db/{org_id}.duckdb")
176
 
177
  table_name = ensure_schema_version(duck, df)
178
  duck.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM df LIMIT 0")
179
  duck.execute(f"INSERT INTO {table_name} SELECT * FROM df")
180
-
181
- # 🧩 Always refresh canonical_latest for unified analytics
182
  reconcile_latest_schema(duck)
183
  duck.close()
184
 
185
  print(f"[canonify] ✅ canonical snapshot updated for {org_id}")
186
- return df
 
1
+ # mapper.py – production-hardened
2
+ import os
3
+ import json
4
+ import duckdb
5
+ import pandas as pd
6
  from datetime import datetime
7
  from app.db import get_conn, ensure_raw_table
8
  from app.utils.detect_industry import _ALIAS
9
 
 
10
  # ---------------------- Canonical schema base ---------------------- #
11
  CANONICAL = {
12
  "timestamp": ["timestamp", "date", "sale_date", "created_at"],
 
18
  "promo_flag": ["promo", "promotion", "is_promo", "discount_code"],
19
  "expiry_date":["expiry_date", "best_before", "use_by", "expiration"],
20
  }
 
21
  ALIAS_FILE = "./db/alias_memory.json"
22
 
23
+ # ---------- helpers ---------- #
24
def safe_str_transform(s: pd.Series) -> pd.Series:
    """Return *s* lower-cased and stripped when it holds string data.

    Non-string Series are handed back untouched so numeric columns are
    never routed through the ``.str`` accessor (which would raise).
    """
    if not pd.api.types.is_string_dtype(s):
        return s
    return s.str.lower().str.strip()
28
+
29
def sql(conn, stmt: str, *args):
    """Execute *stmt* with positional parameters and return every row.

    Collecting parameters via ``*args`` centralises binding, so a caller
    can never pass a bare scalar where the driver expects a tuple.
    """
    cursor = conn.execute(stmt, args)
    return cursor.fetchall()
32
+
33
def add_column_if_not_exists(conn, table: str, col: str, dtype: str) -> None:
    """Idempotently add column *col* of type *dtype* to *table*.

    Fix: the names returned by ``DESCRIBE`` are now lower-cased before the
    membership test. Previously only ``col`` was lowered, so a column
    stored as e.g. ``Price`` was not recognized and could be re-added.
    """
    existing = {row[0].lower() for row in conn.execute(f"DESCRIBE {table}").fetchall()}
    if col.lower() not in existing:
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {col} {dtype}")
37
+
38
+ # ---------- alias memory ---------- #
39
def load_dynamic_aliases() -> None:
    """Fold aliases learned on earlier runs (ALIAS_FILE) into CANONICAL.

    Best-effort: any problem while reading or merging the file is logged
    and ignored, so a corrupt alias memory can never block mapping.
    """
    if not os.path.exists(ALIAS_FILE):
        return
    try:
        with open(ALIAS_FILE) as fh:
            learned = json.load(fh)
        for field, aliases in learned.items():
            if field not in CANONICAL:
                CANONICAL[field] = aliases
                continue
            fresh = [a for a in aliases if a not in CANONICAL[field]]
            CANONICAL[field].extend(fresh)
    except Exception as e:
        print(f"[mapper] ⚠️ failed to load alias memory: {e}")
51
 
 
52
def save_dynamic_aliases() -> None:
    """Persist the full CANONICAL alias map so the next run starts from it."""
    folder = os.path.dirname(ALIAS_FILE)
    os.makedirs(folder, exist_ok=True)
    with open(ALIAS_FILE, "w") as fh:
        json.dump(CANONICAL, fh, indent=2)
56
 
57
+ # ---------- schema versioning ---------- #
58
# ---------- schema versioning ---------- #
def ensure_schema_version(duck: duckdb.DuckDBPyConnection, df: pd.DataFrame) -> str:
    """Return the canonical table name for *df*'s column signature.

    Registers a new schema version when the sorted column list differs
    from the most recent one; otherwise reuses the latest version.

    Fix: the lookup selects the named columns ``version, columns`` instead
    of ``SELECT *``, so the positional reads below cannot silently break
    if the physical column order of ``main.schema_versions`` changes.
    """
    duck.execute("CREATE SCHEMA IF NOT EXISTS main")
    # NOTE(review): version/columns types inferred from the INSERT below
    # (int + JSON text) — confirm against the live database.
    duck.execute("""
        CREATE TABLE IF NOT EXISTS main.schema_versions (
            version INTEGER,
            columns TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    latest = duck.execute(
        "SELECT version, columns FROM main.schema_versions "
        "ORDER BY version DESC LIMIT 1"
    ).fetchone()
    new_signature = sorted(df.columns.tolist())

    if latest:
        latest_cols = sorted(json.loads(latest[1]))
        if latest_cols == new_signature:
            # Same signature as the latest version → reuse its table.
            return f"main.canonical_v{latest[0]}"
        new_version = latest[0] + 1
    else:
        new_version = 1
    duck.execute("INSERT INTO main.schema_versions (version, columns) VALUES (?, ?)",
                 (new_version, json.dumps(new_signature)))
    print(f"[schema] → canonical_v{new_version}")
    return f"main.canonical_v{new_version}"
81
 
82
def reconcile_latest_schema(duck: duckdb.DuckDBPyConnection) -> None:
    """Rebuild main.canonical_latest as the union of every canonical_v* table."""
    rows = duck.execute("""
        SELECT table_name FROM information_schema.tables
        WHERE table_name LIKE 'canonical_v%'
    """).fetchall()
    tables = [row[0] for row in rows]
    if not tables:
        # Nothing versioned yet — leave canonical_latest alone.
        return
    selects = [f"SELECT * FROM {t}" for t in tables]
    union_query = " UNION ALL ".join(selects)
    duck.execute("CREATE OR REPLACE TABLE main.canonical_latest AS " + union_query)
    print(f"[schema] ✅ reconciled {len(tables)} versions → canonical_latest")
 
92
 
93
  # ---------------------- Canonify core logic ---------------------- #
94
  def canonify_df(org_id: str, hours_window: int = 24) -> pd.DataFrame:
 
 
 
95
  load_dynamic_aliases()
96
  conn = get_conn(org_id)
97
  ensure_raw_table(conn)
98
 
99
+ # 1️⃣ bullet-proof timestamp filter
100
+ rows = sql(conn, """
101
+ SELECT row_data
102
+ FROM raw_rows
103
+ WHERE try_strptime(NULLIF(json_extract(row_data, '$.timestamp'), ''),
104
+ '%Y-%m-%d %H:%M:%S')
105
+ >= now() - INTERVAL ? HOUR
106
+ """, hours_window)
 
 
 
 
 
 
 
 
107
 
108
  if not rows:
109
+ print("[canonify] no rows")
110
  return pd.DataFrame()
111
 
112
+ # 2️⃣ normalise
 
 
113
  raw = pd.DataFrame([json.loads(r[0]) for r in rows])
114
  raw.columns = safe_str_transform(raw.columns)
115
 
 
116
  mapping = {}
117
  for canon, aliases in CANONICAL.items():
118
  for col in raw.columns:
119
  if any(a in col for a in aliases):
120
  mapping[col] = canon
121
  break
122
+ # dynamic aliases
 
123
  for col in raw.columns:
124
  if col not in sum(CANONICAL.values(), []):
125
  for canon in CANONICAL.keys():
 
127
  CANONICAL[canon].append(col)
128
  save_dynamic_aliases()
129
 
 
130
  renamed = raw.rename(columns=mapping)
131
  cols = [c for c in CANONICAL.keys() if c in renamed.columns]
132
  df = renamed[cols].copy() if cols else renamed.copy()
133
 
134
+ # 3️⃣ datatype fixes
135
  if "timestamp" in df:
136
  df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
137
  if "expiry_date" in df:
 
142
  if col in df:
143
  df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
144
 
145
+ # 4️⃣ schema versioning & storage
 
 
146
  os.makedirs("./db", exist_ok=True)
147
  duck = duckdb.connect(f"./db/{org_id}.duckdb")
148
 
149
  table_name = ensure_schema_version(duck, df)
150
  duck.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM df LIMIT 0")
151
  duck.execute(f"INSERT INTO {table_name} SELECT * FROM df")
 
 
152
  reconcile_latest_schema(duck)
153
  duck.close()
154
 
155
  print(f"[canonify] ✅ canonical snapshot updated for {org_id}")
156
+ return df