samir12321's picture
Upload api.py with huggingface_hub
a5ad9d1 verified
Raw
History Blame Contribute Delete
63.3 kB
"""
FastAPI decision engine for promotion success prediction.
Predicts, explains, benchmarks, and evaluates risk. Prototype-grade.
Compatible with Salesforce Einstein Studio BYOM and Agentforce.
"""
import os
import threading
from dataclasses import dataclass
from pathlib import Path
from contextlib import asynccontextmanager
from typing import Optional
from datetime import datetime, timezone
import joblib
import numpy as np
import pandas as pd
import shap
from fastapi import Depends, FastAPI, Header, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from data_preprocessing import (
CATEGORICAL_FIELDS,
handle_missing_values,
engineer_features,
)
from field_mapping import apply_field_mapping, PDSD_TO_LEGACY_CPD
# ---------------------------------------------------------------------------
# Model artifacts (populated at startup)
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class ModelBundle:
    """Immutable snapshot of all model artifacts.

    Published via a single pointer swap so readers never see a torn state
    where the model is new but the feature list is still old (or vice versa).
    Instances are frozen: a reload builds a brand-new bundle rather than
    mutating the current one.
    """
    # Success classifier; this module calls ``predict_proba`` on it.
    model: object
    # Encoder lookup: either a single "_ordinal" entry or one encoder per
    # categorical column (legacy artifacts).
    encoders: dict
    # Ordered feature column names the classifier expects.
    features: list
    # SHAP explainer constructed from ``model``.
    explainer: object
    # ROI model (or dict of quantile models); None when no ROI artifact exists.
    roi_model: Optional[object]
    # Feature column names for the ROI model; empty when roi_model is None.
    roi_features: list
    # The ROI artifact's "log_transform" flag (False when absent).
    roi_log_transform: bool
# Current artifact snapshot, replaced wholesale on hot-swap.
_bundle: Optional[ModelBundle] = None  # atomic pointer; None until lifespan completes
# Dataset-level benchmark numbers (success rate, record counts, optional "degraded" flag).
_benchmark_stats: dict = {}
# Historical statistics used to enrich single predictions; its "pg_stats"
# table is keyed by Negoptim__Supplier__c.
_enrichment_context: dict = {}  # PG-level stats from Salesforce history
_model_lock = threading.RLock()  # guards the BUILD phase of hot-swap only
# True while a background retrain thread is running (see _trigger_retrain).
_retrain_in_progress = False
_unknown_category_count: int = 0  # novelty counter for OrdinalEncoder -1 hits
_analogues_matrix: Optional[np.ndarray] = None  # L2-normalised, shape (N, n_analogue_features)
_analogues_meta: Optional[pd.DataFrame] = None  # display metadata aligned with matrix rows
_analogue_features_used: list = []  # actual features in matrix (subset of _ANALOGUE_FEATURES)
# Numeric features used for k-NN similarity — PDSD fields + engineered features
_ANALOGUE_FEATURES = [
    "Negoptim__SellOutDiscountPerc__c",
    "Negoptim__SellOutDiscountUAmt__c",
    "Negoptim__SellOutDiscountQtyTarget__c",
    "Negoptim__SellOutSourceContribPerc__c",
    "Negoptim__SellOutExecutionRateTarget__c",
    "Negoptim__Perc__c",
    "discount_depth_vs_gross",
    "campaign_duration_days",
    "sell_out_month",
    "sell_out_quarter",
    "supplier_acceptance_rate",
    "mechanic_family_acceptance_rate",
    "days_since_last_promo_for_supplier_pg",
]
# Artifact locations: each can be overridden through the environment and
# otherwise defaults to a file sitting next to this module.
MODEL_PATH = os.environ.get("MODEL_PATH", str(Path(__file__).parent / "model.joblib"))
ROI_MODEL_PATH = os.environ.get("ROI_MODEL_PATH", str(Path(__file__).parent / "roi_model.joblib"))
SERVING_CONTEXT_PATH = os.environ.get("SERVING_CONTEXT_PATH", str(Path(__file__).parent / "serving_context.joblib"))
# Read from ADMIN_API_KEY; None when the variable is unset.
# NOTE(review): its consumers are outside this part of the file — confirm how None is handled.
_ADMIN_KEY = os.environ.get("ADMIN_API_KEY")
def _load_serving_context() -> bool:
    """Load the pre-serialized serving context from disk.

    The artifact is produced by train_model.py and is used on HF Spaces, where
    no Salesforce credentials are available at runtime.

    All required entries are read into locals first and only published to the
    module globals once every one of them has been read, so a malformed
    artifact can no longer leave the globals half-updated.

    Returns:
        True when the artifact existed and every required entry was loaded;
        False when the file is missing or could not be read.
    """
    global _benchmark_stats, _enrichment_context, _analogues_matrix, _analogues_meta, _analogue_features_used
    ctx_path = Path(SERVING_CONTEXT_PATH)
    if not ctx_path.exists():
        return False
    try:
        # NOTE(security): joblib.load unpickles arbitrary objects; the path
        # must only ever point at a trusted, locally produced artifact.
        ctx = joblib.load(ctx_path)
        benchmark_stats = dict(ctx["benchmark_stats"])
        benchmark_stats.pop("degraded", None)  # not degraded when loaded from artifact
        enrichment_context = ctx["enrichment_context"]
        analogues_matrix = ctx["analogues_matrix"]
        analogues_meta = ctx["analogues_meta"]
        analogue_features = ctx.get("analogue_features", _ANALOGUE_FEATURES)
    except Exception as e:
        print(f"serving_context.joblib load failed: {e}")
        return False

    # Publish only after every entry was read successfully.
    _benchmark_stats = benchmark_stats
    _enrichment_context = enrichment_context
    _analogues_matrix = analogues_matrix
    _analogues_meta = analogues_meta
    _analogue_features_used = analogue_features

    n_records = len(analogues_meta) if analogues_meta is not None else 0
    # .get(): a missing counter must not turn a successful load into a failure.
    print(f"serving_context.joblib loaded: {benchmark_stats.get('total_records', 0)} records, "
          f"{n_records} analogues rows")
    return True
def _compute_benchmark_stats():
    """Populate benchmark statistics, enrichment context and analogues index.

    The offline serving_context.joblib artifact is preferred. When it cannot
    be loaded, promotion history is fetched live from Salesforce. If that
    fails too, neutral placeholder stats flagged ``degraded`` are installed
    and the enrichment context is emptied.
    """
    global _benchmark_stats, _enrichment_context

    if _load_serving_context():
        return

    try:
        from salesforce_client import connect_to_salesforce, fetch_promotion_data, SUCCESS_STATUSES

        def _as_label(status):
            # 1 for a status counted as success, 0 otherwise.
            return 1 if status in SUCCESS_STATUSES else 0

        history = fetch_promotion_data(connect_to_salesforce())
        outcome = history["Negoptim__Status__c"].apply(_as_label)
        history["_label"] = outcome

        _benchmark_stats = {
            "average_success_rate": round(float(outcome.mean()), 4),
            "total_records": len(history),
            "success_count": int(outcome.sum()),
            "failure_count": int((1 - outcome).sum()),
        }
        print(f"Benchmark stats loaded: {_benchmark_stats['total_records']} records")

        _enrichment_context = _build_enrichment_context(history)
        print(f"Enrichment context built for {len(_enrichment_context.get('pg_stats', {}))} product groups")

        _build_analogues_index(history)
    except Exception as e:
        # Any failure (import, connection, missing column, ...) degrades to
        # neutral defaults instead of aborting the caller.
        _benchmark_stats = {
            "average_success_rate": 0.5,
            "total_records": 0,
            "success_count": 0,
            "failure_count": 0,
            "degraded": True,
        }
        _enrichment_context = {}
        print(f"Benchmark stats unavailable (live fetch failed: {e}) — /health will report degraded=true")
def _build_enrichment_context(df: pd.DataFrame) -> dict:
    """Build supplier-level statistics for runtime enrichment of new predictions.

    Args:
        df: Promotion history. Must contain a ``_label`` column (1 = success,
            0 = failure) whenever ``Negoptim__Supplier__c`` is present; the
            date, discount, quantity and business-unit columns are optional.

    Returns:
        A dict with:
          * ``bu_id_map``      – identity map of the business-unit IDs seen.
          * ``pg_stats``       – per-supplier success rate, count, averages,
                                 up to 20 most recent start dates, per-month
                                 success rate and a discount/label correlation
                                 stored as ``price_elasticity``.
          * ``monthly_demand`` – per "YYYY-MM" record count relative to the
                                 monthly average, scaled so average ≈ 50 and
                                 capped at 100.
          * ``achievement``    – output of ``_build_achievement_context``.
    """
    context = {"pg_stats": {}, "monthly_demand": {}, "bu_id_map": {}}

    # bu_id_map: PDSD uses BusinessUnit__c (Salesforce ID); no retailer_name available
    if "Negoptim__BusinessUnit__c" in df.columns:
        bu_ids = df["Negoptim__BusinessUnit__c"].dropna().unique()
        context["bu_id_map"] = {str(b): str(b) for b in bu_ids}

    df = df.copy()
    # Default to an all-NaT datetime column so the ``.dt`` accessors below
    # keep working when the start-date column is absent.
    raw_dates = df.get("Negoptim__SellOutBDate__c", pd.Series(pd.NaT, index=df.index))
    df["_date"] = pd.to_datetime(raw_dates, errors="coerce")
    disc_pct = pd.to_numeric(
        df.get("Negoptim__SellOutDiscountPerc__c", pd.Series(0, index=df.index)),
        errors="coerce",
    ).fillna(0)
    # Source values are on a 0-100 scale; store as a 0-1 fraction.
    df["_discount"] = (disc_pct / 100.0).clip(0, 1)

    pg_col = "Negoptim__Supplier__c"
    if pg_col in df.columns:
        for pg, group in df.groupby(pg_col):
            dates = group["_date"].dropna()
            labels = group["_label"]
            discounts = group["_discount"]
            qtys = pd.to_numeric(
                group.get("Negoptim__SellOutDiscountQtyTarget__c", pd.Series(0, index=group.index)),
                errors="coerce",
            ).fillna(0)
            pg_data = {
                "success_rate": round(float(labels.mean()), 4),
                "count": len(group),
                "avg_qty": round(float(qtys.mean()), 2),
                "avg_discount": round(float(discounts.mean()), 4),
                "recent_dates": sorted(dates.dt.strftime("%Y-%m-%d").tolist())[-20:],
                "monthly_success": {},
            }
            if not dates.empty:
                group_m = group.copy()
                group_m["_month"] = group_m["_date"].dt.month
                for month, mgroup in group_m.groupby("_month"):
                    pg_data["monthly_success"][int(month)] = round(float(mgroup["_label"].mean()), 4)

            # The correlation is undefined (NaN) when either series is
            # constant, e.g. a supplier whose promotions all succeeded.
            # Store 0.0 in that case rather than leaking NaN downstream.
            elasticity = 0.0
            if len(discounts) >= 5 and discounts.std() > 0.001 and labels.nunique() > 1:
                corr = float(np.corrcoef(discounts, labels)[0, 1])
                if np.isfinite(corr):
                    elasticity = round(corr, 4)
            pg_data["price_elasticity"] = elasticity

            context["pg_stats"][str(pg)] = pg_data

    dates_all = df["_date"].dropna()
    if not dates_all.empty:
        monthly = dates_all.dt.to_period("M").value_counts().sort_index()
        avg_count = monthly.mean()
        for period, count in monthly.items():
            key = f"{period.year}-{period.month:02d}"
            # +0.01 avoids division by zero; result is capped at 100.
            context["monthly_demand"][key] = round(min((count / (avg_count + 0.01)) * 50, 100), 1)

    context["achievement"] = _build_achievement_context(df)
    return context
