Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter | |
| from datetime import datetime | |
| from datasets import load_dataset | |
| from sklearn.metrics import accuracy_score | |
| import random | |
| import os | |
| import librosa | |
| import pickle | |
| import pandas as pd | |
| import numpy as np | |
| from sklearn.ensemble import RandomForestClassifier | |
| from .utils.evaluation import AudioEvaluationRequest | |
| from .utils.emissions import tracker, clean_emissions_data, get_space_info | |
| from dotenv import load_dotenv | |
# Load environment variables from a .env file, if present, before anything
# below reads them (this module later reads HF_TOKEN via os.getenv).
load_dotenv()

# Router object that this module's endpoint(s) are meant to hang off.
router = APIRouter()

# Free-text model description reported back in the evaluation results.
DESCRIPTION = "Not Neural Network"

# Route path for the audio task; also echoed in the results as "api_route".
ROUTE = "/audio"
def extract_mfcc_features(
    signal,
    sr=16000,
    n_mfcc=13,
    duration=3.0
):
    """
    Compute MFCC features (base, delta, delta-delta) for a 1-D audio signal.

    The signal is first forced to exactly ``int(sr * duration)`` samples:
    longer signals keep only their leading samples, shorter ones are
    zero-padded at the end.

    Args:
        signal: 1-D audio samples.
        sr: sampling rate handed to librosa, in Hz.
        n_mfcc: number of base MFCC coefficients to compute.
        duration: target clip length in seconds.

    Returns:
        A tuple ``(features_vector, mfcc_combined)`` where
        - ``features_vector`` is a 1-D vector: the per-row mean followed by
          the per-row standard deviation of ``mfcc_combined`` along the time
          axis (length ``6 * n_mfcc``);
        - ``mfcc_combined`` is the 2-D matrix of stacked base, delta and
          delta-delta coefficients, shape ``(3 * n_mfcc, n_frames)``.
    """
    # Normalise the clip length so every signal yields the same frame count.
    n_samples = int(sr * duration)
    shortfall = n_samples - len(signal)
    if shortfall < 0:
        signal = signal[:n_samples]
    elif shortfall > 0:
        signal = np.pad(signal, (0, shortfall), mode='constant')

    # Base coefficients, then their first- and second-order derivatives.
    base = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=n_mfcc)
    derivatives = [librosa.feature.delta(base, order=k) for k in (1, 2)]

    # Stack into a single (3 * n_mfcc, n_frames) matrix.
    mfcc_combined = np.vstack([base, *derivatives])

    # Summarise each coefficient over time: all means first, then all stds.
    summary = (
        np.mean(mfcc_combined, axis=1),
        np.std(mfcc_combined, axis=1),
    )
    features_vector = np.concatenate(summary)

    return features_vector, mfcc_combined
def transform_data(df, sr=12000, duration=3.0):
    """
    Build a feature matrix and a label vector from a DataFrame of audio clips.

    For every row, MFCC + delta + delta-delta features are extracted with
    ``extract_mfcc_features``; the summary vector (mean/std) and the flattened
    2-D MFCC matrix are concatenated into one feature vector per clip.

    Args:
        df: DataFrame with an 'audio' column (1-D signals) and a 'label'
            column.
        sr: sampling rate passed to the feature extractor.
            NOTE(review): any per-row 'sampling_rate' column is not
            consulted here - confirm the clips really are at this rate and
            that it matches what the model was trained with.
        duration: clip length in seconds passed to the feature extractor.

    Returns:
        A tuple ``(X, Y)``: ``X`` is an array with one feature vector per
        row of ``df`` and ``Y`` is the array of labels in the same order.
        Both are empty arrays when ``df`` has no rows.
    """
    X = []
    Y = []
    print("Extraction des features MFCC (base + delta + delta-delta)...")
    # Iterate the two needed columns directly; no per-row Series is built.
    for signal, label in zip(df["audio"], df["label"]):
        features_vector, mfcc_matrix = extract_mfcc_features(
            signal=signal,
            sr=sr,
            duration=duration
        )
        # One flat vector per clip: summary statistics + every MFCC frame.
        big_features = np.concatenate([features_vector, mfcc_matrix.flatten()])
        X.append(big_features)
        Y.append(label)

    # Return the accumulated label array, not the last row's scalar label.
    return np.array(X), np.array(Y)
# NOTE(review): no route decorator (e.g. @router.post(ROUTE, ...)) is visible
# on this handler, although `router`, `ROUTE` and `DESCRIPTION` are defined in
# this module - confirm the endpoint is actually registered somewhere.
async def evaluate_audio(request: AudioEvaluationRequest):
    """
    Evaluate the pickled audio classifier on a held-out split of the dataset.

    Steps:
      1. Load the dataset named in the request (gated, authenticated with the
         HF_TOKEN environment variable) and split its "train" split using the
         requested test size and seed.
      2. Build MFCC-based features for the test split with ``transform_data``.
      3. Run ``model.predict`` on those features while the emissions tracker
         is recording the "inference" task.
      4. Compute accuracy against the dataset labels and return a results
         dictionary (identity, accuracy, energy/emissions, dataset config).

    Args:
        request: evaluation request carrying ``dataset_name``, ``test_size``
            and ``test_seed``.

    Returns:
        dict with the submission metadata, accuracy and emissions figures.
    """
    # Get space info
    username, space_url = get_space_info()

    # Label encoding, per the mapping originally declared here:
    # "chainsaw" -> 0, "environment" -> 1.

    # Load and prepare the dataset.
    # Because the dataset is gated, the HF_TOKEN environment variable is used
    # to authenticate.
    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))

    # Split dataset
    train_test = dataset["train"].train_test_split(
        test_size=request.test_size,
        seed=request.test_seed
    )
    test_dataset = train_test["test"]

    df_test = pd.DataFrame([
        {
            "label": elmt["label"],
            "audio": elmt["audio"]["array"],
            "sampling_rate": elmt["audio"]["sampling_rate"]
        }
        for elmt in test_dataset
    ])

    # Load the pre-trained model.
    # SECURITY: pickle.load can execute arbitrary code; this is only
    # acceptable because the file is a model artifact shipped with the app.
    # Never point this at a user-supplied path.
    with open("models/mon_modele.pkl", "rb") as f:
        model = pickle.load(f)

    # Only the features are needed here; ground-truth labels are read from
    # the dataset itself below.
    X_test, _ = transform_data(df_test)

    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")

    # ------------------------------------------------------------------
    # Model inference (tracked): predict once, on the extracted feature
    # matrix - not on the raw DataFrame, which also contains the labels.
    # ------------------------------------------------------------------
    print("Évaluation sur le test set...")
    predictions = model.predict(X_test)

    # Stop tracking emissions
    emissions_data = tracker.stop_task()

    # Calculate accuracy
    true_labels = test_dataset["label"]
    accuracy = accuracy_score(true_labels, predictions)

    # Prepare results dictionary
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "accuracy": float(accuracy),
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed
        }
    }
    return results