File size: 5,136 Bytes
22e8f32
6b882ab
 
 
 
22e8f32
 
f0300e9
 
7928ce2
22e8f32
 
40ed09a
 
 
7928ce2
40ed09a
 
f0300e9
6b882ab
f0300e9
 
 
 
 
22e8f32
40ed09a
7928ce2
6b882ab
7928ce2
 
 
 
6b882ab
 
 
 
7928ce2
6b882ab
 
 
7928ce2
 
 
6b882ab
 
7928ce2
6b882ab
 
7928ce2
 
 
6b882ab
 
 
 
7928ce2
 
6b882ab
 
7928ce2
6b882ab
7928ce2
 
6b882ab
7928ce2
 
 
 
 
6b882ab
 
 
7928ce2
6b882ab
7928ce2
 
40ed09a
6b882ab
 
22e8f32
6b882ab
 
40ed09a
6b882ab
 
22e8f32
40ed09a
6b882ab
 
 
 
 
 
 
40ed09a
6b882ab
 
40ed09a
6b882ab
40ed09a
6b882ab
 
 
 
40ed09a
6b882ab
 
40ed09a
22e8f32
40ed09a
6b882ab
 
 
 
40ed09a
22e8f32
 
 
6b882ab
 
22e8f32
 
c91df12
 
