# app.py (Version FINAL - All Bugs Fixed)
# This is the complete, final, and fully-functional code, built on a stable Keras 2 architecture.

# --- Core Libraries & Setup ---
import pandas as pd
import numpy as np
import warnings
import joblib
import json
import operator
import os
import gradio as gr
import requests
import time
from datetime import datetime
import pytz
import threading
import math
from huggingface_hub import hf_hub_download
import firebase_admin
from firebase_admin import credentials, db
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN
from groq import Groq

# --- Environment Configuration ---
# Must be set before TensorFlow is imported below.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # Suppress verbose logs
warnings.filterwarnings("ignore", category=UserWarning, module='sklearn')
warnings.filterwarnings("ignore", category=FutureWarning)

# --- ML/DL & NLP Libraries (STABLE TENSORFLOW.KERAS) ---
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.utils import Sequence
from transformers import BertTokenizer, TFBertModel

# --- Live Data & Cache Configuration ---
from twelvedata import TDClient

EVENT_JSON_URL = "https://nfs.faireconomy.media/ff_calendar_thisweek.json"
CACHE_DURATION_SECONDS = 600
_EVENT_CACHE = {"data": None, "timestamp": 0}


# ==============================================================================
# --- V17 EVOLUTION ENGINE ---
# ==============================================================================
class ConceptMiner:
    """Cluster evaluated trade logs to find groups of consistently losing trades."""

    def __init__(self, log_path='signals_v17', min_trades_for_analysis=20):
        """Remember the Firebase path to read and the minimum batch size to analyse."""
        self.log_path = log_path
        self.min_trades = min_trades_for_analysis
        # Keys of trades already clustered, so each trade is analysed once.
        self.analyzed_trade_keys = set()

    def run_analysis(self):
        """Cluster not-yet-analysed, already-rewarded trades and report losing clusters.

        Returns:
            A list of pattern dicts (one per DBSCAN cluster whose mean reward is
            below -0.25); an empty list when there is nothing to report.
        """
        print(f"EVOLUTION_ENGINE: [ConceptMiner] Analyzing performance from '{self.log_path}'.")
        all_data = db.reference(self.log_path).get()
        if not all_data:
            print("EVOLUTION_ENGINE: [ConceptMiner] No trade logs found yet.")
            return []

        new_trades = {
            k: v for k, v in all_data.items()
            if v.get('reward') is not None and k not in self.analyzed_trade_keys
        }
        if len(new_trades) < self.min_trades:
            print(f"EVOLUTION_ENGINE: [ConceptMiner] Not enough new trades ({len(new_trades)}/{self.min_trades}).")
            return []

        print(f"EVOLUTION_ENGINE: [ConceptMiner] Analyzing {len(new_trades)} new trades.")
        df = pd.DataFrame.from_dict(new_trades, orient='index')
        context_df = pd.DataFrame(df['context_vector'].tolist(), index=df.index)
        # Same layout that ContextVectorPreprocessor.build_context_vector emits.
        feature_names = CONTEXT_FEATURES + REGIME_COLS
        context_df.columns = feature_names[:len(context_df.columns)]
        full_df = pd.concat([df[['reward', 'strategy']], context_df], axis=1)

        features = full_df.drop(columns=['reward', 'strategy']).values
        scaled_features = StandardScaler().fit_transform(features)
        full_df['cluster'] = DBSCAN(eps=1.0, min_samples=4).fit_predict(scaled_features)

        # DBSCAN labels noise points as -1; only real clusters are considered.
        significant_clusters = full_df[full_df['cluster'] != -1]
        if significant_clusters.empty:
            self.analyzed_trade_keys.update(new_trades.keys())
            return []

        mean_rewards = significant_clusters.groupby('cluster')['reward'].mean()
        losing_clusters = mean_rewards[mean_rewards < -0.25]

        patterns = []
        for cid in losing_clusters.index:
            c_data = significant_clusters[significant_clusters['cluster'] == cid]
            avg_ctx = c_data.drop(columns=['reward', 'strategy', 'cluster']).mean()
            patterns.append({
                "source_cluster_id": int(cid),
                "trade_count": len(c_data),
                "average_reward": c_data['reward'].mean(),
                "dominant_strategy": c_data['strategy'].mode()[0],
                # Column names look like 'regime_TRENDING'; keep the part after '_'.
                "dominant_regime": avg_ctx.filter(like='regime').idxmax().split('_')[1],
                "avg_market_conditions": avg_ctx.to_dict(),
            })
            print(f"EVOLUTION_ENGINE: [ConceptMiner] 🔥 Discovered new weakness (Cluster #{cid}).")

        self.analyzed_trade_keys.update(new_trades.keys())
        return patterns


class ConceptNamer:
    """Ask a Groq-hosted LLM to turn a losing pattern into a named veto concept."""

    def __init__(self, api_key):
        """Create the Groq client used for all naming requests."""
        self.client = Groq(api_key=api_key)

    def name_new_concept(self, pattern):
        """Request a JSON concept description for ``pattern``.

        Args:
            pattern: One pattern dict as produced by ``ConceptMiner.run_analysis``.

        Returns:
            The parsed JSON object, or None when the request or parsing fails.
        """
        print(f"EVOLUTION_ENGINE: [ConceptNamer] Naming weakness from Cluster #{pattern['source_cluster_id']}...")
        system_prompt = "You are an expert market strategist AI. Your job is to analyze a data pattern representing a trading agent's weakness and turn it into a memorable, actionable 'Concept'. The output must be a clean JSON object and nothing else."
        atr_threshold = round(pattern['avg_market_conditions']['ATR'] + 0.1, 2)
        # NOTE(review): this threshold is derived from the *scaled* context-vector
        # ATR, while check_for_veto is fed the raw feature row - confirm units.
        human_prompt = f"""
        Analyze this data pattern where a trading agent consistently loses money. Create a new 'Concept' for it.

        **Pattern Fingerprint:**
        - **Dominant Market Regime:** {pattern['dominant_regime']}
        - **The Strategy That Fails Here:** {pattern['dominant_strategy']}
        - **Average Reward (from -1 to 1):** {pattern['average_reward']:.2f}
        - **Key Market Conditions (scaled values):**
            - Volatility (ATR): {pattern['avg_market_conditions']['ATR']:.4f}

        **Your Task:**
        Generate a JSON object with the following structure.
        {{
            "concept_name": "A short, memorable name for this market trap (e.g., 'Momentum Mirage', 'Volatility Void').",
            "description_agent": "A concise, one-sentence explanation of this trap for the agent's logs.",
            "description_human": "A human-readable explanation of why this is a trap for the agent.",
            "actionable_rule": {{
                "type": "veto_trade",
                "conditions": [
                    {{"variable": "regime", "operator": "==", "value": "{pattern['dominant_regime']}"}},
                    {{"variable": "ATR", "operator": "<", "value": {atr_threshold} }}
                ],
                "target_strategy": "{pattern['dominant_strategy']}"
            }}
        }}
        """
        try:
            chat_completion = self.client.chat.completions.create(
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": human_prompt},
                ],
                model="llama3-8b-8192",
                temperature=0.6,
            )
            response_text = chat_completion.choices[0].message.content.strip()
            # Strip an optional Markdown code fence without assuming both ends exist.
            if response_text.startswith("```json"):
                response_text = response_text[7:]
            elif response_text.startswith("```"):
                response_text = response_text[3:]
            if response_text.endswith("```"):
                response_text = response_text[:-3]
            return json.loads(response_text.strip())
        except Exception as e:
            print(f"EVOLUTION_ENGINE: [ConceptNamer] ❌ Groq API Error: {e}")
            return None


