# Mutsynchub / app/core/detection_engine.py
# shaliz-kong
# Initial commit: self-hosted Redis, DuckDB, Analytics Engine
# 98a466d
"""
app/core/detection_engine.py – UNIVERSAL DETECTION ENGINE
=======================================================
Consolidated entity and industry detection with dual-mode (LLM + rule-based).
Functions:
- hybrid_detect_entity_type()
- hybrid_detect_industry_type()
- Redis caching helpers
- Prometheus metrics
- Zero circular dependencies
"""
import json
import logging
import pandas as pd
from typing import Tuple, Optional, Dict, Any
from datetime import datetime
import time
from app.core.event_hub import event_hub
from app.service.llm_service import get_llm_service
# ✅ RULE-BASED IMPORTS (both in one place)
from app.entity_detector import detect_entity_type as rule_based_entity
from app.utils.detect_industry import detect_industry as rule_based_industry
from app.core.sre_logging import emit_mapper_log
# SRE: Prometheus metrics are optional. When prometheus_client is not
# installed, both metric handles are None and callers must check before use.
try:
    from prometheus_client import Counter, Histogram
except ImportError:
    detection_latency = None
    detection_errors = None
else:
    # NOTE(review): org_id is a label, so the number of time series grows with
    # the number of organizations -- confirm that cardinality is acceptable.
    detection_latency = Histogram(
        'detection_duration_seconds',
        'Time to detect entity/industry',
        ['detection_type', 'org_id'],
    )
    detection_errors = Counter(
        'detection_errors_total',
        'Total detection failures',
        ['detection_type', 'org_id', 'error_type'],
    )

logger = logging.getLogger(__name__)
# ====================================================================
# 🎯 ENTITY TYPE DETECTION
# ====================================================================
def hybrid_detect_entity_type(org_id: str, df: pd.DataFrame, source_id: str,
                              use_llm: bool = False) -> Tuple[str, float, bool]:
    """
    Detect entity_type (SALES, INVENTORY, CUSTOMER, PRODUCT, etc.).

    The rule-based detector always runs first. The column names are sent to
    the LLM only when ``use_llm`` is True and the rule-based confidence is
    below ``_LLM_FALLBACK_THRESHOLD`` (0.75). The LLM answer is adopted only
    if it reports a higher confidence. Total elapsed time is recorded in
    ``detection_latency`` (``detection_type="entity"``) when Prometheus is
    available.

    Args:
        org_id: Organization ID
        df: DataFrame to analyze
        source_id: Source identifier
        use_llm: If True, use LLM fallback when confidence < 0.75

    Returns:
        (entity_type: str, confidence: float, is_confident: bool).
        ``entity_type`` is upper-cased.

        ``is_confident`` is True when:
        - the rule-based confidence meets the threshold,
        - ``use_llm`` is False (even if the rule-based confidence is low), or
        - the LLM result is adopted.

        It is False when the fallback was attempted but the LLM was not ready,
        failed, or was not more confident.

    Raises:
        Exceptions from the rule-based detector propagate. Errors in the LLM
        fallback are logged, counted in ``detection_errors``, and swallowed.
    """
    start_time = time.perf_counter()
    try:
        emit_mapper_log("info", "Entity detection started",
                        org_id=org_id, source_id=source_id, use_llm=use_llm)

        # 1. Rule-based detection always runs first.
        entity_type, confidence = rule_based_entity(df)
        entity_type = entity_type.upper()
        emit_mapper_log("info", "Rule-based entity completed",
                        org_id=org_id, source_id=source_id,
                        entity_type=entity_type, confidence=confidence)

        # 2. Confident enough, or LLM disabled: keep the rule-based answer.
        if confidence >= _LLM_FALLBACK_THRESHOLD or not use_llm:
            return entity_type, confidence, True

        # 3. LLM fallback (use_llm=True and confidence below the threshold).
        try:
            emit_mapper_log("info", "Entity LLM fallback required",
                            org_id=org_id, source_id=source_id,
                            rule_confidence=confidence)
            llm = get_llm_service()
            if not llm.is_ready():
                emit_mapper_log("warning", "LLM not ready, using rule-based entity",
                                org_id=org_id, source_id=source_id)
                return entity_type, confidence, False

            # Column labels are not necessarily strings (e.g. integer labels
            # from a header-less file), and str.join raises TypeError on them.
            columns_str = ",".join(map(str, df.columns))
            prompt = (
                "Analyze these column names and determine the business entity type:\n"
                f"Columns: {columns_str}\n"
                "Return ONLY JSON:\n"
                '{"entity_type":"SALES|INVENTORY|CUSTOMER|PRODUCT","confidence":0.95}'
            )
            response = llm.generate(prompt, max_tokens=50, temperature=0.1)
            result = _parse_llm_json(response)
            llm_entity = result["entity_type"].upper()
            llm_confidence = _clamp_confidence(result["confidence"])
            emit_mapper_log("info", "Entity LLM completed",
                            org_id=org_id, source_id=source_id,
                            llm_entity=llm_entity, llm_confidence=llm_confidence)

            # Adopt the LLM answer only if it beats the rule-based confidence.
            if llm_confidence > confidence:
                return llm_entity, llm_confidence, True
            return entity_type, confidence, False
        except Exception as e:
            # Best-effort fallback: any LLM or parsing failure degrades to the
            # rule-based result instead of failing detection.
            emit_mapper_log("error", "Entity LLM fallback failed",
                            org_id=org_id, source_id=source_id, error=str(e))
            if detection_errors is not None:
                detection_errors.labels(detection_type="entity", org_id=org_id,
                                        error_type=type(e).__name__).inc()
            return entity_type, confidence, False
    finally:
        _observe_latency("entity", org_id, start_time)


