| """ |
| FastAPI decision engine for promotion success prediction. |
| Predicts, explains, benchmarks, and evaluates risk. Prototype-grade. |
| Compatible with Salesforce Einstein Studio BYOM and Agentforce. |
| """ |
|
|
| import os |
| import threading |
| from dataclasses import dataclass |
| from pathlib import Path |
| from contextlib import asynccontextmanager |
| from typing import Optional |
| from datetime import datetime, timezone |
|
|
| import joblib |
| import numpy as np |
| import pandas as pd |
| import shap |
| from fastapi import Depends, FastAPI, Header, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel, Field |
|
|
| from data_preprocessing import ( |
| CATEGORICAL_FIELDS, |
| handle_missing_values, |
| engineer_features, |
| ) |
| from field_mapping import apply_field_mapping, PDSD_TO_LEGACY_CPD |
|
|
| |
| |
| |
|
|
@dataclass(frozen=True)
class ModelBundle:
    """Frozen snapshot of every loaded model artifact.

    A complete bundle is built first and then published by rebinding the
    module-level ``_bundle`` name. Code that reads ``_bundle`` once per
    request therefore sees one consistent set of model / encoders /
    features / explainer, never a mix of old and new artifacts.
    """

    # Classifier exposing ``predict_proba``.
    model: object
    # Categorical encoders: either {"_ordinal": <encoder>} or one encoder per
    # categorical column (both layouts are handled in preprocess_input).
    encoders: dict
    # Ordered feature names the classifier expects.
    features: list
    # SHAP explainer constructed from ``model``.
    explainer: object
    # ROI model artifact(s); None when no ROI model file is present.
    roi_model: Optional[object]
    # Feature names used by the ROI model (empty list without an ROI model).
    roi_features: list
    # Value of the ROI artifact's "log_transform" key (presumably: ROI target
    # was log-transformed at training time — confirm in train_model.py).
    roi_log_transform: bool
|
|
|
|
# Currently published model artifacts; None until lifespan() loads them.
_bundle: Optional[ModelBundle] = None
# Dataset-level benchmark figures (success rate and record counts); carries
# "degraded": True when the live-fetch fallback failed.
_benchmark_stats: dict = {}
# Lookup tables used to enrich single predictions ("pg_stats", "achievement", ...).
_enrichment_context: dict = {}
# Held by _hot_swap_models while building and publishing a new bundle.
_model_lock = threading.RLock()
# True while a background retraining thread is running.
_retrain_in_progress = False
# Running total of categorical values the ordinal encoder mapped to -1.
_unknown_category_count: int = 0
# L2-normalised analogue feature matrix, its per-row metadata, and the
# column names (in matrix order) used to build it.
_analogues_matrix: Optional[np.ndarray] = None
_analogues_meta: Optional[pd.DataFrame] = None
_analogue_features_used: list = []
|
|
| |
# Numeric columns (raw PDSD fields plus engineered features) that form the
# feature vector for k-NN historical-analogue retrieval. Columns missing from
# a dataset are skipped when the index is built.
_ANALOGUE_FEATURES = [
    "Negoptim__SellOutDiscountPerc__c",
    "Negoptim__SellOutDiscountUAmt__c",
    "Negoptim__SellOutDiscountQtyTarget__c",
    "Negoptim__SellOutSourceContribPerc__c",
    "Negoptim__SellOutExecutionRateTarget__c",
    "Negoptim__Perc__c",
    "discount_depth_vs_gross",
    "campaign_duration_days",
    "sell_out_month",
    "sell_out_quarter",
    "supplier_acceptance_rate",
    "mechanic_family_acceptance_rate",
    "days_since_last_promo_for_supplier_pg",
]

# Artifact locations: each may be overridden through an environment variable
# and otherwise points at a file next to this module.
_APP_DIR = Path(__file__).parent
MODEL_PATH = os.environ.get("MODEL_PATH", str(_APP_DIR / "model.joblib"))
ROI_MODEL_PATH = os.environ.get("ROI_MODEL_PATH", str(_APP_DIR / "roi_model.joblib"))
SERVING_CONTEXT_PATH = os.environ.get(
    "SERVING_CONTEXT_PATH", str(_APP_DIR / "serving_context.joblib")
)
# Admin key from the environment; None when unset. Presumably guards admin
# endpoints outside this view — confirm.
_ADMIN_KEY = os.environ.get("ADMIN_API_KEY")
|
|
|
|
def _load_serving_context() -> bool:
    """Load the pre-serialised serving context produced by train_model.py.

    This is the preferred source on deployments without Salesforce
    credentials (e.g. HF Spaces). Every required entry is read into locals
    first; the module globals are rebound only after all reads succeeded, so
    a malformed artifact cannot leave a half-updated context behind.

    Returns:
        True when the artifact exists and was published; False when it is
        missing or unreadable (the caller then falls back to a live fetch).
    """
    global _benchmark_stats, _enrichment_context, _analogues_matrix, _analogues_meta, _analogue_features_used
    ctx_path = Path(SERVING_CONTEXT_PATH)
    if not ctx_path.exists():
        return False
    try:
        # joblib.load unpickles: SERVING_CONTEXT_PATH must point at a trusted artifact.
        ctx = joblib.load(ctx_path)
        # Copy so the pop() below never mutates the loaded object.
        benchmark_stats = dict(ctx["benchmark_stats"])
        enrichment_context = ctx["enrichment_context"]
        analogues_matrix = ctx["analogues_matrix"]
        analogues_meta = ctx["analogues_meta"]
        analogue_features = ctx.get("analogue_features", _ANALOGUE_FEATURES)
    except Exception as e:
        # Best-effort: any read problem means "use the live fallback instead".
        print(f"serving_context.joblib load failed: {e}")
        return False

    # This load succeeded, so drop any "degraded" marker stored in the artifact.
    benchmark_stats.pop("degraded", None)

    _benchmark_stats = benchmark_stats
    _enrichment_context = enrichment_context
    _analogues_matrix = analogues_matrix
    _analogues_meta = analogues_meta
    _analogue_features_used = analogue_features

    n_records = len(analogues_meta) if analogues_meta is not None else 0
    print(f"serving_context.joblib loaded: {benchmark_stats.get('total_records', 0)} records, "
          f"{n_records} analogues rows")
    return True
|
|
|
|
def _compute_benchmark_stats():
    """Populate ``_benchmark_stats`` / ``_enrichment_context`` (and the analogue index).

    Sources, in order of preference:
      1. ``serving_context.joblib`` via ``_load_serving_context`` (offline artifact).
      2. A live Salesforce fetch.

    If the live fetch fails, or returns no records, neutral stats flagged
    ``"degraded": True`` are published together with an empty enrichment
    context. Errors are logged, never raised.
    """
    global _benchmark_stats, _enrichment_context
    if _load_serving_context():
        return
    try:
        from salesforce_client import connect_to_salesforce, fetch_promotion_data, SUCCESS_STATUSES

        sf = connect_to_salesforce()
        df = fetch_promotion_data(sf)
        if df.empty:
            # An empty history would otherwise publish NaN success rates.
            raise ValueError("Salesforce returned no promotion records")

        labels = df["Negoptim__Status__c"].apply(lambda s: 1 if s in SUCCESS_STATUSES else 0)
        df["_label"] = labels
        benchmark_stats = {
            "average_success_rate": round(float(labels.mean()), 4),
            "total_records": len(df),
            "success_count": int(labels.sum()),
            "failure_count": int((1 - labels).sum()),
        }
        enrichment_context = _build_enrichment_context(df)

        # Rebind both globals only once everything was built, so a failure in
        # the enrichment build never leaves fresh stats next to an old context.
        _benchmark_stats = benchmark_stats
        _enrichment_context = enrichment_context
        print(f"Benchmark stats loaded: {benchmark_stats['total_records']} records")
        print(f"Enrichment context built for {len(enrichment_context.get('pg_stats', {}))} product groups")

        # Handles (and logs) its own failures.
        _build_analogues_index(df)

    except Exception as e:
        _benchmark_stats = {
            "average_success_rate": 0.5,
            "total_records": 0,
            "success_count": 0,
            "failure_count": 0,
            "degraded": True,
        }
        _enrichment_context = {}
        print(f"Benchmark stats unavailable (live fetch failed: {e}) — /health will report degraded=true")
|
|
|
|
def _supplier_enrichment_stats(group: pd.DataFrame) -> dict:
    """Summarise one supplier's promotion history for the ``pg_stats`` table."""
    dates = group["_date"].dropna()
    labels = group["_label"]
    discounts = group["_discount"]
    qty_col = "Negoptim__SellOutDiscountQtyTarget__c"
    raw_qty = group[qty_col] if qty_col in group.columns else pd.Series(0, index=group.index)
    qtys = pd.to_numeric(raw_qty, errors="coerce").fillna(0)

    stats = {
        "success_rate": round(float(labels.mean()), 4),
        "count": len(group),
        "avg_qty": round(float(qtys.mean()), 2),
        "avg_discount": round(float(discounts.mean()), 4),
        # Most recent 20 promotion dates as ascending "YYYY-MM-DD" strings.
        "recent_dates": sorted(dates.dt.strftime("%Y-%m-%d").tolist())[-20:],
        "monthly_success": {},
        "price_elasticity": 0.0,
    }

    if not dates.empty:
        # Rows with NaT dates have a NaN month and are dropped by groupby.
        months = group["_date"].dt.month.to_numpy()
        for month, month_labels in labels.groupby(months):
            stats["monthly_success"][int(month)] = round(float(month_labels.mean()), 4)

    # Pearson correlation between discount depth and outcome. Both series must
    # vary: with constant labels (e.g. only successes) np.corrcoef yields NaN.
    if len(discounts) >= 5 and discounts.std() > 0.001 and labels.std() > 0:
        corr = float(np.corrcoef(discounts, labels)[0, 1])
        if np.isfinite(corr):
            stats["price_elasticity"] = round(corr, 4)
    return stats


