# auroraLLm / app.py β€” Hugging Face Space source
# Last change: "Update app.py" by Badumetsibb (commit d6e3fad, verified)
# app.py (Version FINAL - All Bugs Fixed)
# This is the complete, final, and fully-functional code, built on a stable Keras 2 architecture.
# --- Core Libraries & Setup ---
import pandas as pd
import numpy as np
import warnings
import joblib
import json
import os
import gradio as gr
import requests
import time
from datetime import datetime
import pytz
import threading
import math
from huggingface_hub import hf_hub_download
import firebase_admin
from firebase_admin import credentials, db
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN
from groq import Groq
# --- Environment Configuration ---
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # Suppress verbose logs
warnings.filterwarnings("ignore", category=UserWarning, module='sklearn')
warnings.filterwarnings("ignore", category=FutureWarning)
# --- ML/DL & NLP Libraries (STABLE TENSORFLOW.KERAS) ---
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.utils import Sequence
from transformers import BertTokenizer, TFBertModel
# --- Live Data & Cache Configuration ---
from twelvedata import TDClient
EVENT_JSON_URL = "https://nfs.faireconomy.media/ff_calendar_thisweek.json"
CACHE_DURATION_SECONDS = 600
_EVENT_CACHE = {"data": None, "timestamp": 0}
# ==============================================================================
# --- V17 EVOLUTION ENGINE ---
# ==============================================================================
class ConceptMiner:
    """Mines the Firebase trade log for clusters of consistently losing trades.

    Each completed trade stores the market-context vector it was taken in;
    DBSCAN over those vectors surfaces recurring conditions under which a
    strategy loses, which the ConceptNamer later turns into veto rules.
    """
    def __init__(self, log_path='signals_v17', min_trades_for_analysis=20):
        # analyzed_trade_keys is in-memory only, so trades are re-analyzed after a process restart.
        self.log_path = log_path; self.min_trades = min_trades_for_analysis; self.analyzed_trade_keys = set()
    def run_analysis(self):
        """Return a list of 'weakness pattern' dicts mined from new completed trades (may be empty)."""
        print(f"EVOLUTION_ENGINE: [ConceptMiner] Analyzing performance from '{self.log_path}'.")
        ref = db.reference(self.log_path); all_data = ref.get()
        if not all_data: print("EVOLUTION_ENGINE: [ConceptMiner] No trade logs found yet."); return []
        # Only fully evaluated trades (reward set) not yet analyzed in this session.
        new_trades = {k: v for k, v in all_data.items() if v.get('reward') is not None and k not in self.analyzed_trade_keys}
        if len(new_trades) < self.min_trades: print(f"EVOLUTION_ENGINE: [ConceptMiner] Not enough new trades ({len(new_trades)}/{self.min_trades})."); return []
        print(f"EVOLUTION_ENGINE: [ConceptMiner] Analyzing {len(new_trades)} new trades.")
        df = pd.DataFrame.from_dict(new_trades, orient='index')
        context_df = pd.DataFrame(df['context_vector'].tolist(), index=df.index)
        # Must mirror the CONTEXT_FEATURES + REGIME_COLS order used when the vector was built.
        feature_names = ['close', 'ATR', 'EMA_20', 'RSI', 'time_since_event', 'time_to_event', 'hour_of_day', 'regime_TRENDING', 'regime_BREAKOUT', 'regime_CHOPPY', 'regime_RANGING']
        context_df.columns = feature_names[:len(context_df.columns)]
        full_df = pd.concat([df[['reward', 'strategy']], context_df], axis=1)
        features = full_df.drop(columns=['reward', 'strategy']).values
        scaled_features = StandardScaler().fit_transform(features)
        # DBSCAN label -1 marks noise points that belong to no cluster.
        dbscan = DBSCAN(eps=1.0, min_samples=4); clusters = dbscan.fit_predict(scaled_features)
        full_df['cluster'] = clusters
        significant_clusters = full_df[full_df['cluster'] != -1]
        if significant_clusters.empty: self.analyzed_trade_keys.update(new_trades.keys()); return []
        # Keep only clusters whose mean reward is clearly negative.
        losing_clusters = significant_clusters.groupby('cluster')['reward'].mean()
        losing_clusters = losing_clusters[losing_clusters < -0.25]
        patterns = []
        for cid in losing_clusters.index:
            c_data = significant_clusters[significant_clusters['cluster'] == cid]
            avg_ctx = c_data.drop(columns=['reward', 'strategy', 'cluster']).mean()
            patterns.append({"source_cluster_id": int(cid), "trade_count": len(c_data), "average_reward": c_data['reward'].mean(), "dominant_strategy": c_data['strategy'].mode()[0], "dominant_regime": avg_ctx.filter(like='regime').idxmax().split('_')[1], "avg_market_conditions": avg_ctx.to_dict()})
            print(f"EVOLUTION_ENGINE: [ConceptMiner] πŸ”₯ Discovered new weakness (Cluster #{cid}).")
        self.analyzed_trade_keys.update(new_trades.keys()); return patterns
