# loc-service / main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict, Any, List
import joblib
import os
import pandas as pd
import numpy as np
import shap
app = FastAPI(title="Agentic ML Microservice", version="2.0.0")

# --- Globals for Model Loading ---
# All five globals are populated by load_models() at startup and stay None
# on failure, which /health reports as a 503.
prop_model = None       # XGBoost classifier loaded from propensity_xgb.joblib
firmo_model = None      # KMeans segmenter loaded from firmographics_kmeans.joblib
chan_model = None       # Random Forest loaded from channel_rf.joblib
shap_explainer = None   # shap.TreeExplainer wrapping prop_model
df_db = None            # In-memory lead lookup table read from the CSV

# --- Constants ---
# Channel labels indexed by the channel model's predicted class index.
CHANNELS = ["LinkedIn", "Email", "Direct Call", "WhatsApp"]
# KMeans cluster id -> business segment label.
SEGMENT_MAP = {0: "Startup", 1: "Mid-Market", 2: "Enterprise"}
# Segment label -> recommended messaging tone returned by /predict/profile.
TONE_MAP = {
    "Enterprise": "Formal and value-driven",
    "Startup": "Casual and high-energy",
    "Mid-Market": "Professional and balanced",
}
# Top SHAP driver feature -> outreach-constraint sentence surfaced in shap_anchors.
CONSTRAINT_MAP = {
    "Job_Promotion_Flag": "Congratulate them on their new role priorities.",
    "Hiring_Increase_Flag": "Mention their recent hiring spree.",
    "Revenue_Growth_Score": "Acknowledge their strong financial trajectory.",
    "Clay_Intent_Signal": "Reference their third-party intent signals.",
    "Apollo_Engagement_Score": "Highlight their active engagement score.",
    "Composite_Growth_Signal": "Highlight their explosive scaling metrics (hiring + revenue).",
}
# Model feature name -> human-readable display name for explainability payloads.
FEATURE_PLAIN_NAMES = {
    "Job_Promotion_Flag": "Recent Job Promotion",
    "Hiring_Increase_Flag": "Active Hiring",
    "Revenue_Growth_Score": "Revenue Growth",
    "Clay_Intent_Signal": "Third-Party Intent (Clay)",
    "Apollo_Engagement_Score": "Engagement Score (Apollo)",
    "Composite_Growth_Signal": "Growth Momentum (Hiring × Revenue)",
}
# --- Pydantic Schemas ---
class BuyerRequest(BaseModel):
    """Request body for /predict/profile: identifies the lead to look up."""
    # Must match a value in the buyer_id column of the loaded CSV,
    # otherwise the endpoint responds 404.
    buyer_id: str
@app.on_event("startup")
def load_models():
    """Load the three models, the SHAP explainer, and the lookup CSV at startup.

    Populates the module-level globals. On any failure it prints a warning
    and leaves the unassigned globals as None, so /health can surface a 503.
    """
    global prop_model, firmo_model, chan_model, shap_explainer, df_db
    # In HF Spaces Docker, models are at /app/models/ and data at /app/data/
    base_dir = os.path.dirname(__file__)
    models_dir = os.path.join(base_dir, 'models')
    data_path = os.path.join(base_dir, 'data', 'Cleaned_OutreachAI_Dataset.csv')
    try:
        prop_model = joblib.load(os.path.join(models_dir, 'propensity_xgb.joblib'))
        firmo_model = joblib.load(os.path.join(models_dir, 'firmographics_kmeans.joblib'))
        chan_model = joblib.load(os.path.join(models_dir, 'channel_rf.joblib'))
        # The explainer is bound to the propensity model for SHAP waterfalls.
        shap_explainer = shap.TreeExplainer(prop_model)
        # The CSV is used purely as an in-memory lookup table, never written.
        df_db = pd.read_csv(data_path)
        print("Successfully loaded stateful models and CSV Data into memory.")
    except Exception as err:
        print(f"Warning: Failed to load models or data. Ensure train_models.py has been run. Error: {err}")
@app.get("/health")
def health_check():
    """Readiness probe: 503 until both the models and the CSV are loaded."""
    ready = prop_model is not None and df_db is not None
    if not ready:
        raise HTTPException(status_code=503, detail="Models or Database not loaded")
    return {"status": "ok", "message": "API and Models are fully operational."}
