# Hugging Face Spaces page banner captured with the source (Space status: Sleeping); not part of the program.
| import gradio as gr | |
| import pandas as pd | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| import requests | |
| import os | |
| import plotly.graph_objects as go | |
| from sklearn.preprocessing import StandardScaler | |
# --- 1. CONFIG & SECRETS ---
# Credentials come from the environment; each is None when the variable is unset.
API_KEY = os.environ.get("TWELVEDATA_KEY")
NTFY_TOPIC = os.environ.get("NTFY_TOPIC")

# Instrument symbol, candle interval, and number of candles per model input window.
PAIR, TIMEFRAME, LOOKBACK = "EUR/USD", "15min", 30

# Global State
# Mutable module-level state read and written by the analysis/reset callbacks.
GLOBAL_STATE = dict(
    model=None,        # MacroMDN instance, created on first analysis run
    last_trade=None,   # last notified signal ("BUY"/"SELL"), used to avoid repeat alerts
    scaler=None,       # Saved scaler for returns
    is_trained=False,  # set to True once calibration has run
)
# --- 2. THE MODEL ARCHITECTURE (GRU V2) ---
class MacroMDN(nn.Module):
    """GRU encoder with attention pooling and a mixture-density output head.

    The network layer-normalises its input, runs a single-layer GRU over the
    sequence, pools the GRU outputs with learned softmax attention over the
    time axis, and emits the parameters of a 1-D Gaussian mixture.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: GRU hidden size (also the size of the pooled context).
        num_gaussians: Number of mixture components.
    """

    def __init__(self, input_dim, hidden_dim=64, num_gaussians=3):
        super().__init__()
        self.ln_in = nn.LayerNorm(input_dim)
        # No `dropout` argument here: nn.GRU only applies dropout *between*
        # stacked layers, so with num_layers=1 it does nothing and merely
        # triggers a UserWarning at construction time.
        self.gru = nn.GRU(input_dim, hidden_dim, num_layers=1, batch_first=True)
        self.ln_out = nn.LayerNorm(hidden_dim)
        self.attention = nn.Linear(hidden_dim, 1)
        self.z_pi = nn.Linear(hidden_dim, num_gaussians)
        self.z_sigma = nn.Linear(hidden_dim, num_gaussians)
        self.z_mu = nn.Linear(hidden_dim, num_gaussians)
        # Lower start variance: a negative bias makes the initial softplus output small.
        nn.init.constant_(self.z_sigma.bias, -2.0)

    def forward(self, x):
        """Return mixture parameters for a batch of sequences.

        Args:
            x: Tensor of shape (batch, seq_len, input_dim) (the GRU is batch-first).

        Returns:
            Tuple ``(pi, sigma, mu)``, each of shape (batch, num_gaussians):
            mixture weights (rows sum to 1), strictly positive standard
            deviations, and component means.
        """
        x = self.ln_in(x)
        gru_out, _ = self.gru(x)
        gru_out = self.ln_out(gru_out)
        # Softmax over dim=1 (time) gives one attention weight per step.
        attn_weights = F.softmax(self.attention(gru_out), dim=1)
        context = torch.sum(attn_weights * gru_out, dim=1)
        pi = F.softmax(self.z_pi(context), dim=1)
        # The small epsilon keeps sigma strictly positive for the Normal distribution.
        sigma = F.softplus(self.z_sigma(context)) + 1e-6
        mu = self.z_mu(context)
        return pi, sigma, mu
def mdn_loss(pi, sigma, mu, y):
    """Mean negative log-likelihood of ``y`` under a Gaussian mixture.

    Args:
        pi: Mixture weights, one row per sample.
        sigma: Component standard deviations, same shape as ``pi``.
        mu: Component means, same shape as ``pi``.
        y: Targets; a 1-D tensor is promoted to a column so it broadcasts
            against the component axis.

    Returns:
        Scalar tensor: the batch mean of ``-log sum_k pi_k * N(y | mu_k, sigma_k)``.
    """
    target = y.unsqueeze(1) if y.dim() == 1 else y
    component_log_prob = torch.distributions.Normal(loc=mu, scale=sigma).log_prob(target)
    # The epsilon guards log(0) for components whose weight collapsed to zero.
    log_weights = torch.log(pi + 1e-8)
    per_sample_nll = -torch.logsumexp(log_weights + component_log_prob, dim=1)
    return per_sample_nll.mean()
