import ast
import os
import pickle
import sys

import numpy as np
import pandas as pd
from lightgbm import LGBMClassifier
from sklearn.metrics import classification_report, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
|
|
# Dataset read by training and by the demo sampler.
CSV_PATH = "all_genres_clean.csv"
# Pickle files written by train_model() and read back by load_model().
MODEL_PATH = "genre_model.pkl"
SCALER_PATH = "genre_scaler.pkl"
# Holds the tuple (genre_encoder, all_subgenres, all_features).
ENCODER_PATH = "genre_encoder.pkl"


# Raw numeric CSV columns used as model inputs. Order is significant: these
# names (followed by the engineered ones) select the feature-matrix columns,
# so the order here is the order the scaler and model see.
NUMERICAL_FEATURES = [
    # Melody descriptors, columns tagged "(vocals)"
    "melody_complexity (vocals)",
    "melody_range (vocals)",
    "melody_variability (vocals)",
    # Tempo / danceability
    "tempo_bpm_original (mix)",
    "danceability custom (mix)",
    # Loudness
    "loudness_integrated_lufs custom (mix)",
    "loudness_range_lu custom (mix)",
    # Energy estimates
    "energy_librosa (mix)",
    "energy_librosa_std (mix)",
    "energy_essentia (mix)",
    "energy_essentia_std (mix)",
    "energy_combined (mix)",
    # Spectral / timbral descriptors
    "spectral_centroid_mean custom (mix)",
    "mfcc_mean_1 (mix)",
    "mfcc_mean_2 (mix)",
    "chroma_mean (mix)",
    "spectral_contrast_mean (mix)",
    "repetition_score custom (mix)",
    # Pitch statistics
    "pitch_mean (mix)",
    "pitch_std (mix)",
    # RMS energy and zero-crossing rate
    "rms_energy_mean (mix)",
    "rms_energy_std (mix)",
    "zero_crossing_rate (mix)",
]
|
|
|
|
def engineer_features(df, feature_cols):
    """Add ratio, interaction and bucket features derived from raw columns.

    Args:
        df: DataFrame containing the raw audio columns referenced below.
            It is copied; the caller's frame is left untouched.
        feature_cols: List of base feature names.

    Returns:
        A ``(frame, names)`` tuple: ``frame`` is the copy of ``df`` with the
        14 derived columns appended, ``names`` is ``feature_cols`` followed
        by the derived column names in creation order.
    """
    out = df.copy()

    tempo = out['tempo_bpm_original (mix)']
    energy = out['energy_combined (mix)']
    dance = out['danceability custom (mix)']

    def bucket(series, low, high):
        # 0 below `low`, 1 in [low, high), 2 otherwise. A NaN fails both
        # comparisons and therefore ends up in bucket 2.
        return np.where(series < low, 0, np.where(series < high, 1, 2))

    # Denominators are offset by 1 (after abs() where the column can be
    # negative) so that a zero value does not divide by zero.
    derived = {
        'energy_per_tempo': energy / (tempo + 1),
        'dance_energy_ratio': dance * energy,
        'loudness_range_ratio': (
            out['loudness_range_lu custom (mix)']
            / (out['loudness_integrated_lufs custom (mix)'].abs() + 1)
        ),
        'melody_energy': out['melody_variability (vocals)'] * energy,
        'spectral_complexity': (
            out['spectral_centroid_mean custom (mix)']
            * out['spectral_contrast_mean (mix)']
        ),
        'mfcc_ratio': (
            out['mfcc_mean_1 (mix)'] / (out['mfcc_mean_2 (mix)'].abs() + 1)
        ),
        'rhythm_strength': tempo * dance,
        'pitch_variation': out['pitch_std (mix)'] / (out['pitch_mean (mix)'] + 1),
        'rms_energy_ratio': (
            out['rms_energy_mean (mix)'] / (out['rms_energy_std (mix)'] + 1)
        ),
        'chroma_energy': out['chroma_mean (mix)'] * energy,
        'zero_tempo': out['zero_crossing_rate (mix)'] * tempo,
        'tempo_category': bucket(tempo, 100, 130),
        'energy_category': bucket(energy, 0.3, 0.6),
        'dance_category': bucket(dance, 0.5, 0.75),
    }

    # Dicts keep insertion order, so columns are appended in the order above.
    for column, values in derived.items():
        out[column] = values

    return out, feature_cols + list(derived)
