pneumonia_space / inference_service.py
QnxprU69yCNg8XJ
Add RandomRealisticClassifier class to inference_service for demo classifier loading
0fd05da
import os
import warnings
import numpy as np
import librosa
import joblib
import soundfile as sf
import openl3 # <- OpenL3 pour embeddings audio
from sklearn.base import BaseEstimator, ClassifierMixin
# ============================================================
# Configuration générale
# ============================================================
warnings.filterwarnings("ignore", category=UserWarning, module="soundfile")
warnings.filterwarnings("ignore", category=UserWarning, module="librosa")
SAMPLE_RATE = 16000
CLIP_DURATION = 2 # seconds
CLIP_LENGTH = SAMPLE_RATE * CLIP_DURATION
CLIP_OVERLAP_PERCENT = 10
CLIP_IGNORE_SILENT_CLIPS = True
SILENCE_RMS_THRESHOLD_DB = -50
# ============================================================
# Classifier de démonstration (pour tests)
# ============================================================
class RandomRealisticClassifier(BaseEstimator, ClassifierMixin):
"""
Classifier qui génère des scores aléatoires mais réalistes
- Low: 0.0 - 0.4 (40% des cas)
- Moderate: 0.4 - 0.7 (35% des cas)
- High: 0.7 - 1.0 (25% des cas)
"""
def __init__(self, random_state=None):
self.random_state = random_state
self.classes_ = np.array([0, 1])
def fit(self, X, y):
"""Fake fit - ne fait rien"""
return self
def predict(self, X):
"""Génère des prédictions basées sur les probabilités"""
probas = self.predict_proba(X)
return (probas[:, 1] > 0.5).astype(int)
def predict_proba(self, X):
"""
Génère des probabilités aléatoires réalistes
"""
n_samples = X.shape[0]
rng = np.random.RandomState(self.random_state)
# Générer des scores pour chaque sample
scores = []
for i in range(n_samples):
# Utiliser les features pour créer une "seed" unique par sample
seed = int(np.abs(np.sum(X[i]) * 1000)) % 1000000
sample_rng = np.random.RandomState(seed)
# Choisir une catégorie aléatoirement
category = sample_rng.choice(['low', 'moderate', 'high'],
p=[0.40, 0.35, 0.25])
if category == 'low':
# Low: 0.05 - 0.40
score = sample_rng.uniform(0.05, 0.40)
elif category == 'moderate':
# Moderate: 0.40 - 0.70
score = sample_rng.uniform(0.40, 0.70)
else: # high
# High: 0.70 - 0.95
score = sample_rng.uniform(0.70, 0.95)
scores.append(score)
scores = np.array(scores)
# Retourner les probabilités pour [classe 0, classe 1]
probas = np.column_stack([1 - scores, scores])
return probas
# ============================================================
# Utilitaire de test
# ============================================================
def create_dummy_audio(filename="dummy_audio.wav", duration=5, sr=SAMPLE_RATE):
t = np.linspace(0, duration, int(sr * duration), endpoint=False)
audio = 0.5 * np.sin(2 * np.pi * 440 * t)
sf.write(filename, audio.astype(np.float32), sr)
return filename
# ============================================================
# Chargement du classifieur entraîné
# ============================================================
def load_classifier(model_path="pneumonia_classifier.joblib"):
print(f"Loading classifier from '{model_path}'...")
try:
clf = joblib.load(model_path)
print("Classifier loaded successfully.")
return clf
except Exception as e:
print("CRITICAL: Failed to load classifier:", str(e))
return None
# ============================================================
# Prétraitement audio
# ============================================================
def preprocess_audio(
audio_path,
sample_rate=SAMPLE_RATE,
clip_duration=CLIP_DURATION,
clip_overlap_percent=CLIP_OVERLAP_PERCENT,
ignore_silent_clips=CLIP_IGNORE_SILENT_CLIPS,
silence_rms_threshold_db=SILENCE_RMS_THRESHOLD_DB,
):
audio, sr = librosa.load(audio_path, sr=sample_rate, mono=True)
clip_length = sr * clip_duration
overlap = int(clip_length * (clip_overlap_percent / 100))
step = clip_length - overlap
clips = []
for start in range(0, len(audio), step):
end = start + clip_length
clip = audio[start:end]
if len(clip) < clip_length:
clip = np.pad(clip, (0, clip_length - len(clip)))
rms_db = 20 * np.log10(np.sqrt(np.mean(clip**2)) + 1e-10)
if ignore_silent_clips and rms_db < silence_rms_threshold_db:
continue
clips.append(clip)
if len(clips) == 0:
return np.empty((0, clip_length), dtype=np.float32)
return np.asarray(clips, dtype=np.float32)
# ============================================================
# Extraction des embeddings avec OpenL3
# ============================================================
def generate_embeddings(audio_clips):
"""
audio_clips: shape (N, 32000)
return: shape (N, embedding_dim)
"""
if len(audio_clips) == 0:
return np.empty((0, 512)) # OpenL3 embedding dim par défaut
embeddings_list = []
for clip in audio_clips:
# OpenL3 attend shape (samples,) float32
# Ne pas passer model= comme string, laisser OpenL3 charger le modèle par défaut
embedding, ts = openl3.get_audio_embedding(
clip, sr=SAMPLE_RATE,
input_repr="mel256", # représentation spectrogramme
content_type="env", # "env" pour sons environnementaux/respiratoires
embedding_size=512
)
# embeddings_list.append shape (1, embedding_dim)
embeddings_list.append(np.mean(embedding, axis=0))
return np.vstack(embeddings_list)
# ============================================================
# Prédiction pneumonie
# ============================================================
def predict_pneumonia(embeddings, classifier):
if embeddings.shape[0] == 0:
return None, None
preds = classifier.predict(embeddings)
probs = classifier.predict_proba(embeddings)
return preds, probs
# ============================================================
# Agrégation des résultats
# ============================================================
def aggregate_predictions(predictions, probabilities):
"""
Retourne :
- label final (0/1)
- score de risque moyen
"""
if predictions is None or len(predictions) == 0:
return None, None
mean_risk = float(np.mean(probabilities[:, 1]))
final_label = int(np.round(mean_risk))
return final_label, mean_risk