shaliz-kong committed on
Commit
14aa120
Β·
1 Parent(s): 6439a99

refactored mapper and analytics worker

Browse files
Files changed (2) hide show
  1. app/mapper.py +217 -352
  2. app/tasks/analytics_worker.py +492 -185
app/mapper.py CHANGED
@@ -1,17 +1,16 @@
1
- # app/mapper.py – BULLETPROOF VERSION
2
  import os
3
  import json
4
- # import duckdb
5
  import pandas as pd
6
  import numpy as np
7
  from datetime import datetime, timedelta
8
- from app.db import get_conn, ensure_raw_table, transactional_conn,ensure_schema_versions_table
9
- # app/mapper.py (add line 1)
10
- from app.hybrid_entity_detector import hybrid_detect_entity_type
11
  import time
 
 
 
12
  from app.core.event_hub import event_hub
13
 
14
- # ---------------------- Canonical schema base ---------------------- #
15
  CANONICAL = {
16
  "timestamp": ["timestamp", "date", "sale_date", "created_at"],
17
  "product_id": ["sku", "barcode", "plu", "product_id", "item_code"],
@@ -25,45 +24,20 @@ CANONICAL = {
25
 
26
  ALIAS_FILE = "./db/alias_memory.json"
27
 
 
 
 
 
28
  def map_pandas_to_duck(col: str, series: pd.Series) -> str:
 
29
  if pd.api.types.is_bool_dtype(series): return "BOOLEAN"
30
  if pd.api.types.is_integer_dtype(series): return "BIGINT"
31
  if pd.api.types.is_float_dtype(series): return "DOUBLE"
32
  if pd.api.types.is_datetime64_any_dtype(series): return "TIMESTAMP"
33
  return "VARCHAR"
34
 
35
- # ---------- entity detection(uses ai to detect entity from the data) ---------- #
36
- def ensure_canonical_table(duck: duckdb.DuckDBPyConnection, df: pd.DataFrame, entity_type: str) -> str:
37
- """Creates entity-specific table: main.sales_canonical, main.inventory_canonical, etc."""
38
- table_name = f"main.{entity_type}_canonical"
39
-
40
- # Create base table if doesn't exist
41
- duck.execute(f"""
42
- CREATE TABLE IF NOT EXISTS {table_name} (
43
- id UUID DEFAULT uuid(),
44
- _ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
45
- )
46
- """)
47
-
48
- # Get existing columns (lowercase for comparison)
49
- existing_cols_raw = duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
50
- existing_cols = {str(r[0]).lower() for r in existing_cols_raw}
51
-
52
- # βœ… BULLETPROOF: Add missing columns with safe name handling
53
- for col in df.columns:
54
- col_name = str(col).lower().strip() # βœ… FORCE STRING
55
- if col_name not in existing_cols:
56
- try:
57
- dtype = map_pandas_to_duck(col_name, df[col])
58
- print(f"[mapper] βž• Adding column '{col_name}:{dtype}'")
59
- duck.execute(f"ALTER TABLE {table_name} ADD COLUMN {col_name} {dtype}")
60
- except Exception as e:
61
- print(f"[mapper] ⚠️ Skipping column {col_name}: {e}")
62
-
63
- return table_name
64
-
65
- # ---------- Alias Memory ---------- #
66
  def load_dynamic_aliases() -> None:
 
67
  if os.path.exists(ALIAS_FILE):
68
  try:
69
  with open(ALIAS_FILE) as f:
@@ -74,22 +48,24 @@ def load_dynamic_aliases() -> None:
74
  else:
75
  CANONICAL[k] = v
76
  except Exception as e:
77
- print(f"[mapper] ⚠️ failed to load alias memory: {e}")
78
 
79
  def save_dynamic_aliases() -> None:
 
80
  os.makedirs(os.path.dirname(ALIAS_FILE), exist_ok=True)
81
  with open(ALIAS_FILE, "w") as f:
82
  json.dump(CANONICAL, f, indent=2)
83
- # βœ… Module-level cache: (org_id, source_id) -> entity_info
84
- _ENTITY_CACHE = {}
85
- _INDUSTRY_CACHE = {} # NEW
86
- def poll_for_entity(org_id: str, source_id: str, timeout: int = 30) -> dict:
87
  """
88
- 🎯 Capped at 2 Redis calls (immediate + after 5s sleep).
89
- In-memory cache prevents re-polling the same source.
90
  """
91
- # 1. Check cache (zero Redis calls)
92
  cache_key = (org_id, source_id)
 
 
93
  if cache_key in _ENTITY_CACHE:
94
  print(f"[poll] πŸ’Ύ CACHE HIT: {cache_key}")
95
  return _ENTITY_CACHE[cache_key]
@@ -97,62 +73,75 @@ def poll_for_entity(org_id: str, source_id: str, timeout: int = 30) -> dict:
97
  entity_key = f"entity:{org_id}:{source_id}"
98
  print(f"[poll] ⏳ Polling for key: {entity_key}")
99
 
100
- # 2. First attempt (immediate)
101
  data = event_hub.get_key(entity_key)
102
  if data:
103
  entity_info = json.loads(data)
104
- print(f"[poll] βœ… SUCCESS on first attempt: {entity_info['entity_type']}")
105
  _ENTITY_CACHE[cache_key] = entity_info
106
  return entity_info
107
 
108
- # 3. Sleep 5 seconds (gives worker time)
109
- print("[poll] πŸ”„ First check failed, sleeping 5s...")
110
- time.sleep(5.0)
111
 
112
- # 4. Second attempt (final)
113
  data = event_hub.get_key(entity_key)
114
  if data:
115
  entity_info = json.loads(data)
116
- print(f"[poll] βœ… SUCCESS on second attempt: {entity_info['entity_type']}")
117
  _ENTITY_CACHE[cache_key] = entity_info
118
  return entity_info
119
 
120
- # 5. Emergency fallback (worker is dead)
121
- print("[poll] ⚠️ Both attempts failed - using direct detection")
122
-
123
- # Use the combined fallback so we only hit DuckDB once and write both
124
- # entity AND industry keys atomically when possible.
125
  entity_info, industry_info = _fallback_combined(org_id, source_id)
126
-
127
- # Invalidate local cache entry so that subsequent callers read Redis first
128
- _ENTITY_CACHE.pop((org_id, source_id), None)
129
-
130
  return entity_info
131
 
132
-
133
- # OLD: _fallback_detection kept for reference (commented out during refactor)
134
- """
135
- def _fallback_detection(org_id: str, source_id: str) -> dict:
136
- # (original implementation)
137
- ...
138
- """
139
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
140
 
141
  def _fallback_combined(org_id: str, source_id: str) -> tuple[dict, dict]:
142
- """Single DuckDB query to produce both entity and industry detections.
143
-
144
- Guarantees:
145
- - Always writes `entity:{org_id}:{source_id}` and
146
- `industry:{org_id}:{source_id}` to Redis (or logs if write fails).
147
- - Updates module caches then invalidates them so readers re-check Redis.
148
- - Attempts parallel detection to reduce latency.
149
  """
150
  print(f"[fallback_combined] 🚨 Running combined fallback for {org_id}/{source_id}")
151
-
152
- # Default UNKNOWN placeholders
153
  entity_info = {"entity_type": "UNKNOWN", "confidence": 0.0}
154
  industry_info = {"industry": "UNKNOWN", "confidence": 0.0}
155
-
156
  try:
157
  conn = get_conn(org_id)
158
  rows = conn.execute("""
@@ -161,152 +150,71 @@ def _fallback_combined(org_id: str, source_id: str) -> tuple[dict, dict]:
161
  WHERE row_data IS NOT NULL
162
  USING SAMPLE 100
163
  """).fetchall()
164
-
165
  if rows:
166
  parsed = [json.loads(r[0]) for r in rows if r[0]]
167
  df = pd.DataFrame(parsed)
168
  df.columns = [str(col).lower().strip() for col in df.columns]
169
-
170
- # Run both detectors concurrently (thread pool for CPU/IO-bound work)
171
- from concurrent.futures import ThreadPoolExecutor
172
-
173
- def run_entity():
174
  try:
175
  return hybrid_detect_entity_type(org_id, df, f"{source_id}.json")
176
  except Exception as e:
177
- print(f"[fallback_combined] ❌ entity detection failed: {e}")
178
  return ("UNKNOWN", 0.0, False)
179
-
180
- def run_industry():
181
  try:
182
  from app.hybrid_industry_detector import hybrid_detect_industry_type
183
  return hybrid_detect_industry_type(org_id, df, source_id)
184
  except Exception as e:
185
- print(f"[fallback_combined] ❌ industry detection failed: {e}")
186
  return ("UNKNOWN", 0.0, False)
187
-
188
  with ThreadPoolExecutor(max_workers=2) as ex:
189
- ent_f = ex.submit(run_entity)
190
- ind_f = ex.submit(run_industry)
191
- ent_res = ent_f.result()
192
- ind_res = ind_f.result()
193
-
194
- entity_info = {"entity_type": ent_res[0], "confidence": ent_res[1]}
195
- industry_info = {"industry": ind_res[0], "confidence": ind_res[1]}
196
-
197
- print(f"[fallback_combined] βœ… Entity: {entity_info['entity_type']} ({entity_info['confidence']:.2%})")
198
- print(f"[fallback_combined] βœ… Industry: {industry_info['industry']} ({industry_info['confidence']:.2%})")
199
-
200
  except Exception as e:
201
- print(f"[fallback_combined] ❌ Combined fallback failed: {e}")
202
-
203
- # Persist to Redis; prefer pipeline when available for minimal window
204
- e_key = f"entity:{org_id}:{source_id}"
205
- i_key = f"industry:{org_id}:{source_id}"
206
-
207
  try:
208
- pipe = event_hub.pipeline()
209
- if pipe is not None:
210
- try:
211
- pipe.setex(e_key, 3600, json.dumps(entity_info))
212
- pipe.setex(i_key, 3600, json.dumps(industry_info))
213
- # Also add a per-source readiness nudging stream entry
214
- try:
215
- # Structured readiness message for per-source stream
216
- msg = json.dumps({
217
- "org_id": org_id,
218
- "source_id": source_id,
219
- "status": "ready"
220
- })
221
- pipe.xadd(event_hub.stream_key(org_id, source_id), {"message": msg})
222
- except Exception:
223
- # Not fatal; continue
224
- pass
225
- pipe.execute()
226
- except Exception as e:
227
- print(f"[fallback_combined] ❌ Pipeline execute failed: {e}")
228
- # Fall back to sequential writes
229
- event_hub.setex(e_key, 3600, json.dumps(entity_info))
230
- event_hub.setex(i_key, 3600, json.dumps(industry_info))
231
- else:
232
- # Pipeline not available; do sequential writes
233
- event_hub.setex(e_key, 3600, json.dumps(entity_info))
234
- event_hub.setex(i_key, 3600, json.dumps(industry_info))
235
-
236
- except Exception as e:
237
- print(f"[fallback_combined] ❌ Redis write failed: {e}")
238
-
239
- # Update caches (then immediately invalidate to avoid stale-reads window)
240
- _ENTITY_CACHE[(org_id, source_id)] = entity_info
241
- _INDUSTRY_CACHE[(org_id, source_id)] = industry_info
242
- _ENTITY_CACHE.pop((org_id, source_id), None)
243
- _INDUSTRY_CACHE.pop((org_id, source_id), None)
244
-
245
- return entity_info, industry_info
246
- #poll for industry from redis
247
- def poll_for_industry(org_id: str, source_id: str, timeout: int = 10) -> dict:
248
- """
249
- 🎯 Polls Redis for industry detection result (user-facing dashboard label).
250
- Capped at 2 Redis calls (immediate + after 5s sleep).
251
- In-memory cache prevents re-polling the same source.
252
 
253
- Returns:
254
- dict: {"industry": str, "confidence": float}
255
- """
256
  cache_key = (org_id, source_id)
 
 
257
 
258
- # 1. Check cache FIRST
259
- if cache_key in _INDUSTRY_CACHE:
260
- print(f"[poll_industry] πŸ’Ύ CACHE HIT: {cache_key}")
261
- return _INDUSTRY_CACHE[cache_key]
262
-
263
- industry_key = f"industry:{org_id}:{source_id}"
264
- print(f"[poll_industry] ⏳ Polling for key: {industry_key}")
265
-
266
- # 2. First attempt (immediate)
267
- data = event_hub.get_key(industry_key)
268
- if data:
269
- industry_info = json.loads(data)
270
- _INDUSTRY_CACHE[cache_key] = industry_info
271
- print(f"[poll_industry] βœ… SUCCESS on first attempt: {industry_info['industry']}")
272
- return industry_info
273
-
274
- # 3. Sleep 5 seconds (gives worker time)
275
- print("[poll_industry] πŸ”„ First check failed, sleeping 5s...")
276
- time.sleep(5.0)
277
-
278
- # 4. Second attempt (final)
279
- data = event_hub.get_key(industry_key)
280
- if data:
281
- industry_info = json.loads(data)
282
- _INDUSTRY_CACHE[cache_key] = industry_info
283
- print(f"[poll_industry] βœ… SUCCESS on second attempt: {industry_info['industry']}")
284
- return industry_info
285
-
286
- # 5. Emergency fallback (worker is dead)
287
- print("[poll_industry] ⚠️ Both attempts failed - using direct detection")
288
- industry_info = _fallback_industry_detection(org_id, source_id)
289
-
290
- # 🎯 NEW: Force write to Redis (ensure it's there)
291
- event_hub.setex(
292
- f"industry:{org_id}:{source_id}",
293
- 3600,
294
- json.dumps(industry_info)
295
- )
296
-
297
- # 🎯 NEW: Clear stale cache so next read is fresh
298
- if (org_id, source_id) in _INDUSTRY_CACHE:
299
- del _INDUSTRY_CACHE[(org_id, source_id)]
300
-
301
- return industry_info
302
- #fallback industry detection
303
  def _fallback_industry_detection(org_id: str, source_id: str) -> dict:
304
  """
305
- Emergency: Run industry detection directly from DuckDB data.
306
- Uses the actual hybrid detector module you have.
307
- Writes result to Redis and cache for recovery.
308
  """
309
- print(f"[fallback_industry] 🚨 Running fallback for {org_id}/{source_id}")
310
 
311
  try:
312
  conn = get_conn(org_id)
@@ -318,90 +226,80 @@ def _fallback_industry_detection(org_id: str, source_id: str) -> dict:
318
  """).fetchall()
319
 
320
  if not rows:
321
- print("[fallback_industry] ❌ No data found, returning UNKNOWN")
322
- industry_info = {"industry": "UNKNOWN", "confidence": 0.0}
323
- else:
324
- parsed = [json.loads(r[0]) for r in rows if r[0]]
325
- df = pd.DataFrame(parsed)
326
- # βœ… ADD THIS LINE - Normalize column names before detection
327
- df.columns = [str(col).lower().strip() for col in df.columns]
328
- # βœ… CORRECT: Import from your actual module
329
- from app.hybrid_industry_detector import hybrid_detect_industry_type
330
-
331
- # Call it (note: it returns 3 values: industry, confidence, is_confident)
332
- industry, confidence, _ = hybrid_detect_industry_type(org_id, df, source_id)
333
- industry_info = {"industry": industry, "confidence": confidence}
334
- print(f"[fallback_industry] βœ… Direct detection: {industry} ({confidence:.2%})")
335
 
336
- # βœ… CRITICAL: Write to Redis BEFORE returning
 
 
 
337
  redis_key = f"industry:{org_id}:{source_id}"
338
  event_hub.setex(redis_key, 3600, json.dumps(industry_info))
339
- print(f"[fallback_industry] πŸ’Ύ WRITTEN TO REDIS: {redis_key}")
340
-
341
- # βœ… Also populate module cache
342
- cache_key = (org_id, source_id)
343
- _INDUSTRY_CACHE[cache_key] = industry_info
344
 
345
  return industry_info
346
 
347
  except Exception as e:
348
  print(f"[fallback_industry] ❌ Failed: {e}")
349
- # βœ… Even on error, write UNKNOWN to Redis so worker doesn't hang
350
  redis_key = f"industry:{org_id}:{source_id}"
351
  event_hub.setex(redis_key, 3600, json.dumps({"industry": "UNKNOWN", "confidence": 0.0}))
352
  return {"industry": "UNKNOWN", "confidence": 0.0}
353
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
354
 
 
355
 
356
  def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
357
  """
358
- ENTERPRISE DATA INGESTION PIPELINE (v2.0)
359
- ==========================================
360
-
361
- Transforms raw audit data into queryable canonical format while:
362
- βœ… Preserving ALL original columns (no data loss)
363
- βœ… Mapping recognized fields to canonical schema
364
- βœ… Versioning schema changes for audit & rollback
365
- βœ… Enforcing minimum schema contracts
366
- βœ… Operating transactionally for data integrity
367
- βœ… Handling background worker failures gracefully
368
-
369
- Flow:
370
- 1. Fetch raw audit trail from main.raw_rows
371
- 2. Parse nested JSON (handles {tables: {...}}, {data: [...]}, etc.)
372
- 3. Normalize column names (force string, lowercase, dedupe)
373
- 4. Map to canonical schema BUT keep unmapped columns intact
374
- 5. Learn new column aliases for future mapping improvements
375
- 6. Type-cast canonical fields (timestamp, qty, total, etc.)
376
- 7. Poll Redis for entity type & industry (with fallback)
377
- 8. Version the schema if structure changed
378
- 9. Enforce schema contract (ensure required canonical columns exist)
379
- 10. Transactionally insert into entity-specific table
380
- 11. Return full DataFrame + industry metadata for frontend
381
-
382
- Args:
383
- org_id: Tenant identifier (e.g., "org_synth_123")
384
- source_id: Data source UUID for entity/industry detection
385
- hours_window: Hours of raw data to consider (default: 24h)
386
-
387
- Returns:
388
- tuple: (DataFrame with ALL columns, industry: str, confidence: float)
389
-
390
- Raises:
391
- ValueError: If schema contract is violated (missing required columns)
392
- HTTPException: On critical failures (quota, insertion errors)
393
  """
394
  start_time = datetime.now()
395
  print(f"\n[canonify] πŸš€ Starting pipeline for {org_id}/{source_id}")
396
 
397
- # Load dynamic aliases before mapping
398
  load_dynamic_aliases()
399
-
400
- # 1️⃣ FETCH RAW AUDIT DATA
401
  with get_conn(org_id) as conn:
402
  ensure_raw_table(conn)
403
-
404
- # βœ… FIXED: Calculate cutoff in Python, bind properly
405
  cutoff_time = datetime.now() - timedelta(hours=hours_window)
406
 
407
  try:
@@ -415,26 +313,26 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
415
  except Exception as e:
416
  print(f"[canonify] ❌ SQL read error: {e}")
417
  return pd.DataFrame(), "unknown", 0.0
418
-
419
  if not rows:
420
- print("[canonify] ⚠️ No audit rows found in window")
421
  return pd.DataFrame(), "unknown", 0.0
422
-
423
- # 2️⃣ PARSE NESTED JSON PAYLOADS
424
  parsed, malformed_count = [], 0
425
  for r in rows:
426
  raw = r[0]
427
  if not raw:
428
  malformed_count += 1
429
  continue
430
-
431
  try:
432
  obj = raw if isinstance(raw, (dict, list)) else json.loads(str(raw))
433
  except Exception:
434
  malformed_count += 1
435
  continue
436
-
437
- # Extract rows from various payload structures
438
  if isinstance(obj, dict):
439
  if "rows" in obj and isinstance(obj["rows"], list):
440
  parsed.extend(obj["rows"])
@@ -450,34 +348,31 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
450
  parsed.extend(obj)
451
  else:
452
  malformed_count += 1
453
-
454
  if malformed_count:
455
  print(f"[canonify] ⚠️ Skipped {malformed_count} malformed rows")
456
  if not parsed:
457
  print("[canonify] ❌ No valid data after parsing")
458
  return pd.DataFrame(), "unknown", 0.0
459
-
460
- # 3️⃣ NORMALIZE COLUMN NAMES (Bulletproof)
461
  df = pd.DataFrame(parsed)
462
  df.columns = [str(col).lower().strip() for col in df.columns]
463
  df = df.loc[:, ~df.columns.duplicated()]
464
  print(f"[canonify] πŸ“Š Parsed DataFrame: {len(df)} rows Γ— {len(df.columns)} cols")
465
-
466
- # 4️⃣ MAP TO CANONICAL SCHEMA (Preserve All Columns)
467
- # Build mapping: original_col β†’ canonical_col
468
  mapping, canonical_used = {}, set()
469
  for canon, aliases in CANONICAL.items():
470
  for col in df.columns:
471
  if any(str(alias).lower() in col for alias in aliases):
472
- # If multiple cols map to same canonical (e.g., begin/end datetime),
473
- # keep first as canonical, others stay original
474
  if canon not in canonical_used:
475
  mapping[col] = canon
476
  canonical_used.add(canon)
477
  print(f"[canonify] πŸ”€ Mapped '{col}' β†’ canonical '{canon}'")
478
  break
479
-
480
- # Learn new aliases for future improvements
481
  for col in df.columns:
482
  for canon in CANONICAL.keys():
483
  if str(canon).lower() in col and col not in CANONICAL[canon]:
@@ -485,11 +380,10 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
485
  print(f"[canonify] 🧠 Learned new alias: {canon} ← {col}")
486
 
487
  save_dynamic_aliases()
488
-
489
- # Apply mapping but keep ALL columns
490
  renamed = df.rename(columns=mapping)
491
 
492
- # Build final column list: canonicals first (deduped), then originals
493
  final_columns, seen = [], set()
494
  for col in renamed.columns:
495
  if col in CANONICAL.keys():
@@ -501,8 +395,8 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
501
 
502
  df = renamed[final_columns].copy()
503
  print(f"[canonify] βœ… Kept columns: {list(df.columns)}")
504
-
505
- # 5️⃣ TYPE CONVERSIONS (Best Effort)
506
  try:
507
  if "timestamp" in df:
508
  df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
@@ -514,24 +408,25 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
514
  if col in df:
515
  df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
516
  except Exception as e:
517
- print(f"[canonify] ⚠️ Type conversion warning (non-critical): {e}")
518
-
519
- # 6️⃣ DETECT ENTITY & INDUSTRY (with worker fallback)
520
  entity_info = poll_for_entity(org_id, source_id)
521
  entity_type = entity_info["entity_type"]
522
 
 
523
  industry_info = poll_for_industry(org_id, source_id)
524
  industry = industry_info["industry"]
525
  industry_confidence = industry_info["confidence"]
526
  print(f"[canonify] 🎯 Entity: {entity_type}, Industry: {industry} ({industry_confidence:.2%})")
527
-
528
- # 8️⃣ SCHEMA VERSIONING & TRANSACTIONAL INSERT
529
  os.makedirs("./db", exist_ok=True)
530
 
531
  with transactional_conn(org_id) as duck:
532
  ensure_schema_versions_table(duck)
533
 
534
- # 8a) Detect schema changes
535
  current_schema = {col: map_pandas_to_duck(col, df[col]) for col in df.columns}
536
  existing_schema_row = duck.execute("""
537
  SELECT schema_json, version_id FROM main.schema_versions
@@ -546,7 +441,6 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
546
 
547
  version_id = None
548
  if is_new_schema:
549
- # Manual auto-increment for DuckDB 0.10.3 compatibility
550
  version_id = duck.execute("""
551
  INSERT INTO main.schema_versions
552
  (version_id, table_name, schema_json, status)
@@ -555,11 +449,10 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
555
  """, (f"{entity_type}_canonical", json.dumps(current_schema))).fetchone()[0]
556
  print(f"[canonify] πŸ“ Created schema v{version_id} for {entity_type}_canonical")
557
 
558
- # 8b) Ensure table exists with current schema
559
  table_name = ensure_canonical_table(duck, df, entity_type)
560
 
561
- # 8c) Transactional insert
562
- # 8d) Clean and insert data
563
  if not df.empty:
564
  table_info = duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
565
  table_cols = [str(r[0]) for r in table_info]
@@ -567,7 +460,6 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
567
  df_to_insert = df[[col for col in df.columns if col in table_cols]]
568
 
569
  if not df_to_insert.empty:
570
- # πŸ”§ CRITICAL: Replace NaN/Infinity with None for JSON compliance
571
  df_to_insert = df_to_insert.replace([np.inf, -np.inf, np.nan], None)
572
 
573
  cols_str = ", ".join(df_to_insert.columns)
@@ -579,7 +471,7 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
579
  )
580
  print(f"[canonify] πŸ’Ύ Inserted {len(df_to_insert)} rows into {table_name}")
581
 
582
- # 8d) Mark schema as applied post-insert
583
  if is_new_schema and version_id:
584
  try:
585
  duck.execute("""
@@ -589,61 +481,34 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
589
  """, (version_id,))
590
  print(f"[canonify] βœ… Schema v{version_id} marked as applied")
591
  except Exception as e:
592
- print(f"[canonify] ⚠️ Schema update warning (non-critical): {e}")
593
- # At the very end of canonify_df function, line ~470
594
- df = df.replace([np.inf, -np.inf, np.nan], None) # Clean for JSON response
 
595
  duration_ms = (datetime.now() - start_time).total_seconds() * 1000
596
  print(f"[canonify] βœ… Pipeline complete in {duration_ms:.2f}ms for {org_id}")
597
-
598
- # After line: print(f"[canonify] βœ… Pipeline complete in {duration_ms:.2f}ms")
599
- if not df.empty:
600
- # At the end of the canonify pipeline: ensure Redis keys for entity/industry
601
- # are present (defensive) and nudge workers via stream AFTER commit.
602
- try:
603
- e_key = f"entity:{org_id}:{source_id}"
604
- i_key = f"industry:{org_id}:{source_id}"
605
- entity_payload = json.dumps({"entity_type": entity_type, "confidence": 1.0})
606
- industry_payload = json.dumps({"industry": industry, "confidence": industry_confidence})
607
-
608
- pipe = event_hub.pipeline()
609
- if pipe is not None:
610
- try:
611
- pipe.setex(e_key, 3600, entity_payload)
612
- pipe.setex(i_key, 3600, industry_payload)
613
- # per-source readiness nudge
614
- msg = json.dumps({"org_id": org_id, "source_id": source_id, "status": "ready"})
615
- pipe.xadd(event_hub.stream_key(org_id, source_id), {"message": msg})
616
- pipe.execute()
617
- except Exception as e:
618
- print(f"[canonify] ⚠️ Pipeline nudge failed: {e}")
619
- # Fallback to sequential writes
620
- try:
621
- event_hub.setex(e_key, 3600, entity_payload)
622
- event_hub.setex(i_key, 3600, industry_payload)
623
- except Exception as re:
624
- print(f"[canonify] ❌ Redis setex fallback failed: {re}")
625
- else:
626
- # Pipeline not available; write sequentially
627
- try:
628
- event_hub.setex(e_key, 3600, entity_payload)
629
- event_hub.setex(i_key, 3600, industry_payload)
630
- msg = json.dumps({"org_id": org_id, "source_id": source_id, "status": "ready"})
631
- event_hub.redis.xadd(event_hub.stream_key(org_id, source_id), {"message": msg})
632
- except Exception as e:
633
- print(f"[canonify] ❌ Redis nudge failed: {e}")
634
-
635
- # Emit central trigger for worker manager
636
- try:
637
- event_hub.emit_analytics_trigger(org_id, source_id, {
638
- "type": "kpi_compute",
639
- "entity_type": entity_type,
640
- "industry": industry,
641
- "rows_inserted": len(df)
642
- })
643
- print(f"[canonify] πŸš€ Triggered analytics for {source_id}")
644
- except Exception as e:
645
- print(f"[canonify] ⚠️ Analytics trigger failed (non-critical): {e}")
646
- except Exception as e:
647
- print(f"[canonify] ⚠️ Finalization nudge failed: {e}")
648
 
649
  return df, industry, industry_confidence
 
 
1
  import os
2
  import json
 
3
  import pandas as pd
4
  import numpy as np
5
  from datetime import datetime, timedelta
6
+ from concurrent.futures import ThreadPoolExecutor
 
 
7
  import time
8
+
9
+ from app.db import get_conn, ensure_raw_table, transactional_conn, ensure_schema_versions_table
10
+ from app.hybrid_entity_detector import hybrid_detect_entity_type
11
  from app.core.event_hub import event_hub
12
 
13
+ # ---------------------- Canonical Schema ---------------------- #
14
  CANONICAL = {
15
  "timestamp": ["timestamp", "date", "sale_date", "created_at"],
16
  "product_id": ["sku", "barcode", "plu", "product_id", "item_code"],
 
24
 
25
  ALIAS_FILE = "./db/alias_memory.json"
26
 
27
+ # Module-level caches
28
+ _ENTITY_CACHE = {}
29
+ _INDUSTRY_CACHE = {}
30
+
31
  def map_pandas_to_duck(col: str, series: pd.Series) -> str:
32
+ """Map pandas dtype to DuckDB type"""
33
  if pd.api.types.is_bool_dtype(series): return "BOOLEAN"
34
  if pd.api.types.is_integer_dtype(series): return "BIGINT"
35
  if pd.api.types.is_float_dtype(series): return "DOUBLE"
36
  if pd.api.types.is_datetime64_any_dtype(series): return "TIMESTAMP"
37
  return "VARCHAR"
38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39
  def load_dynamic_aliases() -> None:
40
+ """Load column alias mappings from disk"""
41
  if os.path.exists(ALIAS_FILE):
42
  try:
43
  with open(ALIAS_FILE) as f:
 
48
  else:
49
  CANONICAL[k] = v
50
  except Exception as e:
51
+ print(f"[mapper] ⚠️ Failed to load alias memory: {e}")
52
 
53
  def save_dynamic_aliases() -> None:
54
+ """Save column alias mappings to disk"""
55
  os.makedirs(os.path.dirname(ALIAS_FILE), exist_ok=True)
56
  with open(ALIAS_FILE, "w") as f:
57
  json.dump(CANONICAL, f, indent=2)
58
+
59
+ # ==================== ENTITY & INDUSTRY DETECTION ====================
60
+
61
+ def poll_for_entity(org_id: str, source_id: str, timeout: int = 10) -> dict:
62
  """
63
+ Poll Redis for entity detection result.
64
+ Uses cache first, then Redis, then fallback.
65
  """
 
66
  cache_key = (org_id, source_id)
67
+
68
+ # 1. Check cache (zero Redis calls)
69
  if cache_key in _ENTITY_CACHE:
70
  print(f"[poll] πŸ’Ύ CACHE HIT: {cache_key}")
71
  return _ENTITY_CACHE[cache_key]
 
73
  entity_key = f"entity:{org_id}:{source_id}"
74
  print(f"[poll] ⏳ Polling for key: {entity_key}")
75
 
76
+ # 2. Try Redis (immediate)
77
  data = event_hub.get_key(entity_key)
78
  if data:
79
  entity_info = json.loads(data)
80
+ print(f"[poll] βœ… Redis hit: {entity_info['entity_type']}")
81
  _ENTITY_CACHE[cache_key] = entity_info
82
  return entity_info
83
 
84
+ # 3. Sleep briefly
85
+ print("[poll] πŸ”„ First check failed, sleeping 3s...")
86
+ time.sleep(3.0)
87
 
88
+ # 4. Try Redis again
89
  data = event_hub.get_key(entity_key)
90
  if data:
91
  entity_info = json.loads(data)
 
92
  _ENTITY_CACHE[cache_key] = entity_info
93
  return entity_info
94
 
95
+ # 5. Fallback (single DuckDB query for both entity & industry)
96
+ print("[poll] ⚠️ Using combined fallback")
 
 
 
97
  entity_info, industry_info = _fallback_combined(org_id, source_id)
98
+
99
+ # 6. Populate industry cache too (since we have it)
100
+ _INDUSTRY_CACHE[cache_key] = industry_info
101
+
102
  return entity_info
103
 
104
+ def poll_for_industry(org_id: str, source_id: str, timeout: int = 10) -> dict:
105
+ """
106
+ Poll Redis for industry detection result.
107
+ Reuses data from entity poll to avoid duplicate Redis calls.
108
+ """
109
+ cache_key = (org_id, source_id)
110
+
111
+ # 1. Check cache (filled by poll_for_entity)
112
+ if cache_key in _INDUSTRY_CACHE:
113
+ print(f"[poll_industry] πŸ’Ύ CACHE HIT: {cache_key}")
114
+ return _INDUSTRY_CACHE[cache_key]
115
+
116
+ # 2. If cache missed but entity was polled, the fallback already ran
117
+ # So just check Redis one more time
118
+ industry_key = f"industry:{org_id}:{source_id}"
119
+ data = event_hub.get_key(industry_key)
120
+
121
+ if data:
122
+ industry_info = json.loads(data)
123
+ _INDUSTRY_CACHE[cache_key] = industry_info
124
+ return industry_info
125
+
126
+ # 3. Rare: fallback failed to write industry, run emergency fallback
127
+ print("[poll_industry] ⚠️ Cache miss, running emergency fallback")
128
+ industry_info = _fallback_industry_detection(org_id, source_id)
129
+ _INDUSTRY_CACHE[cache_key] = industry_info
130
+
131
+ return industry_info
132
 
133
  def _fallback_combined(org_id: str, source_id: str) -> tuple[dict, dict]:
134
+ """
135
+ SINGLE DuckDB query to detect BOTH entity and industry.
136
+ Writes BOTH keys to Redis atomically.
137
+ Updates caches WITHOUT immediately invalidating them.
 
 
 
138
  """
139
  print(f"[fallback_combined] 🚨 Running combined fallback for {org_id}/{source_id}")
140
+
141
+ # Default values
142
  entity_info = {"entity_type": "UNKNOWN", "confidence": 0.0}
143
  industry_info = {"industry": "UNKNOWN", "confidence": 0.0}
144
+
145
  try:
146
  conn = get_conn(org_id)
147
  rows = conn.execute("""
 
150
  WHERE row_data IS NOT NULL
151
  USING SAMPLE 100
152
  """).fetchall()
153
+
154
  if rows:
155
  parsed = [json.loads(r[0]) for r in rows if r[0]]
156
  df = pd.DataFrame(parsed)
157
  df.columns = [str(col).lower().strip() for col in df.columns]
158
+
159
+ # Parallel detection
160
+ def detect_entity():
 
 
161
  try:
162
  return hybrid_detect_entity_type(org_id, df, f"{source_id}.json")
163
  except Exception as e:
164
+ print(f"[fallback] Entity detection failed: {e}")
165
  return ("UNKNOWN", 0.0, False)
166
+
167
+ def detect_industry():
168
  try:
169
  from app.hybrid_industry_detector import hybrid_detect_industry_type
170
  return hybrid_detect_industry_type(org_id, df, source_id)
171
  except Exception as e:
172
+ print(f"[fallback] Industry detection failed: {e}")
173
  return ("UNKNOWN", 0.0, False)
174
+
175
  with ThreadPoolExecutor(max_workers=2) as ex:
176
+ ent_future = ex.submit(detect_entity)
177
+ ind_future = ex.submit(detect_industry)
178
+
179
+ entity_type, ent_conf, _ = ent_future.result()
180
+ industry, ind_conf, _ = ind_future.result()
181
+
182
+ entity_info = {"entity_type": entity_type, "confidence": ent_conf}
183
+ industry_info = {"industry": industry, "confidence": ind_conf}
184
+
185
+ print(f"[fallback] βœ… Entity: {entity_type} ({ent_conf:.2%}), Industry: {industry} ({ind_conf:.2%})")
186
+
187
  except Exception as e:
188
+ print(f"[fallback_combined] ❌ Failed: {e}")
189
+
190
+ # GUARANTEE: Write to Redis (pipeline for atomicity)
 
 
 
191
  try:
192
+ e_key = f"entity:{org_id}:{source_id}"
193
+ i_key = f"industry:{org_id}:{source_id}"
194
+
195
+ pipe = event_hub.redis.pipeline()
196
+ pipe.setex(e_key, 3600, json.dumps(entity_info))
197
+ pipe.setex(i_key, 3600, json.dumps(industry_info))
198
+ pipe.execute()
199
+
200
+ print(f"[fallback] πŸ’Ύ WRITTEN to Redis: {e_key}, {i_key}")
201
+
202
+ except Exception as re:
203
+ print(f"[fallback] ❌ Redis write failed: {re}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
204
 
205
+ # Update caches (keep them valid!)
 
 
206
  cache_key = (org_id, source_id)
207
+ _ENTITY_CACHE[cache_key] = entity_info
208
+ _INDUSTRY_CACHE[cache_key] = industry_info
209
 
210
+ return entity_info, industry_info
211
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
212
  def _fallback_industry_detection(org_id: str, source_id: str) -> dict:
213
  """
214
+ Emergency fallback for industry only (rarely used).
215
+ Should only trigger if combined fallback fails.
 
216
  """
217
+ print(f"[fallback_industry] 🚨 Emergency fallback for {org_id}/{source_id}")
218
 
219
  try:
220
  conn = get_conn(org_id)
 
226
  """).fetchall()
227
 
228
  if not rows:
229
+ print("[fallback_industry] ❌ No data found")
230
+ return {"industry": "UNKNOWN", "confidence": 0.0}
231
+
232
+ parsed = [json.loads(r[0]) for r in rows if r[0]]
233
+ df = pd.DataFrame(parsed)
234
+ df.columns = [str(col).lower().strip() for col in df.columns]
235
+
236
+ from app.hybrid_industry_detector import hybrid_detect_industry_type
237
+ industry, confidence, _ = hybrid_detect_industry_type(org_id, df, source_id)
 
 
 
 
 
238
 
239
+ industry_info = {"industry": industry, "confidence": confidence}
240
+ print(f"[fallback_industry] βœ… Detected: {industry} ({confidence:.2%})")
241
+
242
+ # Write to Redis
243
  redis_key = f"industry:{org_id}:{source_id}"
244
  event_hub.setex(redis_key, 3600, json.dumps(industry_info))
245
+ print(f"[fallback_industry] πŸ’Ύ WRITTEN to Redis: {redis_key}")
 
 
 
 
246
 
247
  return industry_info
248
 
249
  except Exception as e:
250
  print(f"[fallback_industry] ❌ Failed: {e}")
251
+ # Even on error, write UNKNOWN
252
  redis_key = f"industry:{org_id}:{source_id}"
253
  event_hub.setex(redis_key, 3600, json.dumps({"industry": "UNKNOWN", "confidence": 0.0}))
254
  return {"industry": "UNKNOWN", "confidence": 0.0}
255
 
256
+ # ==================== ENTITY TABLE CREATION ====================
257
+
258
def ensure_canonical_table(duck, df: pd.DataFrame, entity_type: str) -> str:
    """Create (if needed) the entity-specific canonical table and align its columns with *df*.

    Args:
        duck: Open DuckDB connection (expected to be inside a transaction).
        df: Incoming batch; every one of its columns must exist on the table.
        entity_type: Detected entity type, e.g. "sales" -> main.sales_canonical.

    Returns:
        Fully qualified table name that now holds every column of *df*.
    """
    table_name = f"main.{entity_type}_canonical"

    # Base table with surrogate key + ingestion timestamp.
    duck.execute(f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            id UUID DEFAULT uuid(),
            _ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    # BUGFIX: PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
    # The column NAME is at index 1, not 0 — using r[0] compared integer cids
    # against column names, so the membership test below never matched and every
    # ALTER was re-attempted (and swallowed by the except) on each batch.
    existing_cols_raw = duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
    existing_cols = {str(r[1]).lower() for r in existing_cols_raw}

    # Add any column present in the batch but missing from the table.
    for col in df.columns:
        col_name = str(col).lower().strip()
        if col_name not in existing_cols:
            try:
                dtype = map_pandas_to_duck(col_name, df[col])
                print(f"[mapper] βž• Adding column '{col_name}:{dtype}'")
                duck.execute(f"ALTER TABLE {table_name} ADD COLUMN {col_name} {dtype}")
            except Exception as e:
                # Non-fatal: a concurrent writer may have added it first.
                print(f"[mapper] ⚠️ Skipping column {col_name}: {e}")

    return table_name
286
 
287
+ # ==================== MAIN PIPELINE ====================
288
 
289
  def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
290
  """
291
+ ENTERPRISE DATA INGESTION PIPELINE
292
+ Safe, idempotent, and Redis-efficient.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
293
  """
294
  start_time = datetime.now()
295
  print(f"\n[canonify] πŸš€ Starting pipeline for {org_id}/{source_id}")
296
 
297
+ # Load aliases
298
  load_dynamic_aliases()
299
+
300
+ # 1️⃣ FETCH RAW DATA
301
  with get_conn(org_id) as conn:
302
  ensure_raw_table(conn)
 
 
303
  cutoff_time = datetime.now() - timedelta(hours=hours_window)
304
 
305
  try:
 
313
  except Exception as e:
314
  print(f"[canonify] ❌ SQL read error: {e}")
315
  return pd.DataFrame(), "unknown", 0.0
316
+
317
  if not rows:
318
+ print("[canonify] ⚠️ No audit rows found")
319
  return pd.DataFrame(), "unknown", 0.0
320
+
321
+ # 2️⃣ PARSE JSON
322
  parsed, malformed_count = [], 0
323
  for r in rows:
324
  raw = r[0]
325
  if not raw:
326
  malformed_count += 1
327
  continue
328
+
329
  try:
330
  obj = raw if isinstance(raw, (dict, list)) else json.loads(str(raw))
331
  except Exception:
332
  malformed_count += 1
333
  continue
334
+
335
+ # Extract rows from various structures
336
  if isinstance(obj, dict):
337
  if "rows" in obj and isinstance(obj["rows"], list):
338
  parsed.extend(obj["rows"])
 
348
  parsed.extend(obj)
349
  else:
350
  malformed_count += 1
351
+
352
  if malformed_count:
353
  print(f"[canonify] ⚠️ Skipped {malformed_count} malformed rows")
354
  if not parsed:
355
  print("[canonify] ❌ No valid data after parsing")
356
  return pd.DataFrame(), "unknown", 0.0
357
+
358
+ # 3️⃣ NORMALIZE COLUMNS
359
  df = pd.DataFrame(parsed)
360
  df.columns = [str(col).lower().strip() for col in df.columns]
361
  df = df.loc[:, ~df.columns.duplicated()]
362
  print(f"[canonify] πŸ“Š Parsed DataFrame: {len(df)} rows Γ— {len(df.columns)} cols")
363
+
364
+ # 4️⃣ MAP TO CANONICAL SCHEMA
 
365
  mapping, canonical_used = {}, set()
366
  for canon, aliases in CANONICAL.items():
367
  for col in df.columns:
368
  if any(str(alias).lower() in col for alias in aliases):
 
 
369
  if canon not in canonical_used:
370
  mapping[col] = canon
371
  canonical_used.add(canon)
372
  print(f"[canonify] πŸ”€ Mapped '{col}' β†’ canonical '{canon}'")
373
  break
374
+
375
+ # Learn new aliases
376
  for col in df.columns:
377
  for canon in CANONICAL.keys():
378
  if str(canon).lower() in col and col not in CANONICAL[canon]:
 
380
  print(f"[canonify] 🧠 Learned new alias: {canon} ← {col}")
381
 
382
  save_dynamic_aliases()
383
+
384
+ # Apply mapping, keep all columns
385
  renamed = df.rename(columns=mapping)
386
 
 
387
  final_columns, seen = [], set()
388
  for col in renamed.columns:
389
  if col in CANONICAL.keys():
 
395
 
396
  df = renamed[final_columns].copy()
397
  print(f"[canonify] βœ… Kept columns: {list(df.columns)}")
398
+
399
+ # 5️⃣ TYPE CONVERSIONS
400
  try:
401
  if "timestamp" in df:
402
  df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
 
408
  if col in df:
409
  df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
410
  except Exception as e:
411
+ print(f"[canonify] ⚠️ Type conversion warning: {e}")
412
+
413
+ # 6️⃣ DETECT ENTITY & INDUSTRY
414
  entity_info = poll_for_entity(org_id, source_id)
415
  entity_type = entity_info["entity_type"]
416
 
417
+ # Industry is fetched from cache filled by poll_for_entity
418
  industry_info = poll_for_industry(org_id, source_id)
419
  industry = industry_info["industry"]
420
  industry_confidence = industry_info["confidence"]
421
  print(f"[canonify] 🎯 Entity: {entity_type}, Industry: {industry} ({industry_confidence:.2%})")
422
+
423
+ # 7️⃣ SCHEMA VERSIONING & TRANSACTIONAL INSERT
424
  os.makedirs("./db", exist_ok=True)
425
 
426
  with transactional_conn(org_id) as duck:
427
  ensure_schema_versions_table(duck)
428
 
429
+ # Detect schema changes
430
  current_schema = {col: map_pandas_to_duck(col, df[col]) for col in df.columns}
431
  existing_schema_row = duck.execute("""
432
  SELECT schema_json, version_id FROM main.schema_versions
 
441
 
442
  version_id = None
443
  if is_new_schema:
 
444
  version_id = duck.execute("""
445
  INSERT INTO main.schema_versions
446
  (version_id, table_name, schema_json, status)
 
449
  """, (f"{entity_type}_canonical", json.dumps(current_schema))).fetchone()[0]
450
  print(f"[canonify] πŸ“ Created schema v{version_id} for {entity_type}_canonical")
451
 
452
+ # Ensure table exists
453
  table_name = ensure_canonical_table(duck, df, entity_type)
454
 
455
+ # Insert data
 
456
  if not df.empty:
457
  table_info = duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
458
  table_cols = [str(r[0]) for r in table_info]
 
460
  df_to_insert = df[[col for col in df.columns if col in table_cols]]
461
 
462
  if not df_to_insert.empty:
 
463
  df_to_insert = df_to_insert.replace([np.inf, -np.inf, np.nan], None)
464
 
465
  cols_str = ", ".join(df_to_insert.columns)
 
471
  )
472
  print(f"[canonify] πŸ’Ύ Inserted {len(df_to_insert)} rows into {table_name}")
473
 
474
+ # Mark schema as applied
475
  if is_new_schema and version_id:
476
  try:
477
  duck.execute("""
 
481
  """, (version_id,))
482
  print(f"[canonify] βœ… Schema v{version_id} marked as applied")
483
  except Exception as e:
484
+ print(f"[canonify] ⚠️ Schema update warning: {e}")
485
+
486
+ # 8️⃣ FINAL: Clean DataFrame for response
487
+ df = df.replace([np.inf, -np.inf, np.nan], None)
488
  duration_ms = (datetime.now() - start_time).total_seconds() * 1000
489
  print(f"[canonify] βœ… Pipeline complete in {duration_ms:.2f}ms for {org_id}")
490
+
491
+ # 9️⃣ SINGLE, SAFE WORKER TRIGGER (idempotent)
492
+ try:
493
+ # Defensive: ensure keys exist (they should from poll_for_entity)
494
+ e_key = f"entity:{org_id}:{source_id}"
495
+ i_key = f"industry:{org_id}:{source_id}"
496
+
497
+ if not event_hub.exists(e_key) or not event_hub.exists(i_key):
498
+ print(f"[canonify] ⚠️ Keys missing, running fallback to ensure")
499
+ _fallback_combined(org_id, source_id)
500
+
501
+ # 🎯 ONE trigger message to worker manager
502
+ event_hub.emit_analytics_trigger(org_id, source_id, {
503
+ "type": "kpi_compute",
504
+ "entity_type": entity_type,
505
+ "industry": industry,
506
+ "rows_inserted": len(df),
507
+ "timestamp": datetime.now().isoformat()
508
+ })
509
+ print(f"[canonify] πŸš€ Triggered analytics for {source_id}")
510
+
511
+ except Exception as e:
512
+ print(f"[canonify] ⚠️ Analytics trigger failed: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
513
 
514
  return df, industry, industry_confidence
app/tasks/analytics_worker.py CHANGED
@@ -1,29 +1,39 @@
1
- # app/tasks/analytics_worker.py
2
  import asyncio
3
  import json
 
 
 
 
 
 
4
  import pandas as pd
5
  import logging
6
- from datetime import datetime,timedelta
7
- from typing import Dict, Any
8
- import time
9
 
10
  from app.core.event_hub import event_hub
11
  from app.db import get_conn
12
- from app.schemas.org_schema import OrgSchema # AI schema mapper
13
- from app.service.column_embedding_service import ColumnEmbeddingService # Vector engine
14
- from app.service.vector_service import VectorService # AI query storage
15
  from app.engine.kpi_calculators.registry import get_kpi_calculator
16
- from app.service.embedding_service import EmbeddingService # HF API fallback
17
 
18
- logging.basicConfig(level=logging.INFO)
 
 
 
 
19
  logger = logging.getLogger(__name__)
20
 
 
 
 
21
 
22
  class AnalyticsWorker:
23
  """
24
  🧠+πŸš€ Hybrid: Deep reasoning + Async efficiency
25
- - Solves column mapping for any data shape
26
- - Non-blocking, cached, zero downtime
 
27
  """
28
 
29
  def __init__(self, org_id: str, source_id: str, hours_window: int = 24):
@@ -32,122 +42,208 @@ class AnalyticsWorker:
32
  self.hours_window = hours_window
33
 
34
  # Core engines
35
- self.schema = OrgSchema(org_id) # AI-powered schema resolver
36
- self.col_embedder = ColumnEmbeddingService() # For column mapping
37
- self.txn_embedder = EmbeddingService() # For transaction embeddings
38
- self.vector_service = VectorService(org_id) # For AI queries
 
 
 
 
 
 
 
39
 
40
- self.computed_at = None
41
- self._entity_type = None
42
 
43
  async def run(self) -> Dict[str, Any]:
44
  """
45
  🎯 THE ENGINE - Zero gaps, pure flow
46
 
47
- 1. Load data from DuckDB (wait for table)
48
- 2. Discover column mapping (AI, cached)
49
- 3. Alias columns for KPI calculator
50
- 4. Embed transactions (async, for AI queries)
51
- 5. Compute KPIs (industry-aware)
52
- 6. Publish to Redis (UI + AI channels)
53
- 7. Cache results (5 min)
 
 
 
54
  """
55
  start_time = datetime.now()
56
- logger.info(f"\n[WORKER] πŸš€ STARTING {self.org_id}/{self.source_id}")
57
- # 🎯 NEW: Wait for entity/industry keys to exist
58
- await self._wait_for_entity_and_industry()
 
 
 
 
 
 
 
 
 
59
  try:
60
- # 1️⃣ LOAD DATA (handles missing tables)
 
 
 
 
 
61
  df = await self._load_dataframe()
62
  if df.empty:
63
  await self._publish_status("error", "No data")
64
- return {"error": "No data"}
65
 
66
  logger.info(f"[WORKER] πŸ“Š Loaded {len(df)} rows Γ— {len(df.columns)} cols")
67
 
68
-
69
- # Fast from cache (~0ms), slow on first run (~30s)
70
  mapping = await self._discover_schema(df)
71
  if not mapping:
72
  await self._publish_status("error", "Schema discovery failed")
73
- return {"error": "No schema mapping"}
74
 
75
- logger.info(f"[WORKER] πŸ”€ Mapping: {list(mapping.items())[:5]}...") # Log first 5
76
 
77
- # 3️⃣ ALIAS COLUMNS (clean code)
78
  df = self._alias_columns(df, mapping)
79
 
80
- # 4️⃣ EMBED TRANSACTIONS (Elon's rocket - async)
81
- # Does NOT block KPI computation
82
  embed_task = asyncio.create_task(
83
- self._embed_transactions(df.head(1000)), # Top 1000 for performance
84
- name=f"embed-{self.org_id}"
85
  )
86
 
87
- # 5️⃣ COMPUTE KPIs (industry-aware)
88
  industry = await self._get_industry()
89
  calculator = get_kpi_calculator(industry, self.org_id, df, self.source_id)
90
-
91
- # Run CPU-heavy work in thread pool
92
  results = await asyncio.to_thread(calculator.compute_all)
93
- self.computed_at = datetime.now()
94
 
95
- logger.info(f"[WORKER] βœ… KPIs computed in {(self.computed_at - start_time).total_seconds():.2f}s")
 
 
96
 
97
- # 6️⃣ PUBLISH TO REDIS (multiple channels)
98
  await self._publish(results)
99
 
100
- # 7️⃣ CACHE (5 min TTL)
101
- self._cache(results)
 
 
 
102
 
103
- # Wait for embeddings (non-critical)
104
  try:
105
  await asyncio.wait_for(embed_task, timeout=30)
106
  logger.info("[WORKER] βœ… Embeddings completed")
107
  except asyncio.TimeoutError:
108
  logger.warning("[WORKER] ⚠️ Embedding timeout, but KPIs published")
109
 
110
- duration = (self.computed_at - start_time).total_seconds()
111
- logger.info(f"[WORKER] 🎯 COMPLETE: {duration:.2f}s for {self.org_id}")
112
-
113
  return results
114
 
115
  except Exception as e:
116
  logger.error(f"[WORKER] ❌ CRITICAL: {e}", exc_info=True)
117
  await self._publish_status("error", str(e))
118
- return {"error": str(e)}
 
 
 
 
119
 
120
- # ==================== INTERNAL METHODS ====================
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
121
 
122
  async def _load_dataframe(self) -> pd.DataFrame:
123
- """🐒 Sync load with table readiness check"""
124
  return await asyncio.to_thread(self._sync_load_dataframe)
125
 
126
  def _sync_load_dataframe(self) -> pd.DataFrame:
127
- """Waits up to 30s for table + data"""
128
  conn = None
129
  MAX_WAIT = 30
130
- RETRY_INTERVAL = 2
131
 
132
  try:
133
- # Get entity type from hub-backed Redis
134
- entity_key = f"entity:{self.org_id}:{self.source_id}"
135
- entity_info = event_hub.get_key(entity_key)
136
-
137
  if not entity_info:
138
- logger.warning(f"[LOAD] No entity info: {entity_key}")
139
  return pd.DataFrame()
140
 
141
- self._entity_type = json.loads(entity_info)["entity_type"]
142
  table_name = f"main.{self._entity_type}_canonical"
143
  cutoff = datetime.now() - timedelta(hours=self.hours_window)
144
 
145
  conn = get_conn(self.org_id)
 
146
 
147
- # Wait for table + data
148
  start = time.time()
149
  while (time.time() - start) < MAX_WAIT:
150
  try:
 
151
  count = conn.execute(
152
  f"SELECT COUNT(*) FROM {table_name} WHERE timestamp >= ?",
153
  [cutoff]
@@ -156,19 +252,23 @@ class AnalyticsWorker:
156
  if count > 0:
157
  logger.info(f"[LOAD] Table ready: {count} rows (waited {(time.time() - start):.1f}s)")
158
  break
159
- logger.info(f"[LOAD] Table empty (waited {(time.time() - start):.1f}s)")
 
 
160
  except Exception as e:
161
  if "does not exist" in str(e).lower():
162
- logger.info(f"[LOAD] Table doesn't exist (waited {(time.time() - start):.1f}s)")
163
  else:
164
  logger.warning(f"[LOAD] Error: {e}")
165
 
166
- time.sleep(RETRY_INTERVAL)
 
 
167
  else:
168
  logger.error(f"[LOAD] Timeout after {MAX_WAIT}s")
169
  return pd.DataFrame()
170
 
171
- # Load data
172
  df = conn.execute(
173
  f"SELECT * FROM {table_name} WHERE timestamp >= ? ORDER BY timestamp DESC",
174
  [cutoff]
@@ -188,20 +288,95 @@ class AnalyticsWorker:
188
  except:
189
  pass
190
 
191
- async def _discover_schema(self, df: pd.DataFrame) -> Dict[str, str]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
192
  """
193
- 🧠 Einstein's discovery engine
194
- Pattern β†’ Vector β†’ LLM (3-tier)
195
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
196
  try:
197
- # Fast: Redis cache (via hub)
198
  cache_key = f"schema:mapping:{self.org_id}"
 
 
199
  if cached := event_hub.get_key(cache_key):
200
- logger.info("[SCHEMA] Cache hit")
201
  return json.loads(cached)
202
 
203
  # Slow: AI discovery
204
- logger.info("[SCHEMA] Cache miss, discovering...")
205
  mapping = self.schema.get_mapping()
206
 
207
  if not mapping:
@@ -210,38 +385,39 @@ class AnalyticsWorker:
210
 
211
  # Cache for 24h
212
  event_hub.setex(cache_key, 86400, json.dumps(mapping))
213
- logger.info(f"[SCHEMA] Discovered {len(mapping)} mappings")
214
 
215
  return mapping
216
 
217
  except Exception as e:
218
- logger.error(f"[SCHEMA] Discovery failed: {e}", exc_info=True)
219
  return {}
220
 
221
  def _alias_columns(self, df: pd.DataFrame, mapping: Dict[str, str]) -> pd.DataFrame:
222
- """πŸ”€ Renames actual columns to semantic names"""
223
  try:
224
- rename_map = {actual: semantic for semantic, actual in mapping.items() if actual in df.columns}
 
 
 
 
225
 
226
  if not rename_map:
227
  logger.warning("[ALIAS] No columns to alias")
228
  return df
229
 
230
- logger.info(f"[ALIAS] Renaming {len(rename_map)} columns: {rename_map}")
231
  return df.rename(columns=rename_map)
232
 
233
  except Exception as e:
234
- logger.error(f"[ALIAS] Error: {e}")
235
  return df
236
 
237
  async def _embed_transactions(self, df: pd.DataFrame):
238
- """
239
- πŸš€ Elon's vector engine: Embeds for AI queries
240
- Non-critical, runs async
241
- """
242
  try:
243
  if df.empty:
244
- logger.warning("[EMBED] No data")
245
  return
246
 
247
  # Build semantic texts
@@ -268,10 +444,10 @@ class AnalyticsWorker:
268
  })
269
 
270
  if not texts:
271
- logger.warning("[EMBED] No valid texts")
272
  return
273
 
274
- # Generate embeddings (HF API or local)
275
  logger.info(f"[EMBED] Generating {len(texts)} embeddings...")
276
  embeddings = []
277
 
@@ -293,126 +469,257 @@ class AnalyticsWorker:
293
  logger.info(f"[EMBED] βœ… Stored {len(embeddings)} vectors")
294
 
295
  except Exception as e:
296
- logger.error(f"[EMBED] Failed: {e}", exc_info=True)
297
  # Non-critical - don't raise
298
 
299
- async def _get_industry(self) -> str:
300
- """Get industry from Redis"""
 
 
301
  try:
302
- key = f"industry:{self.org_id}:{self.source_id}"
303
- if data := event_hub.get_key(key):
304
- return json.loads(data).get("industry", "supermarket").lower()
305
- return "supermarket"
306
- except:
307
- return "supermarket"
308
- async def _wait_for_entity_and_industry(self):
309
- """Block until entity and industry are detected (max 30s)"""
310
- max_wait = 30
311
- start = time.time()
312
-
313
- while (time.time() - start) < max_wait:
314
- entity_key = f"entity:{self.org_id}:{self.source_id}"
315
- industry_key = f"industry:{self.org_id}:{self.source_id}"
316
-
317
- try:
318
- # If in-memory cache exists but Redis does not, invalidate cache
319
- from app.mapper import _ENTITY_CACHE, _INDUSTRY_CACHE
320
-
321
- cache_ent = _ENTITY_CACHE.get((self.org_id, self.source_id))
322
- cache_ind = _INDUSTRY_CACHE.get((self.org_id, self.source_id))
323
-
324
- ent_exists = event_hub.exists(entity_key)
325
- ind_exists = event_hub.exists(industry_key)
326
-
327
- if cache_ent and not ent_exists:
328
- _ENTITY_CACHE.pop((self.org_id, self.source_id), None)
329
- logger.debug(f"[WORKER] Cleared stale _ENTITY_CACHE for {self.org_id}/{self.source_id}")
330
- if cache_ind and not ind_exists:
331
- _INDUSTRY_CACHE.pop((self.org_id, self.source_id), None)
332
- logger.debug(f"[WORKER] Cleared stale _INDUSTRY_CACHE for {self.org_id}/{self.source_id}")
333
-
334
- if ent_exists and ind_exists:
335
- logger.info("[WORKER] βœ… Entity & industry keys found")
336
- return
337
-
338
- except Exception as e:
339
- logger.debug(f"[WORKER] Redis/cache check error: {e}")
340
-
341
- logger.info("[WORKER] ⏳ Waiting for entity/industry keys...")
342
- await asyncio.sleep(2)
343
-
344
- logger.warning("[WORKER] ⚠️ Timeout waiting for keys, proceeding anyway")
345
- # Change _publish() method to use streams
346
- async def _publish(self, results: Dict[str, Any]):
347
- try:
348
- ts = self.computed_at.isoformat() if self.computed_at else datetime.now().isoformat()
349
-
350
- # Publish via central hub (streams + structured messages)
351
- event_hub.emit_kpi_update(self.org_id, self.source_id, {
352
- "data": results,
353
- "rows": results.get("metadata", {}).get("rows_analyzed", 0),
354
- "timestamp": ts
355
- })
356
-
357
- # Publish insights
358
- for alert in results.get("predictive", {}).get("alerts", []):
359
- event_hub.emit_insight(self.org_id, self.source_id, alert)
360
-
361
- logger.info(f"[PUBLISH] πŸ“€ Sent to stream for {self.org_id}/{self.source_id}")
362
-
363
- except Exception as e:
364
- logger.error(f"[PUBLISH] Error: {e}", exc_info=True)
365
 
366
- async def _publish_status(self, status: str, message: str = ""):
367
- """Publish status"""
368
  try:
369
- event_hub.emit_status(self.org_id, self.source_id, status, message)
 
 
370
  except Exception as e:
371
- logger.error(f"[STATUS] Error: {e}")
372
 
373
- def _cache(self, results: Dict[str, Any]):
374
- """Cache for 5 min"""
375
  try:
376
- event_hub.setex(f"kpi_cache:{self.org_id}:{self.source_id}", 300, json.dumps(results))
377
- logger.debug("[CACHE] Cached results")
 
 
 
 
 
 
 
 
 
378
  except Exception as e:
379
- logger.warning(f"[CACHE] Error: {e}")
380
 
381
 
382
- # ---- Redis Listener (The Glue) ---- #
383
- async def redis_listener():
 
384
  """
385
- 🎧 Runs forever, triggers workers on Redis messages
386
- Start this with: `asyncio.create_task(redis_listener())` in main.py
387
  """
388
- pubsub = event_hub.redis.pubsub()
389
- pubsub.psubscribe("analytics_trigger:*")
390
 
391
- logger.info("🎧 Redis listener active - Einstein+Elon mode ENGAGED")
 
 
392
 
393
- async for message in pubsub.listen():
394
- if message["type"] == "pmessage":
395
- try:
396
- trigger = json.loads(message["data"])
397
- logger.info(f"πŸ“‘ Received: {trigger}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
398
 
399
- # Non-blocking worker spawn
400
- worker = AnalyticsWorker(
401
- trigger["org_id"],
402
- trigger["source_id"]
403
- )
404
- asyncio.create_task(worker.run())
405
 
406
- except Exception as e:
407
- logger.error(f"Listener error: {e}", exc_info=True)
 
 
 
 
 
 
 
 
 
 
408
 
409
 
410
- # ---- FastAPI Integration ---- #
411
  async def trigger_kpi_computation(org_id: str, source_id: str):
412
- """Trigger the worker via Redis pubsub"""
 
 
 
413
  try:
414
- # Use the hub which writes both to pubsub and to a small stream
415
- event_hub.emit_analytics_trigger(org_id, source_id)
 
 
 
 
 
 
 
416
  logger.info(f"🎯 Triggered KPI computation: {org_id}/{source_id}")
 
 
417
  except Exception as e:
418
- logger.error(f"Trigger failed: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import asyncio
2
  import json
3
+ import os
4
+ import time
5
+ from asyncio import Lock
6
+ from datetime import datetime, timedelta
7
+ from typing import Dict, Any, Optional
8
+
9
  import pandas as pd
10
  import logging
 
 
 
11
 
12
  from app.core.event_hub import event_hub
13
  from app.db import get_conn
14
+ from app.schemas.org_schema import OrgSchema
15
+ from app.service.column_embedding_service import ColumnEmbeddingService
16
+ from app.service.vector_service import VectorService
17
  from app.engine.kpi_calculators.registry import get_kpi_calculator
18
+ from app.service.embedding_service import EmbeddingService
19
 
20
+ # Configure logging with request context
21
+ logging.basicConfig(
22
+ level=logging.INFO,
23
+ format='%(asctime)s | %(levelname)s | [%(name)s] %(message)s'
24
+ )
25
  logger = logging.getLogger(__name__)
26
 
27
+ # Global lock registry to prevent duplicate workers per org/source
28
+ _WORKER_LOCKS: Dict[str, Lock] = {}
29
+
30
 
31
  class AnalyticsWorker:
32
  """
33
  🧠+πŸš€ Hybrid: Deep reasoning + Async efficiency
34
+ - Solves column mapping for any data shape
35
+ - Non-blocking, cached, zero downtime
36
+ - Deduplication guaranteed via Redis + in-process locks
37
  """
38
 
39
  def __init__(self, org_id: str, source_id: str, hours_window: int = 24):
 
42
  self.hours_window = hours_window
43
 
44
  # Core engines
45
+ self.schema = OrgSchema(org_id)
46
+ self.col_embedder = ColumnEmbeddingService()
47
+ self.txn_embedder = EmbeddingService()
48
+ self.vector_service = VectorService(org_id)
49
+
50
+ self.computed_at: Optional[datetime] = None
51
+ self._entity_type: Optional[str] = None
52
+
53
+ # Deduplication keys
54
+ self.lock_key = f"worker:lock:{org_id}:{source_id}"
55
+ self.processed_key = f"worker:processed:{org_id}:{source_id}"
56
 
57
+ # Get or create in-process lock for this org/source pair
58
+ self._process_lock = _WORKER_LOCKS.setdefault(self.lock_key, Lock())
59
 
60
  async def run(self) -> Dict[str, Any]:
61
  """
62
  🎯 THE ENGINE - Zero gaps, pure flow
63
 
64
+ 1. Acquire distributed lock (prevent duplicates across workers)
65
+ 2. Wait for entity/industry keys with exponential backoff
66
+ 3. Load data from DuckDB
67
+ 4. Discover column mapping (AI, cached)
68
+ 5. Alias columns for KPI calculator
69
+ 6. Embed transactions (async, non-blocking)
70
+ 7. Compute KPIs (industry-aware)
71
+ 8. Publish to Redis (UI + AI channels)
72
+ 9. Cache results (5 min)
73
+ 10. Release lock & cleanup
74
  """
75
  start_time = datetime.now()
76
+ worker_id = f"{self.org_id}/{self.source_id}"
77
+
78
+ # 🎯 STEP 0: Check if already processed recently (idempotency)
79
+ if await self._is_already_processed():
80
+ logger.warning(f"[WORKER] ⚠️ Already processed {worker_id} in last 5min, skipping")
81
+ return {"status": "skipped", "reason": "already_processed"}
82
+
83
+ # 🎯 STEP 1: Acquire distributed lock (Redis + in-process)
84
+ if not await self._acquire_lock():
85
+ logger.warning(f"[WORKER] ❌ Lock not acquired for {worker_id}")
86
+ return {"status": "skipped", "reason": "lock_failed"}
87
+
88
  try:
89
+ logger.info(f"\n[WORKER] πŸš€ STARTING {worker_id}")
90
+
91
+ # 🎯 STEP 2: Wait for entity/industry keys (exponential backoff)
92
+ await self._wait_for_entity_and_industry()
93
+
94
+ # 🎯 STEP 3: Load data with retry logic
95
  df = await self._load_dataframe()
96
  if df.empty:
97
  await self._publish_status("error", "No data")
98
+ return {"status": "error", "reason": "no_data"}
99
 
100
  logger.info(f"[WORKER] πŸ“Š Loaded {len(df)} rows Γ— {len(df.columns)} cols")
101
 
102
+ # 🎯 STEP 4: Schema discovery (cached)
 
103
  mapping = await self._discover_schema(df)
104
  if not mapping:
105
  await self._publish_status("error", "Schema discovery failed")
106
+ return {"status": "error", "reason": "no_schema"}
107
 
108
+ logger.info(f"[WORKER] πŸ”€ Mapping: {list(mapping.items())[:5]}...")
109
 
110
+ # 🎯 STEP 5: Alias columns
111
  df = self._alias_columns(df, mapping)
112
 
113
+ # 🎯 STEP 6: Embed transactions (fire-and-forget, non-blocking)
 
114
  embed_task = asyncio.create_task(
115
+ self._embed_transactions(df.head(1000)),
116
+ name=f"embed-{self.org_id}-{self.source_id}"
117
  )
118
 
119
+ # 🎯 STEP 7: Compute KPIs (CPU-bound, run in thread pool)
120
  industry = await self._get_industry()
121
  calculator = get_kpi_calculator(industry, self.org_id, df, self.source_id)
 
 
122
  results = await asyncio.to_thread(calculator.compute_all)
 
123
 
124
+ self.computed_at = datetime.now()
125
+ duration = (self.computed_at - start_time).total_seconds()
126
+ logger.info(f"[WORKER] βœ… KPIs computed in {duration:.2f}s")
127
 
128
+ # 🎯 STEP 8: Publish results (atomic pipeline)
129
  await self._publish(results)
130
 
131
+ # 🎯 STEP 9: Cache with TTL
132
+ await self._cache_results(results)
133
+
134
+ # 🎯 STEP 10: Mark as processed (idempotency)
135
+ await self._mark_processed()
136
 
137
+ # Wait for embeddings (30s timeout, non-critical)
138
  try:
139
  await asyncio.wait_for(embed_task, timeout=30)
140
  logger.info("[WORKER] βœ… Embeddings completed")
141
  except asyncio.TimeoutError:
142
  logger.warning("[WORKER] ⚠️ Embedding timeout, but KPIs published")
143
 
144
+ logger.info(f"[WORKER] 🎯 COMPLETE: {worker_id} in {duration:.2f}s")
 
 
145
  return results
146
 
147
  except Exception as e:
148
  logger.error(f"[WORKER] ❌ CRITICAL: {e}", exc_info=True)
149
  await self._publish_status("error", str(e))
150
+ return {"status": "error", "reason": str(e)}
151
+
152
+ finally:
153
+ # 🎯 STEP 11: ALWAYS release lock
154
+ await self._release_lock()
155
 
156
+ # ==================== DEDUPLICATION & LOCKING ====================
157
+
158
+ async def _is_already_processed(self) -> bool:
159
+ """Check if this job was processed in last 5 minutes"""
160
+ try:
161
+ # Use Redis TTL to track processed jobs
162
+ return event_hub.redis.exists(self.processed_key)
163
+ except Exception as e:
164
+ logger.error(f"[LOCK] Error checking processed key: {e}")
165
+ return False
166
+
167
+ async def _acquire_lock(self) -> bool:
168
+ """
169
+ Acquire distributed lock using Redis SETNX + in-process lock.
170
+ Returns True if lock acquired, False otherwise.
171
+ """
172
+ try:
173
+ # Try Redis SETNX (set if not exists)
174
+ lock_acquired = event_hub.redis.setnx(self.lock_key, "1")
175
+ if not lock_acquired:
176
+ return False
177
+
178
+ # Set expiry (safety for crashed workers)
179
+ event_hub.redis.expire(self.lock_key, 300) # 5 minute max runtime
180
+
181
+ # Also acquire in-process lock (prevents same-process duplicates)
182
+ acquired = await asyncio.wait_for(self._process_lock.acquire(), timeout=1.0)
183
+ if not acquired:
184
+ # Release Redis lock if in-process lock fails
185
+ event_hub.redis.delete(self.lock_key)
186
+ return False
187
+
188
+ logger.info(f"[LOCK] βœ… Acquired for {self.lock_key}")
189
+ return True
190
+
191
+ except Exception as e:
192
+ logger.error(f"[LOCK] Failed to acquire: {e}")
193
+ return False
194
+
195
+ async def _release_lock(self):
196
+ """Release both Redis and in-process locks"""
197
+ try:
198
+ # Release in-process lock
199
+ if self._process_lock.locked():
200
+ self._process_lock.release()
201
+
202
+ # Release Redis lock
203
+ event_hub.redis.delete(self.lock_key)
204
+ logger.info(f"[LOCK] πŸ”“ Released for {self.lock_key}")
205
+
206
+ except Exception as e:
207
+ logger.error(f"[LOCK] Error releasing: {e}")
208
+
209
+ async def _mark_processed(self):
210
+ """Mark this job as processed (TTL 5 minutes)"""
211
+ try:
212
+ event_hub.redis.setex(self.processed_key, 300, "1")
213
+ except Exception as e:
214
+ logger.error(f"[LOCK] Failed to mark processed: {e}")
215
+
216
+ # ==================== DATA LOADING ====================
217
 
218
  async def _load_dataframe(self) -> pd.DataFrame:
219
+ """Async wrapper for sync data loading"""
220
  return await asyncio.to_thread(self._sync_load_dataframe)
221
 
222
  def _sync_load_dataframe(self) -> pd.DataFrame:
223
+ """Wait for table + data with exponential backoff"""
224
  conn = None
225
  MAX_WAIT = 30
226
+ INITIAL_RETRY = 0.5
227
 
228
  try:
229
+ # Get entity type from Redis
230
+ entity_info = self._get_entity_info()
 
 
231
  if not entity_info:
232
+ logger.error(f"[LOAD] No entity info for {self.org_id}/{self.source_id}")
233
  return pd.DataFrame()
234
 
235
+ self._entity_type = entity_info["entity_type"]
236
  table_name = f"main.{self._entity_type}_canonical"
237
  cutoff = datetime.now() - timedelta(hours=self.hours_window)
238
 
239
  conn = get_conn(self.org_id)
240
+ retry_delay = INITIAL_RETRY
241
 
242
+ # Exponential backoff wait
243
  start = time.time()
244
  while (time.time() - start) < MAX_WAIT:
245
  try:
246
+ # Check if table exists and has data
247
  count = conn.execute(
248
  f"SELECT COUNT(*) FROM {table_name} WHERE timestamp >= ?",
249
  [cutoff]
 
252
  if count > 0:
253
  logger.info(f"[LOAD] Table ready: {count} rows (waited {(time.time() - start):.1f}s)")
254
  break
255
+
256
+ logger.debug(f"[LOAD] Table empty, retrying in {retry_delay}s...")
257
+
258
  except Exception as e:
259
  if "does not exist" in str(e).lower():
260
+ logger.debug(f"[LOAD] Table doesn't exist yet, retrying...")
261
  else:
262
  logger.warning(f"[LOAD] Error: {e}")
263
 
264
+ time.sleep(retry_delay)
265
+ retry_delay = min(retry_delay * 1.5, 5.0) # Cap at 5s
266
+
267
  else:
268
  logger.error(f"[LOAD] Timeout after {MAX_WAIT}s")
269
  return pd.DataFrame()
270
 
271
+ # Load the data
272
  df = conn.execute(
273
  f"SELECT * FROM {table_name} WHERE timestamp >= ? ORDER BY timestamp DESC",
274
  [cutoff]
 
288
  except:
289
  pass
290
 
291
+ def _get_entity_info(self) -> Optional[Dict[str, Any]]:
292
+ """Get entity info from Redis with cache invalidation"""
293
+ try:
294
+ from app.mapper import _ENTITY_CACHE
295
+ cache_key = (self.org_id, self.source_id)
296
+
297
+ entity_key = f"entity:{self.org_id}:{self.source_id}"
298
+ data = event_hub.get_key(entity_key)
299
+
300
+ if data:
301
+ entity_info = json.loads(data)
302
+ # Update cache but keep it fresh
303
+ _ENTITY_CACHE[cache_key] = entity_info
304
+ return entity_info
305
+
306
+ # Cache miss or stale: invalidate
307
+ _ENTITY_CACHE.pop(cache_key, None)
308
+ return None
309
+
310
+ except Exception as e:
311
+ logger.error(f"[ENTITY] Error: {e}")
312
+ return None
313
+
314
+ # ==================== ENTITY/INDUSTRY WAITING ====================
315
+
316
+ async def _wait_for_entity_and_industry(self):
317
  """
318
+ Wait for entity and industry keys with exponential backoff.
319
+ Also handles cache invalidation if Redis shows different data.
320
  """
321
+ MAX_WAIT = 30.0
322
+ INITIAL_DELAY = 0.5
323
+ MAX_DELAY = 5.0
324
+
325
+ entity_key = f"entity:{self.org_id}:{self.source_id}"
326
+ industry_key = f"industry:{self.org_id}:{self.source_id}"
327
+
328
+ delay = INITIAL_DELAY
329
+ start = time.time()
330
+
331
+ while (time.time() - start) < MAX_WAIT:
332
+ try:
333
+ # Check Redis directly (source of truth)
334
+ ent_exists = event_hub.exists(entity_key)
335
+ ind_exists = event_hub.exists(industry_key)
336
+
337
+ # If both exist, validate cache consistency
338
+ if ent_exists and ind_exists:
339
+ from app.mapper import _ENTITY_CACHE, _INDUSTRY_CACHE
340
+ cache_key = (self.org_id, self.source_id)
341
+
342
+ # Invalidate cache if Redis has data but cache doesn't
343
+ if cache_key not in _ENTITY_CACHE:
344
+ data = event_hub.get_key(entity_key)
345
+ if data:
346
+ _ENTITY_CACHE[cache_key] = json.loads(data)
347
+
348
+ if cache_key not in _INDUSTRY_CACHE:
349
+ data = event_hub.get_key(industry_key)
350
+ if data:
351
+ _INDUSTRY_CACHE[cache_key] = json.loads(data)
352
+
353
+ logger.info("[WORKER] βœ… Entity & industry keys found and validated")
354
+ return
355
+
356
+ logger.info(f"[WORKER] ⏳ Waiting for keys (entity={ent_exists}, industry={ind_exists})...")
357
+
358
+ except Exception as e:
359
+ logger.debug(f"[WORKER] Redis check error: {e}")
360
+
361
+ await asyncio.sleep(delay)
362
+ delay = min(delay * 1.5, MAX_DELAY)
363
+
364
+ logger.warning(f"[WORKER] ⚠️ Timeout waiting for keys after {MAX_WAIT}s, proceeding anyway")
365
+
366
+ # ==================== SCHEMA & EMBEDDING ====================
367
+
368
+ async def _discover_schema(self, df: pd.DataFrame) -> Dict[str, str]:
369
+ """🧠 Einstein's discovery engine with caching"""
370
  try:
 
371
  cache_key = f"schema:mapping:{self.org_id}"
372
+
373
+ # Fast: Redis cache
374
  if cached := event_hub.get_key(cache_key):
375
+ logger.info("[SCHEMA] πŸ’Ύ Cache hit")
376
  return json.loads(cached)
377
 
378
  # Slow: AI discovery
379
+ logger.info("[SCHEMA] 🧠 Cache miss, discovering...")
380
  mapping = self.schema.get_mapping()
381
 
382
  if not mapping:
 
385
 
386
  # Cache for 24h
387
  event_hub.setex(cache_key, 86400, json.dumps(mapping))
388
+ logger.info(f"[SCHEMA] βœ… Discovered {len(mapping)} mappings")
389
 
390
  return mapping
391
 
392
  except Exception as e:
393
+ logger.error(f"[SCHEMA] ❌ Discovery failed: {e}", exc_info=True)
394
  return {}
395
 
396
  def _alias_columns(self, df: pd.DataFrame, mapping: Dict[str, str]) -> pd.DataFrame:
397
+ """πŸ”€ Renames columns to semantic names"""
398
  try:
399
+ rename_map = {
400
+ actual: semantic
401
+ for semantic, actual in mapping.items()
402
+ if actual in df.columns
403
+ }
404
 
405
  if not rename_map:
406
  logger.warning("[ALIAS] No columns to alias")
407
  return df
408
 
409
+ logger.info(f"[ALIAS] πŸ”€ Renaming {len(rename_map)} columns")
410
  return df.rename(columns=rename_map)
411
 
412
  except Exception as e:
413
+ logger.error(f"[ALIAS] ❌ Error: {e}", exc_info=True)
414
  return df
415
 
416
  async def _embed_transactions(self, df: pd.DataFrame):
417
+ """πŸš€ Elon's vector engine (fire-and-forget)"""
 
 
 
418
  try:
419
  if df.empty:
420
+ logger.warning("[EMBED] No data to embed")
421
  return
422
 
423
  # Build semantic texts
 
444
  })
445
 
446
  if not texts:
447
+ logger.warning("[EMBED] No valid texts generated")
448
  return
449
 
450
+ # Generate embeddings in batches
451
  logger.info(f"[EMBED] Generating {len(texts)} embeddings...")
452
  embeddings = []
453
 
 
469
  logger.info(f"[EMBED] βœ… Stored {len(embeddings)} vectors")
470
 
471
  except Exception as e:
472
+ logger.error(f"[EMBED] ❌ Failed: {e}", exc_info=True)
473
  # Non-critical - don't raise
474
 
475
+ # ==================== PUBLISHING & CACHING ====================
476
+
477
+ async def _publish(self, results: Dict[str, Any]):
478
+ """πŸ“€ Publish results to Redis (atomic pipeline)"""
479
  try:
480
+ ts = self.computed_at.isoformat() if self.computed_at else datetime.now().isoformat()
481
+
482
+ # Use atomic pipeline for minimal Redis calls
483
+ pipe = event_hub.redis.pipeline()
484
+
485
+ # Publish KPI update
486
+ kpi_data = {
487
+ "data": results,
488
+ "rows": results.get("metadata", {}).get("rows_analyzed", 0),
489
+ "timestamp": ts
490
+ }
491
+ pipe.setex(
492
+ f"kpi_cache:{self.org_id}:{self.source_id}",
493
+ 300, # 5 min TTL
494
+ json.dumps(kpi_data)
495
+ )
496
+
497
+ # Publish insights
498
+ for alert in results.get("predictive", {}).get("alerts", []):
499
+ pipe.lpush(
500
+ f"insights:{self.org_id}:{self.source_id}",
501
+ json.dumps(alert)
502
+ )
503
+ pipe.expire(f"insights:{self.org_id}:{self.source_id}", 300)
504
+
505
+ pipe.execute()
506
+ logger.info(f"[PUBLISH] πŸ“€ Published KPIs for {self.org_id}/{self.source_id}")
507
+
508
+ except Exception as e:
509
+ logger.error(f"[PUBLISH] ❌ Error: {e}", exc_info=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
510
 
511
+ async def _cache_results(self, results: Dict[str, Any]):
512
+ """πŸ’Ύ Cache results for 5 minutes"""
513
  try:
514
+ cache_key = f"kpi_cache:{self.org_id}:{self.source_id}"
515
+ event_hub.setex(cache_key, 300, json.dumps(results))
516
+ logger.debug("[CACHE] βœ… Results cached")
517
  except Exception as e:
518
+ logger.warning(f"[CACHE] ⚠️ Failed: {e}")
519
 
520
+ async def _publish_status(self, status: str, message: str = ""):
521
+ """πŸ“’ Publish worker status"""
522
  try:
523
+ status_data = {
524
+ "status": status,
525
+ "message": message,
526
+ "timestamp": datetime.now().isoformat(),
527
+ "worker_id": f"{self.org_id}:{self.source_id}"
528
+ }
529
+ event_hub.redis.setex(
530
+ f"worker:status:{self.org_id}:{self.source_id}",
531
+ 60,
532
+ json.dumps(status_data)
533
+ )
534
  except Exception as e:
535
+ logger.error(f"[STATUS] ❌ Failed: {e}")
536
 
537
 
538
+ # ==================== WORKER MANAGER & LISTENER ====================
539
+
540
class WorkerManager:
    """
    πŸŽ›οΈ Manages worker lifecycle and prevents Redis hammering
    """

    def __init__(self):
        # worker_id ("org:source") -> running asyncio task
        self.active_workers: Dict[str, asyncio.Task] = {}
        self._shutdown = False

    async def start_listener(self):
        """🎧 Listen to Redis pubsub for triggers.

        Polls in a worker thread so the blocking redis-py call never stalls
        the event loop, then dispatches each trigger to a worker task.
        """
        pubsub = event_hub.redis.pubsub()
        pubsub.psubscribe("analytics_trigger:*")

        logger.info("🎧 Worker Manager: Einstein+Elon mode ENGAGED")

        try:
            while not self._shutdown:
                # BUGFIX: get_message(timeout=5.0) is a blocking sync call;
                # invoking it directly here froze the event loop for up to
                # 5s per iteration. Run it off-loop instead.
                message = await asyncio.to_thread(pubsub.get_message, timeout=5.0)

                if message and message["type"] == "pmessage":
                    await self._handle_trigger(message)

                # Cleanup completed tasks
                self._cleanup_completed_workers()

        except asyncio.CancelledError:
            logger.info("[MANAGER] πŸ›‘ Listener cancelled")
        finally:
            pubsub.close()

    async def _handle_trigger(self, message: Dict[str, Any]):
        """Process a single trigger message; spawns at most one worker per id."""
        try:
            trigger = json.loads(message["data"])
            org_id = trigger["org_id"]
            source_id = trigger["source_id"]

            worker_id = f"{org_id}:{source_id}"

            # Skip if worker already running
            if worker_id in self.active_workers:
                logger.debug(f"[MANAGER] Worker {worker_id} already running")
                return

            # Spawn worker (non-blocking)
            worker = AnalyticsWorker(org_id, source_id)
            task = asyncio.create_task(
                worker.run(),
                name=f"worker-{org_id}-{source_id}"
            )

            self.active_workers[worker_id] = task
            logger.info(f"[MANAGER] πŸš€ Spawned worker for {worker_id}")

        except Exception as e:
            logger.error(f"[MANAGER] Trigger handling failed: {e}", exc_info=True)

    def _cleanup_completed_workers(self):
        """Drop finished workers from the registry, logging each outcome."""
        completed = []
        for worker_id, task in self.active_workers.items():
            if not task.done():
                continue
            completed.append(worker_id)

            # BUGFIX: task.exception() raises CancelledError on a cancelled
            # task — check cancellation before querying the exception.
            if task.cancelled():
                logger.debug(f"[MANAGER] Worker {worker_id} cancelled")
            elif task.exception():
                logger.error(f"[MANAGER] Worker {worker_id} failed: {task.exception()}")
            else:
                logger.debug(f"[MANAGER] Worker {worker_id} completed")

        for worker_id in completed:
            self.active_workers.pop(worker_id, None)

    async def shutdown(self):
        """Graceful shutdown: cancel all workers and wait for them to exit."""
        self._shutdown = True
        logger.info("[MANAGER] πŸ›‘ Shutting down workers...")

        # Cancel all active tasks
        for task in self.active_workers.values():
            task.cancel()

        # Wait for cancellation (exceptions collected, not raised)
        if self.active_workers:
            await asyncio.gather(*self.active_workers.values(), return_exceptions=True)

        logger.info("[MANAGER] βœ… All workers terminated")
629
 
 
 
 
 
 
 
630
 
631
+ # ==================== FASTAPI INTEGRATION ====================
632
+
633
+ # Global manager instance
634
+ _worker_manager: Optional[WorkerManager] = None
635
+
636
+
637
async def get_worker_manager() -> WorkerManager:
    """Return the process-wide WorkerManager, creating it on first use."""
    global _worker_manager
    if _worker_manager is not None:
        return _worker_manager
    _worker_manager = WorkerManager()
    return _worker_manager
643
 
644
 
 
645
async def trigger_kpi_computation(org_id: str, source_id: str):
    """
    🎯 FastAPI endpoint handler - triggers worker via Redis pubsub
    Idempotent: multiple calls won't spawn duplicate workers
    """
    try:
        payload = {
            "type": "kpi_compute",
            "timestamp": datetime.now().isoformat()
        }
        # event_hub writes to both pubsub and a stream for reliability.
        event_hub.emit_analytics_trigger(org_id, source_id, payload)

        logger.info(f"🎯 Triggered KPI computation: {org_id}/{source_id}")
        return {"status": "triggered", "org_id": org_id, "source_id": source_id}

    except Exception as exc:
        logger.error(f"Trigger failed: {exc}", exc_info=True)
        return {"status": "error", "message": str(exc)}
666
+
667
+
668
+ # ==================== BACKGROUND TASK (Optional) ====================
669
+
670
async def continuous_kpi_refresh(manager: WorkerManager):
    """
    πŸŽ›οΈ Gentle background refresh - runs every 5 minutes
    Only triggers for stale data (no active worker, no fresh cache)
    """
    await asyncio.sleep(10)  # Let app startup complete

    while True:
        try:
            # BUGFIX: KEYS is O(N) and blocks the Redis server on large
            # keyspaces; SCAN iterates incrementally without blocking.
            scanned = 0
            for key in event_hub.redis.scan_iter("entity:*:*"):
                if scanned >= 10:  # Max 10 per cycle
                    break
                scanned += 1

                # BUGFIX: redis-py returns str (not bytes) when the client
                # uses decode_responses=True — handle both.
                key_str = key.decode() if isinstance(key, bytes) else key
                # maxsplit guards against ids that themselves contain ':'.
                _, org_id, source_id = key_str.split(":", 2)

                worker_id = f"{org_id}:{source_id}"

                # Skip if worker already running
                if worker_id in manager.active_workers:
                    continue

                # Skip if KPIs are fresh (< 5 min old)
                cache_key = f"kpi_cache:{org_id}:{source_id}"
                if event_hub.redis.exists(cache_key):
                    continue

                # Trigger refresh
                await trigger_kpi_computation(org_id, source_id)
                await asyncio.sleep(1)  # 1s gap

        except Exception as e:
            logger.error(f"[AUTO] Error: {e}", exc_info=True)

        await asyncio.sleep(300)  # ⭐ Sleep 5 minutes
705
+
706
+
707
+ # ==================== MAIN.PY INTEGRATION EXAMPLE ====================
708
+
709
+ # In your main.py:
710
+ #
711
+ # from app.tasks.analytics_worker import get_worker_manager, continuous_kpi_refresh
712
+ #
713
+ # @app.on_event("startup")
714
+ # async def start_workers():
715
+ # manager = await get_worker_manager()
716
+ # asyncio.create_task(manager.start_listener())
717
+ #
718
+ # # Optional: Start background refresh
719
+ # if os.getenv("ENABLE_AUTO_REFRESH", "0") == "1":
720
+ # asyncio.create_task(continuous_kpi_refresh(manager))
721
+ #
722
+ # @app.on_event("shutdown")
723
+ # async def stop_workers():
724
+ # manager = await get_worker_manager()
725
+ # await manager.shutdown()