|
|
|
|
def _parse_subgenres(raw):
    """Return the sub-genre names stored in one ``sub_genres`` cell.

    The cell is expected to look like a Python list literal
    (``"['a', 'b']"``). It is parsed with ``ast.literal_eval`` so that names
    containing commas, apostrophes or double quotes survive intact. Anything
    that is not a valid list literal falls back to stripping brackets and
    single quotes and splitting on commas. Non-string cells (e.g. NaN) yield
    an empty list.
    """
    if not isinstance(raw, str):
        return []
    text = raw.strip()
    if not text or text == "[]":
        return []

    try:
        parsed = ast.literal_eval(text)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        parsed = None

    if isinstance(parsed, (list, tuple, set)):
        candidates = [str(item) for item in parsed]
    else:
        stripped = text.replace("[", "").replace("]", "").replace("'", "")
        candidates = stripped.split(",")

    return [name.strip() for name in candidates if name.strip()]


def load_and_preprocess_data():
    """Load the CSV at ``CSV_PATH`` and build the training inputs.

    Prints a short summary (row count, sub-genre count, the 12 most frequent
    genres, encoder classes, feature count) while it works.

    Returns:
        ``(X, y_genre_encoded, genre_encoder, all_subgenres, all_features)``:
        the imputed feature frame, the integer-encoded genre labels, the
        fitted ``LabelEncoder``, the sorted list of distinct sub-genre names
        and the ordered list of feature column names.
    """
    print("Loading data...")
    df = pd.read_csv(CSV_PATH)
    print(f"Total songs: {len(df)}")

    all_subgenres = sorted({
        name
        for cell in df["sub_genres"]
        for name in _parse_subgenres(cell)
    })
    print(f"Sub-genres found: {len(all_subgenres)}")

    genre_counts = df["genre"].value_counts()
    print(f"\nGenre distribution ({len(genre_counts)} genres):")
    for genre, count in genre_counts.head(12).items():
        print(f" {genre}: {count}")

    df_features, all_features = engineer_features(df, NUMERICAL_FEATURES)
    X = df_features[all_features].copy()
    # Neutralise infinities BEFORE computing column means: a single inf would
    # make the mean inf, and every imputed NaN in that column would then be
    # replaced by 0 instead of a real average.
    X = X.replace([np.inf, -np.inf], 0)
    X = X.fillna(X.mean())

    y_genre = df_features["genre"].fillna("Unknown")

    genre_encoder = LabelEncoder()
    y_genre_encoded = genre_encoder.fit_transform(y_genre)

    print(f"\nGenres: {list(genre_encoder.classes_)}")
    print(f"Total features: {len(all_features)}")

    return X, y_genre_encoded, genre_encoder, all_subgenres, all_features
