""" EXTRA CREDIT — Deep Learning with LSTM ======================================= LSTM model for temporal popularity prediction on Spotify. Addresses the extra credit: "Try DL, LSTM, or RL for +1 pt in lowest case study" The LSTM treats each track's audio features as a sequence across popularity tiers (Obscure → Low → Mid → Popular → Hit), learning temporal dynamics of how feature importance shifts across success levels. Usage: python3 lstm_model.py python3 lstm_model.py --epochs 30 --mode spotify python3 lstm_model.py --mode amazon """ import os import sys import argparse import warnings import numpy as np import pandas as pd import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt warnings.filterwarnings("ignore") # ── TensorFlow / Keras ────────────────────────────────────── try: import tensorflow as tf from tensorflow.keras.models import Sequential from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau from tensorflow.keras.optimizers import Adam TF_OK = True print(f"TensorFlow {tf.__version__} loaded.") except ImportError: TF_OK = False print("[ERROR] TensorFlow not installed. Run: pip install tensorflow") sys.exit(1) from sklearn.preprocessing import MinMaxScaler from sklearn.model_selection import train_test_split from sklearn.metrics import mean_absolute_error, r2_score COLORS = ["#2E86AB", "#A23B72", "#F18F01", "#C73E1D", "#44BBA4"] # ════════════════════════════════════════════════════════════ # DATA PREPARATION — SEQUENCE CONSTRUCTION # ════════════════════════════════════════════════════════════ def build_spotify_sequences(df, features, target, window=5): """ Convert track-level data into overlapping windows of length `window`. Tracks are sorted by popularity then split into windows, creating pseudo-temporal sequences that simulate how audio characteristics evolve across the popularity spectrum. """ df_sorted = df.sort_values(target).reset_index(drop=True) X_all = df_sorted[features].values y_all = df_sorted[target].values scaler_X = MinMaxScaler() scaler_y = MinMaxScaler() X_scaled = scaler_X.fit_transform(X_all) y_scaled = scaler_y.fit_transform(y_all.reshape(-1, 1)).flatten() Xs, ys = [], [] for i in range(len(X_scaled) - window): Xs.append(X_scaled[i:i + window]) ys.append(y_scaled[i + window]) return np.array(Xs), np.array(ys), scaler_X, scaler_y def build_amazon_sequences(df, features, target, window=5): """ For Amazon: sort by rating (quality proxy), build overlapping windows. """ df_sorted = df.sort_values("rating").reset_index(drop=True) X_all = df_sorted[features].values y_all = df_sorted[target].values scaler_X = MinMaxScaler() scaler_y = MinMaxScaler() X_scaled = scaler_X.fit_transform(X_all) y_scaled = scaler_y.fit_transform(y_all.reshape(-1, 1)).flatten() Xs, ys = [], [] for i in range(len(X_scaled) - window): Xs.append(X_scaled[i:i + window]) ys.append(y_scaled[i + window]) return np.array(Xs), np.array(ys), scaler_X, scaler_y # ════════════════════════════════════════════════════════════ # LSTM MODEL BUILDER # ════════════════════════════════════════════════════════════ def build_lstm(input_shape, units=64, dropout=0.2): """ Two-layer stacked LSTM with BatchNorm and Dropout. Architecture chosen for sequence regression tasks. """ model = Sequential([ LSTM(units, input_shape=input_shape, return_sequences=True, name="lstm_layer_1"), BatchNormalization(), Dropout(dropout), LSTM(units // 2, return_sequences=False, name="lstm_layer_2"), BatchNormalization(), Dropout(dropout), Dense(32, activation="relu", name="dense_1"), Dense(1, activation="linear", name="output"), ]) model.compile( optimizer=Adam(learning_rate=0.001), loss="mse", metrics=["mae"], ) return model # ════════════════════════════════════════════════════════════ # TRAINING & EVALUATION # ════════════════════════════════════════════════════════════ def train_and_evaluate(X, y, scaler_y, domain, epochs=50, batch_size=32): X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=42 ) model = build_lstm(input_shape=(X.shape[1], X.shape[2])) model.summary() callbacks = [ EarlyStopping(monitor="val_loss", patience=8, restore_best_weights=True), ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=4, min_lr=1e-5), ] history = model.fit( X_train, y_train, validation_split=0.15, epochs=epochs, batch_size=batch_size, callbacks=callbacks, verbose=1, ) y_pred_scaled = model.predict(X_test, verbose=0).flatten() # Inverse transform predictions y_test_orig = scaler_y.inverse_transform(y_test.reshape(-1, 1)).flatten() y_pred_orig = scaler_y.inverse_transform(y_pred_scaled.reshape(-1, 1)).flatten() mae = mean_absolute_error(y_test_orig, y_pred_orig) r2 = r2_score(y_test_orig, y_pred_orig) print(f"\n{'─'*50}") print(f"LSTM Results — {domain}") print(f" MAE : {mae:.3f}") print(f" R² : {r2:.3f}") print(f" Epochs trained: {len(history.history['loss'])}") print(f"{'─'*50}") return model, history, y_test_orig, y_pred_orig, mae, r2 # ════════════════════════════════════════════════════════════ # VISUALISATION # ════════════════════════════════════════════════════════════ def plot_results(history, y_test, y_pred, mae, r2, domain, filename): fig, axes = plt.subplots(1, 3, figsize=(18, 5)) fig.suptitle(f"LSTM Deep Learning — {domain}", fontsize=14, fontweight="bold") # Training curve ax = axes[0] ax.plot(history.history["loss"], color=COLORS[0], label="Train Loss") ax.plot(history.history["val_loss"], color=COLORS[1], linestyle="--", label="Val Loss") ax.set_title("Training & Validation Loss", fontweight="bold") ax.set_xlabel("Epoch") ax.set_ylabel("MSE Loss") ax.legend() # Actual vs predicted ax = axes[1] ax.scatter(y_test, y_pred, alpha=0.4, color=COLORS[1], s=20) mn = min(y_test.min(), y_pred.min()) mx = max(y_test.max(), y_pred.max()) ax.plot([mn, mx], [mn, mx], "r--", lw=2, label="Perfect fit") ax.set_title(f"Actual vs Predicted\nR² = {r2:.3f}", fontweight="bold") ax.set_xlabel("Actual") ax.set_ylabel("Predicted") ax.legend() # Residuals ax = axes[2] residuals = y_test - y_pred ax.hist(residuals, bins=30, color=COLORS[2], edgecolor="white") ax.axvline(0, color="red", linestyle="--") ax.set_title(f"Residuals Distribution\nMAE = {mae:.3f}", fontweight="bold") ax.set_xlabel("Residual") ax.set_ylabel("Count") plt.tight_layout() plt.savefig(filename, dpi=150, bbox_inches="tight") plt.close() print(f"Saved: {filename}") # ════════════════════════════════════════════════════════════ # MAIN # ════════════════════════════════════════════════════════════ def run_spotify_lstm(epochs=50): print("\n" + "=" * 60) print("LSTM — SPOTIFY POPULARITY PREDICTION") print("=" * 60) paths = ["spotify_synthetic.csv", "spotify/dataset.csv", "dataset.csv"] df = None for p in paths: if os.path.exists(p): df = pd.read_csv(p) print(f"Loaded: {p} ({len(df)} records)") break if df is None: print("No Spotify data found. Generating synthetic...") np.random.seed(42) n = 800 from scipy.stats import beta as beta_dist dance = beta_dist.rvs(5, 3, size=n) energy = beta_dist.rvs(4, 3, size=n) loudness = np.random.normal(-8, 4, n).clip(-40, 0) tempo = np.random.normal(120, 20, n).clip(60, 200) valence = beta_dist.rvs(3, 3, size=n) acou = beta_dist.rvs(2, 5, size=n) speech = beta_dist.rvs(2, 8, size=n) instru = beta_dist.rvs(1, 9, size=n) pop = np.clip(20 + 25*dance + 15*energy + 0.5*(loudness+20) + np.random.normal(0, 8, n), 0, 100) df = pd.DataFrame({"danceability": dance, "energy": energy, "loudness": loudness, "tempo": tempo, "valence": valence, "acousticness": acou, "speechiness": speech, "instrumentalness": instru, "explicit": np.random.binomial(1, 0.15, n), "popularity": pop.astype(int)}) features = ["danceability", "energy", "loudness", "speechiness", "acousticness", "instrumentalness", "valence", "tempo", "explicit"] df["explicit"] = df["explicit"].astype(int) df = df[features + ["popularity"]].dropna() print(f"\nBuilding LSTM sequences (window=5)...") X, y, scaler_X, scaler_y = build_spotify_sequences(df, features, "popularity", window=5) print(f"Sequence shape: X={X.shape}, y={y.shape}") model, history, y_test, y_pred, mae, r2 = train_and_evaluate( X, y, scaler_y, "Spotify", epochs=epochs ) plot_results(history, y_test, y_pred, mae, r2, "Spotify", "lstm_spotify.png") return {"domain": "spotify", "mae": round(mae, 3), "r2": round(r2, 3)} def run_amazon_lstm(epochs=50): print("\n" + "=" * 60) print("LSTM — AMAZON SALES PREDICTION") print("=" * 60) paths = ["amazon_synthetic.csv", "amazon/amazon.csv"] df = None for p in paths: if os.path.exists(p): raw = pd.read_csv(p) print(f"Loaded: {p} ({len(raw)} records)") # Try to get the needed columns if "log_sales" not in raw.columns and "rating_count" in raw.columns: raw["rating_count"] = pd.to_numeric( raw["rating_count"].astype(str).str.replace(",", ""), errors="coerce" ) raw["log_sales"] = np.log1p(raw["rating_count"]) if all(c in raw.columns for c in ["actual_price", "discount_pct", "rating", "sentiment_score", "log_sales"]): df = raw break if df is None: print("No Amazon data found. Generating synthetic...") np.random.seed(0) n = 800 actual = np.random.lognormal(7, 1.2, n).clip(50, 80000) disc = np.random.uniform(5, 80, n) discounted = actual * (1 - disc/100) rating = np.random.normal(4, 0.5, n).clip(1, 5) sent = np.random.normal(0.5, 0.3, n).clip(-1, 1) log_sales = np.clip(2 + 1.5*rating + 1.2*sent + np.random.normal(0, 0.8, n), 0, 15) df = pd.DataFrame({"actual_price": actual, "discounted_price": discounted, "discount_pct": disc, "rating": rating, "sentiment_score": sent, "log_sales": log_sales}) features = ["actual_price", "discounted_price", "discount_pct", "rating", "sentiment_score"] df = df[features + ["log_sales"]].dropna() # Normalise price to prevent scale domination from sklearn.preprocessing import StandardScaler df[["actual_price", "discounted_price"]] = StandardScaler().fit_transform( df[["actual_price", "discounted_price"]] ) print(f"\nBuilding LSTM sequences (window=5)...") X, y, scaler_X, scaler_y = build_amazon_sequences(df, features, "log_sales", window=5) print(f"Sequence shape: X={X.shape}, y={y.shape}") model, history, y_test, y_pred, mae, r2 = train_and_evaluate( X, y, scaler_y, "Amazon", epochs=epochs ) plot_results(history, y_test, y_pred, mae, r2, "Amazon", "lstm_amazon.png") return {"domain": "amazon", "mae": round(mae, 3), "r2": round(r2, 3)} if __name__ == "__main__": parser = argparse.ArgumentParser(description="LSTM Deep Learning — Extra Credit") parser.add_argument("--mode", choices=["spotify", "amazon", "both"], default="both") parser.add_argument("--epochs", type=int, default=50, help="Max training epochs (EarlyStopping applies)") args = parser.parse_args() results = [] if args.mode in ("spotify", "both"): results.append(run_spotify_lstm(args.epochs)) if args.mode in ("amazon", "both"): results.append(run_amazon_lstm(args.epochs)) print("\n" + "=" * 60) print("LSTM SUMMARY") print("=" * 60) for r in results: print(f" {r['domain'].upper():10s} MAE={r['mae']} R²={r['r2']}") print("\nOutputs: lstm_spotify.png, lstm_amazon.png") print("Include these plots and metrics in the individual reports as DL comparison.")