shaliz-kong commited on
Commit Β·
3369665
1
Parent(s): 883efb8
refactoreed descover schema
Browse files- app/tasks/analytics_worker.py +21 -18
app/tasks/analytics_worker.py
CHANGED
|
@@ -332,29 +332,32 @@ class AnalyticsWorker:
|
|
| 332 |
|
| 333 |
# app/tasks/analytics_worker.py - Replace your _discover_schema method
|
| 334 |
|
| 335 |
-
|
| 336 |
-
|
| 337 |
-
|
| 338 |
-
|
| 339 |
-
raise ValueError("entity_type must be set before schema discovery")
|
| 340 |
try:
|
| 341 |
logger.info("[SCHEMA] π§ Cache miss, discovering...")
|
| 342 |
|
| 343 |
from app.schemas.org_schema import OrgSchema
|
| 344 |
|
| 345 |
-
#
|
| 346 |
-
|
| 347 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 348 |
|
| 349 |
if not mapping:
|
| 350 |
raise ValueError("Empty mapping returned")
|
| 351 |
|
| 352 |
-
#
|
| 353 |
cache_key = f"schema:{self.org_id}:{self._entity_type}:worker_cache"
|
| 354 |
-
|
| 355 |
-
# β
FIX: Save to Redis with proper TTL
|
| 356 |
-
event_hub.setex(cache_key, 86400, json.dumps(mapping))
|
| 357 |
-
logger.info(f"[SCHEMA] πΎ Cached mapping for 24h: {cache_key}")
|
| 358 |
|
| 359 |
self._schema_cache = mapping
|
| 360 |
logger.info(f"[SCHEMA] β
Discovery complete: {len(mapping)} columns")
|
|
@@ -363,13 +366,13 @@ class AnalyticsWorker:
|
|
| 363 |
except Exception as e:
|
| 364 |
logger.error(f"[SCHEMA] β Discovery failed: {e}")
|
| 365 |
|
| 366 |
-
# π EMERGENCY FALLBACK: Map columns to themselves
|
| 367 |
logger.warning("[SCHEMA] π¨ Using fallback - mapping columns as-is")
|
| 368 |
-
stealth_mapping = {col: col for col in
|
| 369 |
|
| 370 |
-
|
| 371 |
-
|
| 372 |
-
|
| 373 |
|
| 374 |
self._schema_cache = stealth_mapping
|
| 375 |
return stealth_mapping
|
|
|
|
| 332 |
|
| 333 |
# app/tasks/analytics_worker.py - Replace your _discover_schema method
|
| 334 |
|
| 335 |
+
# app/tasks/analytics_worker.py - Replace line ~95
|
| 336 |
+
|
| 337 |
+
async def _discover_schema(self, df: pd.DataFrame) -> Dict[str, str]:
|
| 338 |
+
"""Schema discovery with entity context (NOW ACCEPTS df)"""
|
|
|
|
| 339 |
try:
|
| 340 |
logger.info("[SCHEMA] π§ Cache miss, discovering...")
|
| 341 |
|
| 342 |
from app.schemas.org_schema import OrgSchema
|
| 343 |
|
| 344 |
+
# Ensure entity_type is set (from STEP 2)
|
| 345 |
+
if not getattr(self, '_entity_type', None):
|
| 346 |
+
raise ValueError("entity_type must be set in STEP 2")
|
| 347 |
+
|
| 348 |
+
# Run sync discovery in thread pool (non-blocking)
|
| 349 |
+
def sync_discover():
|
| 350 |
+
schema = OrgSchema(self.org_id, self._entity_type)
|
| 351 |
+
return schema.get_mapping()
|
| 352 |
+
|
| 353 |
+
mapping = await asyncio.to_thread(sync_discover)
|
| 354 |
|
| 355 |
if not mapping:
|
| 356 |
raise ValueError("Empty mapping returned")
|
| 357 |
|
| 358 |
+
# Cache for 24h
|
| 359 |
cache_key = f"schema:{self.org_id}:{self._entity_type}:worker_cache"
|
| 360 |
+
await asyncio.to_thread(event_hub.setex, cache_key, 86400, json.dumps(mapping))
|
|
|
|
|
|
|
|
|
|
| 361 |
|
| 362 |
self._schema_cache = mapping
|
| 363 |
logger.info(f"[SCHEMA] β
Discovery complete: {len(mapping)} columns")
|
|
|
|
| 366 |
except Exception as e:
|
| 367 |
logger.error(f"[SCHEMA] β Discovery failed: {e}")
|
| 368 |
|
| 369 |
+
# π EMERGENCY FALLBACK: Map df columns to themselves
|
| 370 |
logger.warning("[SCHEMA] π¨ Using fallback - mapping columns as-is")
|
| 371 |
+
stealth_mapping = {col: col for col in df.columns}
|
| 372 |
|
| 373 |
+
if getattr(self, '_entity_type', None):
|
| 374 |
+
cache_key = f"schema:{self._entity_type}:fallback"
|
| 375 |
+
await asyncio.to_thread(event_hub.setex, cache_key, 3600, json.dumps(stealth_mapping))
|
| 376 |
|
| 377 |
self._schema_cache = stealth_mapping
|
| 378 |
return stealth_mapping
|