def _success_rate_table(labels: pd.Series, keys: pd.Series) -> dict:
    """Group *labels* by non-null, non-empty *keys* -> {key: {"success_rate", "count"}}."""
    valid = (keys.notna() & keys.astype(str).ne("")).to_numpy()
    if not valid.any():
        return {}
    # Group by a positional array so no index alignment is involved.
    grouped = labels[valid].groupby(keys[valid].astype(str).to_numpy())
    return {
        str(key): {"success_rate": round(float(grp.mean()), 4), "count": int(len(grp))}
        for key, grp in grouped
    }


def _build_enrichment_context(df: pd.DataFrame) -> dict:
    """Build the runtime enrichment context from a labelled promotion history.

    Used on the live-Salesforce fallback path; the offline path loads the
    equivalent dict from serving_context.joblib instead.

    Args:
        df: promotion records with a ``_label`` column (1 = success,
            0 = failure). All other columns are optional; missing ones simply
            yield empty lookups.

    Returns:
        dict with keys:
          * ``pg_stats``: per-*supplier* stats (legacy "pg" name), read by
            ``_enrich_single_prediction`` and ``_apply_historical_aggregates``.
          * ``monthly_demand``: "YYYY-MM" -> relative promo volume (0-100).
          * ``bu_id_map``: business-unit id -> id.
          * ``dataset_avg_success`` (only when *df* is non-empty),
            ``negoscope_stats``, ``mechanic_family_stats`` and
            ``supplier_product_stats``: the lookups that
            ``_apply_historical_aggregates`` reads.
          * ``achievement``: output of ``_build_achievement_context``.
    """
    context = {"pg_stats": {}, "monthly_demand": {}, "bu_id_map": {}}

    bu_col = "Negoptim__BusinessUnit__c"
    if bu_col in df.columns:
        context["bu_id_map"] = {str(b): str(b) for b in df[bu_col].dropna().unique()}

    df = df.copy()
    # Fall back to an all-NaT datetime column so every later .dt access works
    # even when the sell-out begin date column is absent.
    date_col = "Negoptim__SellOutBDate__c"
    raw_dates = df[date_col] if date_col in df.columns else pd.Series(pd.NaT, index=df.index)
    df["_date"] = pd.to_datetime(raw_dates, errors="coerce")
    disc_col = "Negoptim__SellOutDiscountPerc__c"
    raw_disc = df[disc_col] if disc_col in df.columns else pd.Series(0, index=df.index)
    df["_discount"] = (pd.to_numeric(raw_disc, errors="coerce").fillna(0) / 100.0).clip(0, 1)

    supplier_col = "Negoptim__Supplier__c"
    if supplier_col in df.columns:
        for supplier, group in df.groupby(supplier_col):
            context["pg_stats"][str(supplier)] = _supplier_enrichment_stats(group)

    dates_all = df["_date"].dropna()
    if not dates_all.empty:
        monthly = dates_all.dt.to_period("M").value_counts().sort_index()
        avg_count = monthly.mean()
        for period, count in monthly.items():
            key = f"{period.year}-{period.month:02d}"
            context["monthly_demand"][key] = round(min((count / (avg_count + 0.01)) * 50, 100), 1)

    # Lookups consumed by _apply_historical_aggregates. Without them, every
    # live-path prediction silently fell back to dataset-average values.
    if len(df):
        context["dataset_avg_success"] = round(float(df["_label"].mean()), 4)

    scope_col = "Negoptim__NegoScope__c"
    context["negoscope_stats"] = (
        _success_rate_table(df["_label"], df[scope_col]) if scope_col in df.columns else {}
    )

    family_col = "Negoptim__SellOutDiscountType__c"
    if family_col in df.columns:
        raw_types = df[family_col]
        # Mechanic family = first two characters of the discount type, as at serving time.
        families = raw_types.astype(str).str[:2].where(raw_types.notna())
        context["mechanic_family_stats"] = _success_rate_table(df["_label"], families)
    else:
        context["mechanic_family_stats"] = {}

    product_col = "Negoptim__Product__c"
    sp_stats = {}
    if supplier_col in df.columns and product_col in df.columns:
        for (supplier, product), grp in df.groupby([supplier_col, product_col]):
            last_seen = grp["_date"].max()
            sp_stats[f"{supplier}|{product}"] = {
                "count": int(len(grp)),
                "last_date_iso": last_seen.strftime("%Y-%m-%d") if pd.notna(last_seen) else None,
            }
    context["supplier_product_stats"] = sp_stats

    context["achievement"] = _build_achievement_context(df)
    return context
|
|
|
|
def _build_achievement_context(df: pd.DataFrame) -> dict:
    """Precompute quantity-achievement lookups (actual / target) from history.

    Per-record achievement is ``QuantityFact / max(SellOutDiscountQtyTarget, 1)``
    clipped to [0, 5]. Records with no usable fact or target are ignored.

    Args:
        df: promotion records. Reads ``Negoptim__QuantityFact__c`` and
            ``Negoptim__SellOutDiscountQtyTarget__c``; groups by
            ``Negoptim__BusinessUnit__c`` / ``Negoptim__Supplier__c`` when
            those columns exist. When a ``_date`` column is present, records
            are ordered by it so that "last" is the most recent dated record.

    Returns:
        ``{"retailer_supplier": {(bu, supplier): {"avg", "last", "n", "std"}},
        "retailer": {bu: avg}, "supplier": {supplier: avg}}``. A section
        stays empty when its grouping column or the achievement data is missing.
    """
    bu_col = "Negoptim__BusinessUnit__c"
    supplier_col = "Negoptim__Supplier__c"
    ctx = {"retailer_supplier": {}, "retailer": {}, "supplier": {}}

    raw_fact = df.get("Negoptim__QuantityFact__c")
    if raw_fact is None:
        return ctx
    qty_fact = pd.to_numeric(raw_fact, errors="coerce")
    if qty_fact.isna().all():
        return ctx

    qty_target = pd.to_numeric(
        df.get("Negoptim__SellOutDiscountQtyTarget__c", pd.Series(0, index=df.index)),
        errors="coerce",
    ).clip(lower=1)
    df = df.copy()
    df["_ach"] = (qty_fact / qty_target).clip(0, 5)

    has_data = df[df["_ach"].notna()]
    if "_date" in has_data.columns:
        # Stable sort; undated rows go first so dated ones determine "last".
        has_data = has_data.sort_values("_date", kind="mergesort", na_position="first")

    if bu_col in has_data.columns and supplier_col in has_data.columns:
        for (r, s), grp in has_data.groupby([bu_col, supplier_col]):
            vals = grp["_ach"].to_numpy(dtype=float)
            ctx["retailer_supplier"][(str(r), str(s))] = {
                "avg": round(float(vals.mean()), 4),
                "last": round(float(vals[-1]), 4),
                "n": len(vals),
                "std": round(float(vals.std()), 4) if len(vals) > 1 else 0.0,
            }

    if bu_col in has_data.columns:
        for r, grp in has_data.groupby(bu_col):
            ctx["retailer"][str(r)] = round(float(grp["_ach"].mean()), 4)

    if supplier_col in has_data.columns:
        for s, grp in has_data.groupby(supplier_col):
            ctx["supplier"][str(s)] = round(float(grp["_ach"].mean()), 4)

    return ctx
|
|
|
|
def _build_analogues_index(df: pd.DataFrame) -> None:
    """Build the L2-normalised feature matrix used for k-NN analogue retrieval.

    Runs the same preprocessing as training (``handle_missing_values`` then
    ``engineer_features``), keeps the ``_ANALOGUE_FEATURES`` columns that
    exist, and row-normalises them so a dot product equals cosine
    similarity. A parallel metadata frame describes each row.

    ``_analogue_features_used``, ``_analogues_matrix`` and ``_analogues_meta``
    are rebound together only after everything was built. On any failure the
    previous index is left untouched and the error is logged (non-fatal).
    """
    global _analogues_matrix, _analogues_meta, _analogue_features_used
    from salesforce_client import SUCCESS_STATUSES

    try:
        adf = engineer_features(handle_missing_values(df.copy()))
        idx = adf.index

        # Fallback Series share adf's index so they line up with real columns
        # even when the index is not a default RangeIndex.
        def numeric(col):
            raw = adf[col] if col in adf.columns else pd.Series(0, index=idx)
            return pd.to_numeric(raw, errors="coerce").fillna(0)

        def text(col, default):
            raw = adf[col] if col in adf.columns else pd.Series(default, index=idx)
            return raw.fillna(default).astype(str)

        present = [f for f in _ANALOGUE_FEATURES if f in adf.columns]
        if not present:
            print("Analogues index skipped: none of the analogue features are present")
            return

        # Coerce per column so a stray string value becomes 0 instead of aborting.
        mat = (
            adf[present]
            .apply(pd.to_numeric, errors="coerce")
            .fillna(0.0)
            .to_numpy(dtype=np.float64)
        )
        norms = np.linalg.norm(mat, axis=1, keepdims=True)
        # All-zero rows keep a zero vector instead of dividing by ~0.
        mat_norm = mat / np.where(norms < 1e-9, 1.0, norms)

        date_col = "Negoptim__SellOutBDate__c"
        raw_dates = adf[date_col] if date_col in adf.columns else pd.Series(pd.NaT, index=idx)
        sell_out_dates = pd.to_datetime(raw_dates, errors="coerce")

        meta = pd.DataFrame({
            "status": adf["Negoptim__Status__c"].apply(
                lambda s: "SUCCESS" if s in SUCCESS_STATUSES else "FAILURE"),
            "retailer": text("retailer_name", "UNKNOWN"),
            "supplier": text("Negoptim__Supplier__c", "Unknown"),
            "discount_pct": numeric("Negoptim__SellOutDiscountPerc__c"),
            "qty_target": numeric("Negoptim__SellOutDiscountQtyTarget__c"),
            "campaign_duration_days": numeric("campaign_duration_days"),
            "discount_depth_vs_gross": numeric("discount_depth_vs_gross").round(4),
            "supplier_acceptance_rate": numeric("supplier_acceptance_rate").round(4),
            "sell_out_date": sell_out_dates.dt.strftime("%Y-%m-%d").fillna(""),
            "n_features": len(present),
        })
    except Exception as exc:
        print(f"Analogues index build failed (non-fatal): {exc}")
        return

    _analogue_features_used = present
    _analogues_matrix = mat_norm
    _analogues_meta = meta.reset_index(drop=True)
    print(f"Analogues index: {len(meta)} records × {len(present)} features")
