Spaces:
Sleeping
Sleeping
"""
EXTRA CREDIT — Deep Learning with LSTM
=======================================
LSTM model for temporal popularity prediction on Spotify.
Addresses the extra credit: "Try DL, LSTM, or RL for +1 pt in lowest case study"

The LSTM treats each track's audio features as a sequence across
popularity tiers (Obscure → Low → Mid → Popular → Hit), learning
temporal dynamics of how feature importance shifts across success levels.

Usage:
    python3 lstm_model.py
    python3 lstm_model.py --epochs 30 --mode spotify
    python3 lstm_model.py --mode amazon
"""
| import os | |
| import sys | |
| import argparse | |
| import warnings | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
| warnings.filterwarnings("ignore") | |
# ── TensorFlow / Keras ──────────────────────────────────────
| try: | |
| import tensorflow as tf | |
| from tensorflow.keras.models import Sequential | |
| from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization | |
| from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau | |
| from tensorflow.keras.optimizers import Adam | |
| TF_OK = True | |
| print(f"TensorFlow {tf.__version__} loaded.") | |
| except ImportError: | |
| TF_OK = False | |
| print("[ERROR] TensorFlow not installed. Run: pip install tensorflow") | |
| sys.exit(1) | |
| from sklearn.preprocessing import MinMaxScaler | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.metrics import mean_absolute_error, r2_score | |
| COLORS = ["#2E86AB", "#A23B72", "#F18F01", "#C73E1D", "#44BBA4"] | |
# ────────────────────────────────────────────────────────────
# DATA PREPARATION — SEQUENCE CONSTRUCTION
# ────────────────────────────────────────────────────────────
def _build_windowed_sequences(df, sort_by, features, target, window):
    """Sort ``df``, min-max scale it and cut it into overlapping windows.

    Shared implementation behind ``build_spotify_sequences`` and
    ``build_amazon_sequences``; the two differ only in the sort column.

    Args:
        df: DataFrame holding the ``features``, ``target`` and ``sort_by``
            columns.
        sort_by: Column name the rows are ordered by before windowing.
        features: List of feature column names.
        target: Name of the column to predict.
        window: Number of consecutive rows per input sequence (>= 1).

    Returns:
        ``(X, y, scaler_X, scaler_y)`` where ``X`` has shape
        ``(n_rows - window, window, n_features)``, ``y`` has shape
        ``(n_rows - window,)`` and the scalers are the fitted
        ``MinMaxScaler`` instances for features and target.  When the frame
        has ``window`` rows or fewer, ``X`` and ``y`` are empty but ``X``
        still carries its three dimensions.

    Raises:
        ValueError: If ``window`` is smaller than 1 or ``df`` has no rows.
    """
    # Validate first so bad arguments fail with a clear message instead of
    # producing ragged / zero-length windows further down.
    if window < 1:
        raise ValueError(f"window must be a positive integer, got {window!r}")
    if len(df) == 0:
        raise ValueError("cannot build sequences from an empty DataFrame")

    df_sorted = df.sort_values(sort_by).reset_index(drop=True)
    X_all = df_sorted[features].values
    y_all = df_sorted[target].values

    scaler_X = MinMaxScaler()
    scaler_y = MinMaxScaler()
    X_scaled = scaler_X.fit_transform(X_all)
    y_scaled = scaler_y.fit_transform(y_all.reshape(-1, 1)).flatten()

    # Row i of ``row_idx`` is [i, i+1, ..., i+window-1]; indexing with it
    # gathers every window in one step.  The label for window i is the
    # (scaled) target of the row that immediately follows it.
    n_windows = max(len(X_scaled) - window, 0)
    row_idx = np.arange(n_windows)[:, None] + np.arange(window)[None, :]
    X_seq = X_scaled[row_idx]
    y_seq = y_scaled[window:]
    return X_seq, y_seq, scaler_X, scaler_y


def build_spotify_sequences(df, features, target, window=5):
    """
    Convert track-level data into overlapping windows of length `window`.

    Tracks are sorted by ``target`` (popularity) and then split into
    windows, creating pseudo-temporal sequences that simulate how audio
    characteristics evolve across the popularity spectrum.  Each window of
    ``window`` consecutive rows is paired with the scaled target of the
    next row.

    Returns ``(X, y, scaler_X, scaler_y)``; raises ``ValueError`` when
    ``window < 1`` or ``df`` is empty.
    """
    return _build_windowed_sequences(df, target, features, target, window)


