File size: 10,381 Bytes
5f10e37 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import LSTM, Dense, Dropout, Input
from tensorflow.keras.optimizers import Adam
# (Helper functions for data preprocessing - can be imported or defined here)
# For simplicity, let's assume a similar preprocessing to deep_mql_model
def preprocess_data_for_transfer(df: pd.DataFrame, look_back: int = 60, features_cols=None, target_col: str = 'Close'):
    """
    Prepares sliding-window data for a deep learning model, similar to deep_mql_model.

    Scales the selected columns to [0, 1] with a MinMaxScaler, then builds
    look-back windows (X) paired with the next-step scaled target value (y).

    Args:
        df: Input price frame; should contain `target_col` (and 'Close' if a
            derived 'returns' column is to be added).
        look_back: Number of past rows in each sample window.
        features_cols: Feature column names; defaults to ['Close'].
        target_col: Column whose next scaled value becomes the label.

    Returns:
        (X, y, scaler, columns): X has shape (samples, look_back, n_features),
        y has shape (samples,), scaler is the fitted MinMaxScaler (None when
        no usable rows remain), columns is the ordered list of scaled columns.
    """
    # Avoid the mutable-default-argument pitfall.
    if features_cols is None:
        features_cols = ['Close']
    df_copy = df.copy()
    if 'returns' not in df_copy.columns and 'Close' in df_copy.columns:
        df_copy['returns'] = df_copy['Close'].pct_change()
    # BUG FIX: the previous list(set(...)) produced a NONDETERMINISTIC column
    # order, silently shuffling feature positions (and the meaning of the
    # fitted scaler / target index) between runs. dict.fromkeys deduplicates
    # while preserving insertion order.
    all_cols = list(dict.fromkeys(features_cols + [target_col] + (['returns'] if 'returns' in df_copy.columns else [])))
    df_copy = df_copy[all_cols].dropna()
    if df_copy.empty:
        # Nothing left after dropna (e.g. empty input or all-NaN columns).
        return np.array([]), np.array([]), None, []
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_data = scaler.fit_transform(df_copy.values)
    target_idx_in_scaled = df_copy.columns.tolist().index(target_col)
    X, y = [], []
    for i in range(look_back, len(scaled_data)):
        X.append(scaled_data[i - look_back:i])
        y.append(scaled_data[i, target_idx_in_scaled])
    return np.array(X), np.array(y), scaler, df_copy.columns.tolist()
def create_base_model(input_shape, base_model_type='lstm', units1=50, units2=50, dropout_rate=0.2):
    """
    Builds the feature-extractor ("base") network used for pre-training
    or as the backbone in a transfer-learning setup.

    Args:
        input_shape: (timesteps, n_features) shape of one input window.
        base_model_type: Architecture selector; only 'lstm' is implemented.
        units1: Units in the first LSTM layer.
        units2: Units in the second LSTM layer.
        dropout_rate: Dropout applied after each LSTM layer.

    Returns:
        An *uncompiled* Keras Sequential model — it may later be embedded
        inside a larger model, so compilation is left to the caller.

    Raises:
        ValueError: If `base_model_type` is not recognized.
    """
    # Guard clause: additional backbones (e.g. a Conv1D stack) would be
    # dispatched here before this check.
    if base_model_type != 'lstm':
        raise ValueError(f"Unsupported base_model_type: {base_model_type}")

    model = Sequential(name="base_model")
    model.add(LSTM(units1, return_sequences=True, input_shape=input_shape, name="base_lstm_1"))
    model.add(Dropout(dropout_rate, name="base_dropout_1"))
    model.add(LSTM(units2, return_sequences=False, name="base_lstm_2"))
    model.add(Dropout(dropout_rate, name="base_dropout_2"))
    model.add(Dense(25, activation='relu', name="base_dense_1"))
    return model