class ConceptVetoSystem:
    """Hold learned concepts and veto trades whose context matches a concept rule."""

    # Comparison operators a rule condition may use.
    _OPERATORS = {
        '==': operator.eq,
        '!=': operator.ne,
        '<': operator.lt,
        '<=': operator.le,
        '>': operator.gt,
        '>=': operator.ge,
    }

    def __init__(self):
        """Start with no concepts and try an initial load from Firebase."""
        self.concepts = {}
        self.lock = threading.Lock()
        self.load_concepts_from_firebase()

    def load_concepts_from_firebase(self):
        """Replace ``self.concepts`` with the content of the ``concepts/`` node.

        Does nothing when no Firebase app is initialised; errors are printed.
        """
        with self.lock:
            print("--- V17: (Re)Loading concepts from Firebase... ---")
            try:
                if not firebase_admin._apps:
                    return
                concepts_data = db.reference('concepts/').get()
                if concepts_data:
                    self.concepts = concepts_data
                    print(f"✅ V17: Loaded {len(self.concepts)} concepts.")
                else:
                    print("🟡 V17: No concepts found in Firebase yet.")
            except Exception as e:
                print(f"❌ V17: ERROR - Could not load concepts: {e}")

    def _condition_holds(self, condition, current_context):
        """Return True only when ``condition`` can be evaluated and is satisfied."""
        compare = self._OPERATORS.get(condition.get('operator'))
        if compare is None or 'variable' not in condition or 'value' not in condition:
            # A malformed or unsupported condition must not count as satisfied.
            print(f"🟡 V17: Ignoring unsupported veto condition: {condition}")
            return False
        context_value = current_context.get(condition['variable'])
        if context_value is None:
            return False
        try:
            return bool(compare(context_value, condition['value']))
        except TypeError:
            # e.g. ordering a string against a number.
            return False

    def check_for_veto(self, current_context, chosen_strategy_name):
        """Check every ``veto_trade`` rule targeting ``chosen_strategy_name``.

        Args:
            current_context: Mapping-like object exposing ``.get(name)``
                (``main_worker`` passes the latest feature row).
            chosen_strategy_name: Strategy name the rules are matched against.

        Returns:
            ``(True, concept_name)`` for the first rule whose conditions all
            hold, otherwise ``(False, None)``.
        """
        with self.lock:
            for concept in self.concepts.values():
                rule = concept.get('actionable_rule', {})
                if rule.get('type') != 'veto_trade' or rule.get('target_strategy') != chosen_strategy_name:
                    continue
                if all(self._condition_holds(c, current_context) for c in rule.get('conditions', [])):
                    print(f"🚨 V17 VETO: Conditions for '{concept.get('concept_name')}' met.")
                    return True, concept.get('concept_name')
        return False, None


# ==============================================================================
# --- CORE AGENT LOGIC (FIXED AND READABLE) ---
# ==============================================================================
class CausalReasoningNetwork:
    """Label each bar with a volatility/trend regime."""

    def __init__(self, processed_data):
        """Work on a private copy of ``processed_data``."""
        self.data = processed_data.copy()

    def identify_volatility_regimes(self, volatility_indicator='ATR', trend_indicator='EMA_20'):
        """Add a ``regime`` column and return the labelled frame.

        ATR above its 66th percentile gives TRENDING (when the absolute 3-bar
        EMA change exceeds the 75th percentile of the signed change) or
        BREAKOUT; ATR below its 33rd percentile gives RANGING; otherwise CHOPPY.
        """
        atr = self.data[volatility_indicator]
        low_vol_threshold, high_vol_threshold = atr.quantile(0.33), atr.quantile(0.66)
        ema_slope = self.data[trend_indicator].diff(periods=3)
        # Loop-invariant, so compute it once instead of once per row.
        slope_threshold = ema_slope.quantile(0.75)

        regimes = []
        for atr_val, slope_val in zip(atr, ema_slope.fillna(0)):
            if atr_val > high_vol_threshold:
                regimes.append('TRENDING' if abs(slope_val) > slope_threshold else 'BREAKOUT')
            elif atr_val < low_vol_threshold:
                regimes.append('RANGING')
            else:
                regimes.append('CHOPPY')
        self.data['regime'] = regimes
        return self.data


