Spaces:
Sleeping
Sleeping
QnxprU69yCNg8XJ
Add RandomRealisticClassifier class to inference_service for demo classifier loading
0fd05da | import os | |
| import warnings | |
| import numpy as np | |
| import librosa | |
| import joblib | |
| import soundfile as sf | |
| import openl3 # <- OpenL3 pour embeddings audio | |
| from sklearn.base import BaseEstimator, ClassifierMixin | |
| # ============================================================ | |
| # Configuration générale | |
| # ============================================================ | |
# Silence UserWarnings raised from the audio I/O libraries.
for _noisy_module in ("soundfile", "librosa"):
    warnings.filterwarnings("ignore", category=UserWarning, module=_noisy_module)
del _noisy_module

SAMPLE_RATE = 16_000  # target sample rate, in Hz
CLIP_DURATION = 2  # seconds
CLIP_LENGTH = CLIP_DURATION * SAMPLE_RATE  # samples per clip
CLIP_OVERLAP_PERCENT = 10  # overlap between consecutive clips, in percent of a clip
CLIP_IGNORE_SILENT_CLIPS = True  # default for dropping clips below the silence threshold
SILENCE_RMS_THRESHOLD_DB = -50  # RMS level (dB) under which a clip counts as silent
| # ============================================================ | |
| # Classifier de démonstration (pour tests) | |
| # ============================================================ | |
class RandomRealisticClassifier(BaseEstimator, ClassifierMixin):
    """Demo classifier that emits pseudo-random but plausible risk scores.

    Each sample's class-1 score is drawn from one of three bands:

    - low:      0.05 - 0.40 (picked with probability 0.40)
    - moderate: 0.40 - 0.70 (picked with probability 0.35)
    - high:     0.70 - 0.95 (picked with probability 0.25)

    The draw is seeded from the sample's own feature values, so identical
    feature vectors always receive identical scores.
    """

    # (label, selection probability, (lowest score, highest score))
    _SCORE_BANDS = (
        ("low", 0.40, (0.05, 0.40)),
        ("moderate", 0.35, (0.40, 0.70)),
        ("high", 0.25, (0.70, 0.95)),
    )

    def __init__(self, random_state=None):
        # Kept as a constructor parameter; the scores are seeded from the
        # features and do not depend on this value.
        self.random_state = random_state
        self.classes_ = np.array([0, 1])

    def fit(self, X, y):
        """Do nothing and return ``self``; there is nothing to learn."""
        return self

    def predict(self, X):
        """Return 1 where the class-1 probability exceeds 0.5, else 0."""
        positive_scores = self.predict_proba(X)[:, 1]
        return (positive_scores > 0.5).astype(int)

    def predict_proba(self, X):
        """Return an ``(n_samples, 2)`` array of ``[P(class 0), P(class 1)]``.

        ``X`` may be any array-like whose first axis indexes samples.
        """
        X = np.asarray(X)
        labels = [label for label, _, _ in self._SCORE_BANDS]
        weights = [weight for _, weight, _ in self._SCORE_BANDS]
        bounds = {label: span for label, _, span in self._SCORE_BANDS}

        scores = np.empty(X.shape[0], dtype=float)
        for i, features in enumerate(X):
            # A per-sample generator makes the score a pure function of the
            # sample's features, independent of batch order or batch size.
            sample_rng = np.random.RandomState(self._seed_from_features(features))
            category = str(sample_rng.choice(labels, p=weights))
            lowest, highest = bounds[category]
            scores[i] = sample_rng.uniform(lowest, highest)

        return np.column_stack([1 - scores, scores])

    @staticmethod
    def _seed_from_features(features):
        """Derive a seed in ``[0, 1000000)`` from the sum of the features."""
        scaled_sum = np.sum(features) * 1000
        if not np.isfinite(scaled_sum):
            # NaN/inf cannot be converted to an int; fall back to a fixed seed
            # instead of crashing on a degenerate feature vector.
            return 0
        return int(np.abs(scaled_sum)) % 1000000
| # ============================================================ | |
| # Utilitaire de test | |
| # ============================================================ | |
def create_dummy_audio(filename="dummy_audio.wav", duration=5, sr=SAMPLE_RATE):
    """Write a 440 Hz sine tone to ``filename`` and return that filename.

    Args:
        filename: Destination path of the audio file.
        duration: Length of the tone in seconds.
        sr: Sample rate used to synthesize and store the tone.

    Returns:
        The ``filename`` argument, unchanged.
    """
    n_samples = int(sr * duration)
    timeline = np.linspace(0, duration, n_samples, endpoint=False)
    angular_frequency = 2 * np.pi * 440
    tone = 0.5 * np.sin(angular_frequency * timeline)  # half-amplitude sine
    sf.write(filename, tone.astype(np.float32), sr)
    return filename
| # ============================================================ | |
| # Chargement du classifieur entraîné | |
| # ============================================================ | |
def load_classifier(model_path="pneumonia_classifier.joblib"):
    """Load a serialized classifier with joblib.

    Progress and failures are reported on stdout. Any exception raised while
    loading is caught and reported rather than propagated.

    NOTE(security): joblib.load unpickles the file, which can execute
    arbitrary code; only point ``model_path`` at trusted files.

    Args:
        model_path: Path of the serialized classifier.

    Returns:
        The loaded classifier, or ``None`` when loading failed.
    """
    print(f"Loading classifier from '{model_path}'...")
    try:
        model = joblib.load(model_path)
        print("Classifier loaded successfully.")
    except Exception as exc:
        print("CRITICAL: Failed to load classifier:", str(exc))
        model = None
    return model
