HMDN / app.py
Badumetsibb's picture
Update app.py
25ce4de verified
import gradio as gr
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import requests
import os
import plotly.graph_objects as go
from sklearn.preprocessing import StandardScaler
# --- 1. CONFIG & SECRETS ---
API_KEY = os.getenv("TWELVEDATA_KEY")  # TwelveData API key; price fetch refuses to run without it
NTFY_TOPIC = os.getenv("NTFY_TOPIC")   # ntfy.sh topic for push alerts (optional; alerts skipped if unset)
PAIR = "EUR/USD"      # instrument requested from TwelveData
TIMEFRAME = "15min"   # candle interval
LOOKBACK = 30         # sliding-window length (number of candles) fed to the GRU
# Global State shared across Gradio callbacks (single-process app, no locking)
GLOBAL_STATE = {
    "model": None,       # lazily-built MacroMDN; None until the first run_analysis()
    "last_trade": None,  # last pushed side ("BUY"/"SELL") used to de-duplicate alerts
    "scaler": None,  # Saved scaler for returns -- NOTE(review): never written anywhere in this file
    "is_trained": False  # flipped to True after the first calibration pass
}
# --- 2. THE MODEL ARCHITECTURE (GRU V2) ---
class MacroMDN(nn.Module):
    """GRU + additive-attention encoder with a Mixture Density Network head.

    Maps a (batch, seq, input_dim) window of scaled features to the parameters
    of a ``num_gaussians``-component 1-D Gaussian mixture over the next return.

    forward() returns (pi, sigma, mu), each shaped (batch, num_gaussians).
    """

    def __init__(self, input_dim, hidden_dim=64, num_gaussians=3):
        super(MacroMDN, self).__init__()
        self.ln_in = nn.LayerNorm(input_dim)
        # FIX: GRU inter-layer dropout only applies when num_layers > 1; with a
        # single layer the previous dropout=0.1 was silently ignored (and raised
        # a UserWarning), so it is omitted here. Behavior is unchanged.
        self.gru = nn.GRU(input_dim, hidden_dim, num_layers=1, batch_first=True)
        self.ln_out = nn.LayerNorm(hidden_dim)
        self.attention = nn.Linear(hidden_dim, 1)            # per-timestep attention score
        self.z_pi = nn.Linear(hidden_dim, num_gaussians)     # mixture weights (pre-softmax)
        self.z_sigma = nn.Linear(hidden_dim, num_gaussians)  # std-devs (pre-softplus)
        self.z_mu = nn.Linear(hidden_dim, num_gaussians)     # component means
        nn.init.constant_(self.z_sigma.bias, -2.0)  # Lower start variance

    def forward(self, x):
        """Encode the window and return mixture parameters (pi, sigma, mu)."""
        x = self.ln_in(x)
        gru_out, _ = self.gru(x)
        gru_out = self.ln_out(gru_out)
        # Softmax over the time axis -> attention-weighted context vector.
        attn_weights = F.softmax(self.attention(gru_out), dim=1)
        context = torch.sum(attn_weights * gru_out, dim=1)
        pi = F.softmax(self.z_pi(context), dim=1)
        sigma = F.softplus(self.z_sigma(context)) + 1e-6  # strictly positive
        mu = self.z_mu(context)
        return pi, sigma, mu
def mdn_loss(pi, sigma, mu, y):
    """Mean negative log-likelihood of targets *y* under the Gaussian mixture.

    pi/sigma/mu are (batch, K) mixture parameters; y is (batch,) or (batch, 1).
    The per-sample NLL is -logsumexp_k(log pi_k + log N(y | mu_k, sigma_k)).
    """
    target = y.unsqueeze(1) if y.dim() == 1 else y
    component_logp = torch.distributions.Normal(loc=mu, scale=sigma).log_prob(target)
    # Small epsilon keeps log(pi) finite for components driven to zero weight.
    joint = torch.log(pi + 1e-8) + component_logp
    return -torch.logsumexp(joint, dim=1).mean()
# --- 3. DATA PIPELINE (STATIONARY MODE) ---
def get_forex_data():
    """Fetch the last 500 EUR/USD candles from TwelveData.

    Returns (DataFrame, status): the frame is indexed by ascending datetime with
    float open/high/low/close columns. On any failure returns (None, error text).
    """
    if not API_KEY: return None, "❌ Error: TWELVEDATA_KEY missing."
    url = f"https://api.twelvedata.com/time_series?symbol={PAIR}&interval={TIMEFRAME}&outputsize=500&apikey={API_KEY}"
    try:
        # FIX: explicit timeout -- requests otherwise blocks the Gradio callback
        # indefinitely on a stalled connection.
        r = requests.get(url, timeout=10).json()
        if 'values' not in r: return None, f"❌ API Error: {r.get('message', 'Unknown')}"
        df = pd.DataFrame(r['values'])
        df['datetime'] = pd.to_datetime(df['datetime'])
        df = df.sort_values('datetime').set_index('datetime')
        df = df[['open', 'high', 'low', 'close']].astype(float)
        return df, "✅ Data Fetched"
    except Exception as e: return None, str(e)
