# Aoun-Ai — app/api/prediction.py
# Commit 468ea61 ("enhance rag") by MuhammadMahmoud
"""
Prediction API Routes — Need level assessment and assistance type classification.
"""
from fastapi import APIRouter, Request
from app.schemas import NeedLevelRequest, NeedLevelResponse, AssistanceTypeResponse
from app.services.prediction.prediction_engine import (
preprocess_need_level_features,
validate_need_level_rules,
classify_assistance_by_rules,
get_score_from_level,
LABEL_FALLBACK_MAP,
)
import asyncio
import logging
import uuid
import time
from app.core.audit_logger import audit_logger
router = APIRouter()
logger = logging.getLogger(__name__)
@router.post("/need-level", response_model=NeedLevelResponse)
async def predict_need_level(request: Request, data: NeedLevelRequest):
    """
    Evaluates the urgency and need score of an assistance request using a hybrid ML approach.

    Expects structured financial and demographic data mapped to a standard NeedLevelRequest model.
    Instantly applies deterministic guardrails before deferring to the XGBoost risk scoring engine.
    Returns the categorized need level, confidence percentiles, and optional SHAP feature explanations.

    Resolution order:
      1. Deterministic guardrail rules (confidence 1.0, no ML).
      2. ML model from ``request.app.state.models["need_level"]``; if absent,
         a neutral "Medium"/50.0 mock response is returned.
    """
    prediction_id = str(uuid.uuid4())
    start_time = time.perf_counter()

    # 1. Rule-Based Guardrails (instant, no ML needed).
    rule_result = validate_need_level_rules(data)
    if rule_result:
        duration_ms = (time.perf_counter() - start_time) * 1000
        # Fire-and-forget audit logging off the event loop.
        # NOTE(review): the task reference is not retained; per asyncio docs an
        # un-referenced task may be garbage-collected before it runs — consider
        # keeping a background-task set if audit completeness matters.
        asyncio.create_task(asyncio.to_thread(
            audit_logger.log_event, "prediction_need_level", "/api/ai/need-level",
            data.model_dump(), {"need_level": rule_result, "confidence": 1.0}, "rule_based_guardrail", duration_ms
        ))
        return NeedLevelResponse(
            prediction_id=prediction_id,
            need_level=rule_result,
            confidence=1.0,
            score=get_score_from_level(rule_result),
            method="rule_based_guardrail",
        )

    # 2. ML Inference (run in a worker thread to avoid blocking the event loop).
    if "need_level" not in request.app.state.models:
        # Model not loaded: return a neutral placeholder rather than failing.
        return NeedLevelResponse(
            prediction_id=prediction_id,
            need_level="Medium",
            confidence=0.0,
            score=50.0,
            method="fallback_mock",
        )
    df = await asyncio.to_thread(preprocess_need_level_features, data)
    model = request.app.state.models["need_level"]
    predictions = await asyncio.to_thread(model.predict, df)
    prediction_val = predictions[0]

    # Confidence: max class probability when available, else a fixed 0.95.
    confidence = 0.95
    if hasattr(model, "predict_proba"):
        try:
            probas = await asyncio.to_thread(model.predict_proba, df)
            confidence = float(max(probas[0]))
        except Exception:
            # Best-effort only; keep the default confidence on any proba failure.
            pass

    # Map the encoded prediction to a human-readable label.
    # need_level is always assigned in both branches below.
    if "need_level_encoder" in request.app.state.models:
        encoder = request.app.state.models["need_level_encoder"]
        try:
            need_level = encoder.inverse_transform([int(prediction_val)])[0]
        except Exception:
            need_level = LABEL_FALLBACK_MAP.get(int(prediction_val), "Medium")
    else:
        need_level = LABEL_FALLBACK_MAP.get(int(prediction_val), "Medium")

    # SHAP Explainability (best-effort — the response is returned even on failure).
    # Only pass the 7 numeric features the model was trained on.
    NUMERIC_FEATURES = [
        "family_size", "income_monthly", "monthly_expenses", "debts",
        "number_of_children", "age", "expense_to_income_ratio"
    ]
    explanation_dict = None
    try:
        # Imported lazily so a missing/broken SHAP stack cannot break the endpoint.
        from app.services.prediction.prediction_engine import generate_shap_explanation
        df_numeric = df[NUMERIC_FEATURES]
        explanation_dict = await asyncio.to_thread(generate_shap_explanation, model, df_numeric, need_level)
    except Exception as exp_err:
        logger.warning("SHAP explanation failed (non-critical): %s", exp_err)

    duration_ms = (time.perf_counter() - start_time) * 1000
    # Same fire-and-forget audit pattern as the guardrail branch (see NOTE above).
    asyncio.create_task(asyncio.to_thread(
        audit_logger.log_event, "prediction_need_level", "/api/ai/need-level",
        data.model_dump(), {"need_level": need_level, "confidence": confidence}, "ml_model_synthetic_v1", duration_ms
    ))
    return NeedLevelResponse(
        prediction_id=prediction_id,
        need_level=need_level,
        confidence=confidence,
        score=get_score_from_level(need_level),
        method="ml_model_synthetic_v1",
        explanation=explanation_dict,
    )