|
|
|
|
def _hot_swap_models():
    """Reload model artifacts from disk and publish them as one new ModelBundle.

    ``_model_lock`` is held for the entire load (artifact I/O and SHAP
    explainer construction), so overlapping reloads cannot interleave.
    Publication is a single rebinding of the module-level ``_bundle``.
    Request code that reads ``_bundle`` once therefore gets either the old
    or the new bundle, without taking the lock.
    """
    global _bundle
    with _model_lock:
        core = joblib.load(MODEL_PATH)
        model, encoders, features = (core[key] for key in ("model", "encoders", "features"))
        explainer = shap.TreeExplainer(model)

        roi_model, roi_features, roi_log_transform = None, [], False
        if Path(ROI_MODEL_PATH).exists():
            roi = joblib.load(ROI_MODEL_PATH)
            # Prefer the "models" entry, falling back to a single "model".
            roi_model = roi.get("models") or roi.get("model")
            roi_features = roi["features"]
            roi_log_transform = roi.get("log_transform", False)

        _bundle = ModelBundle(
            model=model,
            encoders=encoders,
            features=features,
            explainer=explainer,
            roi_model=roi_model,
            roi_features=roi_features,
            roi_log_transform=roi_log_transform,
        )
        print(f"Hot-swap complete: {len(_bundle.features)} features loaded")
|
|
|
|
def _run_retrain():
    """Background-thread body: retrain, reload models, then refresh benchmark data.

    Any exception (including a failed import of the training module) is
    logged and swallowed so the server keeps running. ``_retrain_in_progress``
    is cleared in every case.
    """
    global _retrain_in_progress
    try:
        print("Background retraining started...")
        from train_model import train_and_evaluate

        train_and_evaluate()
        _hot_swap_models()
        _compute_benchmark_stats()
    except Exception as exc:
        print(f"Background retraining failed: {exc}")
    else:
        print("Background retraining complete — models updated in memory")
    finally:
        _retrain_in_progress = False
|
|
|
|
# Guards the check-and-set of _retrain_in_progress so two concurrent callers
# cannot both observe False and start two retraining threads.
_retrain_state_lock = threading.Lock()


def _trigger_retrain():
    """Start retraining on a daemon thread unless one is already running.

    Returns immediately; the work happens in ``_run_retrain``, which clears
    ``_retrain_in_progress`` when it finishes. If the thread cannot be
    started, the flag is reset and the error is re-raised, so later triggers
    are not blocked forever.
    """
    global _retrain_in_progress
    with _retrain_state_lock:
        if _retrain_in_progress:
            print("Retraining already in progress — skipping")
            return
        _retrain_in_progress = True

    worker = threading.Thread(target=_run_retrain, daemon=True)
    try:
        worker.start()
    except Exception:
        _retrain_in_progress = False
        raise
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: load artifacts and warm caches before serving.

    Startup steps, in order:
      1. Load the classifier artifacts (and the ROI model when its file exists)
         and publish them as ``_bundle``.
      2. Best-effort initialisation of the signals database.
      3. Compute benchmark stats and the enrichment context.
      4. Log legacy field-mapping targets the loaded model does not use.
    Nothing runs after ``yield``.
    """
    global _bundle

    core = joblib.load(MODEL_PATH)
    model = core["model"]
    encoders = core["encoders"]
    features = core["features"]
    explainer = shap.TreeExplainer(model)

    roi_model, roi_features, roi_log_transform = None, [], False
    if not Path(ROI_MODEL_PATH).exists():
        print("ROI model not found — ROI predictions will use formula only")
    else:
        roi = joblib.load(ROI_MODEL_PATH)
        # Prefer the "models" entry, falling back to a single "model".
        roi_model = roi.get("models") or roi.get("model")
        roi_features = roi["features"]
        roi_log_transform = roi.get("log_transform", False)
        print(f"ROI model loaded ({len(roi_features)} features, log_transform={roi_log_transform})")

    _bundle = ModelBundle(
        model=model,
        encoders=encoders,
        features=features,
        explainer=explainer,
        roi_model=roi_model,
        roi_features=roi_features,
        roi_log_transform=roi_log_transform,
    )

    # The signals DB is optional: a failure is reported but does not block startup.
    try:
        from signals_db import init_db, DB_PATH as _DB_PATH
        init_db(_DB_PATH)
    except Exception as e:
        print(f"DB init warning: {e}")

    _compute_benchmark_stats()

    unused_legacy_targets = [
        target for target in PDSD_TO_LEGACY_CPD.values() if target not in _bundle.features
    ]
    for legacy_field in unused_legacy_targets:
        print(f" INFO: legacy shim target '{legacy_field}' not in model features — "
              f"safe to remove from PDSD_TO_LEGACY_CPD after confirming retrain.")

    print(f"Model loaded from {MODEL_PATH} ({len(_bundle.features)} features)")

    yield
|
|
|
|
app = FastAPI(
    title="Negoptim Promotion Decision Engine",
    description=(
        "Predicts, explains, benchmarks, and evaluates promotion risk. "
        "Prototype-grade. Designed for Salesforce Einstein Studio BYOM."
    ),
    version="2.0.0",
    lifespan=lifespan,
)

# Browser origins allowed to call the API. The default admits Salesforce
# Lightning and My Domain hosts over HTTPS. CORS_ORIGIN_REGEX overrides it;
# an empty value also falls back to the default because of the `or`.
_DEFAULT_CORS_ORIGIN_REGEX = r"https://[^/]+\.(lightning\.force\.com|my\.salesforce\.com)$"
_CORS_ORIGIN_REGEX = os.environ.get("CORS_ORIGIN_REGEX") or _DEFAULT_CORS_ORIGIN_REGEX

app.add_middleware(
    CORSMiddleware,
    allow_origin_regex=_CORS_ORIGIN_REGEX,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
|
|
|
|
| |
| |
| |
class PromotionInput(BaseModel):
    """Request body describing one promotion to score.

    Carries PDSD fields (primary, per PROJECT_CONTEXT §5) alongside legacy
    CPD fields that are kept for backward compatibility until the model is
    retrained on PDSD. Every field is optional; ``preprocess_input`` passes
    the dumped record through ``handle_missing_values``.

    buyer_role is 'sender' (the buyer sends a PDS to the supplier) or
    'receiver' (the buyer reviews an inbound supplier PDS). It defaults to
    'receiver', the primary use case per §1. This schema does not reject
    other strings.
    """

    # Request metadata
    record_id: Optional[str] = None
    buyer_role: Optional[str] = "receiver"
    pds_status: Optional[str] = None

    # Sell-out mechanic
    Negoptim__SellOutDiscountType__c: Optional[str] = None
    Negoptim__SellOutDiscountPerc__c: Optional[float] = None
    Negoptim__SellOutDiscountUAmt__c: Optional[float] = None
    Negoptim__SellOutDiscountQtyTarget__c: Optional[float] = None
    Negoptim__SellOutSourceContribPerc__c: Optional[float] = None
    Negoptim__SellOutExecutionRateTarget__c: Optional[float] = None
    Negoptim__SellOutManagementFix__c: Optional[float] = None
    Negoptim__SellOutManagementUnit__c: Optional[float] = None
    Negoptim__SellOutBDate__c: Optional[str] = None
    Negoptim__SellOutEDate__c: Optional[str] = None

    # Generic percentage / unit amount
    Negoptim__Perc__c: Optional[float] = None
    Negoptim__UAmt__c: Optional[float] = None

    # Parties, scope and agreement flags
    Negoptim__Supplier__c: Optional[str] = None
    Negoptim__NegoScope__c: Optional[str] = None
    Negoptim__BusinessUnit__c: Optional[str] = None
    Negoptim__Product__c: Optional[str] = None
    pds_promo_agreement_type: Optional[str] = None
    pds_is_sell_in: Optional[bool] = None
    pds_is_sell_out: Optional[bool] = None
    pds_is_services: Optional[bool] = None

    # Historical aggregates. When an enrichment context is loaded,
    # _apply_historical_aggregates overwrites these server-side.
    supplier_acceptance_rate: Optional[float] = None
    supplier_rejection_rate: Optional[float] = None
    negoscope_acceptance_rate: Optional[float] = None
    mechanic_family_acceptance_rate: Optional[float] = None
    supplier_pg_prior_count: Optional[float] = None
    days_since_last_promo_for_supplier_pg: Optional[float] = None

    # Product-group price points (semantics defined in data_preprocessing — confirm)
    pg_gross_price: Optional[float] = None
    pg_net_price: Optional[float] = None

    # Additional Negoptim fields (presumably the legacy CPD set mentioned above — confirm)
    Negoptim__NetPricePromo__c: Optional[float] = None
    Negoptim__NetPriceRegular__c: Optional[float] = None
    Negoptim__TariffCostPromo__c: Optional[float] = None
    Negoptim__COGS__c: Optional[float] = None
    Negoptim__RetailPriceRegular__c: Optional[float] = None
    Negoptim__SellOutDiscountValue__c: Optional[float] = None
    Negoptim__QuantityTarget__c: Optional[float] = None
    Negoptim__QuantityFact__c: Optional[float] = None
    Negoptim__OpType__c: Optional[str] = None
    Negoptim__TradeOpType__c: Optional[str] = None
    Negoptim__BudgetType__c: Optional[str] = None
    Negoptim__SellOutOfferType__c: Optional[str] = None
    Negoptim__LineStatus__c: Optional[str] = None
    Negoptim__SellInDiscountUType__c: Optional[str] = None
    Negoptim__BusinessUnitTarget__c: Optional[str] = None
    Negoptim__PG__c: Optional[str] = None
    Negoptim__IsFree__c: Optional[bool] = None
    Negoptim__IsFromAgreementLinePrenego__c: Optional[bool] = None
    Negoptim__IsIncludetoNetPriceStmt__c: Optional[bool] = None
    Negoptim__SellInFact__c: Optional[float] = None
    Negoptim__SellInTarget__c: Optional[float] = None
    Negoptim__SellOutDiscMassValueContribTarget__c: Optional[float] = None
    Negoptim__SellInDiscountMassValueTarget__c: Optional[float] = None
    Negoptim__SalesExecutionRateTarget__c: Optional[float] = None
    Negoptim__TariffCostRegular__c: Optional[float] = None
    Negoptim__SellInBDateMonth__c: Optional[float] = None
    Negoptim__SellInBDateWeek__c: Optional[float] = None
    Negoptim__SellOutDiscountUnitValue__c: Optional[float] = None
    Negoptim__NetPriceWoTaxPromo__c: Optional[float] = None
|
|
|
|
class FeatureImpact(BaseModel):
    """One feature's SHAP contribution to a single prediction."""

    feature: str
    impact: float = Field(description="SHAP value - positive pushes toward SUCCESS")
    direction: str = Field(description="'positive' or 'negative' for success")
