"""Hybrid V4: Transformer "teacher" + online "shadow" learner dashboard.

A mixture-density Transformer is trained once on recent 15-minute closes of
several instruments.  A small MLP then replays the same history candle by
candle, learning online how to correct the Transformer's predicted return.
The result is plotted in a Gradio app and, when the divergence z-score
crosses +/-2, a push notification is sent through ntfy.sh.
"""
import os
import time

import gradio as gr
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import requests
import torch
import torch.nn as nn
import torch.nn.functional as F
from plotly.subplots import make_subplots
from sklearn.preprocessing import StandardScaler

# --- 1. CONFIG & SECRETS ---
API_KEY = os.getenv("TWELVEDATA_KEY")
NTFY_TOPIC = os.getenv("NTFY_TOPIC")
TARGET_PAIR = "EUR/USD"
SYMBOLS = ["EUR/USD", "GBP/USD", "USD/JPY", "XAU/USD"]
TIMEFRAME = "15min"
LOOKBACK = 30
# Seconds to wait on the price / notification endpoints before giving up, so
# a stalled connection cannot block a UI callback forever.
REQUEST_TIMEOUT = 10
# One return column per symbol plus the 'Surprise' and 'Impact_Score' columns
# added in prepare_tensors().
NUM_FEATURES = len(SYMBOLS) + 2

# Global State
GLOBAL_STATE = {
    "base_model": None,  # The Transformer (Teacher)
    "shadow_model": None,  # The Online Learner (Student)
    "last_trade": None,
    "scaler": None,
    "is_base_trained": False,
}


# --- 2. THE TEACHER: CONSTELLATION TRANSFORMER ---
class PositionalEncoding(nn.Module):
    """Learned positional offsets added to a (batch, seq, d_model) input."""

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        # Zero-initialised and trainable: positions are learned, not sinusoidal.
        self.encoding = nn.Parameter(torch.zeros(1, max_len, d_model))

    def forward(self, x):
        seq_len = x.size(1)
        return x + self.encoding[:, :seq_len, :]


class ConstellationTransformer(nn.Module):
    """Transformer encoder with a mixture-density (Gaussian mixture) head.

    ``forward`` returns ``(pi, sigma, mu)``, each of shape
    ``(batch, num_gaussians)``, computed from the last time step's encoding.
    """

    def __init__(self, input_dim, d_model=64, nhead=4, num_layers=2, num_gaussians=3):
        super().__init__()
        self.embedding = nn.Linear(input_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, batch_first=True, dropout=0.1
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.z_pi = nn.Linear(d_model, num_gaussians)
        self.z_sigma = nn.Linear(d_model, num_gaussians)
        self.z_mu = nn.Linear(d_model, num_gaussians)
        # Negative bias makes softplus start small, i.e. narrow initial sigmas.
        nn.init.constant_(self.z_sigma.bias, -2.0)

    def forward(self, x):
        x = self.embedding(x)
        x = self.pos_encoder(x)
        x = self.transformer(x)
        context = x[:, -1, :]  # encoding of the most recent step in the window
        pi = F.softmax(self.z_pi(context), dim=1)
        sigma = F.softplus(self.z_sigma(context)) + 1e-6  # strictly positive
        mu = self.z_mu(context)
        return pi, sigma, mu


def mdn_loss(pi, sigma, mu, y):
    """Return the mean negative log-likelihood of ``y`` under the mixture.

    ``pi``, ``sigma`` and ``mu`` are the per-component weights, scales and
    means; a 1-D ``y`` is reshaped to a column so it broadcasts against them.
    """
    if y.dim() == 1:
        y = y.unsqueeze(1)
    dist = torch.distributions.Normal(loc=mu, scale=sigma)
    log_prob = dist.log_prob(y)
    # logsumexp over components; the epsilon guards log(0) for dead components.
    loss = -torch.logsumexp(torch.log(pi + 1e-8) + log_prob, dim=1)
    return torch.mean(loss)


# --- 3. THE STUDENT: ONLINE META LEARNER ---
class OnlineMetaLearner(nn.Module):
    """Small MLP that learns, one sample at a time, a correction to add to
    the base model's predicted return."""

    def __init__(self, input_dim=3, hidden_dim=32):
        super().__init__()
        # It takes [Base_Pred, Base_Sigma, Volatility] as input
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            # Tanh bounds the first hidden activations to (-1, 1); the final
            # linear layer is unbounded, so the correction itself is not clipped.
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),  # Outputs the CORRECTION
        )
        # Fast learning rate for "On-the-Spot" adaptation
        self.optimizer = torch.optim.Adam(self.parameters(), lr=0.01)
        self.loss_fn = nn.MSELoss()

    def forward(self, x):
        return self.net(x)

    def learn_step(self, x, target_error):
        """Take one optimiser step towards ``target_error``.

        Returns the correction predicted *before* the weight update, as a float.
        """
        self.train()
        self.optimizer.zero_grad()
        pred_correction = self.forward(x)
        loss = self.loss_fn(pred_correction, target_error)
        loss.backward()
        self.optimizer.step()
        return pred_correction.item()