# --- 3. DATA PIPELINE (STATIONARY MODE) ---
def get_forex_data():
    """Fetch recent OHLC candles for ``PAIR`` from the Twelve Data API.

    Returns:
        Tuple ``(df, message)``. On success ``df`` is a float DataFrame with
        columns open/high/low/close, indexed by ``datetime`` in ascending
        order, and ``message`` is a success note. On any failure ``df`` is
        ``None`` and ``message`` describes the problem.
    """
    if not API_KEY:
        return None, "❌ Error: TWELVEDATA_KEY missing."

    # Let requests build and escape the query string instead of formatting it by hand.
    query = {
        "symbol": PAIR,
        "interval": TIMEFRAME,
        "outputsize": 500,
        "apikey": API_KEY,
    }
    try:
        # A timeout keeps a stalled connection from hanging the UI callback forever.
        r = requests.get(
            "https://api.twelvedata.com/time_series", params=query, timeout=10
        ).json()
        if 'values' not in r:
            return None, f"❌ API Error: {r.get('message', 'Unknown')}"
        df = pd.DataFrame(r['values'])
        df['datetime'] = pd.to_datetime(df['datetime'])
        df = df.sort_values('datetime').set_index('datetime')
        df = df[['open', 'high', 'low', 'close']].astype(float)
        return df, "✅ Data Fetched"
    except Exception as e:
        # Boundary handler: network, JSON and schema errors are all reported
        # to the caller as a message rather than raised.
        return None, str(e)
def get_events_data():
    """Fetch this week's economic-calendar events for EUR and USD.

    Returns:
        DataFrame indexed by ``DateTime`` (timezone-naive, converted from UTC,
        ascending) with one column ``Impact_Score`` (Low=1, Medium=2, High=3,
        anything else 0). An empty DataFrame is returned when the request or
        parsing fails, or when no usable event exists.
    """
    url = "https://nfs.faireconomy.media/ff_calendar_thisweek.json"
    impact_map = {'Low': 1, 'Medium': 2, 'High': 3}
    try:
        r = requests.get(url, headers={"User-Agent": "V23/1.0"}, timeout=5)
        parsed = [
            {
                # Normalise to UTC, then drop the tz info to get naive timestamps.
                'DateTime': pd.to_datetime(item.get('date'), utc=True).tz_localize(None),
                'Impact_Score': impact_map.get(item.get('impact'), 0),
            }
            for item in r.json()
            if item.get('country') in ('EUR', 'USD')
        ]
        df = pd.DataFrame(parsed)
        if not df.empty:
            # A missing date parses to NaT; null keys would make a later
            # merge_asof on this index fail, so drop those rows.
            df = df.dropna(subset=['DateTime'])
            df = df.sort_values('DateTime').set_index('DateTime')
        return df
    except Exception:
        # Best-effort feed: any failure degrades to "no events" instead of
        # aborting, but KeyboardInterrupt/SystemExit are no longer swallowed.
        return pd.DataFrame()