|
|
|
|
class Recommendation(BaseModel):
    """A human-readable suggestion together with its urgency."""

    message: str
    priority: str = Field(description="'high', 'medium', or 'low'")
|
|
|
|
class Benchmark(BaseModel):
    """How a prediction compares with the historical dataset."""

    average_success_rate: float
    percentile: float = Field(description="Where this prediction ranks vs dataset (0-100)")
    total_records: int
    vs_average: str = Field(description="'above', 'below', or 'at' average")
|
|
|
|
class ROIPrediction(BaseModel):
    """Return-on-investment estimate for a promotion.

    The first group holds the point (p50) estimate and its cost breakdown.
    The second group holds the p10 / p50 / p90 spread for units, profit and
    ROI. Numeric fields are None when they cannot be computed.
    """

    # Point estimate (p50) and cost breakdown
    roi_pct: Optional[float] = Field(None, description="Return on trade investment (%) — p50. None when cost basis unavailable")
    roi_label: str = Field(description="'Strong', 'Moderate', 'Weak', 'Negative', or 'Unknown'")
    predicted_units: Optional[float] = Field(None, description="Predicted units sold (p50 estimate). None when cost basis unavailable")
    predicted_profit: Optional[float] = Field(None, description="Predicted profit €, p50 — prototype estimate, not a forecast")
    net_margin_per_unit: Optional[float] = Field(None, description="Profit per unit after trade funding (€)")
    funding_per_unit: Optional[float] = Field(None, description="Trade funding cost per unit (€)")
    break_even_roi: Optional[float] = Field(None, description="Minimum ROI% for this deal to be profitable")
    cost_basis_source: str = Field(description="'direct_cogs', 'pg_margin_proxy', or 'unknown'")

    # Quantile spread
    predicted_units_p10: Optional[float] = Field(None, description="Pessimistic units sold (p10)")
    predicted_units_p50: Optional[float] = Field(None, description="Median units sold (p50)")
    predicted_units_p90: Optional[float] = Field(None, description="Optimistic units sold (p90)")
    predicted_profit_low: Optional[float] = Field(None, description="Lower-bound profit € (p10)")
    predicted_profit_mid: Optional[float] = Field(None, description="Median profit € (p50)")
    predicted_profit_high: Optional[float] = Field(None, description="Upper-bound profit € (p90)")
    roi_pct_low: Optional[float] = Field(None, description="ROI % at p10 units")
    roi_pct_mid: Optional[float] = Field(None, description="ROI % at p50 units")
    roi_pct_high: Optional[float] = Field(None, description="ROI % at p90 units")
|
|
|
|
class HistoricalAnalogue(BaseModel):
    """A past promotion similar to the one being scored."""

    rank: int
    similarity: float = Field(description="Cosine similarity (0-1) to this promotion's feature vector")
    status: str = Field(description="'SUCCESS' or 'FAILURE'")
    supplier: str
    discount_pct: float
    # NOTE(review): declared int, but the analogue metadata stores these as
    # floats (pd.to_numeric) — confirm that callers cast before building this.
    qty_target: int
    campaign_duration_days: int
    discount_depth_vs_gross: float
    supplier_acceptance_rate: float
    sell_out_date: str
|
|
|
|
class BatchPredictionInput(BaseModel):
    """Request body for scoring several promotions in one call (at most 200)."""

    promotions: list[PromotionInput] = Field(
        ...,
        description="List of PDSD promotion inputs (max 200)",
        max_length=200,
    )
|
|
|
|
class PredictionOutput(BaseModel):
    """Full decision-engine response for one promotion."""

    # Headline prediction (expected_profit is required but may be null)
    success_probability: float
    prediction: str
    expected_profit: Optional[float]

    # Perspective of the requesting buyer
    buyer_role: str = Field(
        default="receiver",
        description="'sender': buyer sent PDS, predicting supplier validation. "
                    "'receiver': buyer reviewing inbound PDS, predicting financial impact.",
    )

    # Financial outlook
    roi: ROIPrediction

    # Explanation and advice
    top_factors: list[FeatureImpact]
    recommendations: list[Recommendation]

    # Certainty
    confidence: float
    risk_level: str

    # Dataset comparison
    benchmark: Benchmark

    # Nearest historical promotions
    analogues: list[HistoricalAnalogue] = Field(
        default_factory=list,
        description="k most similar historical promotions by feature cosine similarity",
    )
|
|
|
|
| |
| |
| |
def _apply_legacy_field_mapping(row: dict) -> dict:
    """Rename PDSD fields to their legacy CPD equivalents for *row*.

    Thin local seam; the mapping itself lives in
    ``field_mapping.apply_field_mapping``.
    """
    mapped = apply_field_mapping(row)
    return mapped
|
|
|
|
def preprocess_input(data: PromotionInput, bundle: Optional[ModelBundle] = None) -> pd.DataFrame:
    """Turn one promotion request into a single model-ready feature row.

    Pipeline: legacy field mapping -> missing-value handling -> feature
    engineering -> categorical encoding -> historical/enrichment features ->
    alignment to the model's feature list.

    Args:
        data: the incoming promotion.
        bundle: artifact snapshot to use for the whole request, so the
            encoders and feature list come from the same bundle. Defaults to
            the currently published ``_bundle``.

    Returns:
        A one-row DataFrame with exactly ``bundle.features`` as columns, in order.
    """
    global _unknown_category_count
    snapshot = _bundle if bundle is None else bundle

    frame = pd.DataFrame([_apply_legacy_field_mapping(data.model_dump())])
    frame = engineer_features(handle_missing_values(frame))

    ordinal = snapshot.encoders.get("_ordinal")
    if ordinal is None:
        # Older artifacts: one label encoder per categorical column; unseen values -> -1.
        for col in CATEGORICAL_FIELDS:
            if col not in frame.columns or col not in snapshot.encoders:
                continue
            label_enc = snapshot.encoders[col]
            raw_value = str(frame[col].iloc[0])
            frame[col] = label_enc.transform([raw_value])[0] if raw_value in label_enc.classes_ else -1
    else:
        cat_cols = [c for c in CATEGORICAL_FIELDS if c in frame.columns]
        codes = ordinal.transform(frame[cat_cols].astype(str))
        # Unknown categories come back as -1; count them for monitoring.
        _unknown_category_count += int((codes == -1).sum())
        frame[cat_cols] = codes

    frame = _apply_historical_aggregates(frame, data)
    frame = _enrich_single_prediction(frame, data)
    frame = _enrich_achievement(frame, data, snapshot)

    return _align_to_features(frame, snapshot.features)


def _align_to_features(frame: pd.DataFrame, features: list) -> pd.DataFrame:
    """Ensure every column in *features* exists and is numeric, then select them in order.

    Missing or all-NaN columns become 0.0; object columns are coerced with
    ``pd.to_numeric`` (unparseable values -> 0.0). *frame* is modified in place.
    """
    for col in features:
        if col not in frame.columns or frame[col].isna().all():
            frame[col] = 0.0
        elif str(frame[col].dtype) == "object":
            frame[col] = pd.to_numeric(frame[col], errors="coerce").fillna(0.0)
    return frame[features]
