# Source: Hugging Face Space file view (page residue kept as comments)
# NIIHAAD's picture
# Update app.py
# d0b8c26 verified
import gradio as gr
import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import KBinsDiscretizer, StandardScaler, OneHotEncoder
from sklearn.feature_extraction.text import HashingVectorizer
from collections import Counter
import joblib
import freesound
import gensim.downloader as api
from huggingface_hub import hf_hub_download
import xgboost as xgb
# -------- FreeSound API --------
# The API token is read from the FREESOUND_API_TOKEN environment variable
# (e.g. a Space secret) when it is set.
# SECURITY: the literal below is a credential committed to source control. It is
# kept only as a fallback so existing deployments keep working; rotate it and
# delete the fallback once the environment variable is configured.
client = freesound.FreesoundClient()
client.set_token(
    os.environ.get("FREESOUND_API_TOKEN") or "zE9NjEOgUMzH9K7mjiGBaPJiNwJLjSM53LevarRK",
    "token",
)

# Local folder where sound previews are downloaded.
dataset_dir = "dataset_audio"
os.makedirs(dataset_dir, exist_ok=True)
class AvgRatingTransformer:
    """Turn raw average ratings into class labels.

    A rating of exactly 0 (and, in this version, a missing/NaN rating) is
    treated as "no information" and gets code 0. Every other rating is binned
    by the wrapped estimator ``est`` and the resulting bin index is shifted by
    one, so estimator bin 0 becomes code 1, and so on. Codes are finally
    translated to labels through ``class_mapping``.

    The attribute names ``est`` and ``class_mapping`` are kept as-is because
    instances of this class are loaded from joblib files elsewhere in the file.
    """

    def __init__(self, est, class_mapping=None):
        """Store the binning estimator and the code -> label mapping.

        Args:
            est: object exposing ``transform(column)`` on an ``(n, 1)`` array
                and returning one bin index per row.
            class_mapping: optional ``{code: label}`` dict; defaults to
                ``{0: "MissedInfo", 1: "Low", 2: "Medium", 3: "High"}``.
        """
        self.est = est
        if class_mapping is None:
            self.class_mapping = {0: "MissedInfo", 1: "Low", 2: "Medium", 3: "High"}
        else:
            self.class_mapping = class_mapping

    def transform(self, X):
        """Return an array of class labels, one per rating in ``X``.

        Args:
            X: 1-D array-like of ratings (array, list or Series). ``None`` and
                NaN entries are accepted and treated like a 0 rating.

        Returns:
            ``numpy.ndarray`` of labels; codes absent from ``class_mapping``
            fall back to ``"MissedInfo"``.
        """
        # Coercing to float turns None into NaN and accepts lists/Series.
        ratings = np.asarray(X, dtype=float).ravel()

        # Only genuinely rated entries go through the estimator; NaN would
        # otherwise count as "non-zero" and be passed to est.transform.
        rated = (ratings != 0) & ~np.isnan(ratings)

        codes = np.zeros(ratings.shape, dtype=int)
        if rated.any():
            binned = self.est.transform(ratings[rated].reshape(-1, 1))
            # +1 keeps code 0 reserved for "no rating".
            codes[rated] = np.asarray(binned).ravel() + 1

        return np.array([self.class_mapping.get(code, "MissedInfo") for code in codes])
# -------- Load the saved preprocessing objects --------
# Music: artifacts stored in the local "music/" folder, used by
# preprocess_sound for sounds that fall in the music duration range.
def _load_music_artifact(filename):
    """Unpickle one saved object from the local ``music/`` folder."""
    return joblib.load("music/" + filename)


scaler_samplerate_music = _load_music_artifact("scaler_music_samplerate.joblib")
scaler_age_days_music = _load_music_artifact("scaler_music_age_days_log.joblib")
username_freq_music = _load_music_artifact("username_freq_dict_music.joblib")
est_num_downloads_music = _load_music_artifact("est_num_downloads_music.joblib")
avg_rating_transformer_music = _load_music_artifact("avg_rating_transformer_music.joblib")
music_subcategory_cols = _load_music_artifact("music_subcategory_cols.joblib")
music_onehot_cols = _load_music_artifact("music_onehot_cols.joblib")
music_onehot_tags = _load_music_artifact("music_onehot_tags.joblib")
# -------- MODELS --------
# =============================
# Load ML models at runtime
# =============================
# Every artifact below comes from the same Hugging Face Hub model repository
# and is cached under a local folder, so the download/unpickle steps live in
# one helper instead of being repeated for each file.
_HF_MODEL_REPO_ID = "NIIHAAD/freesound-models"
_HF_MODEL_CACHE_DIR = "models_cache"
os.makedirs(_HF_MODEL_CACHE_DIR, exist_ok=True)