# ================================================================
# EXPLAINABILITY ENGINE
# ================================================================
def _shap_base_value(expected_value) -> float:
    """Coerce a SHAP explainer's expected_value to a scalar float.

    TreeExplainer.expected_value may be a plain float, a 0-d numpy scalar,
    or an array with one entry per output class.  The length check must
    come FIRST: numpy arrays also define __float__, so the previous
    ``float(ev) if hasattr(ev, '__float__') else ...`` ordering could never
    reach the per-class branch and raised TypeError for size>1 arrays.
    Index 1 is taken as the positive class; 0.5 (uninformative prior) is
    the fallback if coercion fails entirely.
    """
    try:
        if hasattr(expected_value, '__len__') and len(expected_value) > 1:
            return float(expected_value[1])
        return float(expected_value)
    except (TypeError, ValueError):
        return 0.5


def build_propensity_explainability(prop_arr: pd.DataFrame, raw_prob: float, lead_score: int) -> dict:
    """
    Full SHAP waterfall: per-feature contributions with direction, magnitude,
    impact percentages, and an auto-generated narrative.

    Args:
        prop_arr: Single-row DataFrame of propensity features (the model input).
        raw_prob: Raw positive-class probability from the XGBoost model.
        lead_score: Score scaled from raw_prob via 75 + raw_prob * 23.

    Returns:
        Dict with the raw/scaled scores, SHAP base value, per-feature
        contributions sorted by |impact|, model-level feature weights,
        and a short human-readable narrative.
    """
    # Signed SHAP values (not absolute) so direction can be reported.
    raw_shap = shap_explainer.shap_values(prop_arr)[0]
    abs_shap = np.abs(raw_shap)
    # Guard against an all-zero attribution vector (division by zero below).
    total_impact = float(abs_shap.sum()) if abs_shap.sum() > 0 else 1.0

    # Per-feature contributions, descending by magnitude.
    feature_contributions = []
    for idx in np.argsort(abs_shap)[::-1]:
        fname = prop_arr.columns[idx]
        sval = float(raw_shap[idx])
        feature_contributions.append({
            "feature": fname,
            "display_name": FEATURE_PLAIN_NAMES.get(fname, fname),
            "feature_value": float(prop_arr.iloc[0, idx]),
            "shap_value": round(sval, 4),
            "direction": "positive" if sval >= 0 else "negative",
            "impact_pct": round((abs_shap[idx] / total_impact) * 100, 1),
        })

    # Base value from the SHAP explainer (expected model output).
    base_value = _shap_base_value(shap_explainer.expected_value)

    # Auto-generate narrative from the top 2 drivers.
    top1 = feature_contributions[0]
    top2 = feature_contributions[1] if len(feature_contributions) > 1 else None
    narrative = f"Lead score of {lead_score} driven primarily by {top1['display_name']} ({top1['impact_pct']}% impact, {top1['direction']})."
    if top2 and top2['impact_pct'] > 10:
        narrative += f" {top2['display_name']} reinforces this signal ({top2['impact_pct']}% impact)."

    # XGBoost feature importances (model-level, not instance-level).
    model_importances = dict(zip(prop_arr.columns, prop_model.feature_importances_))
    total_imp = sum(model_importances.values()) or 1.0
    model_weight_pct = {
        k: round((v / total_imp) * 100, 1)
        for k, v in sorted(model_importances.items(), key=lambda x: -x[1])
    }

    return {
        "raw_probability": round(raw_prob, 4),
        "scaled_lead_score": lead_score,
        "scaling_formula": "75 + (raw_probability × 23)",
        "qualification_threshold": 80,
        "base_value": round(base_value, 4),
        "feature_contributions": feature_contributions,
        "model_level_feature_weights": model_weight_pct,
        "narrative": narrative,
    }