|
|
|
|
def _apply_historical_aggregates(df: pd.DataFrame, data: PromotionInput) -> pd.DataFrame:
    """Fill the six historical-aggregate features from ``_enrichment_context``.

    Runs after ``handle_missing_values`` (which leaves these at 0.0) and
    before feature alignment. The model therefore sees values computed from
    history rather than constant zeros; the docstring history notes Apex does
    not compute them. Unknown keys fall back to the dataset average success
    rate, or to 0 prior promos / 365 days since the last one. With an empty
    context, *df* is returned unchanged.

    Features written: supplier_acceptance_rate, supplier_rejection_rate,
    negoscope_acceptance_rate, mechanic_family_acceptance_rate,
    supplier_pg_prior_count, days_since_last_promo_for_supplier_pg.
    """
    ctx = _enrichment_context
    if not ctx:
        return df

    fallback = ctx.get("dataset_avg_success", _benchmark_stats.get("average_success_rate", 0.5))

    def as_key(value):
        return str(value) if value else None

    supplier_id = as_key(data.Negoptim__Supplier__c)
    scope_id = as_key(data.Negoptim__NegoScope__c)
    product_id = as_key(data.Negoptim__Product__c)
    # Mechanic family = first two characters of the discount type.
    family_code = (data.Negoptim__SellOutDiscountType__c or "")[:2]

    def rate(table_name, key):
        if not key:
            return fallback
        return ctx.get(table_name, {}).get(key, {}).get("success_rate", fallback)

    supplier_rate = rate("pg_stats", supplier_id)
    df["supplier_acceptance_rate"] = supplier_rate
    df["supplier_rejection_rate"] = 1.0 - supplier_rate
    df["negoscope_acceptance_rate"] = rate("negoscope_stats", scope_id)
    df["mechanic_family_acceptance_rate"] = rate("mechanic_family_stats", family_code)

    prior_count, days_since = 0.0, 365.0
    if supplier_id and product_id:
        history = ctx.get("supplier_product_stats", {}).get(f"{supplier_id}|{product_id}", {})
        prior_count = float(history.get("count", 0))
        last_iso = history.get("last_date_iso")
        if last_iso:
            try:
                last_seen = pd.to_datetime(last_iso, utc=True)
                days_since = float((pd.Timestamp.now(tz="UTC") - last_seen).days)
            except Exception:
                # Unparseable stored date: treat as "long ago".
                days_since = 365.0

    df["supplier_pg_prior_count"] = prior_count
    df["days_since_last_promo_for_supplier_pg"] = days_since
    return df
|
|
|
|
def _enrich_single_prediction(df: pd.DataFrame, data: PromotionInput) -> pd.DataFrame:
    """Add per-supplier recency, category and discount features to *df*.

    Statistics come from ``_enrichment_context["pg_stats"]``, keyed by the
    supplier id (the table keeps its legacy "pg" name). Neutral defaults are
    written when the supplier is unknown, or when no valid sell-out begin
    date is supplied.

    The request date and the stored ``recent_dates`` are both reduced to
    timezone-naive timestamps before comparison. An ISO datetime with an
    offset (e.g. "2024-05-01T00:00:00Z") parses as tz-aware, and comparing it
    with naive stored dates would raise TypeError.

    *df* is modified in place and returned.
    """
    supplier_id = str(data.Negoptim__Supplier__c) if data.Negoptim__Supplier__c else None
    pg_stats = _enrichment_context.get("pg_stats", {}).get(supplier_id, {}) if supplier_id else {}

    def _naive(ts):
        # Drop any UTC offset while keeping the wall-clock time.
        return ts.tz_localize(None) if getattr(ts, "tzinfo", None) is not None else ts

    pred_date = None
    raw_date = data.Negoptim__SellOutBDate__c
    if raw_date:
        try:
            parsed = pd.to_datetime(raw_date)
        except Exception:
            # Unparseable date: use the no-date defaults below.
            parsed = None
        if parsed is not None and not pd.isna(parsed):
            pred_date = _naive(parsed)

    # Recency features, counted only over dates strictly before the promotion.
    if pg_stats and pred_date is not None:
        history = [_naive(pd.to_datetime(d)) for d in pg_stats.get("recent_dates", [])]
        gaps = [(pred_date - d).days for d in history if d < pred_date]
        if gaps:
            df["promo_count_last_90d"] = sum(1 for g in gaps if g <= 90)
            df["promo_count_last_180d"] = sum(1 for g in gaps if g <= 180)
            df["days_since_last_promo"] = min(gaps)
        else:
            df["promo_count_last_90d"] = 0.0
            df["promo_count_last_180d"] = 0.0
            df["days_since_last_promo"] = 365.0
        # NOTE(review): this is the supplier's overall average discount, not
        # one restricted to the last 90 days as the feature name suggests.
        df["avg_discount_last_90d"] = pg_stats.get("avg_discount", 0.0)
    else:
        df["promo_count_last_90d"] = 0.0
        df["promo_count_last_180d"] = 0.0
        df["avg_discount_last_90d"] = 0.0
        df["days_since_last_promo"] = 365.0

    # Category-level stats (the "category" is the supplier here).
    if pg_stats:
        df["category_success_rate"] = pg_stats.get("success_rate", 0.5)
        df["category_avg_qty"] = pg_stats.get("avg_qty", 0.0)
        total_records = _benchmark_stats.get("total_records", 0)
        df["category_promo_intensity"] = pg_stats.get("count", 0) / max(total_records, 1)
    else:
        df["category_success_rate"] = 0.5
        df["category_avg_qty"] = 0.0
        df["category_promo_intensity"] = 0.0

    df["price_elasticity"] = pg_stats.get("price_elasticity", 0.0) if pg_stats else 0.0

    # Current discount (percentage -> fraction) relative to the supplier average.
    current_discount = (data.Negoptim__SellOutDiscountPerc__c or 0) / 100.0
    avg_discount = pg_stats.get("avg_discount", 0.0) if pg_stats else 0.0
    df["discount_vs_pg_avg"] = current_discount - avg_discount
    df["optimal_discount_gap"] = 0.0

    return df
|
|
|
|
def _enrich_achievement(df: pd.DataFrame, data: PromotionInput, bundle: ModelBundle) -> pd.DataFrame:
    """Write buyer-BU × supplier quantity-achievement features into *df*.

    Values come from ``_enrichment_context["achievement"]``. Only features
    used by the classifier or the ROI model are written, and missing or NaN
    statistics become 0.0. *df* is modified in place and returned; it is
    returned untouched when no achievement context is loaded.
    """
    achievement = _enrichment_context.get("achievement", {})
    if not achievement:
        return df

    bu_key = str(data.Negoptim__BusinessUnit__c or "")
    supplier_key = str(data.Negoptim__Supplier__c or "")
    pair = achievement.get("retailer_supplier", {}).get((bu_key, supplier_key), {})
    per_bu = achievement.get("retailer", {})
    per_supplier = achievement.get("supplier", {})

    candidates = (
        ("retailer_supplier_avg_achievement", pair.get("avg", np.nan)),
        ("retailer_supplier_last_achievement", pair.get("last", np.nan)),
        ("retailer_supplier_n_promos", pair.get("n", 0.0)),
        ("retailer_supplier_achievement_std", pair.get("std", np.nan)),
        ("retailer_avg_achievement", per_bu.get(bu_key, np.nan)),
        ("supplier_avg_achievement", per_supplier.get(supplier_key, np.nan)),
    )
    for name, value in candidates:
        if name not in bundle.features and name not in bundle.roi_features:
            continue
        missing = value is None or (isinstance(value, float) and np.isnan(value))
        df[name] = 0.0 if missing else value

    return df
|
|
|
|
def _predict_proba(feature_row: pd.DataFrame, bundle: ModelBundle) -> float:
    """Return the class-1 (success) probability for the first row of *feature_row*."""
    positive_column = bundle.model.predict_proba(feature_row)[:, 1]
    return float(positive_column[0])
|
|
|
|
| |
| |
| |
def explain_prediction(feature_row: pd.DataFrame, bundle: ModelBundle, top_n: int = 5) -> list[FeatureImpact]:
    """Return the *top_n* largest-magnitude SHAP contributions for one row.

    ``TreeExplainer.shap_values`` output layouts that are handled:
      * a list with one ``(n_samples, n_features)`` array per class (older
        SHAP releases) — the class-1 array is used;
      * a single ``(n_samples, n_features)`` array (single-output models);
      * a single ``(n_samples, n_features, n_outputs)`` array (recent SHAP
        releases for multi-output models, e.g. scikit-learn forests) — the
        class-1 slice is used.

    Args:
        feature_row: one-row frame aligned to ``bundle.features``.
        bundle: artifact snapshot providing the explainer and feature names.
        top_n: maximum number of factors returned; negative values count as 0.

    Returns:
        FeatureImpacts sorted by descending |SHAP value|. ``direction`` is
        "positive" only for strictly positive contributions.
    """
    raw = bundle.explainer.shap_values(feature_row)
    if isinstance(raw, list):
        values = np.asarray(raw[1])[0]
    else:
        values = np.asarray(raw)[0]
        if values.ndim == 2:
            # (n_features, n_outputs): take the positive class when it exists.
            values = values[:, 1] if values.shape[1] > 1 else values[:, 0]
    values = np.asarray(values, dtype=float)

    order = np.argsort(np.abs(values))[::-1][: max(top_n, 0)]
    return [
        FeatureImpact(
            feature=bundle.features[i],
            impact=round(float(values[i]), 4),
            direction="positive" if values[i] > 0 else "negative",
        )
        for i in order
    ]