# ====================================================================
# 🎯 INDUSTRY TYPE DETECTION
# ====================================================================
def hybrid_detect_industry_type(org_id: str, df: pd.DataFrame, source_id: str,
                                use_llm: bool = False) -> Tuple[str, float, bool]:
    """
    Detect industry vertical (SUPERMARKET, MANUFACTURING, PHARMA, RETAIL, WHOLESALE, HEALTHCARE).

    The rule-based detector always runs first. The column names and the first
    three rows are sent to the LLM only when ``use_llm`` is True and the
    rule-based confidence is below ``_LLM_FALLBACK_THRESHOLD`` (0.75). The LLM
    answer is adopted only if it reports a higher confidence. Total elapsed
    time is recorded in ``detection_latency`` (``detection_type="industry"``)
    when Prometheus is available.

    Args:
        org_id: Organization ID
        df: DataFrame to analyze
        source_id: Source identifier
        use_llm: If True, enhance with LLM when confidence < 0.75

    Returns:
        (industry: str, confidence: float, is_confident: bool).
        ``industry`` is upper-cased. ``is_confident`` follows the same rules
        as in ``hybrid_detect_entity_type``.

    Raises:
        Exceptions from the rule-based detector propagate. Errors in the LLM
        fallback are logged, counted in ``detection_errors``, and swallowed.
    """
    start_time = time.perf_counter()
    try:
        emit_mapper_log("info", "Industry detection started",
                        org_id=org_id, source_id=source_id, use_llm=use_llm)

        # 1. Rule-based detection always runs first.
        industry, confidence = rule_based_industry(df)
        industry = industry.upper()
        emit_mapper_log("info", "Rule-based industry completed",
                        org_id=org_id, source_id=source_id,
                        industry=industry, confidence=confidence)

        # 2. Confident enough, or LLM disabled: keep the rule-based answer.
        if confidence >= _LLM_FALLBACK_THRESHOLD or not use_llm:
            return industry, confidence, True

        # 3. LLM fallback (use_llm=True and confidence below the threshold).
        try:
            emit_mapper_log("info", "Industry LLM fallback required",
                            org_id=org_id, source_id=source_id,
                            rule_confidence=confidence)
            llm = get_llm_service()
            if not llm.is_ready():
                emit_mapper_log("warning", "LLM not ready for industry",
                                org_id=org_id, source_id=source_id)
                return industry, confidence, False

            # Column labels are not necessarily strings; see the entity detector.
            columns_str = ",".join(map(str, df.columns))
            # default=str: sample cells are often Timestamps, Decimals or numpy
            # scalars that json cannot serialize natively. Without it, the
            # resulting TypeError would abort the LLM fallback.
            sample_rows = json.dumps(df.head(3).to_dict(orient="records"), default=str)
            prompt = (
                "Analyze this dataset and determine the business industry vertical:\n"
                f"Columns: {columns_str}\n"
                f"Sample rows: {sample_rows}\n"
                "Return ONLY JSON:\n"
                '{"industry":"SUPERMARKET|MANUFACTURING|PHARMA|RETAIL|WHOLESALE|HEALTHCARE","confidence":0.95}'
            )
            response = llm.generate(prompt, max_tokens=50, temperature=0.1)
            result = _parse_llm_json(response)
            llm_industry = result["industry"].upper()
            llm_confidence = _clamp_confidence(result["confidence"])
            emit_mapper_log("info", "Industry LLM completed",
                            org_id=org_id, source_id=source_id,
                            llm_industry=llm_industry, llm_confidence=llm_confidence)

            # Adopt the LLM answer only if it beats the rule-based confidence.
            if llm_confidence > confidence:
                return llm_industry, llm_confidence, True
            return industry, confidence, False
        except Exception as e:
            # Best-effort fallback: any LLM or parsing failure degrades to the
            # rule-based result instead of failing detection.
            emit_mapper_log("error", "Industry LLM fallback failed",
                            org_id=org_id, source_id=source_id, error=str(e))
            if detection_errors is not None:
                detection_errors.labels(detection_type="industry", org_id=org_id,
                                        error_type=type(e).__name__).inc()
            return industry, confidence, False
    finally:
        _observe_latency("industry", org_id, start_time)


