"""Holographic FX dashboard.

Fetches EUR/USD 15-minute candles, fits a GRU mixture-density network on
scaled percentage returns, reconstructs a one-step-ahead price path with a
+/- 2 sigma band, and raises BUY/SELL alerts from a de-meaned Z-score.
"""

import logging
import os

import gradio as gr
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import requests
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.preprocessing import StandardScaler

logger = logging.getLogger(__name__)

# --- 1. CONFIG & SECRETS ---
API_KEY = os.getenv("TWELVEDATA_KEY")
NTFY_TOPIC = os.getenv("NTFY_TOPIC")
PAIR = "EUR/USD"
TIMEFRAME = "15min"
LOOKBACK = 30

# Seconds to wait on the price API / ntfy before giving up, so a stalled
# connection cannot block the UI callback forever.
HTTP_TIMEOUT = 10

# Model input columns, in the order they are fed to the scaler and the network.
FEATURE_COLS = ['ret_open', 'ret_high', 'ret_low', 'ret_close', 'Surprise', 'Impact_Score']
# Position of 'ret_close' in FEATURE_COLS; its scaler stats un-scale predictions.
RET_CLOSE_IDX = FEATURE_COLS.index('ret_close')

# Global State
GLOBAL_STATE = {
    "model": None,
    "last_trade": None,
    "scaler": None,  # Saved scaler for returns (fitted during calibration)
    "is_trained": False,
}


# --- 2. THE MODEL ARCHITECTURE (GRU V2) ---
class MacroMDN(nn.Module):
    """GRU encoder with attention pooling and a Gaussian-mixture output head.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: GRU hidden size.
        num_gaussians: Number of mixture components.
    """

    def __init__(self, input_dim, hidden_dim=64, num_gaussians=3):
        super().__init__()
        self.ln_in = nn.LayerNorm(input_dim)
        # No `dropout` argument: nn.GRU only applies dropout between stacked
        # layers, so with num_layers=1 it had no effect and only triggered a
        # UserWarning.
        self.gru = nn.GRU(input_dim, hidden_dim, num_layers=1, batch_first=True)
        self.ln_out = nn.LayerNorm(hidden_dim)
        self.attention = nn.Linear(hidden_dim, 1)
        self.z_pi = nn.Linear(hidden_dim, num_gaussians)
        self.z_sigma = nn.Linear(hidden_dim, num_gaussians)
        self.z_mu = nn.Linear(hidden_dim, num_gaussians)
        nn.init.constant_(self.z_sigma.bias, -2.0)  # Lower start variance

    def forward(self, x):
        """Return mixture weights, std-devs and means for a batch of windows.

        Args:
            x: Tensor indexed (batch, time, feature); `batch_first=True`.

        Returns:
            Tuple `(pi, sigma, mu)`, each of shape (batch, num_gaussians).
            `pi` sums to 1 per row; `sigma` is strictly positive.
        """
        x = self.ln_in(x)
        gru_out, _ = self.gru(x)
        gru_out = self.ln_out(gru_out)
        # Softmax over the time axis -> one attention weight per time step.
        attn_weights = F.softmax(self.attention(gru_out), dim=1)
        context = torch.sum(attn_weights * gru_out, dim=1)
        pi = F.softmax(self.z_pi(context), dim=1)
        sigma = F.softplus(self.z_sigma(context)) + 1e-6
        mu = self.z_mu(context)
        return pi, sigma, mu


def mdn_loss(pi, sigma, mu, y):
    """Mean negative log-likelihood of `y` under the Gaussian mixture.

    Args:
        pi, sigma, mu: Mixture parameters, each (batch, num_gaussians).
        y: Targets, shape (batch,) or (batch, 1).

    Returns:
        Scalar tensor.
    """
    if y.dim() == 1:
        y = y.unsqueeze(1)
    dist = torch.distributions.Normal(loc=mu, scale=sigma)
    log_prob = dist.log_prob(y)
    # log-sum-exp over components; 1e-8 guards log(0) for vanished weights.
    loss = -torch.logsumexp(torch.log(pi + 1e-8) + log_prob, dim=1)
    return torch.mean(loss)


