Spaces:
Sleeping
Sleeping
File size: 6,556 Bytes
cf93910 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 | """
Singleton model store — loads all model artifacts once at startup and holds them
in memory for the lifetime of the process.
Usage inside FastAPI:
from app.models.loader import get_model_store
store = get_model_store() # dependency injection or direct call
"""
from __future__ import annotations
import logging
import os
import pickle
import time
from dataclasses import dataclass, field
from typing import Any
import numpy as np
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Data class that holds every loaded artifact
# ---------------------------------------------------------------------------
@dataclass
class ModelStore:
    """Container for every loaded model artifact, kept in memory for the process lifetime."""

    # Pipeline A
    xgb_model: Any = None
    # Pipeline B
    encoder_model: Any = None  # Keras sub-model (encoder half)
    lgbm_model: Any = None
    # Pipeline C
    cnn_model: Any = None  # Keras ResNet50 feature extractor
    svm_model: Any = None
    # Flipped to True once load_models() has populated the store.
    loaded: bool = False
# Module-level singleton
_store: ModelStore | None = None
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _load_pickle(path: str, label: str) -> Any:
    """Deserialize the pickled artifact at *path*, logging the load time under *label*."""
    started = time.perf_counter()
    with open(path, "rb") as handle:
        artifact = pickle.load(handle)
    duration_ms = (time.perf_counter() - started) * 1000
    logger.info("Loaded %-35s (%.1f ms)", label, duration_ms)
    return artifact
def _build_encoder(autoencoder_pkl_path: str) -> Any:
    """
    Load the full autoencoder from pickle and extract the encoder sub-model.

    The autoencoder is a Keras Sequential:
        InputLayer (63)
        Dense 32 relu   -> layer index 0
        Dense 16 relu   -> layer index 1 (bottleneck output)
        Dense 32 relu
        Dense 63 linear

    Returns:
        A Keras Model that maps the 63-D input to the 16-D bottleneck output.
    """
    # Configure the backend BEFORE keras is imported so the env vars take effect.
    os.environ.setdefault("KERAS_BACKEND", "tensorflow")
    os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "3")
    full_ae = _load_pickle(autoencoder_pkl_path, "autoencoder_model.pkl")
    # Dynamically import keras after the env vars are set.
    import keras

    # The Sequential model's built layers: 0=Dense(32), 1=Dense(16), 2=Dense(32), 3=Dense(63).
    # Layer index 1's output is the 16-D bottleneck.
    # We can't use full_ae.input directly on a Sequential that was pickled without
    # a traced input tensor, so we wire the layers manually onto a fresh Input.
    inp = keras.Input(shape=(63,), name="encoder_input")
    x = full_ae.layers[0](inp)  # Dense(32, relu)
    x = full_ae.layers[1](x)    # Dense(16, relu) -> bottleneck
    encoder = keras.Model(inputs=inp, outputs=x, name="encoder_only")
    logger.info(
        "Built encoder sub-model: input(%s) -> output(%s)",
        encoder.input_shape,
        encoder.output_shape,
    )
    return encoder
def _build_cnn_feature_extractor(cnn_pkl_path: str) -> Any:
    """
    Load the full CNN (ResNet50 Functional model) from pickle and return a
    sub-model that outputs the 256-D penultimate Dense layer.

    Architecture (tail of the model):
        ... ResNet50 backbone ...
        GlobalAveragePooling2D
        Dropout(0.5)
        Dense(256, relu)    <- feature vector we want
        Dropout(0.5)
        Dense(34, softmax)  <- final classification head (skip this)

    The SVC was trained on the 256-D features, so we must stop before the
    final Dense(34) layer.  If no Dense(256) layer is found, the full CNN is
    returned as a fallback and a warning is logged.
    """
    # Configure the backend BEFORE keras is imported so the env vars take effect.
    os.environ.setdefault("KERAS_BACKEND", "tensorflow")
    os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "3")
    import keras

    full_cnn = _load_pickle(cnn_pkl_path, "cnn_model.pkl")
    # Scan from the end so we hit the penultimate Dense(256) rather than any
    # earlier Dense layer inside the backbone.
    feature_layer = None
    for layer in reversed(full_cnn.layers):
        if layer.__class__.__name__ == "Dense" and layer.get_config().get("units") == 256:
            feature_layer = layer
            break
    if feature_layer is None:
        logger.warning(
            "Could not find Dense(256) layer; using full CNN output as features."
        )
        return full_cnn
    extractor = keras.Model(
        inputs=full_cnn.input,
        outputs=feature_layer.output,
        name="cnn_feature_extractor",
    )
    logger.info(
        "CNN feature extractor: input %s -> output %s",
        extractor.input_shape,
        extractor.output_shape,
    )
    return extractor
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def load_models() -> ModelStore:
    """
    Load all models and return a populated ModelStore.

    Call this once from the FastAPI lifespan event.  The result is cached in
    the module-level singleton that get_model_store() returns.

    Missing artifact files are logged as warnings and their pipeline is
    skipped (its fields stay None) rather than raising, so the service can
    still start with a subset of pipelines available.
    """
    global _store
    from app import config  # local import to avoid a circular import at module level

    store = ModelStore()
    # ---- Pipeline A --------------------------------------------------------
    if os.path.exists(config.PIPELINE_A_MODEL):
        store.xgb_model = _load_pickle(config.PIPELINE_A_MODEL, "xgb model.pkl")
    else:
        logger.warning("Pipeline A model not found: %s", config.PIPELINE_A_MODEL)
    # ---- Pipeline B --------------------------------------------------------
    if os.path.exists(config.PIPELINE_B_AE) and os.path.exists(config.PIPELINE_B_LGBM):
        store.encoder_model = _build_encoder(config.PIPELINE_B_AE)
        store.lgbm_model = _load_pickle(config.PIPELINE_B_LGBM, "lgbm_model.pkl")
    else:
        logger.warning("Pipeline B models not found — B will be skipped.")
    # ---- Pipeline C --------------------------------------------------------
    if os.path.exists(config.PIPELINE_C_CNN) and os.path.exists(config.PIPELINE_C_SVM):
        store.cnn_model = _build_cnn_feature_extractor(config.PIPELINE_C_CNN)
        store.svm_model = _load_pickle(config.PIPELINE_C_SVM, "svm_model.pkl")
    else:
        logger.warning("Pipeline C models not found — C will be skipped.")
    store.loaded = True
    logger.info("All models loaded successfully.")
    _store = store
    return store
def get_model_store() -> ModelStore:
    """
    Return the singleton ModelStore.

    Raises:
        RuntimeError: if load_models() has not been called (or did not finish).
    """
    if _store is None or not _store.loaded:
        raise RuntimeError("ModelStore has not been initialised — call load_models() first.")
    return _store
|