File size: 5,080 Bytes
cf93910
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
Ensemble logic — orchestrates Pipelines A → B → C with confidence-based fallback.

Strategy
--------
1. Always run Pipeline A (XGBoost, sub-ms).
2. If confidence < CONFIDENCE_THRESHOLD, also run Pipeline B (Autoencoder+LGBM).
3. Average the probability vectors from the pipelines that were run.
4. If ensemble confidence still < SECONDARY_THRESHOLD AND image data is supplied,
   also run Pipeline C (CNN+SVM) and include it in the average.
5. Return the class with the highest averaged probability.

The caller can also force a specific pipeline via the PIPELINE_MODE config.
"""
from __future__ import annotations

import logging
import time
from typing import List, Optional, Any

import numpy as np

from app import config
from app.models.label_map import get_sign
from app.inference.pipeline_a import PredictionResult
import app.inference.pipeline_a as _pa
import app.inference.pipeline_b as _pb
import app.inference.pipeline_c as _pc

logger = logging.getLogger(__name__)


def _peak_of_mean(stack: List[List[float]]) -> float:
    """Return the largest entry of the element-wise mean of *stack* (0.0 if empty)."""
    if not stack:
        return 0.0
    return float(np.max(np.mean(stack, axis=0)))


def run(
    landmarks: List[float],
    *,
    image_input: Optional[str] = None,
    xgb_model: Any,
    encoder_model: Any,
    lgbm_model: Any,
    cnn_model: Any,
    svm_model: Any,
    pipeline_mode: str = "ensemble",
    confidence_threshold: float = 0.70,
    secondary_threshold: float = 0.60,
) -> PredictionResult:
    """
    Execute the requested inference pipeline(s) and return a single PredictionResult.

    When ``pipeline_mode`` is "A", "B" or "C" the matching pipeline is run on
    its own and its result is returned unchanged. Any other value selects the
    ensemble path: Pipeline A runs first (if its model is present), Pipeline B
    is added while the averaged top probability is below
    ``confidence_threshold``, and Pipeline C is added while it is still below
    ``secondary_threshold`` and an image was supplied. The probability vectors
    of every pipeline that ran are averaged and the arg-max class is returned.

    Parameters
    ----------
    landmarks            : flat 63-element MediaPipe landmark vector
    image_input          : optional base-64 JPEG, only consumed by Pipeline C
    xgb_model            : model for Pipeline A (may be None)
    encoder_model        : encoder for Pipeline B (may be None)
    lgbm_model           : classifier for Pipeline B (may be None)
    cnn_model            : feature extractor for Pipeline C (may be None)
    svm_model            : classifier for Pipeline C (may be None)
    pipeline_mode        : "A" | "B" | "C" force one pipeline; anything else
                           runs the ensemble
    confidence_threshold : Pipeline B is added when confidence is below this
    secondary_threshold  : Pipeline C is added when confidence is below this

    Raises
    ------
    RuntimeError : a forced pipeline's model(s) are None, or no pipeline could
                   be executed in ensemble mode
    ValueError   : ``pipeline_mode`` is "C" and ``image_input`` is None
    """
    started = time.perf_counter()

    # A two-model pipeline is usable only when both of its parts are loaded.
    b_ready = encoder_model is not None and lgbm_model is not None
    c_ready = cnn_model is not None and svm_model is not None

    # -----------------------------------------------------------
    # Forced single-pipeline modes: result is passed through as-is
    # -----------------------------------------------------------
    if pipeline_mode == "A":
        if xgb_model is not None:
            return _pa.predict(landmarks, xgb_model)
        raise RuntimeError("Pipeline A model not loaded.")

    if pipeline_mode == "B":
        if b_ready:
            return _pb.predict(landmarks, encoder_model, lgbm_model)
        raise RuntimeError("Pipeline B models not loaded.")

    if pipeline_mode == "C":
        if not c_ready:
            raise RuntimeError("Pipeline C models not loaded.")
        if image_input is None:
            raise ValueError("Pipeline C requires image_input.")
        return _pc.predict(image_input, cnn_model, svm_model)

    # -----------------------------------------------------------
    # Ensemble mode (every other pipeline_mode value)
    # -----------------------------------------------------------
    executed: List[PredictionResult] = []
    vectors: List[List[float]] = []

    # Step 1 — Pipeline A whenever its model is loaded
    if xgb_model is None:
        logger.warning("Pipeline A not available in ensemble mode.")
    else:
        outcome = _pa.predict(landmarks, xgb_model)
        executed.append(outcome)
        vectors.append(outcome.probabilities)

    # Step 2 — add Pipeline B while confidence is low (0.0 if nothing ran yet)
    if _peak_of_mean(vectors) < confidence_threshold and b_ready:
        outcome = _pb.predict(landmarks, encoder_model, lgbm_model)
        executed.append(outcome)
        vectors.append(outcome.probabilities)

    # Step 3 — add Pipeline C if still low and an image was provided
    if (
        _peak_of_mean(vectors) < secondary_threshold
        and image_input is not None
        and c_ready
    ):
        outcome = _pc.predict(image_input, cnn_model, svm_model)
        executed.append(outcome)
        vectors.append(outcome.probabilities)

    # -----------------------------------------------------------
    # Aggregate
    # -----------------------------------------------------------
    if not vectors:
        raise RuntimeError("No inference pipeline could be executed.")

    mean_vector = np.mean(vectors, axis=0)  # one averaged probability per class
    best = int(np.argmax(mean_vector))
    best_prob = float(mean_vector[best])

    joined_names = "+".join(item.pipeline for item in executed)
    if len(executed) > 1:
        pipeline_name = joined_names
    else:
        pipeline_name = executed[0].pipeline

    # Wall-clock time for the whole ensemble call, in milliseconds.
    elapsed_ms = (time.perf_counter() - started) * 1000

    return PredictionResult(
        sign=get_sign(best),
        confidence=best_prob,
        label_index=best,
        probabilities=mean_vector.tolist(),
        pipeline=pipeline_name,
        latency_ms=elapsed_ms,
    )