def prepare_tensors(price_df, event_df):
    """Build scaled sliding-window model inputs from prices and events.

    Args:
        price_df: DataFrame with open/high/low/close columns on a sorted
            datetime index.
        event_df: DataFrame of events with an ``Impact_Score`` column on a
            sorted datetime index; may be empty.

    Returns:
        Tuple ``(X_tensor, dates, reference_prices, ret_mean, ret_scale)``:
        ``X_tensor`` is a float tensor of shape (n_windows, LOOKBACK, 6);
        ``dates[j]`` and ``reference_prices[j]`` are the timestamp and close
        of the LAST row inside window ``j``; ``ret_mean`` / ``ret_scale`` are
        the scaler statistics of the ``ret_close`` column, needed to unscale
        predictions.
    """
    # 1. Merge Events: attach the most recent event within the last 4 hours.
    if not event_df.empty:
        merged = pd.merge_asof(
            price_df,
            event_df,
            left_index=True,
            right_index=True,
            direction='backward',
            tolerance=pd.Timedelta('4 hours'),
        ).fillna(0)
    else:
        merged = price_df.copy()
        merged['Impact_Score'] = 0
    merged['Surprise'] = 0.0

    # 2. CALCULATE RETURNS (STATIONARITY FIX)
    # Instead of raw price, we use % change. This kills the "Anchoring Bias".
    for col in ('close', 'open', 'high', 'low'):
        merged[f'ret_{col}'] = merged[col].pct_change().fillna(0)

    # 3. Scale Returns (they are tiny in raw form, so standardise them).
    scaler = StandardScaler()
    feature_cols = ['ret_open', 'ret_high', 'ret_low', 'ret_close', 'Surprise', 'Impact_Score']
    data_scaled = scaler.fit_transform(merged[feature_cols].values)

    # 4. Sliding Window
    # Each window ENDS ON row `end` (inclusive) so that it lines up with the
    # reference close/date of that same row returned below. Slicing
    # [end-LOOKBACK:end] instead would leave the newest candle out of its own
    # window and silently shift every prediction by one bar.
    windows = [
        data_scaled[end - LOOKBACK + 1:end + 1]
        for end in range(LOOKBACK, len(data_scaled))
    ]
    if windows:
        X_tensor = torch.FloatTensor(np.array(windows))
    else:
        # Too few rows for a single window: keep the 3-D shape callers expect.
        X_tensor = torch.empty((0, LOOKBACK, len(feature_cols)), dtype=torch.float32)

    # Close at the end of each window, used to turn predicted returns back into prices.
    reference_prices = merged['close'].values[LOOKBACK:]

    # Scaler stats for the 'ret_close' column to unscale predictions.
    close_idx = feature_cols.index('ret_close')
    ret_mean = scaler.mean_[close_idx]
    ret_scale = scaler.scale_[close_idx]
    return X_tensor, merged.index[LOOKBACK:], reference_prices, ret_mean, ret_scale
# --- 4. CORE LOGIC ---
def send_ntfy(message):
    """Push ``message`` to the configured ntfy.sh topic (best effort).

    Does nothing when ``NTFY_TOPIC`` is unset. Delivery failures are reported
    on stdout and never raised, so a notification outage cannot break the
    analysis run.

    Args:
        message: Text body of the notification.
    """
    if not NTFY_TOPIC:
        return
    try:
        requests.post(
            f"https://ntfy.sh/{NTFY_TOPIC}",
            data=message.encode('utf-8'),
            headers={"Title": "Holographic AI", "Priority": "high"},
            timeout=5,  # never let a stalled push block the caller
        )
    except requests.RequestException as exc:
        # Still best-effort, but no longer silent and no longer catching
        # KeyboardInterrupt/SystemExit.
        print(f"ntfy notification failed: {exc}")
def hard_reset():
    """Discard the cached model so the next analysis run recalibrates.

    Returns:
        Tuple for the (plot, status HTML, logs) outputs: ``None`` to clear
        the plot, a confirmation banner, and a short log line.
    """
    GLOBAL_STATE.update({"model": None, "is_trained": False})
    banner_html = "<div>♻️ RESET DONE. Click Refresh.</div>"
    return None, banner_html, "Reset."