# --- 4. DATA PIPELINE ---
def get_constellation_data():
    """Download recent closes for every symbol in SYMBOLS from Twelve Data.

    Returns:
        ``(master_df, message)``.  ``master_df`` has one float column per
        symbol indexed by datetime (forward-filled, rows with gaps dropped),
        or is ``None`` when the key is missing or any symbol failed.
    """
    if not API_KEY:
        return None, "❌ Error: TWELVEDATA_KEY missing."

    frames = []
    failed = []
    for sym in SYMBOLS:
        # Let requests build the query string so the '/' in the symbol and the
        # key are escaped correctly.
        params = {
            "symbol": sym,
            "interval": TIMEFRAME,
            "outputsize": 500,
            "apikey": API_KEY,
        }
        try:
            resp = requests.get(
                "https://api.twelvedata.com/time_series",
                params=params,
                timeout=REQUEST_TIMEOUT,
            )
            payload = resp.json()
        except (requests.RequestException, ValueError):
            # Network failure or a non-JSON body.
            failed.append(sym)
            continue

        if not isinstance(payload, dict) or "values" not in payload:
            failed.append(sym)
            continue

        try:
            df = pd.DataFrame(payload["values"])
            df["datetime"] = pd.to_datetime(df["datetime"])
            df = df.sort_values("datetime").set_index("datetime")
            df = df[["close"]].astype(float).rename(columns={"close": sym})
        except (KeyError, ValueError, TypeError):
            # Payload did not have the expected datetime/close columns.
            failed.append(sym)
            continue

        frames.append(df)
        time.sleep(0.1)  # small pause between successful calls

    if not frames:
        return None, "❌ Failed to fetch data."
    if failed:
        # prepare_tensors() indexes every symbol and the model's input width is
        # fixed, so a partial constellation cannot be used.
        return None, f"❌ Failed to fetch data for: {', '.join(failed)}"

    master_df = pd.concat(frames, axis=1).ffill().dropna()
    return master_df, "✅ Constellation Aligned"


def get_events_data():
    """Fetch this week's EUR/USD calendar events with a numeric impact score.

    Returns a DataFrame indexed by naive (UTC wall-clock) ``DateTime`` with a
    single ``Impact_Score`` column (Low=1, Medium=2, High=3, other=0), or an
    empty DataFrame if the feed is unavailable or has no matching events.
    """
    url = "https://nfs.faireconomy.media/ff_calendar_thisweek.json"
    try:
        resp = requests.get(url, headers={"User-Agent": "V23/1.0"}, timeout=5)
        resp.raise_for_status()
        data = resp.json()
    except (requests.RequestException, ValueError):
        return pd.DataFrame()

    if not isinstance(data, list):
        return pd.DataFrame()

    impact_map = {'Low': 1, 'Medium': 2, 'High': 3}
    parsed = []
    for item in data:
        if not isinstance(item, dict) or item.get('country') not in ('EUR', 'USD'):
            continue
        dt = pd.to_datetime(item.get('date'), utc=True, errors='coerce')
        if pd.isna(dt):
            # A null key would make merge_asof raise later on.
            continue
        parsed.append({
            'DateTime': dt.tz_localize(None),
            'Impact_Score': impact_map.get(item.get('impact'), 0),
        })

    if not parsed:
        return pd.DataFrame()

    df = pd.DataFrame(parsed)
    # Several events can share a timestamp; keep the strongest one so the
    # as-of merge is deterministic.
    return df.groupby('DateTime')[['Impact_Score']].max().sort_index()


