File size: 5,353 Bytes
fc7b4a9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from src.preprocessing.preprocessor import dataset_read, bulk_preprocessing
from src.spectttra.spectttra_trainer import spectttra_train
from src.llm2vectrain.model import load_llm2vec_model
from src.llm2vectrain.llm2vec_trainer import l2vec_train
from src.models.mlp import build_mlp, load_config

from src.utils.config_loader import DATASET_NPZ, PCA_MODEL
from src.utils.dataset import dataset_scaler, dataset_splitter
from sklearn.decomposition import PCA

from pathlib import Path
import numpy as np
import logging
import joblib

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def train_mlp_model(data: dict):
    """
    Train the MLP model with extracted features.

    Parameters
    ----------
        data : dict{np.array}
            A dictionary of np.arrays, containing the train/test/val split.
            Expected keys: "train", "val", "test", each mapping to an
            (X, y) tuple.

    Returns
    -------
        The trained MLP classifier (best checkpoint if available).
    """
    logger.info("Starting MLP training...")

    # Load MLP configuration
    config = load_config("config/model_config.yml")

    # Destructure the dictionary to get data split
    X_train, y_train = data["train"]
    X_val, y_val     = data["val"]
    X_test, y_test   = data["test"]

    # Build the MLP sized to the fused feature dimension
    mlp_classifier = build_mlp(input_dim=X_train.shape[1], config=config)

    # Show model summary
    mlp_classifier.get_model_summary()

    # Train the model (training history is persisted by the trainer itself,
    # so the return value is intentionally not kept here)
    mlp_classifier.train(X_train, y_train, X_val, y_val)

    # Load best model and evaluate on test set; fall back to the final
    # in-memory weights if no checkpoint was written
    try:
        mlp_classifier.load_model("models/mlp/mlp_best.pth")
        logger.info("Loaded best model for final evaluation")
    except FileNotFoundError:
        logger.warning("Best model not found, using current model")

    # Final evaluation on the held-out test split
    test_results = mlp_classifier.evaluate(X_test, y_test)

    # Save final model
    mlp_classifier.save_model("models/mlp/mlp_multimodal.pth")

    logger.info("MLP training completed successfully!")
    logger.info(f"Final test accuracy: {test_results['test_accuracy']:.2f}%")

    return mlp_classifier


def train_pipeline():
    """
    Training script which includes preprocessing, feature extraction, and training the MLP model.

    The train pipeline saves the train dataset in an .npz format. On
    subsequent runs the cached .npz is loaded instead of re-extracting
    features.

    Parameters
    ----------
    None

    Returns
    -------
    None
    """

    # Instantiate X and Y vectors
    X, Y = None, None

    dataset_path = Path(DATASET_NPZ)

    if dataset_path.exists():
        logger.info("Training dataset already exists. Loading file...")

        loaded_data = np.load(DATASET_NPZ)
        X = loaded_data["X"]
        Y = loaded_data["Y"]
    else:
        logger.info("Training dataset does not exist. Processing data...")
        # Get batches from dataset and return full Y labels
        batches, Y = dataset_read(batch_size=500)
        batch_count = 1

        # Instantiate LLM2Vec and PCA model
        llm2vec_model = load_llm2vec_model()

        # Preallocate spaces for both audio and lyric vectors to reduce memory overhead.
        # Feature widths: 384 (SpecTTTra audio), 4096 (LLM2Vec lyrics).
        audio_vectors = np.zeros((len(Y), 384), dtype=np.float32)
        lyric_vectors = np.zeros((len(Y), 4096), dtype=np.float32)

        start_idx = 0
        for batch in batches:

            logger.info(f"Bulk Preprocessing - Batch {batch_count}.")
            audio, lyrics = bulk_preprocessing(batch, batch_count)
            batch_count += 1

            # Call the train methods for both SpecTTTra and LLM2Vec
            logger.info("Starting SpecTTTra feature extraction...")
            audio_features = spectttra_train(audio)

            logger.info("Starting LLM2Vec feature extraction...")
            lyrics_features = l2vec_train(llm2vec_model, lyrics)

            batch_size = audio_features.shape[0]

            # Store the results on preallocated spaces
            audio_vectors[start_idx:start_idx + batch_size, :] = audio_features
            lyric_vectors[start_idx:start_idx + batch_size, :] = lyrics_features

            # BUGFIX: advance the write cursor. Previously start_idx was never
            # incremented, so every batch overwrote rows [0:batch_size] and all
            # other rows stayed zero.
            start_idx += batch_size

            # Delete stored instance for next batch to remove overhead
            del audio, lyrics, audio_features, lyrics_features

        # Run standard scaling on audio and lyrics separately
        logger.info("Running standard scaling for audio and lyrics...")
        audio_vectors, lyric_vectors = dataset_scaler(audio_vectors, lyric_vectors)

        # Start training the PCA to the collected lyrics features
        logger.info("PCA Training on lyric vectors...")
        pca = PCA(n_components=256, svd_solver="randomized", random_state=42)
        lyric_vectors = pca.fit_transform(lyric_vectors)

        # Save the trained PCA model (create the target directory if missing
        # so joblib.dump does not fail on a fresh checkout)
        Path("models/fusion").mkdir(parents=True, exist_ok=True)
        joblib.dump(pca, "models/fusion/pca.pkl")

        # Concatenate audio features and reduced lyrics features
        X = np.concatenate([audio_vectors, lyric_vectors], axis=1)
        logger.info(f"Audio and Lyrics Concatenated. Final features shape: {X.shape}")

        # Convert label list into np.array
        Y = np.array(Y)

        # Save both X and Y to an .npz file for easier loading
        logger.info("Saving dataset for future testing...")
        np.savez(DATASET_NPZ, X=X, Y=Y)

    # Do data splitting
    data = dataset_splitter(X, Y)

    logger.info("Starting MLP training...")
    train_mlp_model(data)


if __name__ == "__main__":
    train_pipeline()