def _load_hub_artifact(filename):
    """Download ``filename`` from the model repo (using the local cache) and unpickle it.

    NOTE(security): joblib.load unpickles the file, which can execute arbitrary
    code. Only point this at a repository you control.
    """
    local_path = hf_hub_download(
        repo_id=_HF_MODEL_REPO_ID,
        repo_type="model",  # the artifacts are stored in a *model* repo on the Hub
        filename=filename,
        cache_dir=_HF_MODEL_CACHE_DIR,
    )
    return joblib.load(local_path)


# ---- MUSIC MODELS ----
music_model_num_downloads = _load_hub_artifact("music_model_num_downloads.joblib")
music_model_avg_rating = _load_hub_artifact("music_xgb_avg_rating.joblib")
music_avg_rating_le = _load_hub_artifact("music_xgb_avg_rating_label_encoder.joblib")

# ---- EFFECT SOUND MODELS ----
effect_model_num_downloads = _load_hub_artifact("effectSound_model_num_downloads.joblib")
effect_model_avg_rating = _load_hub_artifact("effectSound_xgb_avg_rating.joblib")
effect_avg_rating_le = _load_hub_artifact("effectSound_xgb_avg_rating_label_encoder.joblib")

# Exact column lists used during training
music_model_features = _load_hub_artifact("music_model_features_list.joblib")
effect_model_features = _load_hub_artifact("effect_model_features_list.joblib")
# Keep a handle on the feature lists exactly as they were loaded.
music_model_features_raw, effect_model_features_raw = (
    music_model_features,
    effect_model_features,
)


def _unique_in_order(items):
    """Return ``items`` as a list with repeats removed, keeping first occurrences."""
    return list(dict.fromkeys(items))


# CLEAN-UP: drop duplicate feature names while preserving their order
music_model_features = _unique_in_order(music_model_features_raw)
effect_model_features = _unique_in_order(effect_model_features_raw)
print(f"Après nettoyage - Music: {len(music_model_features)} features")
print(f"Après nettoyage - Effect: {len(effect_model_features)} features")
# EffectSound: artifacts stored in the local "effectSound/" folder, used by
# preprocess_sound for sounds that fall in the sound-effect duration range.
def _load_effect_artifact(filename):
    """Unpickle one saved object from the local ``effectSound/`` folder."""
    return joblib.load("effectSound/" + filename)


scaler_samplerate_effect = _load_effect_artifact("scaler_effectSamplerate.joblib")
scaler_age_days_effect = _load_effect_artifact("scaler_effectSound_age_days_log.joblib")
username_freq_effect = _load_effect_artifact("username_freq_dict_effectSound.joblib")
est_num_downloads_effect = _load_effect_artifact("est_num_downloads_effectSound.joblib")
avg_rating_transformer_effect = _load_effect_artifact("avg_rating_transformer_effectSound.joblib")
effect_subcategory_cols = _load_effect_artifact("effectSound_subcategory_cols.joblib")
effect_onehot_cols = _load_effect_artifact("effectSound_onehot_cols.joblib")
effect_onehot_tags = _load_effect_artifact("effect_onehot_tags.joblib")
# GloVe word vectors used to embed the sound description
glove_model = api.load("glove-wiki-gigaword-100")

# Start-up diagnostic: size of each feature list and how many duplicate
# names it still contains.
print("--- DIAGNOSTIC DES FEATURES ---")
for _label, _features in (
    ("Music", music_model_features),
    ("Effect", effect_model_features),
):
    print(f"Nombre de features {_label} : {len(_features)}")
    print(f"Doublons dans {_label} : {len(_features) - len(set(_features))}")
