| |
| import os |
| import json |
| import duckdb |
| import pandas as pd |
| import numpy as np |
| from datetime import datetime, timedelta |
| from app.db import get_conn, ensure_raw_table, enforce_schema_contract, transactional_conn,ensure_schema_versions_table |
| from app.utils.detect_industry import _ALIAS, detect_industry |
| |
| from app.hybrid_entity_detector import hybrid_detect_entity_type |
| import time |
| from app.redis_client import redis |
|
|
| |
# Canonical schema registry: each canonical column name maps to the list of
# known source-column aliases. Matching in canonify_df() is case-insensitive
# and substring-based (alias "qty" matches source column "order_qty").
# load_dynamic_aliases()/save_dynamic_aliases() merge runtime-learned aliases
# into this dict and persist them to ALIAS_FILE between runs.
CANONICAL = {
    "timestamp": ["timestamp", "date", "sale_date", "created_at"],
    "product_id": ["sku", "barcode", "plu", "product_id", "item_code"],
    "qty": ["qty", "quantity", "units", "pieces"],
    "total": ["total", "amount", "line_total", "sales_amount"],
    "store_id": ["store_id", "branch", "location", "outlet_id"],
    "category": ["category", "department", "cat", "family"],
    "promo_flag": ["promo", "promotion", "is_promo", "discount_code"],
    "expiry_date": ["expiry_date", "best_before", "use_by", "expiration"],
}

# JSON file where runtime-learned aliases are persisted across runs.
ALIAS_FILE = "./db/alias_memory.json"
|
|
def map_pandas_to_duck(col: str, series: pd.Series) -> str:
    """Translate a pandas Series dtype into the matching DuckDB column type.

    Checks run in a fixed order (bool before int, since numpy bools satisfy
    integer checks in some paths); anything unrecognized — objects, strings,
    categoricals — falls back to VARCHAR.

    Args:
        col: Column name (currently unused; kept for interface stability).
        series: The pandas Series whose dtype is inspected.

    Returns:
        A DuckDB type keyword: BOOLEAN, BIGINT, DOUBLE, TIMESTAMP or VARCHAR.
    """
    dtype_rules = (
        (pd.api.types.is_bool_dtype, "BOOLEAN"),
        (pd.api.types.is_integer_dtype, "BIGINT"),
        (pd.api.types.is_float_dtype, "DOUBLE"),
        (pd.api.types.is_datetime64_any_dtype, "TIMESTAMP"),
    )
    for predicate, duck_type in dtype_rules:
        if predicate(series):
            return duck_type
    return "VARCHAR"