def adapt_model_for_transfer(base_model: Model, num_classes_new_task=1, learning_rate=0.001):
    """
    Wraps a pre-trained base model with a fresh task-specific head.

    The backbone's weights are frozen, a new Dense head is stacked on top,
    and the resulting model is compiled for regression (one output unit)
    or softmax classification (multiple units).

    Args:
        base_model: Pre-trained backbone whose layers will be frozen.
        num_classes_new_task: 1 for regression, >1 for classification.
        learning_rate: Adam learning rate used while training the new head.

    Returns:
        A compiled Keras Model ready for training on the new task.
    """
    is_regression = num_classes_new_task == 1

    # Freeze the backbone so only the new head learns initially.
    base_model.trainable = False

    # base_model.input_shape includes the batch dimension; drop it.
    inputs = Input(shape=base_model.input_shape[1:])
    # training=False keeps stateful layers (e.g. Dropout) in inference
    # mode while the backbone is frozen.
    features = base_model(inputs, training=False)

    # New task-specific head.
    hidden = Dense(128, activation='relu', name="transfer_dense_1")(features)
    hidden = Dropout(0.3, name="transfer_dropout_1")(hidden)
    head_activation = 'linear' if is_regression else 'softmax'
    outputs = Dense(num_classes_new_task, activation=head_activation, name="transfer_output")(hidden)

    adapted_model = Model(inputs, outputs, name="adapted_transfer_model")
    adapted_model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='mean_squared_error' if is_regression else 'categorical_crossentropy',
        metrics=['mean_absolute_error'] if is_regression else ['accuracy'],
    )
    return adapted_model
def fine_tune_model(model: Model, X_train, y_train, X_val, y_val, unfreeze_at_layer_name=None, fine_tune_lr=1e-5, epochs=10, batch_size=32):
    """
    Fine-tunes a transfer-learning model on task-specific data.

    Optionally unfreezes (part of) the nested 'base_model' sub-model,
    re-compiles with a low learning rate so the (un)freezing takes effect,
    and continues training.

    Args:
        model: Adapted model expected to contain a sub-model named 'base_model'.
        X_train, y_train: Training windows and targets.
        X_val, y_val: Validation windows and targets.
        unfreeze_at_layer_name: If given, base-model layers from this layer
            (inclusive) onward stay trainable; layers before it are re-frozen.
            If None, the entire 'base_model' sub-model is unfrozen.
        fine_tune_lr: Low learning rate used for fine-tuning.
        epochs: Number of fine-tuning epochs.
        batch_size: Mini-batch size.

    Returns:
        (model, history): The fine-tuned model and its Keras History object.
    """
    if unfreeze_at_layer_name:
        # Unfreeze everything first, then re-freeze the base layers that come
        # *before* the requested cut-off — the common "unfreeze the top of
        # the backbone" strategy.
        model.trainable = True
        for layer in model.get_layer('base_model').layers:  # Assumes base_model is nested with this name
            if layer.name == unfreeze_at_layer_name:
                break
            layer.trainable = False
    else:
        # No cut-off given: unfreeze the whole base sub-model if present.
        if 'base_model' in [l.name for l in model.layers]:
            model.get_layer('base_model').trainable = True
            print("Unfrozen all layers in 'base_model' for fine-tuning.")
    # BUG FIX: in TF2, model.metrics_names is empty until the model has been
    # trained or evaluated, so the previous `model.metrics_names[1:]` could
    # silently re-compile with NO metrics at all. Fall back to defaults
    # consistent with adapt_model_for_transfer when it is unavailable.
    if len(model.metrics_names) > 1:
        metrics = model.metrics_names[1:]  # Drop 'loss' at index 0
    else:
        metrics = ['accuracy'] if 'crossentropy' in str(model.loss) else ['mean_absolute_error']
    # Re-compilation is required for trainability changes to take effect.
    model.compile(optimizer=Adam(learning_rate=fine_tune_lr),
                  loss=model.loss,  # Keep the same loss as before
                  metrics=metrics)
    print(f"Starting fine-tuning with learning rate: {fine_tune_lr}")
    history = model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, validation_data=(X_val, y_val), verbose=1)
    return model, history
