petermutwiri commited on
Commit
015dbe2
Β·
verified Β·
1 Parent(s): 19419b3

Update app/mapper.py

Browse files
Files changed (1) hide show
  1. app/mapper.py +29 -13
app/mapper.py CHANGED
@@ -65,6 +65,8 @@ def save_dynamic_aliases() -> None:
65
  # ---------- schema versioning ---------- #
66
  def ensure_schema_version(duck: duckdb.DuckDBPyConnection, df: pd.DataFrame) -> str:
67
  duck.execute("CREATE SCHEMA IF NOT EXISTS main")
 
 
68
  duck.execute("""
69
  CREATE TABLE IF NOT EXISTS main.schema_versions (
70
  version INTEGER PRIMARY KEY,
@@ -72,26 +74,40 @@ def ensure_schema_version(duck: duckdb.DuckDBPyConnection, df: pd.DataFrame) ->
72
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
73
  )
74
  """)
75
- latest = duck.execute("SELECT * FROM main.schema_versions ORDER BY version DESC LIMIT 1").fetchone()
76
  new_signature = sorted(df.columns.tolist())
 
 
 
77
 
78
- if latest:
 
 
79
  latest_cols = sorted(json.loads(latest[1]))
80
  if latest_cols == new_signature:
 
81
  return f"main.canonical_v{latest[0]}"
82
- new_version = latest[0] + 1
83
- else:
84
- new_version = 1
85
- duck.execute("INSERT INTO main.schema_versions (version, columns) VALUES (?, ?)",
86
- (new_version, json.dumps(new_signature)))
87
- print(f"[schema] β†’ canonical_v{new_version}")
88
- table_name = f"main.canonical_v{new_version}"
 
 
 
 
 
89
  for col in df.columns:
90
- col_lower = col.lower()
91
  dtype = map_pandas_to_duck(col, df[col])
92
- if col_lower not in {c.lower() for c in duck.execute(f"PRAGMA table_info('{table_name}')").df()['name']}:
93
- duck.execute(f"ALTER TABLE {table_name} ADD COLUMN {col} {dtype}")
94
- print(f"[schema] βž• added {col}:{dtype} to {table_name}")
 
 
 
 
95
 
96
  def reconcile_latest_schema(duck: duckdb.DuckDBPyConnection) -> None:
97
  tables = [r[0] for r in duck.execute("""
 
65
  # ---------- schema versioning ---------- #
66
  def ensure_schema_version(duck: duckdb.DuckDBPyConnection, df: pd.DataFrame) -> str:
67
  duck.execute("CREATE SCHEMA IF NOT EXISTS main")
68
+
69
+ # versioning metadata
70
  duck.execute("""
71
  CREATE TABLE IF NOT EXISTS main.schema_versions (
72
  version INTEGER PRIMARY KEY,
 
74
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
75
  )
76
  """)
77
+
78
  new_signature = sorted(df.columns.tolist())
79
+ latest = duck.execute(
80
+ "SELECT version, columns FROM main.schema_versions ORDER BY version DESC LIMIT 1"
81
+ ).fetchone()
82
 
83
+ if latest is None:
84
+ version = 1
85
+ else:
86
  latest_cols = sorted(json.loads(latest[1]))
87
  if latest_cols == new_signature:
88
+ # βœ… schema unchanged β†’ reuse existing table
89
  return f"main.canonical_v{latest[0]}"
90
+ version = latest[0] + 1
91
+
92
+ # βœ… record new schema version
93
+ duck.execute(
94
+ "INSERT INTO main.schema_versions (version, columns) VALUES (?, ?)",
95
+ (version, json.dumps(new_signature))
96
+ )
97
+
98
+ table = f"main.canonical_v{version}"
99
+
100
+ # βœ… create new table with ALL columns directly (safe)
101
+ col_defs = []
102
  for col in df.columns:
 
103
  dtype = map_pandas_to_duck(col, df[col])
104
+ col_defs.append(f"{col} {dtype}")
105
+
106
+ duck.execute(f"CREATE TABLE {table} ({', '.join(col_defs)})")
107
+
108
+ print(f"[schema] βœ… created {table}")
109
+ return table
110
+
111
 
112
  def reconcile_latest_schema(duck: duckdb.DuckDBPyConnection) -> None:
113
  tables = [r[0] for r in duck.execute("""