# amazon-spotify-analyzer / lstm_model.py
# Seagle123's picture
# Upload 4 files
# b2590d8 verified
"""
EXTRA CREDIT β€” Deep Learning with LSTM
=======================================
LSTM model for temporal popularity prediction on Spotify.
Addresses the extra credit: "Try DL, LSTM, or RL for +1 pt in lowest case study"
The LSTM treats each track's audio features as a sequence across
popularity tiers (Obscure β†’ Low β†’ Mid β†’ Popular β†’ Hit), learning
temporal dynamics of how feature importance shifts across success levels.
Usage:
python3 lstm_model.py
python3 lstm_model.py --epochs 30 --mode spotify
python3 lstm_model.py --mode amazon
"""
import os
import sys
import argparse
import warnings
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
warnings.filterwarnings("ignore")
# ── TensorFlow / Keras ──────────────────────────────────────
# TensorFlow is a hard requirement for this script: import it eagerly and
# abort with an installation hint when it is not available.
try:
    import tensorflow as tf
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization
    from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
    from tensorflow.keras.optimizers import Adam
    TF_OK = True
    print(f"TensorFlow {tf.__version__} loaded.")
except ImportError:
    # TF_OK is set for completeness, but the process exits right after, so
    # nothing below this point ever observes TF_OK == False.
    TF_OK = False
    print("[ERROR] TensorFlow not installed. Run: pip install tensorflow")
    sys.exit(1)
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
COLORS = ["#2E86AB", "#A23B72", "#F18F01", "#C73E1D", "#44BBA4"]
# ════════════════════════════════════════════════════════════
# DATA PREPARATION — SEQUENCE CONSTRUCTION
# ════════════════════════════════════════════════════════════
def build_spotify_sequences(df, features, target, window=5):
    """
    Convert track-level data into overlapping windows of length `window`.

    Rows are sorted by `target` in ascending order, the feature columns and
    the target are each min-max scaled, and every run of `window`
    consecutive rows becomes one sample. The label of a sample is the scaled
    target of the row that immediately follows the run, which creates
    pseudo-temporal sequences along the popularity spectrum.

    Args:
        df: DataFrame containing the `features` columns and `target`.
        features: list of feature column names.
        target: name of the column to sort by and to predict.
        window: number of consecutive rows per sample (must be >= 1).

    Returns:
        Tuple ``(X, y, scaler_X, scaler_y)``. ``X`` always has shape
        ``(n_samples, window, len(features))`` and ``y`` has shape
        ``(n_samples,)`` with ``n_samples = max(len(df) - window, 0)``, so
        callers can rely on ``X.shape[1]`` / ``X.shape[2]`` even when there
        are too few rows to form a single window. The two scalers are the
        fitted ``MinMaxScaler`` instances for features and target.

    Raises:
        ValueError: if `window` is smaller than 1.
    """
    if window < 1:
        raise ValueError(f"window must be a positive integer, got {window!r}")

    df_sorted = df.sort_values(target).reset_index(drop=True)

    # NOTE: both scalers are fitted on every row passed in, i.e. before any
    # train/test split performed by the caller.
    scaler_X = MinMaxScaler()
    scaler_y = MinMaxScaler()
    X_scaled = scaler_X.fit_transform(df_sorted[features].values)
    y_scaled = scaler_y.fit_transform(
        df_sorted[target].values.reshape(-1, 1)
    ).flatten()

    # Preallocate so the result is 3-D even when no window fits.
    n_samples = max(len(X_scaled) - window, 0)
    X_seq = np.empty((n_samples, window, X_scaled.shape[1]), dtype=X_scaled.dtype)
    for start in range(n_samples):
        X_seq[start] = X_scaled[start:start + window]

    # Label i is the row right after window i, i.e. row (i + window).
    y_seq = y_scaled[window:window + n_samples].copy()
    return X_seq, y_seq, scaler_X, scaler_y
