shaliz-kong commited on
Commit
2863e39
Β·
1 Parent(s): ee959f2

add lock to descover schema

Browse files
Files changed (2) hide show
  1. app/mapper.py +1 -21
  2. app/tasks/analytics_worker.py +4 -1
app/mapper.py CHANGED
@@ -474,27 +474,7 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
474
  # In app/mapper.py, after `rows_inserted = len(df_to_insert)`
475
  print(f"[canonify] πŸ’Ύ Inserted {rows_inserted} rows into {table_name}")
476
 
477
- # ==================== DIAGNOSTIC QUERIES (DISABLE IN PROD) ====================
478
- print(f"[canonify] πŸ”¬ DIAGNOSTIC: Querying table immediately post-insert...")
479
- try:
480
- diagnostic_conn = get_conn(org_id)
481
-
482
- # Check total row count
483
- total_rows = diagnostic_conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
484
- print(f"[canonify] πŸ”¬ DIAGNOSTIC: Table {table_name} has {total_rows} total rows")
485
-
486
- # Sample the data
487
- sample = diagnostic_conn.execute(f"SELECT * FROM {table_name} LIMIT 3").df()
488
- if not sample.empty:
489
- print(f"[canonify] πŸ”¬ DIAGNOSTIC: Sample data:\n{sample.to_string()}")
490
- else:
491
- print(f"[canonify] πŸ”¬ DIAGNOSTIC: Table exists but SELECT returned ZERO rows")
492
-
493
- diagnostic_conn.close()
494
- except Exception as diag_e:
495
- print(f"[canonify] πŸ”¬ DIAGNOSTIC ERROR: {diag_e}")
496
- # ============================================================================
497
-
498
  # Mark schema as applied
499
  if is_new_schema and version_id:
500
  try:
 
474
  # In app/mapper.py, after `rows_inserted = len(df_to_insert)`
475
  print(f"[canonify] πŸ’Ύ Inserted {rows_inserted} rows into {table_name}")
476
 
477
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
478
  # Mark schema as applied
479
  if is_new_schema and version_id:
480
  try:
app/tasks/analytics_worker.py CHANGED
@@ -44,7 +44,7 @@ class AnalyticsWorker:
44
  self.hours_window = hours_window
45
 
46
  # Core engines
47
- self.schema = OrgSchema(org_id)
48
  self.col_embedder = ColumnEmbeddingService()
49
  self.txn_embedder = EmbeddingService()
50
  self.vector_service = VectorService(org_id)
@@ -364,6 +364,9 @@ class AnalyticsWorker:
364
 
365
  def _discover_schema(self):
366
  """Schema discovery with proper caching and error handling"""
 
 
 
367
  try:
368
  logger.info("[SCHEMA] 🧠 Cache miss, discovering...")
369
 
 
44
  self.hours_window = hours_window
45
 
46
  # Core engines
47
+
48
  self.col_embedder = ColumnEmbeddingService()
49
  self.txn_embedder = EmbeddingService()
50
  self.vector_service = VectorService(org_id)
 
364
 
365
  def _discover_schema(self):
366
  """Schema discovery with proper caching and error handling"""
367
+ if not self._entity_type:
368
+ logger.error("[SCHEMA] ❌ Cannot discover schema: entity_type not set")
369
+ raise ValueError("entity_type must be set before schema discovery")
370
  try:
371
  logger.info("[SCHEMA] 🧠 Cache miss, discovering...")
372