Peter Mutwiri commited on
Commit
cbd7757
·
1 Parent(s): aaab8f4

added legacy schema versioning

Browse files
Files changed (2) hide show
  1. app/db.py +10 -12
  2. app/mapper.py +9 -15
app/db.py CHANGED
@@ -108,21 +108,13 @@ def ensure_raw_table(conn: duckdb.DuckDBPyConnection):
108
  def ensure_schema_versions_table(conn: duckdb.DuckDBPyConnection):
109
  """
110
  Tracks schema evolution for each entity table.
111
- Enables rollback, audit, and cross-org schema drift monitoring.
112
-
113
- Table: main.schema_versions
114
- - version_id: Auto-incrementing schema version
115
- - table_name: Target entity table (e.g., 'sales_canonical')
116
- - schema_json: {"column": "dtype", ...}
117
- - created_at: When version was detected
118
- - applied_at: When version was successfully applied
119
- - status: pending | applied | failed
120
- - rows_at_migration: Row count snapshot for migration tracking
121
  """
122
  conn.execute("CREATE SCHEMA IF NOT EXISTS main")
 
123
  conn.execute("""
124
  CREATE TABLE IF NOT EXISTS main.schema_versions (
125
- version_id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
126
  table_name VARCHAR NOT NULL,
127
  schema_json JSON NOT NULL,
128
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
@@ -131,7 +123,13 @@ def ensure_schema_versions_table(conn: duckdb.DuckDBPyConnection):
131
  rows_at_migration BIGINT
132
  )
133
  """)
134
-
 
 
 
 
 
 
135
 
136
  def infer_duckdb_type(value: Any) -> str:
137
  """
 
108
  def ensure_schema_versions_table(conn: duckdb.DuckDBPyConnection):
109
  """
110
  Tracks schema evolution for each entity table.
111
+ Compatible with DuckDB 0.10.3 constraint limitations.
 
 
 
 
 
 
 
 
 
112
  """
113
  conn.execute("CREATE SCHEMA IF NOT EXISTS main")
114
+ # Use legacy SERIAL syntax instead of IDENTITY
115
  conn.execute("""
116
  CREATE TABLE IF NOT EXISTS main.schema_versions (
117
+ version_id BIGINT PRIMARY KEY,
118
  table_name VARCHAR NOT NULL,
119
  schema_json JSON NOT NULL,
120
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
 
123
  rows_at_migration BIGINT
124
  )
125
  """)
126
+
127
+ # Create sequence if it doesn't exist (for manual auto-increment)
128
+ conn.execute("""
129
+ CREATE SEQUENCE IF NOT EXISTS schema_version_seq
130
+ START WITH 1
131
+ INCREMENT BY 1
132
+ """)
133
 
134
  def infer_duckdb_type(value: Any) -> str:
135
  """
app/mapper.py CHANGED
@@ -417,14 +417,7 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
417
  industry_confidence = industry_info["confidence"]
418
  print(f"[canonify] 🎯 Entity: {entity_type}, Industry: {industry} ({industry_confidence:.2%})")
419
 
420
- # 7️⃣ ENFORCE SCHEMA CONTRACT
421
- try:
422
- enforce_schema_contract(df, org_id)
423
- except ValueError as e:
424
- print(f"[canonify] ❌ Schema contract violated: {e}")
425
- # For now, log and continue; in strict mode, raise HTTPException
426
-
427
- # 8️⃣ SCHEMA VERSIONING & TRANSACTIONAL INSERT
428
  os.makedirs("./db", exist_ok=True)
429
 
430
  with transactional_conn(org_id) as duck:
@@ -445,11 +438,14 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
445
 
446
  version_id = None
447
  if is_new_schema:
 
448
  version_id = duck.execute("""
449
- INSERT INTO main.schema_versions (table_name, schema_json, status)
450
- VALUES (?, ?, 'pending')
451
- """, (f"{entity_type}_canonical", json.dumps(current_schema)))
452
- version_id = duck.execute("SELECT last_insert_rowid()").fetchone()[0]
 
 
453
 
454
  # 8b) Ensure table exists with current schema
455
  table_name = ensure_canonical_table(duck, df, entity_type)
@@ -472,10 +468,8 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
472
  print(f"[canonify] 💾 Inserted {len(df_to_insert)} rows into {table_name}")
473
 
474
  # 8d) Mark schema as applied post-insert
475
- # 8d) Mark schema as applied post-insert (DuckDB-safe)
476
  if is_new_schema and version_id:
477
  try:
478
- # Use simple UPDATE without complex constraints
479
  duck.execute("""
480
  UPDATE main.schema_versions
481
  SET applied_at = CURRENT_TIMESTAMP, status = 'applied'
@@ -484,7 +478,7 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
484
  print(f"[canonify] ✅ Schema v{version_id} marked as applied")
485
  except Exception as e:
486
  print(f"[canonify] ⚠️ Schema update warning (non-critical): {e}")
487
- # Continue even if update fails—data is already inserted
488
  duration_ms = (datetime.now() - start_time).total_seconds() * 1000
489
  print(f"[canonify] ✅ Pipeline complete in {duration_ms:.2f}ms for {org_id}")
490
 
 
417
  industry_confidence = industry_info["confidence"]
418
  print(f"[canonify] 🎯 Entity: {entity_type}, Industry: {industry} ({industry_confidence:.2%})")
419
 
420
+ # 8️⃣ SCHEMA VERSIONING & TRANSACTIONAL INSERT
 
 
 
 
 
 
 
421
  os.makedirs("./db", exist_ok=True)
422
 
423
  with transactional_conn(org_id) as duck:
 
438
 
439
  version_id = None
440
  if is_new_schema:
441
+ # Manual auto-increment for DuckDB 0.10.3 compatibility
442
  version_id = duck.execute("""
443
+ INSERT INTO main.schema_versions
444
+ (version_id, table_name, schema_json, status)
445
+ VALUES (nextval('schema_version_seq'), ?, ?, 'pending')
446
+ RETURNING version_id
447
+ """, (f"{entity_type}_canonical", json.dumps(current_schema))).fetchone()[0]
448
+ print(f"[canonify] 📝 Created schema v{version_id} for {entity_type}_canonical")
449
 
450
  # 8b) Ensure table exists with current schema
451
  table_name = ensure_canonical_table(duck, df, entity_type)
 
468
  print(f"[canonify] 💾 Inserted {len(df_to_insert)} rows into {table_name}")
469
 
470
  # 8d) Mark schema as applied post-insert
 
471
  if is_new_schema and version_id:
472
  try:
 
473
  duck.execute("""
474
  UPDATE main.schema_versions
475
  SET applied_at = CURRENT_TIMESTAMP, status = 'applied'
 
478
  print(f"[canonify] ✅ Schema v{version_id} marked as applied")
479
  except Exception as e:
480
  print(f"[canonify] ⚠️ Schema update warning (non-critical): {e}")
481
+
482
  duration_ms = (datetime.now() - start_time).total_seconds() * 1000
483
  print(f"[canonify] ✅ Pipeline complete in {duration_ms:.2f}ms for {org_id}")
484