"""Train and run a LightGBM music-genre classifier on tabular audio features.

The script reads ``CSV_PATH``, derives extra features from the raw numeric
columns, trains an ``LGBMClassifier`` on the ``genre`` column, and pickles the
model, scaler, and label encoder so later runs can predict without retraining.

Command-line flags (see ``main``): ``--train``, ``--demo``, ``--predict``.
"""
import os
import pickle
import sys

import numpy as np
import pandas as pd
from lightgbm import LGBMClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler

CSV_PATH = "all_genres_clean.csv"
MODEL_PATH = "genre_model.pkl"
SCALER_PATH = "genre_scaler.pkl"
ENCODER_PATH = "genre_encoder.pkl"

# Raw numeric columns read from the CSV; engineered features are appended
# after these, in this order, by engineer_features().
NUMERICAL_FEATURES = [
    "melody_complexity (vocals)",
    "melody_range (vocals)",
    "melody_variability (vocals)",
    "tempo_bpm_original (mix)",
    "danceability custom (mix)",
    "loudness_integrated_lufs custom (mix)",
    "loudness_range_lu custom (mix)",
    "energy_librosa (mix)",
    "energy_librosa_std (mix)",
    "energy_essentia (mix)",
    "energy_essentia_std (mix)",
    "energy_combined (mix)",
    "spectral_centroid_mean custom (mix)",
    "mfcc_mean_1 (mix)",
    "mfcc_mean_2 (mix)",
    "chroma_mean (mix)",
    "spectral_contrast_mean (mix)",
    "repetition_score custom (mix)",
    "pitch_mean (mix)",
    "pitch_std (mix)",
    "rms_energy_mean (mix)",
    "rms_energy_std (mix)",
    "zero_crossing_rate (mix)",
]


def engineer_features(df, feature_cols):
    """Add derived ratio/product/bucket columns to a copy of ``df``.

    Args:
        df: DataFrame containing the raw columns referenced below.
        feature_cols: List of base feature column names.

    Returns:
        A ``(frame, names)`` tuple: the copied frame with the engineered
        columns added, and ``feature_cols`` followed by the engineered names.
    """
    df = df.copy()

    tempo = df['tempo_bpm_original (mix)']
    energy = df['energy_combined (mix)']
    dance = df['danceability custom (mix)']

    # The "+ 1" in each denominator keeps ratios finite when the raw value is 0.
    df['energy_per_tempo'] = energy / (tempo + 1)
    df['dance_energy_ratio'] = dance * energy
    df['loudness_range_ratio'] = df['loudness_range_lu custom (mix)'] / (
        abs(df['loudness_integrated_lufs custom (mix)']) + 1
    )
    df['melody_energy'] = df['melody_variability (vocals)'] * energy
    df['spectral_complexity'] = (
        df['spectral_centroid_mean custom (mix)'] * df['spectral_contrast_mean (mix)']
    )
    df['mfcc_ratio'] = df['mfcc_mean_1 (mix)'] / (abs(df['mfcc_mean_2 (mix)']) + 1)
    df['rhythm_strength'] = tempo * dance
    df['pitch_variation'] = df['pitch_std (mix)'] / (df['pitch_mean (mix)'] + 1)
    df['rms_energy_ratio'] = df['rms_energy_mean (mix)'] / (df['rms_energy_std (mix)'] + 1)
    df['chroma_energy'] = df['chroma_mean (mix)'] * energy
    df['zero_tempo'] = df['zero_crossing_rate (mix)'] * tempo

    # Three-way buckets (0/1/2) split at the thresholds below.
    df['tempo_category'] = np.where(tempo < 100, 0, np.where(tempo < 130, 1, 2))
    df['energy_category'] = np.where(energy < 0.3, 0, np.where(energy < 0.6, 1, 2))
    df['dance_category'] = np.where(dance < 0.5, 0, np.where(dance < 0.75, 1, 2))

    engineered = [
        'energy_per_tempo',
        'dance_energy_ratio',
        'loudness_range_ratio',
        'melody_energy',
        'spectral_complexity',
        'mfcc_ratio',
        'rhythm_strength',
        'pitch_variation',
        'rms_energy_ratio',
        'chroma_energy',
        'zero_tempo',
        'tempo_category',
        'energy_category',
        'dance_category',
    ]
    return df, feature_cols + engineered


