import pandas as pd import numpy as np from sklearn.preprocessing import MinMaxScaler from tensorflow.keras.models import Sequential, Model from tensorflow.keras.layers import LSTM, Dense, Dropout, Input from tensorflow.keras.optimizers import Adam # (Helper functions for data preprocessing - can be imported or defined here) # For simplicity, let's assume a similar preprocessing to deep_mql_model def preprocess_data_for_transfer(df: pd.DataFrame, look_back: int = 60, features_cols=['Close'], target_col='Close'): """ Prepares data for a deep learning model, similar to deep_mql_model. """ df_copy = df.copy() if 'returns' not in df_copy.columns and 'Close' in df_copy.columns: df_copy['returns'] = df_copy['Close'].pct_change() # Example: ensure all specified columns exist, dropna if necessary all_cols = list(set(features_cols + [target_col] + (['returns'] if 'returns' in df_copy.columns else []))) df_copy = df_copy[all_cols].dropna() if df_copy.empty: return np.array([]), np.array([]), None, [] data_to_scale = df_copy.values scaler = MinMaxScaler(feature_range=(0, 1)) scaled_data = scaler.fit_transform(data_to_scale) target_idx_in_scaled = df_copy.columns.tolist().index(target_col) X, y = [], [] for i in range(look_back, len(scaled_data)): X.append(scaled_data[i-look_back:i]) y.append(scaled_data[i, target_idx_in_scaled]) return np.array(X), np.array(y), scaler, df_copy.columns.tolist() def create_base_model(input_shape, base_model_type='lstm', units1=50, units2=50, dropout_rate=0.2): """ Creates a base model architecture for pre-training or as part of transfer learning. """ if base_model_type == 'lstm': model = Sequential([ LSTM(units1, return_sequences=True, input_shape=input_shape, name="base_lstm_1"), Dropout(dropout_rate, name="base_dropout_1"), LSTM(units2, return_sequences=False, name="base_lstm_2"), Dropout(dropout_rate, name="base_dropout_2"), Dense(25, activation='relu', name="base_dense_1") ], name="base_model") # Add other base model types like CNN here # elif base_model_type == 'cnn': # from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten # model = Sequential([ # Conv1D(64, 3, activation='relu', input_shape=input_shape, name="base_conv1d_1"), # MaxPooling1D(2, name="base_maxpool_1"), # Flatten(name="base_flatten"), # Dense(50, activation='relu', name="base_dense_1") # ], name="base_cnn_model") else: raise ValueError(f"Unsupported base_model_type: {base_model_type}") # The model is not compiled here as it might be part of a larger model or compiled later. return model def adapt_model_for_transfer(base_model: Model, num_classes_new_task=1, learning_rate=0.001): """ Adapts a pre-trained base model for a new task. - Freezes base model layers. - Adds new classification/regression head. - Compiles the new model. """ # Freeze the layers of the base model base_model.trainable = False # Create new model on top inputs = Input(shape=base_model.input_shape[1:]) # Get shape without batch size x = base_model(inputs, training=False) # Pass training=False for frozen layers # Add new layers for the specific task x = Dense(128, activation='relu', name="transfer_dense_1")(x) x = Dropout(0.3, name="transfer_dropout_1")(x) outputs = Dense(num_classes_new_task, activation='linear' if num_classes_new_task == 1 else 'softmax', name="transfer_output")(x) adapted_model = Model(inputs, outputs, name="adapted_transfer_model") adapted_model.compile(optimizer=Adam(learning_rate=learning_rate), loss='mean_squared_error' if num_classes_new_task == 1 else 'categorical_crossentropy', metrics=['mean_absolute_error'] if num_classes_new_task == 1 else ['accuracy']) return adapted_model def fine_tune_model(model: Model, X_train, y_train, X_val, y_val, unfreeze_at_layer_name=None, fine_tune_lr=1e-5, epochs=10, batch_size=32): """ Fine-tunes the model. - Optionally unfreezes some layers of the base model. - Re-compiles with a lower learning rate. - Continues training. """ if unfreeze_at_layer_name: model.trainable = True # Unfreeze the entire model first # Then, selectively re-freeze layers *before* the unfreeze_at_layer_name # This is a common strategy: unfreeze top layers of base model for layer in model.get_layer('base_model').layers: # Assumes base_model is nested with this name if layer.name == unfreeze_at_layer_name: break layer.trainable = False else: # If no specific layer, keep base model frozen or unfreeze all (depending on previous state) # For this example, let's assume we unfreeze the whole base_model if unfreeze_at_layer_name is None # Or, more commonly, one might unfreeze the last few layers of the base model. # If base_model is a sub-model: if 'base_model' in [l.name for l in model.layers]: model.get_layer('base_model').trainable = True # Unfreeze the whole base model part print("Unfrozen all layers in 'base_model' for fine-tuning.") model.compile(optimizer=Adam(learning_rate=fine_tune_lr), loss=model.loss, # Use the same loss as before metrics=model.metrics_names[1:]) # Use the same metrics print(f"Starting fine-tuning with learning rate: {fine_tune_lr}") history = model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, validation_data=(X_val, y_val), verbose=1) return model, history if __name__ == '__main__': # --- Simulate Pre-training (on a 'large' general dataset) --- print("Simulating pre-training of base model...") # Dummy general dataset dates_general = pd.date_range(start='2020-01-01', periods=1000, freq='B') data_general_np = np.random.rand(1000, 1) * 100 + 50 # Single feature 'Close' general_data = pd.DataFrame(data_general_np, index=dates_general, columns=['Close']) general_data['Close'] = general_data['Close'] + np.sin(np.linspace(0, 50, 1000)) * 30 look_back_tl = 60 X_general, y_general, scaler_general, _ = preprocess_data_for_transfer(general_data, look_back=look_back_tl) if X_general.shape[0] > 0: base_model_input_shape = (X_general.shape[1], X_general.shape[2]) base_model_tl = create_base_model(base_model_input_shape) # Compile for pre-training (if it were standalone) base_model_tl.compile(optimizer=Adam(0.001), loss='mean_squared_error') print(f"Base model summary (for pre-training):") base_model_tl.summary() # Simulate pre-training # base_model_tl.fit(X_general, y_general, epochs=5, batch_size=32, verbose=1) # Short pre-train for demo print("Base model 'pre-trained' (simulated - no actual training in this step for speed).") # In a real scenario, you would save these weights: base_model_tl.save_weights('pretrained_base_weights.h5') else: print("Not enough general data for pre-training simulation.") base_model_tl = None # --- Transfer Learning (on a 'small' specific dataset) --- if base_model_tl: print("\nSimulating transfer learning to a new task/dataset...") # Dummy specific dataset dates_specific = pd.date_range(start='2023-01-01', periods=200, freq='B') data_specific_np = np.random.rand(200, 1) * 70 + 30 # Different scale/behavior specific_data = pd.DataFrame(data_specific_np, index=dates_specific, columns=['Close']) specific_data['Close'] = specific_data['Close'] + np.cos(np.linspace(0, 10, 200)) * 15 X_specific, y_specific, scaler_specific, _ = preprocess_data_for_transfer(specific_data, look_back=look_back_tl) if X_specific.shape[0] > 100: # Ensure enough data for train/val split split_idx = int(len(X_specific) * 0.8) X_train_sp, y_train_sp = X_specific[:split_idx], y_specific[:split_idx] X_val_sp, y_val_sp = X_specific[split_idx:], y_specific[split_idx:] # 1. Adapt the "pre-trained" base model # Assume base_model_tl has pre-trained weights (even if just initialized for this demo) adapted_model_tl = adapt_model_for_transfer(base_model_tl, num_classes_new_task=1) print("Adapted model summary:") adapted_model_tl.summary() # 2. Initial training on new task (with base frozen) print("Training adapted model on new task (base frozen)...") # adapted_model_tl.fit(X_train_sp, y_train_sp, epochs=10, batch_size=16, validation_data=(X_val_sp, y_val_sp), verbose=1) print("'Trained' adapted model (simulated - no actual training for speed).") # 3. Fine-tune (unfreeze some layers of base_model and train with low LR) print("\nFine-tuning model...") # Example: unfreeze layers from 'base_lstm_2' onwards in the base_model part # For this demo, let's try unfreezing the whole base_model part by passing None fine_tuned_model, history = fine_tune_model( adapted_model_tl, X_train_sp, y_train_sp, X_val_sp, y_val_sp, unfreeze_at_layer_name=None, # Unfreeze all of base_model # unfreeze_at_layer_name='base_lstm_2', # Or specify a layer fine_tune_lr=1e-5, epochs=5, # Short fine-tune for demo batch_size=16 ) print("Model fine-tuned.") # Example prediction if len(X_val_sp) > 0: preds = fine_tuned_model.predict(X_val_sp) print(f"\nSample predictions on validation set (first 5): {preds[:5].flatten()}") print(f"Actual values (first 5): {y_val_sp[:5].flatten()}") else: print("Not enough specific data for transfer learning simulation.") else: print("Base model not available, skipping transfer learning simulation.")