def prepare_tensors(master_df, event_df):
    """Build scaled sliding-window inputs from prices and calendar events.

    Returns:
        ``(X_tensor, dates, ref_prices, ret_mean, ret_scale, data_scaled)``
        where ``X_tensor`` is ``(n_windows, LOOKBACK, n_features)``, ``dates``
        and ``ref_prices`` are the index / TARGET_PAIR closes from row
        LOOKBACK onward, ``ret_mean`` / ``ret_scale`` are the scaler statistics
        of the TARGET_PAIR return column, and ``data_scaled`` is the full
        scaled feature matrix.
    """
    if not event_df.empty:
        merged = pd.merge_asof(
            master_df,
            event_df,
            left_index=True,
            right_index=True,
            direction='backward',
            tolerance=pd.Timedelta('4 hours'),
        ).fillna(0)
    else:
        merged = master_df.copy()
        merged['Impact_Score'] = 0
    merged['Surprise'] = 0.0  # placeholder feature, always zero here

    feature_cols = []
    for sym in SYMBOLS:
        col_name = f"{sym}_ret"
        merged[col_name] = merged[sym].pct_change().fillna(0)
        feature_cols.append(col_name)
    feature_cols.extend(['Surprise', 'Impact_Score'])

    scaler = StandardScaler()
    data_scaled = scaler.fit_transform(merged[feature_cols].values)

    # NOTE(review): window k covers rows [k, k + LOOKBACK), i.e. it ends one
    # candle before ref_prices[k] - confirm the newest candle is meant to be
    # left out of the model input.
    windows = [
        data_scaled[end - LOOKBACK:end]
        for end in range(LOOKBACK, len(data_scaled))
    ]
    if windows:
        X_tensor = torch.FloatTensor(np.array(windows))
    else:
        # Too little history: keep a well-formed, empty 3-D tensor.
        X_tensor = torch.empty((0, LOOKBACK, len(feature_cols)), dtype=torch.float32)

    target_idx = feature_cols.index(f"{TARGET_PAIR}_ret")
    ret_mean = scaler.mean_[target_idx]
    ret_scale = scaler.scale_[target_idx]
    ref_prices = merged[TARGET_PAIR].values[LOOKBACK:]

    return X_tensor, merged.index[LOOKBACK:], ref_prices, ret_mean, ret_scale, data_scaled


# --- 5. CORE LOGIC ---
def send_ntfy(message):
    """Best-effort push of ``message`` to the configured ntfy.sh topic."""
    if not NTFY_TOPIC:
        return
    try:
        requests.post(
            f"https://ntfy.sh/{NTFY_TOPIC}",
            data=message.encode('utf-8'),
            headers={"Title": "Hybrid V4", "Priority": "high"},
            timeout=REQUEST_TIMEOUT,
        )
    except requests.RequestException:
        # Notifications are optional; never let them break an analysis run.
        pass


def hard_reset():
    """Drop both models so the next scan retrains from scratch.

    Returns the ``(plot, status_html, log)`` triple expected by the UI.
    """
    GLOBAL_STATE["base_model"] = None
    GLOBAL_STATE["shadow_model"] = None
    GLOBAL_STATE["is_base_trained"] = False
    # NOTE(review): the original markup around this text was lost; a minimal
    # heading is used instead - confirm the intended styling.
    return None, "<h3>♻️ Memory Wiped.</h3>", "Reset."


def _train_base_model(base_model, X_tensor, ref_prices, ret_mean, ret_std, epochs=50):
    """Fit the base Transformer on scaled next-candle returns of the target."""
    optimizer = torch.optim.Adam(base_model.parameters(), lr=0.005)
    base_model.train()

    actual_returns = np.diff(ref_prices) / ref_prices[:-1]
    actual_returns_scaled = (actual_returns - ret_mean) / ret_std
    train_y = torch.FloatTensor(actual_returns_scaled).unsqueeze(1)
    train_X = X_tensor[:-1][:len(train_y)]

    dataset = torch.utils.data.TensorDataset(train_X, train_y)
    loader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)

    for _ in range(epochs):
        for batch_X, batch_y in loader:
            optimizer.zero_grad()
            pi, sigma, mu = base_model(batch_X)
            loss = mdn_loss(pi, sigma, mu, batch_y)
            loss.backward()
            optimizer.step()