def _clean_feature_matrix(X):
    """Return ``X`` with NaN replaced by the column mean and +/-inf by 0.

    The mean is computed over finite values only, so an infinite entry cannot
    turn the fill value into inf/NaN. Columns with no finite values at all
    fall back to 0.
    """
    finite_means = X.replace([np.inf, -np.inf], np.nan).mean()
    X = X.fillna(finite_means)
    X = X.replace([np.inf, -np.inf], 0)
    return X.fillna(0)


def load_and_preprocess_data():
    """Read ``CSV_PATH`` and build the training matrix and encoded labels.

    Returns:
        ``(X, y_genre_encoded, genre_encoder, all_subgenres, all_features)``
        where ``X`` is the cleaned feature frame, ``y_genre_encoded`` the
        integer-encoded ``genre`` column (missing genres become "Unknown"),
        ``all_subgenres`` a sorted list of names parsed from ``sub_genres``,
        and ``all_features`` the ordered feature column names.
    """
    print("Loading data...")
    df = pd.read_csv(CSV_PATH)
    print(f"Total songs: {len(df)}")

    # sub_genres cells look like "['a', 'b']"; strip brackets/quotes and split.
    all_subgenres = set()
    for subs in df["sub_genres"].fillna("[]"):
        if not isinstance(subs, str):
            # Non-text cells cannot be parsed as a list literal; skip them.
            continue
        cleaned = subs.replace("[", "").replace("]", "").replace("'", "")
        for name in cleaned.split(","):
            name = name.strip()
            if name:
                all_subgenres.add(name)
    all_subgenres = sorted(all_subgenres)
    print(f"Sub-genres found: {len(all_subgenres)}")

    genre_counts = df["genre"].value_counts()
    print(f"\nGenre distribution ({len(genre_counts)} genres):")
    for genre, count in genre_counts.head(12).items():
        print(f" {genre}: {count}")

    df_features, all_features = engineer_features(df, NUMERICAL_FEATURES)
    X = _clean_feature_matrix(df_features[all_features].copy())

    y_genre = df_features["genre"].fillna("Unknown")
    genre_encoder = LabelEncoder()
    y_genre_encoded = genre_encoder.fit_transform(y_genre)

    print(f"\nGenres: {list(genre_encoder.classes_)}")
    print(f"Total features: {len(all_features)}")
    return X, y_genre_encoded, genre_encoder, all_subgenres, all_features