print("-------------------------------")
# ---------------------------
# -------- Fonctions --------
def fetch_sound_metadata(sound_url):
    """Fetch the metadata of a FreeSound sound and return it as a one-row DataFrame.

    The sound id is taken from the last path segment of ``sound_url`` (query
    string, fragment, surrounding whitespace and trailing slashes are ignored),
    or from a ``/sounds/<id>`` segment anywhere in the URL. The sound preview
    is also downloaded into ``dataset_dir``; a failed download is logged and
    reported as ``file_path=None`` rather than raised.

    Args:
        sound_url: URL of the sound page, or a bare numeric id.

    Returns:
        ``pandas.DataFrame`` with a single row of raw metadata fields.

    Raises:
        ValueError: if no numeric sound id can be found in ``sound_url``.
    """
    import re  # local import: only this function needs it

    # Drop whitespace, query string and fragment before looking for the id.
    cleaned = str(sound_url).strip().split("?", 1)[0].split("#", 1)[0].rstrip("/")
    match = re.search(r"(?:^|/)(\d+)$", cleaned) or re.search(r"/sounds/(\d+)", cleaned)
    if match is None:
        raise ValueError(f"Cannot extract a FreeSound sound id from {sound_url!r}")
    sound_id = int(match.group(1))

    sound = client.get_sound(sound_id)

    def _field(name, default):
        """Read an attribute of ``sound``; a missing or None value gives ``default``."""
        value = getattr(sound, name, default)
        return default if value is None else value

    # The sound name is user-provided: neutralise path separators so the
    # preview always lands directly inside dataset_dir.
    safe_name = sound.name.replace(" ", "_").replace("/", "_").replace("\\", "_")
    file_name = f"{safe_name}.mp3"
    file_path = os.path.join(dataset_dir, file_name)
    try:
        sound.retrieve_preview(dataset_dir, file_name)
    except Exception as e:
        # Best effort: the metadata is still usable without the audio file.
        print(f"Erreur téléchargement {file_name}: {e}")
        file_path = None

    tags = getattr(sound, "tags", None)
    data = {
        "file_path": file_path,
        "name": sound.name,
        "num_ratings": _field("num_ratings", 0),
        "tags": ",".join(tags) if tags else "",
        "username": _field("username", ""),
        "description": _field("description", ""),
        "created": _field("created", ""),
        "license": _field("license", ""),
        "num_downloads": _field("num_downloads", 0),
        "channels": _field("channels", 0),
        "filesize": _field("filesize", 0),
        "num_comments": _field("num_comments", 0),
        "category_is_user_provided": _field("category_is_user_provided", 0),
        "duration": _field("duration", 0),
        "avg_rating": _field("avg_rating", 0),
        "category": _field("category", "Unknown"),
        "subcategory": _field("subcategory", "Other"),
        "type": _field("type", ""),
        "samplerate": _field("samplerate", 0),
    }
    return pd.DataFrame([data])
def description_to_vec(text, model, dim=100):
    """Embed ``text`` as the mean of the vectors of its known words.

    The text is lower-cased and split on whitespace; words absent from
    ``model`` are skipped.

    Args:
        text: description string (empty or ``None`` gives a zero vector).
        model: mapping-like object supporting ``word in model`` and ``model[word]``.
        dim: length of the zero vector returned when nothing can be embedded.

    Returns:
        ``numpy.ndarray``: the element-wise mean of the matched word vectors,
        or ``np.zeros(dim)`` when the text is empty or no word is known.
    """
    if not text:
        return np.zeros(dim)

    known_vectors = []
    for token in text.lower().split():
        if token in model:
            known_vectors.append(model[token])

    if not known_vectors:
        return np.zeros(dim)
    return np.mean(known_vectors, axis=0)
def _strip_prefix(column, prefix):
    """Return ``column`` without a leading ``prefix`` (unchanged when the prefix is absent)."""
    return column[len(prefix):] if column.startswith(prefix) else column