def get_events_data():
    """Fetch this week's ForexFactory calendar and keep EUR/USD events.

    Returns a DataFrame indexed by naive-UTC 'DateTime' with an 'Impact_Score'
    column (Low=1, Medium=2, High=3, unknown=0). Any failure yields an empty
    DataFrame: the pipeline deliberately treats missing events as "no news".
    """
    try:
        url = "https://nfs.faireconomy.media/ff_calendar_thisweek.json"
        r = requests.get(url, headers={"User-Agent": "V23/1.0"}, timeout=5)
        data = r.json()
        parsed = []
        impact_map = {'Low': 1, 'Medium': 2, 'High': 3}
        for i in data:
            if i.get('country') in ['EUR', 'USD']:
                # Feed timestamps carry an offset; normalize to naive UTC so they
                # compare cleanly against the (naive) price index.
                dt = pd.to_datetime(i.get('date'), utc=True).tz_localize(None)
                imp = impact_map.get(i.get('impact'), 0)
                parsed.append({'DateTime': dt, 'Impact_Score': imp})
        df = pd.DataFrame(parsed)
        if not df.empty: df = df.sort_values('DateTime').set_index('DateTime')
        return df
    except Exception:
        # FIX: was a bare `except:`, which also swallowed KeyboardInterrupt /
        # SystemExit. Still best-effort: fall back to "no events".
        return pd.DataFrame()
def prepare_tensors(price_df, event_df):
    """Build the scaled sliding-window tensor consumed by the model.

    Parameters
    ----------
    price_df : DataFrame with an ascending datetime index and float
        open/high/low/close columns.
    event_df : DataFrame indexed by 'DateTime' with an 'Impact_Score' column;
        may be empty.

    Returns
    -------
    (X_tensor, dates, reference_prices, ret_mean, ret_scale) where X_tensor is
    (N, LOOKBACK, 6); dates/reference_prices are the index and close price at
    the end of each window; ret_mean/ret_scale are the standardization stats of
    the 'ret_close' feature, needed to un-scale the model's predictions.
    """
    # 1. Merge Events: each candle inherits the most recent event within 4h.
    if not event_df.empty:
        merged = pd.merge_asof(price_df, event_df, left_index=True, right_index=True,
                               direction='backward', tolerance=pd.Timedelta('4 hours')).fillna(0)
    else:
        merged = price_df.copy()
        merged['Impact_Score'] = 0
    # BUGFIX: the calendar feed supplies no numeric surprise value, so the
    # 'Surprise' column must exist in BOTH branches. Previously only the
    # empty-events branch created it, so feature selection below raised a
    # KeyError whenever events were actually fetched.
    if 'Surprise' not in merged.columns:
        merged['Surprise'] = 0.0
    # 2. CALCULATE RETURNS (STATIONARITY FIX): percent change instead of raw
    # price removes the anchoring bias of absolute levels.
    merged['ret_close'] = merged['close'].pct_change().fillna(0)
    merged['ret_open'] = merged['open'].pct_change().fillna(0)
    merged['ret_high'] = merged['high'].pct_change().fillna(0)
    merged['ret_low'] = merged['low'].pct_change().fillna(0)
    # 3. Standardize features (z-score with population std, ddof=0). Constant
    # columns (e.g. all-zero Surprise) keep scale 1 so we never divide by zero
    # -- this mirrors sklearn StandardScaler's zero-variance handling.
    feature_cols = ['ret_open', 'ret_high', 'ret_low', 'ret_close', 'Surprise', 'Impact_Score']
    raw = merged[feature_cols].values.astype(np.float64)
    col_mean = raw.mean(axis=0)
    col_scale = raw.std(axis=0)
    col_scale = np.where(col_scale == 0.0, 1.0, col_scale)
    data_scaled = (raw - col_mean) / col_scale
    # 4. Sliding window: X[i] holds the LOOKBACK rows strictly before row i.
    X_data = [data_scaled[i - LOOKBACK:i] for i in range(LOOKBACK, len(data_scaled))]
    X_tensor = torch.FloatTensor(np.array(X_data))
    # Close price at the end of each window: used later to turn a predicted
    # return back into a price level.
    reference_prices = merged['close'].values[LOOKBACK:]
    # Stats of the 'ret_close' column (index 3 in feature_cols) to un-scale
    # model predictions.
    ret_mean = col_mean[3]
    ret_scale = col_scale[3]
    return X_tensor, merged.index[LOOKBACK:], reference_prices, ret_mean, ret_scale