class RuleBasedSituationRoom:
    """Turn price predictions (or a plain EMA rule) into a trade thesis."""

    def __init__(self, params):
        """Store the parameter dict (``sl_atr_multiplier`` / ``tp_atr_multiplier``)."""
        self.params = params

    def generate_thesis(self, predictions, sequence_df):
        """Build a thesis dict from ``predictions`` and the last row of ``sequence_df``.

        Args:
            predictions: Dict with '5m', '15m' and '1h' price predictions; when
                any key is missing the EMA_20 comparison is used instead.
            sequence_df: Frame whose last row provides 'close', 'EMA_20', 'ATR'.

        Returns:
            A dict with 'action', 'confidence', 'strategy_type' and 'reasoning';
            for BUY/SELL it also has 'entry', 'stop_loss' and 'take_profit' as
            5-decimal strings.
        """
        latest_data = sequence_df.iloc[-1]
        current_price = latest_data['close']

        if not predictions or any(k not in predictions for k in ['5m', '15m', '1h']):
            action = "BUY" if current_price > latest_data['EMA_20'] else "SELL"
            confidence, strategy, reasoning = "LOW", "Trend Following", f"Simple EMA Crossover ({action})."
        else:
            dir_5m = "BUY" if predictions['5m'] > current_price else "SELL"
            dir_15m = "BUY" if predictions['15m'] > current_price else "SELL"
            dir_1h = "BUY" if predictions['1h'] > current_price else "SELL"
            if dir_5m == dir_15m == dir_1h:
                action, confidence, reasoning, strategy = dir_5m, "HIGH", f"Strong confluence across all horizons ({dir_5m}).", "Trend Following"
            elif dir_5m == dir_15m:
                action, confidence, reasoning, strategy = dir_5m, "MEDIUM", f"Short/Medium-term confluence ({dir_5m}).", "Scalp"
            elif dir_15m == dir_1h:
                action, confidence, reasoning, strategy = dir_15m, "LOW", f"Medium/Long-term confluence ({dir_15m}).", "Trend Following"
            else:
                action, confidence, reasoning, strategy = "NO_TRADE", "LOW", "Prediction horizons diverge.", "Range Play"

        if action == "NO_TRADE":
            return {"action": "NO_TRADE", "confidence": "LOW", "strategy_type": strategy, "reasoning": reasoning}

        # Fall back to a tiny ATR so stop and target never coincide with entry.
        atr = latest_data['ATR'] if pd.notna(latest_data['ATR']) and latest_data['ATR'] > 0 else 0.0001
        sl_mult = self.params.get('sl_atr_multiplier', 2.0)
        tp_mult = self.params.get('tp_atr_multiplier', 4.0)
        # Lower confidence shrinks the profit target.
        if confidence == "MEDIUM":
            tp_mult *= 0.75
        elif confidence == "LOW":
            tp_mult *= 0.5

        entry = current_price
        if action == "BUY":
            stop_loss, take_profit = current_price - (sl_mult * atr), current_price + (tp_mult * atr)
        else:
            stop_loss, take_profit = current_price + (sl_mult * atr), current_price - (tp_mult * atr)
        return {
            "action": action,
            "entry": f"{entry:.5f}",
            "stop_loss": f"{stop_loss:.5f}",
            "take_profit": f"{take_profit:.5f}",
            "confidence": confidence,
            "reasoning": reasoning,
            "strategy_type": strategy,
        }


class MarketRegimeFilter:
    """Allow a thesis only when its strategy type suits the current regime."""

    def __init__(self):
        """Define which strategy types are allowed in each regime."""
        self.allowed_strategies = {
            'TRENDING': ['Trend Following'],
            'BREAKOUT': ['Trend Following', 'Scalp'],
            'CHOPPY': ['Scalp'],
            'RANGING': [],
        }

    def should_trade(self, current_regime, trade_thesis):
        """Return True when the thesis is a trade and its strategy is allowed."""
        if trade_thesis['action'] == 'NO_TRADE':
            return False
        return trade_thesis['strategy_type'] in self.allowed_strategies.get(current_regime, [])


class PredictionCoreTransformer:
    """Wrap the multi-horizon Keras model together with its feature scaler."""

    def __init__(self, sequence_length=48):
        """Set the input window length and the on-disk artifact paths."""
        self.scaler, self.model, self.feature_names = None, None, None
        self.sequence_length = sequence_length
        self.calibrated_model_path = 'calibrated_model.keras'
        self.calibrated_scaler_path = 'calibrated_scaler.joblib'
        self.calibrated_features_path = 'calibrated_features.json'

    def is_calibrated(self):
        """Return True when all three calibrated artifacts exist on disk."""
        return all(os.path.exists(p) for p in [self.calibrated_model_path, self.calibrated_scaler_path, self.calibrated_features_path])

    def load_calibrated_artifacts(self):
        """Load the model, scaler and feature list saved by ``calibrate``."""
        print("--- Loading pre-calibrated artifacts ---")
        self.model = load_model(self.calibrated_model_path)
        self.scaler = joblib.load(self.calibrated_scaler_path)
        with open(self.calibrated_features_path, 'r') as f:
            self.feature_names = json.load(f)
        print("--- Instant startup successful ---")
        return True

    def calibrate(self, base_model_path, calibration_data):
        """Fit the scaler, fine-tune the base model for 3 epochs and save everything.

        Args:
            base_model_path: Path of the pre-trained Keras model to load.
            calibration_data: Feature frame; must contain an 'open' column,
                which is shifted 1/3/12 rows ahead to form the targets.
        """
        print("--- STARTING ONE-TIME AGENT CALIBRATION ---")
        excluded = ['regime', 'close', 'open', 'high', 'low', 'event', 'impact', 'title', 'detail', 'country', 'date', 'currency', 'actual', 'forecast', 'previous']
        self.feature_names = [col for col in calibration_data.columns if col not in excluded]
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.scaler.fit(calibration_data[self.feature_names])
        self.model = load_model(base_model_path)
        print("Pre-trained model loaded.")

        class CalibrationGenerator(Sequence):
            """Yield one (window, targets) sample per index for ``model.fit``."""

            def __init__(self, data, scaler, feature_names, seq_len):
                super().__init__()
                self.data, self.scaler, self.feature_names, self.seq_len = data.copy(), scaler, feature_names, seq_len
                # Targets are the open price 1, 3 and 12 rows ahead.
                self.data['target_5m'] = self.data['open'].shift(-1)
                self.data['target_15m'] = self.data['open'].shift(-3)
                self.data['target_1h'] = self.data['open'].shift(-12)
                self.data.dropna(inplace=True)
                self.features_df = self.data[self.feature_names]
                self.targets_df = self.data[['target_5m', 'target_15m', 'target_1h']]
                self.scaled_features = self.scaler.transform(self.features_df)
                self.n_samples = len(self.scaled_features) - self.seq_len

            def __len__(self):
                return self.n_samples

            def __getitem__(self, idx):
                seq_end = idx + self.seq_len
                X = self.scaled_features[idx:seq_end].reshape(1, self.seq_len, len(self.feature_names))
                y_5m, y_15m, y_1h = self.targets_df.iloc[seq_end - 1][['target_5m', 'target_15m', 'target_1h']]
                return X, {'5m_output': np.array([y_5m]), '15m_output': np.array([y_15m]), '1h_output': np.array([y_1h])}

        calibration_generator = CalibrationGenerator(calibration_data, self.scaler, self.feature_names, self.sequence_length)
        self.model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
            loss={'5m_output': 'mean_squared_error', '15m_output': 'mean_squared_error', '1h_output': 'mean_squared_error'},
        )
        self.model.fit(calibration_generator, epochs=3, verbose=1)
        self.model.save(self.calibrated_model_path)
        joblib.dump(self.scaler, self.calibrated_scaler_path)
        with open(self.calibrated_features_path, 'w') as f:
            json.dump(self.feature_names, f)
        print("--- AGENT RE-CALIBRATION COMPLETE ---")
        return True

    @staticmethod
    def _scalar_output(predictions, name, position):
        """Extract one model output as a Python float.

        Accepts both the dict form (keyed by output name) and the list form of
        ``model.predict``.
        NOTE(review): the list form is assumed to be ordered 5m, 15m, 1h -
        confirm against the model definition.
        """
        raw = predictions[name] if isinstance(predictions, dict) else predictions[position]
        return float(np.asarray(raw).reshape(-1)[0])

    def predict_single(self, input_sequence):
        """Predict the three horizons for one window of ``sequence_length`` rows.

        Returns:
            ``(preds_dict, preds_str)``: a dict with float values under '5m',
            '15m' and '1h', and a display string.

        Raises:
            RuntimeError: when no scaler has been loaded or fitted.
        """
        if not self.scaler:
            raise RuntimeError("Agent not calibrated.")
        features = input_sequence[self.feature_names]
        scaled_features = self.scaler.transform(features)
        reshaped = scaled_features.reshape(1, self.sequence_length, len(self.feature_names))
        predictions = self.model.predict(reshaped, verbose=0)
        # Reduce each output array to a scalar; arrays cannot be formatted with ':.5f'.
        pred_5m = self._scalar_output(predictions, '5m_output', 0)
        pred_15m = self._scalar_output(predictions, '15m_output', 1)
        pred_1h = self._scalar_output(predictions, '1h_output', 2)
        preds_dict = {"5m": pred_5m, "15m": pred_15m, "1h": pred_1h}
        preds_str = f"5m: {pred_5m:.5f} | 15m: {pred_15m:.5f} | 1h: {pred_1h:.5f}"
        return preds_dict, preds_str