def preprocess_sound(df):
    """Apply the full preprocessing, choosing music or effectSound artifacts from the duration.

    Only the first row of ``df`` is inspected for the duration, subcategory,
    license/category/type and tags, so ``df`` is expected to hold a single
    sound (as built by ``fetch_sound_metadata``).

    Args:
        df: raw metadata frame; it is not modified (a copy is processed).

    Returns:
        The processed ``pandas.DataFrame``, or — kept for backward
        compatibility — an error *string* when the duration is outside both
        accepted ranges (0.5-3 s for effects, 10-60 s for music).
    """
    df = df.copy()
    dur = df["duration"].iloc[0]

    # Pick the fitted artifacts that match the duration range.
    if 0.5 <= dur <= 3:
        scaler_samplerate = scaler_samplerate_effect
        scaler_age = scaler_age_days_effect
        username_freq = username_freq_effect
        est_num_downloads = est_num_downloads_effect
        avg_rating_transformer = avg_rating_transformer_effect
        subcat_cols = effect_subcategory_cols
        onehot_cols = effect_onehot_cols
        onehot_tags = effect_onehot_tags
    elif 10 <= dur <= 60:
        scaler_samplerate = scaler_samplerate_music
        scaler_age = scaler_age_days_music
        username_freq = username_freq_music
        est_num_downloads = est_num_downloads_music
        avg_rating_transformer = avg_rating_transformer_music
        subcat_cols = music_subcategory_cols
        onehot_cols = music_onehot_cols
        onehot_tags = music_onehot_tags
    else:
        return f"❌ Son trop court ou trop long ({dur} sec)"

    # ----------------- Features -----------------
    # Boolean flag -> int
    df["category_is_user_provided"] = df["category_is_user_provided"].astype(int)

    # Username frequency (unknown users get 0)
    df["username_freq"] = df["username"].map(username_freq).fillna(0)

    # Numeric features
    for col in ["num_ratings", "num_comments", "filesize", "duration"]:
        df[col] = np.log1p(df[col])
    df["samplerate"] = scaler_samplerate.transform(df[["samplerate"]])

    # Age in days -> log1p -> scaled; the intermediate columns are dropped.
    df["created"] = pd.to_datetime(df["created"], errors="coerce").dt.tz_localize(None)
    df["age_days"] = (pd.Timestamp.now() - df["created"]).dt.days
    df["age_days_log"] = np.log1p(df["age_days"])
    df["age_days_log_scaled"] = scaler_age.transform(df[["age_days_log"]])
    df = df.drop(columns=["created", "age_days", "age_days_log"])

    # num_downloads -> class
    df["num_downloads_class"] = est_num_downloads.transform(df[["num_downloads"]])

    # avg_rating -> class label
    df["avg_rating"] = avg_rating_transformer.transform(df["avg_rating"].to_numpy())

    # Subcategory one-hot: every expected column is written, 1 only for the match.
    subcat_val = df["subcategory"].iloc[0]
    for col in subcat_cols:
        df[col] = int(_strip_prefix(col, "subcategory_") == subcat_val)
    df.drop(columns=["subcategory"], inplace=True)

    # license / category / type one-hot: create the missing expected columns at 0 ...
    for col in onehot_cols:
        if col not in df.columns:
            df[col] = 0
    # ... then switch on the ones matching this sound (positional access, so
    # the index label of the row does not matter).
    license_val = df["license"].iloc[0]
    category_val = df["category"].iloc[0]
    type_val = df["type"].iloc[0]
    for col_name in (
        f"license_{license_val}",
        f"category_{category_val}",
        f"type_{type_val}",
    ):
        if col_name in df.columns:
            df[col_name] = 1

    # Make sure the text columns exist before they are used.
    for col in ["name", "tags", "description"]:
        if col not in df.columns:
            df[col] = ""

    # Tags one-hot. A sound without any tag counts as tagged "Other"; this
    # fallback used to be computed into an unused column and never applied.
    raw_tags = df["tags"].iloc[0]
    raw_tags = "" if pd.isna(raw_tags) else str(raw_tags)
    active_tags = {t.strip() for t in raw_tags.lower().split(",") if t.strip()} or {"other"}
    for col in onehot_tags:
        if col not in df.columns:
            df[col] = 0
        if _strip_prefix(col, "tag_").lower() in active_tags:
            df[col] = 1
    df.drop(columns=["tags"], inplace=True)

    # Name: strip the extension, then length + hashed vector features.
    df["name_clean"] = df["name"].astype(str).str.lower().str.rsplit(".", n=1).str[0]
    df = preprocess_name(df, vec_dim=8)
    df.drop(columns=["name", "name_clean"], inplace=True)

    # Description: one column per component of the averaged GloVe vector.
    desc_vec = description_to_vec(df["description"].iloc[0], glove_model)
    for i in range(100):
        df[f"description_glove_{i}"] = desc_vec[i]
    df.drop(columns=["description"], inplace=True)

    # Remove the remaining raw columns that are not model inputs.
    df.drop(
        columns=[
            "license", "category", "type", "created", "subcategory",
            "id", "num_downloads", "file_path", "username",
        ],
        inplace=True,
        errors="ignore",
    )
    return df