def build_segmentation_explainability(firmo_arr: pd.DataFrame, cluster_idx: int, segmentation_class: str) -> dict:
    """
    KMeans reasoning: centroid distances, cluster boundaries, and confidence.
    """
    centroids = firmo_model.cluster_centers_
    point = firmo_arr.values[0]

    # Euclidean distance from the company to every cluster centroid.
    distances = {
        SEGMENT_MAP.get(cid, f"Cluster_{cid}"): round(float(np.linalg.norm(point - centroid)), 1)
        for cid, centroid in enumerate(centroids)
    }

    # Confidence via inverse-distance weighting: closer winner => more confident.
    dist_values = list(distances.values())
    nearest = min(dist_values) if min(dist_values) > 0 else 0.001
    inv_total = sum(1.0 / d if d > 0 else 1000.0 for d in dist_values)
    confidence = round((1.0 / nearest) / inv_total, 3) if inv_total > 0 else 0.5

    # Centroid coordinates expressed in business terms for context.
    centroid_profiles = {
        SEGMENT_MAP.get(cid, f"Cluster_{cid}"): {
            "avg_revenue_usd": round(float(centroid[0]), 0),
            "avg_headcount": round(float(centroid[1]), 0),
        }
        for cid, centroid in enumerate(centroids)
    }

    revenue = float(firmo_arr.iloc[0]['Revenue_Size_USD'])
    headcount = float(firmo_arr.iloc[0]['Headcount_Size'])
    narrative = (
        f"Classified as {segmentation_class} (closest centroid, distance: {distances[segmentation_class]:,.0f}). "
        f"Revenue of ${revenue:,.0f} and headcount of {headcount:,.0f} place this company in the {segmentation_class} tier."
    )

    # Mention the runner-up tier and how far away the boundary sits.
    ranked = sorted(distances.items(), key=lambda kv: kv[1])
    if len(ranked) > 1:
        alt_label, alt_dist = ranked[1]
        boundary_gap = round(alt_dist - ranked[0][1], 1)
        narrative += f" Next closest tier: {alt_label} (gap: {boundary_gap:,.0f} units)."

    return {
        "assigned_cluster": segmentation_class,
        "revenue_usd": revenue,
        "headcount": headcount,
        "centroid_distances": distances,
        "centroid_profiles": centroid_profiles,
        "cluster_confidence": confidence,
        "narrative": narrative,
    }
def build_channel_explainability(chan_arr: pd.DataFrame, channel_probs: np.ndarray, recommended_channel: str) -> dict:
    """
    Channel probability spread + Random Forest feature importances.
    """
    # Probability per channel, limited to however many outputs the model produced.
    prob_spread = {
        ch: round(float(channel_probs[i]), 3)
        for i, ch in enumerate(CHANNELS)
        if i < len(channel_probs)
    }

    # Model-level RF feature importances, largest first.
    ranked_importances = sorted(
        zip(chan_arr.columns, chan_model.feature_importances_),
        key=lambda kv: -kv[1],
    )
    top_drivers = [
        {"feature": feat, "importance": round(float(imp), 3)}
        for feat, imp in ranked_importances[:5]
    ]

    # Narrative: winner, strongest model signal, then the runner-up channel.
    top_feat = ranked_importances[0][0].replace("_", " ") if ranked_importances else "unknown"
    rec_prob = prob_spread.get(recommended_channel, 0)
    by_probability = sorted(prob_spread.items(), key=lambda kv: -kv[1])
    runner_up = by_probability[1] if len(by_probability) > 1 else ("N/A", 0)
    narrative = (
        f"{recommended_channel} recommended with {rec_prob*100:.0f}% probability. "
        f"{top_feat} is the strongest channel signal. "
        f"Runner-up: {runner_up[0]} ({runner_up[1]*100:.0f}%)."
    )

    # Snapshot of the exact feature values the model saw for this lead.
    feature_snapshot = {col: float(chan_arr.iloc[0][col]) for col in chan_arr.columns}

    return {
        "recommended": recommended_channel,
        "probability_spread": prob_spread,
        "top_channel_drivers": top_drivers,
        "feature_values_used": feature_snapshot,
        "narrative": narrative,
    }