def _build_achievement_context(df: pd.DataFrame) -> dict:
    """Precompute buyer BU × supplier achievement lookups from records with QuantityFact.

    Achievement is ``QuantityFact / max(SellOutDiscountQtyTarget, 1)`` clipped
    to ``[0, 5]``. Rows whose achievement is NaN are ignored.

    Args:
        df: Promotion history. All columns are optional; a lookup whose
            grouping column is absent is simply left empty.

    Returns:
        ``{"retailer_supplier": {(bu, supplier): {"avg", "last", "n", "std"}},
           "retailer": {bu: avg}, "supplier": {supplier: avg}}`` with keys
        stringified. ``last`` is the last row in the frame's current order.
        NOTE(review): the frame is not sorted by date here — confirm that
        callers pass chronologically ordered data if "last" must mean latest.
    """
    bu_col = "Negoptim__BusinessUnit__c"
    supplier_col = "Negoptim__Supplier__c"
    ctx = {"retailer_supplier": {}, "retailer": {}, "supplier": {}}

    raw_fact = df.get("Negoptim__QuantityFact__c")
    if raw_fact is None:
        return ctx
    qty_fact = pd.to_numeric(raw_fact, errors="coerce")
    if qty_fact.isna().all():
        return ctx

    # Targets below 1 (including the 0 default) are lifted to 1 so the ratio
    # never divides by zero.
    qty_target = pd.to_numeric(
        df.get("Negoptim__SellOutDiscountQtyTarget__c", pd.Series(0, index=df.index)),
        errors="coerce",
    ).clip(lower=1)

    scored = df.assign(_ach=(qty_fact / qty_target).clip(0, 5))
    has_data = scored[scored["_ach"].notna()]

    # Guard each lookup on column presence, as _build_enrichment_context does,
    # so a frame lacking BU or supplier IDs yields partial results, not KeyError.
    has_bu = bu_col in has_data.columns
    has_supplier = supplier_col in has_data.columns

    if has_bu and has_supplier:
        for (r, s), grp in has_data.groupby([bu_col, supplier_col]):
            vals = grp["_ach"].tolist()
            ctx["retailer_supplier"][(str(r), str(s))] = {
                "avg": round(float(np.mean(vals)), 4),
                "last": round(float(vals[-1]), 4),
                "n": len(vals),
                "std": round(float(np.std(vals)), 4) if len(vals) > 1 else 0.0,
            }
    if has_bu:
        for r, grp in has_data.groupby(bu_col):
            ctx["retailer"][str(r)] = round(float(grp["_ach"].mean()), 4)
    if has_supplier:
        for s, grp in has_data.groupby(supplier_col):
            ctx["supplier"][str(s)] = round(float(grp["_ach"].mean()), 4)
    return ctx
def _build_analogues_index(df: pd.DataFrame) -> None:
    """Build the L2-normalised feature matrix for k-NN historical analogue retrieval.

    Runs the shared preprocessing on a copy of ``df``, selects whichever
    ``_ANALOGUE_FEATURES`` columns are present, row-normalises them, and
    builds a display-metadata frame whose rows line up positionally with the
    matrix. The three module globals are published together, only after
    everything was built. Any failure is logged and leaves the previous index
    untouched (non-fatal).

    Args:
        df: Promotion history; must contain ``Negoptim__Status__c``.
    """
    global _analogues_matrix, _analogues_meta, _analogue_features_used
    from salesforce_client import SUCCESS_STATUSES
    try:
        adf = engineer_features(handle_missing_values(df.copy()))
        # Normalise to a positional index so every Series below (real or
        # default) shares one index and the DataFrame constructor cannot
        # misalign or duplicate rows.
        adf = adf.reset_index(drop=True)

        present = [f for f in _ANALOGUE_FEATURES if f in adf.columns]
        mat = adf[present].fillna(0.0).values.astype(np.float64)
        norms = np.linalg.norm(mat, axis=1, keepdims=True)
        # All-zero rows keep norm 1.0 to avoid dividing by zero.
        norms = np.where(norms < 1e-9, 1.0, norms)
        mat_norm = mat / norms

        def _numeric(col: str) -> pd.Series:
            # Column as numbers; absent column or unparseable cell -> 0.
            raw = adf.get(col, pd.Series(0, index=adf.index))
            return pd.to_numeric(raw, errors="coerce").fillna(0)

        def _text(col: str, placeholder: str) -> pd.Series:
            # Column as strings; absent column or null cell -> placeholder.
            raw = adf.get(col, pd.Series(placeholder, index=adf.index))
            return raw.fillna(placeholder).astype(str)

        # All-NaT default keeps ``.dt`` usable when the date column is absent.
        sell_out_dates = pd.to_datetime(
            adf.get("Negoptim__SellOutBDate__c", pd.Series(pd.NaT, index=adf.index)),
            errors="coerce",
        )
        meta = pd.DataFrame({
            "status": adf["Negoptim__Status__c"].apply(
                lambda s: "SUCCESS" if s in SUCCESS_STATUSES else "FAILURE"),
            "retailer": _text("retailer_name", "UNKNOWN"),
            "supplier": _text("Negoptim__Supplier__c", "Unknown"),
            "discount_pct": _numeric("Negoptim__SellOutDiscountPerc__c"),
            "qty_target": _numeric("Negoptim__SellOutDiscountQtyTarget__c"),
            "campaign_duration_days": _numeric("campaign_duration_days"),
            "discount_depth_vs_gross": _numeric("discount_depth_vs_gross").round(4),
            "supplier_acceptance_rate": _numeric("supplier_acceptance_rate").round(4),
            "sell_out_date": sell_out_dates.dt.strftime("%Y-%m-%d").fillna(""),
            "n_features": len(present),
        })

        # Publish together so feature list, matrix and metadata always agree.
        _analogue_features_used = present
        _analogues_matrix = mat_norm
        _analogues_meta = meta.reset_index(drop=True)
        print(f"Analogues index: {len(meta)} records × {len(present)} features")
    except Exception as exc:
        print(f"Analogues index build failed (non-fatal): {exc}")
def _hot_swap_models():
    """Rebuild the ModelBundle from the on-disk artifacts and publish it.

    The whole rebuild (artifact loading plus SHAP explainer construction) runs
    under ``_model_lock`` so that concurrent rebuilds are serialised. The new
    bundle is then published by rebinding the module-level ``_bundle`` name;
    request handlers read that name without taking the lock.
    """
    global _bundle
    with _model_lock:
        primary = joblib.load(MODEL_PATH)
        classifier = primary["model"]
        bundle_kwargs = {
            "model": classifier,
            "encoders": primary["encoders"],
            "features": primary["features"],
            "explainer": shap.TreeExplainer(classifier),
            # ROI defaults, used when no ROI artifact is on disk.
            "roi_model": None,
            "roi_features": [],
            "roi_log_transform": False,
        }

        if Path(ROI_MODEL_PATH).exists():
            roi_payload = joblib.load(ROI_MODEL_PATH)
            bundle_kwargs.update(
                # Quantile artifacts store "models"; legacy ones store "model".
                roi_model=roi_payload.get("models") or roi_payload.get("model"),
                roi_features=roi_payload["features"],
                roi_log_transform=roi_payload.get("log_transform", False),
            )

        fresh = ModelBundle(**bundle_kwargs)
        _bundle = fresh
        print(f"Hot-swap complete: {len(fresh.features)} features loaded")
def _run_retrain():
    """Thread target: retrain, reload the models, refresh the serving stats.

    Failures are logged and swallowed so the worker thread never raises. The
    in-progress flag is always cleared on exit, whatever the outcome.
    """
    global _retrain_in_progress
    try:
        print("Background retraining started...")
        # Imported lazily, inside the try, so an import error is reported
        # like any other retraining failure.
        from train_model import train_and_evaluate

        train_and_evaluate()
        _hot_swap_models()
        _compute_benchmark_stats()
        print("Background retraining complete — models updated in memory")
    except Exception as err:
        print(f"Background retraining failed: {err}")
    finally:
        _retrain_in_progress = False
