analytics-engine / app /mapper.py
Peter Mutwiri
fixed indentation on mapper
c528e19
raw
history blame
21.1 kB
# app/mapper.py – BULLETPROOF VERSION
import os
import json
import duckdb
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from app.db import get_conn, ensure_raw_table, enforce_schema_contract, transactional_conn,ensure_schema_versions_table
from app.utils.detect_industry import _ALIAS, detect_industry
# Hybrid entity-type detector (rules + AI), used by the fallback detection path
from app.hybrid_entity_detector import hybrid_detect_entity_type
import time
from app.redis_client import redis
# ---------------------- Canonical schema base ---------------------- #
# Canonical column name -> list of known source-column aliases it may appear
# under in raw uploads.  load_dynamic_aliases() merges persisted aliases from
# ALIAS_FILE into this dict at runtime, and canonify_df() appends newly
# learned aliases back via save_dynamic_aliases().
CANONICAL = {
    "timestamp": ["timestamp", "date", "sale_date", "created_at"],
    "product_id": ["sku", "barcode", "plu", "product_id", "item_code"],
    "qty": ["qty", "quantity", "units", "pieces"],
    "total": ["total", "amount", "line_total", "sales_amount"],
    "store_id": ["store_id", "branch", "location", "outlet_id"],
    "category": ["category", "department", "cat", "family"],
    "promo_flag": ["promo", "promotion", "is_promo", "discount_code"],
    "expiry_date": ["expiry_date", "best_before", "use_by", "expiration"],
}
# On-disk JSON store for aliases learned at runtime (persists across restarts).
ALIAS_FILE = "./db/alias_memory.json"
def map_pandas_to_duck(col: str, series: pd.Series) -> str:
    """Return the DuckDB column type matching a pandas Series dtype.

    `col` is accepted for signature compatibility but is not consulted;
    only the Series dtype drives the mapping.  Anything that is not a
    bool/int/float/datetime dtype falls back to VARCHAR.
    """
    dtype_checks = (
        (pd.api.types.is_bool_dtype, "BOOLEAN"),
        (pd.api.types.is_integer_dtype, "BIGINT"),
        (pd.api.types.is_float_dtype, "DOUBLE"),
        (pd.api.types.is_datetime64_any_dtype, "TIMESTAMP"),
    )
    for predicate, duck_type in dtype_checks:
        if predicate(series):
            return duck_type
    return "VARCHAR"