def build_amazon_sequences(df, features, target, window=5):
    """
    For Amazon: sort by rating (quality proxy), build overlapping windows.

    ``df`` must contain a ``"rating"`` column in addition to ``features``
    and ``target``.  Each window of ``window`` consecutive rows is paired
    with the scaled target of the next row.

    Returns ``(X, y, scaler_X, scaler_y)``; raises ``ValueError`` when
    ``window < 1`` or ``df`` is empty.
    """
    return _build_windowed_sequences(df, "rating", features, target, window)
# ────────────────────────────────────────────────────────────
# LSTM MODEL BUILDER
# ────────────────────────────────────────────────────────────
def build_lstm(input_shape, units=64, dropout=0.2):
    """
    Assemble and compile the stacked LSTM regressor.

    Layer stack, in order: an LSTM with ``units`` cells that returns the
    full sequence, BatchNormalization, Dropout; a second LSTM with
    ``units // 2`` cells that returns only its final state,
    BatchNormalization, Dropout; a 32-unit ReLU Dense layer; and a single
    linear output unit.

    Args:
        input_shape: Shape handed to the first LSTM layer.
        units: Cell count of the first LSTM (the second uses half).
        dropout: Rate used by both Dropout layers.

    Returns:
        The model compiled with Adam (learning rate 0.001), MSE loss and
        an MAE metric.
    """
    network = Sequential()

    # Recurrent stage 1: keep every timestep so the next LSTM gets a sequence.
    network.add(
        LSTM(units, input_shape=input_shape, return_sequences=True,
             name="lstm_layer_1")
    )
    network.add(BatchNormalization())
    network.add(Dropout(dropout))

    # Recurrent stage 2: collapse the sequence to a single vector.
    network.add(LSTM(units // 2, return_sequences=False, name="lstm_layer_2"))
    network.add(BatchNormalization())
    network.add(Dropout(dropout))

    # Regression head.
    network.add(Dense(32, activation="relu", name="dense_1"))
    network.add(Dense(1, activation="linear", name="output"))

    optimizer = Adam(learning_rate=0.001)
    network.compile(optimizer=optimizer, loss="mse", metrics=["mae"])
    return network
# ────────────────────────────────────────────────────────────
# TRAINING & EVALUATION
# ────────────────────────────────────────────────────────────
def train_and_evaluate(X, y, scaler_y, domain, epochs=50, batch_size=32):
    """
    Train the LSTM on windowed sequences and score it on a 20% hold-out.

    Args:
        X: 3-D array of input sequences; axis 1 and axis 2 are passed to
            ``build_lstm`` as the input shape.
        y: 1-D array of scaled targets, one per sequence in ``X``.
        scaler_y: Fitted scaler whose ``inverse_transform`` maps scaled
            targets/predictions back to original units.
        domain: Label used in the printed report.
        epochs: Maximum number of epochs (early stopping may end sooner).
        batch_size: Mini-batch size passed to ``model.fit``.

    Returns:
        ``(model, history, y_test_orig, y_pred_orig, mae, r2)`` where the
        test targets, predictions and both metrics are in original units.

    Raises:
        ValueError: If ``X`` is not 3-D or ``X`` and ``y`` differ in length.
    """
    X = np.asarray(X)
    y = np.asarray(y)
    # Fail early with a readable message; otherwise a degenerate X only
    # surfaces later as an IndexError on X.shape[2].
    if X.ndim != 3:
        raise ValueError(
            "X must be 3-D (samples, timesteps, features); "
            f"got an array with shape {X.shape}"
        )
    if len(X) != len(y):
        raise ValueError(
            f"X and y must have the same length, got {len(X)} and {len(y)}"
        )

    # NOTE(review): when X holds overlapping windows (as the sequence
    # builders in this file produce), a shuffled split places windows that
    # share rows on both sides, so the reported scores are likely optimistic.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    model = build_lstm(input_shape=(X.shape[1], X.shape[2]))
    model.summary()

    callbacks = [
        # Stop once val_loss has not improved for 8 epochs and roll back
        # to the best weights seen.
        EarlyStopping(monitor="val_loss", patience=8, restore_best_weights=True),
        # Halve the learning rate after 4 stagnant epochs, down to 1e-5.
        ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=4, min_lr=1e-5),
    ]
    history = model.fit(
        X_train, y_train,
        validation_split=0.15,
        epochs=epochs,
        batch_size=batch_size,
        callbacks=callbacks,
        verbose=1,
    )

    y_pred_scaled = model.predict(X_test, verbose=0).flatten()

    # Inverse transform so the metrics are reported in original units.
    y_test_orig = scaler_y.inverse_transform(y_test.reshape(-1, 1)).flatten()
    y_pred_orig = scaler_y.inverse_transform(y_pred_scaled.reshape(-1, 1)).flatten()

    mae = mean_absolute_error(y_test_orig, y_pred_orig)
    r2 = r2_score(y_test_orig, y_pred_orig)

    rule = "─" * 50
    print(f"\n{rule}")
    print(f"LSTM Results — {domain}")
    print(f" MAE : {mae:.3f}")
    print(f" R² : {r2:.3f}")
    print(f" Epochs trained: {len(history.history['loss'])}")
    print(rule)
    return model, history, y_test_orig, y_pred_orig, mae, r2
# ────────────────────────────────────────────────────────────
# VISUALISATION
# ────────────────────────────────────────────────────────────
def plot_results(history, y_test, y_pred, mae, r2, domain, filename):
    """
    Save a three-panel diagnostic figure for one trained model.

    Panels, left to right: training/validation loss per epoch, actual vs
    predicted scatter with the identity line, and a histogram of residuals
    (``y_test - y_pred``).

    Args:
        history: Object whose ``history`` dict has ``"loss"`` and
            ``"val_loss"`` series.
        y_test: True target values.
        y_pred: Predicted target values, aligned with ``y_test``.
        mae: Mean absolute error shown in the residuals title.
        r2: R² score shown in the scatter title.
        domain: Label used in the figure title.
        filename: Path the PNG is written to.
    """
    # Accept plain sequences as well as arrays.
    y_test = np.asarray(y_test)
    y_pred = np.asarray(y_pred)

    fig, axes = plt.subplots(1, 3, figsize=(18, 5))
    try:
        fig.suptitle(f"LSTM Deep Learning — {domain}", fontsize=14, fontweight="bold")

        # Training curve
        ax = axes[0]
        ax.plot(history.history["loss"], color=COLORS[0], label="Train Loss")
        ax.plot(history.history["val_loss"], color=COLORS[1], linestyle="--",
                label="Val Loss")
        ax.set_title("Training & Validation Loss", fontweight="bold")
        ax.set_xlabel("Epoch")
        ax.set_ylabel("MSE Loss")
        ax.legend()

        # Actual vs predicted
        ax = axes[1]
        ax.scatter(y_test, y_pred, alpha=0.4, color=COLORS[1], s=20)
        mn = min(y_test.min(), y_pred.min())
        mx = max(y_test.max(), y_pred.max())
        ax.plot([mn, mx], [mn, mx], "r--", lw=2, label="Perfect fit")
        ax.set_title(f"Actual vs Predicted\nR² = {r2:.3f}", fontweight="bold")
        ax.set_xlabel("Actual")
        ax.set_ylabel("Predicted")
        ax.legend()

        # Residuals
        ax = axes[2]
        residuals = y_test - y_pred
        ax.hist(residuals, bins=30, color=COLORS[2], edgecolor="white")
        ax.axvline(0, color="red", linestyle="--")
        ax.set_title(f"Residuals Distribution\nMAE = {mae:.3f}", fontweight="bold")
        ax.set_xlabel("Residual")
        ax.set_ylabel("Count")

        fig.tight_layout()
        fig.savefig(filename, dpi=150, bbox_inches="tight")
    finally:
        # Close this specific figure even if plotting or saving failed, so
        # repeated calls do not accumulate open figures.
        plt.close(fig)
    print(f"Saved: {filename}")
# ────────────────────────────────────────────────────────────
# MAIN
# ────────────────────────────────────────────────────────────
def _load_spotify_frame():
    """Return the first Spotify CSV found on disk, else a seeded synthetic frame."""
    for path in ("spotify_synthetic.csv", "spotify/dataset.csv", "dataset.csv"):
        if os.path.exists(path):
            df = pd.read_csv(path)
            print(f"Loaded: {path} ({len(df)} records)")
            return df

    print("No Spotify data found. Generating synthetic...")
    # Imported lazily: scipy is only needed for the synthetic fallback.
    from scipy.stats import beta as beta_dist

    # The draw order below is kept fixed so the seeded data is reproducible.
    np.random.seed(42)
    n = 800
    dance = beta_dist.rvs(5, 3, size=n)
    energy = beta_dist.rvs(4, 3, size=n)
    loudness = np.random.normal(-8, 4, n).clip(-40, 0)
    tempo = np.random.normal(120, 20, n).clip(60, 200)
    valence = beta_dist.rvs(3, 3, size=n)
    acou = beta_dist.rvs(2, 5, size=n)
    speech = beta_dist.rvs(2, 8, size=n)
    instru = beta_dist.rvs(1, 9, size=n)
    pop = np.clip(
        20 + 25 * dance + 15 * energy + 0.5 * (loudness + 20)
        + np.random.normal(0, 8, n),
        0, 100,
    )
    return pd.DataFrame({
        "danceability": dance, "energy": energy, "loudness": loudness,
        "tempo": tempo, "valence": valence, "acousticness": acou,
        "speechiness": speech, "instrumentalness": instru,
        "explicit": np.random.binomial(1, 0.15, n),
        "popularity": pop.astype(int),
    })


def run_spotify_lstm(epochs=50, window=5):
    """
    Run the Spotify pipeline: load data, train the LSTM, save the plot.

    Data comes from the first existing CSV among the known candidate paths,
    or from a seeded synthetic generator when none exists.  The diagnostic
    figure is written to ``lstm_spotify.png``.

    Args:
        epochs: Maximum number of training epochs.
        window: Sequence length used when building the LSTM inputs.

    Returns:
        ``{"domain": "spotify", "mae": ..., "r2": ...}`` with both metrics
        rounded to three decimals.
    """
    print("\n" + "=" * 60)
    print("LSTM — SPOTIFY POPULARITY PREDICTION")
    print("=" * 60)

    features = ["danceability", "energy", "loudness", "speechiness",
                "acousticness", "instrumentalness", "valence", "tempo", "explicit"]

    df = _load_spotify_frame()
    # Drop incomplete rows *before* the integer cast: casting a column that
    # still contains NaN to int raises.  ``copy`` avoids writing into a view.
    df = df[features + ["popularity"]].dropna().copy()
    df["explicit"] = df["explicit"].astype(int)

    print(f"\nBuilding LSTM sequences (window={window})...")
    X, y, _, scaler_y = build_spotify_sequences(
        df, features, "popularity", window=window
    )
    print(f"Sequence shape: X={X.shape}, y={y.shape}")

    _, history, y_test, y_pred, mae, r2 = train_and_evaluate(
        X, y, scaler_y, "Spotify", epochs=epochs
    )
    plot_results(history, y_test, y_pred, mae, r2, "Spotify", "lstm_spotify.png")
    return {"domain": "spotify", "mae": round(mae, 3), "r2": round(r2, 3)}
def _load_amazon_frame(required_columns):
    """Return the first usable Amazon CSV, else a seeded synthetic frame.

    A CSV is usable only when, after deriving ``log_sales`` from
    ``rating_count`` where possible, it has every name in
    ``required_columns``; otherwise the next candidate path is tried.
    """
    for path in ("amazon_synthetic.csv", "amazon/amazon.csv"):
        if not os.path.exists(path):
            continue
        raw = pd.read_csv(path)
        print(f"Loaded: {path} ({len(raw)} records)")

        # Derive the target from rating_count when it is not already present.
        if "log_sales" not in raw.columns and "rating_count" in raw.columns:
            raw["rating_count"] = pd.to_numeric(
                raw["rating_count"].astype(str).str.replace(",", ""),
                errors="coerce",
            )
            raw["log_sales"] = np.log1p(raw["rating_count"])

        missing = [c for c in required_columns if c not in raw.columns]
        if not missing:
            return raw
        print(f"Skipping {path}: missing columns {missing}")

    print("No Amazon data found. Generating synthetic...")
    # The draw order below is kept fixed so the seeded data is reproducible.
    np.random.seed(0)
    n = 800
    actual = np.random.lognormal(7, 1.2, n).clip(50, 80000)
    disc = np.random.uniform(5, 80, n)
    discounted = actual * (1 - disc / 100)
    rating = np.random.normal(4, 0.5, n).clip(1, 5)
    sent = np.random.normal(0.5, 0.3, n).clip(-1, 1)
    log_sales = np.clip(
        2 + 1.5 * rating + 1.2 * sent + np.random.normal(0, 0.8, n), 0, 15
    )
    return pd.DataFrame({
        "actual_price": actual, "discounted_price": discounted,
        "discount_pct": disc, "rating": rating, "sentiment_score": sent,
        "log_sales": log_sales,
    })


def run_amazon_lstm(epochs=50, window=5):
    """
    Run the Amazon pipeline: load data, train the LSTM, save the plot.

    Data comes from the first candidate CSV that provides every feature
    column plus ``log_sales``, or from a seeded synthetic generator when no
    such file exists.  The diagnostic figure is written to
    ``lstm_amazon.png``.

    Args:
        epochs: Maximum number of training epochs.
        window: Sequence length used when building the LSTM inputs.

    Returns:
        ``{"domain": "amazon", "mae": ..., "r2": ...}`` with both metrics
        rounded to three decimals.
    """
    print("\n" + "=" * 60)
    print("LSTM — AMAZON SALES PREDICTION")
    print("=" * 60)

    features = ["actual_price", "discounted_price", "discount_pct", "rating",
                "sentiment_score"]
    target = "log_sales"

    # Require *every* column used below; checking only a subset lets a CSV
    # without e.g. discounted_price through and fails later with a KeyError.
    df = _load_amazon_frame(features + [target])
    df = df[features + [target]].dropna().copy()

    # Normalise price to prevent scale domination
    from sklearn.preprocessing import StandardScaler
    price_cols = ["actual_price", "discounted_price"]
    df[price_cols] = StandardScaler().fit_transform(df[price_cols])

    print(f"\nBuilding LSTM sequences (window={window})...")
    X, y, _, scaler_y = build_amazon_sequences(df, features, target, window=window)
    print(f"Sequence shape: X={X.shape}, y={y.shape}")

    _, history, y_test, y_pred, mae, r2 = train_and_evaluate(
        X, y, scaler_y, "Amazon", epochs=epochs
    )
    plot_results(history, y_test, y_pred, mae, r2, "Amazon", "lstm_amazon.png")
    return {"domain": "amazon", "mae": round(mae, 3), "r2": round(r2, 3)}
def _positive_int(text):
    """argparse ``type`` callable: parse ``text`` as an int and require >= 1."""
    value = int(text)  # a ValueError here is reported by argparse as a usage error
    if value < 1:
        raise argparse.ArgumentTypeError(
            f"must be a positive integer, got {value}"
        )
    return value


def main(argv=None):
    """
    Command-line entry point: run the selected pipeline(s) and print a summary.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="LSTM Deep Learning — Extra Credit")
    parser.add_argument("--mode", choices=["spotify", "amazon", "both"],
                        default="both")
    # Zero or negative epochs would leave the loss history empty and break
    # the later report, so reject them at the command line.
    parser.add_argument("--epochs", type=_positive_int, default=50,
                        help="Max training epochs (EarlyStopping applies)")
    args = parser.parse_args(argv)

    # Domain -> (runner, plot written by that runner), in execution order.
    pipelines = {
        "spotify": (run_spotify_lstm, "lstm_spotify.png"),
        "amazon": (run_amazon_lstm, "lstm_amazon.png"),
    }
    selected = list(pipelines) if args.mode == "both" else [args.mode]

    results = []
    plots = []
    for domain in selected:
        runner, plot_file = pipelines[domain]
        results.append(runner(args.epochs))
        plots.append(plot_file)

    print("\n" + "=" * 60)
    print("LSTM SUMMARY")
    print("=" * 60)
    for r in results:
        print(f" {r['domain'].upper():10s} MAE={r['mae']} R²={r['r2']}")
    # Only list the plots that this invocation actually produced.
    print("\nOutputs: " + ", ".join(plots))
    print("Include these plots and metrics in the individual reports as DL comparison.")


if __name__ == "__main__":
    main()