def _trigger_retrain():
    """Spawn a background thread to retrain without blocking the API.

    Does nothing (apart from logging) when a retrain is already running.
    Otherwise marks a retrain as in progress and starts a daemon thread
    running ``_run_retrain``, which clears the flag when it finishes.

    Raises:
        RuntimeError: propagated from ``Thread.start()`` if the thread cannot
            be started; the in-progress flag is reset first so later calls
            are not blocked forever.
    """
    global _retrain_in_progress
    if _retrain_in_progress:
        print("Retraining already in progress — skipping")
        return
    _retrain_in_progress = True
    worker = threading.Thread(target=_run_retrain, daemon=True)
    try:
        worker.start()
    except Exception:
        # _run_retrain's ``finally`` never runs if the thread never started,
        # so the flag must be cleared here.
        _retrain_in_progress = False
        raise
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application startup hook: load artifacts and publish the first ModelBundle.

    Steps, in order: load the classifier artifact and build its SHAP
    explainer; load the optional ROI artifact; publish the bundle; initialise
    the signals DB (failures are only logged); compute benchmark stats; report
    legacy shim targets that the loaded model no longer uses. Nothing runs
    after the ``yield``.
    """
    global _bundle

    primary = joblib.load(MODEL_PATH)
    classifier = primary["model"]
    category_encoders = primary["encoders"]
    feature_names = primary["features"]
    shap_explainer = shap.TreeExplainer(classifier)

    if Path(ROI_MODEL_PATH).exists():
        roi_payload = joblib.load(ROI_MODEL_PATH)
        # Support both new quantile dict {"models": {...}} and legacy {"model": ...}
        roi_estimator = roi_payload.get("models") or roi_payload.get("model")
        roi_feature_names = roi_payload["features"]
        roi_uses_log = roi_payload.get("log_transform", False)
        print(f"ROI model loaded ({len(roi_feature_names)} features, log_transform={roi_uses_log})")
    else:
        roi_estimator = None
        roi_feature_names = []
        roi_uses_log = False
        print("ROI model not found — ROI predictions will use formula only")

    _bundle = ModelBundle(
        model=classifier,
        encoders=category_encoders,
        features=feature_names,
        explainer=shap_explainer,
        roi_model=roi_estimator,
        roi_features=roi_feature_names,
        roi_log_transform=roi_uses_log,
    )

    # The signals DB is optional: report a problem but keep starting up.
    try:
        from signals_db import init_db, DB_PATH as _DB_PATH
        init_db(_DB_PATH)
    except Exception as db_err:
        print(f"DB init warning: {db_err}")

    _compute_benchmark_stats()

    # Warn for any legacy-shim target fields no longer in the model —
    # indicates the PDSD-native retrain has made that shim obsolete.
    obsolete_targets = [
        target for target in PDSD_TO_LEGACY_CPD.values()
        if target not in _bundle.features
    ]
    for legacy_field in obsolete_targets:
        print(f" INFO: legacy shim target '{legacy_field}' not in model features — "
              f"safe to remove from PDSD_TO_LEGACY_CPD after confirming retrain.")

    print(f"Model loaded from {MODEL_PATH} ({len(_bundle.features)} features)")
    yield
# ASGI application; model artifacts are loaded by the ``lifespan`` hook.
app = FastAPI(
    title="Negoptim Promotion Decision Engine",
    description="Predicts, explains, benchmarks, and evaluates promotion risk. "
    "Prototype-grade. Designed for Salesforce Einstein Studio BYOM.",
    version="2.0.0",
    lifespan=lifespan,
)
# Allowed browser origins. Overridable through CORS_ORIGIN_REGEX; an unset or
# empty variable falls back to any https host under lightning.force.com or
# my.salesforce.com.
_CORS_ORIGIN_REGEX = (
    os.environ.get("CORS_ORIGIN_REGEX")
    or r"https://[^/]+\.(lightning\.force\.com|my\.salesforce\.com)$"
)
# Only GET and POST are exposed cross-origin; any request header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origin_regex=_CORS_ORIGIN_REGEX,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# Pydantic schemas
# ---------------------------------------------------------------------------
class PromotionInput(BaseModel):
    """
    Input schema covering both PDSD fields (primary, per PROJECT_CONTEXT §5) and
    legacy CPD fields (retained for backward compat until model is retrained on PDSD).

    Every field is optional and defaults to None, except buyer_role.

    buyer_role: 'sender' (buyer sending PDS to supplier) or 'receiver' (buyer reviewing
    inbound supplier PDS). Defaults to 'receiver' — the primary use case per §1.
    NOTE(review): buyer_role is a free-form string; nothing in this schema
    restricts it to the two documented values.
    """
    record_id: Optional[str] = None
    buyer_role: Optional[str] = "receiver"  # 'sender' | 'receiver'
    pds_status: Optional[str] = None  # PDS Header Status__c for role auto-detection
    # ── PDSD SellOut fields (primary schema) ──────────────────────────────────
    Negoptim__SellOutDiscountType__c: Optional[str] = None
    # Discount percentage on a 0-100 scale (this module divides it by 100).
    Negoptim__SellOutDiscountPerc__c: Optional[float] = None
    Negoptim__SellOutDiscountUAmt__c: Optional[float] = None
    Negoptim__SellOutDiscountQtyTarget__c: Optional[float] = None
    Negoptim__SellOutSourceContribPerc__c: Optional[float] = None
    Negoptim__SellOutExecutionRateTarget__c:Optional[float] = None
    Negoptim__SellOutManagementFix__c: Optional[float] = None
    Negoptim__SellOutManagementUnit__c: Optional[float] = None
    # Sell-out begin/end dates as strings; the begin date is parsed with pandas.
    Negoptim__SellOutBDate__c: Optional[str] = None
    Negoptim__SellOutEDate__c: Optional[str] = None
    # ── PDSD SellIn fields ────────────────────────────────────────────────────
    Negoptim__Perc__c: Optional[float] = None
    Negoptim__UAmt__c: Optional[float] = None
    # ── PDS Header fields (flattened) ─────────────────────────────────────────
    # Supplier ID: the lookup key for the per-supplier enrichment statistics.
    Negoptim__Supplier__c: Optional[str] = None
    Negoptim__NegoScope__c: Optional[str] = None
    Negoptim__BusinessUnit__c: Optional[str] = None
    Negoptim__Product__c: Optional[str] = None
    pds_promo_agreement_type: Optional[str] = None
    pds_is_sell_in: Optional[bool] = None
    pds_is_sell_out: Optional[bool] = None
    pds_is_services: Optional[bool] = None
    # ── Historical aggregations (computed by Apex or provided by training pipeline) ──
    # When an enrichment context is loaded, the API overwrites these six
    # values with its own lookups during preprocessing.
    supplier_acceptance_rate: Optional[float] = None
    supplier_rejection_rate: Optional[float] = None
    negoscope_acceptance_rate: Optional[float] = None
    mechanic_family_acceptance_rate: Optional[float] = None
    supplier_pg_prior_count: Optional[float] = None
    days_since_last_promo_for_supplier_pg: Optional[float] = None
    # ── PG pricing (from Apex SOQL join on Product → PG) ─────────────────────
    pg_gross_price: Optional[float] = None
    pg_net_price: Optional[float] = None
    # ── Legacy CPD fields (kept for backward compat — old model still uses these) ──
    Negoptim__NetPricePromo__c: Optional[float] = None
    Negoptim__NetPriceRegular__c: Optional[float] = None
    Negoptim__TariffCostPromo__c: Optional[float] = None
    Negoptim__COGS__c: Optional[float] = None
    Negoptim__RetailPriceRegular__c: Optional[float] = None
    Negoptim__SellOutDiscountValue__c: Optional[float] = None
    Negoptim__QuantityTarget__c: Optional[float] = None
    Negoptim__QuantityFact__c: Optional[float] = None
    Negoptim__OpType__c: Optional[str] = None
    Negoptim__TradeOpType__c: Optional[str] = None
    Negoptim__BudgetType__c: Optional[str] = None
    Negoptim__SellOutOfferType__c: Optional[str] = None
    Negoptim__LineStatus__c: Optional[str] = None
    Negoptim__SellInDiscountUType__c: Optional[str] = None
    Negoptim__BusinessUnitTarget__c: Optional[str] = None
    Negoptim__PG__c: Optional[str] = None
    Negoptim__IsFree__c: Optional[bool] = None
    Negoptim__IsFromAgreementLinePrenego__c: Optional[bool] = None
    Negoptim__IsIncludetoNetPriceStmt__c: Optional[bool] = None
    Negoptim__SellInFact__c: Optional[float] = None
    Negoptim__SellInTarget__c: Optional[float] = None
    Negoptim__SellOutDiscMassValueContribTarget__c: Optional[float] = None
    Negoptim__SellInDiscountMassValueTarget__c: Optional[float] = None
    Negoptim__SalesExecutionRateTarget__c: Optional[float] = None
    Negoptim__TariffCostRegular__c: Optional[float] = None
    Negoptim__SellInBDateMonth__c: Optional[float] = None
    Negoptim__SellInBDateWeek__c: Optional[float] = None
    Negoptim__SellOutDiscountUnitValue__c: Optional[float] = None
    Negoptim__NetPriceWoTaxPromo__c: Optional[float] = None
class FeatureImpact(BaseModel):
    """One feature's SHAP contribution to a single prediction."""
    # Model feature (column) name.
    feature: str
    impact: float = Field(description="SHAP value - positive pushes toward SUCCESS")
    direction: str = Field(description="'positive' or 'negative' for success")
class Recommendation(BaseModel):
    """A human-readable suggestion attached to a prediction."""
    # Free-text advice shown to the user.
    message: str
    priority: str = Field(description="'high', 'medium', or 'low'")