class ConceptNamer:
    """Uses the Groq LLM to turn a mined weakness pattern into a named veto 'Concept'."""
    def __init__(self, api_key): self.client = Groq(api_key=api_key)
    def name_new_concept(self, pattern):
        """Return the LLM-generated concept dict for `pattern`, or None on API/parse failure."""
        print(f"EVOLUTION_ENGINE: [ConceptNamer] Naming weakness from Cluster #{pattern['source_cluster_id']}...")
        system_prompt = "You are an expert market strategist AI. Your job is to analyze a data pattern representing a trading agent's weakness and turn it into a memorable, actionable 'Concept'. The output must be a clean JSON object and nothing else."
        # Veto threshold sits slightly above the cluster's average (scaled) ATR.
        atr_threshold = round(pattern['avg_market_conditions']['ATR'] + 0.1, 2)
        human_prompt = f"""
Analyze this data pattern where a trading agent consistently loses money. Create a new 'Concept' for it.
**Pattern Fingerprint:**
- **Dominant Market Regime:** {pattern['dominant_regime']}
- **The Strategy That Fails Here:** {pattern['dominant_strategy']}
- **Average Reward (from -1 to 1):** {pattern['average_reward']:.2f}
- **Key Market Conditions (scaled values):**
- Volatility (ATR): {pattern['avg_market_conditions']['ATR']:.4f}
**Your Task:**
Generate a JSON object with the following structure.
{{
"concept_name": "A short, memorable name for this market trap (e.g., 'Momentum Mirage', 'Volatility Void').",
"description_agent": "A concise, one-sentence explanation of this trap for the agent's logs.",
"description_human": "A human-readable explanation of why this is a trap for the agent.",
"actionable_rule": {{
"type": "veto_trade",
"conditions": [
{{"variable": "regime", "operator": "==", "value": "{pattern['dominant_regime']}"}},
{{"variable": "ATR", "operator": "<", "value": {atr_threshold} }}
],
"target_strategy": "{pattern['dominant_strategy']}"
}}
}}
"""
        try:
            chat_completion = self.client.chat.completions.create(messages=[{"role": "system", "content": system_prompt},{"role": "user", "content": human_prompt}], model="llama3-8b-8192", temperature=0.6)
            response_text = chat_completion.choices[0].message.content
            # Strip a Markdown ```json fence if the model wrapped its answer in one.
            if response_text.startswith("```json"): response_text = response_text[7:-3].strip()
            return json.loads(response_text)
        except Exception as e: print(f"EVOLUTION_ENGINE: [ConceptNamer] ❌ Groq API Error: {e}"); return None
class ConceptVetoSystem:
    """Holds learned veto 'concepts' and blocks trades whose context matches one."""

    def __init__(self):
        self.concepts = {}
        self.lock = threading.Lock()
        self.load_concepts_from_firebase()

    def load_concepts_from_firebase(self):
        """Refresh the in-memory concept store from the Firebase 'concepts/' node."""
        with self.lock:
            print("--- V17: (Re)Loading concepts from Firebase... ---")
            try:
                # Skip silently when Firebase was never initialized.
                if not firebase_admin._apps:
                    return
                concepts_data = db.reference('concepts/').get()
                if concepts_data:
                    self.concepts = concepts_data
                    print(f"βœ… V17: Loaded {len(self.concepts)} concepts.")
                else:
                    print("🟑 V17: No concepts found in Firebase yet.")
            except Exception as e:
                print(f"❌ V17: ERROR - Could not load concepts: {e}")

    def check_for_veto(self, current_context, chosen_strategy_name):
        """Return (True, concept_name) when a stored veto rule matches, else (False, None)."""
        with self.lock:
            for concept in self.concepts.values():
                rule = concept.get('actionable_rule', {})
                if rule.get('type') != 'veto_trade':
                    continue
                if rule.get('target_strategy') != chosen_strategy_name:
                    continue
                if self._conditions_hold(rule.get('conditions', []), current_context):
                    print(f"🚨 V17 VETO: Conditions for '{concept.get('concept_name')}' met.")
                    return True, concept.get('concept_name')
            return False, None

    @staticmethod
    def _conditions_hold(conditions, context):
        """True when every rule condition matches the live context."""
        for cond in conditions:
            observed = context.get(cond['variable'])
            if observed is None:
                return False
            op, target = cond['operator'], cond['value']
            # Only '==' and '<' are emitted by the ConceptNamer prompt; any other
            # operator is ignored (same as the original behavior).
            if op == '==' and observed != target:
                return False
            if op == '<' and not observed < target:
                return False
        return True
# ==============================================================================
# --- CORE AGENT LOGIC (FIXED AND READABLE) ---
# ==============================================================================
class CausalReasoningNetwork:
    """Labels each bar of a feature DataFrame with a coarse volatility/trend regime.

    Regimes: 'TRENDING' / 'BREAKOUT' (high volatility, split by EMA slope),
    'RANGING' (low volatility), 'CHOPPY' (everything in between).
    """

    def __init__(self, processed_data):
        # Work on a copy so the caller's DataFrame is never mutated.
        self.data = processed_data.copy()

    def identify_volatility_regimes(self, volatility_indicator='ATR', trend_indicator='EMA_20'):
        """Append a 'regime' column and return the annotated DataFrame.

        Volatility thresholds are the 33rd/66th percentiles of the volatility
        indicator; a high-volatility bar is 'TRENDING' when the 3-bar EMA slope
        exceeds the slope's 75th percentile in absolute value, else 'BREAKOUT'.
        """
        atr = self.data[volatility_indicator]
        low_vol_threshold, high_vol_threshold = atr.quantile(0.33), atr.quantile(0.66)
        ema_slope = self.data[trend_indicator].diff(periods=3)
        # PERF FIX: this quantile is loop-invariant; the original recomputed it
        # on every high-volatility bar.
        slope_threshold = ema_slope.quantile(0.75)
        regimes = []
        for i in range(len(self.data)):
            atr_val = atr.iloc[i]
            # First 3 bars have a NaN slope (diff window); treat them as flat.
            slope_val = ema_slope.iloc[i] if pd.notna(ema_slope.iloc[i]) else 0
            if atr_val > high_vol_threshold:
                regimes.append('TRENDING' if abs(slope_val) > slope_threshold else 'BREAKOUT')
            elif atr_val < low_vol_threshold:
                regimes.append('RANGING')
            else:
                regimes.append('CHOPPY')
        self.data['regime'] = regimes
        return self.data