def build_amazon_sequences(df, features, target, window=5, sort_by="rating"):
    """
    Build overlapping feature windows for the Amazon data set.

    Rows are sorted by `sort_by` (the rating column by default, used as a
    quality proxy), features and target are each min-max scaled, and every
    run of `window` consecutive rows becomes one sample whose label is the
    scaled target of the row that immediately follows the run.

    Args:
        df: DataFrame containing `features`, `target` and `sort_by` columns.
        features: list of feature column names.
        target: name of the column to predict.
        window: number of consecutive rows per sample (must be >= 1).
        sort_by: column that defines the row ordering; defaults to
            ``"rating"``, which matches the previous hard-coded behaviour.

    Returns:
        Tuple ``(X, y, scaler_X, scaler_y)``. ``X`` always has shape
        ``(n_samples, window, len(features))`` and ``y`` has shape
        ``(n_samples,)`` with ``n_samples = max(len(df) - window, 0)``.
        The two scalers are the fitted ``MinMaxScaler`` instances for
        features and target.

    Raises:
        ValueError: if `window` is smaller than 1.
    """
    if window < 1:
        raise ValueError(f"window must be a positive integer, got {window!r}")

    df_sorted = df.sort_values(sort_by).reset_index(drop=True)

    # NOTE: both scalers are fitted on every row passed in, i.e. before any
    # train/test split performed by the caller.
    scaler_X = MinMaxScaler()
    scaler_y = MinMaxScaler()
    X_scaled = scaler_X.fit_transform(df_sorted[features].values)
    y_scaled = scaler_y.fit_transform(
        df_sorted[target].values.reshape(-1, 1)
    ).flatten()

    # Preallocate so the result is 3-D even when no window fits.
    n_samples = max(len(X_scaled) - window, 0)
    X_seq = np.empty((n_samples, window, X_scaled.shape[1]), dtype=X_scaled.dtype)
    for start in range(n_samples):
        X_seq[start] = X_scaled[start:start + window]

    # Label i is the row right after window i, i.e. row (i + window).
    y_seq = y_scaled[window:window + n_samples].copy()
    return X_seq, y_seq, scaler_X, scaler_y