def _run_adaptation_loop(base_model, shadow_model, X_tensor, ref_prices,
                         ret_mean, ret_std, raw_features):
    """Replay history: base predicts, shadow corrects, then shadow learns.

    Returns ``(base_preds, corrections, final_preds)`` as lists of scaled
    returns, one entry per candle transition.
    """
    steps = len(X_tensor) - 1
    target_col = SYMBOLS.index(TARGET_PAIR)  # return columns follow SYMBOLS order

    # A. Base Model Prediction. Its weights are frozen here, so every window
    # can go through in a single batched pass instead of one call per candle.
    base_model.eval()
    with torch.no_grad():
        pi, sigma, mu = base_model(X_tensor[:steps])
        top = torch.argmax(pi, dim=1, keepdim=True)  # most likely component
        base_rets = mu.gather(1, top).squeeze(1).tolist()
        base_sigmas = sigma.gather(1, top).squeeze(1).tolist()

    base_preds, corrections, final_preds = [], [], []
    for i, (base_ret, base_sigma) in enumerate(zip(base_rets, base_sigmas)):
        # B. Prepare Shadow Input: [Base_Prediction, Base_Confidence, Volatility]
        # Volatility is approximated by the magnitude of the target's own
        # scaled return on the current candle.
        vol = abs(float(raw_features[LOOKBACK + i][target_col]))
        shadow_in = torch.tensor([[base_ret, base_sigma, vol]], dtype=torch.float32)

        # C. Shadow Prediction (Correction), made before the outcome is used.
        with torch.no_grad():
            correction = shadow_model(shadow_in).item()

        # D. Get Real Next Value
        real_ret_raw = (ref_prices[i + 1] - ref_prices[i]) / ref_prices[i]
        real_ret_scaled = (real_ret_raw - ret_mean) / ret_std

        # E. The shadow's target is whatever it should have added to the base
        # prediction to hit the realised return exactly.
        target_tensor = torch.tensor([[real_ret_scaled - base_ret]], dtype=torch.float32)

        # F. LEARN ON THE SPOT
        shadow_model.learn_step(shadow_in, target_tensor)

        base_preds.append(base_ret)
        corrections.append(correction)
        final_preds.append(base_ret + correction)

    return base_preds, corrections, final_preds


def _build_figure(df, status):
    """Build the three-row Plotly figure (price, correction, z-score)."""
    fig = make_subplots(
        rows=3,
        cols=1,
        shared_xaxes=True,
        row_heights=[0.5, 0.25, 0.25],
        subplot_titles=("Hybrid Price Model", "AI Correction (Shadow)", "Divergence"),
    )

    # 1. Price
    fig.add_trace(go.Scatter(x=df.index, y=df['Close'], name='Price',
                             line=dict(color='gray')), row=1, col=1)
    fig.add_trace(go.Scatter(x=df.index, y=df['Base'], name='Base Transformer',
                             line=dict(color='cyan', dash='dot')), row=1, col=1)
    fig.add_trace(go.Scatter(x=df.index, y=df['Final'], name='Adapted (Shadow)',
                             line=dict(color='yellow', width=2)), row=1, col=1)

    # 2. Correction
    fig.add_trace(go.Bar(x=df.index, y=df['Correction'], name='Learned Correction',
                         marker_color='purple'), row=2, col=1)

    # 3. Z (NaN compares False, so warm-up bars are coloured red)
    z_colors = np.where(df['Z'] > 0, 'green', 'red')
    fig.add_trace(go.Bar(x=df.index, y=df['Z'], name='Z-Score',
                         marker_color=z_colors), row=3, col=1)
    fig.add_hline(y=2, line_dash="dot", row=3, col=1)
    fig.add_hline(y=-2, line_dash="dot", row=3, col=1)

    fig.update_layout(template="plotly_dark", height=800, title=f"Hybrid V4: {status}")
    return fig


