Spaces:
Running
Running
| """ | |
| Prediction API Routes — Need level assessment and assistance type classification. | |
| """ | |
| from fastapi import APIRouter, Request | |
| from app.schemas import NeedLevelRequest, NeedLevelResponse, AssistanceTypeResponse | |
| from app.services.prediction.prediction_engine import ( | |
| preprocess_need_level_features, | |
| validate_need_level_rules, | |
| classify_assistance_by_rules, | |
| get_score_from_level, | |
| LABEL_FALLBACK_MAP, | |
| ) | |
| import asyncio | |
| import logging | |
| import uuid | |
| import time | |
| from app.core.audit_logger import audit_logger | |
# Router for the AI prediction endpoints; mounted by the application elsewhere.
router = APIRouter()
# Module-level logger per the standard `logging.getLogger(__name__)` convention.
logger = logging.getLogger(__name__)
# Strong references to in-flight audit tasks. The event loop keeps only weak
# references to tasks, so an unreferenced create_task() result may be
# garbage-collected before it runs (documented asyncio caveat).
_need_level_audit_tasks: set = set()


def _log_need_level_audit(data: NeedLevelRequest, need_level, confidence: float,
                          method: str, start_time: float) -> None:
    """Schedule a fire-and-forget audit log entry for a need-level prediction."""
    duration_ms = (time.perf_counter() - start_time) * 1000
    task = asyncio.create_task(asyncio.to_thread(
        audit_logger.log_event, "prediction_need_level", "/api/ai/need-level",
        data.model_dump(), {"need_level": need_level, "confidence": confidence},
        method, duration_ms,
    ))
    # Hold a strong reference until the task completes, then drop it.
    _need_level_audit_tasks.add(task)
    task.add_done_callback(_need_level_audit_tasks.discard)


# Only the 7 numeric features the model was trained on are handed to SHAP.
_NEED_LEVEL_NUMERIC_FEATURES = [
    "family_size", "income_monthly", "monthly_expenses", "debts",
    "number_of_children", "age", "expense_to_income_ratio",
]


async def predict_need_level(request: Request, data: NeedLevelRequest):
    """
    Evaluate the urgency and need score of an assistance request.

    Resolution order:
      1. Deterministic rule-based guardrails (instant, confidence 1.0).
      2. ML inference (run in a worker thread to avoid blocking the event
         loop), with probability-based confidence and optional SHAP
         explanations.
      3. A static "Medium" fallback when no model is loaded.

    Args:
        request: FastAPI request; models are read from ``request.app.state.models``.
        data: Structured financial and demographic applicant data.

    Returns:
        NeedLevelResponse with the categorized need level, confidence,
        numeric score, resolution method, and optional SHAP explanation.
    """
    prediction_id = str(uuid.uuid4())
    start_time = time.perf_counter()

    # 1. Rule-based guardrails (instant, no ML needed).
    rule_result = validate_need_level_rules(data)
    if rule_result:
        _log_need_level_audit(data, rule_result, 1.0, "rule_based_guardrail", start_time)
        return NeedLevelResponse(
            prediction_id=prediction_id,
            need_level=rule_result,
            confidence=1.0,
            score=get_score_from_level(rule_result),
            method="rule_based_guardrail",
        )

    # 2. ML inference — return a neutral mock when no model is loaded.
    if "need_level" not in request.app.state.models:
        return NeedLevelResponse(
            prediction_id=prediction_id,
            need_level="Medium",
            confidence=0.0,
            score=50.0,
            method="fallback_mock",
        )

    df = await asyncio.to_thread(preprocess_need_level_features, data)
    model = request.app.state.models["need_level"]
    predictions = await asyncio.to_thread(model.predict, df)
    prediction_val = predictions[0]

    # Confidence: use class probabilities when available, else a fixed default.
    confidence = 0.95
    if hasattr(model, "predict_proba"):
        try:
            probas = await asyncio.to_thread(model.predict_proba, df)
            confidence = float(max(probas[0]))
        except Exception:
            # Best-effort: keep the default confidence if probabilities fail.
            pass

    # Map the raw prediction to a label; fall back to the static map when the
    # encoder is missing or rejects the value.
    if "need_level_encoder" in request.app.state.models:
        encoder = request.app.state.models["need_level_encoder"]
        try:
            need_level = encoder.inverse_transform([int(prediction_val)])[0]
        except Exception:
            need_level = LABEL_FALLBACK_MAP.get(int(prediction_val), "Medium")
    else:
        need_level = LABEL_FALLBACK_MAP.get(int(prediction_val), "Medium")

    # SHAP explainability (non-critical — the response is returned even on failure).
    explanation_dict = None
    try:
        from app.services.prediction.prediction_engine import generate_shap_explanation
        df_numeric = df[_NEED_LEVEL_NUMERIC_FEATURES]
        explanation_dict = await asyncio.to_thread(
            generate_shap_explanation, model, df_numeric, need_level
        )
    except Exception as exp_err:
        logger.warning("SHAP explanation failed (non-critical): %s", exp_err)

    _log_need_level_audit(data, need_level, confidence, "ml_model_synthetic_v1", start_time)
    return NeedLevelResponse(
        prediction_id=prediction_id,
        need_level=need_level,
        confidence=confidence,
        score=get_score_from_level(need_level),
        method="ml_model_synthetic_v1",
        explanation=explanation_dict,
    )