class RuleBasedSituationRoom:
    """Turns multi-horizon model predictions (or an EMA fallback) into a trade thesis."""

    def __init__(self, params):
        # params: dict with 'sl_atr_multiplier' / 'tp_atr_multiplier'.
        self.params = params

    def generate_thesis(self, predictions, sequence_df):
        """Build a thesis dict from predictions over the latest bar of sequence_df.

        Falls back to a simple EMA crossover when any horizon is missing.
        NO_TRADE theses carry no price levels; otherwise SL/TP are ATR-scaled
        and formatted as 5-decimal strings.
        """
        latest_data = sequence_df.iloc[-1]
        current_price = latest_data['close']
        have_all_horizons = predictions and all(k in predictions for k in ['5m', '15m', '1h'])
        if not have_all_horizons:
            action = "BUY" if current_price > latest_data['EMA_20'] else "SELL"
            confidence, strategy = "LOW", "Trend Following"
            reasoning = f"Simple EMA Crossover ({action})."
        else:
            direction = {h: ("BUY" if predictions[h] > current_price else "SELL") for h in ['5m', '15m', '1h']}
            dir_5m, dir_15m, dir_1h = direction['5m'], direction['15m'], direction['1h']
            if dir_5m == dir_15m == dir_1h:
                action, confidence, strategy = dir_5m, "HIGH", "Trend Following"
                reasoning = f"Strong confluence across all horizons ({dir_5m})."
            elif dir_5m == dir_15m:
                action, confidence, strategy = dir_5m, "MEDIUM", "Scalp"
                reasoning = f"Short/Medium-term confluence ({dir_5m})."
            elif dir_15m == dir_1h:
                action, confidence, strategy = dir_15m, "LOW", "Trend Following"
                reasoning = f"Medium/Long-term confluence ({dir_15m})."
            else:
                action, confidence, strategy = "NO_TRADE", "LOW", "Range Play"
                reasoning = "Prediction horizons diverge."
        if action == "NO_TRADE":
            return {"action": "NO_TRADE", "confidence": "LOW", "strategy_type": strategy, "reasoning": reasoning}
        atr = latest_data['ATR'] if pd.notna(latest_data['ATR']) and latest_data['ATR'] > 0 else 0.0001
        sl_mult = self.params.get('sl_atr_multiplier', 2.0)
        tp_mult = self.params.get('tp_atr_multiplier', 4.0)
        # Lower-confidence theses get proportionally tighter profit targets.
        tp_mult *= {"MEDIUM": 0.75, "LOW": 0.5}.get(confidence, 1.0)
        entry = current_price
        if action == "BUY":
            stop_loss = current_price - (sl_mult * atr)
            take_profit = current_price + (tp_mult * atr)
        else:
            stop_loss = current_price + (sl_mult * atr)
            take_profit = current_price - (tp_mult * atr)
        return {"action": action, "entry": f"{entry:.5f}", "stop_loss": f"{stop_loss:.5f}",
                "take_profit": f"{take_profit:.5f}", "confidence": confidence,
                "reasoning": reasoning, "strategy_type": strategy}
class MarketRegimeFilter:
    """Gatekeeper that only lets compatible strategy types trade in each regime."""

    def __init__(self):
        # RANGING deliberately allows nothing; unknown regimes also block everything.
        self.allowed_strategies = {
            'TRENDING': ['Trend Following'],
            'BREAKOUT': ['Trend Following', 'Scalp'],
            'CHOPPY': ['Scalp'],
            'RANGING': [],
        }

    def should_trade(self, current_regime, trade_thesis):
        """Return True when the thesis is actionable in the current regime."""
        if trade_thesis['action'] == 'NO_TRADE':
            return False
        permitted = self.allowed_strategies.get(current_regime, [])
        return trade_thesis['strategy_type'] in permitted
class PredictionCoreTransformer:
    """Wraps the pre-trained multi-horizon Keras model plus its scaler and feature list.

    Artifacts are persisted locally after a one-time calibrate() so later
    startups can reload instantly via load_calibrated_artifacts().
    """

    def __init__(self, sequence_length=48):
        self.scaler, self.model, self.sequence_length, self.feature_names = None, None, sequence_length, None
        self.calibrated_model_path, self.calibrated_scaler_path, self.calibrated_features_path = 'calibrated_model.keras', 'calibrated_scaler.joblib', 'calibrated_features.json'

    def is_calibrated(self):
        """True when all three calibration artifacts exist on disk."""
        return all(os.path.exists(p) for p in [self.calibrated_model_path, self.calibrated_scaler_path, self.calibrated_features_path])

    def load_calibrated_artifacts(self):
        """Load model/scaler/feature-name artifacts saved by a previous calibrate()."""
        print("--- Loading pre-calibrated artifacts ---")
        self.model = load_model(self.calibrated_model_path)
        self.scaler = joblib.load(self.calibrated_scaler_path)
        with open(self.calibrated_features_path, 'r') as f: self.feature_names = json.load(f)
        print("--- Instant startup successful ---"); return True

    def calibrate(self, base_model_path, calibration_data):
        """Fine-tune the base model on recent feature data and persist all artifacts."""
        print("--- STARTING ONE-TIME AGENT CALIBRATION ---")
        # Every column except raw OHLC, the regime label, and event metadata.
        self.feature_names = [col for col in calibration_data.columns if col not in ['regime','close','open','high','low','event','impact','title','detail','country','date','currency','actual','forecast','previous']]
        self.scaler = MinMaxScaler(feature_range=(0,1)); self.scaler.fit(calibration_data[self.feature_names])
        self.model = load_model(base_model_path); print("Pre-trained model loaded.")
        class CalibrationGenerator(Sequence):
            # Yields one (sequence, multi-horizon target) pair per index.
            def __init__(self, data, scaler, feature_names, seq_len):
                self.data, self.scaler, self.feature_names, self.seq_len = data.copy(), scaler, feature_names, seq_len
                # Targets: next bar's open (5m), 3 bars ahead (15m), 12 bars ahead (1h).
                self.data['target_5m']=self.data['open'].shift(-1); self.data['target_15m']=self.data['open'].shift(-3); self.data['target_1h']=self.data['open'].shift(-12); self.data.dropna(inplace=True)
                self.features_df, self.targets_df = self.data[self.feature_names], self.data[['target_5m', 'target_15m', 'target_1h']]
                self.scaled_features = self.scaler.transform(self.features_df); self.n_samples = len(self.scaled_features) - self.seq_len
            def __len__(self): return self.n_samples
            def __getitem__(self, idx):
                seq_end = idx + self.seq_len; X = self.scaled_features[idx:seq_end].reshape(1, self.seq_len, len(self.feature_names))
                y_5m, y_15m, y_1h = self.targets_df.iloc[seq_end - 1][['target_5m', 'target_15m', 'target_1h']]
                return X, {'5m_output': np.array([y_5m]), '15m_output': np.array([y_15m]), '1h_output': np.array([y_1h])}
        calibration_generator = CalibrationGenerator(calibration_data, self.scaler, self.feature_names, self.sequence_length)
        # Very low learning rate: a light fine-tune, not a retrain.
        self.model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5), loss={'5m_output':'mean_squared_error', '15m_output':'mean_squared_error', '1h_output':'mean_squared_error'})
        self.model.fit(calibration_generator, epochs=3, verbose=1)
        self.model.save(self.calibrated_model_path); joblib.dump(self.scaler, self.calibrated_scaler_path)
        with open(self.calibrated_features_path, 'w') as f: json.dump(self.feature_names, f)
        print("--- AGENT RE-CALIBRATION COMPLETE ---"); return True

    def predict_single(self, input_sequence):
        """Predict the 3 horizons for one feature sequence.

        Returns ({"5m": float, "15m": float, "1h": float}, formatted string).
        Raises RuntimeError when called before calibration.
        """
        if not self.scaler: raise RuntimeError("Agent not calibrated.")
        features = input_sequence[self.feature_names]; scaled_features = self.scaler.transform(features)
        reshaped = scaled_features.reshape(1, self.sequence_length, len(self.feature_names))
        predictions = self.model.predict(reshaped, verbose=0)
        # BUG FIX: tf.keras returns a *list* of output arrays (in output order)
        # for multi-output functional models, so indexing with '5m_output'
        # raised TypeError; and even a dict value is an ndarray that breaks the
        # ':.5f' formatting below. Accept either form and reduce to floats.
        if isinstance(predictions, dict):
            raw = [predictions['5m_output'], predictions['15m_output'], predictions['1h_output']]
        else:
            raw = list(predictions)
        pred_5m, pred_15m, pred_1h = (float(np.ravel(p)[0]) for p in raw)
        preds_dict = {"5m": pred_5m, "15m": pred_15m, "1h": pred_1h}
        preds_str = f"5m: {pred_5m:.5f} | 15m: {pred_15m:.5f} | 1h: {pred_1h:.5f}"
        return preds_dict, preds_str