|
|
|
|
| |
| |
| |
def generate_recommendations(
    data: PromotionInput, probability: float, factors: list[FeatureImpact],
) -> list[Recommendation]:
    """Turn SHAP drivers and raw PDSD values into buyer-facing suggestions.

    Rules are checked in a fixed order: discount depth, campaign duration,
    volume target, supplier history, then overall probability. The order of
    the returned list is therefore stable.

    Args:
        data: the promotion being scored.
        probability: predicted success probability.
        factors: top SHAP factors (as produced by ``explain_prediction``).

    Returns:
        Zero or more ``Recommendation`` objects.
    """
    # Keep the first occurrence per feature, like a linear search over factors.
    by_feature: dict[str, FeatureImpact] = {}
    for factor in factors:
        by_feature.setdefault(factor.feature, factor)

    def is_negative(name: str) -> bool:
        hit = by_feature.get(name)
        return hit is not None and hit.direction == "negative"

    recs: list[Recommendation] = []

    # NOTE(review): falls back to SellOutDiscountValue__c, which may hold an
    # amount rather than a percentage — confirm against field_mapping.
    depth = data.Negoptim__SellOutDiscountPerc__c or data.Negoptim__SellOutDiscountValue__c or 0
    if depth > 40:
        recs.append(Recommendation(
            message="Discount depth exceeds 40%. Very high discounts historically have "
                    "lower validation rates — consider whether a lighter mechanic achieves the same sell-out.",
            priority="high",
        ))

    if is_negative("campaign_duration_days"):
        recs.append(Recommendation(
            message="Campaign duration is negatively impacting validation probability. "
                    "Shorter, more focused promotions tend to score better.",
            priority="medium",
        ))

    # Prefer the PDSD quantity target; fall back to the legacy field name.
    qty_name = (
        "Negoptim__SellOutDiscountQtyTarget__c"
        if "Negoptim__SellOutDiscountQtyTarget__c" in by_feature
        else "Negoptim__QuantityTarget__c"
    )
    if is_negative(qty_name):
        recs.append(Recommendation(
            message="Volume target is flagged as a risk factor. "
                    "Consider aligning the quantity target more closely with historical averages for this mechanic.",
            priority="medium",
        ))

    if is_negative("supplier_acceptance_rate"):
        recs.append(Recommendation(
            message="This supplier has a below-average historical validation rate with your organisation. "
                    "Scrutinise the terms carefully before responding.",
            priority="high",
        ))

    if probability < 0.5:
        recs.append(Recommendation(
            message="Validation probability is below 50%. Review the top risk factors "
                    "and consider requesting revised terms from the supplier.",
            priority="high",
        ))
    elif probability < 0.75:
        recs.append(Recommendation(
            message="Validation probability is moderate. Targeted adjustments to the "
                    "top negative drivers could significantly improve the outlook.",
            priority="low",
        ))

    return recs
|
|
|
|
| |
| |
| |
def compute_confidence_and_risk(probability: float) -> tuple[float, str]:
    """Map a probability to (confidence, risk_level).

    Confidence is the distance from the 0.5 decision boundary scaled to 0-1 and
    rounded to 4 decimals. Risk is "LOW" when confidence >= 0.6, "MEDIUM" when
    >= 0.3, otherwise "HIGH".
    """
    distance = abs(probability - 0.5)
    confidence = round(distance * 2, 4)
    for floor, level in ((0.6, "LOW"), (0.3, "MEDIUM")):
        if confidence >= floor:
            return confidence, level
    return confidence, "HIGH"
|
|
|
|
| |
| |
| |
def compute_benchmark(probability: float) -> Benchmark:
    """Compare this prediction against the dataset average.

    Reads ``_benchmark_stats`` ("average_success_rate", default 0.5;
    "total_records", default 0). The percentile is a piecewise-linear mapping,
    not an empirical rank: probabilities below the average map onto roughly
    [0, 50] and those at or above it onto roughly [50, 100] (0.001 is added to
    each denominator). The result is clamped to [0, 100] and rounded to 1 dp.
    ``vs_average`` is "at" when the probability is within 0.02 of the average.
    """
    stats = _benchmark_stats
    avg_rate = stats.get("average_success_rate", 0.5)
    total = stats.get("total_records", 0)

    raw = (
        50 + (probability - avg_rate) / (1.0 - avg_rate + 0.001) * 50
        if probability >= avg_rate
        else (probability / (avg_rate + 0.001)) * 50
    )
    percentile = round(min(max(raw, 0), 100), 1)

    tolerance = 0.02
    if probability > avg_rate + tolerance:
        position = "above"
    elif probability < avg_rate - tolerance:
        position = "below"
    else:
        position = "at"

    return Benchmark(
        average_success_rate=avg_rate,
        percentile=percentile,
        total_records=total,
        vs_average=position,
    )
|
|
|
|
| |
| |
| |
| def _resolve_cost_basis(data: PromotionInput) -> dict: |
| """ |
| Determine unit cost basis without fabrication (PROJECT_CONTEXT §2). |
| Returns dict with keys: source, cogs (float|None), margin (float|None). |
| source='direct_cogs' — Negoptim__COGS__c supplied by Apex |
| source='pg_margin_proxy' — gross minus net price used as margin proxy |
| source='unknown' — no cost data; ROI currency fields must be None |
| """ |
| if data.Negoptim__COGS__c is not None: |
| return {"source": "direct_cogs", "cogs": data.Negoptim__COGS__c, "margin": None} |
| if data.pg_gross_price is not None and data.pg_net_price is not None: |
| return {"source": "pg_margin_proxy", "cogs": None, |
| "margin": data.pg_gross_price - data.pg_net_price} |
| return {"source": "unknown", "cogs": None, "margin": None} |
|
|
|
|
def compute_roi_prediction(data: PromotionInput, feature_row: pd.DataFrame,
                           bundle: Optional[ModelBundle] = None,
                           acceptance_proba: float = 1.0) -> ROIPrediction:
    """
    Prototype ROI estimate — directional only, not a forecast.
    Currency fields are None when no cost basis is available (PROJECT_CONTEXT §2).

    Args:
        data: PDSD line being scored.
        feature_row: single-row preprocessed feature frame. The bundle's
            ``roi_features`` are selected from it (absent columns and NaNs -> 0.0)
            as input to the ROI model.
        bundle: model snapshot to use; falls back to the module-level ``_bundle``.
        acceptance_proba: accepted for interface compatibility; not used by the
            current calculation.

    Returns:
        ROIPrediction. ``roi_pct`` is net margin per unit over funding per unit
        (%). When there is no meaningful funding (<= 0.001 per unit), it is the
        gross margin as a % of net price instead. Quantile fields
        (p10/p50/p90) are filled only when a ROI model is available and the
        quantity target is positive. Per-unit ROI does not depend on volume, so
        ``roi_pct_low/mid/high`` are equal whenever they are set.
    """
    import logging

    cost = _resolve_cost_basis(data)
    net_price = data.pg_net_price or data.Negoptim__NetPricePromo__c or 0.0
    discount_pct = data.Negoptim__SellOutDiscountPerc__c or data.Negoptim__SellOutDiscountValue__c or 0.0
    qty_target = (data.Negoptim__SellOutDiscountQtyTarget__c
                  or data.Negoptim__QuantityTarget__c or 0.0)

    # Discount cost per unit, as a share of the net price.
    funding_per_unit = net_price * (discount_pct / 100)

    if cost["source"] == "unknown":
        return ROIPrediction(
            roi_pct=None,
            roi_label="Unknown",
            predicted_units=None,
            predicted_profit=None,
            net_margin_per_unit=None,
            funding_per_unit=round(funding_per_unit, 4) if funding_per_unit else None,
            break_even_roi=None,
            cost_basis_source="unknown",
        )

    # Per-unit gross margin before promotional funding.
    if cost["source"] == "direct_cogs":
        gross_margin = net_price - cost["cogs"]
    else:  # pg_margin_proxy
        gross_margin = cost["margin"]
    net_margin_per_unit = gross_margin - funding_per_unit

    if funding_per_unit > 0.001:
        roi_pct = (net_margin_per_unit / funding_per_unit) * 100
    else:
        # No meaningful funding: report gross margin as a % of net price
        # (for direct_cogs this is net_price - COGS, not COGS itself).
        # The epsilon avoids division by zero when net_price is 0.
        roi_pct = (gross_margin / (net_price + 0.0001)) * 100

    if roi_pct >= 100:
        roi_label = "Strong"
    elif roi_pct >= 50:
        roi_label = "Moderate"
    elif roi_pct >= 0:
        roi_label = "Weak"
    else:
        roi_label = "Negative"

    b = bundle if bundle is not None else _bundle
    roi_m = b.roi_model if b is not None else None
    roi_feats = b.roi_features if b is not None else []
    roi_log = b.roi_log_transform if b is not None else False

    def _ratio(model, roi_row: pd.DataFrame) -> float:
        """Run one ROI model on ``roi_row``; return the achievement ratio clamped to [0, 5]."""
        raw = float(model.predict(roi_row)[0])
        ratio = float(np.expm1(raw)) if roi_log else raw
        return max(0.0, min(ratio, 5.0))

    def _bands(ratio):
        """Convert an achievement ratio to (units, profit, roi_pct); all None if ratio is None."""
        if ratio is None:
            return None, None, None
        units = round(ratio * qty_target, 0)
        return units, round(units * net_margin_per_unit, 2), round(roi_pct, 1)

    p10_ratio = p50_ratio = p90_ratio = None
    units_p10 = units_p50 = units_p90 = None
    profit_low = profit_mid = profit_high = None
    roi_pct_low = roi_pct_mid = roi_pct_high = None

    if roi_m is not None and qty_target > 0:
        try:
            # Build the model input once: roi_features in training order,
            # absent columns and NaNs filled with 0.0.
            present = [f for f in roi_feats if f in feature_row.columns]
            roi_row = feature_row[present].reindex(columns=roi_feats, fill_value=0.0).fillna(0.0)

            if isinstance(roi_m, dict):
                # Quantile bundle: {"p10": model, "p50": model, "p90": model}.
                # A dict without "p50" is not usable; fall through to the heuristic.
                if "p50" in roi_m:
                    p10_ratio = _ratio(roi_m["p10"], roi_row)
                    p50_ratio = _ratio(roi_m["p50"], roi_row)
                    p90_ratio = _ratio(roi_m["p90"], roi_row)
            else:
                # Single point-estimate model: only the median band is filled.
                p50_ratio = _ratio(roi_m, roi_row)

            units_p10, profit_low, roi_pct_low = _bands(p10_ratio)
            units_p50, profit_mid, roi_pct_mid = _bands(p50_ratio)
            units_p90, profit_high, roi_pct_high = _bands(p90_ratio)
        except Exception:
            # Best-effort: keep serving with the heuristic ratio below, but make
            # the failure visible instead of silently swallowing it.
            logging.getLogger(__name__).warning(
                "ROI model prediction failed; falling back to heuristic ratio", exc_info=True,
            )

    # Fallback achievement ratio when no model output is available.
    # NOTE(review): 0.85 is a hard-coded prior whose origin is not visible here.
    conditional_ratio = p50_ratio if p50_ratio is not None else 0.85
    predicted_units = units_p50 if units_p50 is not None else round(conditional_ratio * qty_target, 0)
    predicted_profit = profit_mid if profit_mid is not None else round(predicted_units * net_margin_per_unit, 2)

    return ROIPrediction(
        roi_pct=round(roi_pct, 1),
        roi_label=roi_label,
        predicted_units=predicted_units,
        predicted_profit=predicted_profit,
        net_margin_per_unit=round(net_margin_per_unit, 4),
        funding_per_unit=round(funding_per_unit, 4),
        break_even_roi=0.0,
        cost_basis_source=cost["source"],
        predicted_units_p10=units_p10,
        predicted_units_p50=units_p50,
        predicted_units_p90=units_p90,
        predicted_profit_low=profit_low,
        predicted_profit_mid=profit_mid,
        predicted_profit_high=profit_high,
        roi_pct_low=roi_pct_low,
        roi_pct_mid=roi_pct_mid,
        roi_pct_high=roi_pct_high,
    )