# Strong references to in-flight audit tasks — prevents premature garbage
# collection of fire-and-forget tasks (the loop holds only weak task refs).
_assistance_audit_tasks: set = set()


def _log_assistance_audit(data: NeedLevelRequest, label, confidence: float,
                          method: str, start_time: float) -> None:
    """Schedule a fire-and-forget audit log entry for an assistance classification."""
    duration_ms = (time.perf_counter() - start_time) * 1000
    task = asyncio.create_task(asyncio.to_thread(
        audit_logger.log_event, "prediction_assistance", "/api/ai/assistance-type",
        data.model_dump(), {"assistance_type": label, "confidence": confidence},
        method, duration_ms,
    ))
    _assistance_audit_tasks.add(task)
    task.add_done_callback(_assistance_audit_tasks.discard)


def _rule_based_assistance_response(prediction_id: str, rule_label: str) -> AssistanceTypeResponse:
    """Build the deterministic rule-based response (also used as the ML fallback)."""
    return AssistanceTypeResponse(
        prediction_id=prediction_id,
        assistance_type=rule_label,
        is_rule_based=True,
        confidence=1.0,
        score=0.0,
        method="rule_based_assistance_v1",
    )


async def classify_assistance(request: Request, data: NeedLevelRequest):
    """
    Classify the most appropriate category of assistance for an applicant.

    Resolution order:
      1. Deterministic rules (e.g. medical extremity) — instant, confidence 1.0.
      2. A dedicated classification model for everything the rules label as
         plain "General Support".
      3. The rule-based result again if the model is unavailable or fails.

    Args:
        request: FastAPI request; models are read from ``request.app.state.models``.
        data: Structured financial and demographic applicant data.

    Returns:
        AssistanceTypeResponse with the recommended assistance label, a
        rule-vs-ML flag, and the classification confidence.
    """
    prediction_id = str(uuid.uuid4())
    start_time = time.perf_counter()

    # 1. Rule-based classification (instant).
    rule_label = classify_assistance_by_rules(data)
    if rule_label != "General Support":
        _log_assistance_audit(data, rule_label, 1.0, "rule_based_assistance_v1", start_time)
        return _rule_based_assistance_response(prediction_id, rule_label)

    # 2. ML inference — requires both the model and its label encoder.
    models = request.app.state.models
    if "assistance_type" not in models or "assistance_type_encoder" not in models:
        return _rule_based_assistance_response(prediction_id, rule_label)

    try:
        df = await asyncio.to_thread(preprocess_need_level_features, data)
        model = models["assistance_type"]
        encoder = models["assistance_type_encoder"]
        predictions = await asyncio.to_thread(model.predict, df)
        prediction_idx = predictions[0]

        # Map the raw prediction to a human-readable label; fall back to the
        # stringified index if the encoder cannot decode it.
        try:
            if hasattr(encoder, "inverse_transform"):
                label = encoder.inverse_transform([prediction_idx])[0]
                label = label.capitalize() if isinstance(label, str) else label
            else:
                label = str(prediction_idx)
        except Exception:
            label = str(prediction_idx)

        # Confidence: use class probabilities when available, else assume 1.0.
        confidence = 1.0
        if hasattr(model, "predict_proba"):
            try:
                probas = await asyncio.to_thread(model.predict_proba, df)
                confidence = float(max(probas[0]))
            except Exception:
                # Best-effort: keep the default confidence if probabilities fail.
                pass

        _log_assistance_audit(data, label, confidence, "ml_model_assistance_v1", start_time)
        return AssistanceTypeResponse(
            prediction_id=prediction_id,
            assistance_type=label,
            is_rule_based=False,
            confidence=confidence,
            score=0.0,
            method="ml_model_assistance_v1",
        )
    except Exception as e:
        # Any ML failure degrades transparently to the rule-based answer.
        logger.error("ML Assistance classification failed: %s", e, exc_info=True)
        return _rule_based_assistance_response(prediction_id, rule_label)