class Benchmark(BaseModel):
    """Where a prediction stands relative to the historical dataset."""
    # Share of historical records counted as successes.
    average_success_rate: float
    percentile: float = Field(description="Where this prediction ranks vs dataset (0-100)")
    # Number of historical records behind the benchmark.
    total_records: int
    vs_average: str = Field(description="'above', 'below', or 'at' average")
class ROIPrediction(BaseModel):
    """ROI estimate for a promotion: point values plus optional p10/p50/p90 bands.

    Only ``roi_label`` and ``cost_basis_source`` are required; every numeric
    field defaults to None.
    """
    # ── Point estimates (p50 / backward-compat aliases) ───────────────────────
    roi_pct: Optional[float] = Field(None, description="Return on trade investment (%) — p50. None when cost basis unavailable")
    roi_label: str = Field(description="'Strong', 'Moderate', 'Weak', 'Negative', or 'Unknown'")
    predicted_units: Optional[float] = Field(None, description="Predicted units sold (p50 estimate). None when cost basis unavailable")
    predicted_profit: Optional[float] = Field(None, description="Predicted profit €, p50 — prototype estimate, not a forecast")
    net_margin_per_unit: Optional[float] = Field(None, description="Profit per unit after trade funding (€)")
    funding_per_unit: Optional[float] = Field(None, description="Trade funding cost per unit (€)")
    break_even_roi: Optional[float] = Field(None, description="Minimum ROI% for this deal to be profitable")
    cost_basis_source: str = Field(description="'direct_cogs', 'pg_margin_proxy', or 'unknown'")
    # ── Quantile interval fields (populated when quantile model is available) ─
    predicted_units_p10: Optional[float] = Field(None, description="Pessimistic units sold (p10)")
    predicted_units_p50: Optional[float] = Field(None, description="Median units sold (p50)")
    predicted_units_p90: Optional[float] = Field(None, description="Optimistic units sold (p90)")
    predicted_profit_low: Optional[float] = Field(None, description="Lower-bound profit € (p10)")
    predicted_profit_mid: Optional[float] = Field(None, description="Median profit € (p50)")
    predicted_profit_high: Optional[float] = Field(None, description="Upper-bound profit € (p90)")
    roi_pct_low: Optional[float] = Field(None, description="ROI % at p10 units")
    roi_pct_mid: Optional[float] = Field(None, description="ROI % at p50 units")
    roi_pct_high: Optional[float] = Field(None, description="ROI % at p90 units")
class HistoricalAnalogue(BaseModel):
    """One historical promotion returned by the k-NN analogue search.

    Most fields mirror columns of the analogues metadata frame built at startup.
    """
    # Position in the result list.
    rank: int
    similarity: float = Field(description="Cosine similarity (0-1) to this promotion's feature vector")
    status: str = Field(description="'SUCCESS' or 'FAILURE'")
    supplier: str
    discount_pct: float
    # NOTE(review): the metadata frame stores qty_target and
    # campaign_duration_days as floats; confirm the code constructing this
    # model casts them to int, since a fractional value would fail validation.
    qty_target: int
    campaign_duration_days: int
    discount_depth_vs_gross: float
    supplier_acceptance_rate: float
    # "YYYY-MM-DD", or an empty string when the date is unknown.
    sell_out_date: str
class BatchPredictionInput(BaseModel):
    """Request body for batch scoring: a required list of promotion inputs."""
    # Validation rejects lists longer than 200 items.
    promotions: list[PromotionInput] = Field(
        ..., description="List of PDSD promotion inputs (max 200)",
        max_length=200,
    )
class PredictionOutput(BaseModel):
    """Full response for one scored promotion.

    Combines the core prediction with ROI, explainability, confidence/risk,
    benchmark and historical-analogue sections.
    """
    # Core prediction
    success_probability: float
    prediction: str
    # No default is given, so a value (possibly None) must be supplied.
    expected_profit: Optional[float]
    # Role context — drives which view the LWC renders
    buyer_role: str = Field(
        default="receiver",
        description="'sender': buyer sent PDS, predicting supplier validation. "
        "'receiver': buyer reviewing inbound PDS, predicting financial impact."
    )
    # ROI dimension
    roi: ROIPrediction
    # Explainability
    top_factors: list[FeatureImpact]
    recommendations: list[Recommendation]
    # Confidence & risk
    confidence: float
    risk_level: str
    # Benchmark
    benchmark: Benchmark
    # Historical analogues (k-NN)
    analogues: list[HistoricalAnalogue] = Field(
        default_factory=list,
        description="k most similar historical promotions by feature cosine similarity"
    )
# ---------------------------------------------------------------------------
# Preprocessing (mirrors training pipeline exactly)
# ---------------------------------------------------------------------------
def _apply_legacy_field_mapping(row: dict) -> dict:
    """Delegate to the canonical mapping in field_mapping.py.

    Args:
        row: A single promotion as a plain dict (field name -> value).

    Returns:
        Whatever ``apply_field_mapping`` returns for ``row``.
    """
    return apply_field_mapping(row)
def preprocess_input(data: PromotionInput, bundle: Optional[ModelBundle] = None) -> pd.DataFrame:
    """Turn one promotion payload into the single-row frame the model consumes.

    Pipeline: dump the payload to a dict, apply the legacy field mapping,
    fill missing values, engineer features, encode categoricals, inject
    historical/enrichment/achievement features, then align the columns to
    ``bundle.features`` (missing or all-NaN columns become 0.0; object-typed
    columns are coerced to numbers).

    Args:
        data: The validated request payload.
        bundle: Artifact snapshot to use for the whole call. Passing it lets a
            request pin one consistent snapshot; when omitted the current
            global ``_bundle`` is used.

    Returns:
        A one-row DataFrame whose columns are exactly ``bundle.features``.
    """
    global _unknown_category_count

    active = _bundle if bundle is None else bundle

    frame = pd.DataFrame([_apply_legacy_field_mapping(data.model_dump())])
    frame = handle_missing_values(frame)
    frame = engineer_features(frame)

    # Categorical encoding — support both new OrdinalEncoder and legacy LabelEncoders
    ordinal = active.encoders.get("_ordinal")
    if ordinal is None:
        # Legacy per-column LabelEncoder path (pre-retrain model artifacts)
        for col in CATEGORICAL_FIELDS:
            if col not in frame.columns or col not in active.encoders:
                continue
            label_enc = active.encoders[col]
            raw = str(frame[col].iloc[0])
            # Unseen categories map to -1.
            frame[col] = label_enc.transform([raw])[0] if raw in label_enc.classes_ else -1
    else:
        cat_cols = [c for c in CATEGORICAL_FIELDS if c in frame.columns]
        codes = ordinal.transform(frame[cat_cols].astype(str))
        # Count novelty hits (encoded value == -1 means unseen category)
        _unknown_category_count += int((codes == -1).sum())
        frame[cat_cols] = codes

    frame = _apply_historical_aggregates(frame, data)
    frame = _enrich_single_prediction(frame, data)
    frame = _enrich_achievement(frame, data, active)

    # Align with the model's expected columns.
    for col in active.features:
        if col not in frame.columns:
            frame[col] = 0.0
            continue
        if frame[col].isna().all():
            frame[col] = 0.0
        elif str(frame[col].dtype) == "object":
            frame[col] = pd.to_numeric(frame[col], errors="coerce").fillna(0.0)

    return frame[active.features]
def _apply_historical_aggregates(df: pd.DataFrame, data: PromotionInput) -> pd.DataFrame:
    """
    Fill the six historical-aggregate features from the enrichment context.

    Sets supplier_acceptance_rate, supplier_rejection_rate,
    negoscope_acceptance_rate, mechanic_family_acceptance_rate,
    supplier_pg_prior_count and days_since_last_promo_for_supplier_pg on
    ``df``. Unknown keys fall back to the dataset-wide success rate (rates),
    0 (prior count) or 365 (days). When no enrichment context is loaded,
    ``df`` is returned untouched.

    NOTE(review): the days-since value is measured from the current UTC time,
    not from the promotion's own start date — confirm this matches how the
    feature was computed for training.
    """
    ctx = _enrichment_context
    if not ctx:
        return df

    dataset_avg = ctx.get("dataset_avg_success", _benchmark_stats.get("average_success_rate", 0.5))

    def _key(value):
        # Stringified ID, or None for a missing/empty value.
        return str(value) if value else None

    def _success_rate(table: str, key):
        # Success rate for ``key`` in ``ctx[table]``, else the dataset average.
        stats = ctx.get(table, {}).get(key, {}) if key else {}
        return stats.get("success_rate", dataset_avg)

    supplier_id = _key(data.Negoptim__Supplier__c)
    scope_id = _key(data.Negoptim__NegoScope__c)
    product_id = _key(data.Negoptim__Product__c)
    # Mechanic family = first two characters of the discount type.
    family_code = (data.Negoptim__SellOutDiscountType__c or "")[:2]

    # supplier_acceptance_rate / supplier_rejection_rate
    sup_rate = _success_rate("pg_stats", supplier_id)
    df["supplier_acceptance_rate"] = sup_rate
    df["supplier_rejection_rate"] = 1.0 - sup_rate

    # negoscope_acceptance_rate
    df["negoscope_acceptance_rate"] = _success_rate("negoscope_stats", scope_id)

    # mechanic_family_acceptance_rate
    df["mechanic_family_acceptance_rate"] = _success_rate("mechanic_family_stats", family_code)

    # supplier_pg_prior_count and days_since_last_promo_for_supplier_pg
    prior_count = 0.0
    days_since = 365.0
    if supplier_id and product_id:
        sp_data = ctx.get("supplier_product_stats", {}).get(f"{supplier_id}|{product_id}", {})
        prior_count = float(sp_data.get("count", 0))
        last_iso = sp_data.get("last_date_iso")
        if last_iso:
            try:
                last_date = pd.to_datetime(last_iso, utc=True)
                today = pd.Timestamp.now(tz="UTC")
                days_since = float((today - last_date).days)
            except Exception:
                # Unparseable stored date: fall back to the default gap.
                days_since = 365.0
    df["supplier_pg_prior_count"] = prior_count
    df["days_since_last_promo_for_supplier_pg"] = days_since
    return df