def _calibrate_model(model, X_tensor, ref_prices, ret_mean, ret_std):
    """Fit ``model`` in place to predict the next scaled close-to-close return."""
    optimizer = torch.optim.Adam(model.parameters(), lr=0.005)
    model.train()

    # Target for window j: the return from ref_prices[j] to ref_prices[j + 1],
    # normalised with the same statistics used for the input features.
    actual_returns = np.diff(ref_prices) / ref_prices[:-1]
    actual_returns_scaled = (actual_returns - ret_mean) / ret_std
    train_y = torch.FloatTensor(actual_returns_scaled).unsqueeze(1)
    # The last window has no known "next" return, so it is excluded.
    train_X = X_tensor[:len(train_y)]

    dataset = torch.utils.data.TensorDataset(train_X, train_y)
    loader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)
    for _ in range(50):
        for batch_X, batch_y in loader:
            optimizer.zero_grad()
            pi, sigma, mu = model(batch_X)
            loss = mdn_loss(pi, sigma, mu, batch_y)
            loss.backward()
            optimizer.step()


def _build_figure(df, status, last_z):
    """Plot actual price, the predicted path and the +/-2 sigma band."""
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=df.index, y=df['Close'], mode='lines', name='Price', line=dict(color='rgba(255, 255, 255, 0.5)')))
    fig.add_trace(go.Scatter(x=df.index, y=df['Upper'], mode='lines', line=dict(width=0), showlegend=False))
    # fill='tonexty' shades between this trace (Lower) and the previous one (Upper).
    fig.add_trace(go.Scatter(x=df.index, y=df['Lower'], mode='lines', line=dict(width=0), fill='tonexty', fillcolor='rgba(0, 255, 255, 0.1)', name='Liquidity Cloud'))
    fig.add_trace(go.Scatter(x=df.index, y=df['Pred'], mode='lines', name='AI Path', line=dict(color='#00ffff', width=2)))
    fig.update_layout(template="plotly_dark", title=f"Holographic FX (V3 Returns): {status} (Z: {last_z:.2f})", height=600)
    return fig