def train_model(X, y_genre, genre_encoder, all_subgenres, all_features):
    """Fit the scaler and LightGBM model, report metrics, and pickle artifacts.

    Args:
        X: Feature frame with columns in ``all_features`` order.
        y_genre: Integer-encoded genre labels aligned with ``X``.
        genre_encoder: Fitted ``LabelEncoder`` for the genre labels.
        all_subgenres: Sub-genre names, stored alongside the encoder.
        all_features: Ordered feature names, stored alongside the encoder.

    Returns:
        ``(model, scaler, genre_encoder, all_subgenres, all_features)``.
    """
    print("\n" + "=" * 60)
    print("TRAINING MODEL")
    print("=" * 60)

    print("\nSplitting data (80% train, 20% test)...")
    X_train, X_test, y_train, y_test = train_test_split(
        X, y_genre, test_size=0.2, random_state=42
    )
    print(f" Train: {len(X_train)}, Test: {len(X_test)}")

    print("\nScaling features...")
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    print("\nTraining LightGBM...")
    model = LGBMClassifier(
        n_estimators=500,
        max_depth=30,
        learning_rate=0.05,
        subsample=0.9,
        colsample_bytree=0.9,
        min_child_samples=20,
        num_leaves=100,
        n_jobs=-1,
        random_state=42,
        verbose=-1,
    )
    model.fit(X_train_scaled, y_train)

    print("\nEvaluating...")
    y_pred = model.predict(X_test_scaled)
    accuracy = accuracy_score(y_test, y_pred)

    print("\n" + "=" * 60)
    print("CLASSIFICATION REPORT")
    print("=" * 60)
    # Pass labels explicitly: the random split may leave a rare genre out of
    # the test set, and target_names must line up with the label list.
    print(classification_report(
        y_test,
        y_pred,
        labels=np.arange(len(genre_encoder.classes_)),
        target_names=genre_encoder.classes_,
        zero_division=0,
    ))
    print(f"\n{'='*60}")
    print(f"ACCURACY: {accuracy:.2%}")
    print(f"{'='*60}")
    print("\nNote: ~50% accuracy is typical for 12-genre classification.")
    print("Genres overlap heavily in audio features.")

    feature_importance = pd.DataFrame({
        'feature': all_features,
        'importance': model.feature_importances_,
    }).sort_values('importance', ascending=False)
    print("\nTop 10 Features:")
    top = feature_importance.head(10)
    for feature, importance in zip(top['feature'], top['importance']):
        print(f" {feature}: {importance:.0f}")

    print("\nSaving model...")
    with open(MODEL_PATH, "wb") as f:
        pickle.dump(model, f)
    with open(SCALER_PATH, "wb") as f:
        pickle.dump(scaler, f)
    with open(ENCODER_PATH, "wb") as f:
        pickle.dump((genre_encoder, all_subgenres, all_features), f)

    return model, scaler, genre_encoder, all_subgenres, all_features


def load_model():
    """Load the pickled model, scaler, and encoder bundle from disk.

    Returns:
        ``(model, scaler, genre_encoder, all_subgenres, all_features)``.
    """
    print("Loading model...")
    # SECURITY: pickle.load executes arbitrary code from the file. Only load
    # artifacts produced by this script from a trusted location.
    with open(MODEL_PATH, "rb") as f:
        model = pickle.load(f)
    with open(SCALER_PATH, "rb") as f:
        scaler = pickle.load(f)
    with open(ENCODER_PATH, "rb") as f:
        genre_encoder, all_subgenres, all_features = pickle.load(f)
    return model, scaler, genre_encoder, all_subgenres, all_features


def predict(input_values, model, scaler, genre_encoder, all_subgenres):
    """Predict the genre for one feature vector.

    Args:
        input_values: Sequence of feature values in training column order.
        model: Fitted classifier exposing ``predict`` and ``predict_proba``.
        scaler: Fitted scaler exposing ``transform``.
        genre_encoder: Fitted ``LabelEncoder`` for genre names.
        all_subgenres: List of known sub-genre names.

    Returns:
        ``(genre, similar, related_subs)``: the predicted genre name, up to
        five ``(genre_name, probability)`` pairs in descending probability,
        and the sub-genres whose name contains the predicted genre
        (case-insensitive), or the first ten sub-genres if none match.
    """
    input_array = np.array(input_values).reshape(1, -1)

    # The scaler is fitted on a DataFrame during training; feeding it a frame
    # with the same column names avoids sklearn's feature-name warning.
    fitted_names = getattr(scaler, "feature_names_in_", None)
    if fitted_names is not None and len(fitted_names) == input_array.shape[1]:
        scaler_input = pd.DataFrame(input_array, columns=fitted_names)
    else:
        scaler_input = input_array
    input_scaled = scaler.transform(scaler_input)

    genre_idx = model.predict(input_scaled)[0]
    genre = genre_encoder.inverse_transform([genre_idx])[0]

    genre_probs = model.predict_proba(input_scaled)[0]
    top_indices = np.argsort(genre_probs)[::-1][:5]

    # predict_proba columns follow model.classes_, which can be a subset of
    # the encoder's classes if a genre never appeared in the training split.
    class_ids = getattr(model, "classes_", None)
    if class_ids is None:
        class_ids = np.arange(len(genre_probs))
    similar = [
        (genre_encoder.classes_[class_ids[i]], genre_probs[i])
        for i in top_indices
    ]

    related_subs = [s for s in all_subgenres if genre.lower() in s.lower()]
    if not related_subs:
        related_subs = all_subgenres[:10]
    return genre, similar, related_subs


