File size: 19,532 Bytes
5f10e37 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 |
# rnn/agents/deep_portfolio.py (ou onde você tem DeepPortfolioAI / DeepPortfolioAgentNetwork)
import numpy as np
from tensorflow.keras import regularizers
import tensorflow as tf
from tensorflow.keras.layers import (
Input, Conv1D, LSTM, Dense, Dropout,
MultiHeadAttention, Reshape, Concatenate,
TimeDistributed, GlobalAveragePooling1D, LayerNormalization
)
from tensorflow.keras.models import Model
from transformers import AutoTokenizer, TFAutoModelForSequenceClassification
# Comente as importações do transformers se não for testar o sentimento agora para simplificar
# from transformers import AutoTokenizer, TFAutoModelForSequenceClassification
# --- DEFINIÇÕES DE CONFIGURAÇÃO (COPIE OU IMPORTE DO SEU CONFIG.PY) ---
# Se você não importar do config.py, defina-as aqui para o teste
WINDOW_SIZE_CONF = 60
NUM_ASSETS_CONF = 4 # Ex: ETH, BTC, ADA, SOL
NUM_FEATURES_PER_ASSET_CONF = 26 # Número de features calculadas para CADA ativo
# (open_div_atr, ..., buy_condition_v1, etc.)
L2_REG = 0.0001 # Exemplo, use o valor do seu config
# (Cole as classes AssetProcessor e DeepPortfolioAgentNetwork aqui se estiver em um novo script)
# Ou, se estiver no mesmo arquivo, elas já estarão definidas.
# ... (Definição das classes AssetProcessor e DeepPortfolioAgentNetwork como na resposta anterior) ...
# Certifique-se que a classe DeepPortfolioAgentNetwork está usando estas constantes:
# num_assets=NUM_ASSETS_CONF,
# sequence_length=WINDOW_SIZE_CONF,
# num_features_per_asset=NUM_FEATURES_PER_ASSET_CONF
class AssetProcessor(tf.keras.Model):
def __init__(self, sequence_length, num_features, cnn_filters1=32, cnn_filters2=64, lstm_units1=64, lstm_units2=32, dropout_rate=0.2, name="single_asset_processor_module", **kwargs): # Adicionado **kwargs
super(AssetProcessor, self).__init__(name=name, **kwargs) # Adicionado **kwargs
self.sequence_length = sequence_length
self.num_features = num_features
self.cnn_filters1 = cnn_filters1 # Salvar para get_config
self.cnn_filters2 = cnn_filters2
self.lstm_units1 = lstm_units1
self.lstm_units2 = lstm_units2
self.dropout_rate = dropout_rate
self.conv1 = Conv1D(filters=cnn_filters1, kernel_size=3, activation='relu', padding='same', name="asset_cnn1")
self.dropout_cnn1 = Dropout(dropout_rate, name="asset_cnn1_dropout")
self.conv2 = Conv1D(filters=cnn_filters2, kernel_size=3, activation='relu', padding='same', name="asset_cnn2")
self.dropout_cnn2 = Dropout(dropout_rate, name="asset_cnn2_dropout")
self.lstm1 = LSTM(lstm_units1, return_sequences=True, name="asset_lstm1")
self.dropout_lstm1 = Dropout(dropout_rate, name="asset_lstm1_dropout")
self.lstm2 = LSTM(lstm_units2, return_sequences=False, name="asset_lstm2_final")
self.dropout_lstm2 = Dropout(dropout_rate, name="asset_lstm2_dropout")
def call(self, inputs, training=False):
x = self.conv1(inputs)
x = self.dropout_cnn1(x, training=training)
x = self.conv2(x)
x = self.dropout_cnn2(x, training=training)
x = self.lstm1(x, training=training)
x = self.dropout_lstm1(x, training=training)
x = self.lstm2(x, training=training)
x_processed_asset = self.dropout_lstm2(x, training=training)
return x_processed_asset
def get_config(self):
config = super().get_config()
config.update({
"sequence_length": self.sequence_length,
"num_features": self.num_features,
"cnn_filters1": self.cnn_filters1,
"cnn_filters2": self.cnn_filters2,
"lstm_units1": self.lstm_units1,
"lstm_units2": self.lstm_units2,
"dropout_rate": self.dropout_rate,
})
return config
class DeepPortfolioAgentNetwork(tf.keras.Model):
def __init__(self,
num_assets=int(NUM_ASSETS_CONF),
sequence_length=int(WINDOW_SIZE_CONF),
num_features_per_asset=int(NUM_FEATURES_PER_ASSET_CONF),
asset_cnn_filters1=32, asset_cnn_filters2=64,
asset_lstm_units1=64, asset_lstm_units2=32, asset_dropout=0.2,
mha_num_heads=4, mha_key_dim_divisor=2, # key_dim será asset_lstm_units2 // mha_key_dim_divisor
final_dense_units1=128, final_dense_units2=64, final_dropout=0.3,
use_sentiment_analysis=True,
output_latent_features=False, **kwargs): # Adicionado **kwargs
super(DeepPortfolioAgentNetwork, self).__init__(name="deep_portfolio_agent_network", **kwargs) # Adicionado **kwargs
print(f"DPN __init__ > num_assets ENTRADA: {num_assets}, tipo: {type(num_assets)}")
# Tentar extrair o valor escalar se for um tensor/variável ou TrackedDict
def get_int_value(param_name, val):
if isinstance(val, (tf.Tensor, tf.Variable)):
if val.shape == tf.TensorShape([]): # Escalar
print(f"DPN __init__: Convertendo {param_name} (Tensor/Variable escalar) para int.")
return int(val.numpy())
else:
raise ValueError(f"{param_name} é um Tensor/Variable mas não é escalar. Shape: {val.shape}")
elif isinstance(val, dict): # Pode ser um TrackedDict
# TrackedDict pode se comportar como um dict. Se o valor real está "escondido",
# precisamos descobrir como acessá-lo.
# Por agora, vamos tentar a conversão direta, e se falhar, o erro será mais claro.
# Se for um dict simples com uma chave específica, você precisaria dessa chave.
# O erro 'KeyError: value' sugere que ['value'] não é a forma correta.
# Geralmente, para hiperparâmetros, o TrackedDict deve conter o valor diretamente
# se o SB3 o passou corretamente.
print(f"DPN __init__: Tentando converter {param_name} (dict-like) para int.")
try:
return int(val) # Tentar conversão direta
except TypeError:
# Se TrackedDict se comporta como um tensor quando usado em ops TF,
# tf.get_static_value pode funcionar, ou apenas o uso direto
# em operações TF (mas range() não é uma op TF).
# Se for um tensor TF "disfarçado", .numpy() pode funcionar.
# Se for um dict com uma chave específica, essa chave seria necessária.
# O erro mostra que ['value'] não funcionou.
print(f"DPN __init__: Conversão direta de {param_name} (dict-like) para int falhou. Investigar TrackedDict.")
# Para depuração, você pode tentar imprimir os itens do dict:
# if isinstance(val, collections.abc.Mapping): # Checa se é um dict-like
# for k, v_item in val.items():
# print(f" {param_name} item: {k} -> {v_item}")
raise TypeError(f"{param_name} é {type(val)} e não pôde ser convertido para int diretamente. Valor: {val}")
else: # Tenta conversão direta para outros tipos
return int(val)
try:
self.num_assets = get_int_value("num_assets", num_assets)
self.sequence_length = get_int_value("sequence_length", sequence_length)
self.num_features_per_asset = get_int_value("num_features_per_asset", num_features_per_asset)
self.asset_lstm_output_dim = get_int_value("asset_lstm_units2", asset_lstm_units2) # Do kwargs
# Faça o mesmo para TODOS os outros parâmetros que devem ser inteiros e são passados
# para construtores de camadas Keras (cnn_filters, lstm_units, mha_num_heads, etc.)
# Exemplo:
# self.asset_cnn_filters1_val = get_int_value("asset_cnn_filters1", kwargs.get("asset_cnn_filters1"))
except Exception as e_conv:
print(f"ERRO CRÍTICO DE CONVERSÃO DE TIPO no __init__ da DeepPortfolioAgentNetwork: {e_conv}")
raise
print(f"DPN __init__ > self.num_assets APÓS conversão: {self.num_assets}, tipo: {type(self.num_assets)}")
self.num_assets = num_assets
self.sequence_length = sequence_length
self.num_features_per_asset = num_features_per_asset
self.asset_lstm_output_dim = asset_lstm_units2
self.asset_processor = AssetProcessor(
sequence_length=self.sequence_length, num_features=self.num_features_per_asset,
cnn_filters1=asset_cnn_filters1, cnn_filters2=asset_cnn_filters2,
lstm_units1=asset_lstm_units1, lstm_units2=asset_lstm_units2,
dropout_rate=asset_dropout
)
# Ajustar key_dim para ser compatível com a dimensão de entrada e num_heads
# key_dim * num_heads deve ser idealmente igual a asset_lstm_output_dim se for auto-atenção direta,
# ou o MHA projeta internamente. Para simplificar, vamos fazer key_dim ser divisível.
# Se asset_lstm_output_dim não for divisível por num_heads, key_dim pode ser diferente.
# Vamos definir key_dim explicitamente. Se asset_lstm_output_dim = 32 e num_heads = 4, key_dim pode ser 8.
# Ou deixar o MHA lidar com a projeção se key_dim for diferente.
# Para maior clareza, calculamos uma key_dim sensata.
calculated_key_dim = self.asset_lstm_output_dim // mha_key_dim_divisor
if calculated_key_dim == 0: # Evitar key_dim zero
calculated_key_dim = self.asset_lstm_output_dim # Fallback se for muito pequeno
print(f"AVISO: asset_lstm_output_dim ({self.asset_lstm_output_dim}) muito pequeno para mha_key_dim_divisor ({mha_key_dim_divisor}). Usando key_dim = {calculated_key_dim}")
self.attention = MultiHeadAttention(num_heads=mha_num_heads, key_dim=calculated_key_dim, dropout=0.1, name="multi_asset_attention")
self.attention_norm = LayerNormalization(epsilon=1e-6, name="attention_layernorm")
self.global_avg_pool_attention = GlobalAveragePooling1D(name="gap_after_attention")
self.use_sentiment = use_sentiment_analysis # Desabilitado por padrão para este teste
self.sentiment_embedding_size = 3
if self.use_sentiment:
try:
self.tokenizer = AutoTokenizer.from_pretrained('ProsusAI/finbert')
self.sentiment_model = TFAutoModelForSequenceClassification.from_pretrained('ProsusAI/finbert', from_pt=True)
print("Modelo FinBERT carregado para análise de sentimento.")
except Exception as e:
print(f"AVISO: Falha ao carregar FinBERT: {e}. Análise de sentimento será desabilitada.")
self.use_sentiment = False
dense_input_dim = self.use_sentiment
#if self.use_sentiment: dense_input_dim += self.sentiment_embedding_size
self.dense1 = Dense(final_dense_units1, activation='relu', kernel_regularizer=regularizers.l2(L2_REG), name="final_dense1")
self.dropout1 = Dropout(final_dropout, name="final_dropout1")
self.dense2 = Dense(final_dense_units2, activation='relu', kernel_regularizer=regularizers.l2(L2_REG), name="final_dense2")
self.dropout2 = Dropout(final_dropout, name="final_dropout2")
self.output_allocation = Dense(self.num_assets, activation='softmax', name="portfolio_allocation_output")
def call(self, inputs, training=False):
market_data_flat = inputs
print(type(self.num_assets))
asset_representations_list = []
for i in range(self.num_assets):
start_idx = i * self.num_features_per_asset
end_idx = (i + 1) * self.num_features_per_asset
current_asset_data = market_data_flat[:, :, start_idx:end_idx]
processed_asset_representation = self.asset_processor(current_asset_data, training=training)
asset_representations_list.append(processed_asset_representation)
stacked_asset_features = tf.stack(asset_representations_list, axis=1)
# Para MHA, query, value, key são (batch_size, Tq, dim), (batch_size, Tv, dim)
# Aqui, T = num_assets, dim = asset_lstm_output_dim
attention_output = self.attention(
query=stacked_asset_features, value=stacked_asset_features, key=stacked_asset_features,
training=training
)
attention_output = self.attention_norm(stacked_asset_features + attention_output)
context_vector_from_attention = self.global_avg_pool_attention(attention_output)
current_features_for_dense = context_vector_from_attention
# if self.use_sentiment: ... (lógica de concatenação)
x = self.dense1(current_features_for_dense)
x = self.dropout1(x, training=training)
x = self.dense2(x)
x = self.dropout2(x, training=training)
portfolio_weights = self.output_allocation(x)
return portfolio_weights
def get_config(self): # Necessário se você quiser salvar/carregar o modelo que usa este sub-modelo
config = super().get_config()
config.update({
"num_assets": self.num_assets,
"sequence_length": self.sequence_length,
"num_features_per_asset": self.num_features_per_asset,
# Adicione outros args do __init__ aqui para todas as camadas e sub-modelos
"asset_lstm_output_dim": self.asset_lstm_output_dim,
# ... e os parâmetros passados para AssetProcessor e MHA, etc.
})
return config
# @classmethod
# def from_config(cls, config): # Necessário para carregar com sub-modelo customizado
# # Extrair config do AssetProcessor se necessário
# return cls(**config)
if __name__ == '__main__':
print("Testando o Forward Pass do DeepPortfolioAgentNetwork...")
# 1. Definir Parâmetros para o Teste (devem corresponder ao config.py)
batch_size_test = 2 # Um batch pequeno para teste
seq_len_test = WINDOW_SIZE_CONF
num_assets_test = NUM_ASSETS_CONF
num_features_per_asset_test = NUM_FEATURES_PER_ASSET_CONF
total_features_flat = num_assets_test * num_features_per_asset_test
print(f"Configuração do Teste:")
print(f" Batch Size: {batch_size_test}")
print(f" Sequence Length (Window): {seq_len_test}")
print(f" Number of Assets: {num_assets_test}")
print(f" Features per Asset: {num_features_per_asset_test}")
print(f" Total Flat Features per Timestep: {total_features_flat}")
# 2. Criar Tensor de Input Mockado
# Shape: (batch_size, sequence_length, num_assets * num_features_per_asset)
mock_market_data_flat = tf.random.normal(
shape=(batch_size_test, seq_len_test, total_features_flat)
)
print(f"Shape do Input Mockado (market_data_flat): {mock_market_data_flat.shape}")
# 3. Instanciar o Modelo
# Use os mesmos hiperparâmetros que você definiria no config.py para a rede
print("\nInstanciando DeepPortfolioAgentNetwork...")
agent_network = DeepPortfolioAgentNetwork(
num_assets=num_assets_test,
sequence_length=seq_len_test,
num_features_per_asset=num_features_per_asset_test,
# Você pode variar os próximos parâmetros para testar diferentes configs
asset_cnn_filters1=32, asset_cnn_filters2=64,
asset_lstm_units1=64, asset_lstm_units2=32, # asset_lstm_units2 define asset_lstm_output_dim
asset_dropout=0.1,
mha_num_heads=4, mha_key_dim_divisor=4, # Ex: 32 // 4 = 8 para key_dim
final_dense_units1=64, final_dense_units2=32, final_dropout=0.2,
use_sentiment_analysis=False # Testar sem sentimento primeiro
)
# Para construir o modelo e ver o summary, você pode chamar com o input mockado
# ou explicitamente chamar model.build() se souber o input shape completo
# Chamar com input mockado é mais fácil para construir.
print("\nConstruindo o modelo com input mockado (primeira chamada)...")
try:
# É uma boa prática fazer a primeira chamada dentro de um tf.function para otimizar
# ou apenas chamar diretamente para teste.
_ = agent_network(mock_market_data_flat) # Chamada para construir as camadas
print("\n--- Summary da Rede Principal (DeepPortfolioAgentNetwork) ---")
agent_network.summary()
# O summary do asset_processor já foi impresso no __init__ do DeepPortfolioAgentNetwork
# se você descomentar as linhas de build/summary lá.
# Ou você pode imprimir aqui:
print("\n--- Summary do AssetProcessor (Sub-Modelo) ---")
agent_network.asset_processor.summary()
except Exception as e:
print(f"Erro ao construir a rede principal: {e}", exc_info=True)
exit()
# 4. Chamar model(mock_input) para o Forward Pass
print("\nExecutando Forward Pass...")
try:
predictions = agent_network(mock_market_data_flat, training=False) # Passar training=False para inferência
print("Forward Pass concluído com sucesso!")
except Exception as e:
print(f"Erro durante o Forward Pass: {e}", exc_info=True)
exit()
# 5. Verificar o Shape da Saída
print(f"\nShape da Saída (predictions): {predictions.shape}")
expected_output_shape = (batch_size_test, num_assets_test)
if predictions.shape == expected_output_shape:
print(f"Shape da Saída está CORRETO! Esperado: {expected_output_shape}")
else:
print(f"ERRO: Shape da Saída INCORRETO. Esperado: {expected_output_shape}, Obtido: {predictions.shape}")
# Verificar se a saída é uma distribuição de probabilidade (softmax)
if hasattr(predictions, 'numpy'): # Se for um EagerTensor
output_sum = tf.reduce_sum(predictions, axis=-1).numpy()
print(f"Soma das probabilidades de saída por amostra no batch (deve ser próximo de 1): {output_sum}")
if np.allclose(output_sum, 1.0):
print("Saída Softmax parece CORRETA (soma 1).")
else:
print("AVISO: Saída Softmax pode NÃO estar correta (soma diferente de 1).")
print("\nExemplo das primeiras predições (pesos do portfólio):")
print(predictions.numpy()[:min(5, batch_size_test)]) # Imprime até 5 predições do batch
# Teste com sentimento (se implementado e FinBERT carregado)
# agent_network.use_sentiment = True # Ativar para teste
# if agent_network.use_sentiment and hasattr(agent_network, 'tokenizer'):
# print("\nTestando Forward Pass COM SENTIMENTO...")
# mock_news_batch = ["positive news for asset 1", "market is very volatile today"] # Exemplo
# # A forma como você passa 'news' para o call() precisa ser definida.
# # Se for um dicionário:
# # mock_inputs_with_news = {"market_data": mock_market_data_flat, "news_data": mock_news_batch}
# # predictions_with_sentiment = agent_network(mock_inputs_with_news, training=False)
# # print(f"Shape da Saída com Sentimento: {predictions_with_sentiment.shape}")
# else:
# print("\nTeste com sentimento pulado (use_sentiment=False ou FinBERT não carregado).")
print("\nTeste do Forward Pass Concluído!")
|