|
|
|
|
| |
| |
| |
def _find_analogues(feature_row: pd.DataFrame, k: int = 5) -> list[HistoricalAnalogue]:
    """Return k historical promotions most similar to feature_row by cosine similarity.

    The query vector uses ``_analogue_features_used`` (falling back to
    ``_ANALOGUE_FEATURES``). Features absent from ``feature_row`` or NaN count
    as 0.0. The vector is L2-normalised and dotted with ``_analogues_matrix``,
    whose rows are assumed to be L2-normalised already (TODO confirm where the
    index is built). Similarities are clipped to [-1, 1].

    Returns an empty list when the index is not loaded, ``k <= 0``, or the
    query vector is (near) zero.
    """
    # Read each global once so an index rebuild (e.g. during a background
    # retrain) cannot swap the matrix/metadata between the None check and use.
    matrix = _analogues_matrix
    meta = _analogues_meta
    # k <= 0 guard: a negative k would slice [:k] from the end of the ranking
    # and return almost the whole index.
    if matrix is None or meta is None or k <= 0:
        return []

    feat_list = _analogue_features_used or _ANALOGUE_FEATURES
    vec = np.zeros(len(feat_list), dtype=np.float64)
    for i, feat in enumerate(feat_list):
        if feat in feature_row.columns:
            vec[i] = float(feature_row[feat].fillna(0.0).iloc[0])

    norm = np.linalg.norm(vec)
    if norm < 1e-9:
        return []
    vec_norm = vec / norm

    sims = np.clip(matrix.dot(vec_norm), -1.0, 1.0)
    top_idx = np.argsort(sims)[::-1][:k]

    results = []
    for rank, idx in enumerate(top_idx, start=1):
        m = meta.iloc[idx]
        results.append(HistoricalAnalogue(
            rank=rank,
            similarity=round(float(sims[idx]), 3),
            status=str(m["status"]),
            supplier=str(m.get("supplier", "")),
            discount_pct=round(float(m["discount_pct"]), 1),
            qty_target=int(m["qty_target"]),
            campaign_duration_days=int(m["campaign_duration_days"]),
            discount_depth_vs_gross=round(float(m.get("discount_depth_vs_gross", 0)), 4),
            supplier_acceptance_rate=round(float(m.get("supplier_acceptance_rate", 0)), 4),
            sell_out_date=str(m["sell_out_date"]),
        ))
    return results
|
|
|
|
| |
| |
| |
def _require_admin_key(x_admin_key: Optional[str] = Header(None)):
    """FastAPI dependency that guards admin endpoints with the admin key header.

    FastAPI derives the header name from the parameter (``x-admin-key``).

    Raises:
        HTTPException(503): ``_ADMIN_KEY`` is not configured.
        HTTPException(401): header missing or not equal to ``_ADMIN_KEY``.
    """
    import hmac

    if not _ADMIN_KEY:
        raise HTTPException(status_code=503, detail="Admin endpoints disabled: ADMIN_API_KEY env var not set")
    # Constant-time comparison so response timing does not reveal how many
    # leading characters of a guessed key were correct. Compare bytes so
    # non-ASCII header values cannot raise TypeError in compare_digest.
    if x_admin_key is None or not hmac.compare_digest(
        x_admin_key.encode("utf-8"), _ADMIN_KEY.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="Invalid admin key")
|
|
|
|
| |
| |
| |
@app.get("/health")
def health():
    """Health check for monitoring and BYOM connectivity tests.

    ``status`` is always "ok". ``degraded`` is True when no model bundle is
    loaded; otherwise it is the benchmark stats' "degraded" entry (default False).
    """
    snapshot = _bundle
    loaded = snapshot is not None
    return {
        "status": "ok",
        "version": "2.0.0",
        "features": len(snapshot.features) if loaded else 0,
        "degraded": (not loaded) or _benchmark_stats.get("degraded", False),
    }
|
|
|
|
@app.get("/warmup")
def warmup_probe():
    """Dedicated warmup probe — hit this to wake the HF Space and verify model readiness.

    Uptime-ping services (UptimeRobot, cron) should target this endpoint every 5 minutes
    during business hours so the free-tier Space doesn't sleep mid-session.
    ``ready_at`` is the UTC time at which this response was produced.
    """
    current = _bundle
    feature_count = 0 if current is None else len(current.features)
    return {
        "status": "ok",
        "model_loaded": current is not None,
        "features": feature_count,
        "ready_at": datetime.now(timezone.utc).isoformat(),
    }