if __name__ == '__main__':
    # Demo pipeline: (1) build + "pre-train" a base model on synthetic general
    # data, (2) adapt it to a smaller synthetic task, (3) fine-tune it.
    # --- Simulate Pre-training (on a 'large' general dataset) ---
    print("Simulating pre-training of base model...")
    # Dummy general dataset: 1000 business days of a noisy sinusoid in 'Close'.
    dates_general = pd.date_range(start='2020-01-01', periods=1000, freq='B')
    data_general_np = np.random.rand(1000, 1) * 100 + 50 # Single feature 'Close'
    general_data = pd.DataFrame(data_general_np, index=dates_general, columns=['Close'])
    general_data['Close'] = general_data['Close'] + np.sin(np.linspace(0, 50, 1000)) * 30
    look_back_tl = 60
    X_general, y_general, scaler_general, _ = preprocess_data_for_transfer(general_data, look_back=look_back_tl)
    if X_general.shape[0] > 0:
        # (timesteps, n_features) — batch dimension excluded.
        base_model_input_shape = (X_general.shape[1], X_general.shape[2])
        base_model_tl = create_base_model(base_model_input_shape)
        # Compile for pre-training (if it were standalone)
        base_model_tl.compile(optimizer=Adam(0.001), loss='mean_squared_error')
        print(f"Base model summary (for pre-training):")
        base_model_tl.summary()
        # Simulate pre-training
        # base_model_tl.fit(X_general, y_general, epochs=5, batch_size=32, verbose=1) # Short pre-train for demo
        print("Base model 'pre-trained' (simulated - no actual training in this step for speed).")
        # In a real scenario, you would save these weights: base_model_tl.save_weights('pretrained_base_weights.h5')
    else:
        print("Not enough general data for pre-training simulation.")
        base_model_tl = None
    # --- Transfer Learning (on a 'small' specific dataset) ---
    if base_model_tl:
        print("\nSimulating transfer learning to a new task/dataset...")
        # Dummy specific dataset: 200 business days with a different scale and
        # cosine cycle, standing in for the "small target-domain" dataset.
        dates_specific = pd.date_range(start='2023-01-01', periods=200, freq='B')
        data_specific_np = np.random.rand(200, 1) * 70 + 30 # Different scale/behavior
        specific_data = pd.DataFrame(data_specific_np, index=dates_specific, columns=['Close'])
        specific_data['Close'] = specific_data['Close'] + np.cos(np.linspace(0, 10, 200)) * 15
        X_specific, y_specific, scaler_specific, _ = preprocess_data_for_transfer(specific_data, look_back=look_back_tl)
        if X_specific.shape[0] > 100: # Ensure enough data for train/val split
            # Chronological 80/20 split (no shuffling for time series).
            split_idx = int(len(X_specific) * 0.8)
            X_train_sp, y_train_sp = X_specific[:split_idx], y_specific[:split_idx]
            X_val_sp, y_val_sp = X_specific[split_idx:], y_specific[split_idx:]
            # 1. Adapt the "pre-trained" base model
            # Assume base_model_tl has pre-trained weights (even if just initialized for this demo)
            adapted_model_tl = adapt_model_for_transfer(base_model_tl, num_classes_new_task=1)
            print("Adapted model summary:")
            adapted_model_tl.summary()
            # 2. Initial training on new task (with base frozen)
            print("Training adapted model on new task (base frozen)...")
            # adapted_model_tl.fit(X_train_sp, y_train_sp, epochs=10, batch_size=16, validation_data=(X_val_sp, y_val_sp), verbose=1)
            print("'Trained' adapted model (simulated - no actual training for speed).")
            # 3. Fine-tune (unfreeze some layers of base_model and train with low LR)
            print("\nFine-tuning model...")
            # Example: unfreeze layers from 'base_lstm_2' onwards in the base_model part
            # For this demo, let's try unfreezing the whole base_model part by passing None
            fine_tuned_model, history = fine_tune_model(
                adapted_model_tl,
                X_train_sp, y_train_sp,
                X_val_sp, y_val_sp,
                unfreeze_at_layer_name=None, # Unfreeze all of base_model
                # unfreeze_at_layer_name='base_lstm_2', # Or specify a layer
                fine_tune_lr=1e-5,
                epochs=5, # Short fine-tune for demo
                batch_size=16
            )
            print("Model fine-tuned.")
            # Example prediction on the held-out tail of the specific dataset.
            if len(X_val_sp) > 0:
                preds = fine_tuned_model.predict(X_val_sp)
                print(f"\nSample predictions on validation set (first 5): {preds[:5].flatten()}")
                print(f"Actual values (first 5): {y_val_sp[:5].flatten()}")
        else:
            print("Not enough specific data for transfer learning simulation.")
    else:
        print("Base model not available, skipping transfer learning simulation.")