def _enrich_single_prediction(df: pd.DataFrame, data: PromotionInput) -> pd.DataFrame:
    """Compute enrichment features for a single prediction using pre-built context.

    Adds three feature groups to ``df``, each with neutral defaults when the
    supplier is unknown or the start date is missing/unparseable:
      * fatigue: promo_count_last_90d / _180d, days_since_last_promo,
        avg_discount_last_90d
      * category trends: category_success_rate, category_avg_qty,
        category_promo_intensity
      * price sensitivity: price_elasticity, discount_vs_pg_avg,
        optimal_discount_gap (always 0.0)

    NOTE(review): avg_discount_last_90d is filled from the supplier's overall
    average discount, not a 90-day window — confirm this matches training.
    """
    # pg_stats is keyed by Supplier__c on PDSD (was retailer_name on CPD)
    supplier_id = str(data.Negoptim__Supplier__c) if data.Negoptim__Supplier__c else None
    pg_stats = _enrichment_context.get("pg_stats", {}).get(supplier_id, {}) if supplier_id else {}

    pred_date = None
    if data.Negoptim__SellOutBDate__c:
        try:
            parsed = pd.to_datetime(data.Negoptim__SellOutBDate__c)
            if not pd.isna(parsed):
                # Stored history dates are tz-naive "YYYY-MM-DD" strings;
                # comparing them with a tz-aware timestamp raises TypeError,
                # so drop the timezone and keep the wall-clock date.
                if parsed.tzinfo is not None:
                    parsed = parsed.tz_localize(None)
                pred_date = parsed
        except Exception:
            # Best effort: an unparseable date simply means "no date".
            pred_date = None

    # Fatigue
    if pg_stats and pred_date is not None:
        recent_dates = [pd.to_datetime(d) for d in pg_stats.get("recent_dates", [])]
        # Only promotions strictly before this one count as history.
        days_diff = [(pred_date - d).days for d in recent_dates if d < pred_date]
        if days_diff:
            df["promo_count_last_90d"] = sum(1 for d in days_diff if d <= 90)
            df["promo_count_last_180d"] = sum(1 for d in days_diff if d <= 180)
            df["days_since_last_promo"] = min(days_diff)
        else:
            df["promo_count_last_90d"] = 0.0
            df["promo_count_last_180d"] = 0.0
            df["days_since_last_promo"] = 365.0
        df["avg_discount_last_90d"] = pg_stats.get("avg_discount", 0.0)
    else:
        df["promo_count_last_90d"] = 0.0
        df["promo_count_last_180d"] = 0.0
        df["avg_discount_last_90d"] = 0.0
        df["days_since_last_promo"] = 365.0

    # Category trends
    if pg_stats:
        df["category_success_rate"] = pg_stats.get("success_rate", 0.5)
        df["category_avg_qty"] = pg_stats.get("avg_qty", 0.0)
        total_records = _benchmark_stats.get("total_records", 0)
        # max(..., 1) avoids dividing by zero when stats are degraded.
        df["category_promo_intensity"] = pg_stats.get("count", 0) / max(total_records, 1)
    else:
        df["category_success_rate"] = 0.5
        df["category_avg_qty"] = 0.0
        df["category_promo_intensity"] = 0.0

    # Price sensitivity
    df["price_elasticity"] = pg_stats.get("price_elasticity", 0.0) if pg_stats else 0.0
    # Use SellOutDiscountPerc__c (0-100 scale) as current discount depth
    current_discount = (data.Negoptim__SellOutDiscountPerc__c or 0) / 100.0
    avg_discount = pg_stats.get("avg_discount", 0.0) if pg_stats else 0.0
    df["discount_vs_pg_avg"] = current_discount - avg_discount
    df["optimal_discount_gap"] = 0.0
    return df
def _enrich_achievement(df: pd.DataFrame, data: PromotionInput, bundle: ModelBundle) -> pd.DataFrame:
    """Add buyer-BU × supplier achievement features that the models actually use.

    Looks the (business unit, supplier) pair up in the ``achievement`` section
    of the enrichment context. A feature is written to ``df`` only when it
    appears in ``bundle.features`` or ``bundle.roi_features``; missing or NaN
    statistics are written as 0.0. Returns ``df`` unchanged when there is no
    achievement context.
    """
    ach_ctx = _enrichment_context.get("achievement", {})
    if not ach_ctx:
        return df

    retailer = str(data.Negoptim__BusinessUnit__c or "")
    supplier = str(data.Negoptim__Supplier__c or "")
    pair_stats = ach_ctx.get("retailer_supplier", {}).get((retailer, supplier), {})

    candidates = (
        ("retailer_supplier_avg_achievement", pair_stats.get("avg", np.nan)),
        ("retailer_supplier_last_achievement", pair_stats.get("last", np.nan)),
        ("retailer_supplier_n_promos", pair_stats.get("n", 0.0)),
        ("retailer_supplier_achievement_std", pair_stats.get("std", np.nan)),
        ("retailer_avg_achievement", ach_ctx.get("retailer", {}).get(retailer, np.nan)),
        ("supplier_avg_achievement", ach_ctx.get("supplier", {}).get(supplier, np.nan)),
    )

    for name, value in candidates:
        # Skip features neither model was trained on.
        if name not in bundle.features and name not in bundle.roi_features:
            continue
        is_missing = value is None or (isinstance(value, float) and np.isnan(value))
        df[name] = 0.0 if is_missing else value
    return df
def _predict_proba(feature_row: pd.DataFrame, bundle: ModelBundle) -> float:
    """Return the first row's probability from column 1 of ``predict_proba``.

    Column 1 is the success class here.
    """
    class_probabilities = bundle.model.predict_proba(feature_row)
    success_column = class_probabilities[:, 1]
    return float(success_column[0])
# ---------------------------------------------------------------------------
# SHAP explanation
# ---------------------------------------------------------------------------
def explain_prediction(feature_row: pd.DataFrame, bundle: ModelBundle, top_n: int = 5) -> list[FeatureImpact]:
    """Return the top SHAP feature impacts for a single prediction.

    Handles the three layouts ``TreeExplainer.shap_values`` can produce:
      * a list with one array per class (older SHAP),
      * a 2-D array ``(n_samples, n_features)``,
      * a 3-D array ``(n_samples, n_features, n_outputs)`` (newer SHAP for
        multi-output classifiers).
    In the per-class layouts the positive class (index 1) is used.

    Args:
        feature_row: One-row frame with columns ordered as ``bundle.features``.
        bundle: Artifact snapshot providing ``explainer`` and ``features``.
        top_n: Number of features to return, ranked by absolute SHAP value.

    Returns:
        Up to ``top_n`` FeatureImpact items, largest absolute impact first.
        A SHAP value of exactly 0 is reported as 'negative'.
    """
    raw = bundle.explainer.shap_values(feature_row)
    if isinstance(raw, list):
        per_class = raw[1] if len(raw) > 1 else raw[0]
        values = np.asarray(per_class)[0]
    else:
        arr = np.asarray(raw)
        if arr.ndim == 3:
            # (samples, features, outputs): take sample 0, positive class.
            out_idx = 1 if arr.shape[2] > 1 else 0
            values = arr[0, :, out_idx]
        else:
            values = arr[0]

    indices = np.argsort(np.abs(values))[::-1][:top_n]
    return [
        FeatureImpact(
            feature=bundle.features[i],
            impact=round(float(values[i]), 4),
            direction="positive" if values[i] > 0 else "negative",
        )
        for i in indices
    ]
# ---------------------------------------------------------------------------
# Recommendations engine
# ---------------------------------------------------------------------------
def generate_recommendations(
    data: PromotionInput, probability: float, factors: list[FeatureImpact],
) -> list[Recommendation]:
    """Build suggestions from the top SHAP factors and the raw PDSD field values.

    Rules run in a fixed order and each appends at most one recommendation:
    discount above 40, then campaign duration, volume target and supplier
    acceptance rate (each only when that factor is present with a negative
    direction), and finally a message for the probability band (below 0.5,
    or below 0.75).

    Args:
        data: Promotion being scored; only its discount fields are read here.
        probability: Predicted success probability.
        factors: Top feature impacts computed for this prediction.

    Returns:
        Recommendations in rule order; empty when no rule fires.
    """
    def _drags_down(feature_name: str) -> bool:
        """Tell whether the first factor with this name has direction 'negative'."""
        for factor in factors:
            if factor.feature == feature_name:
                return factor.direction == "negative"
        return False

    suggestions = []

    def _suggest(message: str, priority: str) -> None:
        """Append one recommendation to the running list."""
        suggestions.append(Recommendation(message=message, priority=priority))

    present = {factor.feature for factor in factors}

    # Discount depth check: the percentage field wins, the value field is the
    # fallback when the percentage is missing or zero.
    discount = data.Negoptim__SellOutDiscountPerc__c or data.Negoptim__SellOutDiscountValue__c or 0
    if discount > 40:
        _suggest(
            "Discount depth exceeds 40%. Very high discounts historically have "
            "lower validation rates — consider whether a lighter mechanic achieves the same sell-out.",
            "high",
        )

    if _drags_down("campaign_duration_days"):
        _suggest(
            "Campaign duration is negatively impacting validation probability. "
            "Shorter, more focused promotions tend to score better.",
            "medium",
        )

    # Prefer the sell-out quantity target; look at the generic quantity target
    # only when the sell-out one is not among the top factors at all.
    volume_feature = "Negoptim__SellOutDiscountQtyTarget__c"
    if volume_feature not in present:
        volume_feature = "Negoptim__QuantityTarget__c"
    if _drags_down(volume_feature):
        _suggest(
            "Volume target is flagged as a risk factor. "
            "Consider aligning the quantity target more closely with historical averages for this mechanic.",
            "medium",
        )

    if _drags_down("supplier_acceptance_rate"):
        _suggest(
            "This supplier has a below-average historical validation rate with your organisation. "
            "Scrutinise the terms carefully before responding.",
            "high",
        )

    # Probability band message: at most one of the two applies.
    if probability < 0.5:
        _suggest(
            "Validation probability is below 50%. Review the top risk factors "
            "and consider requesting revised terms from the supplier.",
            "high",
        )
    elif probability < 0.75:
        _suggest(
            "Validation probability is moderate. Targeted adjustments to the "
            "top negative drivers could significantly improve the outlook.",
            "low",
        )
    return suggestions
