Spaces:
Running
Running
File size: 5,080 Bytes
cf93910 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 | """
Ensemble logic — orchestrates Pipelines A → B → C with confidence-based fallback.
Strategy
--------
1. Always run Pipeline A (XGBoost, sub-ms).
2. If confidence < CONFIDENCE_THRESHOLD, also run Pipeline B (Autoencoder+LGBM).
3. Average the probability vectors from the pipelines that were run.
4. If ensemble confidence still < SECONDARY_THRESHOLD AND image data is supplied,
also run Pipeline C (CNN+SVM) and include it in the average.
5. Return the class with the highest averaged probability.
The caller can also force a specific pipeline via the PIPELINE_MODE config.
"""
from __future__ import annotations
import logging
import time
from typing import List, Optional, Any
import numpy as np
from app import config
from app.models.label_map import get_sign
from app.inference.pipeline_a import PredictionResult
import app.inference.pipeline_a as _pa
import app.inference.pipeline_b as _pb
import app.inference.pipeline_c as _pc
logger = logging.getLogger(__name__)


def _ensemble_confidence(proba_stack: List[List[float]]) -> float:
    """Return the highest class probability of the averaged vectors.

    Returns 0.0 when no pipeline has contributed yet, so that any positive
    threshold triggers the next fallback stage.
    """
    if not proba_stack:
        return 0.0
    return float(np.max(np.mean(proba_stack, axis=0)))


def run(
    landmarks: List[float],
    *,
    image_input: Optional[str] = None,
    xgb_model: Any,
    encoder_model: Any,
    lgbm_model: Any,
    cnn_model: Any,
    svm_model: Any,
    pipeline_mode: str = "ensemble",
    confidence_threshold: float = 0.70,
    secondary_threshold: float = 0.60,
) -> PredictionResult:
    """
    Run one or more inference pipelines and return a consolidated PredictionResult.

    Parameters
    ----------
    landmarks : flat 63-element MediaPipe landmark vector
    image_input : optional base-64 JPEG for Pipeline C
    xgb_model : Pipeline A model
    encoder_model : Pipeline B encoder (Keras sub-model)
    lgbm_model : Pipeline B classifier
    cnn_model : Pipeline C feature extractor
    svm_model : Pipeline C classifier
    pipeline_mode : "A" | "B" | "C" | "ensemble"; any other value is logged
        as a warning and handled as "ensemble"
    confidence_threshold : fallback to B when A confidence < this value
    secondary_threshold : fallback to C when ensemble(A+B) confidence < this value

    Returns
    -------
    In a forced mode ("A", "B" or "C") the selected pipeline's own result is
    returned unchanged. In ensemble mode a new PredictionResult is built from
    the element-wise mean of the probability vectors of every pipeline that
    ran; its ``pipeline`` field is the "+"-joined labels of those pipelines
    and ``latency_ms`` is the wall-clock time of this whole call.

    Raises
    ------
    RuntimeError
        A forced mode was requested but its model(s) are None, or no pipeline
        could be executed in ensemble mode.
    ValueError
        Mode "C" was forced without ``image_input``.
    """
    t0 = time.perf_counter()

    # -----------------------------------------------------------
    # Forced single-pipeline modes
    # -----------------------------------------------------------
    if pipeline_mode == "A":
        if xgb_model is None:
            raise RuntimeError("Pipeline A model not loaded.")
        return _pa.predict(landmarks, xgb_model)

    if pipeline_mode == "B":
        if encoder_model is None or lgbm_model is None:
            raise RuntimeError("Pipeline B models not loaded.")
        return _pb.predict(landmarks, encoder_model, lgbm_model)

    if pipeline_mode == "C":
        if cnn_model is None or svm_model is None:
            raise RuntimeError("Pipeline C models not loaded.")
        if image_input is None:
            raise ValueError("Pipeline C requires image_input.")
        return _pc.predict(image_input, cnn_model, svm_model)

    # Anything else has always been treated as "ensemble". Keep that fallback
    # for backward compatibility, but surface typos / misconfiguration instead
    # of hiding them.
    if pipeline_mode != "ensemble":
        logger.warning(
            "Unknown pipeline_mode %r; falling back to ensemble.", pipeline_mode
        )

    # -----------------------------------------------------------
    # Ensemble mode (default)
    # -----------------------------------------------------------
    results: list[PredictionResult] = []
    proba_stack: list[list[float]] = []

    # Step 1 — Pipeline A (always, when its model is loaded)
    if xgb_model is not None:
        res_a = _pa.predict(landmarks, xgb_model)
        results.append(res_a)
        proba_stack.append(res_a.probabilities)
    else:
        logger.warning("Pipeline A not available in ensemble mode.")

    # Step 2 — Pipeline B if A's confidence is low (or A did not run at all)
    b_available = encoder_model is not None and lgbm_model is not None
    if _ensemble_confidence(proba_stack) < confidence_threshold and b_available:
        res_b = _pb.predict(landmarks, encoder_model, lgbm_model)
        results.append(res_b)
        proba_stack.append(res_b.probabilities)

    # Step 3 — Pipeline C if the running average is still low and an image
    # was provided
    c_available = (
        image_input is not None
        and cnn_model is not None
        and svm_model is not None
    )
    if _ensemble_confidence(proba_stack) < secondary_threshold and c_available:
        res_c = _pc.predict(image_input, cnn_model, svm_model)
        results.append(res_c)
        proba_stack.append(res_c.probabilities)

    # -----------------------------------------------------------
    # Aggregate
    # -----------------------------------------------------------
    if not proba_stack:
        raise RuntimeError("No inference pipeline could be executed.")

    # Element-wise mean across pipelines; one entry per class.
    avg_proba = np.mean(proba_stack, axis=0)
    idx = int(np.argmax(avg_proba))
    total_latency = (time.perf_counter() - t0) * 1000

    return PredictionResult(
        sign=get_sign(idx),
        confidence=float(avg_proba[idx]),
        label_index=idx,
        probabilities=avg_proba.tolist(),
        # With a single contributor the join is just that pipeline's label.
        pipeline="+".join(r.pipeline for r in results),
        latency_ms=total_latency,
    )
|