def run_analysis():
    """Fetch data, (re)use the models, replay history and build the dashboard.

    Returns:
        ``(figure, status_html, log_text)`` for the Plot, HTML and Textbox
        components; ``figure`` is ``None`` when no analysis could be produced.
    """
    log_buffer = []

    # 1. Initialize Base Model (Teacher)
    if GLOBAL_STATE["base_model"] is None:
        GLOBAL_STATE["base_model"] = ConstellationTransformer(
            input_dim=NUM_FEATURES, d_model=64, num_layers=2
        )
        log_buffer.append("🧠 Base Transformer Initialized")

    # 2. Initialize Shadow Model (Student). Kept across scans so it keeps
    # adapting; hard_reset() clears it.
    if GLOBAL_STATE["shadow_model"] is None:
        GLOBAL_STATE["shadow_model"] = OnlineMetaLearner(input_dim=3)
        log_buffer.append("👻 Shadow Learner Initialized")

    base_model = GLOBAL_STATE["base_model"]
    shadow_model = GLOBAL_STATE["shadow_model"]

    # 3. Data
    master_df, msg = get_constellation_data()
    if master_df is None:
        return None, msg, msg

    event_df = get_events_data()
    X_tensor, dates, ref_prices, ret_mean, ret_std, raw_features = prepare_tensors(
        master_df, event_df
    )

    # At least two windows are needed to form one (input, next return) pair;
    # bail out before training on an empty dataset.
    if len(X_tensor) < 2:
        return None, "No Data", "Wait"

    # 4. Train Base Model once; the shadow learner corrects it afterwards.
    if not GLOBAL_STATE["is_base_trained"]:
        log_buffer.append("⚙️ Training Base Transformer (50 Epochs)...")
        _train_base_model(base_model, X_tensor, ref_prices, ret_mean, ret_std, epochs=50)
        GLOBAL_STATE["is_base_trained"] = True
        log_buffer.append("✅ Base Calibration Complete.")

    # 5. HYBRID INFERENCE LOOP
    # Base Model predicts -> Shadow Model corrects -> Weights update -> Next candle
    log_buffer.append("🧬 Running Online Adaptation Loop...")
    base_preds, corrections, final_preds = _run_adaptation_loop(
        base_model, shadow_model, X_tensor, ref_prices, ret_mean, ret_std, raw_features
    )

    # 6. Reconstruction & Plotting
    n_preds = len(final_preds)
    plot_dates = dates[1:1 + n_preds]
    plot_actual = ref_prices[1:1 + n_preds]

    # Rebuild prices from the *actual* previous close so errors do not compound.
    prev_prices = np.asarray(ref_prices[:n_preds], dtype=float)
    base_returns = np.asarray(base_preds) * ret_std + ret_mean
    final_returns = np.asarray(final_preds) * ret_std + ret_mean

    df = pd.DataFrame(
        {
            'Close': plot_actual,
            'Base': prev_prices * (1 + base_returns),
            'Final': prev_prices * (1 + final_returns),
            'Correction': corrections,
        },
        index=plot_dates,
    )

    # Z-Score of the prediction/price gap over a 50-candle rolling window
    df['Gap'] = df['Final'] - df['Close']
    rolling_gap = df['Gap'].rolling(50)
    df['Z'] = (df['Gap'] - rolling_gap.mean()) / (rolling_gap.std() + 1e-9)

    if df.empty:
        return None, "No Data", "Wait"

    last_z = df['Z'].iloc[-1]

    status, color = "NEUTRAL", "gray"
    if last_z > 2.0:
        status, color = "BUY SIGNAL", "green"
    elif last_z < -2.0:
        status, color = "SELL SIGNAL", "red"

    # Notify only when the signal differs from the last one that was sent.
    if "SIGNAL" in status and GLOBAL_STATE["last_trade"] != status:
        send_ntfy(f"{status} EURUSD | Z: {last_z:.2f}")
        GLOBAL_STATE["last_trade"] = status

    fig = _build_figure(df, status)

    # NOTE(review): the original markup of this banner was lost (only the text
    # and the otherwise unused `color` survived); minimal HTML reconstructed -
    # confirm the intended styling.
    info = (
        f"<div style='text-align:center; color:{color};'>"
        f"<h2>{status}</h2>"
        f"<p>Z: {last_z:.3f}</p>"
        f"</div>"
    )
    return fig, info, "\n".join(log_buffer)


# --- 6. UI ---
# NOTE(review): indentation was lost in the source; the two buttons are assumed
# to share the Row with the outputs stacked below it - confirm the layout.
with gr.Blocks(title="Hybrid V4") as app:
    gr.Markdown("# 👁️ Hybrid V4: Transformer + Shadow Learner")
    with gr.Row():
        scan_btn = gr.Button("🔄 Scan", variant="primary")
        wipe_btn = gr.Button("⚠️ Wipe", variant="stop")
    status_html = gr.HTML()
    plot_out = gr.Plot()
    log_box = gr.Textbox()

    scan_btn.click(run_analysis, outputs=[plot_out, status_html, log_box])
    wipe_btn.click(hard_reset, outputs=[plot_out, status_html, log_box])
    app.load(run_analysis, outputs=[plot_out, status_html, log_box])

if __name__ == "__main__":
    app.launch(ssr_mode=False)