# --- 3. DATA PIPELINE (STATIONARY MODE) ---
def get_forex_data():
    """Download the latest 500 candles for PAIR/TIMEFRAME from Twelve Data.

    Returns:
        `(df, message)`. On success `df` is a float OHLC DataFrame sorted
        ascending on a `datetime` index; on any failure `df` is None and
        `message` describes the problem.
    """
    if not API_KEY:
        return None, "❌ Error: TWELVEDATA_KEY missing."

    url = (
        "https://api.twelvedata.com/time_series"
        f"?symbol={PAIR}&interval={TIMEFRAME}&outputsize=500&apikey={API_KEY}"
    )
    try:
        r = requests.get(url, timeout=HTTP_TIMEOUT).json()
        if 'values' not in r:
            return None, f"❌ API Error: {r.get('message', 'Unknown')}"

        df = pd.DataFrame(r['values'])
        df['datetime'] = pd.to_datetime(df['datetime'])
        df = df.sort_values('datetime').set_index('datetime')
        df = df[['open', 'high', 'low', 'close']].astype(float)
        return df, "✅ Data Fetched"
    except Exception as e:
        # The message is shown in the UI, and requests' exception text can
        # embed the request URL -- redact the API key before returning it.
        return None, str(e).replace(API_KEY, "***")


def get_events_data():
    """Fetch this week's EUR/USD calendar events (best effort).

    Returns:
        DataFrame indexed by naive `DateTime` with an `Impact_Score` column
        (Low=1, Medium=2, High=3, anything else 0), sorted ascending. Empty
        DataFrame when there are no matching events or the fetch/parse fails.
    """
    try:
        url = "https://nfs.faireconomy.media/ff_calendar_thisweek.json"
        r = requests.get(url, headers={"User-Agent": "V23/1.0"}, timeout=5)
        data = r.json()

        parsed = []
        impact_map = {'Low': 1, 'Medium': 2, 'High': 3}
        for i in data:
            if i.get('country') in ['EUR', 'USD']:
                # Normalise to UTC, then drop tz so it can be merged against
                # the (naive) price index.
                # NOTE(review): assumes the price index is also UTC -- confirm.
                dt = pd.to_datetime(i.get('date'), utc=True).tz_localize(None)
                if pd.isna(dt):
                    # A missing date would put NaT in the index and break
                    # the as-of merge downstream.
                    continue
                imp = impact_map.get(i.get('impact'), 0)
                parsed.append({'DateTime': dt, 'Impact_Score': imp})

        df = pd.DataFrame(parsed)
        if not df.empty:
            df = df.sort_values('DateTime').set_index('DateTime')
        return df
    except Exception:
        # Events are optional; fall back to "no events" but leave a trace.
        logger.warning("Economic calendar fetch failed", exc_info=True)
        return pd.DataFrame()