|
|
|
|
def train_model(X, y_genre, genre_encoder, all_subgenres, all_features):
    """Train, evaluate and persist the LightGBM genre classifier.

    Args:
        X: Feature frame whose columns correspond to ``all_features``.
        y_genre: Integer-encoded genre labels aligned with ``X``.
        genre_encoder: Fitted encoder whose ``classes_`` name the labels.
        all_subgenres: Sub-genre names; stored alongside the encoder.
        all_features: Ordered feature names; stored alongside the encoder.

    Returns:
        ``(model, scaler, genre_encoder, all_subgenres, all_features)``.

    Side effects:
        Prints progress, a classification report, the accuracy and the ten
        most important features, then pickles the model, the scaler and the
        ``(genre_encoder, all_subgenres, all_features)`` tuple to
        ``MODEL_PATH``, ``SCALER_PATH`` and ``ENCODER_PATH``.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("TRAINING MODEL")
    print(rule)

    print("\nSplitting data (80% train, 20% test)...")
    split_kwargs = {"test_size": 0.2, "random_state": 42}
    try:
        # Stratify so every genre is represented proportionally in both
        # splits; rare genres otherwise vanish from train or test by chance.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y_genre, stratify=y_genre, **split_kwargs
        )
    except ValueError:
        # Stratification is impossible (e.g. a genre with a single song or a
        # test split smaller than the number of genres): use a plain split.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y_genre, **split_kwargs
        )
    print(f" Train: {len(X_train)}, Test: {len(X_test)}")

    print("\nScaling features...")
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    print("\nTraining LightGBM...")
    model = LGBMClassifier(
        n_estimators=500,
        max_depth=30,
        learning_rate=0.05,
        subsample=0.9,
        colsample_bytree=0.9,
        min_child_samples=20,
        num_leaves=100,
        n_jobs=-1,
        random_state=42,
        verbose=-1,
    )
    model.fit(X_train_scaled, y_train)

    print("\nEvaluating...")
    y_pred = model.predict(X_test_scaled)
    accuracy = accuracy_score(y_test, y_pred)

    print("\n" + rule)
    print("CLASSIFICATION REPORT")
    print(rule)
    # Pass the full label set explicitly: without it the report infers labels
    # from y_test/y_pred and raises when a genre is missing from both,
    # because target_names would no longer match in length.
    print(classification_report(
        y_test,
        y_pred,
        labels=np.arange(len(genre_encoder.classes_)),
        target_names=genre_encoder.classes_,
        zero_division=0,
    ))

    print(f"\n{'='*60}")
    print(f"ACCURACY: {accuracy:.2%}")
    print(f"{'='*60}")
    print("\nNote: ~50% accuracy is typical for 12-genre classification.")
    print("Genres overlap heavily in audio features.")

    feature_importance = pd.DataFrame({
        'feature': all_features,
        'importance': model.feature_importances_,
    }).sort_values('importance', ascending=False)

    print("\nTop 10 Features:")
    top_features = feature_importance.head(10)
    for name, score in zip(top_features['feature'], top_features['importance']):
        print(f" {name}: {score:.0f}")

    print("\nSaving model...")
    with open(MODEL_PATH, "wb") as f:
        pickle.dump(model, f)
    with open(SCALER_PATH, "wb") as f:
        pickle.dump(scaler, f)
    with open(ENCODER_PATH, "wb") as f:
        pickle.dump((genre_encoder, all_subgenres, all_features), f)

    return model, scaler, genre_encoder, all_subgenres, all_features
|
|
|
|
def load_model():
    """Read the pickled training artifacts back from disk.

    Returns:
        ``(model, scaler, genre_encoder, all_subgenres, all_features)``,
        loaded from ``MODEL_PATH``, ``SCALER_PATH`` and ``ENCODER_PATH``.
    """
    def unpickle(path):
        with open(path, "rb") as handle:
            return pickle.load(handle)

    print("Loading model...")
    # NOTE: unpickling can execute code embedded in the file, so these paths
    # must only ever point at artifacts produced by a trusted training run.
    model = unpickle(MODEL_PATH)
    scaler = unpickle(SCALER_PATH)
    genre_encoder, all_subgenres, all_features = unpickle(ENCODER_PATH)
    return model, scaler, genre_encoder, all_subgenres, all_features
|
|
|
|
def predict(input_values, model, scaler, genre_encoder, all_subgenres):
    """Predict the genre for a single feature vector.

    Args:
        input_values: Sequence of feature values for one song.
        model: Fitted classifier exposing ``predict``, ``predict_proba`` and
            ``classes_``.
        scaler: Fitted scaler exposing ``transform``.
        genre_encoder: Fitted encoder exposing ``inverse_transform``.
        all_subgenres: List of known sub-genre names.

    Returns:
        ``(genre, similar, related_subs)``: the predicted genre name, up to
        five ``(genre_name, probability)`` pairs ordered by descending
        probability, and the sub-genres whose name contains the predicted
        genre (case-insensitive). When none match, the first ten entries of
        ``all_subgenres`` are returned instead.
    """
    input_array = np.array(input_values).reshape(1, -1)
    input_scaled = scaler.transform(input_array)

    genre_idx = model.predict(input_scaled)[0]
    genre = genre_encoder.inverse_transform([genre_idx])[0]

    genre_probs = model.predict_proba(input_scaled)[0]
    top_columns = np.argsort(genre_probs)[::-1][:5]
    # predict_proba columns follow model.classes_, which is only a subset of
    # the encoder's classes when some genre never reached the training split.
    # Translate column position -> encoded label -> name instead of indexing
    # the encoder's classes by column position.
    top_labels = np.asarray(model.classes_)[top_columns]
    top_names = genre_encoder.inverse_transform(top_labels)
    similar = [
        (name, genre_probs[col]) for name, col in zip(top_names, top_columns)
    ]

    needle = genre.lower()
    related_subs = [s for s in all_subgenres if needle in s.lower()]
    if not related_subs:
        related_subs = all_subgenres[:10]

    return genre, similar, related_subs
|
|
|
|
def print_result(genre, similar, subgenres):
    """Print a prediction report to stdout.

    Args:
        genre: Predicted genre name.
        similar: Iterable of ``(genre_name, probability)`` pairs; each is
            rendered with a 20-character ``#``/``-`` bar.
        subgenres: Sub-genre names; at most the first ten are listed.
    """
    rule = "=" * 60
    bar_width = 20

    lines = [
        "\n" + rule,
        "PREDICTION RESULTS",
        rule,
        f"\n GENRE: {genre}",
        f"\n Similar Genres:",
    ]
    for name, prob in similar:
        filled = int(prob * bar_width)
        bar = "#" * filled + "-" * (bar_width - filled)
        lines.append(f" [{bar}] {name}: {prob:.1%}")

    lines.append(f"\n Sub-genres in {genre}:")
    lines.extend(f" - {sub}" for sub in subgenres[:10])
    lines.append(rule)

    print("\n".join(lines))
|
|
|
|
def get_random_values(all_features):
    """Return the feature vector of one randomly chosen row of the dataset.

    Reads up to the first 5000 rows of ``CSV_PATH``, applies
    ``engineer_features`` and imputes missing values before sampling.

    Args:
        all_features: Ordered feature column names to extract.

    Returns:
        A list of values, one per entry of ``all_features``.

    Raises:
        ValueError: If the CSV yields no rows to sample from.
    """
    df = pd.read_csv(CSV_PATH, nrows=5000)
    df, _ = engineer_features(df, NUMERICAL_FEATURES)

    X = df[all_features]
    if len(X) == 0:
        raise ValueError(f"{CSV_PATH} contains no rows to sample from")

    # Replace infinities first so they cannot turn a column mean into inf,
    # which would make every imputed NaN collapse to 0 afterwards.
    X = X.replace([np.inf, -np.inf], 0)
    X = X.fillna(X.mean())

    idx = np.random.randint(0, len(X))
    return X.iloc[idx].values.tolist()
|
|
|
|
def main():
    """Command-line entry point.

    Flags read from ``sys.argv``:
        --train    Retrain and save the model. Training also runs when any
                   of the three saved artifacts is missing.
        --demo     Predict for three randomly sampled dataset rows.
        --predict  Predict for the feature values that follow the flag.

    With neither ``--demo`` nor a usable ``--predict``, prints usage and the
    numbered feature list.
    """
    args = sys.argv

    # load_model() needs all three files; checking only the model file would
    # crash with FileNotFoundError when the scaler or encoder is missing.
    artifacts = (MODEL_PATH, SCALER_PATH, ENCODER_PATH)
    have_artifacts = all(os.path.exists(path) for path in artifacts)

    if "--train" in args or not have_artifacts:
        X, y_genre, genre_encoder, all_subgenres, all_features = load_and_preprocess_data()
        model, scaler, genre_encoder, all_subgenres, all_features = train_model(
            X, y_genre, genre_encoder, all_subgenres, all_features
        )
    else:
        model, scaler, genre_encoder, all_subgenres, all_features = load_model()

    if "--demo" in args:
        print("\n" + "=" * 60)
        print("DEMO PREDICTIONS")
        print("=" * 60)
        for i in range(3):
            print(f"\n[Demo {i+1}]")
            values = get_random_values(all_features)
            # Label with all_features: that list defines the order of
            # `values`, so names and numbers cannot drift apart.
            for feat, value in zip(all_features[:5], values):
                print(f" {feat}: {value:.4f}")
            genre, similar, subs = predict(values, model, scaler, genre_encoder, all_subgenres)
            print_result(genre, similar, subs)
        return

    if "--predict" in args:
        idx = args.index("--predict")
        raw_values = args[idx + 1 : idx + 1 + len(all_features)]
        if raw_values:
            if len(raw_values) != len(all_features):
                print(
                    f"Error: expected {len(all_features)} feature values "
                    f"after --predict, got {len(raw_values)}"
                )
                return
            try:
                values = [float(x) for x in raw_values]
                genre, similar, subs = predict(values, model, scaler, genre_encoder, all_subgenres)
                print_result(genre, similar, subs)
                return
            except ValueError as e:
                print(f"Error: {e}")
                return

    print("\nUsage:")
    print(" python genre_predictor.py --train # Train model")
    print(" python genre_predictor.py --demo # Demo predictions")
    print(f" python genre_predictor.py --predict <{len(all_features)} values>")
    print(f"\nFeatures ({len(all_features)}):")
    for i, f in enumerate(all_features, 1):
        print(f" {i}. {f}")


if __name__ == "__main__":
    main()
|
|