class LinUCBBandit:
    """Contextual bandit (LinUCB): picks the strategy with the highest upper confidence bound."""

    def __init__(self, strategies, d, alpha=1.0, regularization=1.0):
        self.strategies = list(strategies)
        self.d = d
        self.alpha = alpha
        self.reg = regularization
        # Per-arm ridge-regression state: A = X'X + reg*I, b = X'y.
        self.A = {name: self.reg * np.eye(self.d) for name in self.strategies}
        self.b = {name: np.zeros(self.d) for name in self.strategies}

    def select(self, context_vector):
        """Return the strategy maximizing estimated reward plus exploration bonus."""
        best_name, best_score = None, None
        for name in self.strategies:
            A_inv = np.linalg.inv(self.A[name])
            theta = A_inv.dot(self.b[name])
            bonus = self.alpha * math.sqrt(context_vector.dot(A_inv).dot(context_vector))
            score = theta.dot(context_vector) + bonus
            # Strict '>' keeps the first-listed strategy on ties, like max() did.
            if best_score is None or score > best_score:
                best_name, best_score = name, score
        return best_name

    def update(self, strategy, context_vector, reward):
        """Fold one observed (context, reward) pair into the chosen arm's statistics."""
        x = context_vector.reshape(-1)
        self.A[strategy] += np.outer(x, x)
        self.b[strategy] += reward * x

    def increase_exploration(self):
        """Double the exploration weight (called when reward drift is detected)."""
        self.alpha *= 2.0
        print(f"Bandit: Increased exploration alpha to {self.alpha}")
class RTDBLoggerV2:
    """Best-effort signal logger backed by a Firebase Realtime Database node.

    If Firebase credentials are missing or invalid, self.ref stays None and
    log_signal() becomes a silent no-op instead of raising.
    """

    def __init__(self, db_ref_name='signals_v2'):
        self.ref = None
        try:
            sa_key_json = os.environ.get('FIRESTORE_SA_KEY')
            db_url = os.environ.get('FIREBASE_DB_URL')
            cred = credentials.Certificate(json.loads(sa_key_json))
            if not firebase_admin._apps:
                firebase_admin.initialize_app(cred, {'databaseURL': db_url})
            self.ref = db.reference(db_ref_name)
        except Exception as e:
            print(f"RTDB Logger Failed: {e}")

    def log_signal(self, ts, strategy, action, entry, sl, tp, context_vector):
        """Push one open signal; pnl/reward/outcome are filled in later by the evaluator."""
        if not self.ref:
            return
        payload = {
            "timestamp": ts,
            "strategy": strategy,
            "action": action,
            "entry": float(entry),
            "stop_loss": float(sl),
            "take_profit": float(tp),
            "context_vector": [float(x) for x in context_vector.tolist()],
            "pnl": None,
            "reward": None,
            "outcome_reason": None,
        }
        self.ref.push(payload)
class PageHinkley:
    """Page-Hinkley drift detector over a stream of observations (fed -reward here)."""

    def __init__(self, delta=0.005, lambda_=50, alpha=1 - 1e-3):
        # delta: tolerance; lambda_: alarm threshold; alpha: EWMA decay for the mean.
        self.mean = 0.0
        self.delta = delta
        self.lambda_ = lambda_
        self.alpha = alpha
        self.cumulative = 0.0

    def update(self, x):
        """Feed one observation; return True when drift is flagged (and reset the statistic)."""
        self.mean = self.mean * self.alpha + x * (1 - self.alpha)
        deviation = x - self.mean - self.delta
        self.cumulative = max(0, self.cumulative + deviation)
        if self.cumulative > self.lambda_:
            self.cumulative = 0
            return True
        return False
class StrategyManager:
    """Registry mapping strategy names to callables the bandit chooses between."""

    def __init__(self, situation_room, prediction_engine):
        self.situation_room = situation_room
        self.prediction_engine = prediction_engine

    def list_strategies(self):
        """Return {name: fn}; each fn maps a feature sequence to (thesis, predictions_str)."""

        def predictive_strategy(seq):
            # Full pipeline: DL predictions feed the rule-based situation room.
            preds, preds_str = self.prediction_engine.predict_single(seq)
            return self.situation_room.generate_thesis(preds, seq), preds_str

        def ema_crossover_strategy(seq):
            # Fallback: empty predictions force the EMA-crossover branch.
            return self.situation_room.generate_thesis({}, seq), "N/A (EMA)"

        return {
            "predictive_rule_based": predictive_strategy,
            "ema_crossover": ema_crossover_strategy,
        }