class LinUCBBandit:
    """Linear UCB contextual bandit with one (A, b) pair per strategy."""

    def __init__(self, strategies, d, alpha=1.0, regularization=1.0):
        """Initialise A to ``regularization * I`` and b to zeros for each strategy."""
        self.strategies, self.d, self.alpha, self.reg = list(strategies), d, alpha, regularization
        self.A = {s: (self.reg * np.eye(self.d)) for s in self.strategies}
        self.b = {s: np.zeros(self.d) for s in self.strategies}

    def select(self, context_vector):
        """Return the strategy with the highest mean-plus-uncertainty score."""
        scores = {}
        for s in self.strategies:
            A_inv = np.linalg.inv(self.A[s])
            theta = A_inv.dot(self.b[s])
            mean_reward = theta.dot(context_vector)
            # Clamp at 0: round-off can make the quadratic form slightly negative,
            # which would make math.sqrt raise.
            variance = max(float(context_vector.dot(A_inv).dot(context_vector)), 0.0)
            scores[s] = mean_reward + self.alpha * math.sqrt(variance)
        return max(scores, key=scores.get)

    def update(self, strategy, context_vector, reward):
        """Accumulate one observed (context, reward) pair for ``strategy``."""
        x = context_vector.reshape(-1)
        self.A[strategy] += np.outer(x, x)
        self.b[strategy] += reward * x

    def increase_exploration(self):
        """Double the exploration coefficient ``alpha``."""
        self.alpha *= 2.0
        print(f"Bandit: Increased exploration alpha to {self.alpha}")


class RTDBLoggerV2:
    """Push trade signals to a Firebase Realtime Database node."""

    def __init__(self, db_ref_name='signals_v2'):
        """Initialise Firebase from the environment if needed and open the node.

        On any failure ``self.ref`` stays None and logging becomes a no-op.
        """
        self.ref = None
        try:
            sa_key_json, db_url = os.environ.get('FIRESTORE_SA_KEY'), os.environ.get('FIREBASE_DB_URL')
            cred = credentials.Certificate(json.loads(sa_key_json))
            if not firebase_admin._apps:
                firebase_admin.initialize_app(cred, {'databaseURL': db_url})
            self.ref = db.reference(db_ref_name)
        except Exception as e:
            print(f"RTDB Logger Failed: {e}")

    def log_signal(self, ts, strategy, action, entry, sl, tp, context_vector):
        """Push one signal record; outcome fields start as None."""
        if self.ref:
            self.ref.push({
                "timestamp": ts,
                "strategy": strategy,
                "action": action,
                "entry": float(entry),
                "stop_loss": float(sl),
                "take_profit": float(tp),
                "context_vector": [float(x) for x in context_vector.tolist()],
                "pnl": None,
                "reward": None,
                "outcome_reason": None,
            })


class PageHinkley:
    """Page-Hinkley style change detector over a stream of values."""

    def __init__(self, delta=0.005, lambda_=50, alpha=1 - 1e-3):
        """Store the tolerance ``delta``, threshold ``lambda_`` and mean decay ``alpha``."""
        self.mean, self.delta, self.lambda_, self.alpha, self.cumulative = 0.0, delta, lambda_, alpha, 0.0

    def update(self, x):
        """Feed one value; return True (and reset the sum) when the threshold is crossed."""
        self.mean = self.mean * self.alpha + x * (1 - self.alpha)
        self.cumulative = max(0, self.cumulative + x - self.mean - self.delta)
        if self.cumulative > self.lambda_:
            self.cumulative = 0
            return True
        return False