def build_composite_signals(
    profile, composite_growth: float, lead_score: int,
    model_conf: float, chan_arr: pd.DataFrame
) -> dict:
    """
    High-level composite signals aggregating across all models.
    """
    def tier(value, high_cut, moderate_cut):
        # Shared HIGH/MODERATE/LOW bucketing with inclusive lower bounds.
        if value >= high_cut:
            return "HIGH"
        if value >= moderate_cut:
            return "MODERATE"
        return "LOW"

    # Growth Momentum: thresholds applied to the Composite_Growth_Signal.
    growth_momentum = tier(composite_growth, 0.7, 0.3)

    # Engagement Intensity: average of the channel behavior columns present.
    engagement_cols = ['LinkedIn_Active', 'LinkedIn_Post_Engagement', 'Cold_Call_Response']
    row = chan_arr.iloc[0]
    engagement_vals = [float(row.get(c, 0)) for c in engagement_cols if c in chan_arr.columns]
    engagement_avg = np.mean(engagement_vals) if engagement_vals else 0
    engagement_intensity = tier(engagement_avg, 0.6, 0.3)

    # Data Quality Score: share of key fields that are non-null AND non-zero.
    key_fields = [
        'job_promotion_flag', 'hiring_increase_flag', 'revenue_growth_score',
        'clay_intent_signal', 'apollo_engagement_score', 'revenue_size_usd',
        'headcount_size', 'linkedin_active', 'email_open_rate', 'cold_call_response'
    ]
    filled = sum(1 for f in key_fields if pd.notna(profile.get(f)) and float(profile.get(f, 0)) != 0)
    data_quality = round(filled / len(key_fields), 2)

    # Qualification Margin relative to the 80-point threshold.
    margin = lead_score - 80
    if margin > 0:
        margin_text = f"+{margin} points above threshold"
    elif margin == 0:
        margin_text = "Exactly at threshold"
    else:
        margin_text = f"{margin} points below threshold"

    # Overall Outreach Readiness: points for each signal, out of 11.
    tier_points = {"HIGH": 3, "MODERATE": 2, "LOW": 1}
    readiness_score = tier_points[growth_momentum] + tier_points[engagement_intensity]
    if lead_score >= 85:
        readiness_score += 3
    elif lead_score >= 80:
        readiness_score += 2
    else:
        readiness_score += 1
    readiness_score += 2 if model_conf >= 0.6 else 1
    max_readiness = 11
    readiness_pct = round((readiness_score / max_readiness) * 100)

    return {
        "growth_momentum": growth_momentum,
        "composite_growth_value": round(composite_growth, 3),
        "engagement_intensity": engagement_intensity,
        "engagement_average": round(engagement_avg, 3),
        "data_quality_score": data_quality,
        "qualification_margin": margin_text,
        "outreach_readiness_pct": readiness_pct,
    }
# ================================================================
# EXPLAINABILITY UTILITY ENDPOINTS
# ================================================================
@app.get("/explain/params")
def explain_parameters() -> Dict[str, Any]:
    """
    Returns a dictionary of all explainability parameters, features, and model outputs
    used in the pipeline with human-readable definitions.

    Static reference payload — no inference runs here; it only documents the
    fields that /predict/profile and /explain/narrative emit.
    """
    # NOTE: keep these definitions in sync with the feature frames built in
    # predict_profile()/explain_narrative() and with the explainability builders.
    return {
        "propensity_features": {
            "Job_Promotion_Flag": "Binary (0/1): Has a key decision-maker recently been promoted? Indicates openness to new tooling.",
            "Hiring_Increase_Flag": "Binary (0/1): Is the company actively hiring? Strongest signal of capital deployment and growth.",
            "Revenue_Growth_Score": "Float (0-1): How fast is revenue growing? Indicator of available budget.",
            "Clay_Intent_Signal": "Float (0-1): Third-party buying intent from Clay. Indicates active market research.",
            "Apollo_Engagement_Score": "Float (0-1): Engagement score from Apollo indicating general market activity.",
            "Composite_Growth_Signal": "Engineered Value: Hiring_Increase_Flag × Revenue_Growth_Score. Amplifies profiles that are both hiring AND growing."
        },
        "propensity_outputs": {
            "raw_probability": "Raw ML output (0.0 to 1.0) directly from the XGBoost model.",
            "scaled_lead_score": "Score scaled to a 75-98 curve for natural human interpretation.",
            "is_qualified": "Boolean threshold: true if scaled_lead_score >= 80.",
            "shap_value": "The exact magnitude of impact a specific feature had on pulling the score up (positive) or down (negative)."
        },
        "firmographic_features": {
            "Revenue_Size_USD": "Annual revenue in USD.",
            "Headcount_Size": "Total number of employees."
        },
        "firmographic_outputs": {
            "segmentation_class": "KMeans cluster mapped to 'Startup', 'Mid-Market', or 'Enterprise'.",
            "centroid_distances": "Euclidean distance to the center of each of the 3 cluster tiers identifying how close they are to boundaries."
        },
        "channel_features": {
            "historical_engagement": "Matrix across 10 datapoints including LinkedIn views, email open rates, and previous call responses."
        },
        "channel_outputs": {
            "recommended_channel": "The specific communication medium with the highest predicted response probability.",
            "probability_spread": "The exact baseline probability the model predicts across all 4 channels.",
            "model_confidence": "The maximum probability value achieved across all available channels."
        },
        "composite_signals": {
            "growth_momentum": "HIGH/MODERATE/LOW index derived from the multiplier of internal growth constraints.",
            "engagement_intensity": "HIGH/MODERATE/LOW average computed across all historical interaction modes.",
            "data_quality_score": "Float (0-1): Ratio representing how complete the lead's profile data was prior to inference."
        }
    }