# Raw features that enter the bandit's context vector (scaled to [-1, 1]) plus the
# one-hot regime columns appended by ContextVectorPreprocessor.build_context_vector.
CONTEXT_FEATURES=['close', 'ATR', 'EMA_20', 'RSI', 'time_since_event', 'time_to_event', 'hour_of_day']; REGIME_COLS=['regime_TRENDING', 'regime_BREAKOUT', 'regime_CHOPPY', 'regime_RANGING']; CONTEXT_DIMENSION=len(CONTEXT_FEATURES)+len(REGIME_COLS)
class ContextVectorPreprocessor:
    """Builds the bandit's context vector: scaled market features + one-hot regime."""

    def __init__(self):
        self.scaler = MinMaxScaler(feature_range=(-1, 1))
        self.calibrated_context_scaler_path = 'calibrated_context_scaler.joblib'

    def is_calibrated(self):
        """True when a previously fitted scaler exists on disk."""
        return os.path.exists(self.calibrated_context_scaler_path)

    def load_calibrated_scaler(self):
        """Restore the fitted scaler saved by calibrate()."""
        self.scaler = joblib.load(self.calibrated_context_scaler_path)
        return True

    def calibrate(self, data):
        """Fit the scaler on historical context features and persist it."""
        self.scaler.fit(data[CONTEXT_FEATURES])
        joblib.dump(self.scaler, self.calibrated_context_scaler_path)

    def build_context_vector(self, df):
        """Return scaled features of the latest bar concatenated with its regime one-hot."""
        frame = df.copy()
        frame[CONTEXT_FEATURES] = frame[CONTEXT_FEATURES].astype(float)
        latest = frame[CONTEXT_FEATURES].iloc[-1:].values
        scaled = self.scaler.transform(latest).flatten()
        one_hot = np.zeros(len(REGIME_COLS))
        try:
            one_hot[REGIME_COLS.index(f"regime_{frame.iloc[-1]['regime']}")] = 1
        except ValueError:
            # Unknown regime label: leave the one-hot block all zeros.
            pass
        return np.concatenate([scaled, one_hot])
class LiveDataStore:
    """Background thread that keeps the latest engineered feature frame in memory.

    Polls Twelve Data every `interval` seconds, rebuilds the feature set
    (including event features), and exposes thread-safe snapshot accessors.
    """
    def __init__(self, api_key, tokenizer, model, interval=120):
        self.api_key, self.tokenizer, self.model, self.interval = api_key, tokenizer, model, interval
        # _data_lock guards _latest_features and _next_event_info.
        self._data_lock = threading.Lock(); self._latest_features = pd.DataFrame(); self._next_event_info = {"title": "N/A", "time_str": "N/A"}
        self._stop = threading.Event(); self._thread = None
    def _fetch_and_update(self):
        # One refresh cycle: pull prices + calendar, rebuild features, swap under lock.
        try:
            price_data = fetch_twelvedata_prices(self.api_key, output_size=500)
            if not price_data.empty:
                features, next_event = create_feature_set_for_inference(price_data, fetch_live_events_with_cache(), self.tokenizer, self.model)
                with self._data_lock: self._latest_features, self._next_event_info = features, next_event
        except Exception as e: print(f"LiveDataStore Error: {e}")
    def _update_loop(self):
        # Refresh until stop is set; Event.wait doubles as an interruptible sleep.
        while not self._stop.is_set(): self._fetch_and_update(); self._stop.wait(self.interval)
    def start(self):
        """Start the daemon refresh thread, then do one synchronous initial fetch."""
        self._stop.clear(); self._thread = threading.Thread(target=self._update_loop, daemon=True); self._thread.start(); self._fetch_and_update()
    def get_latest_data(self, num_bars=None):
        """Return (tail of the feature frame, next-event info) as thread-safe copies."""
        with self._data_lock: return (self._latest_features.iloc[-num_bars:].copy() if num_bars and not self._latest_features.empty else self._latest_features.copy()), self._next_event_info
    def get_raw_price_data(self, num_bars=None):
        """Return the last num_bars OHLC rows with a UTC 'Datetime' column (empty frame if no data yet)."""
        with self._data_lock:
            if self._latest_features.empty: return pd.DataFrame()
            df = self._latest_features[['open', 'high', 'low', 'close']].copy()
            if num_bars: df = df.iloc[-num_bars:]
            df.reset_index(inplace=True); df['Datetime'] = df['Datetime'].dt.tz_localize('UTC') if df['Datetime'].dt.tz is None else df['Datetime'].dt.tz_convert('UTC'); return df
def fetch_live_events_with_cache():
    """Fetch the weekly economic-calendar JSON, caching it for CACHE_DURATION_SECONDS.

    On any network failure, fall back to the last cached payload (or an empty list).
    """
    cache_age = time.time() - _EVENT_CACHE.get("timestamp", 0)
    if _EVENT_CACHE.get("data") and cache_age < CACHE_DURATION_SECONDS:
        return _EVENT_CACHE["data"]
    try:
        resp = requests.get(EVENT_JSON_URL, headers={"User-Agent": "V17-Agent/1.0"}, timeout=10)
        resp.raise_for_status()
        payload = resp.json()
    except requests.RequestException:
        return _EVENT_CACHE.get("data", [])
    _EVENT_CACHE["data"] = payload
    _EVENT_CACHE["timestamp"] = time.time()
    return payload
def fetch_twelvedata_prices(api_key, symbol='EUR/USD', interval='5min', output_size=200):
    """Pull recent OHLC bars from Twelve Data as a DataFrame with a 'Datetime' column.

    Rows are sorted oldest-first; returns an empty DataFrame on any API/network failure.
    """
    try:
        client = TDClient(apikey=api_key)
        series = client.time_series(symbol=symbol, interval=interval, outputsize=output_size, timezone="UTC")
        frame = series.as_pandas().sort_index(ascending=True)
        frame.index.name = 'Datetime'
        frame.reset_index(inplace=True)
        return frame
    except Exception:
        return pd.DataFrame()