# ---------------------------------------------------------------------------
# Confidence & risk
# ---------------------------------------------------------------------------
def compute_confidence_and_risk(probability: float) -> tuple[float, str]:
    """Derive a confidence score and a risk label from a probability.

    Confidence is twice the distance from the 0.5 decision boundary, rounded
    to four decimals. The label is chosen from the rounded confidence:
    at least 0.6 is "LOW" risk, at least 0.3 is "MEDIUM", anything else "HIGH".

    Args:
        probability: Predicted success probability.

    Returns:
        ``(confidence, risk_level)``.
    """
    confidence = round(2 * abs(probability - 0.5), 4)
    # Thresholds are checked from the most to the least confident band.
    for floor, level in ((0.6, "LOW"), (0.3, "MEDIUM")):
        if confidence >= floor:
            return confidence, level
    return confidence, "HIGH"
# ---------------------------------------------------------------------------
# Benchmarking
# ---------------------------------------------------------------------------
def compute_benchmark(probability: float) -> Benchmark:
    """Position a predicted probability relative to the dataset average.

    The average success rate (default 0.5) and record count (default 0) come
    from the module-level benchmark stats. The percentile is a piecewise-linear
    mapping with the average at 50, clamped to [0, 100] and rounded to one
    decimal. ``vs_average`` is "above"/"below" when the probability differs
    from the average by more than 0.02, otherwise "at".

    Args:
        probability: Predicted success probability.

    Returns:
        A Benchmark carrying the average, percentile, record count and label.
    """
    baseline = _benchmark_stats.get("average_success_rate", 0.5)
    record_count = _benchmark_stats.get("total_records", 0)

    # The 0.001 in each denominator keeps the division defined when the
    # baseline is exactly 0 or 1.
    if probability >= baseline:
        raw_percentile = 50 + (probability - baseline) / (1.0 - baseline + 0.001) * 50
    else:
        raw_percentile = (probability / (baseline + 0.001)) * 50
    clamped = min(max(raw_percentile, 0), 100)

    if probability > baseline + 0.02:
        position = "above"
    else:
        position = "below" if probability < baseline - 0.02 else "at"

    return Benchmark(
        average_success_rate=baseline,
        percentile=round(clamped, 1),
        total_records=record_count,
        vs_average=position,
    )
# ---------------------------------------------------------------------------
# ROI prediction
# ---------------------------------------------------------------------------
def _resolve_cost_basis(data: PromotionInput) -> dict:
    """
    Pick the unit cost basis from the input without inventing figures (PROJECT_CONTEXT §2).

    Returns a dict with keys ``source``, ``cogs`` and ``margin``:
        source='direct_cogs'     — Negoptim__COGS__c is set; ``cogs`` carries it
        source='pg_margin_proxy' — both pg prices are set; ``margin`` is gross minus net
        source='unknown'         — neither is available; ``cogs`` and ``margin`` are None
    """
    direct_cogs = data.Negoptim__COGS__c
    if direct_cogs is not None:
        return {"source": "direct_cogs", "cogs": direct_cogs, "margin": None}

    gross_price, net_price = data.pg_gross_price, data.pg_net_price
    if gross_price is None or net_price is None:
        return {"source": "unknown", "cogs": None, "margin": None}

    return {"source": "pg_margin_proxy", "cogs": None, "margin": gross_price - net_price}
def compute_roi_prediction(data: PromotionInput, feature_row: pd.DataFrame,
                           bundle: Optional[ModelBundle] = None,
                           acceptance_proba: float = 1.0) -> ROIPrediction:
    """
    Prototype ROI estimate — directional only, not a forecast.

    Currency fields are None when no cost basis is available (PROJECT_CONTEXT §2).

    Args:
        data: Promotion input; prices, discount, quantity target and cost fields are read.
        feature_row: Preprocessed features; the ROI model's columns are taken from it.
        bundle: Model snapshot to use. Falls back to the module-level bundle when None.
        acceptance_proba: Accepted for interface compatibility; not used in the calculation.

    Returns:
        ROIPrediction with the headline ROI, the unit/profit estimate and, when
        the ROI model ran successfully, p10/p50/p90 interval fields.
    """
    import logging  # local import: this block cannot see the module's top-level imports

    cost = _resolve_cost_basis(data)
    net_price = data.pg_net_price or data.Negoptim__NetPricePromo__c or 0.0
    discount_pct = data.Negoptim__SellOutDiscountPerc__c or data.Negoptim__SellOutDiscountValue__c or 0.0
    qty_target = (data.Negoptim__SellOutDiscountQtyTarget__c
                  or data.Negoptim__QuantityTarget__c or 0.0)
    funding_per_unit = net_price * (discount_pct / 100)

    if cost["source"] == "unknown":
        # No cost data: report only what can be derived from price and discount.
        return ROIPrediction(
            roi_pct=None,
            roi_label="Unknown",
            predicted_units=None,
            predicted_profit=None,
            net_margin_per_unit=None,
            funding_per_unit=round(funding_per_unit, 4) if funding_per_unit else None,
            break_even_roi=None,
            cost_basis_source="unknown",
        )

    # Gross margin per unit before promotional funding.
    if cost["source"] == "direct_cogs":
        gross_margin = net_price - cost["cogs"]
    else:
        gross_margin = cost["margin"]
    net_margin_per_unit = gross_margin - funding_per_unit

    has_funding = funding_per_unit > 0.001
    if has_funding:
        roi_pct = (net_margin_per_unit / funding_per_unit) * 100
    else:
        # No meaningful funding to divide by: fall back to the margin rate on
        # net price. This must use the margin (net price minus COGS), not the
        # COGS figure itself, otherwise a high-cost item reads as a strong ROI.
        roi_pct = (gross_margin / (net_price + 0.0001)) * 100

    if roi_pct >= 100:
        roi_label = "Strong"
    elif roi_pct >= 50:
        roi_label = "Moderate"
    elif roi_pct >= 0:
        roi_label = "Weak"
    else:
        roi_label = "Negative"

    b = bundle if bundle is not None else _bundle
    roi_m = b.roi_model if b is not None else None
    roi_feats = b.roi_features if b is not None else []
    roi_log = b.roi_log_transform if b is not None else False

    def _predict_ratio(model, roi_row: pd.DataFrame) -> float:
        """Run one ROI model and return the achievement ratio clamped to [0, 5]."""
        raw = float(model.predict(roi_row)[0])
        ratio = float(np.expm1(raw)) if roi_log else raw
        return max(0.0, min(ratio, 5.0))

    # The per-quantile ROI percentage does not depend on the predicted ratio,
    # so the low/mid/high ROI fields all carry this same value.
    quantile_roi_pct = (
        round((net_margin_per_unit / funding_per_unit) * 100, 1) if has_funding else roi_pct
    )

    def _make_units_profit(ratio):
        """Turn an achievement ratio into (units, profit, roi_pct); all None without a ratio."""
        if ratio is None:
            return None, None, None
        u = round(ratio * qty_target, 0)
        p = round(u * net_margin_per_unit, 2)
        return u, p, quantile_roi_pct

    # --- Quantile predictions ------------------------------------------------
    p10_ratio = p50_ratio = p90_ratio = None
    units_p10 = units_p50 = units_p90 = None
    profit_low = profit_mid = profit_high = None
    roi_pct_low = roi_pct_mid = roi_pct_high = None
    if roi_m is not None and qty_target > 0:
        try:
            # Build the model input once: ROI feature order, absent columns and NaNs as 0.
            roi_row = feature_row.reindex(columns=roi_feats, fill_value=0.0).fillna(0.0)
            if isinstance(roi_m, dict):
                # Quantile bundle; a missing quantile key raises and is handled below.
                p10_ratio = _predict_ratio(roi_m["p10"], roi_row)
                p50_ratio = _predict_ratio(roi_m["p50"], roi_row)
                p90_ratio = _predict_ratio(roi_m["p90"], roi_row)
            else:
                # Legacy single model — treat as p50 only
                p50_ratio = _predict_ratio(roi_m, roi_row)
            units_p10, profit_low, roi_pct_low = _make_units_profit(p10_ratio)
            units_p50, profit_mid, roi_pct_mid = _make_units_profit(p50_ratio)
            units_p90, profit_high, roi_pct_high = _make_units_profit(p90_ratio)
        except Exception:
            # Best-effort: the heuristic below still produces an estimate, but
            # the failure is recorded instead of disappearing silently.
            logging.getLogger(__name__).warning(
                "ROI model prediction failed; falling back to heuristic estimate",
                exc_info=True,
            )

    # Fall back to heuristic when model unavailable or errored
    conditional_ratio = p50_ratio if p50_ratio is not None else 0.85
    predicted_units = units_p50 if units_p50 is not None else round(conditional_ratio * qty_target, 0)
    predicted_profit = profit_mid if profit_mid is not None else round(predicted_units * net_margin_per_unit, 2)

    return ROIPrediction(
        roi_pct=round(roi_pct, 1),
        roi_label=roi_label,
        predicted_units=predicted_units,
        predicted_profit=predicted_profit,
        net_margin_per_unit=round(net_margin_per_unit, 4),
        funding_per_unit=round(funding_per_unit, 4),
        break_even_roi=0.0,
        cost_basis_source=cost["source"],
        # Quantile interval fields
        predicted_units_p10=units_p10,
        predicted_units_p50=units_p50,
        predicted_units_p90=units_p90,
        predicted_profit_low=profit_low,
        predicted_profit_mid=profit_mid,
        predicted_profit_high=profit_high,
        roi_pct_low=roi_pct_low,
        roi_pct_mid=roi_pct_mid,
        roi_pct_high=roi_pct_high,
    )