# ---------------------- Canonical table management ---------------------- #
def ensure_canonical_table(duck: duckdb.DuckDBPyConnection, df: pd.DataFrame, entity_type: str) -> str:
    """Create/evolve the entity-specific canonical table and return its name.

    Creates main.{entity_type}_canonical if missing (with id/_ingested_at
    defaults), then adds one column per DataFrame column not already present,
    typed via map_pandas_to_duck().  A failure to add a single column is
    logged and skipped so one bad column cannot abort the whole ingestion.

    Args:
        duck: Open DuckDB connection (caller owns transaction/lifetime).
        df: DataFrame whose columns define the target schema.
        entity_type: e.g. "sales", "inventory" -> table main.sales_canonical.

    Returns:
        Fully-qualified table name, e.g. "main.sales_canonical".
    """
    table_name = f"main.{entity_type}_canonical"
    # Create base table if it doesn't exist.
    duck.execute(f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            id UUID DEFAULT uuid(),
            _ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    # πŸ”§ FIX: PRAGMA table_info rows are (cid, name, type, notnull, dflt, pk);
    # the column NAME is index 1.  The previous code read index 0 (the
    # ordinal), so existing columns were never recognized and every call
    # re-attempted ADD COLUMN (masked by the try/except below).
    existing_cols_raw = duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
    existing_cols = {str(r[1]).lower() for r in existing_cols_raw}
    for col in df.columns:
        col_name = str(col).lower().strip()  # force string, normalized
        if col_name not in existing_cols:
            try:
                dtype = map_pandas_to_duck(col_name, df[col])
                print(f"[mapper] βž• Adding column '{col_name}:{dtype}'")
                # Quote the identifier so odd characters in source headers
                # cannot break (or inject into) the DDL statement.
                duck.execute(f'ALTER TABLE {table_name} ADD COLUMN "{col_name}" {dtype}')
                existing_cols.add(col_name)
            except Exception as e:
                print(f"[mapper] ⚠️ Skipping column {col_name}: {e}")
    return table_name
# ---------- Alias Memory ---------- #
def load_dynamic_aliases() -> None:
    """Merge alias memory persisted in ALIAS_FILE into CANONICAL, in place.

    Missing file is a no-op; any read/parse failure is logged and swallowed
    (best-effort: stale aliases must never block ingestion).
    """
    if not os.path.exists(ALIAS_FILE):
        return
    try:
        with open(ALIAS_FILE) as fh:
            stored = json.load(fh)
        for canon, aliases in stored.items():
            if canon in CANONICAL:
                known = CANONICAL[canon]
                known.extend([alias for alias in aliases if alias not in known])
            else:
                CANONICAL[canon] = aliases
    except Exception as e:
        print(f"[mapper] ⚠️ failed to load alias memory: {e}")
def save_dynamic_aliases() -> None:
    """Persist the full CANONICAL alias map (built-in + learned) to ALIAS_FILE."""
    target_dir = os.path.dirname(ALIAS_FILE)
    os.makedirs(target_dir, exist_ok=True)
    with open(ALIAS_FILE, "w") as fh:
        json.dump(CANONICAL, fh, indent=2)
# βœ… Module-level caches: (org_id, source_id) -> detection result dict.
# Process-local memoization so each (org, source) pair hits Redis at most
# twice per process lifetime; entries are never invalidated while running.
_ENTITY_CACHE = {}
_INDUSTRY_CACHE = {}  # (org_id, source_id) -> {"industry": str, "confidence": float}
def poll_for_entity(org_id: str, source_id: str, timeout: int = 30) -> dict:
    """
    🎯 Fetch the worker's entity-detection result for a source.

    Capped at 2 Redis calls (immediate + after a 5s sleep); the in-memory
    _ENTITY_CACHE prevents re-polling the same source.  NOTE: `timeout` is
    kept for signature compatibility but is not consulted — the wait is a
    fixed single 5-second sleep.  Falls back to direct detection when the
    worker never published a result.
    """
    # 1. Check cache (zero Redis calls).
    cache_key = (org_id, source_id)
    cached = _ENTITY_CACHE.get(cache_key)
    if cached is not None:
        print(f"[poll] πŸ’Ύ CACHE HIT: {cache_key}")
        return cached
    entity_key = f"entity:{org_id}:{source_id}"
    print(f"[poll] ⏳ Polling for key: {entity_key}")
    # 2./4. Two attempts, separated by a single 5s sleep.
    for attempt_label in ("first", "second"):
        data = redis.get(entity_key)
        if data:
            entity_info = json.loads(data)
            print(f"[poll] βœ… SUCCESS on {attempt_label} attempt: {entity_info['entity_type']}")
            _ENTITY_CACHE[cache_key] = entity_info
            return entity_info
        if attempt_label == "first":
            # 3. Give the background worker time to publish.
            print(f"[poll] πŸ”„ First check failed, sleeping 5s...")
            time.sleep(5.0)
    # 5. Emergency fallback (worker is dead).
    print(f"[poll] ⚠️ Both attempts failed - using direct detection")
    return _fallback_detection(org_id, source_id)
def _fallback_detection(org_id: str, source_id: str) -> dict:
    """
    Emergency path: detect the entity type directly from DuckDB.

    Samples up to 100 raw rows, runs the hybrid detector, then writes the
    result to both the in-memory cache and Redis (1h TTL) so subsequent
    polls recover without re-detecting.

    Returns:
        dict: {"entity_type": str, "confidence": float}
    """
    print(f"[fallback] 🚨 Running fallback for {org_id}/{source_id}")
    # πŸ”§ FIX: use the connection as a context manager (matching canonify_df)
    # so the handle is released even if the query raises — the previous
    # version leaked the connection.
    with get_conn(org_id) as conn:
        rows = conn.execute("""
            SELECT row_data
            FROM main.raw_rows
            WHERE row_data IS NOT NULL
            USING SAMPLE 100
        """).fetchall()
    if not rows:
        print(f"[fallback] ❌ No data found, returning UNKNOWN")
        entity_info = {"entity_type": "UNKNOWN", "confidence": 0.0}
    else:
        parsed = [json.loads(r[0]) for r in rows if r[0]]
        df = pd.DataFrame(parsed)
        # Rule-based detection is fast (<50ms).
        entity_type, confidence, _ = hybrid_detect_entity_type(org_id, df, f"{source_id}.json")
        entity_info = {"entity_type": entity_type, "confidence": confidence}
        print(f"[fallback] βœ… Direct detection: {entity_type} ({confidence:.2%})")
    # Cache and write to Redis so the next poll succeeds immediately.
    cache_key = (org_id, source_id)
    _ENTITY_CACHE[cache_key] = entity_info
    redis.setex(f"entity:{org_id}:{source_id}", 3600, json.dumps(entity_info))
    return entity_info
# Poll for industry from Redis.
def poll_for_industry(org_id: str, source_id: str, timeout: int = 10) -> dict:
    """
    🎯 Polls Redis for the industry detection result (user-facing dashboard label).

    Capped at 2 Redis calls (immediate + after a 5s sleep); the in-memory
    _INDUSTRY_CACHE prevents re-polling the same source.  NOTE: `timeout`
    is kept for signature compatibility but is not consulted.

    Returns:
        dict: {"industry": str, "confidence": float}
    """
    cache_key = (org_id, source_id)
    # 1. Check cache FIRST (zero Redis calls).
    hit = _INDUSTRY_CACHE.get(cache_key)
    if hit is not None:
        print(f"[poll_industry] πŸ’Ύ CACHE HIT: {cache_key}")
        return hit
    industry_key = f"industry:{org_id}:{source_id}"
    print(f"[poll_industry] ⏳ Polling for key: {industry_key}")
    # 2./4. Two attempts, separated by a single 5s sleep.
    for attempt_label in ("first", "second"):
        payload = redis.get(industry_key)
        if payload:
            industry_info = json.loads(payload)
            _INDUSTRY_CACHE[cache_key] = industry_info
            print(f"[poll_industry] βœ… SUCCESS on {attempt_label} attempt: {industry_info['industry']}")
            return industry_info
        if attempt_label == "first":
            # 3. Give the background worker time to publish.
            print(f"[poll_industry] πŸ”„ First check failed, sleeping 5s...")
            time.sleep(5.0)
    # 5. Emergency fallback (worker is dead).
    print(f"[poll_industry] ⚠️ Both attempts failed - using direct detection")
    return _fallback_industry_detection(org_id, source_id)
# Fallback industry detection.
def _fallback_industry_detection(org_id: str, source_id: str) -> dict:
    """
    Emergency path: run industry detection directly on sampled DuckDB data.

    Samples up to 100 raw rows, normalizes headers, runs the hybrid industry
    detector, then writes the result to both the in-memory cache and Redis
    (1h TTL) for recovery.  Any failure degrades to UNKNOWN.

    Returns:
        dict: {"industry": str, "confidence": float}
    """
    print(f"[fallback_industry] 🚨 Running fallback for {org_id}/{source_id}")
    try:
        # πŸ”§ FIX: use the connection as a context manager (matching
        # canonify_df) so the handle is released on error — the previous
        # version leaked the connection.
        with get_conn(org_id) as conn:
            rows = conn.execute("""
                SELECT row_data
                FROM main.raw_rows
                WHERE row_data IS NOT NULL
                USING SAMPLE 100
            """).fetchall()
        if not rows:
            print(f"[fallback_industry] ❌ No data found, returning UNKNOWN")
            industry_info = {"industry": "UNKNOWN", "confidence": 0.0}
        else:
            parsed = [json.loads(r[0]) for r in rows if r[0]]
            df = pd.DataFrame(parsed)
            # Normalize column names before detection (lowercase, trimmed).
            df.columns = [str(col).lower().strip() for col in df.columns]
            # Local import — presumably to avoid an import cycle at module
            # load time; TODO confirm.
            from app.hybrid_industry_detector import hybrid_detect_industry_type
            # Returns 3 values: (industry, confidence, is_confident).
            industry, confidence, _ = hybrid_detect_industry_type(org_id, df, source_id)
            industry_info = {"industry": industry, "confidence": confidence}
            print(f"[fallback_industry] βœ… Direct detection: {industry} ({confidence:.2%})")
        # Cache and write to Redis so the next poll succeeds immediately.
        cache_key = (org_id, source_id)
        _INDUSTRY_CACHE[cache_key] = industry_info
        redis.setex(f"industry:{org_id}:{source_id}", 3600, json.dumps(industry_info))
        return industry_info
    except Exception as e:
        print(f"[fallback_industry] ❌ Failed: {e}")
        return {"industry": "UNKNOWN", "confidence": 0.0}
def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
    """
    ENTERPRISE DATA INGESTION PIPELINE (v2.0)
    ==========================================
    Transforms raw audit data into queryable canonical format while:
    βœ… Preserving ALL original columns (no data loss)
    βœ… Mapping recognized fields to canonical schema
    βœ… Versioning schema changes for audit & rollback
    βœ… Operating transactionally for data integrity
    βœ… Handling background worker failures gracefully

    Flow:
        1. Fetch raw audit trail from main.raw_rows (within hours_window)
        2. Parse nested JSON (handles {rows: [...]}, {data: [...]}, {tables: {...}})
        3. Normalize column names (force string, lowercase, dedupe)
        4. Map to canonical schema BUT keep unmapped columns intact
        5. Learn new column aliases for future mapping improvements
        6. Type-cast canonical fields (timestamp, qty, total, etc.)
        7. Poll Redis for entity type & industry (with direct-detection fallback)
        8. Version the schema if structure changed, then transactionally
           insert into the entity-specific canonical table
        9. Publish an analytics trigger and return the DataFrame + industry

    Args:
        org_id: Tenant identifier (e.g., "org_synth_123")
        source_id: Data source UUID for entity/industry detection
        hours_window: Hours of raw data to consider (default: 24h)

    Returns:
        tuple: (DataFrame with ALL columns, industry: str, confidence: float)
        Returns (empty DataFrame, "unknown", 0.0) when there is nothing to
        ingest or the raw read fails.
    """
    start_time = datetime.now()
    print(f"\n[canonify] πŸš€ Starting pipeline for {org_id}/{source_id}")
    # Load dynamic aliases before mapping.
    load_dynamic_aliases()

    # 1️⃣ FETCH RAW AUDIT DATA
    with get_conn(org_id) as conn:
        ensure_raw_table(conn)
        # Calculate cutoff in Python and bind it as a parameter.
        cutoff_time = datetime.now() - timedelta(hours=hours_window)
        try:
            rows = conn.execute("""
                SELECT row_data FROM main.raw_rows
                WHERE row_data IS NOT NULL
                  AND LENGTH(CAST(row_data AS TEXT)) > 0
                  AND ingested_at >= ?
                ORDER BY ingested_at DESC
            """, (cutoff_time,)).fetchall()
        except Exception as e:
            print(f"[canonify] ❌ SQL read error: {e}")
            return pd.DataFrame(), "unknown", 0.0
    if not rows:
        print("[canonify] ⚠️ No audit rows found in window")
        return pd.DataFrame(), "unknown", 0.0

    # 2️⃣ PARSE NESTED JSON PAYLOADS
    parsed, malformed_count = [], 0
    for r in rows:
        raw = r[0]
        if not raw:
            malformed_count += 1
            continue
        try:
            obj = raw if isinstance(raw, (dict, list)) else json.loads(str(raw))
        except Exception:
            malformed_count += 1
            continue
        # Extract rows from the various payload envelope structures.
        if isinstance(obj, dict):
            if "rows" in obj and isinstance(obj["rows"], list):
                parsed.extend(obj["rows"])
            elif "data" in obj and isinstance(obj["data"], list):
                parsed.extend(obj["data"])
            elif "tables" in obj and isinstance(obj["tables"], dict):
                for table_rows in obj["tables"].values():
                    if isinstance(table_rows, list):
                        parsed.extend(table_rows)
            else:
                parsed.append(obj)
        elif isinstance(obj, list):
            parsed.extend(obj)
        else:
            malformed_count += 1
    if malformed_count:
        print(f"[canonify] ⚠️ Skipped {malformed_count} malformed rows")
    if not parsed:
        print("[canonify] ❌ No valid data after parsing")
        return pd.DataFrame(), "unknown", 0.0

    # 3️⃣ NORMALIZE COLUMN NAMES
    df = pd.DataFrame(parsed)
    df.columns = [str(col).lower().strip() for col in df.columns]
    df = df.loc[:, ~df.columns.duplicated()]
    print(f"[canonify] πŸ“Š Parsed DataFrame: {len(df)} rows Γ— {len(df.columns)} cols")

    # 4️⃣ MAP TO CANONICAL SCHEMA (preserve all columns)
    # Build mapping: original_col β†’ canonical_col.
    mapping, canonical_used = {}, set()
    for canon, aliases in CANONICAL.items():
        for col in df.columns:
            if any(str(alias).lower() in col for alias in aliases):
                # If multiple cols could map to the same canonical, keep the
                # first as canonical; the rest keep their original names.
                if canon not in canonical_used:
                    mapping[col] = canon
                    canonical_used.add(canon)
                    print(f"[canonify] πŸ”€ Mapped '{col}' β†’ canonical '{canon}'")
                break
    # 5️⃣ Learn new aliases for future improvements.
    for col in df.columns:
        for canon in CANONICAL.keys():
            if str(canon).lower() in col and col not in CANONICAL[canon]:
                CANONICAL[canon].append(col)
                print(f"[canonify] 🧠 Learned new alias: {canon} ← {col}")
    save_dynamic_aliases()
    # Apply mapping but keep ALL columns: canonicals first (deduped), then
    # the remaining originals in order.
    renamed = df.rename(columns=mapping)
    final_columns, seen = [], set()
    for col in renamed.columns:
        if col in CANONICAL.keys():
            if col not in seen:
                final_columns.append(col)
                seen.add(col)
        else:
            final_columns.append(col)
    df = renamed[final_columns].copy()
    print(f"[canonify] βœ… Kept columns: {list(df.columns)}")

    # 6️⃣ TYPE CONVERSIONS (best effort; never fatal)
    try:
        if "timestamp" in df:
            df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
        if "expiry_date" in df:
            df["expiry_date"] = pd.to_datetime(df["expiry_date"], errors="coerce").dt.date
        if "promo_flag" in df:
            df["promo_flag"] = df["promo_flag"].astype(str).isin({"1", "true", "t", "yes"})
        for col in ("qty", "total"):
            if col in df:
                df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
    except Exception as e:
        print(f"[canonify] ⚠️ Type conversion warning (non-critical): {e}")

    # 7️⃣ DETECT ENTITY & INDUSTRY (with worker fallback)
    entity_info = poll_for_entity(org_id, source_id)
    entity_type = entity_info["entity_type"]
    industry_info = poll_for_industry(org_id, source_id)
    industry = industry_info["industry"]
    industry_confidence = industry_info["confidence"]
    print(f"[canonify] 🎯 Entity: {entity_type}, Industry: {industry} ({industry_confidence:.2%})")

    # 8️⃣ SCHEMA VERSIONING & TRANSACTIONAL INSERT
    os.makedirs("./db", exist_ok=True)
    with transactional_conn(org_id) as duck:
        ensure_schema_versions_table(duck)
        # 8a) Detect schema changes against the last applied version.
        current_schema = {col: map_pandas_to_duck(col, df[col]) for col in df.columns}
        existing_schema_row = duck.execute("""
            SELECT schema_json, version_id FROM main.schema_versions
            WHERE table_name = ? AND status = 'applied'
            ORDER BY version_id DESC LIMIT 1
        """, (f"{entity_type}_canonical",)).fetchone()
        is_new_schema = (
            not existing_schema_row or
            json.loads(existing_schema_row[0]) != current_schema
        )
        version_id = None
        if is_new_schema:
            # Manual auto-increment via sequence for DuckDB 0.10.3 compatibility.
            version_id = duck.execute("""
                INSERT INTO main.schema_versions
                    (version_id, table_name, schema_json, status)
                VALUES (nextval('schema_version_seq'), ?, ?, 'pending')
                RETURNING version_id
            """, (f"{entity_type}_canonical", json.dumps(current_schema))).fetchone()[0]
            print(f"[canonify] πŸ“ Created schema v{version_id} for {entity_type}_canonical")
        # 8b) Ensure the table exists with the current schema.
        table_name = ensure_canonical_table(duck, df, entity_type)
        # 8c) Clean and insert data.
        if not df.empty:
            table_info = duck.execute(f"PRAGMA table_info('{table_name}')").fetchall()
            # πŸ”§ FIX: PRAGMA table_info rows are (cid, name, type, ...); the
            # column NAME is index 1.  The previous code used index 0 (the
            # ordinal), so table_cols was ["0", "1", ...], no df column ever
            # matched, and the canonical insert silently never happened.
            table_cols = [str(r[1]) for r in table_info]
            df_to_insert = df[[col for col in df.columns if col in table_cols]]
            if not df_to_insert.empty:
                # Replace NaN/Infinity with None for JSON compliance.
                df_to_insert = df_to_insert.replace([np.inf, -np.inf, np.nan], None)
                cols_str = ", ".join(df_to_insert.columns)
                placeholders = ", ".join(["?"] * len(df_to_insert.columns))
                duck.executemany(
                    f"INSERT INTO {table_name} ({cols_str}) VALUES ({placeholders})",
                    df_to_insert.values.tolist()
                )
                print(f"[canonify] πŸ’Ύ Inserted {len(df_to_insert)} rows into {table_name}")
        # 8d) Mark schema as applied post-insert.
        if is_new_schema and version_id:
            try:
                duck.execute("""
                    UPDATE main.schema_versions
                    SET applied_at = CURRENT_TIMESTAMP, status = 'applied'
                    WHERE version_id = ?
                """, (version_id,))
                print(f"[canonify] βœ… Schema v{version_id} marked as applied")
            except Exception as e:
                print(f"[canonify] ⚠️ Schema update warning (non-critical): {e}")

    # 9️⃣ FINALIZE: clean for JSON response, trigger analytics, return.
    df = df.replace([np.inf, -np.inf, np.nan], None)
    duration_ms = (datetime.now() - start_time).total_seconds() * 1000
    print(f"[canonify] βœ… Pipeline complete in {duration_ms:.2f}ms for {org_id}")
    if not df.empty:
        try:
            redis.publish(
                f"analytics_trigger:{org_id}:{source_id}",
                json.dumps({
                    "type": "kpi_compute",
                    "entity_type": entity_type,
                    "industry": industry
                })
            )
            print(f"[canonify] πŸš€ Triggered analytics for {source_id}")
        except Exception as e:
            print(f"[canonify] ⚠️ Analytics trigger failed (non-critical): {e}")
    return df, industry, industry_confidence