class StrategyManager:
    """Expose the available strategies as named callables."""

    def __init__(self, situation_room, prediction_engine):
        """Keep references to the thesis generator and the prediction engine."""
        self.situation_room, self.prediction_engine = situation_room, prediction_engine

    def list_strategies(self):
        """Return ``{name: callable(seq) -> (thesis, predictions_str)}``."""
        def predictive_strategy(seq):
            preds, s = self.prediction_engine.predict_single(seq)
            return self.situation_room.generate_thesis(preds, seq), s

        def ema_crossover_strategy(seq):
            # Empty predictions make generate_thesis use its EMA fallback.
            return self.situation_room.generate_thesis({}, seq), "N/A (EMA)"

        return {"predictive_rule_based": predictive_strategy, "ema_crossover": ema_crossover_strategy}


CONTEXT_FEATURES = ['close', 'ATR', 'EMA_20', 'RSI', 'time_since_event', 'time_to_event', 'hour_of_day']
REGIME_COLS = ['regime_TRENDING', 'regime_BREAKOUT', 'regime_CHOPPY', 'regime_RANGING']
CONTEXT_DIMENSION = len(CONTEXT_FEATURES) + len(REGIME_COLS)


class ContextVectorPreprocessor:
    """Build the bandit context vector: scaled features plus a one-hot regime."""

    def __init__(self):
        """Create an unfitted (-1, 1) scaler and set its persistence path."""
        self.scaler = MinMaxScaler(feature_range=(-1, 1))
        self.calibrated_context_scaler_path = 'calibrated_context_scaler.joblib'

    def is_calibrated(self):
        """Return True when a saved scaler exists on disk."""
        return os.path.exists(self.calibrated_context_scaler_path)

    def load_calibrated_scaler(self):
        """Load the saved scaler."""
        self.scaler = joblib.load(self.calibrated_context_scaler_path)
        return True

    def calibrate(self, data):
        """Fit the scaler on ``data[CONTEXT_FEATURES]`` and save it."""
        self.scaler.fit(data[CONTEXT_FEATURES])
        joblib.dump(self.scaler, self.calibrated_context_scaler_path)

    def build_context_vector(self, df):
        """Return the context vector for the last row of ``df``.

        The result has ``CONTEXT_DIMENSION`` entries; an unknown regime leaves
        the one-hot part all zeros.
        """
        df_copy = df.copy()
        df_copy[CONTEXT_FEATURES] = df_copy[CONTEXT_FEATURES].astype(float)
        last_row = df_copy[CONTEXT_FEATURES].iloc[-1:].values
        scaled_vec = self.scaler.transform(last_row).flatten()
        last_regime = df_copy.iloc[-1]['regime']
        regime_vec = np.zeros(len(REGIME_COLS))
        try:
            regime_vec[REGIME_COLS.index(f"regime_{last_regime}")] = 1
        except ValueError:
            pass
        return np.concatenate([scaled_vec, regime_vec])


class LiveDataStore:
    """Refresh the live feature frame in a background thread."""

    def __init__(self, api_key, tokenizer, model, interval=120):
        """Store fetch parameters; ``interval`` is the refresh period in seconds."""
        self.api_key, self.tokenizer, self.model, self.interval = api_key, tokenizer, model, interval
        self._data_lock = threading.Lock()
        self._latest_features = pd.DataFrame()
        self._next_event_info = {"title": "N/A", "time_str": "N/A"}
        self._stop = threading.Event()
        self._thread = None

    def _fetch_and_update(self):
        """Fetch prices and events, rebuild features and publish them under the lock."""
        try:
            price_data = fetch_twelvedata_prices(self.api_key, output_size=500)
            if not price_data.empty:
                features, next_event = create_feature_set_for_inference(price_data, fetch_live_events_with_cache(), self.tokenizer, self.model)
                with self._data_lock:
                    self._latest_features, self._next_event_info = features, next_event
        except Exception as e:
            print(f"LiveDataStore Error: {e}")

    def _update_loop(self):
        """Refresh repeatedly until the stop event is set."""
        while not self._stop.is_set():
            self._fetch_and_update()
            self._stop.wait(self.interval)

    def start(self):
        """Start the refresh thread, then run one refresh synchronously."""
        self._stop.clear()
        self._thread = threading.Thread(target=self._update_loop, daemon=True)
        self._thread.start()
        self._fetch_and_update()

    def get_latest_data(self, num_bars=None):
        """Return ``(features_copy, next_event_info)``; last ``num_bars`` rows if given."""
        with self._data_lock:
            if num_bars and not self._latest_features.empty:
                frame = self._latest_features.iloc[-num_bars:].copy()
            else:
                frame = self._latest_features.copy()
            return frame, self._next_event_info

    def get_raw_price_data(self, num_bars=None):
        """Return OHLC rows with a UTC 'Datetime' column (empty frame if no data)."""
        with self._data_lock:
            if self._latest_features.empty:
                return pd.DataFrame()
            df = self._latest_features[['open', 'high', 'low', 'close']].copy()
            if num_bars:
                df = df.iloc[-num_bars:]
            df.reset_index(inplace=True)
            if df['Datetime'].dt.tz is None:
                df['Datetime'] = df['Datetime'].dt.tz_localize('UTC')
            else:
                df['Datetime'] = df['Datetime'].dt.tz_convert('UTC')
            return df


def fetch_live_events_with_cache():
    """Return the economic-calendar JSON, cached for ``CACHE_DURATION_SECONDS``.

    On a request or decode failure the last cached payload is returned, or an
    empty list when nothing has been cached yet.
    """
    if _EVENT_CACHE.get("data") and (time.time() - _EVENT_CACHE.get("timestamp", 0) < CACHE_DURATION_SECONDS):
        return _EVENT_CACHE["data"]
    try:
        r = requests.get(EVENT_JSON_URL, headers={"User-Agent": "V17-Agent/1.0"}, timeout=10)
        r.raise_for_status()
        data = r.json()
        _EVENT_CACHE["data"], _EVENT_CACHE["timestamp"] = data, time.time()
        return data
    except (requests.RequestException, ValueError):
        # The "data" key always exists (initially None), so a .get default
        # would never apply; use `or` to really fall back to an empty list.
        return _EVENT_CACHE.get("data") or []