# ---------------------------------------------------------------------------
# Historical analogues retrieval
# ---------------------------------------------------------------------------
def _find_analogues(feature_row: pd.DataFrame, k: int = 5) -> list[HistoricalAnalogue]:
    """Return k historical promotions most similar to feature_row by cosine similarity.

    Args:
        feature_row: Preprocessed features for one promotion (first row is used).
        k: Number of analogues wanted. Non-positive values yield an empty list.

    Returns:
        Up to ``k`` HistoricalAnalogue entries, most similar first. Empty when
        the analogue index is not loaded or the query vector is all zeros.
    """
    # A negative k would turn the [:k] slice below into "all but the last |k|"
    # rows, returning almost the whole index.
    if k <= 0:
        return []
    if _analogues_matrix is None or _analogues_meta is None:
        return []

    # Build query vector aligned to the features actually used in the matrix
    feat_list = _analogue_features_used or _ANALOGUE_FEATURES
    vec = np.zeros(len(feat_list), dtype=np.float64)
    for i, feat in enumerate(feat_list):
        if feat in feature_row.columns:
            vec[i] = float(feature_row[feat].fillna(0.0).iloc[0])

    norm = np.linalg.norm(vec)
    if norm < 1e-9:
        # An all-zero query has no direction, so similarity is undefined.
        return []
    vec_norm = vec / norm

    # Cosine similarity against pre-normalised index (clamp to [-1, 1] for safety)
    sims = np.clip(_analogues_matrix.dot(vec_norm), -1.0, 1.0)
    top_idx = np.argsort(sims)[::-1][:k]

    results = []
    for rank, idx in enumerate(top_idx):
        m = _analogues_meta.iloc[idx]
        results.append(HistoricalAnalogue(
            rank=rank + 1,
            similarity=round(float(sims[idx]), 3),
            status=str(m["status"]),
            supplier=str(m.get("supplier", "")),
            discount_pct=round(float(m["discount_pct"]), 1),
            qty_target=int(m["qty_target"]),
            campaign_duration_days=int(m["campaign_duration_days"]),
            discount_depth_vs_gross=round(float(m.get("discount_depth_vs_gross", 0)), 4),
            supplier_acceptance_rate=round(float(m.get("supplier_acceptance_rate", 0)), 4),
            sell_out_date=str(m["sell_out_date"]),
        ))
    return results
# ---------------------------------------------------------------------------
# Admin auth dependency
# ---------------------------------------------------------------------------
def _require_admin_key(x_admin_key: Optional[str] = Header(None)):
    """FastAPI dependency that gates admin endpoints behind a shared key.

    Args:
        x_admin_key: Value of the admin-key request header, or None when absent.

    Raises:
        HTTPException: 503 when no admin key is configured on the server,
            401 when the supplied key is missing or does not match.
    """
    import hmac  # local import: this block cannot see the module's top-level imports

    if not _ADMIN_KEY:
        raise HTTPException(status_code=503, detail="Admin endpoints disabled: ADMIN_API_KEY env var not set")
    # Compare in constant time so response timing does not reveal how much of
    # the key matched. Bytes are used because compare_digest rejects non-ASCII str.
    # NOTE(review): assumes _ADMIN_KEY is a string read from the environment — confirm.
    supplied = (x_admin_key or "").encode("utf-8")
    expected = str(_ADMIN_KEY).encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Invalid admin key")
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get("/health")
def health():
    """Health check for monitoring and BYOM connectivity tests."""
    # Read the bundle pointer once so both derived fields describe the same snapshot.
    snapshot = _bundle
    feature_count = len(snapshot.features) if snapshot else 0
    # Degraded when no model is loaded, or when the benchmark stats carry the flag.
    is_degraded = snapshot is None or _benchmark_stats.get("degraded", False)
    return {
        "status": "ok",
        "version": "2.0.0",
        "features": feature_count,
        "degraded": is_degraded,
    }
@app.get("/warmup")
def warmup_probe():
    """Dedicated warmup probe — hit this to wake the HF Space and verify model readiness.
    Uptime-ping services (UptimeRobot, cron) should target this endpoint every 5 minutes
    during business hours so the free-tier Space doesn't sleep mid-session."""
    # Take one snapshot of the bundle pointer for both model-related fields.
    snapshot = _bundle
    loaded = snapshot is not None
    feature_count = len(snapshot.features) if snapshot else 0
    # Timestamp of this response, in UTC ISO-8601 form.
    answered_at = datetime.now(timezone.utc).isoformat()
    return {
        "status": "ok",
        "model_loaded": loaded,
        "features": feature_count,
        "ready_at": answered_at,
    }
def _resolve_buyer_role(promotion: PromotionInput) -> str:
    """
    Return the buyer's role for this PDS: 'sender' or 'receiver'.

    An explicit ``buyer_role`` of 'sender' or 'receiver' on the input is
    returned unchanged. Otherwise the PDS status is inspected; the result is
    'receiver' both when the status is one of the submitted/locked values and
    when it is not, 'receiver' being the default per PROJECT_CONTEXT §1.
    """
    explicit_role = promotion.buyer_role
    if explicit_role in ("sender", "receiver"):
        return explicit_role

    # Statuses that mark the PDS as submitted/locked, i.e. the buyer is the receiver.
    locked_statuses = {"Submitted", "Soumis", "En cours de traitement", "Under Review"}
    status = promotion.pds_status
    if status and status in locked_statuses:
        return "receiver"

    # Safe default per PROJECT_CONTEXT §1. As written, the status check above
    # cannot change the outcome: both paths yield "receiver".
    return "receiver"
