shaliz-kong commited on
Commit ·
aef3a87
1
Parent(s): 48e3910
added strict data verification in cannonify
Browse files- app/mapper.py +29 -16
app/mapper.py
CHANGED
|
@@ -488,34 +488,47 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
|
|
| 488 |
# ⬇️⬇️⬇️ NEW VERIFICATION STEP ⬇️⬇️⬇️
|
| 489 |
# CRITICAL: Ensure data is committed and visible before spawning worker
|
| 490 |
|
| 491 |
-
print(f"[canonify] 🔍 Verifying data visibility...")
|
| 492 |
-
|
| 493 |
-
# Use a NEW connection to verify data is visible (ensures isolation level is correct)
|
| 494 |
verification_conn = None
|
|
|
|
|
|
|
|
|
|
| 495 |
try:
|
| 496 |
-
|
| 497 |
-
|
|
|
|
| 498 |
|
| 499 |
-
|
| 500 |
-
|
| 501 |
-
|
| 502 |
|
| 503 |
-
|
| 504 |
-
|
| 505 |
-
|
| 506 |
-
|
| 507 |
-
|
| 508 |
-
return df, industry, industry_confidence
|
| 509 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 510 |
except Exception as e:
|
| 511 |
print(f"[canonify] ❌ Verification error: {e}")
|
| 512 |
-
return df, industry, industry_confidence
|
| 513 |
finally:
|
| 514 |
if verification_conn:
|
| 515 |
try:
|
| 516 |
verification_conn.close()
|
| 517 |
print("[canonify] 🔒 Verification connection closed")
|
| 518 |
-
except:
|
| 519 |
pass
|
| 520 |
# ⬆️⬆️⬆️ END VERIFICATION ⬆️⬆️⬆️
|
| 521 |
# 8️⃣ FINAL: Clean DataFrame for response
|
|
|
|
| 488 |
# ⬇️⬇️⬇️ NEW VERIFICATION STEP ⬇️⬇️⬇️
|
| 489 |
# CRITICAL: Ensure data is committed and visible before spawning worker
|
| 490 |
|
| 491 |
+
print(f"[canonify] 🔍 Verifying data visibility (expecting {rows_inserted} rows)...")
|
| 492 |
+
|
|
|
|
| 493 |
verification_conn = None
|
| 494 |
+
MAX_VERIFY_ATTEMPTS = 5
|
| 495 |
+
VERIFY_DELAY = 1.0 # seconds
|
| 496 |
+
|
| 497 |
try:
|
| 498 |
+
for attempt in range(MAX_VERIFY_ATTEMPTS):
|
| 499 |
+
verification_conn = get_conn(org_id)
|
| 500 |
+
verification_conn.execute("CHECKPOINT") # Force flush any pending writes
|
| 501 |
|
| 502 |
+
# Query the table we just inserted into
|
| 503 |
+
verify_table = f"main.{entity_type}_canonical"
|
| 504 |
+
verify_count = verification_conn.execute(f"SELECT COUNT(*) FROM {verify_table}").fetchone()[0]
|
| 505 |
|
| 506 |
+
print(f"[canonify] 🔍 Verification attempt {attempt + 1}: {verify_count}/{rows_inserted} rows visible")
|
| 507 |
+
|
| 508 |
+
if verify_count >= rows_inserted:
|
| 509 |
+
print(f"[canonify] ✅ Verification passed: {verify_count} rows visible in {verify_table}")
|
| 510 |
+
break
|
|
|
|
| 511 |
|
| 512 |
+
if attempt < MAX_VERIFY_ATTEMPTS - 1:
|
| 513 |
+
print(f"[canonify] ⏳ Data not yet visible, waiting {VERIFY_DELAY}s...")
|
| 514 |
+
try:
|
| 515 |
+
verification_conn.close()
|
| 516 |
+
except Exception:
|
| 517 |
+
pass
|
| 518 |
+
time.sleep(VERIFY_DELAY)
|
| 519 |
+
else:
|
| 520 |
+
# This runs if the loop completes without breaking
|
| 521 |
+
print(f"[canonify] ❌ Verification failed: Expected {rows_inserted} rows, found only {verify_count}")
|
| 522 |
+
print(f"[canonify] ⚠️ Proceeding anyway but KPI worker may timeout...")
|
| 523 |
+
|
| 524 |
except Exception as e:
|
| 525 |
print(f"[canonify] ❌ Verification error: {e}")
|
|
|
|
| 526 |
finally:
|
| 527 |
if verification_conn:
|
| 528 |
try:
|
| 529 |
verification_conn.close()
|
| 530 |
print("[canonify] 🔒 Verification connection closed")
|
| 531 |
+
except Exception:
|
| 532 |
pass
|
| 533 |
# ⬆️⬆️⬆️ END VERIFICATION ⬆️⬆️⬆️
|
| 534 |
# 8️⃣ FINAL: Clean DataFrame for response
|