shaliz-kong commited on
Commit
48e3910
·
1 Parent(s): 5407eb5

added data verification on canonify

Browse files
Files changed (1) hide show
  1. app/mapper.py +39 -3
app/mapper.py CHANGED
@@ -422,6 +422,8 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
422
  # 7️⃣ SCHEMA VERSIONING & TRANSACTIONAL INSERT
423
  os.makedirs("./db", exist_ok=True)
424
 
 
 
425
  with transactional_conn(org_id) as duck:
426
  ensure_schema_versions_table(duck)
427
 
@@ -468,7 +470,8 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
468
  f"INSERT INTO {table_name} ({cols_str}) VALUES ({placeholders})",
469
  df_to_insert.values.tolist()
470
  )
471
- print(f"[canonify] 💾 Inserted {len(df_to_insert)} rows into {table_name}")
 
472
 
473
  # Mark schema as applied
474
  if is_new_schema and version_id:
@@ -482,12 +485,45 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
482
  except Exception as e:
483
  print(f"[canonify] ⚠️ Schema update warning: {e}")
484
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
485
  # 8️⃣ FINAL: Clean DataFrame for response
486
  df = df.replace([np.inf, -np.inf, np.nan], None)
487
  duration_ms = (datetime.now() - start_time).total_seconds() * 1000
488
  print(f"[canonify] ✅ Pipeline complete in {duration_ms:.2f}ms for {org_id}")
489
 
490
- # 9️⃣ SINGLE, SAFE WORKER TRIGGER (idempotent)
491
  try:
492
  # Defensive: ensure keys exist (they should from poll_for_entity)
493
  e_key = f"entity:{org_id}:{source_id}"
@@ -502,7 +538,7 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
502
  "type": "kpi_compute",
503
  "entity_type": entity_type,
504
  "industry": industry,
505
- "rows_inserted": len(df),
506
  "timestamp": datetime.now().isoformat()
507
  })
508
  print(f"[canonify] 🚀 Triggered analytics for {source_id}")
 
422
  # 7️⃣ SCHEMA VERSIONING & TRANSACTIONAL INSERT
423
  os.makedirs("./db", exist_ok=True)
424
 
425
+ rows_inserted = 0 # Track insertion count
426
+
427
  with transactional_conn(org_id) as duck:
428
  ensure_schema_versions_table(duck)
429
 
 
470
  f"INSERT INTO {table_name} ({cols_str}) VALUES ({placeholders})",
471
  df_to_insert.values.tolist()
472
  )
473
+ rows_inserted = len(df_to_insert)
474
+ print(f"[canonify] 💾 Inserted {rows_inserted} rows into {table_name}")
475
 
476
  # Mark schema as applied
477
  if is_new_schema and version_id:
 
485
  except Exception as e:
486
  print(f"[canonify] ⚠️ Schema update warning: {e}")
487
 
488
+ # ⬇️⬇️⬇️ NEW VERIFICATION STEP ⬇️⬇️⬇️
489
+ # CRITICAL: Ensure data is committed and visible before spawning worker
490
+
491
+ print(f"[canonify] 🔍 Verifying data visibility...")
492
+
493
+ # Use a NEW connection to verify data is visible (ensures isolation level is correct)
494
+ verification_conn = None
495
+ try:
496
+ verification_conn = get_conn(org_id)
497
+ verification_conn.execute("CHECKPOINT") # Force any pending writes to flush
498
+
499
+ # Query the table we just inserted into
500
+ verify_table = f"main.{entity_type}_canonical"
501
+ verify_count = verification_conn.execute(f"SELECT COUNT(*) FROM {verify_table}").fetchone()[0]
502
+
503
+ if verify_count >= rows_inserted:
504
+ print(f"[canonify] ✅ Verification passed: {verify_count} rows visible in {verify_table}")
505
+ else:
506
+ print(f"[canonify] ❌ Verification failed: Expected {rows_inserted}, found {verify_count} rows")
507
+ # Don't proceed if verification fails
508
+ return df, industry, industry_confidence
509
+
510
+ except Exception as e:
511
+ print(f"[canonify] ❌ Verification error: {e}")
512
+ return df, industry, industry_confidence
513
+ finally:
514
+ if verification_conn:
515
+ try:
516
+ verification_conn.close()
517
+ print("[canonify] 🔒 Verification connection closed")
518
+ except:
519
+ pass
520
+ # ⬆️⬆️⬆️ END VERIFICATION ⬆️⬆️⬆️
521
  # 8️⃣ FINAL: Clean DataFrame for response
522
  df = df.replace([np.inf, -np.inf, np.nan], None)
523
  duration_ms = (datetime.now() - start_time).total_seconds() * 1000
524
  print(f"[canonify] ✅ Pipeline complete in {duration_ms:.2f}ms for {org_id}")
525
 
526
+ # 9️⃣ SINGLE, SAFE WORKER TRIGGER (idempotent) - NOW ONLY AFTER VERIFICATION
527
  try:
528
  # Defensive: ensure keys exist (they should from poll_for_entity)
529
  e_key = f"entity:{org_id}:{source_id}"
 
538
  "type": "kpi_compute",
539
  "entity_type": entity_type,
540
  "industry": industry,
541
+ "rows_inserted": rows_inserted, # Use actual count
542
  "timestamp": datetime.now().isoformat()
543
  })
544
  print(f"[canonify] 🚀 Triggered analytics for {source_id}")