def prepare_tensors(price_df, event_df, scaler=None):
    """Turn candles (+ optional events) into scaled sliding-window tensors.

    Args:
        price_df: OHLC DataFrame on a sorted datetime index.
        event_df: Events DataFrame with `Impact_Score` (may be empty).
        scaler: Optional StandardScaler. If it is already fitted it is only
            applied (so inputs stay on the scale the model was trained on);
            if it is unfitted it is fitted in place; if None a new one is
            fitted and discarded.

    Returns:
        `(X_tensor, dates, reference_prices, ret_mean, ret_scale)` where
        `X_tensor[k]` is the LOOKBACK-row window *ending on* row
        `LOOKBACK + k`, `dates[k]` / `reference_prices[k]` are that same
        row's timestamp and close, and `ret_mean` / `ret_scale` are the
        scaler statistics for `ret_close`. With `LOOKBACK` rows or fewer,
        `X_tensor` is empty with shape (0, LOOKBACK, n_features).
    """
    # 1. Merge Events
    if not event_df.empty:
        merged = pd.merge_asof(
            price_df,
            event_df,
            left_index=True,
            right_index=True,
            direction='backward',
            tolerance=pd.Timedelta('4 hours'),
        ).fillna(0)
    else:
        merged = price_df.copy()
        merged['Impact_Score'] = 0

    merged['Surprise'] = 0.0

    # 2. CALCULATE RETURNS (STATIONARITY FIX)
    # Instead of raw price, we use % change. This kills the "Anchoring Bias".
    for col in ('close', 'open', 'high', 'low'):
        merged[f'ret_{col}'] = merged[col].pct_change().fillna(0)

    # 3. Scale Returns
    features = merged[FEATURE_COLS].values
    if scaler is None:
        scaler = StandardScaler()
    if hasattr(scaler, "mean_"):
        data_scaled = scaler.transform(features)
    else:
        data_scaled = scaler.fit_transform(features)

    # 4. Sliding Window
    # The window for row `end` spans rows end-LOOKBACK+1 .. end inclusive, so
    # its newest bar is the same bar whose close is used as the reference
    # price. (Slicing [end-LOOKBACK:end] would leave that bar out and make
    # the model forecast two bars ahead of its latest input.)
    windows = [
        data_scaled[end - LOOKBACK + 1:end + 1]
        for end in range(LOOKBACK, len(data_scaled))
    ]
    if windows:
        X_tensor = torch.tensor(np.stack(windows), dtype=torch.float32)
    else:
        X_tensor = torch.empty((0, LOOKBACK, len(FEATURE_COLS)), dtype=torch.float32)

    # Close at the end of each window: the base the predicted return is applied to.
    reference_prices = merged['close'].values[LOOKBACK:]

    # Scaler stats for 'ret_close', needed to unscale predictions.
    ret_mean = scaler.mean_[RET_CLOSE_IDX]
    ret_scale = scaler.scale_[RET_CLOSE_IDX]

    return X_tensor, merged.index[LOOKBACK:], reference_prices, ret_mean, ret_scale


# --- 4. CORE LOGIC ---
def send_ntfy(message):
    """Push `message` to the configured ntfy topic; no-op without NTFY_TOPIC.

    Delivery is best effort: network failures are logged, never raised.
    """
    if not NTFY_TOPIC:
        return
    try:
        requests.post(
            f"https://ntfy.sh/{NTFY_TOPIC}",
            data=message.encode('utf-8'),
            headers={"Title": "Holographic AI", "Priority": "high"},
            timeout=HTTP_TIMEOUT,
        )
    except requests.RequestException:
        logger.warning("ntfy notification failed", exc_info=True)


def hard_reset():
    """Drop the trained model and its scaler so the next refresh recalibrates.

    Returns:
        `(plot, status_html, log_text)` for the three UI outputs.
    """
    GLOBAL_STATE["model"] = None
    GLOBAL_STATE["scaler"] = None
    GLOBAL_STATE["is_trained"] = False
    # The status box is a gr.HTML component. The source literal had lost its
    # markup and was split across lines (a syntax error); the visible text is
    # unchanged.
    return None, "<h3>♻️ RESET DONE. Click Refresh.</h3>", "Reset."