| # ============================================================ | |
| # Prétraitement audio | |
| # ============================================================ | |
def preprocess_audio(
    audio_path,
    sample_rate=SAMPLE_RATE,
    clip_duration=CLIP_DURATION,
    clip_overlap_percent=CLIP_OVERLAP_PERCENT,
    ignore_silent_clips=CLIP_IGNORE_SILENT_CLIPS,
    silence_rms_threshold_db=SILENCE_RMS_THRESHOLD_DB,
):
    """Load an audio file and cut it into fixed-length, overlapping clips.

    The file is loaded as mono at ``sample_rate``. Clips start every
    ``clip_length - overlap`` samples; a clip that runs past the end of the
    signal is zero-padded to full length.

    Args:
        audio_path: Path of the audio file to load.
        sample_rate: Sample rate passed to ``librosa.load``.
        clip_duration: Clip length in seconds (fractional values allowed).
        clip_overlap_percent: Overlap between consecutive clips, as a
            percentage of the clip length. Must be below 100.
        ignore_silent_clips: When true, drop clips whose RMS level is below
            ``silence_rms_threshold_db``.
        silence_rms_threshold_db: Silence threshold in dB.

    Returns:
        A float32 array of shape ``(n_clips, clip_length)``; ``n_clips`` is 0
        when the audio is empty or every clip was dropped as silent.

    Raises:
        ValueError: If the clip length or the step between clips is not
            positive.
    """
    audio, sr = librosa.load(audio_path, sr=sample_rate, mono=True)

    # Slicing and range() need integers, so a fractional duration must be
    # converted to a whole number of samples.
    clip_length = int(round(sr * clip_duration))
    if clip_length <= 0:
        raise ValueError(
            f"clip_duration must yield at least one sample, got {clip_duration!r}"
        )

    overlap = int(clip_length * (clip_overlap_percent / 100))
    step = clip_length - overlap
    if step <= 0:
        # A non-positive step would either make range() fail with an opaque
        # message or silently produce no clips at all.
        raise ValueError(
            f"clip_overlap_percent must be below 100, got {clip_overlap_percent!r}"
        )

    clips = []
    for start in range(0, len(audio), step):
        clip = audio[start:start + clip_length]
        missing = clip_length - len(clip)
        if missing > 0:
            clip = np.pad(clip, (0, missing))

        if ignore_silent_clips:
            # The small offset keeps log10 finite for an all-zero clip.
            rms_db = 20 * np.log10(np.sqrt(np.mean(clip**2)) + 1e-10)
            if rms_db < silence_rms_threshold_db:
                continue

        clips.append(clip)

    if not clips:
        return np.empty((0, clip_length), dtype=np.float32)
    return np.asarray(clips, dtype=np.float32)
| # ============================================================ | |
| # Extraction des embeddings avec OpenL3 | |
| # ============================================================ | |
def generate_embeddings(audio_clips):
    """Compute one OpenL3 embedding per audio clip.

    Args:
        audio_clips: Sequence or array of shape ``(N, n_samples)`` holding
            mono clips sampled at ``SAMPLE_RATE``.

    Returns:
        Array of shape ``(N, 512)``. Each row is the mean over the frame
        embeddings OpenL3 returns for that clip.
    """
    embedding_size = 512
    if len(audio_clips) == 0:
        return np.empty((0, embedding_size))

    # Load the model once and reuse it for every clip; otherwise each
    # get_audio_embedding call has to load the model again.
    model = openl3.models.load_audio_embedding_model(
        input_repr="mel256",  # mel-spectrogram input representation
        content_type="env",  # environmental (non-music) sounds
        embedding_size=embedding_size,
    )

    clip_embeddings = []
    for clip in audio_clips:
        # A loaded model object is passed, never a model name string.
        frame_embeddings, _timestamps = openl3.get_audio_embedding(
            clip,
            sr=SAMPLE_RATE,
            model=model,
        )
        # Collapse the frame axis: (n_frames, dim) -> (dim,)
        clip_embeddings.append(np.mean(frame_embeddings, axis=0))

    return np.vstack(clip_embeddings)
| # ============================================================ | |
| # Prédiction pneumonie | |
| # ============================================================ | |
def predict_pneumonia(embeddings, classifier):
    """Run the classifier on every clip embedding.

    Args:
        embeddings: Array whose first axis indexes clips.
        classifier: Object exposing ``predict`` and ``predict_proba``.

    Returns:
        ``(predictions, probabilities)`` as returned by the classifier, or
        ``(None, None)`` when there are no embeddings.
    """
    n_clips = embeddings.shape[0]
    if n_clips == 0:
        return None, None
    return classifier.predict(embeddings), classifier.predict_proba(embeddings)
| # ============================================================ | |
| # Agrégation des résultats | |
| # ============================================================ | |
def aggregate_predictions(predictions, probabilities):
    """Combine per-clip results into a single decision.

    Args:
        predictions: Per-clip labels, or ``None``.
        probabilities: Array of shape ``(n_clips, 2)``; column 1 is read as
            the risk score of each clip.

    Returns:
        ``(final_label, mean_risk)`` where ``mean_risk`` is the average of
        column 1 and ``final_label`` is that average rounded to 0 or 1, or
        ``(None, None)`` when there are no predictions.
    """
    has_predictions = predictions is not None and len(predictions) > 0
    if not has_predictions:
        return None, None

    positive_scores = probabilities[:, 1]
    mean_risk = float(np.mean(positive_scores))
    return int(np.round(mean_risk)), mean_risk