def print_result(genre, similar, subgenres):
    """Print the predicted genre, a probability bar chart, and sub-genres.

    Args:
        genre: Predicted genre name.
        similar: Iterable of ``(genre_name, probability)`` pairs.
        subgenres: Sub-genre names; at most the first ten are printed.
    """
    print("\n" + "=" * 60)
    print("PREDICTION RESULTS")
    print("=" * 60)
    print(f"\n GENRE: {genre}")
    print(f"\n Similar Genres:")
    for g, prob in similar:
        filled = int(prob * 20)  # 20-character bar, one '#' per 5%
        bar = "#" * filled + "-" * (20 - filled)
        print(f" [{bar}] {g}: {prob:.1%}")
    print(f"\n Sub-genres in {genre}:")
    for sub in subgenres[:10]:
        print(f" - {sub}")
    print("=" * 60)


def get_random_values(all_features):
    """Return one randomly chosen feature row from the first 5000 CSV rows.

    Args:
        all_features: Ordered feature column names to extract.

    Returns:
        A list of feature values for the sampled row, cleaned the same way as
        the training matrix.
    """
    df = pd.read_csv(CSV_PATH, nrows=5000)
    df, _ = engineer_features(df, NUMERICAL_FEATURES)
    X = _clean_feature_matrix(df[all_features])
    idx = np.random.randint(0, len(X))
    return X.iloc[idx].values.tolist()


def main():
    """Command-line entry point: train or load the model, then demo/predict.

    Trains when ``--train`` is given or any saved artifact is missing;
    otherwise loads the saved artifacts. ``--demo`` runs three predictions on
    random CSV rows, ``--predict`` classifies values given on the command
    line, and with neither flag the usage text is printed.
    """
    # load_model() needs all three files, so retrain if any one is missing.
    artifacts_present = all(
        os.path.exists(path) for path in (MODEL_PATH, SCALER_PATH, ENCODER_PATH)
    )
    if "--train" in sys.argv or not artifacts_present:
        X, y_genre, genre_encoder, all_subgenres, all_features = load_and_preprocess_data()
        model, scaler, genre_encoder, all_subgenres, all_features = train_model(
            X, y_genre, genre_encoder, all_subgenres, all_features
        )
    else:
        model, scaler, genre_encoder, all_subgenres, all_features = load_model()

    if "--demo" in sys.argv:
        print("\n" + "=" * 60)
        print("DEMO PREDICTIONS")
        print("=" * 60)
        for i in range(3):
            print(f"\n[Demo {i+1}]")
            values = get_random_values(all_features)
            # all_features starts with NUMERICAL_FEATURES, so indices line up.
            for j, feat in enumerate(NUMERICAL_FEATURES[:5]):
                print(f" {feat}: {values[j]:.4f}")
            genre, similar, subs = predict(values, model, scaler, genre_encoder, all_subgenres)
            print_result(genre, similar, subs)
        return

    if "--predict" in sys.argv:
        idx = sys.argv.index("--predict")
        raw_values = sys.argv[idx + 1 : idx + 1 + len(all_features)]
        if raw_values:
            try:
                values = [float(x) for x in raw_values]
                if len(values) != len(all_features):
                    raise ValueError(
                        f"expected {len(all_features)} values, got {len(values)}"
                    )
                genre, similar, subs = predict(values, model, scaler, genre_encoder, all_subgenres)
                print_result(genre, similar, subs)
                return
            except ValueError as e:
                print(f"Error: {e}")
                return

    print("\nUsage:")
    print(" python genre_predictor.py --train # Train model")
    print(" python genre_predictor.py --demo # Demo predictions")
    print(f" python genre_predictor.py --predict <{len(all_features)} values>")
    print(f"\nFeatures ({len(all_features)}):")
    for i, f in enumerate(all_features, 1):
        print(f" {i}. {f}")


if __name__ == "__main__":
    main()