def create_feature_set_for_inference(price_df, events_json, tokenizer, model):
    """Build the model's feature frame from raw OHLC bars and the economic calendar.

    Adds RSI(14)/EMA(20)/ATR(14) technicals, FinBERT title embeddings for nearby
    USD/EUR events, and event-timing features. Returns (feature_df, next_event_info)
    where next_event_info describes the next upcoming high-impact event.

    tokenizer/model are only invoked when events_json contains dated events, so
    they may be None for purely technical feature sets.
    """
    price_features = price_df.copy()
    price_features['Datetime'] = pd.to_datetime(price_features['Datetime'])
    price_features.set_index('Datetime', inplace=True)
    price_features = price_features.tz_localize('UTC') if price_features.index.tz is None else price_features.tz_convert('UTC')
    price_features.rename(columns={'close': 'Price', 'open': 'Open', 'high': 'High', 'low': 'Low'}, inplace=True)
    # --- Technical indicators ---
    delta = price_features['Price'].diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    price_features['RSI'] = 100 - (100 / (1 + (gain / loss)))
    price_features['EMA_20'] = price_features['Price'].ewm(span=20, adjust=False).mean()
    high_low = price_features['High'] - price_features['Low']
    high_close = np.abs(price_features['High'] - price_features['Price'].shift())
    low_close = np.abs(price_features['Low'] - price_features['Price'].shift())
    tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
    price_features['ATR'] = tr.rolling(window=14).mean()
    price_features.rename(columns={'Price': 'close', 'Open': 'open', 'High': 'high', 'Low': 'low'}, inplace=True)
    # --- Calendar events: keep USD/EUR, embed titles with FinBERT ---
    events = pd.DataFrame(events_json)
    next_event_info = {"title": "None within 24h", "time_str": "N/A"}
    processed_events = pd.DataFrame()
    if not events.empty and 'date' in events.columns:
        events = events[events['country'].isin(['USD', 'EUR'])].copy()
        events['datetime'] = pd.to_datetime(events['date'], utc=True, errors='coerce')
        events.dropna(subset=['datetime'], inplace=True)
        events.set_index('datetime', inplace=True)
        events.sort_index(inplace=True)
        inputs = tokenizer(events['title'].fillna('').tolist(), return_tensors='tf', padding=True, truncation=True, max_length=64)
        embeddings = model(inputs).last_hidden_state[:, 0, :].numpy()
        # BUG FIX: embeddings.shape is a tuple; iterate over the hidden
        # dimension (shape[1]) to make one column per embedding component.
        emb_df = pd.DataFrame(embeddings, columns=[f'finbert_{i}' for i in range(embeddings.shape[1])], index=events.index)
        processed_events = pd.concat([events, emb_df], axis=1)
    # ROBUSTNESS FIX: merge_asof raises on an empty (RangeIndex) right frame,
    # so only merge when there are processed events to attach.
    if not processed_events.empty:
        merged_data = pd.merge_asof(left=price_features.sort_index(), right=processed_events.sort_index(), left_index=True, right_index=True, direction='backward', tolerance=pd.Timedelta(minutes=60))
    else:
        merged_data = price_features.sort_index()
    all_high_impact = events[events['impact'] == 'High'] if 'impact' in events.columns and not events.empty else pd.DataFrame()
    if not all_high_impact.empty:
        upcoming = all_high_impact[all_high_impact.index > merged_data.index[-1]]
        if not upcoming.empty:
            # BUG FIX: '.iloc' alone is the indexer object; take the first row.
            next_event = upcoming.iloc[0]
            time_to_next = (next_event.name - merged_data.index[-1]).total_seconds() / 3600.0
            merged_data['time_to_event'] = time_to_next
            next_event_info = {"title": f"{next_event['country']} {next_event['title']}", "time_str": f"in {time_to_next:.2f}h"}
        else:
            merged_data['time_to_event'] = 9999
        # Hours since the most recent *past* high-impact event, per bar.
        df_idx = merged_data.index.to_numpy().astype(np.int64) // 10**9
        evt_times = all_high_impact.index.to_numpy().astype(np.int64) // 10**9
        time_diffs = df_idx[:, None] - evt_times[None, :]
        merged_data['time_since_event'] = np.min(np.where(time_diffs >= 0, time_diffs, np.inf), axis=1) / 3600
    else:
        merged_data['time_since_event'] = 9999
        merged_data['time_to_event'] = 9999
    merged_data.replace([np.inf, -np.inf], 9999, inplace=True)
    merged_data['hour_of_day'] = merged_data.index.hour
    merged_data['day_of_week'] = merged_data.index.dayofweek
    finbert_cols = [col for col in merged_data.columns if 'finbert_' in col]
    merged_data[finbert_cols] = merged_data[finbert_cols].ffill()
    merged_data.fillna(0, inplace=True)
    merged_data.dropna(subset=['open', 'close', 'RSI'], inplace=True)
    return merged_data, next_event_info