def xgb_predict_safe(model, X, label_encoder=None):
    """Predict the first row of ``X`` with the model's booster, aligning columns first.

    ``X`` is reindexed to the booster's feature names (missing columns are
    filled with 0.0) and cast to float32 before prediction.

    Args:
        model: object exposing ``get_booster()``.
        X: ``pandas.DataFrame`` of features.
        label_encoder: optional sequence of class labels, or an object with a
            ``classes_`` attribute. When given, the prediction is turned into
            an index (arg-max for a vector of class scores, rounding for a
            single value), clamped to the valid range and mapped to a label.

    Returns:
        The raw prediction for the first row when ``label_encoder`` is None,
        otherwise the corresponding class label.
    """
    booster = model.get_booster()
    booster_features = booster.feature_names
    if booster_features is not None:
        booster_features = list(booster_features)

    # With no stored feature names, reindex(columns=None) leaves X untouched
    # and the DMatrix is built without names.
    X_safe = X.reindex(columns=booster_features, fill_value=0.0).astype(np.float32)
    dmatrix = xgb.DMatrix(X_safe.values, feature_names=booster_features)
    pred = booster.predict(dmatrix)[0]

    if label_encoder is None:
        return pred

    classes = getattr(label_encoder, "classes_", label_encoder)
    if np.size(pred) > 1:
        # One score per class: take the best-scoring one.
        pred_int = int(np.argmax(pred))
    else:
        pred_int = int(round(float(np.ravel(pred)[0])))

    # Clamp so that an out-of-range value still maps to a valid class.
    pred_int = min(max(pred_int, 0), len(classes) - 1)
    return classes[pred_int]
# -------- Gradio --------
def predict_with_model(model, df_input, feat_list, le=None):
    """Predict the class index (or label) of the first row of ``df_input``.

    The frame is aligned on the booster's own feature names before
    prediction, so extra columns are dropped and missing ones are filled
    with 0.0.

    Args:
        model: object exposing ``get_booster()``.
        df_input: ``pandas.DataFrame`` of features.
        feat_list: unused; kept so existing callers keep working.
        le: optional label decoder — either an object with
            ``inverse_transform`` or a plain sequence of class labels.

    Returns:
        The predicted class index when ``le`` is None (or empty); otherwise
        the decoded label, or ``"Classe inconnue (<index>)"`` when the index
        cannot be decoded.
    """
    booster = model.get_booster()

    # 1. Align the frame on exactly the columns the booster expects
    booster_feats = booster.feature_names
    X_aligned = df_input.reindex(columns=booster_feats, fill_value=0.0).astype(float)

    # 2. Build the DMatrix with the booster's own feature names
    dmatrix = xgb.DMatrix(X_aligned.values, feature_names=booster_feats)

    # 3. Predict
    preds = booster.predict(dmatrix)
    pred_val = preds[0]

    # Several scores per row means one per class: take the arg-max.
    if len(preds.shape) > 1 and preds.shape[1] > 1:
        pred_int = int(np.argmax(pred_val))
    else:
        pred_int = int(round(float(np.ravel(pred_val)[0])))

    # "le is None" instead of truthiness: an array of classes has no
    # unambiguous truth value.
    if le is None:
        return pred_int
    if not hasattr(le, "inverse_transform") and len(le) == 0:
        return pred_int

    try:
        if hasattr(le, "inverse_transform"):
            return le.inverse_transform([pred_int])[0]
        if 0 <= pred_int < len(le):
            return le[pred_int]
    except (ValueError, IndexError, KeyError, TypeError):
        pass
    return f"Classe inconnue ({pred_int})"
