Peter Mutwiri commited on
Commit
aaab8f4
·
1 Parent(s): d8e13ad

updated version id

Browse files
Files changed (1) hide show
  1. app/mapper.py +12 -7
app/mapper.py CHANGED
@@ -472,14 +472,19 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
472
  print(f"[canonify] 💾 Inserted {len(df_to_insert)} rows into {table_name}")
473
 
474
  # 8d) Mark schema as applied post-insert
 
475
  if is_new_schema and version_id:
476
- duck.execute("""
477
- UPDATE main.schema_versions
478
- SET applied_at = CURRENT_TIMESTAMP, status = 'applied',
479
- rows_at_migration = ?
480
- WHERE version_id = ?
481
- """, (len(df_to_insert), version_id))
482
-
 
 
 
 
483
  duration_ms = (datetime.now() - start_time).total_seconds() * 1000
484
  print(f"[canonify] ✅ Pipeline complete in {duration_ms:.2f}ms for {org_id}")
485
 
 
472
  print(f"[canonify] 💾 Inserted {len(df_to_insert)} rows into {table_name}")
473
 
474
  # 8d) Mark schema as applied post-insert
475
+ # 8d) Mark schema as applied post-insert (DuckDB-safe)
476
  if is_new_schema and version_id:
477
+ try:
478
+ # Use simple UPDATE without complex constraints
479
+ duck.execute("""
480
+ UPDATE main.schema_versions
481
+ SET applied_at = CURRENT_TIMESTAMP, status = 'applied'
482
+ WHERE version_id = ?
483
+ """, (version_id,))
484
+ print(f"[canonify] ✅ Schema v{version_id} marked as applied")
485
+ except Exception as e:
486
+ print(f"[canonify] ⚠️ Schema update warning (non-critical): {e}")
487
+ # Continue even if update fails—data is already inserted
488
  duration_ms = (datetime.now() - start_time).total_seconds() * 1000
489
  print(f"[canonify] ✅ Pipeline complete in {duration_ms:.2f}ms for {org_id}")
490