Spaces:
Running
Running
| import gradio as gr | |
| import os | |
| import pandas as pd | |
| import numpy as np | |
| from sklearn.preprocessing import KBinsDiscretizer, StandardScaler, OneHotEncoder | |
| from sklearn.feature_extraction.text import HashingVectorizer | |
| from collections import Counter | |
| import joblib | |
| import freesound | |
| import gensim.downloader as api | |
| from huggingface_hub import hf_hub_download | |
| import xgboost as xgb | |
| # -------- FreeSound API -------- | |
| client = freesound.FreesoundClient() | |
| client.set_token("zE9NjEOgUMzH9K7mjiGBaPJiNwJLjSM53LevarRK", "token") | |
| dataset_dir = "dataset_audio" | |
| os.makedirs(dataset_dir, exist_ok=True) | |
| class AvgRatingTransformer: | |
| def __init__(self, est, class_mapping=None): | |
| self.est = est | |
| if class_mapping is None: | |
| self.class_mapping = {0:"MissedInfo", 1:"Low", 2:"Medium", 3:"High"} | |
| else: | |
| self.class_mapping = class_mapping | |
| def transform(self, X): | |
| X = X.copy() | |
| mask_non_zero = X != 0 | |
| Xt = np.zeros_like(X, dtype=int) | |
| if mask_non_zero.any(): | |
| Xt[mask_non_zero] = self.est.transform(X[mask_non_zero].reshape(-1,1)).flatten() + 1 | |
| X_transformed = np.array([self.class_mapping.get(v, "MissedInfo") for v in Xt]) | |
| return X_transformed | |
| # -------- Charger les objets sauvegardés -------- | |
| # Music | |
| scaler_samplerate_music = joblib.load("music/scaler_music_samplerate.joblib") | |
| scaler_age_days_music = joblib.load("music/scaler_music_age_days_log.joblib") | |
| username_freq_music = joblib.load("music/username_freq_dict_music.joblib") | |
| est_num_downloads_music = joblib.load("music/est_num_downloads_music.joblib") | |
| avg_rating_transformer_music = joblib.load("music/avg_rating_transformer_music.joblib") | |
| music_subcategory_cols = joblib.load("music/music_subcategory_cols.joblib") | |
| music_onehot_cols = joblib.load("music/music_onehot_cols.joblib") | |
| music_onehot_tags = joblib.load("music/music_onehot_tags.joblib") | |
| # -------- MODELS -------- | |
| # ============================= | |
| # Load ML models at runtime | |
| # ============================= | |
| os.makedirs("models_cache", exist_ok=True) | |
| # ---- MUSIC MODELS ---- | |
| music_model_num_downloads = joblib.load( | |
| hf_hub_download( | |
| repo_id="NIIHAAD/freesound-models", | |
| repo_type="model", # <-- important pour HF Hub | |
| filename="music_model_num_downloads.joblib", | |
| cache_dir="models_cache" | |
| ) | |
| ) | |
| music_model_avg_rating = joblib.load( | |
| hf_hub_download( | |
| repo_id="NIIHAAD/freesound-models", | |
| repo_type="model", | |
| filename="music_xgb_avg_rating.joblib", | |
| cache_dir="models_cache" | |
| ) | |
| ) | |
| music_avg_rating_le = joblib.load( | |
| hf_hub_download( | |
| repo_id="NIIHAAD/freesound-models", | |
| repo_type="model", | |
| filename="music_xgb_avg_rating_label_encoder.joblib", | |
| cache_dir="models_cache" | |
| ) | |
| ) | |
| # ---- EFFECT SOUND MODELS ---- | |
| effect_model_num_downloads = joblib.load( | |
| hf_hub_download( | |
| repo_id="NIIHAAD/freesound-models", | |
| repo_type="model", | |
| filename="effectSound_model_num_downloads.joblib", | |
| cache_dir="models_cache" | |
| ) | |
| ) | |
| effect_model_avg_rating = joblib.load( | |
| hf_hub_download( | |
| repo_id="NIIHAAD/freesound-models", | |
| repo_type="model", | |
| filename="effectSound_xgb_avg_rating.joblib", | |
| cache_dir="models_cache" | |
| ) | |
| ) | |
| effect_avg_rating_le = joblib.load( | |
| hf_hub_download( | |
| repo_id="NIIHAAD/freesound-models", | |
| repo_type="model", | |
| filename="effectSound_xgb_avg_rating_label_encoder.joblib", | |
| cache_dir="models_cache" | |
| ) | |
| ) | |
| # Charger les listes de colonnes exactes utilisées pendant l'entraînement | |
| music_model_features = joblib.load( | |
| hf_hub_download( | |
| repo_id="NIIHAAD/freesound-models", | |
| repo_type="model", | |
| filename="music_model_features_list.joblib", | |
| cache_dir="models_cache" | |
| ) | |
| ) | |
| effect_model_features = joblib.load( | |
| hf_hub_download( | |
| repo_id="NIIHAAD/freesound-models", | |
| repo_type="model", | |
| filename="effect_model_features_list.joblib", | |
| cache_dir="models_cache" | |
| ) | |
| ) | |
| # Charger les listes | |
| music_model_features_raw = music_model_features | |
| effect_model_features_raw = effect_model_features | |
| # NETTOYAGE : Supprimer les doublons en gardant l'ordre | |
| music_model_features = list(dict.fromkeys(music_model_features_raw)) | |
| effect_model_features = list(dict.fromkeys(effect_model_features_raw)) | |
| print(f"Après nettoyage - Music: {len(music_model_features)} features") | |
| print(f"Après nettoyage - Effect: {len(effect_model_features)} features") | |
| # EffectSound | |
| scaler_samplerate_effect = joblib.load("effectSound/scaler_effectSamplerate.joblib") | |
| scaler_age_days_effect = joblib.load("effectSound/scaler_effectSound_age_days_log.joblib") | |
| username_freq_effect = joblib.load("effectSound/username_freq_dict_effectSound.joblib") | |
| est_num_downloads_effect = joblib.load("effectSound/est_num_downloads_effectSound.joblib") | |
| avg_rating_transformer_effect = joblib.load("effectSound/avg_rating_transformer_effectSound.joblib") | |
| effect_subcategory_cols = joblib.load("effectSound/effectSound_subcategory_cols.joblib") | |
| effect_onehot_cols = joblib.load("effectSound/effectSound_onehot_cols.joblib") | |
| effect_onehot_tags = joblib.load("effectSound/effect_onehot_tags.joblib") | |
| # GloVe pour description | |
| glove_model = api.load("glove-wiki-gigaword-100") | |
| # --- AJOUTE LE CODE ICI --- | |
| print("--- DIAGNOSTIC DES FEATURES ---") | |
| print(f"Nombre de features Music : {len(music_model_features)}") | |
| print(f"Doublons dans Music : {len(music_model_features) - len(set(music_model_features))}") | |
| print(f"Nombre de features Effect : {len(effect_model_features)}") | |
| print(f"Doublons dans Effect : {len(effect_model_features) - len(set(effect_model_features))}") | |
| print("-------------------------------") | |
| # --------------------------- | |
| # -------- Fonctions -------- | |
| def fetch_sound_metadata(sound_url): | |
| """Télécharge les métadonnées du son FreeSound""" | |
| sound_id = int(sound_url.rstrip("/").split("/")[-1]) | |
| sound = client.get_sound(sound_id) | |
| file_name = f"{sound.name.replace(' ', '_')}.mp3" | |
| file_path = os.path.join(dataset_dir, file_name) | |
| try: | |
| sound.retrieve_preview(dataset_dir, file_name) | |
| except Exception as e: | |
| print(f"Erreur téléchargement {file_name}: {e}") | |
| file_path = None | |
| data = { | |
| "file_path": file_path, | |
| "name": sound.name, | |
| "num_ratings": sound.num_ratings, | |
| "tags": ",".join(sound.tags) if getattr(sound, "tags", None) else "", | |
| "username": sound.username, | |
| "description": sound.description if sound.description else "", | |
| "created": getattr(sound, "created", ""), | |
| "license": getattr(sound, "license", ""), | |
| "num_downloads": getattr(sound, "num_downloads", 0), | |
| "channels": getattr(sound, "channels", 0), | |
| "filesize": getattr(sound, "filesize", 0), | |
| "num_comments": getattr(sound, "num_comments", 0), | |
| "category_is_user_provided": getattr(sound, "category_is_user_provided", 0), | |
| "duration": getattr(sound, "duration", 0), | |
| "avg_rating": getattr(sound, "avg_rating", 0), | |
| "category": getattr(sound, "category", "Unknown"), | |
| "subcategory": getattr(sound, "subcategory", "Other"), | |
| "type": getattr(sound, "type", ""), | |
| "samplerate": getattr(sound, "samplerate", 0) | |
| } | |
| return pd.DataFrame([data]) | |
| def description_to_vec(text, model, dim=100): | |
| if not text: | |
| return np.zeros(dim) | |
| words = text.lower().split() | |
| vecs = [model[w] for w in words if w in model] | |
| if len(vecs) == 0: | |
| return np.zeros(dim) | |
| return np.mean(vecs, axis=0) | |
| def preprocess_sound(df): | |
| """Applique le preprocessing complet selon duration pour choisir music ou effectSound""" | |
| df = df.copy() | |
| dur = df["duration"].iloc[0] | |
| if 0.5 <= dur <= 3: | |
| dataset_type = "effectSound" | |
| scaler_samplerate = scaler_samplerate_effect | |
| scaler_age = scaler_age_days_effect | |
| username_freq = username_freq_effect | |
| est_num_downloads = est_num_downloads_effect | |
| avg_rating_transformer = avg_rating_transformer_effect | |
| subcat_cols = effect_subcategory_cols | |
| onehot_cols = effect_onehot_cols | |
| onehot_tags = effect_onehot_tags | |
| elif 10 <= dur <= 60: | |
| dataset_type = "music" | |
| scaler_samplerate = scaler_samplerate_music | |
| scaler_age = scaler_age_days_music | |
| username_freq = username_freq_music | |
| est_num_downloads = est_num_downloads_music | |
| avg_rating_transformer = avg_rating_transformer_music | |
| subcat_cols = music_subcategory_cols | |
| onehot_cols = music_onehot_cols | |
| onehot_tags = music_onehot_tags | |
| else: | |
| return f"❌ Son trop court ou trop long ({dur} sec)" | |
| # ----------------- Features ----------------- | |
| # Category bool | |
| df["category_is_user_provided"] = df["category_is_user_provided"].astype(int) | |
| # Username frequency | |
| df["username_freq"] = df["username"].map(username_freq).fillna(0) | |
| # Numeric features | |
| for col in ["num_ratings", "num_comments", "filesize", "duration"]: | |
| df[col] = np.log1p(df[col]) | |
| df["samplerate"] = scaler_samplerate.transform(df[["samplerate"]]) | |
| # Age_days | |
| df["created"] = pd.to_datetime(df["created"], errors="coerce").dt.tz_localize(None) | |
| df["age_days"] = (pd.Timestamp.now() - df["created"]).dt.days | |
| df["age_days_log"] = np.log1p(df["age_days"]) | |
| df["age_days_log_scaled"] = scaler_age.transform(df[["age_days_log"]]) | |
| df = df.drop(columns=["created", "age_days", "age_days_log"]) | |
| # num_downloads | |
| df["num_downloads_class"] = est_num_downloads.transform(df[["num_downloads"]]) | |
| # avg_rating | |
| df["avg_rating"] = avg_rating_transformer.transform(df["avg_rating"].to_numpy()) | |
| # Subcategory | |
| for col in subcat_cols: | |
| df[col] = 0 # toutes les colonnes initialisées à 0 | |
| # activer 1 pour la bonne subcategory | |
| subcat_val = df["subcategory"].iloc[0] | |
| for col in subcat_cols: | |
| cat_name = col.replace("subcategory_", "") | |
| if subcat_val == cat_name: | |
| df[col] = 1 | |
| df.drop(columns=["subcategory"], inplace=True) | |
| # créer toutes les colonnes attendues à 0 | |
| for col in onehot_cols: | |
| if col not in df.columns: | |
| df[col] = 0 | |
| # activer les bonnes colonnes one-hot | |
| license_val = df.loc[0, "license"] | |
| category_val = df.loc[0, "category"] | |
| type_val = df.loc[0, "type"] | |
| for col_name in [ | |
| f"license_{license_val}", | |
| f"category_{category_val}", | |
| f"type_{type_val}", | |
| ]: | |
| if col_name in df.columns: | |
| df[col_name] = 1 | |
| # Tags | |
| # Si la colonne "tags" n'existe pas, on la crée avec une valeur vide | |
| for col in ["name", "tags", "description"]: | |
| if col not in df.columns: | |
| df[col] = "" | |
| df["tags_list"] = df["tags"].fillna("").astype(str).str.lower().str.split(",") | |
| # Si aucun tag n'existe ou que la liste est vide, mettre "Other" | |
| if not df["tags_list"].iloc[0] or df["tags_list"].iloc[0] == [""]: | |
| df["tags_list"] = [["Other"]] | |
| # One-hot sur toutes les colonnes enregistrées | |
| # 1️ Créer toutes les colonnes attendues avec 0 | |
| for col in onehot_tags: | |
| if col not in df.columns: | |
| df[col] = 0 | |
| # 2️ Activer seulement les colonnes correspondant aux tags existants | |
| tags_list = df["tags"].iloc[0].lower().split(",") if df["tags"].iloc[0] else [] | |
| for col in onehot_tags: | |
| tag_name = col.replace("tag_", "").lower() | |
| if tag_name in tags_list: | |
| df[col] = 1 | |
| # 3️ Supprimer la colonne temporaire | |
| df.drop(columns=["tags"], inplace=True) | |
| # Name | |
| df["name_clean"] = df["name"].astype(str).str.lower().str.rsplit(".", n=1).str[0] | |
| df = preprocess_name(df, vec_dim=8) | |
| df.drop(columns=["name","name_clean"], inplace=True) | |
| # Description | |
| desc_vec = description_to_vec(df["description"].iloc[0], glove_model) | |
| for i in range(100): | |
| df[f"description_glove_{i}"] = desc_vec[i] | |
| df.drop(columns=["description"], inplace=True) | |
| df.drop(columns=[ "license","category","type","created","subcategory","id","num_downloads","file_path","username"],inplace=True, errors="ignore") | |
| # --- SAFE REORDER (CRUCIAL) --- | |
| """ | |
| final_cols = [] | |
| for col in onehot_cols: | |
| if col in df.columns: | |
| final_cols.append(col) | |
| # subcategories | |
| for col in subcat_cols: | |
| if col in df.columns: | |
| final_cols.append(col) | |
| # le reste | |
| final_cols += [c for c in df.columns if c not in final_cols] | |
| df = df[final_cols] | |
| """ | |
| return df | |
| def xgb_predict_safe(model, X, label_encoder=None): | |
| booster_features = model.get_booster().feature_names | |
| X_safe = X.reindex(columns=booster_features, fill_value=0.0).astype(np.float32) | |
| dmatrix = xgb.DMatrix(X_safe.values, feature_names=list(booster_features)) | |
| pred = model.get_booster().predict(dmatrix)[0] | |
| if label_encoder is not None: | |
| # label_encoder est une liste de classes | |
| pred_int = int(round(pred)) | |
| if pred_int < 0: pred_int = 0 | |
| if pred_int >= len(label_encoder): pred_int = len(label_encoder) - 1 | |
| return label_encoder[pred_int] | |
| return pred | |
| # -------- Gradio -------- | |
| def predict_with_model(model, df_input, feat_list, le=None): | |
| """ | |
| On passe directement le DataFrame filtré pour éviter les erreurs de dictionnaire | |
| """ | |
| # 1. On s'assure de n'avoir que les colonnes attendues par le booster | |
| booster_feats = model.get_booster().feature_names | |
| # 2. On aligne le DataFrame sur ces colonnes précisément | |
| X_aligned = df_input.reindex(columns=booster_feats, fill_value=0.0).astype(float) | |
| # 3. Création de la DMatrix avec les noms de features officiels du modèle | |
| dmatrix = xgb.DMatrix(X_aligned.values, feature_names=booster_feats) | |
| # 4. Prédiction | |
| preds = model.get_booster().predict(dmatrix) | |
| pred_val = preds[0] | |
| # Si c'est une classification (plusieurs probabilités), on prend l'index max | |
| if len(preds.shape) > 1 and preds.shape[1] > 1: | |
| pred_int = int(np.argmax(pred_val)) | |
| else: | |
| pred_int = int(round(float(pred_val))) | |
| if le: | |
| try: | |
| return le.inverse_transform([pred_int])[0] | |
| except: | |
| return f"Classe inconnue ({pred_int})" | |
| return pred_int | |
| def predict_with_metadata(url): | |
| if url.strip() == "": | |
| return "❌ Veuillez entrer une URL FreeSound." | |
| # 1️ Récupérer les métadonnées brutes | |
| df_raw = fetch_sound_metadata(url) | |
| raw_lines = ["=== Métadonnées brutes ==="] | |
| for col in df_raw.columns: | |
| raw_lines.append(f"{col}: {df_raw[col].iloc[0]}") | |
| raw_str = "\n".join(raw_lines) | |
| # 2️ Vérifier la durée | |
| dur = df_raw["duration"].iloc[0] | |
| if dur < 0.5: | |
| return raw_str + f"\n\n❌ Son trop court ({dur} sec). Plage acceptée: 0.5-3 ou 10-60 sec" | |
| elif 3 < dur < 10 or dur > 60: | |
| return raw_str + f"\n\n❌ Son hors plage ({dur} sec). Plage acceptée: 0.5-3 ou 10-60 sec" | |
| # 3️ Prétraitement | |
| df_processed = preprocess_sound(df_raw) | |
| cols_to_remove = ["avg_rating", "num_downloads_class"] | |
| df_for_model = df_processed.drop(columns=[c for c in cols_to_remove if c in df_processed.columns]) | |
| # 4️ Choix modèle selon durée | |
| if 0.5 <= dur <= 3: | |
| model_nd = effect_model_num_downloads | |
| model_ar = effect_model_avg_rating | |
| model_features = effect_model_features | |
| sound_type = "EffectSound" | |
| else: | |
| model_nd = music_model_num_downloads | |
| model_ar = music_model_avg_rating | |
| model_features = music_model_features | |
| sound_type = "Music" | |
| # 5️ Forcer exactement les colonnes du modèle | |
| df_for_model = df_for_model.reindex(columns=model_features, fill_value=0.0).astype(float) | |
| # 6️ DMatrix XGBoost | |
| dmatrix = xgb.DMatrix(df_for_model.values, feature_names=list(df_for_model.columns)) | |
| # 7️ Faire les prédictions | |
| # On passe 'df_for_model' directement (qui est déjà un DataFrame) | |
| pred_num_downloads_val = predict_with_model(model_nd, df_for_model, model_features) | |
| # Mapping pour num_downloads si le modèle renvoie un entier | |
| NUM_DOWNLOADS_MAP = {0: "Low", 1: "Medium", 2: "High"} | |
| pred_num_downloads = NUM_DOWNLOADS_MAP.get(pred_num_downloads_val, str(pred_num_downloads_val)) | |
| # Prédiction du rating avec le LabelEncoder | |
| current_le = music_avg_rating_le if dur >= 10 else effect_avg_rating_le | |
| pred_avg_rating = predict_with_model(model_ar, df_for_model, model_features, le=current_le) | |
| # 8️ Affichage des features prétraitées | |
| processed_lines = ["\n=== Features après preprocessing ==="] | |
| for col in df_processed.columns: | |
| processed_lines.append(f"{col}: {df_processed[col].iloc[0]}") | |
| processed_str = "\n".join(processed_lines) | |
| # 9️ Résultat final | |
| prediction_lines = [ | |
| "\n=== Prédictions ===", | |
| f"Type détecté : {sound_type}", | |
| f"📥 Num downloads prédit : {pred_num_downloads}", | |
| f"⭐ Avg rating prédit : {pred_avg_rating}" | |
| ] | |
| prediction_str = "\n".join(prediction_lines) | |
| return raw_str + processed_str + prediction_str | |
| def preprocess_name(df, vec_dim=8): | |
| df = df.copy() | |
| # Calcul de la longueur du nom | |
| df["name_len"] = df["name_clean"].str.len() | |
| # HashingVectorizer pour transformer le texte en vecteur | |
| vectorizer = HashingVectorizer(n_features=vec_dim, alternate_sign=False, norm=None) | |
| name_vec_sparse = vectorizer.transform(df["name_clean"]) | |
| name_vec_df = pd.DataFrame( | |
| name_vec_sparse.toarray(), | |
| columns=[f"name_vec_{i}" for i in range(vec_dim)], | |
| index=df.index | |
| ) | |
| df = pd.concat([df, name_vec_df], axis=1) | |
| return df | |
| with gr.Blocks(title="FreeSound Popularity Detector") as demo: | |
| gr.Markdown("# 🎧 FreeSound Popularity Detector") | |
| gr.Markdown("Collez l'URL d'un son FreeSound et le preprocessing complet sera appliqué automatiquement.") | |
| url_input = gr.Textbox(label="URL du son FreeSound") | |
| btn_meta = gr.Button("📊 Prétraiter et afficher features") | |
| output = gr.Textbox(label="Résultat") | |
| btn_meta.click(fn=predict_with_metadata, inputs=url_input, outputs=output) | |
| demo.launch() | |