# app_v20_complete_and_final.py - The full, working script with the definitive Keras fix. # --- Core Libraries --- import pandas as pd import numpy as np import warnings import joblib import json import os import gradio as gr import requests import time from datetime import datetime import pytz import threading import math from huggingface_hub import hf_hub_download import firebase_admin from firebase_admin import credentials, db import traceback # --- Environment and Dependencies --- os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' warnings.filterwarnings("ignore", category=UserWarning, module='sklearn') # --- CRITICAL FIX: Force the entire environment to use the Keras 2 compatibility layer --- # This line MUST come before the tensorflow and transformers imports. os.environ['TF_USE_LEGACY_KERAS'] = '1' # --- Machine Learning & Deep Learning Libraries --- import tensorflow as tf from tensorflow import keras from keras import layers, Model from sklearn.preprocessing import MinMaxScaler from keras.utils import Sequence # --- NLP & Self-Evolution Libraries --- try: from transformers import BertTokenizer, TFBertModel, pipeline import torch NLP_LIBRARIES_AVAILABLE = True except ImportError: NLP_LIBRARIES_AVAILABLE = False # --- Live Data Fetching Configuration --- from twelvedata import TDClient EVENT_JSON_URL = "https://nfs.faireconomy.media/ff_calendar_thisweek.json" CACHE_DURATION_SECONDS = 600 _EVENT_CACHE = {"data": None, "timestamp": 0} # --- ALL CLASS AND FUNCTION DEFINITIONS --- class CausalReasoningNetwork: def __init__(self, processed_data): self.data = processed_data.copy() def identify_volatility_regimes(self, volatility_indicator='ATR', trend_indicator='EMA_20'): atr = self.data[volatility_indicator] low_vol_threshold = atr.quantile(0.33) high_vol_threshold = atr.quantile(0.66) ema_slope = self.data[trend_indicator].diff(periods=3) regimes = [] for i in range(len(self.data)): atr_val = atr.iloc[i] slope_val = ema_slope.iloc[i] if pd.notna(ema_slope.iloc[i]) else 0 if atr_val > high_vol_threshold: regime = 'TRENDING' if abs(slope_val) > ema_slope.quantile(0.75) else 'BREAKOUT' regimes.append(regime) elif atr_val < low_vol_threshold: regimes.append('RANGING') else: regimes.append('CHOPPY') self.data['regime'] = regimes return self.data class RuleBasedSituationRoom: def __init__(self, params): self.params = params def generate_thesis(self, predictions, sequence_df): latest_data = sequence_df.iloc[-1] current_price = latest_data['close'] if not predictions: dir_ema = "BUY" if current_price > latest_data['EMA_20'] else "SELL" action, confidence, strategy, reasoning = dir_ema, "LOW", "Trend Following", f"Simple EMA Crossover ({dir_ema})." else: dir_5m = "BUY" if predictions['5m'] > current_price else "SELL" dir_15m = "BUY" if predictions['15m'] > current_price else "SELL" dir_1h = "BUY" if predictions['1h'] > current_price else "SELL" action, confidence, reasoning, strategy = "NO_TRADE", "LOW", "Prediction horizons diverge.", "Range Play" if dir_5m == dir_15m == dir_1h: action, confidence, reasoning, strategy = dir_5m, "HIGH", f"Strong confluence across all horizons ({dir_5m}).", "Trend Following" elif dir_5m == dir_15m: action, confidence, reasoning, strategy = dir_5m, "MEDIUM", f"Short/Medium-term confluence ({dir_5m}).", "Scalp" elif dir_15m == dir_1h: action, confidence, reasoning, strategy = dir_15m, "LOW", f"Medium/Long-term confluence ({dir_15m}).", "Trend Following" if action == "NO_TRADE": return {"action": "NO_TRADE", "confidence": "LOW", "strategy_type": strategy, "reasoning": reasoning} atr = latest_data['ATR'] if pd.notna(latest_data['ATR']) and latest_data['ATR'] > 0 else 0.0001 sl_mult, tp_mult = self.params.get('sl_atr_multiplier', 2.0), self.params.get('tp_atr_multiplier', 4.0) if confidence == "MEDIUM": tp_mult *= 0.75 if confidence == "LOW": tp_mult *= 0.5 if action == "BUY": entry, stop_loss, take_profit = current_price, current_price - (sl_mult * atr), current_price + (tp_mult * atr) else: entry, stop_loss, take_profit = current_price, current_price + (sl_mult * atr), current_price - (tp_mult * atr) return {"action":action, "entry":f"{entry:.5f}", "stop_loss":f"{stop_loss:.5f}", "take_profit":f"{take_profit:.5f}", "confidence":confidence, "reasoning":reasoning, "strategy_type":strategy} class MarketRegimeFilter: def __init__(self): self.allowed_strategies = { 'TRENDING': ['Trend Following'], 'BREAKOUT': ['Trend Following', 'Scalp'], 'CHOPPY': ['Scalp', 'Mean Reversion'], 'RANGING': ['Mean Reversion'] } def should_trade(self, current_regime, trade_thesis): if trade_thesis['action'] == 'NO_TRADE': return False return trade_thesis['strategy_type'] in self.allowed_strategies.get(current_regime, []) def fetch_live_events_with_cache(): current_time = time.time() if _EVENT_CACHE["data"] and (current_time - _EVENT_CACHE["timestamp"] < CACHE_DURATION_SECONDS): return _EVENT_CACHE["data"] try: response = requests.get(EVENT_JSON_URL, headers={"User-Agent": "V20-Agent/1.0"}, timeout=10) response.raise_for_status() data = response.json() _EVENT_CACHE["data"], _EVENT_CACHE["timestamp"] = data, current_time return data except requests.RequestException as e: print(f"Error fetching event data: {e}") return _EVENT_CACHE.get("data", []) def fetch_twelvedata_prices(api_key, symbol='EUR/USD', interval='5min', output_size=200): try: td = TDClient(apikey=api_key) ts = td.time_series(symbol=symbol, interval=interval, outputsize=output_size, timezone="UTC") df = ts.as_pandas().sort_index(ascending=True) df.index.name = 'Datetime' df.reset_index(inplace=True) return df except Exception as e: print(f"Error fetching price data: {e}") return pd.DataFrame() def create_feature_set_for_inference(price_df, events_json, finbert_tokenizer, finbert_model): price_features = price_df.copy(); price_features['Datetime'] = pd.to_datetime(price_features['Datetime']); price_features.set_index('Datetime', inplace=True) if price_features.index.tz is None: price_features = price_features.tz_localize('UTC', ambiguous='infer') else: price_features = price_features.tz_convert('UTC') price_features.rename(columns={'close': 'Price', 'open':'Open', 'high':'High', 'low':'Low'}, inplace=True); delta = price_features['Price'].diff(); gain = (delta.where(delta > 0, 0)).rolling(window=14).mean(); loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean(); price_features['RSI'] = 100 - (100 / (1 + (gain / loss))); price_features['EMA_20'] = price_features['Price'].ewm(span=20, adjust=False).mean(); high_low = price_features['High'] - price_features['Low']; high_close = np.abs(price_features['High'] - price_features['Price'].shift()); low_close = np.abs(price_features['Low'] - price_features['Price'].shift()); tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1); price_features['ATR'] = tr.rolling(window=14).mean(); price_features.rename(columns={'Price':'close', 'Open':'open', 'High':'high', 'Low':'low'}, inplace=True) events = pd.DataFrame(events_json); processed_events = pd.DataFrame(); next_event_info = {"title": "None within 24h", "time_str": "N/A"} if not events.empty and 'date' in events.columns: RELEVANT_CURRENCIES = ['USD', 'EUR', 'GBP', 'JPY']; events = events[events['country'].isin(RELEVANT_CURRENCIES)].copy() events['datetime'] = pd.to_datetime(events['date'], utc=True, errors='coerce'); events.dropna(subset=['datetime'], inplace=True); events.set_index('datetime', inplace=True); events.sort_index(inplace=True) def parse_financial_number(s): if not isinstance(s, str) or not s: return np.nan s = s.strip().upper(); multipliers = {'B': 1e9, 'M': 1e6, 'K': 1e3, '%': 0.01}; val_str = s if s.endswith(tuple(multipliers.keys())): val_str = s[:-1]; multiplier = multipliers[s[-1]] else: multiplier = 1.0 try: return float(val_str) * multiplier except (ValueError, TypeError): return np.nan if 'actual' in events.columns and 'forecast' in events.columns: events['surprise'] = (events['actual'].apply(parse_financial_number) - events['forecast'].apply(parse_financial_number)).fillna(0) else: events['surprise'] = 0 events['detail'] = events['title'].fillna('') + ' ' + events['country'].fillna('') if not events.empty and NLP_LIBRARIES_AVAILABLE: inputs = finbert_tokenizer(events['detail'].tolist(), return_tensors='tf', padding=True, truncation=True, max_length=64); embeddings = finbert_model(inputs).last_hidden_state[:, 0, :].numpy() processed_events = pd.concat([events, pd.DataFrame(embeddings, columns=[f'finbert_{i}' for i in range(embeddings.shape[1])], index=events.index)], axis=1) merged_data = pd.merge_asof(left=price_features.sort_index(), right=processed_events.sort_index(), left_index=True, right_index=True, direction='backward', tolerance=pd.Timedelta(minutes=60)) all_high_impact_events = events[(events['impact'] == 'High')] if 'impact' in events.columns and not events.empty else pd.DataFrame() if not all_high_impact_events.empty: upcoming_events = all_high_impact_events[all_high_impact_events.index > merged_data.index[-1]] if not upcoming_events.empty: next_event = upcoming_events.iloc[0]; next_event_time = next_event.name; time_to_next = (next_event_time - merged_data.index[-1]).total_seconds() / 3600.0; merged_data['time_to_event'] = time_to_next; next_event_info = {"title": f"{next_event.country} {next_event.title}", "time_str": f"in {time_to_next:.2f}h"} else: merged_data['time_to_event'] = 9999 df_index_sec = merged_data.index.astype(np.int64).to_numpy() // 10**9; event_times_sec = all_high_impact_events.index.astype(np.int64).to_numpy() // 10**9; time_diffs = df_index_sec[:, None] - event_times_sec[None, :]; merged_data['time_since_event'] = np.min(np.where(time_diffs >= 0, time_diffs, np.inf), axis=1) / 3600 else: merged_data['time_since_event'] = 9999; merged_data['time_to_event'] = 9999 merged_data.replace([np.inf, -np.inf], 9999, inplace=True); merged_data['hour_of_day'] = merged_data.index.hour; merged_data['day_of_week'] = merged_data.index.dayofweek; merged_data['session_london'] = ((merged_data['hour_of_day'] >= 7) & (merged_data['hour_of_day'] <= 16)).astype(int); merged_data['session_ny'] = ((merged_data['hour_of_day'] >= 12) & (merged_data['hour_of_day'] <= 21)).astype(int); merged_data['session_asian'] = ((merged_data['hour_of_day'] >= 22) | (merged_data['hour_of_day'] <= 7)).astype(int) finbert_cols = [col for col in merged_data.columns if 'finbert_' in col]; cols_to_ffill = ['surprise', 'impact', 'title'] + finbert_cols; merged_data[cols_to_ffill] = merged_data[cols_to_ffill].ffill(); merged_data.fillna(0, inplace=True); merged_data.dropna(subset=['open', 'close', 'RSI'], inplace=True) return merged_data, next_event_info class PredictionCoreTransformer: def __init__(self, sequence_length=48): self.scaler = None; self.model = None; self.sequence_length = sequence_length; self.feature_names = None self.calibrated_model_path = 'calibrated_model.keras'; self.calibrated_scaler_path = 'calibrated_scaler.joblib'; self.calibrated_features_path = 'calibrated_features.json' def is_calibrated(self): return all(os.path.exists(p) for p in [self.calibrated_model_path, self.calibrated_scaler_path, self.calibrated_features_path]) def load_calibrated_artifacts(self): print("--- Loading pre-calibrated artifacts for instant startup ---") self.model = keras.models.load_model(self.calibrated_model_path) self.scaler = joblib.load(self.calibrated_scaler_path) with open(self.calibrated_features_path, 'r') as f: self.feature_names = json.load(f) print("--- Instant startup successful ---"); return True def calibrate(self, base_model_path, calibration_data): print("--- STARTING ONE-TIME AGENT CALIBRATION ---"); print(f"Loaded calibration data with {len(calibration_data)} rows.") self.feature_names = [col for col in calibration_data.columns if col not in ['regime', 'close', 'open', 'high', 'low', 'event', 'impact', 'title', 'detail', 'country', 'date', 'currency', 'actual', 'forecast', 'previous']] print(f"Identified {len(self.feature_names)} features for scaling and training.") self.scaler = MinMaxScaler(feature_range=(0, 1)); features_to_scale_df = calibration_data[self.feature_names]; self.scaler.fit(features_to_scale_df) print("New, compatible scaler has been fitted.") self.model = keras.models.load_model(base_model_path) print("Pre-trained model loaded successfully.") print("Starting fine-tuning process..."); class CalibrationGenerator(Sequence): def __init__(self, data, scaler, feature_names, seq_len): self.data = data.copy(); self.scaler = scaler; self.feature_names = feature_names; self.seq_len = seq_len self.data['target_5m'] = self.data['open'].shift(-1); self.data['target_15m'] = self.data['open'].shift(-3); self.data['target_1h'] = self.data['open'].shift(-12) self.data.dropna(inplace=True); self.features_df = self.data[self.feature_names]; self.targets_df = self.data[['target_5m', 'target_15m', 'target_1h']]; self.scaled_features = self.scaler.transform(self.features_df); self.n_samples = len(self.scaled_features) - self.seq_len def __len__(self): return self.n_samples def __getitem__(self, idx): seq_end = idx + self.seq_len; X = self.scaled_features[idx:seq_end].reshape(1, self.seq_len, len(self.feature_names)) y_5m, y_15m, y_1h = self.targets_df.iloc[seq_end - 1][['target_5m', 'target_15m', 'target_1h']] return X, {'5m_output': np.array([y_5m]), '15m_output': np.array([y_15m]), '1h_output': np.array([y_1h])} calibration_generator = CalibrationGenerator(calibration_data, self.scaler, self.feature_names, self.sequence_length) self.model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-5), loss={'5m_output': 'mean_squared_error', '15m_output': 'mean_squared_error', '1h_output': 'mean_squared_error'}) self.model.fit(calibration_generator, epochs=3, verbose=1) self.model.save(self.calibrated_model_path); joblib.dump(self.scaler, self.calibrated_scaler_path) with open(self.calibrated_features_path, 'w') as f: json.dump(self.feature_names, f) print("--- AGENT RE-CALIBRATION AND ARTIFACT SAVING COMPLETE ---"); return True def predict_single(self, input_sequence): if not self.scaler or not self.model: raise RuntimeError("Agent has not been calibrated. Cannot make predictions.") features_for_model = input_sequence[self.feature_names]; scaled_features = self.scaler.transform(features_for_model); reshaped_sequence = scaled_features.reshape(1, self.sequence_length, len(self.feature_names)); predictions = self.model.predict(reshaped_sequence, verbose=0) real_price_predictions = {"5m": predictions[0][0][0], "15m": predictions[1][0][0], "1h": predictions[2][0][0]}; preds_str = f"5m: {real_price_predictions['5m']:.5f} | 15m: {real_price_predictions['15m']:.5f} | 1h: {real_price_predictions['1h']:.5f}" return real_price_predictions, preds_str class LinUCBBandit: def __init__(self, strategies, d, alpha=1.0, regularization=1.0): self.strategies = list(strategies) self.d = d self.alpha = alpha self.reg = regularization self.A = {s: (self.reg * np.eye(self.d)) for s in self.strategies} self.b = {s: np.zeros(self.d) for s in self.strategies} def select(self, context_vector): scores = {} for s in self.strategies: A_inv = np.linalg.inv(self.A[s]) theta = A_inv.dot(self.b[s]) mean_reward = theta.dot(context_vector) uncertainty = self.alpha * math.sqrt(context_vector.dot(A_inv).dot(context_vector)) scores[s] = mean_reward + uncertainty return max(scores, key=scores.get) def update(self, strategy, context_vector, reward): x = context_vector.reshape(-1) if strategy not in self.A: self.A[strategy] = (self.reg * np.eye(self.d)) self.b[strategy] = np.zeros(self.d) self.A[strategy] += np.outer(x, x) self.b[strategy] += reward * x def increase_exploration(self, factor=1.2): self.alpha *= factor print(f"CONCEPT DRIFT ACTION: Increased bandit exploration alpha to {self.alpha:.2f}") class PageHinkley: def __init__(self, delta=0.005, lambda_=50, alpha=1 - 1e-3): self.mean, self.delta, self.lambda_, self.alpha, self.cumulative = 0.0, delta, lambda_, alpha, 0.0 def update(self, x): self.mean = self.mean * self.alpha + x * (1 - self.alpha) self.cumulative = max(0, self.cumulative + x - self.mean - self.delta) if self.cumulative > self.lambda_: self.cumulative = 0 return True return False class FirebaseManager: def __init__(self, db_signals_ref_name='signals_v20', db_strategies_ref_name='generated_strategies_v20'): self.signals_ref = None self.strategies_ref = None try: sa_key_json, db_url = os.environ.get('FIRESTORE_SA_KEY'), os.environ.get('FIREBASE_DB_URL') if not all([sa_key_json, db_url]): print("FIREBASE MANAGER: Secrets not set. Logger disabled.") return cred = credentials.Certificate(json.loads(sa_key_json)) if not firebase_admin._apps: firebase_admin.initialize_app(cred, {'databaseURL': db_url}) self.signals_ref = db.reference(db_signals_ref_name) self.strategies_ref = db.reference(db_strategies_ref_name) print(f"FIREBASE MANAGER: Successfully connected. Signals: '{db_signals_ref_name}', Strategies: '{db_strategies_ref_name}'.") except Exception as e: print(f"FIREBASE MANAGER: CRITICAL ERROR - Failed to initialize: {e}") def log_signal(self, ts, strategy, action, entry, sl, tp, context_vector, market_regime, confidence, predictions_str): if not self.signals_ref: return try: signal_data = { "timestamp": ts, "strategy": strategy, "action": action, "entry": float(entry), "stop_loss": float(sl), "take_profit": float(tp), "context_vector": [float(x) for x in context_vector.tolist()], "market_regime_at_trade": market_regime, "confidence_at_trade": confidence, "predictions_at_trade": predictions_str, "pnl": None, "reward": None, "outcome_reason": None } self.signals_ref.push(signal_data) except Exception as e: print(f"FIREBASE MANAGER: CRITICAL ERROR - Could not write signal: {e}") def save_strategy(self, strategy_name, code_string): if not self.strategies_ref: return try: self.strategies_ref.child(strategy_name).set({"code": code_string, "created_at": time.time()}) print(f"FIREBASE MANAGER: Saved new strategy '{strategy_name}' to database.") except Exception as e: print(f"FIREBASE MANAGER: CRITICAL ERROR - Could not save strategy: {e}") def load_all_strategies(self): if not self.strategies_ref: return {} try: return self.strategies_ref.get() or {} except Exception as e: print(f"FIREBASE MANAGER: CRITICAL ERROR - Could not load strategies: {e}") return {} class StrategyManager: def __init__(self, situation_room, prediction_engine): self.situation_room = situation_room self.prediction_engine = prediction_engine self._strategies = self._initialize_base_strategies() def _initialize_base_strategies(self): def predictive_strategy(seq): preds_dict, preds_str = self.prediction_engine.predict_single(seq) return self.situation_room.generate_thesis(preds_dict, seq), preds_str def ema_crossover_strategy(seq): return self.situation_room.generate_thesis({}, seq), "N/A (EMA Strategy)" return {"predictive_rule_based": predictive_strategy, "ema_crossover": ema_crossover_strategy} def list_strategies(self): return self._strategies def add_strategy(self, name, function_obj): print(f"STRATEGY MANAGER: Adding new strategy '{name}' to the active pool.") self._strategies[name] = function_obj def load_evolved_strategies(self, firebase_manager): print("STRATEGY MANAGER: Checking for evolved strategies in the database...") evolved_strategies = firebase_manager.load_all_strategies() count = 0 for name, data in evolved_strategies.items(): code = data.get("code") if code: try: namespace = {} exec(code, globals(), namespace) strategy_func = namespace['generated_strategy'] def wrapper(func): def inner(sequence_df): thesis = func(sequence_df) return thesis, "N/A (Evolved)" return inner self.add_strategy(name, wrapper(strategy_func)) count += 1 except Exception as e: print(f"STRATEGY MANAGER: ERROR - Failed to load evolved strategy '{name}'. Reason: {e}") print(f"STRATEGY MANAGER: Loaded {count} evolved strategies from the database.") class CognitiveCore: def __init__(self, firebase_manager): self.firebase_manager = firebase_manager def analyze_performance_history(self): print("COGNITIVE CORE: Analyzing historical performance...") if not self.firebase_manager.signals_ref: return None all_signals = self.firebase_manager.signals_ref.get() if not all_signals: print("COGNITIVE CORE: No history to analyze."); return None df = pd.DataFrame.from_dict(all_signals, orient='index') df = df.dropna(subset=['reward', 'market_regime_at_trade']) if df.empty or len(df) < 20: print(f"COGNITIVE CORE: Not enough completed trades ({len(df)}) to analyze. Need at least 20."); return None regime_performance = df.groupby('market_regime_at_trade')['reward'].mean() if regime_performance.empty: return None worst_regime = regime_performance.idxmin() worst_regime_score = regime_performance.min() if worst_regime_score >= -0.1: print(f"COGNITIVE CORE INSIGHT: Performance is acceptable. No evolution needed."); return None memo = f""" **To:** Strategy Lab **From:** Cognitive Core **Subject:** Performance Analysis & Strategic Gaps **Key Findings:** A critical weakness has been identified in the **'{worst_regime.upper()}'** market regime, with an average reward of {worst_regime_score:.2f}. **Strategic Directive:** Generate a new **mean-reversion** strategy for **'{worst_regime.upper()}'** markets. The strategy should SELL when RSI is overbought (e.g., > 70) and BUY when RSI is oversold (e.g., < 30). Use a tight stop-loss of 1.5 times the ATR. """ print(f"COGNITIVE CORE INSIGHT: Generated memo targeting poor performance in '{worst_regime}' regime.") return memo class LocalStrategyLab: def __init__(self, model_name="microsoft/Phi-3-mini-4k-instruct"): self.generator = None if not NLP_LIBRARIES_AVAILABLE: print("STRATEGY LAB: WARNING - Transformers/Torch not installed. Strategy generation disabled.") return print(f"STRATEGY LAB: Initializing local LLM: {model_name}...") try: self.generator = pipeline( "text-generation", model=model_name, torch_dtype=torch.bfloat16, device_map="auto" ) print("STRATEGY LAB: LLM Initialized successfully.") except Exception as e: print(f"STRATEGY LAB: CRITICAL ERROR - Failed to initialize LLM pipeline: {e}") def _create_prompt(self, performance_memo): return f"""You are an expert quantitative trading strategist. Your task is to design a new, syntactically correct Python trading strategy function based on a performance analysis memo. **Performance Memo:** --- {performance_memo} --- **Instructions:** 1. Write a single Python function. The function name MUST be `generated_strategy`. 2. The function must accept one argument: `sequence_df`, a pandas DataFrame. The last row (`sequence_df.iloc[-1]`) is the current timestep. 3. The function MUST return a Python dictionary with the trade thesis. 4. The dictionary keys must include: "action", "entry", "stop_loss", "take_profit", "confidence", "reasoning", and "strategy_type". 5. For "action", use "BUY", "SELL", or "NO_TRADE". 6. Base your strategy logic directly on the 'Strategic Directive' in the memo. Use standard indicators available in the DataFrame like 'RSI', 'EMA_20', and 'ATR'. 7. The value for "strategy_type" should be "Mean Reversion". Begin the Python code block now. ```python """ def generate_new_strategy_code(self, memo): if not self.generator or not memo: return None prompt = self._create_prompt(memo) outputs = self.generator(prompt, max_new_tokens=512, do_sample=True, temperature=0.6, top_k=50, top_p=0.95) generated_text = outputs['generated_text'] try: code = generated_text.split("```python")[1].split("```").strip() print("STRATEGY LAB: Successfully generated new strategy code.") return code except IndexError: print("STRATEGY LAB: ERROR - Could not extract Python code from LLM response."); return None CONTEXT_FEATURES = ['close', 'ATR', 'EMA_20', 'RSI', 'time_since_event', 'time_to_event', 'hour_of_day'] REGIME_COLS = ['regime_TRENDING', 'regime_BREAKOUT', 'regime_CHOPPY', 'regime_RANGING'] CONTEXT_DIMENSION = len(CONTEXT_FEATURES) + len(REGIME_COLS) class ContextVectorPreprocessor: def __init__(self): self.scaler = MinMaxScaler(feature_range=(-1, 1)) self.calibrated_context_scaler_path = 'calibrated_context_scaler.joblib' self.feature_names = CONTEXT_FEATURES self.regime_cols = REGIME_COLS def is_calibrated(self): return os.path.exists(self.calibrated_context_scaler_path) def load_calibrated_scaler(self): if self.is_calibrated(): print(f"--- Loading pre-calibrated context scaler ---") self.scaler = joblib.load(self.calibrated_context_scaler_path) return True return False def calibrate(self, calibration_data_for_context): print("--- Fitting Context Scaler ---") self.scaler.fit(calibration_data_for_context[self.feature_names]) joblib.dump(self.scaler, self.calibrated_context_scaler_path) print("--- Context Scaler calibration complete ---") def build_context_vector(self, df_with_regime): if 'regime' not in df_with_regime.columns: raise ValueError("DataFrame must contain a 'regime' column.") df_copy = df_with_regime.copy() df_copy[self.feature_names] = df_copy[self.feature_names].astype(float) last_row_features = df_copy[self.feature_names].iloc[-1:].values scaled_numeric_vec = self.scaler.transform(last_row_features).flatten() last_regime = df_copy.iloc[-1]['regime'] regime_vec = np.zeros(len(self.regime_cols)) try: regime_idx = self.regime_cols.index(f"regime_{last_regime}") regime_vec[regime_idx] = 1 except ValueError: print(f"Warning: Regime '{last_regime}' not found. Setting regime vector to zeros.") return np.concatenate([scaled_numeric_vec, regime_vec]) class LiveDataStore: def __init__(self, api_key, finbert_tokenizer, finbert_model, update_interval_seconds=120): self.api_key = api_key; self.finbert_tokenizer = finbert_tokenizer; self.finbert_model = finbert_model self.update_interval_seconds = update_interval_seconds; self._data_lock = threading.Lock() self._latest_features = pd.DataFrame(); self._next_event_info = {"title": "None within 24h", "time_str": "N/A"} self._stop_event = threading.Event(); self._update_thread = None; self._last_update_time = None def _fetch_and_update(self, output_size=500): start_time = time.time() try: price_data = fetch_twelvedata_prices(self.api_key, output_size=output_size) if price_data.empty: return events_data = fetch_live_events_with_cache() features, next_event_info = create_feature_set_for_inference(price_data, events_data, self.finbert_tokenizer, self.finbert_model) with self._data_lock: self._latest_features, self._next_event_info, self._last_update_time = features, next_event_info, pd.Timestamp.now(tz='UTC') except Exception as e: print(f"LiveDataStore ERROR during update: {e}") elapsed_time = time.time() - start_time sleep_duration = max(0, self.update_interval_seconds - elapsed_time) if sleep_duration > 0: self._stop_event.wait(sleep_duration) def _update_loop(self): while not self._stop_event.is_set(): self._fetch_and_update() def start(self): if self._update_thread is None or not self._update_thread.is_alive(): print("LiveDataStore: Starting background update thread.") self._stop_event.clear(); self._update_thread = threading.Thread(target=self._update_loop, daemon=True) self._update_thread.start(); print("LiveDataStore: Performing initial data fetch."); self._fetch_and_update() def get_latest_data(self, num_bars=None): with self._data_lock: if num_bars and not self._latest_features.empty: return self._latest_features.iloc[-num_bars:].copy(), self._next_event_info return self._latest_features.copy(), self._next_event_info def get_raw_price_data(self, num_bars=None): with self._data_lock: if self._latest_features.empty: return pd.DataFrame() df_slice = self._latest_features[['open', 'high', 'low', 'close']].copy() if num_bars: df_slice = df_slice.iloc[-num_bars:] df_slice.reset_index(inplace=True) if df_slice['Datetime'].dt.tz is None: df_slice['Datetime'] = df_slice['Datetime'].dt.tz_localize('UTC') else: df_slice['Datetime'] = df_slice['Datetime'].dt.tz_convert('UTC') return df_slice def download_models_from_hf(repo_id, hf_token): print("Downloading base agent model from Hugging Face Hub...") try: return hf_hub_download(repo_id=repo_id, filename="multi_horizon_model.keras", token=hf_token) except Exception as e: print(f"FATAL: Failed to download base model: {e}"); raise def send_ntfy_notification(topic, trade_thesis): if not topic: return title = f"V20 Signal: {trade_thesis.get('action')} EUR/USD" message = (f"Strategy: {trade_thesis.get('strategy_type')} ({trade_thesis.get('confidence')})\n" f"Reason: {trade_thesis.get('reasoning')}\n" f"Entry: {trade_thesis.get('entry')} | SL: {trade_thesis.get('stop_loss')} | TP: {trade_thesis.get('take_profit')}") try: requests.post(f"https://ntfy.sh/{topic}", data=message.encode('utf-8'), headers={"Title": title}); print("ntfy notification sent.") except requests.exceptions.RequestException as e: print(f"Failed to send ntfy notification: {e}") def evaluate_pending_signals_v2(firebase_manager, bandit, change_detector, live_data_store): if not firebase_manager.signals_ref: return now_utc = pd.Timestamp.now(tz='UTC') try: all_signals = firebase_manager.signals_ref.get(); if not all_signals: return live_price_history = live_data_store.get_raw_price_data(num_bars=288) if live_price_history.empty: return for key, signal in all_signals.items(): if signal.get('reward') is not None: continue signal_time = pd.to_datetime(signal['timestamp']) if now_utc < (signal_time + pd.Timedelta(minutes=5)): continue entry, sl, tp, action = float(signal['entry']), float(signal['stop_loss']), float(signal['take_profit']), signal['action'] relevant_bars = live_price_history[live_price_history['Datetime'] > signal_time] if relevant_bars.empty: continue outcome_reason, exit_price = "Time Exit", relevant_bars.iloc[-1]['close'] for _, bar in relevant_bars.iterrows(): if action == 'BUY': if bar['low'] <= sl: exit_price, outcome_reason = sl, "SL"; break if bar['high'] >= tp: exit_price, outcome_reason = tp, "TP"; break elif action == 'SELL': if bar['high'] >= sl: exit_price, outcome_reason = sl, "SL"; break if bar['low'] <= tp: exit_price, outcome_reason = tp, "TP"; break pnl = (exit_price - entry) if action == 'BUY' else (entry - exit_price) reward = np.clip(pnl / 0.005, -1.0, 1.0) context_vector = np.array(signal['context_vector']) bandit.update(signal['strategy'], context_vector, reward) if change_detector.update(-reward): print("! CONCEPT DRIFT DETECTED !"); bandit.increase_exploration() firebase_manager.signals_ref.child(key).update({'pnl': pnl, 'reward': reward, 'outcome_reason': outcome_reason}) print(f"EVALUATOR: Updated signal from {signal_time.isoformat()}. Outcome: {outcome_reason}, PnL: {pnl:.5f}") except Exception as e: print(f"EVALUATOR V2 ERROR: {e}"); traceback.print_exc() # --- MAIN WORKER FUNCTION --- def main_worker(): print("--- [Conscious Agent V20 - Definitive Fix] Worker Thread Started ---") api_key, hf_token, ntfy_topic = os.environ.get('TWELVE_DATA_API_KEY'), os.environ.get('HF_TOKEN'), os.environ.get('NTFY_TOPIC_V2') HF_REPO_ID = "Badumetsibb/conscious-trading-agent-models" if not NLP_LIBRARIES_AVAILABLE: print("CRITICAL: Transformers or PyTorch not found. Self-evolution will be disabled.") finbert_tokenizer = BertTokenizer.from_pretrained('ProsusAI/finbert') finbert_model = TFBertModel.from_pretrained('ProsusAI/finbert', from_pt=True) prediction_engine = PredictionCoreTransformer() context_preprocessor = ContextVectorPreprocessor() if prediction_engine.is_calibrated() and context_preprocessor.is_calibrated(): prediction_engine.load_calibrated_artifacts() context_preprocessor.load_calibrated_scaler() else: print("No calibrated artifacts found. Starting one-time production calibration...") base_model_path = download_models_from_hf(HF_REPO_ID, hf_token) calibration_price_data = fetch_twelvedata_prices(api_key, output_size=5000) if calibration_price_data.empty or len(calibration_price_data) < 500: print("FATAL: Could not fetch enough data for calibration. Worker stopping."); return calibration_events_data = fetch_live_events_with_cache() calibration_features, _ = create_feature_set_for_inference(calibration_price_data, calibration_events_data, finbert_tokenizer, finbert_model) prediction_engine.calibrate(base_model_path, calibration_features.copy()) causal_engine_for_calibration = CausalReasoningNetwork(calibration_features.copy()) calibration_features_with_regime = causal_engine_for_calibration.identify_volatility_regimes() context_preprocessor.calibrate(calibration_features_with_regime.copy()) firebase_manager = FirebaseManager() live_data_store = LiveDataStore(api_key, finbert_tokenizer, finbert_model) live_data_store.start() situation_room = RuleBasedSituationRoom({'sl_atr_multiplier': 2.0, 'tp_atr_multiplier': 4.0}) strategy_manager = StrategyManager(situation_room, prediction_engine) strategy_manager.load_evolved_strategies(firebase_manager) bandit = LinUCBBandit(strategy_manager.list_strategies().keys(), d=CONTEXT_DIMENSION, alpha=1.5) change_detector = PageHinkley() cognitive_core = CognitiveCore(firebase_manager) strategy_lab = LocalStrategyLab() last_evolution_check = time.time() EVOLUTION_CYCLE_SECONDS = 86400 print("--- WORKER V20: Initialization Complete. Starting main adaptive loop. ---") while True: try: print(f"WORKER V20: [{pd.Timestamp.now(tz='UTC')}] Waking up...") features, next_event_info = live_data_store.get_latest_data() if features.empty or len(features) < prediction_engine.sequence_length: print("WORKER V20: Not enough data. Waiting..."); time.sleep(300); continue causal_engine = CausalReasoningNetwork(features.copy()) features_with_regime = causal_engine.identify_volatility_regimes() input_sequence = features_with_regime.iloc[-prediction_engine.sequence_length:] ctx_vec = context_preprocessor.build_context_vector(input_sequence) chosen_strategy_name = bandit.select(ctx_vec) trade_thesis, preds_str = strategy_manager.list_strategies()[chosen_strategy_name](input_sequence) current_regime = input_sequence.iloc[-1]['regime'] is_tradeable = MarketRegimeFilter().should_trade(current_regime, trade_thesis) final_action = trade_thesis['action'] if is_tradeable else "NO TRADE" if final_action in ["BUY", "SELL"]: ts = pd.Timestamp.now(tz='UTC').isoformat() firebase_manager.log_signal(ts, chosen_strategy_name, final_action, trade_thesis['entry'], trade_thesis['stop_loss'], trade_thesis['take_profit'], ctx_vec, current_regime, trade_thesis['confidence'], preds_str) send_ntfy_notification(ntfy_topic, trade_thesis) evaluate_pending_signals_v2(firebase_manager, bandit, change_detector, live_data_store) final_reasoning = f"Bandit chose '{chosen_strategy_name}'. Thesis: '{trade_thesis['reasoning']}'" if trade_thesis['action'] != 'NO TRADE': final_reasoning += f" -> {'✅ EXECUTABLE.' if is_tradeable else f'❌ REJECTED by Regime Filter for {current_regime}.'}" status = {"last_checked": pd.Timestamp.now(tz='UTC').isoformat(), "market_price": f"{input_sequence.iloc[-1]['close']:.5f}", "market_regime": current_regime, "signal": final_action, "reasoning": final_reasoning, "predictions": preds_str, "next_event": f"{next_event_info['title']} ({next_event_info['time_str']})"} with open('status.json', 'w') as f: json.dump(status, f) print(f"WORKER V20: Analysis complete. Signal: {final_action}. Sleeping.") if (time.time() - last_evolution_check) > EVOLUTION_CYCLE_SECONDS: print("\n--- INITIATING NIGHTLY SELF-EVOLUTION CYCLE ---") memo = cognitive_core.analyze_performance_history() if memo: new_code = strategy_lab.generate_new_strategy_code(memo) if new_code: new_strategy_name = f"evolved_strategy_{int(time.time())}" firebase_manager.save_strategy(new_strategy_name, new_code) print("EVOLUTION: New strategy saved. It will be active after the next restart.") last_evolution_check = time.time() print("--- SELF-EVOLUTION CYCLE COMPLETE ---\n") time.sleep(300) except Exception as e: print(f"WORKER V20 CRITICAL ERROR: {e}"); traceback.print_exc(); time.sleep(60) # --- GRADIO DASHBOARD --- def get_latest_status(): if not os.path.exists('status.json'): return "Agent is starting...", "", "", "", "", "", "" with open('status.json', 'r') as f: status = json.load(f) return (f"Status from worker at: {status.get('last_checked', 'N/A')}", status.get('market_price', 'N/A'), status.get('market_regime', 'N/A'), status.get('signal', 'N/A'), status.get('reasoning', 'N/A'), status.get('predictions', 'N/A'), status.get('next_event', 'N/A')) with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# 🧠 V20 Conscious Agent (Definitive Fix)") gr.Markdown("This agent uses a reliable calibration flow and a persistent, database-driven memory for self-evolution.") all_secrets = all([os.environ.get(k) for k in ['TWELVE_DATA_API_KEY', 'NTFY_TOPIC_V2', 'HF_TOKEN', 'FIRESTORE_SA_KEY', 'FIREBASE_DB_URL']]) secret_status = "✅ All required secrets appear to be set." if all_secrets else "❌ One or more secrets are MISSING. Check all required secrets." gr.Markdown(f"**Secrets Status:** {secret_status}") refresh_btn = gr.Button("Refresh Status", variant="primary") status_output = gr.Textbox(label="Status", interactive=False) gr.Markdown("## Agent's Last Analysis") with gr.Row(): price_output = gr.Textbox(label="Last Market Price"); regime_output = gr.Textbox(label="Last Market Regime") with gr.Row(): predictions_output = gr.Textbox(label="DL Model Predictions (5m|15m|1h)"); event_output = gr.Textbox(label="Next High-Impact Event") action_output = gr.Textbox(label="Final Signal / Action") reasoning_output = gr.Textbox(label="Full Reasoning", lines=3) refresh_btn.click(fn=get_latest_status, inputs=[], outputs=[status_output, price_output, regime_output, action_output, reasoning_output, predictions_output, event_output]) if __name__ == "__main__": if not all([os.environ.get(k) for k in ['TWELVE_DATA_API_KEY', 'HF_TOKEN']]): print("FATAL: Core secrets (API Key, HF Token) are missing. Worker cannot start.") else: worker_thread = threading.Thread(target=main_worker, daemon=True) worker_thread.start() demo.launch()