@app.post("/predict", response_model=PredictionOutput)
def predict(promotion: PromotionInput):
    """Prediction with SHAP explanation, ROI estimate, benchmark, risk level, and buyer role."""
    # Raises HTTPException 503 while no model bundle is loaded, and 422 for any
    # other failure during preprocessing or scoring. Logging the prediction is
    # best-effort and never fails the request.
    import logging  # local imports: this block cannot see the module's top-level imports
    import traceback

    try:
        # Pin a single bundle snapshot for the entire request so a concurrent
        # hot-swap cannot produce mismatched model/features/explainer.
        bundle = _bundle
        if bundle is None:
            raise HTTPException(status_code=503, detail="Model not yet loaded — please retry.")

        feature_row = preprocess_input(promotion, bundle=bundle)
        proba = _predict_proba(feature_row, bundle=bundle)
        label = "SUCCESS" if proba >= 0.5 else "FAILURE"
        buyer_role = _resolve_buyer_role(promotion)

        # Expected profit = probability x unit margin x quantity target; left
        # as None when there is no cost basis to derive a margin from.
        qty = (
            promotion.Negoptim__SellOutDiscountQtyTarget__c
            or promotion.Negoptim__QuantityTarget__c
            or 0
        )
        cost = _resolve_cost_basis(promotion)
        net = promotion.pg_net_price or promotion.Negoptim__NetPricePromo__c or 0
        if cost["source"] == "unknown":
            expected_profit = None
        elif cost["source"] == "direct_cogs":
            expected_profit = round(proba * (net - cost["cogs"]) * qty, 2)
        else:
            expected_profit = round(proba * cost["margin"] * qty, 2)

        top_factors = explain_prediction(feature_row, bundle=bundle)
        roi = compute_roi_prediction(promotion, feature_row, bundle=bundle, acceptance_proba=proba)
        recommendations = generate_recommendations(promotion, proba, top_factors)
        confidence, risk_level = compute_confidence_and_risk(proba)
        benchmark = compute_benchmark(proba)
        analogues = _find_analogues(feature_row, k=5)

        output = PredictionOutput(
            success_probability=round(proba, 4),
            prediction=label,
            expected_profit=expected_profit,
            buyer_role=buyer_role,
            roi=roi,
            top_factors=top_factors,
            recommendations=recommendations,
            confidence=confidence,
            risk_level=risk_level,
            benchmark=benchmark,
            analogues=analogues,
        )

        if promotion.record_id:
            try:
                from signals_db import log_prediction, DB_PATH as _DB_PATH
                # roi_pct is None when the cost basis is unknown; rounding None
                # would raise and the prediction would never reach the log.
                roi_pct_for_log = (
                    round(roi.roi_pct, 2)
                    if roi is not None and roi.roi_pct is not None
                    else None
                )
                log_prediction(
                    record_id=promotion.record_id,
                    predicted_at=datetime.now(timezone.utc).isoformat(),
                    predicted_probability=round(proba, 4),
                    predicted_label=label,
                    predicted_roi_pct=roi_pct_for_log,
                    db_path=_DB_PATH,
                )
            except Exception as log_err:
                # Still best-effort, but leave a trace that the write was lost.
                logging.getLogger(__name__).warning(
                    "Prediction log write failed for record %s: %s",
                    promotion.record_id, log_err,
                )
        return output
    except HTTPException:
        # Keep deliberate HTTP errors (e.g. the 503 above) intact instead of
        # re-wrapping them as 422.
        raise
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=422, detail=f"Prediction failed: {e}") from e
@app.post("/predict/batch", response_model=list[PredictionOutput])
def predict_batch(batch: BatchPredictionInput):
    """Score a list of PDSD lines in one call (max 200).
    Vectorises model.predict_proba and SHAP over the stacked feature matrix
    so the compute cost scales sub-linearly with batch size.
    Designed for PromotionInsightsAction bulk Agentforce invocations.
    """
    # Raises HTTPException 503 while no model bundle is loaded, and 422 for any
    # other failure. Outputs are returned in the same order as batch.promotions.
    import logging  # local import: this block cannot see the module's top-level imports

    try:
        # Pin one bundle snapshot for the whole batch.
        bundle = _bundle
        if bundle is None:
            raise HTTPException(status_code=503, detail="Model not yet loaded — please retry.")
        if not batch.promotions:
            return []

        # Preprocess each row individually (enrichment is per-row)
        rows = [preprocess_input(p, bundle=bundle) for p in batch.promotions]
        stacked = pd.concat(rows, ignore_index=True)

        # Vectorised inference
        probas = bundle.model.predict_proba(stacked)[:, 1]

        # Vectorised SHAP — TreeExplainer accepts 2-D arrays
        shap_vals_all = bundle.explainer.shap_values(stacked)
        if isinstance(shap_vals_all, list):
            shap_matrix = np.asarray(shap_vals_all[1])  # class-1 values
        else:
            shap_matrix = np.asarray(shap_vals_all)
            if shap_matrix.ndim == 3:
                # Some SHAP releases return (n_samples, n_features, n_classes)
                # for classifiers; reduce to the class-1 slice so each row is 1-D.
                class_idx = 1 if shap_matrix.shape[2] > 1 else 0
                shap_matrix = shap_matrix[:, :, class_idx]

        now_iso = datetime.now(timezone.utc).isoformat()
        top_n = 5  # number of SHAP factors reported per prediction
        outputs = []
        log_entries = []

        for i, (promotion, feature_row, proba) in enumerate(
            zip(batch.promotions, rows, probas)
        ):
            proba = float(proba)
            label = "SUCCESS" if proba >= 0.5 else "FAILURE"
            buyer_role = _resolve_buyer_role(promotion)

            # Expected profit = probability x unit margin x quantity target;
            # None when there is no cost basis.
            qty = promotion.Negoptim__SellOutDiscountQtyTarget__c or promotion.Negoptim__QuantityTarget__c or 0
            cost = _resolve_cost_basis(promotion)
            net = promotion.pg_net_price or promotion.Negoptim__NetPricePromo__c or 0
            if cost["source"] == "unknown":
                expected_profit = None
            elif cost["source"] == "direct_cogs":
                expected_profit = round(proba * (net - cost["cogs"]) * qty, 2)
            else:
                expected_profit = round(proba * cost["margin"] * qty, 2)

            # Build FeatureImpact from pre-computed SHAP row
            shap_row = shap_matrix[i]
            indices = np.argsort(np.abs(shap_row))[::-1][:top_n]
            top_factors = [
                FeatureImpact(
                    feature=bundle.features[j],
                    impact=round(float(shap_row[j]), 4),
                    direction="positive" if shap_row[j] > 0 else "negative",
                )
                for j in indices
            ]

            roi = compute_roi_prediction(promotion, feature_row, bundle=bundle, acceptance_proba=proba)
            recommendations = generate_recommendations(promotion, proba, top_factors)
            confidence, risk_level = compute_confidence_and_risk(proba)
            benchmark = compute_benchmark(proba)
            analogues = _find_analogues(feature_row, k=3)

            outputs.append(PredictionOutput(
                success_probability=round(proba, 4),
                prediction=label,
                expected_profit=expected_profit,
                buyer_role=buyer_role,
                roi=roi,
                top_factors=top_factors,
                recommendations=recommendations,
                confidence=confidence,
                risk_level=risk_level,
                benchmark=benchmark,
                analogues=analogues,
            ))

            if promotion.record_id:
                log_entries.append(dict(
                    record_id=promotion.record_id,
                    predicted_at=now_iso,
                    predicted_probability=round(proba, 4),
                    predicted_label=label,
                    predicted_roi_pct=round(roi.roi_pct, 2) if roi.roi_pct is not None else None,
                ))

        # Bulk-log all predictions in one transaction
        if log_entries:
            try:
                from signals_db import DB_PATH as _DB_PATH, get_connection
                from contextlib import closing
                # closing() releases the connection; the inner `conn` context
                # wraps the inserts in a single transaction.
                with closing(get_connection(_DB_PATH)) as conn, conn:
                    conn.executemany(
                        """INSERT INTO prediction_log
                        (record_id, predicted_at, predicted_probability, predicted_label, predicted_roi_pct)
                        VALUES (:record_id, :predicted_at, :predicted_probability,
                        :predicted_label, :predicted_roi_pct)""",
                        log_entries,
                    )
            except Exception as log_err:
                # Best-effort: scoring already succeeded, so report the lost
                # log write instead of failing the request or hiding it.
                logging.getLogger(__name__).warning(
                    "Bulk prediction log write failed for %d entries: %s",
                    len(log_entries), log_err,
                )

        return outputs
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=422, detail=f"Batch prediction failed: {e}")
# ---------------------------------------------------------------------------
# Feedback loop endpoints
# ---------------------------------------------------------------------------
# Request body for POST /outcome: the observed result for one promotion record.
# (Kept as comments rather than a docstring so the published schema text is unchanged.)
class OutcomeInput(BaseModel):
    record_id: str  # forwarded to signals_db as record_id
    actual_status: str  # forwarded to signals_db as actual_outcome
# Request body for POST /outcome/batch: several outcomes submitted together.
# (Kept as comments rather than a docstring so the published schema text is unchanged.)
class BatchOutcomeInput(BaseModel):
    # Required list; max_length=200 caps how many outcomes one request may carry.
    outcomes: list[OutcomeInput] = Field(..., description="List of outcomes to record (max 200)", max_length=200)
@app.post("/outcome")
def record_outcome(outcome: OutcomeInput):
    """Record the actual outcome for a previously predicted promotion."""
    # Any failure, including a failed signals_db import, is reported as HTTP 500.
    try:
        from signals_db import record_outcome as _store_outcome, DB_PATH as _db_path
        recorded_at = datetime.now(timezone.utc).isoformat()
        rows = _store_outcome(
            record_id=outcome.record_id,
            actual_outcome=outcome.actual_status,
            outcome_at=recorded_at,
            db_path=_db_path,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Outcome recording failed: {e}")
    # rows is whatever count the signals_db helper reports for this update.
    return {"status": "ok", "rows_updated": rows}
@app.post("/outcome/batch")
def record_outcome_batch(batch: BatchOutcomeInput):
    """Record outcomes for multiple promotions in a single database transaction.
    Called by the Salesforce PDSDOutcomeTrigger when multiple lines reach a
    terminal status in one DML operation — avoids one @future per record.
    """
    # Any failure, including a failed signals_db import, is reported as HTTP 500.
    try:
        from signals_db import record_outcomes_many, DB_PATH as _db_path
        # One timestamp is shared by every outcome in the batch.
        recorded_at = datetime.now(timezone.utc).isoformat()
        payload = []
        for item in batch.outcomes:
            payload.append({
                "record_id": item.record_id,
                "actual_outcome": item.actual_status,
                "outcome_at": recorded_at,
            })
        rows = record_outcomes_many(payload, db_path=_db_path)
        submitted = len(payload)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Batch outcome recording failed: {e}")
    return {"status": "ok", "rows_updated": rows, "submitted": submitted}
@app.post("/analogues")
def get_analogues(promotion: PromotionInput, k: int = 5):
    """Return k historical promotions most similar to this one (k-NN cosine similarity)."""
    # k is clamped to the range 0..20. Raises HTTPException 503 while no model
    # bundle is loaded, and 422 for any other failure.
    try:
        bundle = _bundle
        if bundle is None:
            raise HTTPException(status_code=503, detail="Model not yet loaded — please retry.")
        feature_row = preprocess_input(promotion, bundle=bundle)
        # Clamp on both sides: a negative k would otherwise slice from the end
        # of the ranking and return almost the whole index.
        results = _find_analogues(feature_row, k=max(0, min(k, 20)))
        # Report the feature list the search actually uses, which is the
        # "used" list when set and the default list otherwise.
        features_in_use = _analogue_features_used or _ANALOGUE_FEATURES
        return {
            "analogues": [a.model_dump() for a in results],
            "index_size": len(_analogues_meta) if _analogues_meta is not None else 0,
            "features_used": len(features_in_use),
        }
    except HTTPException:
        # Keep the 503 above intact instead of re-wrapping it as 422.
        raise
    except Exception as e:
        raise HTTPException(status_code=422, detail=f"Analogues retrieval failed: {e}")
@app.post("/admin/retrain")
def trigger_retrain(_: None = Depends(_require_admin_key)):
    """Manually trigger a background retraining cycle."""
    # The admin-key dependency has already run by the time this body executes.
    if not _retrain_in_progress:
        _trigger_retrain()
        return {"status": "started", "message": "Retraining started in background"}
    # A cycle is already flagged as running: report it and start nothing.
    return {"status": "already_running", "message": "Retraining already in progress"}
@app.get("/admin/feedback-stats")
def feedback_stats(_: None = Depends(_require_admin_key)):
    """Return prediction log statistics for monitoring."""
    # Always returns the same five keys, whether or not the log is empty.
    # Any failure, including a failed signals_db import, is reported as HTTP 500.
    try:
        from signals_db import get_prediction_log_df, DB_PATH as _DB_PATH
        df = get_prediction_log_df(_DB_PATH)
        if df.empty:
            total, resolved = 0, 0
        else:
            total = len(df)
            # Rows with a recorded actual outcome count as resolved.
            resolved = int(df["actual_outcome"].notna().sum())
        return {
            "total_predictions": total,
            "resolved": resolved,
            "pending": int(total - resolved),
            # Previously omitted when the log was empty, which gave callers two
            # different response shapes.
            "retrain_in_progress": _retrain_in_progress,
            "unknown_category_hits": _unknown_category_count,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))