@app.get("/explain/narrative/{buyer_id}")
def explain_narrative(buyer_id: str) -> Dict[str, Any]:
    """
    Executes inference internally but ONLY returns the clean, human-readable narrative
    strings explaining exactly why the outputs were generated.

    Args:
        buyer_id: Key into the buyer_id column of the in-memory CSV.

    Raises:
        HTTPException: 503 when models/data are not loaded; 404 when the
            buyer_id is absent from the dataset.

    Returns:
        Per-model narrative strings plus chart-ready data series (waterfall,
        scatter, radar).
    """
    global prop_model, firmo_model, chan_model, df_db
    if prop_model is None or df_db is None:
        raise HTTPException(status_code=503, detail="Models or Database not loaded")
    # DB Lookup
    buyer = df_db[df_db['buyer_id'] == buyer_id]
    if buyer.empty:
        raise HTTPException(status_code=404, detail=f"Buyer ID '{buyer_id}' not found in database")
    # Missing values are coerced to 0 before feature extraction.
    profile = buyer.iloc[0].fillna(0)
    # 1. Propensity
    # Engineered interaction term: hiring flag × revenue growth score.
    composite_growth = float(profile.get('hiring_increase_flag', 0)) * float(profile.get('revenue_growth_score', 0))
    prop_arr = pd.DataFrame([{
        'Job_Promotion_Flag': float(profile.get('job_promotion_flag', 0)),
        'Hiring_Increase_Flag': float(profile.get('hiring_increase_flag', 0)),
        'Revenue_Growth_Score': float(profile.get('revenue_growth_score', 0)),
        'Clay_Intent_Signal': float(profile.get('clay_intent_signal', 0)),
        'Apollo_Engagement_Score': float(profile.get('apollo_engagement_score', 0)),
        'Composite_Growth_Signal': composite_growth,
    }])
    raw_prob = float(prop_model.predict_proba(prop_arr)[0, 1])
    # Scale the raw probability onto the 75-98 band (see /explain/params).
    lead_score = int(round(75 + (raw_prob * 23)))
    prop_expl = build_propensity_explainability(prop_arr, raw_prob, lead_score)
    # 2. Firmographics
    firmo_arr = pd.DataFrame([{
        'Revenue_Size_USD': float(profile.get('revenue_size_usd', 0)),
        'Headcount_Size': float(profile.get('headcount_size', 0)),
    }])
    cluster_idx = int(firmo_model.predict(firmo_arr)[0])
    segmentation_class = SEGMENT_MAP.get(cluster_idx, "Unknown")
    firmo_expl = build_segmentation_explainability(firmo_arr, cluster_idx, segmentation_class)
    # 3. Channel
    # String-coded CSV columns ('yes', 'past') are normalized to binary floats.
    chan_arr = pd.DataFrame([{
        'LinkedIn_Active': float(profile.get('linkedin_active', 0)),
        'LinkedIn_Post_Engagement': float(profile.get('linkedin_post_engagement', 0)),
        'LinkedIn_Profile_Views': float(profile.get('linkedin_profile_views', 0)),
        'Email_Verified': 1.0 if str(profile.get('email_verified', '0')).lower() in ['yes', '1', '1.0'] else 0.0,
        'Email_Open_Rate': float(profile.get('email_open_rate', 0)),
        'Email_Reply_History': 1.0 if str(profile.get('email_reply_history', '0')).lower() in ['past', '1', '1.0'] else 0.0,
        'Cold_Call_Response': float(profile.get('cold_call_response', 0)),
        'WhatsApp_Verified': float(profile.get('whatsapp_verified', 0)),
        'SMS_Verified': float(profile.get('sms_verified', 0)),
        'Previous_Channel_Response': 1.0 if pd.notna(profile.get('previous_channel_response')) else 0.0,
    }])
    channel_probs = chan_model.predict_proba(chan_arr)[0]
    best_idx = int(np.argmax(channel_probs))
    # min() keeps the index inside the 4 known channel labels.
    recommended_channel = CHANNELS[min(best_idx, 3)]
    chan_expl = build_channel_explainability(chan_arr, channel_probs, recommended_channel)
    # Return pure narratives + charting data
    return {
        "buyer_id": buyer_id,
        "narratives": {
            "propensity": prop_expl["narrative"],
            "segmentation": firmo_expl["narrative"],
            "channel": chan_expl["narrative"]
        },
        "chart_data": {
            # SHAP waterfall: base value plus per-feature signed contributions.
            "propensity_waterfall_chart": {
                "base_value": prop_expl["base_value"],
                "features": [f["display_name"] for f in prop_expl["feature_contributions"]],
                "shap_values": [f["shap_value"] for f in prop_expl["feature_contributions"]],
                "actual_values": [f["feature_value"] for f in prop_expl["feature_contributions"]]
            },
            # Company position vs. the three KMeans centroids.
            "firmographic_scatter_chart": {
                "x_axis": "Revenue_Size_USD",
                "y_axis": "Headcount_Size",
                "company_position": {"x": firmo_expl["revenue_usd"], "y": firmo_expl["headcount"]},
                "cluster_centroids": [
                    {
                        "cluster": k,
                        "x": v["avg_revenue_usd"],
                        "y": v["avg_headcount"],
                        "distance_from_company": firmo_expl["centroid_distances"].get(k)
                    }
                    for k, v in firmo_expl["centroid_profiles"].items()
                ]
            },
            # Radar axes mirror the channel probability spread.
            "channel_radar_chart": {
                "labels": list(chan_expl["probability_spread"].keys()),
                "probabilities": list(chan_expl["probability_spread"].values())
            }
        }
    }