@router.post("/assistance-type", response_model=AssistanceTypeResponse)
async def classify_assistance(request: Request, data: NeedLevelRequest):
    """
    Classifies the most appropriate category of demographic assistance necessary for an applicant.

    Ingests core family metrics to evaluate rigid deterministic rules (e.g., medical extremity).
    Falls back transparently to a dedicated classification model capturing advanced socioeconomic contexts.
    Returns the recommended assistance string definition, boolean rule triggers, and analytical confidence.

    Resolution order:
      1. Deterministic rules; any label other than "General Support" wins outright.
      2. ML classifier + label encoder from ``request.app.state.models``.
      3. On missing models or any ML failure, the rule-derived label is returned.
    """
    prediction_id = str(uuid.uuid4())
    start_time = time.perf_counter()

    # 1. Rule-Based (instant).
    rule_label = classify_assistance_by_rules(data)

    def _rule_based_response() -> AssistanceTypeResponse:
        # Single source for the deterministic payload — previously duplicated
        # verbatim in three places (rule hit, missing-model, ML failure).
        return AssistanceTypeResponse(
            prediction_id=prediction_id,
            assistance_type=rule_label,
            is_rule_based=True,
            confidence=1.0,
            score=0.0,
            method="rule_based_assistance_v1",
        )

    if rule_label != "General Support":
        duration_ms = (time.perf_counter() - start_time) * 1000
        # Fire-and-forget audit logging; NOTE(review): task reference not retained,
        # so it may be garbage-collected before running — consider a task set.
        asyncio.create_task(asyncio.to_thread(
            audit_logger.log_event, "prediction_assistance", "/api/ai/assistance-type",
            data.model_dump(), {"assistance_type": rule_label, "confidence": 1.0}, "rule_based_assistance_v1", duration_ms
        ))
        return _rule_based_response()

    # 2. ML Inference — requires both the classifier and its label encoder.
    models = request.app.state.models
    if "assistance_type" not in models or "assistance_type_encoder" not in models:
        return _rule_based_response()
    try:
        df = await asyncio.to_thread(preprocess_need_level_features, data)
        model = models["assistance_type"]
        encoder = models["assistance_type_encoder"]
        predictions = await asyncio.to_thread(model.predict, df)
        prediction_idx = predictions[0]

        # Map the encoded class index to a label; fall back to the raw index string.
        try:
            if hasattr(encoder, "inverse_transform"):
                label = encoder.inverse_transform([prediction_idx])[0]
                # NOTE(review): str.capitalize lowercases everything after the first
                # character ("Food Aid" -> "Food aid") — confirm this is intended.
                label = label.capitalize() if isinstance(label, str) else label
            else:
                label = str(prediction_idx)
        except Exception:
            label = str(prediction_idx)

        # Confidence: max class probability when available, else a fixed 1.0.
        confidence = 1.0
        if hasattr(model, "predict_proba"):
            try:
                probas = await asyncio.to_thread(model.predict_proba, df)
                confidence = float(max(probas[0]))
            except Exception:
                # Best-effort only; keep the default confidence.
                pass

        duration_ms = (time.perf_counter() - start_time) * 1000
        asyncio.create_task(asyncio.to_thread(
            audit_logger.log_event, "prediction_assistance", "/api/ai/assistance-type",
            data.model_dump(), {"assistance_type": label, "confidence": confidence}, "ml_model_assistance_v1", duration_ms
        ))
        return AssistanceTypeResponse(
            prediction_id=prediction_id,
            assistance_type=label,
            is_rule_based=False,
            confidence=confidence,
            score=0.0,
            method="ml_model_assistance_v1",
        )
    except Exception as e:
        # Any ML-path failure degrades gracefully to the rule-derived label.
        logger.error("ML Assistance classification failed: %s", e, exc_info=True)
        return _rule_based_response()