def predict_with_metadata(url):
    """Gradio handler: fetch a sound, preprocess it and predict its popularity classes.

    Args:
        url: FreeSound sound URL typed by the user.

    Returns:
        A text report with the raw metadata, the preprocessed features and
        the two predictions (number of downloads, average rating), or the
        raw metadata followed by an error line when the sound cannot be
        handled. Errors while fetching the sound are reported as a message
        instead of being raised.
    """
    if not url or not url.strip():
        return "❌ Veuillez entrer une URL FreeSound."

    # 1. Fetch the raw metadata. This is the UI boundary: any failure
    #    (bad URL, API/network error) becomes a readable message.
    try:
        df_raw = fetch_sound_metadata(url)
    except Exception as exc:
        print(f"Erreur récupération métadonnées ({url!r}): {exc}")
        return f"❌ Impossible de récupérer les métadonnées du son : {exc}"

    raw_lines = ["=== Métadonnées brutes ==="]
    raw_lines += [f"{col}: {df_raw[col].iloc[0]}" for col in df_raw.columns]
    raw_str = "\n".join(raw_lines)

    # 2. Check the duration against the two accepted ranges
    dur = df_raw["duration"].iloc[0]
    if dur < 0.5:
        return raw_str + f"\n\n❌ Son trop court ({dur} sec). Plage acceptée: 0.5-3 ou 10-60 sec"
    if 3 < dur < 10 or dur > 60:
        return raw_str + f"\n\n❌ Son hors plage ({dur} sec). Plage acceptée: 0.5-3 ou 10-60 sec"

    # 3. Preprocessing. preprocess_sound reports an unsupported duration as a string.
    df_processed = preprocess_sound(df_raw)
    if isinstance(df_processed, str):
        return raw_str + "\n\n" + df_processed

    # The targets must not be fed to the models.
    df_for_model = df_processed.drop(
        columns=["avg_rating", "num_downloads_class"], errors="ignore"
    )

    # 4. Pick the models matching the duration range
    if 0.5 <= dur <= 3:
        model_nd = effect_model_num_downloads
        model_ar = effect_model_avg_rating
        model_features = effect_model_features
        current_le = effect_avg_rating_le
        sound_type = "EffectSound"
    else:
        model_nd = music_model_num_downloads
        model_ar = music_model_avg_rating
        model_features = music_model_features
        current_le = music_avg_rating_le
        sound_type = "Music"

    # 5. Force exactly the model's columns
    df_for_model = df_for_model.reindex(columns=model_features, fill_value=0.0).astype(float)

    # 6. Predictions
    pred_num_downloads_val = predict_with_model(model_nd, df_for_model, model_features)
    # Class index -> label; anything else is shown as-is
    NUM_DOWNLOADS_MAP = {0: "Low", 1: "Medium", 2: "High"}
    pred_num_downloads = NUM_DOWNLOADS_MAP.get(pred_num_downloads_val, str(pred_num_downloads_val))

    pred_avg_rating = predict_with_model(model_ar, df_for_model, model_features, le=current_le)

    # 7. Preprocessed features
    processed_lines = ["\n=== Features après preprocessing ==="]
    processed_lines += [f"{col}: {df_processed[col].iloc[0]}" for col in df_processed.columns]
    processed_str = "\n".join(processed_lines)

    # 8. Final report
    prediction_lines = [
        "\n=== Prédictions ===",
        f"Type détecté : {sound_type}",
        f"📥 Num downloads prédit : {pred_num_downloads}",
        f"⭐ Avg rating prédit : {pred_avg_rating}",
    ]
    prediction_str = "\n".join(prediction_lines)
    return raw_str + processed_str + prediction_str
def preprocess_name(df, vec_dim=8):
    """Add name-derived features to a copy of ``df``.

    Reads the ``name_clean`` column and adds ``name_len`` (its character
    length) plus ``name_vec_0`` … ``name_vec_{vec_dim-1}``, a hashed
    bag-of-words vector of the name.

    Args:
        df: frame containing a ``name_clean`` string column.
        vec_dim: number of hashed features to produce.

    Returns:
        A new ``pandas.DataFrame`` with the extra columns appended.
    """
    out = df.copy()
    out["name_len"] = out["name_clean"].str.len()

    # Hashing is stateless, so the vectorizer needs no fitted state.
    hasher = HashingVectorizer(n_features=vec_dim, alternate_sign=False, norm=None)
    hashed = hasher.transform(out["name_clean"]).toarray()

    vec_columns = [f"name_vec_{i}" for i in range(vec_dim)]
    hashed_df = pd.DataFrame(hashed, columns=vec_columns, index=out.index)
    return pd.concat([out, hashed_df], axis=1)
# -------- Gradio UI --------
# One text box for the URL, one button, one text box for the report.
with gr.Blocks(title="FreeSound Popularity Detector") as demo:
    gr.Markdown("# 🎧 FreeSound Popularity Detector")
    gr.Markdown("Collez l'URL d'un son FreeSound et le preprocessing complet sera appliqué automatiquement.")
    url_input = gr.Textbox(label="URL du son FreeSound")
    btn_meta = gr.Button("📊 Prétraiter et afficher features")
    output = gr.Textbox(label="Résultat")
    btn_meta.click(fn=predict_with_metadata, inputs=url_input, outputs=output)

# Start the server only when the file is run as a script, so importing the
# module (tests, tooling) does not launch the app.
if __name__ == "__main__":
    demo.launch()