def evaluate_pending_signals_v2(perf_logger, bandit, change_detector, live_data_store):
    """Score every still-open logged signal against recent prices.

    Replays each signal's subsequent bars to find an SL/TP hit (otherwise a
    time exit at the latest close), writes pnl/reward/outcome back to the log,
    feeds the reward into the bandit, and widens exploration when the drift
    detector fires on a run of losses.

    BUG FIX: the original packed statements after `return`/`continue` on the
    same line, making them unreachable -- `now_utc`, `live_prices`, `sig_time`
    and `outcome` were never assigned, so every call died with a NameError
    that the broad except silently swallowed. Statements are now separated so
    the evaluator actually runs.
    """
    if not perf_logger.ref:
        return
    now_utc = pd.Timestamp.now(tz='UTC')
    try:
        all_signals = perf_logger.ref.get()
        if not all_signals:
            return
        live_prices = live_data_store.get_raw_price_data(num_bars=288)
        if live_prices.empty:
            return
        for key, sig in all_signals.items():
            if sig.get('reward') is not None:
                continue  # already evaluated
            sig_time = pd.to_datetime(sig['timestamp'])
            # Give the trade at least one 5-minute bar before judging it.
            if now_utc < (sig_time + pd.Timedelta(minutes=5)):
                continue
            entry, sl, tp, action = float(sig['entry']), float(sig['stop_loss']), float(sig['take_profit']), sig['action']
            relevant_bars = live_prices[live_prices['Datetime'] > sig_time]
            if relevant_bars.empty:
                continue
            # Default: time exit at the most recent close.
            outcome, exit_price = "Time Exit", relevant_bars.iloc[-1]['close']
            for _, bar in relevant_bars.iterrows():
                if (action == 'BUY' and bar['low'] <= sl) or (action == 'SELL' and bar['high'] >= sl):
                    exit_price, outcome = sl, "SL"
                    break
                if (action == 'BUY' and bar['high'] >= tp) or (action == 'SELL' and bar['low'] <= tp):
                    exit_price, outcome = tp, "TP"
                    break
            pnl = (exit_price - entry) if action == 'BUY' else (entry - exit_price)
            # Normalize: 50 pips (0.005) maps to the reward bounds [-1, 1].
            reward = np.clip(pnl / 0.005, -1.0, 1.0)
            ctx = np.array(sig['context_vector'])
            bandit.update(sig['strategy'], ctx, reward)
            # Drift detector sees the *negated* reward: sustained losses push it up.
            if change_detector.update(-reward):
                bandit.increase_exploration()
            perf_logger.ref.child(key).update({'pnl': pnl, 'reward': reward, 'outcome_reason': outcome})
    except Exception as e:
        print(f"Evaluator Error: {e}")
def send_ntfy_notification(topic, thesis):
    """Best-effort push of a trade thesis to ntfy.sh; silently no-ops without a topic."""
    if not topic:
        return
    body = "\n".join([
        f"Strategy: {thesis.get('strategy_type')} ({thesis.get('confidence')})",
        f"Reason: {thesis.get('reasoning')}",
        f"Entry: {thesis.get('entry')} | SL: {thesis.get('stop_loss')} | TP: {thesis.get('take_profit')}",
    ])
    headers = {"Title": f"V17 Signal: {thesis.get('action')} EUR/USD"}
    try:
        requests.post(f"https://ntfy.sh/{topic}", data=body.encode('utf-8'), headers=headers)
    except requests.RequestException:
        # Notification failures must never break the trading loop.
        pass
def download_models_from_hf(repo_id, token):
    """Download the pre-trained multi-horizon Keras model from the HF Hub; return its local path."""
    return hf_hub_download(repo_id=repo_id, filename="multi_horizon_model.keras", token=token)
# ==============================================================================
# --- MAIN WORKER & ORCHESTRATOR ---
# ==============================================================================
def main_worker():
    """Background orchestrator: calibrates the agent once, spawns the evolution
    loop, then runs the 5-minute signal cycle forever. Designed to run in a
    daemon thread started by the __main__ block."""
    print("--- [Persistent Agent V17 - The Evolution Agent] Worker Thread Started ---")
    api_key, hf_token, ntfy_topic, groq_key = os.environ.get('TWELVE_DATA_API_KEY'), os.environ.get('HF_TOKEN'), os.environ.get('NTFY_TOPIC_V17'), os.environ.get('GROQ_API_KEY')
    if not groq_key: print("FATAL: GROQ_API_KEY not set."); return
    # FinBERT tokenizer/encoder used for economic-event title embeddings.
    tokenizer = BertTokenizer.from_pretrained('ProsusAI/finbert')
    model = TFBertModel.from_pretrained('ProsusAI/finbert', from_pt=True)
    pred_engine = PredictionCoreTransformer()
    ctx_preprocessor = ContextVectorPreprocessor()
    live_store = LiveDataStore(api_key, tokenizer, model)
    live_store.start()
    # One-time calibration on first boot; later boots reload saved artifacts.
    if not pred_engine.is_calibrated() or not ctx_preprocessor.is_calibrated():
        print("Calibrating agent for the first time...")
        base_model_path = download_models_from_hf("Badumetsibb/conscious-trading-agent-models", hf_token)
        calib_prices = fetch_twelvedata_prices(api_key, output_size=5000)
        if len(calib_prices) < 500: print("FATAL: Not enough data for calibration."); return
        calib_features, _ = create_feature_set_for_inference(calib_prices, fetch_live_events_with_cache(), tokenizer, model)
        pred_engine.calibrate(base_model_path, calib_features.copy())
        calib_with_regime = CausalReasoningNetwork(calib_features.copy()).identify_volatility_regimes()
        ctx_preprocessor.calibrate(calib_with_regime)
    else:
        pred_engine.load_calibrated_artifacts()
        ctx_preprocessor.load_calibrated_scaler()
    # Evolution-engine components (weakness mining -> LLM naming -> veto rules).
    miner = ConceptMiner(log_path='signals_v17')
    namer = ConceptNamer(api_key=groq_key)
    veto_system = ConceptVetoSystem()
    logger = RTDBLoggerV2(db_ref_name='signals_v17')
    sit_room = RuleBasedSituationRoom({'sl_atr_multiplier': 2.0, 'tp_atr_multiplier': 4.0})
    regime_filter = MarketRegimeFilter()
    strat_manager = StrategyManager(sit_room, pred_engine)
    bandit = LinUCBBandit(strat_manager.list_strategies().keys(), d=CONTEXT_DIMENSION, alpha=1.5)
    change_detector = PageHinkley()
    def evolution_cycle():
        # Every 2h (after a 30-min warmup): mine losing clusters, have the LLM
        # name each new one, persist it, and hot-reload the veto system.
        time.sleep(1800)
        while True:
            try:
                new_patterns = miner.run_analysis()
                if new_patterns:
                    for p in new_patterns:
                        # Skip clusters that were already converted into a concept.
                        if all(c.get('source_cluster_id') != p['source_cluster_id'] for c in veto_system.concepts.values()):
                            new_concept = namer.name_new_concept(p)
                            if new_concept:
                                cid = new_concept['concept_name'].lower().replace(' ', '_') + f"_c{p['source_cluster_id']}"
                                new_concept['source_cluster_id'] = p['source_cluster_id']
                                db.reference(f"concepts/{cid}").set(new_concept)
                                print(f"EVOLUTION_ENGINE: βœ… New concept '{new_concept['concept_name']}' learned!")
                                veto_system.load_concepts_from_firebase()
            except Exception as e: print(f"Evolution Cycle Error: {e}")
            time.sleep(7200)
    threading.Thread(target=evolution_cycle, daemon=True).start()
    print("--- WORKER V17: Initialization Complete. ---")
    while True:
        try:
            features, next_event = live_store.get_latest_data(num_bars=pred_engine.sequence_length)
            if features.empty or len(features) < pred_engine.sequence_length:
                time.sleep(60); continue
            features_with_regime = CausalReasoningNetwork(features.copy()).identify_volatility_regimes()
            input_seq = features_with_regime.iloc[-pred_engine.sequence_length:]
            latest_ctx = input_seq.iloc[-1]
            ctx_vec = ctx_preprocessor.build_context_vector(input_seq)
            chosen_strat = bandit.select(ctx_vec)
            thesis, preds_str = strat_manager.list_strategies()[chosen_strat](input_seq)
            is_vetoed, veto_reason = veto_system.check_for_veto(latest_ctx, chosen_strat)
            is_tradeable = regime_filter.should_trade(latest_ctx['regime'], thesis)
            # NOTE(review): thesis['action'] uses "NO_TRADE" (underscore) while the
            # comparisons below use "NO TRADE" (space), so a NO_TRADE thesis can be
            # labeled "EXECUTABLE" in the reasoning text. Harmless for trading
            # (only BUY/SELL is logged below) but the log wording can mislead β€” confirm.
            final_action, final_reasoning = thesis['action'], f"Bandit chose '{chosen_strat}'. Thesis: '{thesis['reasoning']}'"
            if is_vetoed:
                final_action, final_reasoning = "NO TRADE", final_reasoning + f" -> ❌ VETOED by Concept: '{veto_reason}'."
            elif not is_tradeable and final_action != "NO TRADE":
                final_action, final_reasoning = "NO TRADE", final_reasoning + " -> ❌ REJECTED by Regime Filter."
            elif final_action != "NO TRADE":
                final_reasoning += " -> βœ… EXECUTABLE."
            if final_action in ["BUY", "SELL"]:
                ts = pd.Timestamp.now(tz='UTC').isoformat()
                logger.log_signal(ts, chosen_strat, final_action, thesis['entry'], thesis['stop_loss'], thesis['take_profit'], ctx_vec)
                send_ntfy_notification(ntfy_topic, thesis)
            # Settle any previously logged signals against fresh prices.
            evaluate_pending_signals_v2(logger, bandit, change_detector, live_store)
            status = {"last_checked": pd.Timestamp.now(tz='UTC').isoformat(), "market_price": f"{latest_ctx['close']:.5f}", "market_regime": latest_ctx['regime'], "signal": final_action, "reasoning": final_reasoning, "predictions": preds_str, "next_event": f"{next_event['title']} ({next_event['time_str']})"}
            with open('status_v17.json', 'w') as f: json.dump(status, f)
            print(f"WORKER V17: Cycle complete. Signal: {final_action}. Sleeping.")
            time.sleep(300)
        except Exception as e:
            print(f"Main Loop Error: {e}"); time.sleep(60)