# --- 4. CORE LOGIC ---
def send_ntfy(message):
    """Best-effort push notification via ntfy.sh; no-op when NTFY_TOPIC is unset."""
    if not NTFY_TOPIC: return
    try:
        requests.post(
            f"https://ntfy.sh/{NTFY_TOPIC}",
            data=message.encode('utf-8'),
            headers={"Title": "Holographic AI", "Priority": "high"},
            timeout=5,  # FIX: never let a stalled push hang the analysis callback
        )
    except Exception:
        # Deliberately silent (was a bare `except:`): alerts are optional and
        # must never crash the dashboard refresh.
        pass
def hard_reset():
    """Discard the cached model and calibration flag so the next refresh retrains.

    Returns the (plot, status_html, log) triple expected by the Gradio outputs.
    """
    GLOBAL_STATE.update(model=None, is_trained=False)
    return None, "<div>♻️ RESET DONE. Click Refresh.</div>", "Reset."
def run_analysis():
    """Fetch data, calibrate the MDN on first run, and build the dashboard view.

    Returns (plotly Figure | None, status HTML, log text), matching the Gradio
    outputs [plot, status_box, logs]. May push a ntfy alert as a side effect
    when the z-score crosses the +/-1.8 threshold.
    """
    log_buffer = []
    # Init Model (lazily, so hard_reset() can force a rebuild on next refresh)
    if GLOBAL_STATE["model"] is None:
        GLOBAL_STATE["model"] = MacroMDN(input_dim=6, hidden_dim=64, num_gaussians=3)
        log_buffer.append("🧠 Model Initialized (V3 Relative)")
    model = GLOBAL_STATE["model"]
    # Get Data
    price_df, msg = get_forex_data()
    if price_df is None: return None, msg, msg
    event_df = get_events_data()
    # PREPARE DATA (RETURNS MODE)
    X_tensor, dates, ref_prices, ret_mean, ret_std = prepare_tensors(price_df, event_df)
    # --- CALIBRATION (Training on RETURNS) ---
    if not GLOBAL_STATE["is_trained"]:
        log_buffer.append("⚙️ Calibrating on Volatility...")
        optimizer = torch.optim.Adam(model.parameters(), lr=0.005)
        model.train()
        # Target: the NEXT return. X[t] summarizes history up to t and should
        # predict Return[t+1], so X is truncated by one step below.
        train_X = X_tensor[:-1]
        # Recompute actual next-step returns from prices directly -- explicit
        # and independent of the feature scaling above.
        actual_returns = np.diff(ref_prices) / ref_prices[:-1]
        # Normalize the target with the same stats used for the input feature.
        actual_returns_scaled = (actual_returns - ret_mean) / ret_std
        train_y = torch.FloatTensor(actual_returns_scaled).unsqueeze(1)
        # Truncate X to match y length (y is 1 shorter due to diff)
        train_X = train_X[:len(train_y)]
        dataset = torch.utils.data.TensorDataset(train_X, train_y)
        loader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)
        for epoch in range(50):
            for batch_X, batch_y in loader:
                optimizer.zero_grad()
                pi, sigma, mu = model(batch_X)
                loss = mdn_loss(pi, sigma, mu, batch_y)
                loss.backward()
                optimizer.step()
        GLOBAL_STATE["is_trained"] = True
        log_buffer.append("✅ Calibration Complete.")
    # --- INFERENCE ---
    model.eval()
    with torch.no_grad():
        pi, sigma, mu = model(X_tensor)
        # Pick the dominant mixture component per window (argmax of weights)
        # and read off its mean/std as the point prediction.
        max_idx = torch.argmax(pi, dim=1)
        pred_mu = mu[torch.arange(len(mu)), max_idx].numpy()
        pred_sigma = sigma[torch.arange(len(sigma)), max_idx].numpy()
    # --- RECONSTRUCTION ---
    # 1. Un-scale the predicted RETURN back to percent terms.
    pred_ret = (pred_mu * ret_std) + ret_mean
    pred_ret_sigma = pred_sigma * ret_std
    # 2. Apply the return to the window-end price:
    #    Pred_Price[t+1] = Price[t] * (1 + Pred_Ret)
    prev_prices = ref_prices  # Price at end of window
    pred_prices = prev_prices * (1 + pred_ret)
    # Cloud levels: sigma is in % terms, so the band is +/- 2*sigma around the
    # predicted path, converted to price space.
    upper_band = prev_prices * (1 + pred_ret + (2 * pred_ret_sigma))
    lower_band = prev_prices * (1 + pred_ret - (2 * pred_ret_sigma))
    # Alignment for plotting: pred_prices[i] was produced from data up to
    # dates[i], so it is the model's belief for dates[i+1]. Shift one step:
    # plot pred[:-1] against actual[1:].
    plot_dates = dates[1:]
    plot_actual = ref_prices[1:]
    plot_pred = pred_prices[:-1]
    plot_upper = upper_band[:-1]
    plot_lower = lower_band[:-1]
    plot_sigma = pred_ret_sigma[:-1]
    df = pd.DataFrame({
        'Close': plot_actual,
        'Pred': plot_pred,
        'Upper': plot_upper,
        'Lower': plot_lower,
        'Sigma': plot_sigma
    }, index=plot_dates)
    # Z-Score computed on the PRICE gap between prediction and reality.
    df['Gap'] = df['Pred'] - df['Close']
    # Convert sigma to price terms for the Z calc: Price * Sigma_Pct
    df['Price_Sigma'] = df['Close'] * df['Sigma']
    df['Raw_Z'] = df['Gap'] / (df['Price_Sigma'] + 1e-9)
    # De-mean with a 50-bar rolling window to strip slow model bias.
    df['Rolling_Z'] = df['Raw_Z'] - df['Raw_Z'].rolling(window=50, min_periods=1).mean()
    if len(df) > 0:
        last_z = df['Rolling_Z'].iloc[-1]
        last_price = df['Close'].iloc[-1]
        status = "WAIT"
        color = "gray"
        if last_z > 1.8:
            status = "BUY SIGNAL"
            color = "green"
            # Only alert on a state change so repeated refreshes don't re-push.
            if GLOBAL_STATE["last_trade"] != "BUY":
                send_ntfy(f"BUY EURUSD | Z: {last_z:.2f} | Price: {last_price}")
                GLOBAL_STATE["last_trade"] = "BUY"
        elif last_z < -1.8:
            status = "SELL SIGNAL"
            color = "red"
            if GLOBAL_STATE["last_trade"] != "SELL":
                send_ntfy(f"SELL EURUSD | Z: {last_z:.2f} | Price: {last_price}")
                GLOBAL_STATE["last_trade"] = "SELL"
        # Price line, shaded uncertainty cloud, and the model's predicted path.
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=df.index, y=df['Close'], mode='lines', name='Price', line=dict(color='rgba(255, 255, 255, 0.5)')))
        fig.add_trace(go.Scatter(x=df.index, y=df['Upper'], mode='lines', line=dict(width=0), showlegend=False))
        fig.add_trace(go.Scatter(x=df.index, y=df['Lower'], mode='lines', line=dict(width=0), fill='tonexty', fillcolor='rgba(0, 255, 255, 0.1)', name='Liquidity Cloud'))
        fig.add_trace(go.Scatter(x=df.index, y=df['Pred'], mode='lines', name='AI Path', line=dict(color='#00ffff', width=2)))
        fig.update_layout(template="plotly_dark", title=f"Holographic FX (V3 Returns): {status} (Z: {last_z:.2f})", height=600)
        info_html = f"""<div style="text-align: center; padding: 10px; background-color: {color}; color: white;"><h3>{status}</h3><p>Z: {last_z:.3f} | Price: {last_price}</p></div>"""
        return fig, info_html, "\n".join(log_buffer)
    else:
        return None, "No Data", "Wait..."
# --- 5. UI ---
# Gradio layout: two action buttons, a status banner, the main chart and a log
# box. run_analysis() also fires once on page load.
with gr.Blocks(title="Holographic FX Core", theme=gr.themes.Monochrome()) as app:
    gr.Markdown("# 👁️ Holographic Liquidity Regime (V3 Stationary)")
    with gr.Row():
        refresh_btn = gr.Button("🔄 Refresh", variant="primary")
        reset_btn = gr.Button("⚠️ HARD RESET", variant="stop")
    with gr.Row(): status_box = gr.HTML()
    plot = gr.Plot()
    logs = gr.Textbox(label="Logs")
    # Event wiring: all three handlers return (plot, status HTML, log text).
    refresh_btn.click(fn=run_analysis, outputs=[plot, status_box, logs])
    reset_btn.click(fn=hard_reset, outputs=[plot, status_box, logs])
    app.load(fn=run_analysis, outputs=[plot, status_box, logs])
if __name__ == "__main__":
    app.launch()