# ================================================================
# MAIN INFERENCE ENDPOINT
# ================================================================
@app.post("/predict/profile")
def predict_profile(request: BuyerRequest) -> Dict[str, Any]:
    """
    Retrieves lead data by buyer_id, executes ML inference across all 3 models,
    and returns a comprehensive explainability report alongside the predictions.

    Args:
        request: Body containing the buyer_id to look up.

    Raises:
        HTTPException: 503 when models/data are not loaded; 404 when the
            buyer_id is absent from the dataset.

    Returns:
        Dict with ml_decision (routing/scores), shap_anchors (top driver and
        its outreach constraint), the full explainability payload, and raw
        profile context fields.
    """
    global prop_model, firmo_model, chan_model, shap_explainer, df_db
    if prop_model is None or df_db is None:
        raise HTTPException(status_code=503, detail="Models or Database not loaded")
    # 0. Database Lookup
    buyer = df_db[df_db['buyer_id'] == request.buyer_id]
    if buyer.empty:
        raise HTTPException(status_code=404, detail=f"Buyer ID '{request.buyer_id}' not found in database")
    # Missing values are coerced to 0 before feature extraction.
    profile = buyer.iloc[0].fillna(0)
    # ── 1. PROPENSITY ENGINE ──────────────────────────────────
    # Engineered interaction term: hiring flag × revenue growth score.
    composite_growth = float(profile.get('hiring_increase_flag', 0)) * float(profile.get('revenue_growth_score', 0))
    prop_arr = pd.DataFrame([{
        'Job_Promotion_Flag': float(profile.get('job_promotion_flag', 0)),
        'Hiring_Increase_Flag': float(profile.get('hiring_increase_flag', 0)),
        'Revenue_Growth_Score': float(profile.get('revenue_growth_score', 0)),
        'Clay_Intent_Signal': float(profile.get('clay_intent_signal', 0)),
        'Apollo_Engagement_Score': float(profile.get('apollo_engagement_score', 0)),
        'Composite_Growth_Signal': composite_growth,
    }])
    raw_prob = float(prop_model.predict_proba(prop_arr)[0, 1])
    # Scale the raw probability onto the 75-98 band; qualified at >= 80.
    lead_score = int(round(75 + (raw_prob * 23)))
    is_qualified = lead_score >= 80
    # SHAP primary driver (for backward compatibility)
    shap_vals_abs = np.abs(shap_explainer.shap_values(prop_arr)[0])
    top_idx = int(np.argsort(shap_vals_abs)[-1])
    p_driver = str(prop_arr.columns[top_idx])
    # Map the strongest driver to an outreach instruction for the agent.
    langgraph_constraint = CONSTRAINT_MAP.get(p_driver, f"Acknowledge their {p_driver}")
    # ── 2. FIRMOGRAPHICS ENGINE ───────────────────────────────
    firmo_arr = pd.DataFrame([{
        'Revenue_Size_USD': float(profile.get('revenue_size_usd', 0)),
        'Headcount_Size': float(profile.get('headcount_size', 0)),
    }])
    cluster_idx = int(firmo_model.predict(firmo_arr)[0])
    segmentation_class = SEGMENT_MAP.get(cluster_idx, "Unknown")
    tone = TONE_MAP.get(segmentation_class, "Professional and balanced")
    # ── 3. CHANNEL ENGINE ─────────────────────────────────────
    # String-coded CSV columns ('yes', 'past') are normalized to binary floats.
    chan_arr = pd.DataFrame([{
        'LinkedIn_Active': float(profile.get('linkedin_active', 0)),
        'LinkedIn_Post_Engagement': float(profile.get('linkedin_post_engagement', 0)),
        'LinkedIn_Profile_Views': float(profile.get('linkedin_profile_views', 0)),
        'Email_Verified': 1.0 if str(profile.get('email_verified', '0')).lower() in ['yes', '1', '1.0'] else 0.0,
        'Email_Open_Rate': float(profile.get('email_open_rate', 0)),
        'Email_Reply_History': 1.0 if str(profile.get('email_reply_history', '0')).lower() in ['past', '1', '1.0'] else 0.0,
        'Cold_Call_Response': float(profile.get('cold_call_response', 0)),
        'WhatsApp_Verified': float(profile.get('whatsapp_verified', 0)),
        'SMS_Verified': float(profile.get('sms_verified', 0)),
        'Previous_Channel_Response': 1.0 if pd.notna(profile.get('previous_channel_response')) else 0.0,
    }])
    channel_probs = chan_model.predict_proba(chan_arr)[0]
    best_idx = int(np.argmax(channel_probs))
    # min() keeps the index inside the 4 known channel labels.
    recommended_channel = CHANNELS[min(best_idx, 3)]
    model_conf = float(np.max(channel_probs))
    # Low channel confidence routes the lead to verification instead of an agent.
    decision_status = "ROUTE_TO_AGENT" if model_conf >= 0.60 else "ROUTE_TO_VERIFICATION"
    # ── 4. BUILD EXPLAINABILITY ───────────────────────────────
    explainability = {
        "propensity_breakdown": build_propensity_explainability(prop_arr, raw_prob, lead_score),
        "segmentation_reasoning": build_segmentation_explainability(firmo_arr, cluster_idx, segmentation_class),
        "channel_reasoning": build_channel_explainability(chan_arr, channel_probs, recommended_channel),
        "composite_signals": build_composite_signals(profile, composite_growth, lead_score, model_conf, chan_arr),
    }
    # ── 5. FINAL PAYLOAD ──────────────────────────────────────
    return {
        "buyer_id": request.buyer_id,
        "ml_decision": {
            "status": decision_status,
            "is_qualified": is_qualified,
            "lead_score": lead_score,
            "segmentation_class": segmentation_class,
            "recommended_channel": recommended_channel,
            "recommended_tone": tone,
            "model_confidence": round(model_conf, 3),
        },
        "shap_anchors": {
            "primary_driver": p_driver,
            "langgraph_constraint": langgraph_constraint,
        },
        "explainability": explainability,
        "raw_context": {
            "industry": str(profile.get('industry', 'Unknown')),
            "country": str(profile.get('country', 'Unknown')),
            "group_memberships": str(profile.get('group_memberships', 'None')),
        },
    }
if __name__ == "__main__":
    # Local development entrypoint: serve on all interfaces with auto-reload.
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)