def fetch_twelvedata_prices(api_key, symbol='EUR/USD', interval='5min', output_size=200):
    """Fetch a price series from Twelve Data as a frame with a 'Datetime' column.

    Returns an empty DataFrame (after printing the error) when the fetch fails.
    """
    try:
        ts = TDClient(apikey=api_key).time_series(symbol=symbol, interval=interval, outputsize=output_size, timezone="UTC")
        df = ts.as_pandas().sort_index(ascending=True)
        df.index.name = 'Datetime'
        df.reset_index(inplace=True)
        return df
    except Exception as e:
        print(f"TwelveData fetch failed: {e}")
        return pd.DataFrame()


def _epoch_seconds(index):
    """Convert a tz-aware UTC DatetimeIndex to whole seconds since the Unix epoch."""
    return ((index - pd.Timestamp("1970-01-01", tz="UTC")) // pd.Timedelta(seconds=1)).to_numpy()


def create_feature_set_for_inference(price_df, events_json, tokenizer, model):
    """Build the model feature frame from prices and calendar events.

    Args:
        price_df: Frame with 'Datetime', 'open', 'high', 'low', 'close' columns.
        events_json: Calendar payload (list of dicts) or an empty/None value.
        tokenizer, model: FinBERT tokenizer and model used to embed event titles.

    Returns:
        ``(features, next_event_info)``. ``features`` is indexed by UTC time and
        carries RSI, EMA_20, ATR, event timing columns, hour/day columns and,
        when events are available, ``finbert_*`` embedding columns.
    """
    price_features = price_df.copy()
    price_features['Datetime'] = pd.to_datetime(price_features['Datetime'])
    price_features.set_index('Datetime', inplace=True)
    price_features = price_features.tz_localize('UTC') if price_features.index.tz is None else price_features.tz_convert('UTC')

    # --- Technical indicators (14-bar RSI/ATR on simple rolling means, 20-span EMA) ---
    close = price_features['close']
    delta = close.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    price_features['RSI'] = 100 - (100 / (1 + (gain / loss)))
    price_features['EMA_20'] = close.ewm(span=20, adjust=False).mean()
    high_low = price_features['high'] - price_features['low']
    high_close = np.abs(price_features['high'] - close.shift())
    low_close = np.abs(price_features['low'] - close.shift())
    tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
    price_features['ATR'] = tr.rolling(window=14).mean()

    # --- Calendar events with FinBERT title embeddings ---
    events = pd.DataFrame(events_json)
    next_event_info = {"title": "None within 24h", "time_str": "N/A"}
    processed_events = pd.DataFrame()
    events_indexed = False  # True once `events` carries a UTC datetime index
    if not events.empty and {'date', 'country', 'title'}.issubset(events.columns):
        events = events[events['country'].isin(['USD', 'EUR'])].copy()
        events['datetime'] = pd.to_datetime(events['date'], utc=True, errors='coerce')
        events.dropna(subset=['datetime'], inplace=True)
        events.set_index('datetime', inplace=True)
        events.sort_index(inplace=True)
        events_indexed = True
        if not events.empty:  # the tokenizer cannot process an empty batch
            inputs = tokenizer(events['title'].fillna('').tolist(), return_tensors='tf', padding=True, truncation=True, max_length=64)
            embeddings = model(inputs).last_hidden_state[:, 0, :].numpy()
            # One column per embedding dimension (second axis of the array).
            embedding_df = pd.DataFrame(embeddings, columns=[f'finbert_{i}' for i in range(embeddings.shape[1])], index=events.index)
            processed_events = pd.concat([events, embedding_df], axis=1)

    if processed_events.empty:
        # Without events there is nothing to join; merge_asof would reject the
        # placeholder frame's non-datetime index.
        merged_data = price_features.sort_index()
    else:
        merged_data = pd.merge_asof(left=price_features.sort_index(), right=processed_events.sort_index(), left_index=True, right_index=True, direction='backward', tolerance=pd.Timedelta(minutes=60))

    # --- Timing relative to high-impact events ---
    if events_indexed and 'impact' in events.columns and not events.empty:
        all_high_impact = events[events['impact'] == 'High']
    else:
        all_high_impact = pd.DataFrame()

    if not all_high_impact.empty:
        upcoming = all_high_impact[all_high_impact.index > merged_data.index[-1]]
        if not upcoming.empty:
            next_event = upcoming.iloc[0]  # events are sorted, so this is the soonest
            time_to_next = (next_event.name - merged_data.index[-1]).total_seconds() / 3600.0
            merged_data['time_to_event'] = time_to_next
            next_event_info = {"title": f"{next_event['country']} {next_event['title']}", "time_str": f"in {time_to_next:.2f}h"}
        else:
            merged_data['time_to_event'] = 9999
        df_idx = _epoch_seconds(merged_data.index)
        evt_times = _epoch_seconds(all_high_impact.index)
        # Rows are bars, columns are events; negative differences are future
        # events and are masked with inf so the minimum is the latest past event.
        time_diffs = df_idx[:, None] - evt_times[None, :]
        merged_data['time_since_event'] = np.min(np.where(time_diffs >= 0, time_diffs, np.inf), axis=1) / 3600
    else:
        merged_data['time_since_event'] = 9999
        merged_data['time_to_event'] = 9999

    merged_data.replace([np.inf, -np.inf], 9999, inplace=True)
    merged_data['hour_of_day'] = merged_data.index.hour
    merged_data['day_of_week'] = merged_data.index.dayofweek
    finbert_cols = [col for col in merged_data.columns if 'finbert_' in col]
    if finbert_cols:
        merged_data[finbert_cols] = merged_data[finbert_cols].ffill()
    merged_data.fillna(0, inplace=True)
    merged_data.dropna(subset=['open', 'close', 'RSI'], inplace=True)
    return merged_data, next_event_info


def evaluate_pending_signals_v2(perf_logger, bandit, change_detector, live_data_store):
    """Resolve logged signals against recent bars and feed rewards to the bandit.

    A signal is evaluated once it is at least 5 minutes old. Bars after the
    signal time are scanned in order; within a bar the stop-loss is checked
    before the take-profit. Without a hit the last close is used ("Time Exit").
    Errors are printed and end the evaluation pass.
    """
    if not perf_logger.ref:
        return
    now_utc = pd.Timestamp.now(tz='UTC')
    try:
        all_signals = perf_logger.ref.get()
        if not all_signals:
            return
        live_prices = live_data_store.get_raw_price_data(num_bars=288)
        if live_prices.empty:
            return

        for k, s in all_signals.items():
            if s.get('reward') is not None:
                continue
            sig_time = pd.to_datetime(s['timestamp'])
            if now_utc < (sig_time + pd.Timedelta(minutes=5)):
                continue
            entry, sl, tp, action = float(s['entry']), float(s['stop_loss']), float(s['take_profit']), s['action']
            relevant_bars = live_prices[live_prices['Datetime'] > sig_time]
            if relevant_bars.empty:
                continue

            outcome, exit_price = "Time Exit", relevant_bars.iloc[-1]['close']
            for _, bar in relevant_bars.iterrows():
                if (action == 'BUY' and bar['low'] <= sl) or (action == 'SELL' and bar['high'] >= sl):
                    exit_price, outcome = sl, "SL"
                    break
                if (action == 'BUY' and bar['high'] >= tp) or (action == 'SELL' and bar['low'] <= tp):
                    exit_price, outcome = tp, "TP"
                    break

            # Plain floats keep the Firebase payload free of NumPy scalar types.
            pnl = float((exit_price - entry) if action == 'BUY' else (entry - exit_price))
            reward = float(np.clip(pnl / 0.005, -1.0, 1.0))
            ctx = np.array(s['context_vector'])
            bandit.update(s['strategy'], ctx, reward)
            if change_detector.update(-reward):
                bandit.increase_exploration()
            perf_logger.ref.child(k).update({'pnl': pnl, 'reward': reward, 'outcome_reason': outcome})
    except Exception as e:
        print(f"Evaluator Error: {e}")


def send_ntfy_notification(topic, thesis):
    """Post the thesis to ``https://ntfy.sh/<topic>``; no-op when ``topic`` is falsy.

    Delivery is best effort: request errors are ignored, and a timeout keeps a
    slow server from stalling the worker loop.
    """
    if not topic:
        return
    message = (f"Strategy: {thesis.get('strategy_type')} ({thesis.get('confidence')})\n"
               f"Reason: {thesis.get('reasoning')}\n"
               f"Entry: {thesis.get('entry')} | SL: {thesis.get('stop_loss')} | TP: {thesis.get('take_profit')}")
    headers = {"Title": f"V17 Signal: {thesis.get('action')} EUR/USD"}
    try:
        requests.post(f"https://ntfy.sh/{topic}", data=message.encode('utf-8'), headers=headers, timeout=10)
    except requests.RequestException:
        pass


def download_models_from_hf(repo_id, token):
    """Download ``multi_horizon_model.keras`` from the Hub and return its local path."""
    return hf_hub_download(repo_id=repo_id, filename="multi_horizon_model.keras", token=token)


def _write_status(status, path='status_v17.json'):
    """Write the status JSON via a temp file so readers never see a partial file."""
    tmp_path = f"{path}.tmp"
    with open(tmp_path, 'w') as f:
        json.dump(status, f)
    os.replace(tmp_path, path)


# ==============================================================================
# --- MAIN WORKER & ORCHESTRATOR ---
# ==============================================================================
def main_worker():
    """Run the agent: calibrate or load artifacts, then loop forever.

    Each cycle selects a strategy with the bandit, builds a thesis, applies the
    concept veto and the regime filter, logs and notifies BUY/SELL signals,
    evaluates pending signals and writes ``status_v17.json``.
    """
    print("--- [Persistent Agent V17 - The Evolution Agent] Worker Thread Started ---")
    api_key = os.environ.get('TWELVE_DATA_API_KEY')
    hf_token = os.environ.get('HF_TOKEN')
    ntfy_topic = os.environ.get('NTFY_TOPIC_V17')
    groq_key = os.environ.get('GROQ_API_KEY')
    if not groq_key:
        print("FATAL: GROQ_API_KEY not set.")
        return

    tokenizer = BertTokenizer.from_pretrained('ProsusAI/finbert')
    model = TFBertModel.from_pretrained('ProsusAI/finbert', from_pt=True)
    pred_engine = PredictionCoreTransformer()
    ctx_preprocessor = ContextVectorPreprocessor()
    live_store = LiveDataStore(api_key, tokenizer, model)
    live_store.start()

    if not pred_engine.is_calibrated() or not ctx_preprocessor.is_calibrated():
        print("Calibrating agent for the first time...")
        base_model_path = download_models_from_hf("Badumetsibb/conscious-trading-agent-models", hf_token)
        calib_prices = fetch_twelvedata_prices(api_key, output_size=5000)
        if len(calib_prices) < 500:
            print("FATAL: Not enough data for calibration.")
            return
        calib_features, _ = create_feature_set_for_inference(calib_prices, fetch_live_events_with_cache(), tokenizer, model)
        pred_engine.calibrate(base_model_path, calib_features.copy())
        calib_with_regime = CausalReasoningNetwork(calib_features.copy()).identify_volatility_regimes()
        ctx_preprocessor.calibrate(calib_with_regime)
    else:
        pred_engine.load_calibrated_artifacts()
        ctx_preprocessor.load_calibrated_scaler()

    miner = ConceptMiner(log_path='signals_v17')
    namer = ConceptNamer(api_key=groq_key)
    veto_system = ConceptVetoSystem()
    logger = RTDBLoggerV2(db_ref_name='signals_v17')
    sit_room = RuleBasedSituationRoom({'sl_atr_multiplier': 2.0, 'tp_atr_multiplier': 4.0})
    regime_filter = MarketRegimeFilter()
    strat_manager = StrategyManager(sit_room, pred_engine)
    bandit = LinUCBBandit(strat_manager.list_strategies().keys(), d=CONTEXT_DIMENSION, alpha=1.5)
    change_detector = PageHinkley()

    def evolution_cycle():
        """Mine, name and store new veto concepts: first after 30 min, then every 2 h."""
        time.sleep(1800)
        while True:
            try:
                new_patterns = miner.run_analysis()
                if new_patterns:
                    for p in new_patterns:
                        # NOTE(review): cluster ids come from a fresh DBSCAN fit on
                        # every run, so an id seen before may describe a different
                        # pattern - confirm this de-duplication is intended.
                        if all(c.get('source_cluster_id') != p['source_cluster_id'] for c in veto_system.concepts.values()):
                            new_concept = namer.name_new_concept(p)
                            if new_concept:
                                cid = new_concept['concept_name'].lower().replace(' ', '_') + f"_c{p['source_cluster_id']}"
                                new_concept['source_cluster_id'] = p['source_cluster_id']
                                db.reference(f"concepts/{cid}").set(new_concept)
                                print(f"EVOLUTION_ENGINE: ✅ New concept '{new_concept['concept_name']}' learned!")
                    veto_system.load_concepts_from_firebase()
            except Exception as e:
                print(f"Evolution Cycle Error: {e}")
            time.sleep(7200)

    threading.Thread(target=evolution_cycle, daemon=True).start()
    print("--- WORKER V17: Initialization Complete. ---")

    while True:
        try:
            features, next_event = live_store.get_latest_data(num_bars=pred_engine.sequence_length)
            if features.empty or len(features) < pred_engine.sequence_length:
                time.sleep(60)
                continue

            features_with_regime = CausalReasoningNetwork(features.copy()).identify_volatility_regimes()
            input_seq = features_with_regime.iloc[-pred_engine.sequence_length:]
            latest_ctx = input_seq.iloc[-1]
            ctx_vec = ctx_preprocessor.build_context_vector(input_seq)
            chosen_strat = bandit.select(ctx_vec)
            thesis, preds_str = strat_manager.list_strategies()[chosen_strat](input_seq)
            is_vetoed, veto_reason = veto_system.check_for_veto(latest_ctx, chosen_strat)
            is_tradeable = regime_filter.should_trade(latest_ctx['regime'], thesis)

            # The thesis spells the no-trade action "NO_TRADE"; the status uses
            # "NO TRADE". Normalise first so a declined thesis is not reported
            # as a regime-filter rejection.
            final_action = "NO TRADE" if thesis['action'] == "NO_TRADE" else thesis['action']
            final_reasoning = f"Bandit chose '{chosen_strat}'. Thesis: '{thesis['reasoning']}'"
            if is_vetoed:
                final_action, final_reasoning = "NO TRADE", final_reasoning + f" -> ❌ VETOED by Concept: '{veto_reason}'."
            elif not is_tradeable and final_action != "NO TRADE":
                final_action, final_reasoning = "NO TRADE", final_reasoning + " -> ❌ REJECTED by Regime Filter."
            elif final_action != "NO TRADE":
                final_reasoning += " -> ✅ EXECUTABLE."

            if final_action in ["BUY", "SELL"]:
                ts = pd.Timestamp.now(tz='UTC').isoformat()
                logger.log_signal(ts, chosen_strat, final_action, thesis['entry'], thesis['stop_loss'], thesis['take_profit'], ctx_vec)
                send_ntfy_notification(ntfy_topic, thesis)

            evaluate_pending_signals_v2(logger, bandit, change_detector, live_store)
            status = {
                "last_checked": pd.Timestamp.now(tz='UTC').isoformat(),
                "market_price": f"{latest_ctx['close']:.5f}",
                "market_regime": latest_ctx['regime'],
                "signal": final_action,
                "reasoning": final_reasoning,
                "predictions": preds_str,
                "next_event": f"{next_event['title']} ({next_event['time_str']})",
            }
            _write_status(status)
            print(f"WORKER V17: Cycle complete. Signal: {final_action}. Sleeping.")
            time.sleep(300)
        except Exception as e:
            print(f"Main Loop Error: {e}")
            time.sleep(60)


# ==============================================================================
# --- GRADIO DASHBOARD AND STARTUP ---
# ==============================================================================
def get_latest_status_v17():
    """Read ``status_v17.json`` and return the seven dashboard field values.

    Order: status line, price, regime, signal, reasoning, predictions, next event.
    """
    if not os.path.exists('status_v17.json'):
        return "Initializing...", "N/A", "N/A", "N/A", "Waiting for first cycle.", "N/A", "N/A"
    try:
        with open('status_v17.json', 'r') as f:
            s = json.load(f)
        return (
            f"Status at: {s.get('last_checked', 'N/A')}",
            s.get('market_price', 'N/A'),
            s.get('market_regime', 'N/A'),
            s.get('signal', 'N/A'),
            s.get('reasoning', 'N/A'),
            s.get('predictions', 'N/A'),
            s.get('next_event', 'N/A'),
        )
    except (json.JSONDecodeError, IOError):
        return "Error reading status.", "N/A", "N/A", "N/A", "File may be in use.", "N/A", "N/A"


if __name__ == "__main__":
    required_secrets = ['TWELVE_DATA_API_KEY', 'HF_TOKEN', 'GROQ_API_KEY', 'FIRESTORE_SA_KEY', 'FIREBASE_DB_URL', 'NTFY_TOPIC_V17']
    if not all(os.environ.get(k) for k in required_secrets):
        print(f"FATAL: Missing secrets. Please set: {required_secrets}")
        # Exit with a non-zero status; also avoids relying on the site-provided exit().
        raise SystemExit(1)

    try:
        sa_key = json.loads(os.environ.get('FIRESTORE_SA_KEY'))
        cred = credentials.Certificate(sa_key)
        if not firebase_admin._apps:
            firebase_admin.initialize_app(cred, {'databaseURL': os.environ.get('FIREBASE_DB_URL')})
        print("✅ Firebase connection established.")
    except Exception as e:
        print(f"FATAL: Firebase init failed: {e}")
        raise SystemExit(1)

    worker_thread = threading.Thread(target=main_worker, daemon=True)
    worker_thread.start()

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🧠 V17 Evolution Agent")
        gr.Markdown("**Secrets Status:** ✅ All required secrets appear to be set.")
        refresh_btn = gr.Button("Refresh Status", variant="primary")
        status_output = gr.Textbox(label="Status", interactive=False)
        gr.Markdown("## Agent's Last Analysis")
        with gr.Row():
            price_output = gr.Textbox(label="Last Market Price")
            regime_output = gr.Textbox(label="Last Market Regime")
        with gr.Row():
            predictions_output = gr.Textbox(label="DL Model Predictions (5m|15m|1h)")
            event_output = gr.Textbox(label="Next High-Impact Event")
        action_output = gr.Textbox(label="Final Signal / Action")
        reasoning_output = gr.Textbox(label="Full Reasoning", lines=3)
        refresh_btn.click(
            fn=get_latest_status_v17,
            inputs=[],
            outputs=[
                status_output, price_output, regime_output, action_output,
                reasoning_output, predictions_output, event_output,
            ],
        )
    demo.launch()