|
|
|
|
| def _resolve_buyer_role(promotion: PromotionInput) -> str: |
| """ |
| Determine whether the buyer is acting as sender or receiver. |
| 'receiver' is the primary use case (buyer reviewing inbound supplier PDS). |
| 'sender' applies when buyer created the PDS themselves. |
| """ |
| if promotion.buyer_role in ("sender", "receiver"): |
| return promotion.buyer_role |
| |
| submitted = {"Submitted", "Soumis", "En cours de traitement", "Under Review"} |
| if promotion.pds_status and promotion.pds_status in submitted: |
| return "receiver" |
| return "receiver" |
|
|
|
|
@app.post("/predict", response_model=PredictionOutput)
def predict(promotion: PromotionInput):
    """Prediction with SHAP explanation, ROI estimate, benchmark, risk level, and buyer role.

    When ``promotion.record_id`` is set, the prediction is also written to the
    signals DB. Failures there are logged and do not affect the response.

    Raises:
        HTTPException(503): no model bundle is loaded yet.
        HTTPException(422): any other failure while scoring.
    """
    try:
        # Snapshot the bundle once so every step below uses the same artifacts,
        # even if a retrain swaps ``_bundle`` mid-request.
        bundle = _bundle
        if bundle is None:
            raise HTTPException(status_code=503, detail="Model not yet loaded — please retry.")

        feature_row = preprocess_input(promotion, bundle=bundle)
        proba = _predict_proba(feature_row, bundle=bundle)
        label = "SUCCESS" if proba >= 0.5 else "FAILURE"
        buyer_role = _resolve_buyer_role(promotion)

        # Probability-weighted profit over the quantity target; None without a cost basis.
        qty = (
            promotion.Negoptim__SellOutDiscountQtyTarget__c
            or promotion.Negoptim__QuantityTarget__c
            or 0
        )
        cost = _resolve_cost_basis(promotion)
        net = promotion.pg_net_price or promotion.Negoptim__NetPricePromo__c or 0
        if cost["source"] == "unknown":
            expected_profit = None
        elif cost["source"] == "direct_cogs":
            expected_profit = round(proba * (net - cost["cogs"]) * qty, 2)
        else:
            expected_profit = round(proba * cost["margin"] * qty, 2)

        top_factors = explain_prediction(feature_row, bundle=bundle)
        roi = compute_roi_prediction(promotion, feature_row, bundle=bundle, acceptance_proba=proba)
        recommendations = generate_recommendations(promotion, proba, top_factors)
        confidence, risk_level = compute_confidence_and_risk(proba)
        benchmark = compute_benchmark(proba)
        analogues = _find_analogues(feature_row, k=5)

        output = PredictionOutput(
            success_probability=round(proba, 4),
            prediction=label,
            expected_profit=expected_profit,
            buyer_role=buyer_role,
            roi=roi,
            top_factors=top_factors,
            recommendations=recommendations,
            confidence=confidence,
            risk_level=risk_level,
            benchmark=benchmark,
            analogues=analogues,
        )

        if promotion.record_id:
            try:
                from signals_db import log_prediction, DB_PATH as _DB_PATH
                log_prediction(
                    record_id=promotion.record_id,
                    predicted_at=datetime.now(timezone.utc).isoformat(),
                    predicted_probability=round(proba, 4),
                    predicted_label=label,
                    # roi_pct is None when the cost basis is unknown; log NULL
                    # rather than letting round(None) abort the log write.
                    predicted_roi_pct=(
                        round(roi.roi_pct, 2)
                        if roi is not None and roi.roi_pct is not None else None
                    ),
                    db_path=_DB_PATH,
                )
            except Exception:
                # Logging is best-effort, but failures should be visible.
                import logging
                logging.getLogger(__name__).warning(
                    "Failed to log prediction for record %s", promotion.record_id, exc_info=True,
                )

        return output

    except HTTPException:
        # Preserve intended status codes (e.g. 503 while the model is loading)
        # instead of re-wrapping them as 422 below.
        raise
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=422, detail=f"Prediction failed: {e}") from e
|
|
|
|
@app.post("/predict/batch", response_model=list[PredictionOutput])
def predict_batch(batch: BatchPredictionInput):
    """Score a list of PDSD lines in one call (max 200).

    Runs model.predict_proba and SHAP once over the stacked feature matrix,
    amortising per-call model/explainer overhead across the batch. ROI,
    recommendations, benchmark and analogues are still computed per line.
    Designed for PromotionInsightsAction bulk Agentforce invocations.

    Lines with a ``record_id`` are written to the prediction log in one
    ``executemany``. Failures there are logged and do not affect the response.

    Raises:
        HTTPException(503): no model bundle is loaded yet.
        HTTPException(422): any other failure while scoring.
    """
    try:
        # One bundle snapshot for the whole batch so all rows share artifacts.
        bundle = _bundle
        if bundle is None:
            raise HTTPException(status_code=503, detail="Model not yet loaded — please retry.")
        if not batch.promotions:
            return []

        rows = [preprocess_input(p, bundle=bundle) for p in batch.promotions]
        stacked = pd.concat(rows, ignore_index=True)

        # Single vectorised call for all rows; column 1 is the success class.
        probas = bundle.model.predict_proba(stacked)[:, 1]

        # SHAP output shape varies: a per-class list (take class 1), a 2-D
        # (rows, features) array, or a 3-D (rows, features, classes) array.
        shap_vals_all = bundle.explainer.shap_values(stacked)
        if isinstance(shap_vals_all, list):
            shap_vals_all = shap_vals_all[1]
        shap_matrix = np.asarray(shap_vals_all)
        if shap_matrix.ndim == 3:
            shap_matrix = shap_matrix[:, :, 1]

        now_iso = datetime.now(timezone.utc).isoformat()
        outputs = []
        log_entries = []

        for i, (promotion, feature_row, proba) in enumerate(
            zip(batch.promotions, rows, probas)
        ):
            proba = float(proba)
            label = "SUCCESS" if proba >= 0.5 else "FAILURE"
            buyer_role = _resolve_buyer_role(promotion)

            qty = promotion.Negoptim__SellOutDiscountQtyTarget__c or promotion.Negoptim__QuantityTarget__c or 0
            cost = _resolve_cost_basis(promotion)
            net = promotion.pg_net_price or promotion.Negoptim__NetPricePromo__c or 0
            if cost["source"] == "unknown":
                expected_profit = None
            elif cost["source"] == "direct_cogs":
                expected_profit = round(proba * (net - cost["cogs"]) * qty, 2)
            else:
                expected_profit = round(proba * cost["margin"] * qty, 2)

            # Top-5 factors from this row's slice of the batch SHAP matrix.
            shap_row = shap_matrix[i]
            top_n = 5
            indices = np.argsort(np.abs(shap_row))[::-1][:top_n]
            top_factors = [
                FeatureImpact(
                    feature=bundle.features[j],
                    impact=round(float(shap_row[j]), 4),
                    direction="positive" if shap_row[j] > 0 else "negative",
                )
                for j in indices
            ]

            roi = compute_roi_prediction(promotion, feature_row, bundle=bundle, acceptance_proba=proba)
            recommendations = generate_recommendations(promotion, proba, top_factors)
            confidence, risk_level = compute_confidence_and_risk(proba)
            benchmark = compute_benchmark(proba)
            analogues = _find_analogues(feature_row, k=3)

            outputs.append(PredictionOutput(
                success_probability=round(proba, 4),
                prediction=label,
                expected_profit=expected_profit,
                buyer_role=buyer_role,
                roi=roi,
                top_factors=top_factors,
                recommendations=recommendations,
                confidence=confidence,
                risk_level=risk_level,
                benchmark=benchmark,
                analogues=analogues,
            ))

            if promotion.record_id:
                log_entries.append(dict(
                    record_id=promotion.record_id,
                    predicted_at=now_iso,
                    predicted_probability=round(proba, 4),
                    predicted_label=label,
                    predicted_roi_pct=round(roi.roi_pct, 2) if roi.roi_pct is not None else None,
                ))

        if log_entries:
            try:
                from signals_db import DB_PATH as _DB_PATH, get_connection
                from contextlib import closing
                # closing() releases the connection; the inner ``conn`` context
                # manages the transaction.
                with closing(get_connection(_DB_PATH)) as conn, conn:
                    conn.executemany(
                        """INSERT INTO prediction_log
                           (record_id, predicted_at, predicted_probability, predicted_label, predicted_roi_pct)
                           VALUES (:record_id, :predicted_at, :predicted_probability,
                                   :predicted_label, :predicted_roi_pct)""",
                        log_entries,
                    )
            except Exception:
                # Logging is best-effort, but failures should be visible.
                import logging
                logging.getLogger(__name__).warning(
                    "Failed to log %d batch predictions", len(log_entries), exc_info=True,
                )

        return outputs

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=422, detail=f"Batch prediction failed: {e}") from e
|
|
|
|
| |
| |
| |
class OutcomeInput(BaseModel):
    """Actual outcome for one previously predicted promotion line."""

    # Identifier the prediction was logged under (the request's record_id).
    record_id: str
    # Final status reported by the caller; passed to signals_db as ``actual_outcome``.
    actual_status: str
|
|
|
|
class BatchOutcomeInput(BaseModel):
    """Request body for POST /outcome/batch: several outcomes recorded together."""

    # Required; validation rejects lists longer than 200 entries (max_length=200).
    outcomes: list[OutcomeInput] = Field(..., description="List of outcomes to record (max 200)", max_length=200)
|
|
|
|
@app.post("/outcome")
def record_outcome(outcome: OutcomeInput):
    """Record the actual outcome for a previously predicted promotion.

    Stamps the outcome with the current UTC time and returns the number of
    prediction-log rows updated. Any failure becomes HTTP 500.
    """
    try:
        from signals_db import DB_PATH as _DB_PATH, record_outcome as _record
        stamp = datetime.now(timezone.utc).isoformat()
        rows_changed = _record(
            record_id=outcome.record_id,
            actual_outcome=outcome.actual_status,
            outcome_at=stamp,
            db_path=_DB_PATH,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Outcome recording failed: {e}")
    return {"status": "ok", "rows_updated": rows_changed}
|
|
|
|
@app.post("/outcome/batch")
def record_outcome_batch(batch: BatchOutcomeInput):
    """Record outcomes for multiple promotions in a single database transaction.

    Called by the Salesforce PDSDOutcomeTrigger when multiple lines reach a
    terminal status in one DML operation — avoids one @future per record.
    All outcomes share one UTC timestamp. Any failure becomes HTTP 500.
    """
    try:
        from signals_db import DB_PATH as _DB_PATH, record_outcomes_many
        stamp = datetime.now(timezone.utc).isoformat()
        payload = []
        for item in batch.outcomes:
            payload.append({
                "record_id": item.record_id,
                "actual_outcome": item.actual_status,
                "outcome_at": stamp,
            })
        rows_changed = record_outcomes_many(payload, db_path=_DB_PATH)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Batch outcome recording failed: {e}")
    return {"status": "ok", "rows_updated": rows_changed, "submitted": len(payload)}
|
|
|
|
@app.post("/analogues")
def get_analogues(promotion: PromotionInput, k: int = 5):
    """Return k historical promotions most similar to this one (k-NN cosine similarity).

    ``k`` is clamped to [0, 20]. ``features_used`` reports the feature list the
    similarity search actually uses (``_analogue_features_used``, falling back
    to ``_ANALOGUE_FEATURES``).

    Raises:
        HTTPException(503): no model bundle is loaded yet.
        HTTPException(422): any other failure.
    """
    try:
        bundle = _bundle
        if bundle is None:
            raise HTTPException(status_code=503, detail="Model not yet loaded — please retry.")
        feature_row = preprocess_input(promotion, bundle=bundle)
        # Lower bound guards against negative k from the query string.
        results = _find_analogues(feature_row, k=max(0, min(k, 20)))
        meta = _analogues_meta
        return {
            "analogues": [a.model_dump() for a in results],
            "index_size": len(meta) if meta is not None else 0,
            "features_used": len(_analogue_features_used or _ANALOGUE_FEATURES),
        }
    except HTTPException:
        # Keep the 503 instead of re-wrapping it as 422.
        raise
    except Exception as e:
        raise HTTPException(status_code=422, detail=f"Analogues retrieval failed: {e}") from e
|
|
|
|
@app.post("/admin/retrain")
def trigger_retrain(_: None = Depends(_require_admin_key)):
    """Manually trigger a background retraining cycle (admin key required).

    NOTE(review): the flag check and ``_trigger_retrain()`` are not atomic here.
    Whether ``_trigger_retrain`` itself guards against concurrent starts is not
    visible in this section — confirm.
    """
    if not _retrain_in_progress:
        _trigger_retrain()
        return {"status": "started", "message": "Retraining started in background"}
    return {"status": "already_running", "message": "Retraining already in progress"}
|
|
|
|
@app.get("/admin/feedback-stats")
def feedback_stats(_: None = Depends(_require_admin_key)):
    """Return prediction log statistics for monitoring (admin key required).

    The response always contains the same keys, including when the log is empty:
    total_predictions, resolved (rows with a non-null ``actual_outcome``),
    pending, retrain_in_progress and unknown_category_hits. Any failure becomes
    HTTP 500.
    """
    try:
        from signals_db import get_prediction_log_df, DB_PATH as _DB_PATH
        df = get_prediction_log_df(_DB_PATH)
        if df.empty:
            # Avoid touching "actual_outcome", which an empty frame may not have.
            total, resolved = 0, 0
        else:
            total = len(df)
            resolved = int(df["actual_outcome"].notna().sum())
        return {
            "total_predictions": total,
            "resolved": resolved,
            "pending": total - resolved,
            "retrain_in_progress": _retrain_in_progress,
            "unknown_category_hits": _unknown_category_count,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
|