|
|
| |
def ensure_canonical_table(duck: duckdb.DuckDBPyConnection, df: pd.DataFrame, entity_type: str) -> str:
    """Create (if needed) and evolve the entity-specific canonical table.

    Produces e.g. ``main.sales_canonical`` or ``main.inventory_canonical``
    with an ``id``/``_ingested_at`` base schema, then adds one column per
    DataFrame column the table does not yet have, typed via
    ``map_pandas_to_duck``. Per-column failures are logged and skipped so
    one bad column never blocks the rest.

    Args:
        duck: Open DuckDB connection (caller owns the transaction).
        df: DataFrame whose columns drive schema evolution.
        entity_type: Entity label, e.g. "sales" — assumed to be an
            internally generated identifier, not untrusted input, since it
            is interpolated into DDL.

    Returns:
        Fully-qualified table name, e.g. "main.sales_canonical".
    """
    table_name = f"main.{entity_type}_canonical"

    duck.execute(f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            id UUID DEFAULT uuid(),
            _ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    # BUGFIX: PRAGMA table_info rows are (cid, name, type, notnull, dflt, pk);
    # the column NAME is at index 1. The previous code read r[0] (the integer
    # cid), so existing column names were never recognized and every column
    # was re-ALTERed on each call (silently swallowed by the except below).
    existing_cols_raw = duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
    existing_cols = {str(r[1]).lower() for r in existing_cols_raw}

    for col in df.columns:
        col_name = str(col).lower().strip()
        if col_name not in existing_cols:
            try:
                dtype = map_pandas_to_duck(col_name, df[col])
                print(f"[mapper] β Adding column '{col_name}:{dtype}'")
                duck.execute(f"ALTER TABLE {table_name} ADD COLUMN {col_name} {dtype}")
            except Exception as e:
                # Non-fatal: keep going so the remaining columns still land.
                print(f"[mapper] β οΈ Skipping column {col_name}: {e}")

    return table_name
|
|
| |
def load_dynamic_aliases() -> None:
    """Merge runtime-learned aliases from ALIAS_FILE into CANONICAL.

    A missing file is a no-op. A corrupt or unreadable file is logged and
    ignored so ingestion never fails because of alias memory.
    """
    if not os.path.exists(ALIAS_FILE):
        return
    try:
        with open(ALIAS_FILE) as fh:
            learned = json.load(fh)
        for canon, aliases in learned.items():
            if canon in CANONICAL:
                known = CANONICAL[canon]
                # Only append aliases we have not seen before.
                known.extend(a for a in aliases if a not in known)
            else:
                CANONICAL[canon] = aliases
    except Exception as e:
        print(f"[mapper] β οΈ failed to load alias memory: {e}")
|
|
def save_dynamic_aliases() -> None:
    """Persist the full alias map (built-in + learned) to ALIAS_FILE as JSON."""
    target_dir = os.path.dirname(ALIAS_FILE)
    os.makedirs(target_dir, exist_ok=True)
    with open(ALIAS_FILE, "w") as fh:
        json.dump(CANONICAL, fh, indent=2)
| |
# Process-local memoization of detection results, keyed by (org_id, source_id).
# Prevents repeated Redis polling for the same source within this worker.
_ENTITY_CACHE = {}
_INDUSTRY_CACHE = {}
def poll_for_entity(org_id: str, source_id: str, timeout: int = 30) -> dict:
    """Fetch the entity-detection result for a data source.

    Checks the in-process cache first, then makes at most two Redis reads
    (one immediate, one after a fixed 5 s pause). If both miss, falls back
    to direct detection via ``_fallback_detection``.

    NOTE(review): ``timeout`` is accepted for interface compatibility but is
    not consulted — the wait is a single fixed 5 s sleep.

    Returns:
        dict with at least "entity_type" and "confidence" keys.
    """
    cache_key = (org_id, source_id)
    cached = _ENTITY_CACHE.get(cache_key)
    if cached is not None:
        print(f"[poll] πΎ CACHE HIT: {cache_key}")
        return cached

    entity_key = f"entity:{org_id}:{source_id}"
    print(f"[poll] β³ Polling for key: {entity_key}")

    for attempt in ("first", "second"):
        payload = redis.get(entity_key)
        if payload:
            entity_info = json.loads(payload)
            print(f"[poll] ✅ SUCCESS on {attempt} attempt: {entity_info['entity_type']}")
            _ENTITY_CACHE[cache_key] = entity_info
            return entity_info
        if attempt == "first":
            print(f"[poll] π First check failed, sleeping 5s...")
            time.sleep(5.0)

    print(f"[poll] β οΈ Both attempts failed - using direct detection")
    return _fallback_detection(org_id, source_id)
|
|
|
|
def _fallback_detection(org_id: str, source_id: str) -> dict:
    """Last-resort entity detection straight from DuckDB sample rows.

    Samples up to 100 raw rows, runs the hybrid detector on them, then
    writes the result to both the in-process cache and Redis (1 h TTL) so
    subsequent callers recover without re-detecting.

    Returns:
        dict: {"entity_type": str, "confidence": float}; "UNKNOWN"/0.0
        when no raw rows are available.
    """
    print(f"[fallback] π¨ Running fallback for {org_id}/{source_id}")

    conn = get_conn(org_id)
    rows = conn.execute("""
        SELECT row_data
        FROM main.raw_rows
        WHERE row_data IS NOT NULL
        USING SAMPLE 100
    """).fetchall()

    if rows:
        records = [json.loads(r[0]) for r in rows if r[0]]
        sample_df = pd.DataFrame(records)
        entity_type, confidence, _ = hybrid_detect_entity_type(org_id, sample_df, f"{source_id}.json")
        entity_info = {"entity_type": entity_type, "confidence": confidence}
        print(f"[fallback] ✅ Direct detection: {entity_type} ({confidence:.2%})")
    else:
        print(f"[fallback] β No data found, returning UNKNOWN")
        entity_info = {"entity_type": "UNKNOWN", "confidence": 0.0}

    # Backfill both cache layers so the next poll is a hit.
    _ENTITY_CACHE[(org_id, source_id)] = entity_info
    redis.setex(f"entity:{org_id}:{source_id}", 3600, json.dumps(entity_info))

    return entity_info
| |
def poll_for_industry(org_id: str, source_id: str, timeout: int = 10) -> dict:
    """Fetch the industry-detection result (user-facing dashboard label).

    Same strategy as ``poll_for_entity``: in-process cache first, then at
    most two Redis reads separated by a fixed 5 s sleep, then direct
    detection via ``_fallback_industry_detection``.

    NOTE(review): ``timeout`` is accepted for interface compatibility but is
    not consulted — the wait is a single fixed 5 s sleep.

    Returns:
        dict: {"industry": str, "confidence": float}
    """
    cache_key = (org_id, source_id)
    cached = _INDUSTRY_CACHE.get(cache_key)
    if cached is not None:
        print(f"[poll_industry] πΎ CACHE HIT: {cache_key}")
        return cached

    industry_key = f"industry:{org_id}:{source_id}"
    print(f"[poll_industry] β³ Polling for key: {industry_key}")

    for attempt in ("first", "second"):
        payload = redis.get(industry_key)
        if payload:
            industry_info = json.loads(payload)
            _INDUSTRY_CACHE[cache_key] = industry_info
            print(f"[poll_industry] ✅ SUCCESS on {attempt} attempt: {industry_info['industry']}")
            return industry_info
        if attempt == "first":
            print(f"[poll_industry] π First check failed, sleeping 5s...")
            time.sleep(5.0)

    print(f"[poll_industry] β οΈ Both attempts failed - using direct detection")
    return _fallback_industry_detection(org_id, source_id)
| |
def _fallback_industry_detection(org_id: str, source_id: str) -> dict:
    """Last-resort industry detection straight from DuckDB sample rows.

    Samples up to 100 raw rows, runs the hybrid industry detector, then
    persists the result to both the in-process cache and Redis (1 h TTL).
    Any failure degrades to {"industry": "UNKNOWN", "confidence": 0.0}
    rather than raising.
    """
    print(f"[fallback_industry] π¨ Running fallback for {org_id}/{source_id}")

    try:
        rows = get_conn(org_id).execute("""
            SELECT row_data
            FROM main.raw_rows
            WHERE row_data IS NOT NULL
            USING SAMPLE 100
        """).fetchall()

        if rows:
            records = [json.loads(r[0]) for r in rows if r[0]]
            sample_df = pd.DataFrame(records)
            # Normalize headers the same way canonify_df does.
            sample_df.columns = [str(c).lower().strip() for c in sample_df.columns]

            # Import kept local, as in the original placement.
            from app.hybrid_industry_detector import hybrid_detect_industry_type

            industry, confidence, _ = hybrid_detect_industry_type(org_id, sample_df, source_id)
            industry_info = {"industry": industry, "confidence": confidence}
            print(f"[fallback_industry] ✅ Direct detection: {industry} ({confidence:.2%})")
        else:
            print(f"[fallback_industry] β No data found, returning UNKNOWN")
            industry_info = {"industry": "UNKNOWN", "confidence": 0.0}

        # Backfill both cache layers so the next poll is a hit.
        _INDUSTRY_CACHE[(org_id, source_id)] = industry_info
        redis.setex(f"industry:{org_id}:{source_id}", 3600, json.dumps(industry_info))

        return industry_info

    except Exception as e:
        print(f"[fallback_industry] β Failed: {e}")
        return {"industry": "UNKNOWN", "confidence": 0.0}
|
|
|
|
|
|
def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
    """Transform raw audit rows into the canonical, queryable format.

    Pipeline:
        1.  Fetch raw audit rows from ``main.raw_rows`` within the window.
        2.  Parse nested JSON payloads ({"rows": [...]}, {"data": [...]},
            {"tables": {...}}, bare dicts/lists).
        3.  Normalize column names (stringify, lowercase, dedupe).
        4.  Map recognized columns onto the canonical schema while keeping
            every unmapped column intact (no data loss).
        5.  Learn new column aliases and persist them for future runs.
        6.  Type-cast canonical fields (timestamp, qty, total, ...).
        7.  Resolve entity type and industry via Redis polling (with
            direct-detection fallback).
        8.  Version the schema when its structure changed.
        9.  Transactionally insert into the entity-specific canonical table.
        10. Publish a best-effort analytics trigger.

    Args:
        org_id: Tenant identifier (e.g. "org_synth_123").
        source_id: Data-source UUID used for entity/industry detection.
        hours_window: Hours of raw data to consider (default 24).

    Returns:
        tuple: (DataFrame with ALL columns, industry name, industry
        confidence). Early failures (read errors, no rows, no parseable
        data) return an empty DataFrame with ("unknown", 0.0) instead of
        raising.
    """
    start_time = datetime.now()
    print(f"\n[canonify] π Starting pipeline for {org_id}/{source_id}")

    load_dynamic_aliases()

    # ---- 1. Fetch raw rows within the time window -------------------------
    with get_conn(org_id) as conn:
        ensure_raw_table(conn)
        cutoff_time = datetime.now() - timedelta(hours=hours_window)
        try:
            rows = conn.execute("""
                SELECT row_data FROM main.raw_rows
                WHERE row_data IS NOT NULL
                  AND LENGTH(CAST(row_data AS TEXT)) > 0
                  AND ingested_at >= ?
                ORDER BY ingested_at DESC
            """, (cutoff_time,)).fetchall()
        except Exception as e:
            print(f"[canonify] β SQL read error: {e}")
            return pd.DataFrame(), "unknown", 0.0

    if not rows:
        print("[canonify] β οΈ No audit rows found in window")
        return pd.DataFrame(), "unknown", 0.0

    # ---- 2. Parse nested JSON payloads ------------------------------------
    parsed, malformed_count = [], 0
    for r in rows:
        raw = r[0]
        if not raw:
            malformed_count += 1
            continue
        try:
            obj = raw if isinstance(raw, (dict, list)) else json.loads(str(raw))
        except Exception:
            malformed_count += 1
            continue

        # Accept several envelope shapes produced by upstream writers.
        if isinstance(obj, dict):
            if "rows" in obj and isinstance(obj["rows"], list):
                parsed.extend(obj["rows"])
            elif "data" in obj and isinstance(obj["data"], list):
                parsed.extend(obj["data"])
            elif "tables" in obj and isinstance(obj["tables"], dict):
                for table_rows in obj["tables"].values():
                    if isinstance(table_rows, list):
                        parsed.extend(table_rows)
            else:
                parsed.append(obj)
        elif isinstance(obj, list):
            parsed.extend(obj)
        else:
            malformed_count += 1

    if malformed_count:
        print(f"[canonify] β οΈ Skipped {malformed_count} malformed rows")
    if not parsed:
        print("[canonify] β No valid data after parsing")
        return pd.DataFrame(), "unknown", 0.0

    # ---- 3. Normalize headers ---------------------------------------------
    df = pd.DataFrame(parsed)
    df.columns = [str(col).lower().strip() for col in df.columns]
    df = df.loc[:, ~df.columns.duplicated()]
    print(f"[canonify] π Parsed DataFrame: {len(df)} rows Γ {len(df.columns)} cols")

    # ---- 4. Map columns to canonical names (substring match, first wins) --
    mapping, canonical_used = {}, set()
    for canon, aliases in CANONICAL.items():
        for col in df.columns:
            if any(str(alias).lower() in col for alias in aliases):
                # Each canonical name is claimed by at most one source column.
                if canon not in canonical_used:
                    mapping[col] = canon
                    canonical_used.add(canon)
                    print(f"[canonify] π Mapped '{col}' β canonical '{canon}'")
                break

    # ---- 5. Learn new aliases for future runs -----------------------------
    for col in df.columns:
        for canon in CANONICAL.keys():
            if str(canon).lower() in col and col not in CANONICAL[canon]:
                CANONICAL[canon].append(col)
                print(f"[canonify] π§ Learned new alias: {canon} β {col}")
    save_dynamic_aliases()

    # Rename mapped columns, then drop duplicate canonical names while
    # preserving every unmapped column (no data loss).
    renamed = df.rename(columns=mapping)
    final_columns, seen = [], set()
    for col in renamed.columns:
        if col in CANONICAL.keys():
            if col not in seen:
                final_columns.append(col)
                seen.add(col)
        else:
            final_columns.append(col)
    df = renamed[final_columns].copy()
    print(f"[canonify] ✅ Kept columns: {list(df.columns)}")

    # ---- 6. Type-cast canonical fields (best effort, non-fatal) -----------
    try:
        if "timestamp" in df:
            df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
        if "expiry_date" in df:
            df["expiry_date"] = pd.to_datetime(df["expiry_date"], errors="coerce").dt.date
        if "promo_flag" in df:
            df["promo_flag"] = df["promo_flag"].astype(str).isin({"1", "true", "t", "yes"})
        for col in ("qty", "total"):
            if col in df:
                df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
    except Exception as e:
        print(f"[canonify] β οΈ Type conversion warning (non-critical): {e}")

    # ---- 7. Resolve entity type and industry ------------------------------
    entity_info = poll_for_entity(org_id, source_id)
    entity_type = entity_info["entity_type"]

    industry_info = poll_for_industry(org_id, source_id)
    industry = industry_info["industry"]
    industry_confidence = industry_info["confidence"]
    print(f"[canonify] π― Entity: {entity_type}, Industry: {industry} ({industry_confidence:.2%})")

    # ---- 8/9. Schema versioning + transactional insert --------------------
    os.makedirs("./db", exist_ok=True)

    with transactional_conn(org_id) as duck:
        ensure_schema_versions_table(duck)

        current_schema = {col: map_pandas_to_duck(col, df[col]) for col in df.columns}
        existing_schema_row = duck.execute("""
            SELECT schema_json, version_id FROM main.schema_versions
            WHERE table_name = ? AND status = 'applied'
            ORDER BY version_id DESC LIMIT 1
        """, (f"{entity_type}_canonical",)).fetchone()

        is_new_schema = (
            not existing_schema_row or
            json.loads(existing_schema_row[0]) != current_schema
        )

        version_id = None
        if is_new_schema:
            version_id = duck.execute("""
                INSERT INTO main.schema_versions
                (version_id, table_name, schema_json, status)
                VALUES (nextval('schema_version_seq'), ?, ?, 'pending')
                RETURNING version_id
            """, (f"{entity_type}_canonical", json.dumps(current_schema))).fetchone()[0]
            print(f"[canonify] π Created schema v{version_id} for {entity_type}_canonical")

        table_name = ensure_canonical_table(duck, df, entity_type)

        if not df.empty:
            # BUGFIX: PRAGMA table_info rows are (cid, name, type, ...) —
            # the column NAME is at index 1. The previous r[0] collected the
            # integer cid, so no DataFrame column ever matched table_cols,
            # df_to_insert was always empty, and nothing was ever inserted.
            table_info = duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
            table_cols = [str(r[1]) for r in table_info]

            df_to_insert = df[[col for col in df.columns if col in table_cols]]

            if not df_to_insert.empty:
                # DuckDB cannot bind NaN/Inf as parameters; map to SQL NULL.
                df_to_insert = df_to_insert.replace([np.inf, -np.inf, np.nan], None)

                cols_str = ", ".join(df_to_insert.columns)
                placeholders = ", ".join(["?"] * len(df_to_insert.columns))

                duck.executemany(
                    f"INSERT INTO {table_name} ({cols_str}) VALUES ({placeholders})",
                    df_to_insert.values.tolist()
                )
                print(f"[canonify] πΎ Inserted {len(df_to_insert)} rows into {table_name}")

        if is_new_schema and version_id:
            try:
                duck.execute("""
                    UPDATE main.schema_versions
                    SET applied_at = CURRENT_TIMESTAMP, status = 'applied'
                    WHERE version_id = ?
                """, (version_id,))
                print(f"[canonify] ✅ Schema v{version_id} marked as applied")
            except Exception as e:
                print(f"[canonify] β οΈ Schema update warning (non-critical): {e}")

    # Normalize non-finite values for JSON-safe downstream consumption.
    df = df.replace([np.inf, -np.inf, np.nan], None)
    duration_ms = (datetime.now() - start_time).total_seconds() * 1000
    print(f"[canonify] ✅ Pipeline complete in {duration_ms:.2f}ms for {org_id}")

    # ---- 10. Best-effort analytics trigger --------------------------------
    if not df.empty:
        try:
            redis.publish(
                f"analytics_trigger:{org_id}:{source_id}",
                json.dumps({
                    "type": "kpi_compute",
                    "entity_type": entity_type,
                    "industry": industry
                })
            )
            print(f"[canonify] π Triggered analytics for {source_id}")
        except Exception as e:
            print(f"[canonify] β οΈ Analytics trigger failed (non-critical): {e}")

    return df, industry, industry_confidence