def run_analysis():
    """Fetch data, calibrate the model if needed, and produce a trade signal.

    Side effects: creates and trains the model stored in ``GLOBAL_STATE`` on
    first use, and sends an ntfy notification when the signal flips to BUY or
    SELL (tracked through ``GLOBAL_STATE["last_trade"]``).

    Returns:
        Tuple ``(figure, status_html, log_text)`` for the UI outputs. When
        price data cannot be fetched the figure is ``None`` and both strings
        carry the error message; when there is too little data the result is
        ``(None, "No Data", "Wait...")``.
    """
    log_buffer = []

    # Init Model
    if GLOBAL_STATE["model"] is None:
        GLOBAL_STATE["model"] = MacroMDN(input_dim=6, hidden_dim=64, num_gaussians=3)
        log_buffer.append("🧠 Model Initialized (V3 Relative)")
    model = GLOBAL_STATE["model"]

    # Get Data
    price_df, msg = get_forex_data()
    if price_df is None:
        return None, msg, msg
    event_df = get_events_data()

    # PREPARE DATA (RETURNS MODE)
    X_tensor, dates, ref_prices, ret_mean, ret_std = prepare_tensors(price_df, event_df)

    # At least two windows are needed: one to predict from and one later close
    # to compare against. With fewer, the model call would fail on a malformed
    # tensor or training would run on zero samples yet be marked as done.
    if X_tensor.dim() != 3 or X_tensor.shape[0] < 2:
        return None, "No Data", "Wait..."

    # --- CALIBRATION (Training on RETURNS) ---
    if not GLOBAL_STATE["is_trained"]:
        log_buffer.append("⚙️ Calibrating on Volatility...")
        _calibrate_model(model, X_tensor, ref_prices, ret_mean, ret_std)
        GLOBAL_STATE["is_trained"] = True
        log_buffer.append("✅ Calibration Complete.")

    # --- INFERENCE ---
    model.eval()
    with torch.no_grad():
        pi, sigma, mu = model(X_tensor)
        # Use the parameters of the most heavily weighted mixture component.
        max_idx = torch.argmax(pi, dim=1)
        rows = torch.arange(len(mu))
        pred_mu = mu[rows, max_idx].numpy()
        pred_sigma = sigma[rows, max_idx].numpy()

    # --- RECONSTRUCTION ---
    # 1. Unscale the predicted RETURN (sigma only needs the scale factor).
    pred_ret = (pred_mu * ret_std) + ret_mean
    pred_ret_sigma = pred_sigma * ret_std

    # 2. Apply the return to the reference close to get the target price,
    #    plus a +/-2 sigma band expressed in price terms.
    pred_prices = ref_prices * (1 + pred_ret)
    upper_band = ref_prices * (1 + pred_ret + (2 * pred_ret_sigma))
    lower_band = ref_prices * (1 + pred_ret - (2 * pred_ret_sigma))

    # Prediction j targets the candle AFTER ref_prices[j], so it is compared
    # with (and plotted at) position j + 1; the final prediction has no
    # realised close yet and is left out.
    df = pd.DataFrame({
        'Close': ref_prices[1:],
        'Pred': pred_prices[:-1],
        'Upper': upper_band[:-1],
        'Lower': lower_band[:-1],
        'Sigma': pred_ret_sigma[:-1],
    }, index=dates[1:])

    # Z-Score (calculated on PRICE GAP)
    df['Gap'] = df['Pred'] - df['Close']
    # Convert Sigma to price terms for the Z calculation: Price * Sigma_Pct
    df['Price_Sigma'] = df['Close'] * df['Sigma']
    df['Raw_Z'] = df['Gap'] / (df['Price_Sigma'] + 1e-9)
    # Subtract the rolling mean so a persistent model bias does not read as a signal.
    df['Rolling_Z'] = df['Raw_Z'] - df['Raw_Z'].rolling(window=50, min_periods=1).mean()

    if df.empty:
        return None, "No Data", "Wait..."

    last_z = df['Rolling_Z'].iloc[-1]
    last_price = df['Close'].iloc[-1]
    status = "WAIT"
    color = "gray"
    if last_z > 1.8:
        status = "BUY SIGNAL"
        color = "green"
        # Notify only when the signal changes, not on every refresh.
        if GLOBAL_STATE["last_trade"] != "BUY":
            send_ntfy(f"BUY EURUSD | Z: {last_z:.2f} | Price: {last_price}")
            GLOBAL_STATE["last_trade"] = "BUY"
    elif last_z < -1.8:
        status = "SELL SIGNAL"
        color = "red"
        if GLOBAL_STATE["last_trade"] != "SELL":
            send_ntfy(f"SELL EURUSD | Z: {last_z:.2f} | Price: {last_price}")
            GLOBAL_STATE["last_trade"] = "SELL"

    fig = _build_figure(df, status, last_z)
    info_html = f"""<div style="text-align: center; padding: 10px; background-color: {color}; color: white;"><h3>{status}</h3><p>Z: {last_z:.3f} | Price: {last_price}</p></div>"""
    return fig, info_html, "\n".join(log_buffer)
# --- 5. UI ---
# Layout: two action buttons, a status banner, the chart, and a log box.
with gr.Blocks(title="Holographic FX Core", theme=gr.themes.Monochrome()) as app:
    gr.Markdown("# 👁️ Holographic Liquidity Regime (V3 Stationary)")

    with gr.Row():
        refresh_btn = gr.Button("🔄 Refresh", variant="primary")
        reset_btn = gr.Button("⚠️ HARD RESET", variant="stop")

    with gr.Row():
        status_box = gr.HTML()

    plot = gr.Plot()
    logs = gr.Textbox(label="Logs")

    # Both callbacks return (plot, status HTML, log text), in this order.
    panel_outputs = [plot, status_box, logs]
    refresh_btn.click(fn=run_analysis, outputs=panel_outputs)
    reset_btn.click(fn=hard_reset, outputs=panel_outputs)
    # Run one analysis as soon as the page loads.
    app.load(fn=run_analysis, outputs=panel_outputs)

if __name__ == "__main__":
    app.launch()