def _calibrate(model, X_tensor, ref_prices, ret_mean, ret_std):
    """Train `model` in place to predict the next bar's scaled close return."""
    optimizer = torch.optim.Adam(model.parameters(), lr=0.005)
    model.train()

    # Target for window k is the return from ref_prices[k] to ref_prices[k+1],
    # normalised with the same statistics as the 'ret_close' input feature.
    actual_returns = np.diff(ref_prices) / ref_prices[:-1]
    actual_returns_scaled = (actual_returns - ret_mean) / ret_std
    train_y = torch.tensor(actual_returns_scaled, dtype=torch.float32).unsqueeze(1)

    # The last window has no "next" return yet, so it is excluded.
    train_X = X_tensor[:len(train_y)]

    dataset = torch.utils.data.TensorDataset(train_X, train_y)
    loader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)

    for _ in range(50):
        for batch_X, batch_y in loader:
            optimizer.zero_grad()
            pi, sigma, mu = model(batch_X)
            loss = mdn_loss(pi, sigma, mu, batch_y)
            loss.backward()
            optimizer.step()


def _build_figure(df, status, last_z):
    """Plot actual close, the predicted path and the +/- 2 sigma band."""
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=df.index, y=df['Close'], mode='lines', name='Price',
        line=dict(color='rgba(255, 255, 255, 0.5)'),
    ))
    fig.add_trace(go.Scatter(
        x=df.index, y=df['Upper'], mode='lines',
        line=dict(width=0), showlegend=False,
    ))
    # fill='tonexty' shades between this trace and the previous (Upper) one.
    fig.add_trace(go.Scatter(
        x=df.index, y=df['Lower'], mode='lines', line=dict(width=0),
        fill='tonexty', fillcolor='rgba(0, 255, 255, 0.1)', name='Liquidity Cloud',
    ))
    fig.add_trace(go.Scatter(
        x=df.index, y=df['Pred'], mode='lines', name='AI Path',
        line=dict(color='#00ffff', width=2),
    ))
    fig.update_layout(
        template="plotly_dark",
        title=f"Holographic FX (V3 Returns): {status} (Z: {last_z:.2f})",
        height=600,
    )
    return fig