# ════════════════════════════════════════════════════════════
# LSTM MODEL BUILDER
# ════════════════════════════════════════════════════════════
def build_lstm(input_shape, units=64, dropout=0.2):
    """
    Assemble and compile the stacked LSTM regressor.

    The network is two LSTM layers (`units` and `units // 2` cells), each
    followed by BatchNormalization and Dropout, then a 32-unit ReLU dense
    layer and a single linear output. It is compiled with Adam
    (learning rate 0.001), MSE loss and MAE as an extra metric.

    Args:
        input_shape: shape of one sample, passed to the first LSTM layer.
        units: number of cells in the first LSTM layer.
        dropout: dropout rate applied after each LSTM block.

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    # First recurrent block emits the full sequence for the next LSTM.
    first_block = [
        LSTM(
            units,
            input_shape=input_shape,
            return_sequences=True,
            name="lstm_layer_1",
        ),
        BatchNormalization(),
        Dropout(dropout),
    ]
    # Second recurrent block collapses the sequence to its last state.
    second_block = [
        LSTM(units // 2, return_sequences=False, name="lstm_layer_2"),
        BatchNormalization(),
        Dropout(dropout),
    ]
    head = [
        Dense(32, activation="relu", name="dense_1"),
        Dense(1, activation="linear", name="output"),
    ]

    network = Sequential(first_block + second_block + head)
    optimizer = Adam(learning_rate=0.001)
    network.compile(optimizer=optimizer, loss="mse", metrics=["mae"])
    return network
# ════════════════════════════════════════════════════════════
# TRAINING & EVALUATION
# ════════════════════════════════════════════════════════════
def train_and_evaluate(X, y, scaler_y, domain, epochs=50, batch_size=32):
    """
    Train the stacked LSTM on windowed data and score it on a held-out split.

    Args:
        X: 3-D array of samples; ``X.shape[1:]`` is used as the model's
            input shape.
        y: 1-D array of scaled targets aligned with `X`.
        scaler_y: fitted scaler whose ``inverse_transform`` maps targets and
            predictions back to the original units before scoring.
        domain: label printed in the results banner.
        epochs: maximum number of training epochs (early stopping applies).
        batch_size: mini-batch size passed to ``model.fit``.

    Returns:
        Tuple ``(model, history, y_test_orig, y_pred_orig, mae, r2)`` where
        the test targets/predictions, MAE and R² are in original units.

    Raises:
        ValueError: if `X` is not a non-empty 3-D array.
    """
    # Fail early with a clear message instead of an IndexError on X.shape[2]
    # or an opaque error from the splitter.
    if X.ndim != 3 or len(X) == 0:
        raise ValueError(
            "X must be a non-empty 3-D array of shape "
            f"(n_samples, window, n_features); got shape {X.shape}"
        )

    # NOTE(review): the split is random while the samples are overlapping
    # windows, so train and test windows can share rows — confirm that this
    # is acceptable for the reported metrics.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    model = build_lstm(input_shape=(X.shape[1], X.shape[2]))
    model.summary()

    callbacks = [
        EarlyStopping(monitor="val_loss", patience=8, restore_best_weights=True),
        ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=4, min_lr=1e-5),
    ]
    history = model.fit(
        X_train, y_train,
        validation_split=0.15,  # part of the training split drives the callbacks
        epochs=epochs,
        batch_size=batch_size,
        callbacks=callbacks,
        verbose=1,
    )

    y_pred_scaled = model.predict(X_test, verbose=0).flatten()

    # Undo the target scaling so that MAE / R² are in original units.
    y_test_orig = scaler_y.inverse_transform(y_test.reshape(-1, 1)).flatten()
    y_pred_orig = scaler_y.inverse_transform(y_pred_scaled.reshape(-1, 1)).flatten()

    mae = mean_absolute_error(y_test_orig, y_pred_orig)
    r2 = r2_score(y_test_orig, y_pred_orig)

    rule = "─" * 50
    print(f"\n{rule}")
    print(f"LSTM Results — {domain}")
    print(f" MAE : {mae:.3f}")
    print(f" R² : {r2:.3f}")
    print(f" Epochs trained: {len(history.history['loss'])}")
    print(rule)
    return model, history, y_test_orig, y_pred_orig, mae, r2
# ════════════════════════════════════════════════════════════
# VISUALISATION
# ════════════════════════════════════════════════════════════
def plot_results(history, y_test, y_pred, mae, r2, domain, filename):
    """
    Save a three-panel diagnostic figure for a trained model.

    Panels, left to right: training vs validation loss per epoch, actual vs
    predicted scatter with a perfect-fit diagonal, and a histogram of the
    residuals (``y_test - y_pred``).

    Args:
        history: object whose ``history`` dict holds ``"loss"`` and
            ``"val_loss"`` sequences (as returned by ``model.fit``).
        y_test: array of true target values.
        y_pred: array of predicted values, same length as `y_test`.
        mae: mean absolute error shown in the residuals title.
        r2: R² score shown in the scatter title.
        domain: label used in the figure title.
        filename: path the figure is written to.
    """
    fig, axes = plt.subplots(1, 3, figsize=(18, 5))
    fig.suptitle(f"LSTM Deep Learning — {domain}", fontsize=14, fontweight="bold")

    # Training curve
    ax = axes[0]
    ax.plot(history.history["loss"], color=COLORS[0], label="Train Loss")
    ax.plot(history.history["val_loss"], color=COLORS[1], linestyle="--",
            label="Val Loss")
    ax.set_title("Training & Validation Loss", fontweight="bold")
    ax.set_xlabel("Epoch")
    ax.set_ylabel("MSE Loss")
    ax.legend()

    # Actual vs predicted
    ax = axes[1]
    ax.scatter(y_test, y_pred, alpha=0.4, color=COLORS[1], s=20)
    # The diagonal spans the joint range of both series.
    lo = min(y_test.min(), y_pred.min())
    hi = max(y_test.max(), y_pred.max())
    ax.plot([lo, hi], [lo, hi], "r--", lw=2, label="Perfect fit")
    ax.set_title(f"Actual vs Predicted\nR² = {r2:.3f}", fontweight="bold")
    ax.set_xlabel("Actual")
    ax.set_ylabel("Predicted")
    ax.legend()

    # Residuals
    ax = axes[2]
    residuals = y_test - y_pred
    ax.hist(residuals, bins=30, color=COLORS[2], edgecolor="white")
    ax.axvline(0, color="red", linestyle="--")
    ax.set_title(f"Residuals Distribution\nMAE = {mae:.3f}", fontweight="bold")
    ax.set_xlabel("Residual")
    ax.set_ylabel("Count")

    # Operate on this figure explicitly rather than on pyplot's implicit
    # "current figure", and always release it once it has been written.
    fig.tight_layout()
    fig.savefig(filename, dpi=150, bbox_inches="tight")
    plt.close(fig)
    print(f"Saved: {filename}")
# ════════════════════════════════════════════════════════════
# MAIN
# ════════════════════════════════════════════════════════════
def _make_synthetic_spotify(n=800, seed=42):
    """Generate a reproducible synthetic Spotify-like track table with `n` rows."""
    # Local import: scipy is only needed for this fallback path.
    from scipy.stats import beta as beta_dist

    # The draw order below is significant: it fixes the generated values for
    # a given seed.
    np.random.seed(seed)
    dance = beta_dist.rvs(5, 3, size=n)
    energy = beta_dist.rvs(4, 3, size=n)
    loudness = np.random.normal(-8, 4, n).clip(-40, 0)
    tempo = np.random.normal(120, 20, n).clip(60, 200)
    valence = beta_dist.rvs(3, 3, size=n)
    acou = beta_dist.rvs(2, 5, size=n)
    speech = beta_dist.rvs(2, 8, size=n)
    instru = beta_dist.rvs(1, 9, size=n)
    pop = np.clip(
        20 + 25 * dance + 15 * energy + 0.5 * (loudness + 20)
        + np.random.normal(0, 8, n),
        0, 100,
    )
    return pd.DataFrame({
        "danceability": dance, "energy": energy, "loudness": loudness,
        "tempo": tempo, "valence": valence, "acousticness": acou,
        "speechiness": speech, "instrumentalness": instru,
        "explicit": np.random.binomial(1, 0.15, n),
        "popularity": pop.astype(int),
    })


def run_spotify_lstm(epochs=50):
    """
    Run the full Spotify pipeline: load data, build windows, train, plot.

    The first existing CSV among the known candidate paths that contains all
    required columns is used; otherwise a synthetic data set is generated.
    The diagnostic figure is written to ``lstm_spotify.png``.

    Args:
        epochs: maximum number of training epochs.

    Returns:
        Dict with keys ``"domain"``, ``"mae"`` and ``"r2"`` (metrics rounded
        to three decimals).
    """
    print("\n" + "=" * 60)
    print("LSTM — SPOTIFY POPULARITY PREDICTION")
    print("=" * 60)

    features = ["danceability", "energy", "loudness", "speechiness",
                "acousticness", "instrumentalness", "valence", "tempo", "explicit"]
    target = "popularity"
    required = features + [target]
    window = 5

    df = None
    for p in ("spotify_synthetic.csv", "spotify/dataset.csv", "dataset.csv"):
        if not os.path.exists(p):
            continue
        raw = pd.read_csv(p)
        print(f"Loaded: {p} ({len(raw)} records)")
        # Skip files that cannot feed the model instead of failing later
        # with a KeyError (mirrors the column check in the Amazon runner).
        missing = [c for c in required if c not in raw.columns]
        if missing:
            print(f"Skipping {p}: missing columns {missing}")
            continue
        df = raw
        break

    if df is None:
        print("No Spotify data found. Generating synthetic...")
        df = _make_synthetic_spotify()

    # Drop incomplete rows first: casting a column that still contains NaN
    # to int would raise.
    df = df[required].dropna()
    df = df.assign(explicit=df["explicit"].astype(int))

    print(f"\nBuilding LSTM sequences (window={window})...")
    X, y, scaler_X, scaler_y = build_spotify_sequences(
        df, features, target, window=window
    )
    print(f"Sequence shape: X={X.shape}, y={y.shape}")

    model, history, y_test, y_pred, mae, r2 = train_and_evaluate(
        X, y, scaler_y, "Spotify", epochs=epochs
    )
    plot_results(history, y_test, y_pred, mae, r2, "Spotify", "lstm_spotify.png")
    return {"domain": "spotify", "mae": round(mae, 3), "r2": round(r2, 3)}
def _make_synthetic_amazon(n=800, seed=0):
    """Generate a reproducible synthetic Amazon-like product table with `n` rows."""
    # The draw order below is significant: it fixes the generated values for
    # a given seed.
    np.random.seed(seed)
    actual = np.random.lognormal(7, 1.2, n).clip(50, 80000)
    disc = np.random.uniform(5, 80, n)
    discounted = actual * (1 - disc / 100)
    rating = np.random.normal(4, 0.5, n).clip(1, 5)
    sent = np.random.normal(0.5, 0.3, n).clip(-1, 1)
    log_sales = np.clip(
        2 + 1.5 * rating + 1.2 * sent + np.random.normal(0, 0.8, n), 0, 15
    )
    return pd.DataFrame({
        "actual_price": actual, "discounted_price": discounted,
        "discount_pct": disc, "rating": rating, "sentiment_score": sent,
        "log_sales": log_sales,
    })


def run_amazon_lstm(epochs=50):
    """
    Run the full Amazon pipeline: load data, build windows, train, plot.

    The first existing CSV among the known candidate paths that provides
    every feature column plus ``log_sales`` is used. ``log_sales`` is
    derived as ``log1p(rating_count)`` when only ``rating_count`` is
    present. If no usable file exists, a synthetic data set is generated.
    The diagnostic figure is written to ``lstm_amazon.png``.

    Args:
        epochs: maximum number of training epochs.

    Returns:
        Dict with keys ``"domain"``, ``"mae"`` and ``"r2"`` (metrics rounded
        to three decimals).
    """
    print("\n" + "=" * 60)
    print("LSTM — AMAZON SALES PREDICTION")
    print("=" * 60)

    features = ["actual_price", "discounted_price", "discount_pct",
                "rating", "sentiment_score"]
    target = "log_sales"
    required = features + [target]
    window = 5

    df = None
    for p in ("amazon_synthetic.csv", "amazon/amazon.csv"):
        if not os.path.exists(p):
            continue
        raw = pd.read_csv(p)
        print(f"Loaded: {p} ({len(raw)} records)")
        # Derive the target from rating_count when it is not provided.
        if target not in raw.columns and "rating_count" in raw.columns:
            raw["rating_count"] = pd.to_numeric(
                raw["rating_count"].astype(str).str.replace(",", ""),
                errors="coerce",
            )
            raw[target] = np.log1p(raw["rating_count"])
        # Check every column that is indexed below, including
        # discounted_price, so an incomplete file is skipped instead of
        # raising a KeyError.
        missing = [c for c in required if c not in raw.columns]
        if missing:
            print(f"Skipping {p}: missing columns {missing}")
            continue
        df = raw
        break

    if df is None:
        print("No Amazon data found. Generating synthetic...")
        df = _make_synthetic_amazon()

    df = df[required].dropna()

    # Normalise price to prevent scale domination
    from sklearn.preprocessing import StandardScaler
    price_cols = ["actual_price", "discounted_price"]
    df[price_cols] = StandardScaler().fit_transform(df[price_cols])

    print(f"\nBuilding LSTM sequences (window={window})...")
    X, y, scaler_X, scaler_y = build_amazon_sequences(
        df, features, target, window=window
    )
    print(f"Sequence shape: X={X.shape}, y={y.shape}")

    model, history, y_test, y_pred, mae, r2 = train_and_evaluate(
        X, y, scaler_y, "Amazon", epochs=epochs
    )
    plot_results(history, y_test, y_pred, mae, r2, "Amazon", "lstm_amazon.png")
    return {"domain": "amazon", "mae": round(mae, 3), "r2": round(r2, 3)}
def main():
    """
    Command-line entry point.

    Parses ``--mode`` (spotify / amazon / both) and ``--epochs``, runs the
    selected pipelines, then prints a summary of their metrics and the plot
    files that were actually produced.
    """
    parser = argparse.ArgumentParser(description="LSTM Deep Learning — Extra Credit")
    parser.add_argument("--mode", choices=["spotify", "amazon", "both"],
                        default="both")
    parser.add_argument("--epochs", type=int, default=50,
                        help="Max training epochs (EarlyStopping applies)")
    args = parser.parse_args()

    results = []
    if args.mode in ("spotify", "both"):
        results.append(run_spotify_lstm(args.epochs))
    if args.mode in ("amazon", "both"):
        results.append(run_amazon_lstm(args.epochs))

    print("\n" + "=" * 60)
    print("LSTM SUMMARY")
    print("=" * 60)
    for r in results:
        print(f" {r['domain'].upper():10s} MAE={r['mae']} R²={r['r2']}")

    # List only the plots of the pipelines that ran; each runner writes
    # lstm_<domain>.png.
    outputs = ", ".join(f"lstm_{r['domain']}.png" for r in results)
    print(f"\nOutputs: {outputs}")
    print("Include these plots and metrics in the individual reports as DL comparison.")


if __name__ == "__main__":
    main()