# ====================================================================
# SHARED LLM-FALLBACK HELPERS
# ====================================================================
# Rule-based confidence at or above which the LLM fallback is skipped.
_LLM_FALLBACK_THRESHOLD = 0.75


def _parse_llm_json(response: Any) -> Dict[str, Any]:
    """Decode the JSON object contained in an LLM completion.

    Models often wrap the requested JSON in markdown fences or add a short
    preamble despite the "Return ONLY JSON" instruction. This function
    therefore decodes only the text between the first ``{`` and the last
    ``}``, not the whole reply.

    Raises:
        ValueError: if the reply contains no decodable JSON object
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    if isinstance(response, (bytes, bytearray)):
        response = response.decode("utf-8", errors="replace")
    text = str(response)
    start = text.find("{")
    end = text.rfind("}")
    if start == -1 or end < start:
        raise ValueError(f"No JSON object found in LLM response: {text[:200]!r}")
    return json.loads(text[start:end + 1])


def _clamp_confidence(value: Any) -> float:
    """Coerce an LLM-reported confidence to float and clamp it into [0.0, 1.0].

    This keeps an out-of-range answer (e.g. ``95`` on a 0-100 scale) from
    reaching callers. NaN passes through unchanged and therefore never wins a
    ``>`` comparison.
    """
    return min(max(float(value), 0.0), 1.0)


def _observe_latency(detection_type: str, org_id: str, start_time: float) -> None:
    """Record seconds elapsed since ``start_time`` in ``detection_latency``.

    ``start_time`` must be a ``time.perf_counter()`` reading. Nothing is
    recorded when Prometheus metrics are disabled.
    """
    if detection_latency is not None:
        detection_latency.labels(
            detection_type=detection_type, org_id=org_id
        ).observe(time.perf_counter() - start_time)
# ====================================================================
# 🔧 REDIS CACHE HELPERS (shared by entity and industry detection)
# ====================================================================
# Seconds a cached detection result stays in Redis.
_DETECTION_CACHE_TTL_SECONDS = 3600


def _detection_cache_key(org_id: str, source_id: str, detection_type: str) -> str:
    """Build the Redis key shared by the cache read and write helpers."""
    return f"{detection_type}:{org_id}:{source_id}"


def get_cached_detection(org_id: str, source_id: str, detection_type: str) -> Optional[Dict[str, Any]]:
    """
    Check Redis for a cached detection result.

    Args:
        org_id: Organization ID
        source_id: Source identifier
        detection_type: "entity" or "industry"

    Returns:
        The decoded payload with ``"cached": True`` added, or None on a miss.
        ``cache_detection`` writes the keys ``type``, ``confidence``,
        ``cached_by`` and ``cached_at``.

        An entry that is not a valid JSON object is logged and treated as a
        miss. Otherwise one bad value would break detection until the key
        expires.
    """
    key = _detection_cache_key(org_id, source_id, detection_type)
    cached = event_hub.get_key(key)
    if not cached:
        return None
    try:
        data = json.loads(cached)
        if not isinstance(data, dict):
            raise ValueError(f"expected a JSON object, got {type(data).__name__}")
    except (TypeError, ValueError) as e:
        # json.JSONDecodeError is a ValueError subclass.
        emit_mapper_log("warning", "Ignoring corrupt detection cache entry",
                        org_id=org_id, source_id=source_id,
                        detection_type=detection_type, error=str(e))
        return None
    data["cached"] = True
    return data


def cache_detection(org_id: str, source_id: str, detection_type: str,
                    value: str, confidence: float):
    """
    Store a detection result in Redis with a 1-hour TTL.

    Args:
        org_id: Organization ID
        source_id: Source identifier
        detection_type: "entity" or "industry"
        value: Detected entity/industry label, stored under ``"type"``.
        confidence: Detection confidence. It is coerced to ``float`` so numpy
            scalars (e.g. ``np.float32``) serialize.
    """
    # Local import: the module-level import only brings in ``datetime``.
    from datetime import timezone

    key = _detection_cache_key(org_id, source_id, detection_type)
    payload = {
        "type": value,
        "confidence": float(confidence),
        "cached_by": "detection_engine",
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated (3.12+).
        "cached_at": datetime.now(timezone.utc).isoformat(),
    }
    event_hub.setex(key, _DETECTION_CACHE_TTL_SECONDS, json.dumps(payload))