# ==============================================================================
# --- GRADIO DASHBOARD AND STARTUP ---
# ==============================================================================
def get_latest_status_v17():
    """Read the worker's status file and return the 7 dashboard field values."""
    status_path = 'status_v17.json'
    if not os.path.exists(status_path):
        return "Initializing...", "N/A", "N/A", "N/A", "Waiting for first cycle.", "N/A", "N/A"
    try:
        with open(status_path, 'r') as f:
            s = json.load(f)
    except (json.JSONDecodeError, IOError):
        return "Error reading status.", "N/A", "N/A", "N/A", "File may be in use.", "N/A", "N/A"
    fields = ('market_price', 'market_regime', 'signal', 'reasoning', 'predictions', 'next_event')
    return (f"Status at: {s.get('last_checked', 'N/A')}",) + tuple(s.get(k, 'N/A') for k in fields)
if __name__ == "__main__":
    # Fail fast if any required secret is missing from the environment.
    required_secrets = ['TWELVE_DATA_API_KEY', 'HF_TOKEN', 'GROQ_API_KEY', 'FIRESTORE_SA_KEY', 'FIREBASE_DB_URL', 'NTFY_TOPIC_V17']
    if not all(os.environ.get(k) for k in required_secrets):
        print(f"FATAL: Missing secrets. Please set: {required_secrets}")
        exit()
    # Initialize the Firebase app once, before any worker thread touches the DB.
    try:
        sa_key = json.loads(os.environ.get('FIRESTORE_SA_KEY'))
        cred = credentials.Certificate(sa_key)
        if not firebase_admin._apps:
            firebase_admin.initialize_app(cred, {'databaseURL': os.environ.get('FIREBASE_DB_URL')})
        print("βœ… Firebase connection established.")
    except Exception as e:
        print(f"FATAL: Firebase init failed: {e}")
        exit()
    # The trading agent runs in a daemon thread; Gradio serves the dashboard.
    worker_thread = threading.Thread(target=main_worker, daemon=True)
    worker_thread.start()
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🧠 V17 Evolution Agent")
        gr.Markdown("**Secrets Status:** βœ… All required secrets appear to be set.")
        refresh_btn = gr.Button("Refresh Status", variant="primary")
        status_output = gr.Textbox(label="Status", interactive=False)
        gr.Markdown("## Agent's Last Analysis")
        with gr.Row():
            price_output = gr.Textbox(label="Last Market Price")
            regime_output = gr.Textbox(label="Last Market Regime")
        with gr.Row():
            predictions_output = gr.Textbox(label="DL Model Predictions (5m|15m|1h)")
            event_output = gr.Textbox(label="Next High-Impact Event")
        action_output = gr.Textbox(label="Final Signal / Action")
        reasoning_output = gr.Textbox(label="Full Reasoning", lines=3)
        # Output order must match get_latest_status_v17's return tuple.
        refresh_btn.click(fn=get_latest_status_v17, inputs=[], outputs=[
            status_output, price_output, regime_output, action_output, reasoning_output, predictions_output, event_output
        ])
    demo.launch()