def run_analysis():
    """Fetch data, calibrate once, predict, and build the dashboard outputs.

    Side effects: creates/trains the model and stores the fitted scaler in
    GLOBAL_STATE; sends an ntfy alert when the signal flips to BUY or SELL.

    Returns:
        `(figure, status_html, log_text)`; `figure` is None on failure.
    """
    log_buffer = []

    # Init Model
    if GLOBAL_STATE["model"] is None:
        GLOBAL_STATE["model"] = MacroMDN(input_dim=6, hidden_dim=64, num_gaussians=3)
        log_buffer.append("🧠 Model Initialized (V3 Relative)")
    model = GLOBAL_STATE["model"]

    # Get Data
    price_df, msg = get_forex_data()
    if price_df is None:
        return None, msg, msg
    event_df = get_events_data()

    # PREPARE DATA (RETURNS MODE)
    # Once calibrated, reuse the scaler the model was trained with; refitting
    # on every refresh would silently shift the model's input scale.
    scaler = GLOBAL_STATE["scaler"] if GLOBAL_STATE["is_trained"] else None
    if scaler is None:
        scaler = StandardScaler()
    X_tensor, dates, ref_prices, ret_mean, ret_std = prepare_tensors(
        price_df, event_df, scaler=scaler
    )

    # Need at least two windows: one prediction plus one realised bar to
    # compare it against.
    if len(X_tensor) < 2:
        return None, "No Data", "Wait..."

    # --- CALIBRATION (Training on RETURNS) ---
    if not GLOBAL_STATE["is_trained"]:
        log_buffer.append("⚙️ Calibrating on Volatility...")
        _calibrate(model, X_tensor, ref_prices, ret_mean, ret_std)
        GLOBAL_STATE["scaler"] = scaler
        GLOBAL_STATE["is_trained"] = True
        log_buffer.append("✅ Calibration Complete.")

    # --- INFERENCE ---
    model.eval()
    with torch.no_grad():
        pi, sigma, mu = model(X_tensor)
        # Use the most probable mixture component for each window.
        max_idx = torch.argmax(pi, dim=1)
        rows = torch.arange(len(mu))
        pred_mu = mu[rows, max_idx].numpy()
        pred_sigma = sigma[rows, max_idx].numpy()

    # --- RECONSTRUCTION ---
    # 1. Unscale the predicted RETURN
    pred_ret = (pred_mu * ret_std) + ret_mean
    pred_ret_sigma = pred_sigma * ret_std

    # 2. Apply the predicted return to the close at the end of each window.
    prev_prices = ref_prices
    pred_prices = prev_prices * (1 + pred_ret)
    upper_band = prev_prices * (1 + pred_ret + (2 * pred_ret_sigma))
    lower_band = prev_prices * (1 + pred_ret - (2 * pred_ret_sigma))

    # pred_prices[k] is the forecast for bar k+1, so shift by one to compare
    # each forecast with the close it was forecasting.
    df = pd.DataFrame(
        {
            'Close': ref_prices[1:],
            'Pred': pred_prices[:-1],
            'Upper': upper_band[:-1],
            'Lower': lower_band[:-1],
            'Sigma': pred_ret_sigma[:-1],
        },
        index=dates[1:],
    )

    # Z-Score (Now Calculated on PRICE GAP)
    df['Gap'] = df['Pred'] - df['Close']
    # Convert Sigma to Price terms for Z calc: Price * Sigma_Pct
    df['Price_Sigma'] = df['Close'] * df['Sigma']
    df['Raw_Z'] = df['Gap'] / (df['Price_Sigma'] + 1e-9)
    # Remove the slow-moving bias by subtracting a 50-bar rolling mean.
    df['Rolling_Z'] = df['Raw_Z'] - df['Raw_Z'].rolling(window=50, min_periods=1).mean()

    if len(df) == 0:
        return None, "No Data", "Wait..."

    last_z = df['Rolling_Z'].iloc[-1]
    last_price = df['Close'].iloc[-1]

    status = "WAIT"
    color = "gray"

    if last_z > 1.8:
        status = "BUY SIGNAL"
        color = "green"
        # Notify only on a change of direction, not on every refresh.
        if GLOBAL_STATE["last_trade"] != "BUY":
            send_ntfy(f"BUY EURUSD | Z: {last_z:.2f} | Price: {last_price}")
            GLOBAL_STATE["last_trade"] = "BUY"
    elif last_z < -1.8:
        status = "SELL SIGNAL"
        color = "red"
        if GLOBAL_STATE["last_trade"] != "SELL":
            send_ntfy(f"SELL EURUSD | Z: {last_z:.2f} | Price: {last_price}")
            GLOBAL_STATE["last_trade"] = "SELL"

    fig = _build_figure(df, status, last_z)

    # Rendered by a gr.HTML component. The markup had been stripped from the
    # source (leaving `color` unused); this is a minimal reconstruction that
    # keeps the visible text unchanged.
    info_html = f"""
    <div style="text-align: center; color: {color};">
        <h2>{status}</h2>
        <p>Z: {last_z:.3f} | Price: {last_price}</p>
    </div>
    """

    return fig, info_html, "\n".join(log_buffer)


# --- 5. UI ---
with gr.Blocks(title="Holographic FX Core", theme=gr.themes.Monochrome()) as app:
    gr.Markdown("# 👁️ Holographic Liquidity Regime (V3 Stationary)")
    with gr.Row():
        refresh_btn = gr.Button("🔄 Refresh", variant="primary")
        reset_btn = gr.Button("⚠️ HARD RESET", variant="stop")
    with gr.Row():
        status_box = gr.HTML()
    plot = gr.Plot()
    logs = gr.Textbox(label="Logs")

    refresh_btn.click(fn=run_analysis, outputs=[plot, status_box, logs])
    reset_btn.click(fn=hard_reset, outputs=[plot, status_box, logs])
    app.load(fn=run_analysis, outputs=[plot, status_box, logs])

if __name__ == "__main__":
    app.launch()