6b882ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""
5 Independent Binary TFT Classifiers β€” one per ETF.
Each model answers: "Will this ETF beat the risk-free rate over the next 5 days?"
At inference time, ETFs are ranked by their YES probability.
This avoids the regime-lock problem of 5-class softmax.
"""

import random
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (
    Input, Dense, Dropout, LayerNormalization,
    MultiHeadAttention, GlobalAveragePooling1D,
    Multiply, Add, Activation, Conv1D
)
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

# ── Fixed random seed ────────────────────────────────────────────────────────
# Seed every RNG the training pipeline touches so runs are reproducible.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)
# NOTE(review): CPython reads PYTHONHASHSEED at interpreter startup, so
# setting it here affects only child processes, not this process's own
# str/bytes hash randomization — confirm whether that was the intent.
os.environ["PYTHONHASHSEED"] = str(SEED)


def grn_block(x, units, dropout_rate=0.15):
    """Gated Residual Network (GRN) sub-layer.

    Transforms the input with Dense -> ELU -> Dense -> Dropout, gates the
    result with a sigmoid computed from the same input, adds a skip
    connection (linearly projected when the channel count differs from
    ``units``), and layer-normalizes the sum.  Works on both 2D
    (batch, features) and 3D (batch, time, features) tensors.
    """
    # Project the skip path only when the last dimension must change.
    residual = x if x.shape[-1] == units else Dense(units, use_bias=False)(x)

    transformed = Dense(units)(x)
    transformed = Activation('elu')(transformed)
    transformed = Dense(units)(transformed)
    transformed = Dropout(dropout_rate)(transformed)

    # Sigmoid gate derived directly from the block input.
    gated = Multiply()([transformed, Dense(units, activation='sigmoid')(x)])

    return LayerNormalization(epsilon=1e-6)(Add()([residual, gated]))


def build_binary_tft(seq_len, num_features, units=64, num_heads=4,
                     num_attn_layers=2, dropout_rate=0.15):
    """
    Binary TFT classifier: outputs P(ETF beats cash over next 5 days).
    Output: single sigmoid neuron → probability in [0, 1].

    Architecture: feature gating → causal Conv1D → GRN → additive sinusoidal
    positional encoding → `num_attn_layers` × (self-attention + residual
    LayerNorm + GRN) → global average pooling → GRN → sigmoid head.

    Args:
        seq_len: length of the input window (time steps).
        num_features: number of features per time step.
        units: width of the hidden representation throughout the network.
        num_heads: attention heads per MultiHeadAttention layer.
        num_attn_layers: number of stacked attention blocks.
        dropout_rate: dropout applied in GRNs, attention, and the head.

    Returns:
        An uncompiled `tf.keras.Model` mapping
        (batch, seq_len, num_features) → (batch, 1).
    """
    inputs = Input(shape=(seq_len, num_features), name='input')

    # Feature gating: a sigmoid gate (computed from the raw inputs) modulates
    # a linear projection of those same inputs; LayerNorm stabilizes scale.
    proj  = Dense(units)(inputs)
    gate  = Dense(units, activation='sigmoid')(inputs)
    x     = Multiply()([proj, gate])
    x     = LayerNormalization(epsilon=1e-6)(x)

    # Local temporal patterns — 'causal' padding so step t never sees t+1..T.
    x = Conv1D(units, kernel_size=3, padding='causal', activation='relu')(x)

    # GRN
    x = grn_block(x, units, dropout_rate)

    # Positional encoding: Transformer-style sinusoids precomputed in NumPy
    # and added as a constant.  sin/cos halves are concatenated (not
    # interleaved); the else-branch drops one cos column when `units` is odd
    # so the encoding is exactly `units` wide.
    positions = np.arange(seq_len).reshape(-1, 1).astype(np.float32)
    dims      = np.arange(0, units, 2).astype(np.float32)
    angles    = positions / np.power(10000.0, dims / units)
    sin_enc   = np.sin(angles)
    cos_enc   = np.cos(angles)
    pos_enc   = np.concatenate([sin_enc, cos_enc if units % 2 == 0
                                 else cos_enc[:, :-1]], axis=-1)
    x = x + pos_enc[np.newaxis].astype(np.float32)

    # Stacked self-attention blocks, each with a post-norm residual
    # connection and followed by a GRN.
    key_dim = max(1, units // num_heads)
    for i in range(num_attn_layers):
        attn = MultiHeadAttention(
            num_heads=num_heads, key_dim=key_dim,
            dropout=dropout_rate, name=f'attn_{i}'
        )(x, x)
        x = LayerNormalization(epsilon=1e-6)(x + Dropout(dropout_rate)(attn))
        x = grn_block(x, units, dropout_rate)

    # Collapse the time axis, then narrow the representation before the head.
    x = GlobalAveragePooling1D()(x)
    x = grn_block(x, units // 2, dropout_rate, )
    x = Dropout(dropout_rate)(x)

    # Binary output
    output = Dense(1, activation='sigmoid', name='beat_cash_prob')(x)

    return Model(inputs=inputs, outputs=output, name='BinaryTFT')


def train_binary_tft(X_train, y_train, X_val, y_val, etf_name="ETF", epochs=150):
    """
    Train one binary TFT for a single ETF.

    Args:
        X_train, X_val: windowed features, shape (N, seq_len, num_features).
        y_train, y_val: 1D arrays of 0/1 (1 = ETF beat cash over next 5 days).
        etf_name: label for the ETF being trained (informational).
        epochs: maximum training epochs (early stopping may end sooner).

    Returns:
        (model, history) — the trained Keras model and its fit history.
    """
    # Re-seed so every per-ETF model starts from identical initial weights.
    random.seed(SEED)
    np.random.seed(SEED)
    tf.random.set_seed(SEED)

    model = build_binary_tft(
        seq_len=X_train.shape[1],
        num_features=X_train.shape[2]
    )
    model.compile(
        optimizer=tf.keras.optimizers.Adam(5e-4),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )

    stopper = EarlyStopping(monitor='val_loss', patience=20,
                            restore_best_weights=True, verbose=0)
    lr_schedule = ReduceLROnPlateau(monitor='val_loss', factor=0.5,
                                    patience=8, min_lr=1e-5, verbose=0)

    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=64,
        callbacks=[stopper, lr_schedule],
        verbose=0
    )
    return model, history


def train_all_binary_tfts(X_train, y_train_matrix, X_val, y_val_matrix,
                           etf_names, epochs=150):
    """
    Train one independent binary TFT per ETF.

    Args:
        X_train, X_val: shared windowed features for all ETFs.
        y_train_matrix, y_val_matrix: shape (N, n_etfs) — column j is the
            0/1 beat-cash label for etf_names[j].
        etf_names: names of the ETFs, one per label column.
        epochs: maximum epochs passed through to each training run.

    Returns:
        (models, histories) — parallel lists, one entry per ETF.
    """
    models, histories = [], []
    for col, etf in enumerate(etf_names):
        model, hist = train_binary_tft(
            X_train, y_train_matrix[:, col],
            X_val, y_val_matrix[:, col],
            etf_name=etf, epochs=epochs
        )
        models.append(model)
        histories.append(hist)
    return models, histories


def predict_binary_tfts(models, X_test):
    """
    Run inference for every per-ETF binary model on the same test windows.

    Args:
        models: list of trained models, one per ETF; each `predict` call
            returns an (N, 1) column of probabilities.
        X_test: test features, shape (N, seq_len, num_features).

    Returns:
        (N, n_etfs) array of P(beat cash), columns ordered like `models`.
    """
    columns = [model.predict(X_test, verbose=0) for model in models]
    return np.concatenate(columns, axis=1)