Spaces:
Sleeping
Sleeping
| # app_v20_complete_and_final.py - The full, working script with the definitive Keras fix. | |
| # --- Core Libraries --- | |
| import pandas as pd | |
| import numpy as np | |
| import warnings | |
| import joblib | |
| import json | |
| import os | |
| import gradio as gr | |
| import requests | |
| import time | |
| from datetime import datetime | |
| import pytz | |
| import threading | |
| import math | |
| from huggingface_hub import hf_hub_download | |
| import firebase_admin | |
| from firebase_admin import credentials, db | |
| import traceback | |
| # --- Environment and Dependencies --- | |
| os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' | |
| warnings.filterwarnings("ignore", category=UserWarning, module='sklearn') | |
| # --- CRITICAL FIX: Force the entire environment to use the Keras 2 compatibility layer --- | |
| # This line MUST come before the tensorflow and transformers imports. | |
| os.environ['TF_USE_LEGACY_KERAS'] = '1' | |
| # --- Machine Learning & Deep Learning Libraries --- | |
| import tensorflow as tf | |
| from tensorflow import keras | |
| from keras import layers, Model | |
| from sklearn.preprocessing import MinMaxScaler | |
| from keras.utils import Sequence | |
| # --- NLP & Self-Evolution Libraries --- | |
# Optional dependencies: if either import below fails, the flag is set to False
# and the code in this file that checks NLP_LIBRARIES_AVAILABLE skips its
# embedding / text-generation paths.
try:
    from transformers import BertTokenizer, TFBertModel, pipeline
    import torch
    NLP_LIBRARIES_AVAILABLE = True
except ImportError:
    NLP_LIBRARIES_AVAILABLE = False
# --- Live Data Fetching Configuration ---
from twelvedata import TDClient
# JSON calendar feed requested by fetch_live_events_with_cache().
EVENT_JSON_URL = "https://nfs.faireconomy.media/ff_calendar_thisweek.json"
# Maximum age, in seconds, of a cached calendar payload before it is re-fetched.
CACHE_DURATION_SECONDS = 600
# In-process cache: the last payload and the time.time() value at which it was stored.
_EVENT_CACHE = {"data": None, "timestamp": 0}
| # --- ALL CLASS AND FUNCTION DEFINITIONS --- | |
class CausalReasoningNetwork:
    """Label each row of a feature frame with a coarse market regime."""

    def __init__(self, processed_data):
        # Work on a private copy so the caller's frame is never mutated.
        self.data = processed_data.copy()

    def identify_volatility_regimes(self, volatility_indicator='ATR', trend_indicator='EMA_20'):
        """Add a ``'regime'`` column and return the labelled frame.

        Rows whose volatility is above the 66th percentile are ``'TRENDING'``
        when the magnitude of the 3-row slope of ``trend_indicator`` exceeds
        the 75th percentile of the slope series, otherwise ``'BREAKOUT'``.
        Rows below the 33rd volatility percentile are ``'RANGING'``; all
        remaining rows (including rows with NaN volatility) are ``'CHOPPY'``.

        Args:
            volatility_indicator: Column holding the volatility measure.
            trend_indicator: Column whose 3-row difference is used as slope.

        Returns:
            ``self.data`` with the ``'regime'`` column added.

        Raises:
            KeyError: If either indicator column is missing.
        """
        atr = self.data[volatility_indicator]
        low_vol_threshold = atr.quantile(0.33)
        high_vol_threshold = atr.quantile(0.66)

        ema_slope = self.data[trend_indicator].diff(periods=3)
        # Loop-invariant: computed once instead of once per row.
        # NOTE(review): the threshold is a quantile of the *signed* slope but
        # is compared against the *absolute* slope; kept as-is to preserve the
        # labels the rest of the pipeline was built on -- confirm intent.
        slope_threshold = ema_slope.quantile(0.75)
        # The first rows have no slope yet; treat them as flat.
        slope_magnitude = ema_slope.fillna(0).abs()

        high_vol = (atr > high_vol_threshold).to_numpy()
        low_vol = (atr < low_vol_threshold).to_numpy()
        strong_trend = (slope_magnitude > slope_threshold).to_numpy()

        # Conditions are evaluated in order, mirroring the if/elif chain.
        self.data['regime'] = np.select(
            [high_vol & strong_trend, high_vol, low_vol],
            ['TRENDING', 'BREAKOUT', 'RANGING'],
            default='CHOPPY',
        )
        return self.data
class RuleBasedSituationRoom:
    """Turn multi-horizon price predictions into a concrete trade thesis."""

    def __init__(self, params):
        self.params = params

    @staticmethod
    def _direction(target, reference):
        """Return "BUY" when ``target`` is above ``reference``, else "SELL"."""
        return "BUY" if target > reference else "SELL"

    def generate_thesis(self, predictions, sequence_df):
        """Build a trade thesis for the last row of ``sequence_df``.

        Args:
            predictions: Mapping with keys '5m', '15m' and '1h', or an empty
                mapping to fall back to the price-vs-EMA_20 rule.
            sequence_df: Frame whose last row provides 'close', 'EMA_20' and
                'ATR'.

        Returns:
            A dict describing the trade. When the horizons disagree the dict
            has action "NO_TRADE" and carries no price levels.
        """
        bar = sequence_df.iloc[-1]
        price = bar['close']

        if not predictions:
            side = self._direction(price, bar['EMA_20'])
            action = side
            confidence = "LOW"
            strategy = "Trend Following"
            reasoning = f"Simple EMA Crossover ({side})."
        else:
            short_side = self._direction(predictions['5m'], price)
            mid_side = self._direction(predictions['15m'], price)
            long_side = self._direction(predictions['1h'], price)

            if short_side == mid_side == long_side:
                action, confidence, strategy = short_side, "HIGH", "Trend Following"
                reasoning = f"Strong confluence across all horizons ({short_side})."
            elif short_side == mid_side:
                action, confidence, strategy = short_side, "MEDIUM", "Scalp"
                reasoning = f"Short/Medium-term confluence ({short_side})."
            elif mid_side == long_side:
                action, confidence, strategy = mid_side, "LOW", "Trend Following"
                reasoning = f"Medium/Long-term confluence ({mid_side})."
            else:
                return {
                    "action": "NO_TRADE",
                    "confidence": "LOW",
                    "strategy_type": "Range Play",
                    "reasoning": "Prediction horizons diverge.",
                }

        # Fall back to a tiny positive ATR so the levels never collapse onto the entry.
        raw_atr = bar['ATR']
        atr = raw_atr if pd.notna(raw_atr) and raw_atr > 0 else 0.0001

        sl_mult = self.params.get('sl_atr_multiplier', 2.0)
        tp_mult = self.params.get('tp_atr_multiplier', 4.0)
        # Lower confidence gets a nearer target.
        if confidence == "MEDIUM":
            tp_mult *= 0.75
        if confidence == "LOW":
            tp_mult *= 0.5

        entry = price
        if action == "BUY":
            stop_loss = price - (sl_mult * atr)
            take_profit = price + (tp_mult * atr)
        else:
            stop_loss = price + (sl_mult * atr)
            take_profit = price - (tp_mult * atr)

        return {
            "action": action,
            "entry": f"{entry:.5f}",
            "stop_loss": f"{stop_loss:.5f}",
            "take_profit": f"{take_profit:.5f}",
            "confidence": confidence,
            "reasoning": reasoning,
            "strategy_type": strategy,
        }
class MarketRegimeFilter:
    """Gate trade theses by whether their strategy type suits the current regime."""

    def __init__(self):
        # Regime label -> strategy types that may trade in that regime.
        self.allowed_strategies = dict(
            TRENDING=['Trend Following'],
            BREAKOUT=['Trend Following', 'Scalp'],
            CHOPPY=['Scalp', 'Mean Reversion'],
            RANGING=['Mean Reversion'],
        )

    def should_trade(self, current_regime, trade_thesis):
        """Return True when the thesis is a real trade permitted in ``current_regime``.

        Unknown regimes permit nothing.
        """
        is_trade = trade_thesis['action'] != 'NO_TRADE'
        if not is_trade:
            return False
        permitted = self.allowed_strategies.get(current_regime, [])
        return trade_thesis['strategy_type'] in permitted
def fetch_live_events_with_cache():
    """Return the economic-calendar payload, re-fetching at most once per cache window.

    A payload younger than ``CACHE_DURATION_SECONDS`` is served from
    ``_EVENT_CACHE``. On a network, HTTP or JSON-decoding failure the last
    cached payload is returned, or an empty list when nothing has been
    cached yet (never ``None``).
    """
    current_time = time.time()
    cached = _EVENT_CACHE["data"]
    # `is not None` so that a legitimately empty payload is cached as well.
    if cached is not None and (current_time - _EVENT_CACHE["timestamp"] < CACHE_DURATION_SECONDS):
        return cached
    try:
        response = requests.get(EVENT_JSON_URL, headers={"User-Agent": "V20-Agent/1.0"}, timeout=10)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException subclass.
        print(f"Error fetching event data: {e}")
        return cached if cached is not None else []
    _EVENT_CACHE["data"], _EVENT_CACHE["timestamp"] = data, current_time
    return data
def fetch_twelvedata_prices(api_key, symbol='EUR/USD', interval='5min', output_size=200):
    """Download a price series from Twelve Data as an oldest-first DataFrame.

    The time index is moved into a 'Datetime' column. Any failure is printed
    and an empty DataFrame is returned instead of raising.
    """
    try:
        client = TDClient(apikey=api_key)
        series = client.time_series(
            symbol=symbol,
            interval=interval,
            outputsize=output_size,
            timezone="UTC",
        )
        frame = series.as_pandas().sort_index(ascending=True)
        frame.index.name = 'Datetime'
        return frame.reset_index()
    except Exception as e:
        print(f"Error fetching price data: {e}")
        return pd.DataFrame()
def _parse_financial_number(s):
    """Convert a calendar figure such as '1.5M', '250K' or '0.3%' to a float.

    Returns NaN for non-strings, empty strings and unparseable values.
    """
    if not isinstance(s, str) or not s:
        return np.nan
    s = s.strip().upper()
    multipliers = {'B': 1e9, 'M': 1e6, 'K': 1e3, '%': 0.01}
    val_str, multiplier = s, 1.0
    if s.endswith(tuple(multipliers)):
        val_str, multiplier = s[:-1], multipliers[s[-1]]
    try:
        return float(val_str) * multiplier
    except (ValueError, TypeError):
        return np.nan


def create_feature_set_for_inference(price_df, events_json, finbert_tokenizer, finbert_model):
    """Build the per-bar feature frame from prices and calendar events.

    Args:
        price_df: Frame with a 'Datetime' column plus 'open', 'high', 'low'
            and 'close'.
        events_json: Calendar payload (list of dicts) or None/empty.
        finbert_tokenizer: Tokenizer called on the event text.
        finbert_model: Model whose ``last_hidden_state[:, 0, :]`` is used as
            the event embedding.

    Returns:
        ``(features, next_event_info)`` where ``features`` is indexed by UTC
        time and ``next_event_info`` has keys "title" and "time_str".

    Unlike the previous revision this no longer raises when there are no
    usable events or when the NLP libraries are unavailable: the event merge
    is skipped (or done without embeddings) and only columns that exist are
    forward-filled.
    """
    # --- Price index normalised to UTC ---
    price_features = price_df.copy()
    price_features['Datetime'] = pd.to_datetime(price_features['Datetime'])
    price_features.set_index('Datetime', inplace=True)
    if price_features.index.tz is None:
        price_features = price_features.tz_localize('UTC', ambiguous='infer')
    else:
        price_features = price_features.tz_convert('UTC')

    # --- Technical indicators: 14-bar RSI, 20-span EMA, 14-bar ATR ---
    close = price_features['close']
    delta = close.diff()
    gain = delta.where(delta > 0, 0).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    price_features['RSI'] = 100 - (100 / (1 + (gain / loss)))
    price_features['EMA_20'] = close.ewm(span=20, adjust=False).mean()
    prev_close = close.shift()
    high_low = price_features['high'] - price_features['low']
    high_close = np.abs(price_features['high'] - prev_close)
    low_close = np.abs(price_features['low'] - prev_close)
    tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
    price_features['ATR'] = tr.rolling(window=14).mean()

    # --- Calendar events ---
    events = pd.DataFrame(events_json)
    processed_events = pd.DataFrame()
    next_event_info = {"title": "None within 24h", "time_str": "N/A"}
    if events.empty or 'date' not in events.columns:
        # Without timestamps the events cannot be aligned to bars at all.
        events = pd.DataFrame()
    else:
        relevant_currencies = ['USD', 'EUR', 'GBP', 'JPY']
        events = events[events['country'].isin(relevant_currencies)].copy()
        events['datetime'] = pd.to_datetime(events['date'], utc=True, errors='coerce')
        events.dropna(subset=['datetime'], inplace=True)
        events.set_index('datetime', inplace=True)
        events.sort_index(inplace=True)
        if 'actual' in events.columns and 'forecast' in events.columns:
            actual = events['actual'].apply(_parse_financial_number)
            forecast = events['forecast'].apply(_parse_financial_number)
            events['surprise'] = (actual - forecast).fillna(0)
        else:
            events['surprise'] = 0
        events['detail'] = events['title'].fillna('') + ' ' + events['country'].fillna('')
        if not events.empty:
            if NLP_LIBRARIES_AVAILABLE:
                inputs = finbert_tokenizer(events['detail'].tolist(), return_tensors='tf',
                                           padding=True, truncation=True, max_length=64)
                embeddings = finbert_model(inputs).last_hidden_state[:, 0, :].numpy()
                embedding_frame = pd.DataFrame(
                    embeddings,
                    columns=[f'finbert_{i}' for i in range(embeddings.shape[1])],
                    index=events.index,
                )
                processed_events = pd.concat([events, embedding_frame], axis=1)
            else:
                # Degrade gracefully: keep the event columns, just without embeddings.
                processed_events = events

    # --- Attach the most recent event (at most 60 minutes old) to each bar ---
    if processed_events.empty:
        # merge_asof rejects an empty right frame whose index is not datetime-typed.
        merged_data = price_features.sort_index().copy()
    else:
        merged_data = pd.merge_asof(
            left=price_features.sort_index(), right=processed_events.sort_index(),
            left_index=True, right_index=True,
            direction='backward', tolerance=pd.Timedelta(minutes=60),
        )

    # --- Distance (hours) to the next / from the last high-impact event ---
    if 'impact' in events.columns and not events.empty:
        all_high_impact_events = events[events['impact'] == 'High']
    else:
        all_high_impact_events = pd.DataFrame()
    if not all_high_impact_events.empty:
        last_bar_time = merged_data.index[-1]
        upcoming_events = all_high_impact_events[all_high_impact_events.index > last_bar_time]
        if not upcoming_events.empty:
            next_event = upcoming_events.iloc[0]
            time_to_next = (next_event.name - last_bar_time).total_seconds() / 3600.0
            merged_data['time_to_event'] = time_to_next
            next_event_info = {"title": f"{next_event.country} {next_event.title}",
                               "time_str": f"in {time_to_next:.2f}h"}
        else:
            merged_data['time_to_event'] = 9999
        df_index_sec = merged_data.index.astype(np.int64).to_numpy() // 10**9
        event_times_sec = all_high_impact_events.index.astype(np.int64).to_numpy() // 10**9
        time_diffs = df_index_sec[:, None] - event_times_sec[None, :]
        # Only past events count; bars that precede every event get inf (-> 9999 below).
        merged_data['time_since_event'] = np.min(np.where(time_diffs >= 0, time_diffs, np.inf), axis=1) / 3600
    else:
        merged_data['time_since_event'] = 9999
        merged_data['time_to_event'] = 9999

    # --- Calendar / session flags (hours are UTC) ---
    merged_data.replace([np.inf, -np.inf], 9999, inplace=True)
    merged_data['hour_of_day'] = merged_data.index.hour
    merged_data['day_of_week'] = merged_data.index.dayofweek
    hour = merged_data['hour_of_day']
    merged_data['session_london'] = ((hour >= 7) & (hour <= 16)).astype(int)
    merged_data['session_ny'] = ((hour >= 12) & (hour <= 21)).astype(int)
    merged_data['session_asian'] = ((hour >= 22) | (hour <= 7)).astype(int)

    # --- Fill gaps ---
    finbert_cols = [col for col in merged_data.columns if 'finbert_' in col]
    # Event columns are absent when no event frame was merged in.
    cols_to_ffill = [col for col in ['surprise', 'impact', 'title'] + finbert_cols
                     if col in merged_data.columns]
    if cols_to_ffill:
        merged_data[cols_to_ffill] = merged_data[cols_to_ffill].ffill()
    merged_data.fillna(0, inplace=True)
    # NOTE(review): after fillna(0) this dropna cannot remove anything, so the
    # indicator warm-up rows are kept with zeros. Order kept unchanged.
    merged_data.dropna(subset=['open', 'close', 'RSI'], inplace=True)
    return merged_data, next_event_info
class PredictionCoreTransformer:
    """Multi-horizon price model together with its scaler and feature list.

    The three artifacts are persisted to the working directory after
    calibration so that later start-ups can load them directly.
    """

    def __init__(self, sequence_length=48):
        self.scaler = None
        self.model = None
        self.sequence_length = sequence_length
        self.feature_names = None
        self.calibrated_model_path = 'calibrated_model.keras'
        self.calibrated_scaler_path = 'calibrated_scaler.joblib'
        self.calibrated_features_path = 'calibrated_features.json'

    def is_calibrated(self):
        """Return True when all three calibrated artifact files exist."""
        artifact_paths = (self.calibrated_model_path, self.calibrated_scaler_path, self.calibrated_features_path)
        return all(os.path.exists(p) for p in artifact_paths)

    def load_calibrated_artifacts(self):
        """Load model, scaler and feature list from disk; returns True."""
        print("--- Loading pre-calibrated artifacts for instant startup ---")
        self.model = keras.models.load_model(self.calibrated_model_path)
        self.scaler = joblib.load(self.calibrated_scaler_path)
        with open(self.calibrated_features_path, 'r') as f:
            self.feature_names = json.load(f)
        print("--- Instant startup successful ---")
        return True

    def calibrate(self, base_model_path, calibration_data):
        """Fit a scaler on ``calibration_data``, fine-tune the base model and save all artifacts.

        Args:
            base_model_path: Path of the pre-trained Keras model.
            calibration_data: Feature frame; must contain an 'open' column,
                from which the training targets are derived.

        Returns:
            True on success.

        Raises:
            ValueError: If the data is too short to form one training window.
        """
        print("--- STARTING ONE-TIME AGENT CALIBRATION ---")
        print(f"Loaded calibration data with {len(calibration_data)} rows.")
        non_feature_cols = ['regime', 'close', 'open', 'high', 'low', 'event', 'impact', 'title', 'detail',
                            'country', 'date', 'currency', 'actual', 'forecast', 'previous']
        self.feature_names = [col for col in calibration_data.columns if col not in non_feature_cols]
        print(f"Identified {len(self.feature_names)} features for scaling and training.")
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.scaler.fit(calibration_data[self.feature_names])
        print("New, compatible scaler has been fitted.")
        self.model = keras.models.load_model(base_model_path)
        print("Pre-trained model loaded successfully.")
        print("Starting fine-tuning process...")

        class CalibrationGenerator(Sequence):
            """Yields one (window, targets) sample per index for fine-tuning."""

            def __init__(self, data, scaler, feature_names, seq_len):
                # Let the Keras base class set up its own state.
                super().__init__()
                self.data = data.copy()
                self.scaler = scaler
                self.feature_names = feature_names
                self.seq_len = seq_len
                # Targets are the open price 1, 3 and 12 rows ahead.
                self.data['target_5m'] = self.data['open'].shift(-1)
                self.data['target_15m'] = self.data['open'].shift(-3)
                self.data['target_1h'] = self.data['open'].shift(-12)
                self.data.dropna(inplace=True)
                self.features_df = self.data[self.feature_names]
                self.targets_df = self.data[['target_5m', 'target_15m', 'target_1h']]
                self.scaled_features = self.scaler.transform(self.features_df)
                # Clamp: __len__ must never be negative.
                self.n_samples = max(0, len(self.scaled_features) - self.seq_len)

            def __len__(self):
                return self.n_samples

            def __getitem__(self, idx):
                seq_end = idx + self.seq_len
                X = self.scaled_features[idx:seq_end].reshape(1, self.seq_len, len(self.feature_names))
                y_5m, y_15m, y_1h = self.targets_df.iloc[seq_end - 1][['target_5m', 'target_15m', 'target_1h']]
                return X, {'5m_output': np.array([y_5m]), '15m_output': np.array([y_15m]), '1h_output': np.array([y_1h])}

        calibration_generator = CalibrationGenerator(calibration_data, self.scaler, self.feature_names, self.sequence_length)
        if len(calibration_generator) == 0:
            raise ValueError(
                f"Calibration data yields no training windows of length {self.sequence_length}."
            )
        self.model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=1e-5),
            loss={'5m_output': 'mean_squared_error', '15m_output': 'mean_squared_error', '1h_output': 'mean_squared_error'},
        )
        self.model.fit(calibration_generator, epochs=3, verbose=1)
        self.model.save(self.calibrated_model_path)
        joblib.dump(self.scaler, self.calibrated_scaler_path)
        with open(self.calibrated_features_path, 'w') as f:
            json.dump(self.feature_names, f)
        print("--- AGENT RE-CALIBRATION AND ARTIFACT SAVING COMPLETE ---")
        return True

    def predict_single(self, input_sequence):
        """Predict the three horizons for one window of ``sequence_length`` rows.

        Returns:
            ``(predictions, summary)`` where ``predictions`` maps '5m', '15m'
            and '1h' to the model outputs and ``summary`` is a display string.

        Raises:
            RuntimeError: If no model/scaler has been loaded or calibrated.
            ValueError: If the window does not have ``sequence_length`` rows.
        """
        if self.scaler is None or self.model is None:
            raise RuntimeError("Agent has not been calibrated. Cannot make predictions.")
        features_for_model = input_sequence[self.feature_names]
        if len(features_for_model) != self.sequence_length:
            raise ValueError(
                f"Expected {self.sequence_length} rows, got {len(features_for_model)}."
            )
        scaled_features = self.scaler.transform(features_for_model)
        reshaped_sequence = scaled_features.reshape(1, self.sequence_length, len(self.feature_names))
        predictions = self.model.predict(reshaped_sequence, verbose=0)
        # The model is indexed as one output per horizon, each of shape (1, 1).
        real_price_predictions = {"5m": predictions[0][0][0], "15m": predictions[1][0][0], "1h": predictions[2][0][0]}
        preds_str = (f"5m: {real_price_predictions['5m']:.5f} | 15m: {real_price_predictions['15m']:.5f}"
                     f" | 1h: {real_price_predictions['1h']:.5f}")
        return real_price_predictions, preds_str
class LinUCBBandit:
    """Per-strategy linear upper-confidence-bound contextual bandit.

    Each strategy keeps a ``d x d`` matrix ``A`` (initialised to
    ``regularization * I``) and a length-``d`` vector ``b``.
    """

    def __init__(self, strategies, d, alpha=1.0, regularization=1.0):
        self.strategies = list(strategies)
        self.d = d
        self.alpha = alpha
        self.reg = regularization
        self.A = {s: (self.reg * np.eye(self.d)) for s in self.strategies}
        self.b = {s: np.zeros(self.d) for s in self.strategies}

    def select(self, context_vector):
        """Return the strategy with the highest mean-plus-uncertainty score.

        ``context_vector`` may have any shape holding ``d`` values; it is
        flattened first, matching what :meth:`update` does. Ties go to the
        strategy listed first.
        """
        x = np.asarray(context_vector, dtype=float).reshape(-1)
        scores = {}
        for s in self.strategies:
            A_inv = np.linalg.inv(self.A[s])
            theta = A_inv.dot(self.b[s])
            mean_reward = theta.dot(x)
            # Round-off can push the quadratic form marginally below zero,
            # which would make math.sqrt raise.
            variance = max(0.0, float(x.dot(A_inv).dot(x)))
            scores[s] = mean_reward + self.alpha * math.sqrt(variance)
        return max(scores, key=scores.get)

    def update(self, strategy, context_vector, reward):
        """Fold one observed (context, reward) pair into ``strategy``'s statistics.

        Statistics are created on demand for a strategy not seen at
        construction time; such a strategy is not added to ``self.strategies``.
        """
        x = np.asarray(context_vector, dtype=float).reshape(-1)
        if strategy not in self.A:
            self.A[strategy] = (self.reg * np.eye(self.d))
            self.b[strategy] = np.zeros(self.d)
        self.A[strategy] += np.outer(x, x)
        self.b[strategy] += reward * x

    def increase_exploration(self, factor=1.2):
        """Multiply the exploration weight ``alpha`` by ``factor``."""
        self.alpha *= factor
        print(f"CONCEPT DRIFT ACTION: Increased bandit exploration alpha to {self.alpha:.2f}")
class PageHinkley:
    """Cumulative-sum change detector over a stream of values."""

    def __init__(self, delta=0.005, lambda_=50, alpha=1 - 1e-3):
        self.mean = 0.0
        self.delta = delta
        self.lambda_ = lambda_
        self.alpha = alpha
        self.cumulative = 0.0

    def update(self, x):
        """Feed one observation; return True when the alarm threshold is crossed.

        The running mean is an exponential average weighted by ``alpha``.
        The accumulated deviation is floored at zero and is reset whenever
        an alarm is raised.
        """
        self.mean = self.mean * self.alpha + x * (1 - self.alpha)
        self.cumulative = max(0, self.cumulative + x - self.mean - self.delta)
        if not self.cumulative > self.lambda_:
            return False
        self.cumulative = 0
        return True
class FirebaseManager:
    """Thin wrapper around two Firebase Realtime Database references.

    When credentials are missing or initialisation fails, both references
    stay ``None`` and every method becomes a no-op.
    """

    def __init__(self, db_signals_ref_name='signals_v20', db_strategies_ref_name='generated_strategies_v20'):
        self.signals_ref = None
        self.strategies_ref = None
        try:
            sa_key_json = os.environ.get('FIRESTORE_SA_KEY')
            db_url = os.environ.get('FIREBASE_DB_URL')
            if not (sa_key_json and db_url):
                print("FIREBASE MANAGER: Secrets not set. Logger disabled.")
                return
            cred = credentials.Certificate(json.loads(sa_key_json))
            # Initialise the default app only once per process.
            if not firebase_admin._apps:
                firebase_admin.initialize_app(cred, {'databaseURL': db_url})
            self.signals_ref = db.reference(db_signals_ref_name)
            self.strategies_ref = db.reference(db_strategies_ref_name)
            print(f"FIREBASE MANAGER: Successfully connected. Signals: '{db_signals_ref_name}', Strategies: '{db_strategies_ref_name}'.")
        except Exception as e:
            print(f"FIREBASE MANAGER: CRITICAL ERROR - Failed to initialize: {e}")

    def log_signal(self, ts, strategy, action, entry, sl, tp, context_vector, market_regime, confidence, predictions_str):
        """Push one new signal record; outcome fields start as None."""
        if not self.signals_ref:
            return
        try:
            record = {}
            record["timestamp"] = ts
            record["strategy"] = strategy
            record["action"] = action
            record["entry"] = float(entry)
            record["stop_loss"] = float(sl)
            record["take_profit"] = float(tp)
            record["context_vector"] = list(map(float, context_vector.tolist()))
            record["market_regime_at_trade"] = market_regime
            record["confidence_at_trade"] = confidence
            record["predictions_at_trade"] = predictions_str
            record["pnl"] = None
            record["reward"] = None
            record["outcome_reason"] = None
            self.signals_ref.push(record)
        except Exception as e:
            print(f"FIREBASE MANAGER: CRITICAL ERROR - Could not write signal: {e}")

    def save_strategy(self, strategy_name, code_string):
        """Store strategy source code under ``strategy_name`` with a creation time."""
        if not self.strategies_ref:
            return
        try:
            payload = {"code": code_string, "created_at": time.time()}
            self.strategies_ref.child(strategy_name).set(payload)
            print(f"FIREBASE MANAGER: Saved new strategy '{strategy_name}' to database.")
        except Exception as e:
            print(f"FIREBASE MANAGER: CRITICAL ERROR - Could not save strategy: {e}")

    def load_all_strategies(self):
        """Return every stored strategy as a dict (empty when unavailable or on error)."""
        if not self.strategies_ref:
            return {}
        try:
            stored = self.strategies_ref.get()
            return stored or {}
        except Exception as e:
            print(f"FIREBASE MANAGER: CRITICAL ERROR - Could not load strategies: {e}")
            return {}
class StrategyManager:
    """Registry of callable trading strategies, keyed by name.

    Every strategy takes a sequence DataFrame and returns
    ``(trade_thesis, predictions_summary)``.
    """

    def __init__(self, situation_room, prediction_engine):
        self.situation_room = situation_room
        self.prediction_engine = prediction_engine
        self._strategies = self._initialize_base_strategies()

    def _initialize_base_strategies(self):
        """Create the two built-in strategies."""
        def predictive_strategy(seq):
            preds_dict, preds_str = self.prediction_engine.predict_single(seq)
            return self.situation_room.generate_thesis(preds_dict, seq), preds_str

        def ema_crossover_strategy(seq):
            # An empty prediction mapping selects the situation room's EMA fallback.
            return self.situation_room.generate_thesis({}, seq), "N/A (EMA Strategy)"

        return {"predictive_rule_based": predictive_strategy, "ema_crossover": ema_crossover_strategy}

    def list_strategies(self):
        """Return the live name -> callable mapping (not a copy)."""
        return self._strategies

    def add_strategy(self, name, function_obj):
        """Register (or replace) a strategy under ``name``."""
        print(f"STRATEGY MANAGER: Adding new strategy '{name}' to the active pool.")
        self._strategies[name] = function_obj

    @staticmethod
    def _wrap_evolved(func):
        """Adapt a thesis-only function to the ``(thesis, summary)`` contract."""
        def inner(sequence_df):
            return func(sequence_df), "N/A (Evolved)"
        return inner

    def load_evolved_strategies(self, firebase_manager):
        """Compile and register every strategy stored in the database.

        Each stored entry must define a function named ``generated_strategy``.
        Entries that are malformed or fail to compile are reported and skipped.
        """
        print("STRATEGY MANAGER: Checking for evolved strategies in the database...")
        evolved_strategies = firebase_manager.load_all_strategies()
        count = 0
        for name, data in evolved_strategies.items():
            code = data.get("code") if isinstance(data, dict) else None
            if not code:
                continue
            try:
                # SECURITY: this executes source text loaded from the database
                # with full interpreter privileges. Anyone able to write to the
                # strategies node can run arbitrary code here.
                #
                # A single dict serves as the code's globals so that imports
                # and helpers defined at its top level stay visible to
                # `generated_strategy` when it is called later. With separate
                # globals/locals they land in the locals dict, which the
                # function cannot see. Seeding from a copy of this module's
                # globals keeps existing names (pd, np, ...) available
                # without letting the code overwrite them.
                namespace = dict(globals())
                exec(code, namespace)
                strategy_func = namespace['generated_strategy']
                self.add_strategy(name, self._wrap_evolved(strategy_func))
                count += 1
            except Exception as e:
                print(f"STRATEGY MANAGER: ERROR - Failed to load evolved strategy '{name}'. Reason: {e}")
        print(f"STRATEGY MANAGER: Loaded {count} evolved strategies from the database.")
class CognitiveCore:
    """Reviews logged trade outcomes and writes a directive for the strategy lab."""

    def __init__(self, firebase_manager):
        self.firebase_manager = firebase_manager

    def analyze_performance_history(self):
        """Return a memo targeting the worst-performing regime, or None.

        None is returned when the database is unavailable, when fewer than 20
        signals have both a reward and a regime recorded, or when the worst
        regime's mean reward is at least -0.1.
        """
        print("COGNITIVE CORE: Analyzing historical performance...")
        if not self.firebase_manager.signals_ref:
            return None
        all_signals = self.firebase_manager.signals_ref.get()
        if not all_signals:
            print("COGNITIVE CORE: No history to analyze.")
            return None
        df = pd.DataFrame.from_dict(all_signals, orient='index')
        # A column only exists if at least one stored record carries that key,
        # so add the required ones as all-missing rather than fail in dropna.
        for required in ('reward', 'market_regime_at_trade'):
            if required not in df.columns:
                df[required] = np.nan
        df['reward'] = pd.to_numeric(df['reward'], errors='coerce')
        df = df.dropna(subset=['reward', 'market_regime_at_trade'])
        if df.empty or len(df) < 20:
            print(f"COGNITIVE CORE: Not enough completed trades ({len(df)}) to analyze. Need at least 20.")
            return None
        regime_performance = df.groupby('market_regime_at_trade')['reward'].mean()
        if regime_performance.empty:
            return None
        worst_regime = regime_performance.idxmin()
        worst_regime_score = regime_performance.min()
        if worst_regime_score >= -0.1:
            print(f"COGNITIVE CORE INSIGHT: Performance is acceptable. No evolution needed.")
            return None
        # str() guards against a regime label that was stored as a non-string.
        regime_label = str(worst_regime).upper()
        memo = f"""
**To:** Strategy Lab
**From:** Cognitive Core
**Subject:** Performance Analysis & Strategic Gaps
**Key Findings:** A critical weakness has been identified in the **'{regime_label}'** market regime, with an average reward of {worst_regime_score:.2f}.
**Strategic Directive:** Generate a new **mean-reversion** strategy for **'{regime_label}'** markets. The strategy should SELL when RSI is overbought (e.g., > 70) and BUY when RSI is oversold (e.g., < 30). Use a tight stop-loss of 1.5 times the ATR.
"""
        print(f"COGNITIVE CORE INSIGHT: Generated memo targeting poor performance in '{worst_regime}' regime.")
        return memo
class LocalStrategyLab:
    """Generates strategy source code with a locally hosted text-generation model."""

    def __init__(self, model_name="microsoft/Phi-3-mini-4k-instruct"):
        # Stays None when the NLP stack is missing or the pipeline fails to load.
        self.generator = None
        if not NLP_LIBRARIES_AVAILABLE:
            print("STRATEGY LAB: WARNING - Transformers/Torch not installed. Strategy generation disabled.")
            return
        print(f"STRATEGY LAB: Initializing local LLM: {model_name}...")
        try:
            self.generator = pipeline(
                "text-generation", model=model_name, torch_dtype=torch.bfloat16, device_map="auto"
            )
            print("STRATEGY LAB: LLM Initialized successfully.")
        except Exception as e:
            print(f"STRATEGY LAB: CRITICAL ERROR - Failed to initialize LLM pipeline: {e}")

    def _create_prompt(self, performance_memo):
        """Embed the memo in the instruction prompt; the prompt ends on an open code fence."""
        return f"""You are an expert quantitative trading strategist. Your task is to design a new, syntactically correct Python trading strategy function based on a performance analysis memo.
**Performance Memo:**
---
{performance_memo}
---
**Instructions:**
1. Write a single Python function. The function name MUST be `generated_strategy`.
2. The function must accept one argument: `sequence_df`, a pandas DataFrame. The last row (`sequence_df.iloc[-1]`) is the current timestep.
3. The function MUST return a Python dictionary with the trade thesis.
4. The dictionary keys must include: "action", "entry", "stop_loss", "take_profit", "confidence", "reasoning", and "strategy_type".
5. For "action", use "BUY", "SELL", or "NO_TRADE".
6. Base your strategy logic directly on the 'Strategic Directive' in the memo. Use standard indicators available in the DataFrame like 'RSI', 'EMA_20', and 'ATR'.
7. The value for "strategy_type" should be "Mean Reversion".
Begin the Python code block now.
```python
"""

    def generate_new_strategy_code(self, memo):
        """Ask the model for a strategy and return the extracted source, or None.

        None is returned when generation is disabled, the memo is empty, or
        no code could be extracted from the response.
        """
        if not self.generator or not memo:
            return None
        prompt = self._create_prompt(memo)
        outputs = self.generator(prompt, max_new_tokens=512, do_sample=True, temperature=0.6, top_k=50, top_p=0.95)
        try:
            # The pipeline returns a list with one dict per generated sequence.
            generated_text = outputs[0]['generated_text']
            # Take what follows the first opening fence, up to the next fence.
            code = generated_text.split("```python")[1].split("```")[0].strip()
        except (IndexError, KeyError, TypeError):
            code = ""
        if not code:
            print("STRATEGY LAB: ERROR - Could not extract Python code from LLM response.")
            return None
        print("STRATEGY LAB: Successfully generated new strategy code.")
        return code
# Numeric columns that ContextVectorPreprocessor scales into the context vector.
CONTEXT_FEATURES = ['close', 'ATR', 'EMA_20', 'RSI', 'time_since_event', 'time_to_event', 'hour_of_day']
# One-hot slots appended after the numeric features, one per regime label.
REGIME_COLS = ['regime_TRENDING', 'regime_BREAKOUT', 'regime_CHOPPY', 'regime_RANGING']
# Total length of a context vector: numeric features plus regime slots.
CONTEXT_DIMENSION = len(CONTEXT_FEATURES) + len(REGIME_COLS)
class ContextVectorPreprocessor:
    """Builds the context vector: scaled numeric features plus a one-hot regime."""

    def __init__(self):
        self.scaler = MinMaxScaler(feature_range=(-1, 1))
        self.calibrated_context_scaler_path = 'calibrated_context_scaler.joblib'
        self.feature_names = CONTEXT_FEATURES
        self.regime_cols = REGIME_COLS

    def is_calibrated(self):
        """Return True when a saved scaler file exists."""
        return os.path.exists(self.calibrated_context_scaler_path)

    def load_calibrated_scaler(self):
        """Load the saved scaler if present; return whether it was loaded."""
        if not self.is_calibrated():
            return False
        print(f"--- Loading pre-calibrated context scaler ---")
        self.scaler = joblib.load(self.calibrated_context_scaler_path)
        return True

    def calibrate(self, calibration_data_for_context):
        """Fit the scaler on the numeric feature columns and save it to disk."""
        print("--- Fitting Context Scaler ---")
        numeric_part = calibration_data_for_context[self.feature_names]
        self.scaler.fit(numeric_part)
        joblib.dump(self.scaler, self.calibrated_context_scaler_path)
        print("--- Context Scaler calibration complete ---")

    def build_context_vector(self, df_with_regime):
        """Return the context vector for the last row of ``df_with_regime``.

        Raises:
            ValueError: If the frame has no 'regime' column.
        """
        if 'regime' not in df_with_regime.columns:
            raise ValueError("DataFrame must contain a 'regime' column.")
        frame = df_with_regime.copy()
        frame[self.feature_names] = frame[self.feature_names].astype(float)

        # Keep the last row two-dimensional for the scaler.
        numeric_row = frame[self.feature_names].iloc[-1:].values
        scaled_part = self.scaler.transform(numeric_row).flatten()

        current_regime = frame.iloc[-1]['regime']
        one_hot = np.zeros(len(self.regime_cols))
        slot_name = f"regime_{current_regime}"
        if slot_name in self.regime_cols:
            one_hot[self.regime_cols.index(slot_name)] = 1
        else:
            print(f"Warning: Regime '{current_regime}' not found. Setting regime vector to zeros.")
        return np.concatenate([scaled_part, one_hot])
class LiveDataStore:
    """Keeps the latest feature frame fresh on a background thread.

    Readers obtain copies through :meth:`get_latest_data` and
    :meth:`get_raw_price_data`; a lock guards the shared state.
    """

    def __init__(self, api_key, finbert_tokenizer, finbert_model, update_interval_seconds=120):
        self.api_key = api_key
        self.finbert_tokenizer = finbert_tokenizer
        self.finbert_model = finbert_model
        self.update_interval_seconds = update_interval_seconds
        self._data_lock = threading.Lock()
        self._latest_features = pd.DataFrame()
        self._next_event_info = {"title": "None within 24h", "time_str": "N/A"}
        self._stop_event = threading.Event()
        self._update_thread = None
        self._last_update_time = None

    def _refresh(self, output_size=500):
        """Fetch prices and events once and publish the new features.

        Never sleeps and never raises; returns True when new data was stored.
        """
        try:
            price_data = fetch_twelvedata_prices(self.api_key, output_size=output_size)
            if price_data.empty:
                return False
            events_data = fetch_live_events_with_cache()
            features, next_event_info = create_feature_set_for_inference(
                price_data, events_data, self.finbert_tokenizer, self.finbert_model
            )
            with self._data_lock:
                self._latest_features = features
                self._next_event_info = next_event_info
                self._last_update_time = pd.Timestamp.now(tz='UTC')
            return True
        except Exception as e:
            print(f"LiveDataStore ERROR during update: {e}")
            return False

    def _fetch_and_update(self, output_size=500):
        """Refresh once, then wait out the rest of the update interval.

        The wait now also happens after a failed or empty fetch, so a broken
        data source is retried once per interval instead of in a tight loop.
        """
        start_time = time.time()
        self._refresh(output_size)
        sleep_duration = max(0, self.update_interval_seconds - (time.time() - start_time))
        if sleep_duration > 0:
            self._stop_event.wait(sleep_duration)

    def _update_loop(self):
        """Background loop: wait one interval, refresh, repeat until stopped."""
        # start() has already done the first fetch, so begin with a wait.
        wait_for = self.update_interval_seconds
        while not self._stop_event.wait(wait_for):
            started = time.time()
            self._refresh()
            wait_for = max(0, self.update_interval_seconds - (time.time() - started))

    def start(self):
        """Do one synchronous fetch, then start the background updater.

        Returns as soon as the initial fetch finishes; it no longer blocks
        the caller for a full update interval. Calling it while the updater
        is already running does nothing.
        """
        if self._update_thread is not None and self._update_thread.is_alive():
            return
        self._stop_event.clear()
        print("LiveDataStore: Performing initial data fetch.")
        self._refresh()
        print("LiveDataStore: Starting background update thread.")
        self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
        self._update_thread.start()

    def stop(self, timeout=None):
        """Signal the updater to stop and wait up to ``timeout`` seconds for it."""
        self._stop_event.set()
        thread = self._update_thread
        if thread is not None and thread.is_alive() and thread is not threading.current_thread():
            thread.join(timeout)

    def get_latest_data(self, num_bars=None):
        """Return ``(features_copy, next_event_info)``, optionally only the last ``num_bars`` rows."""
        with self._data_lock:
            if num_bars and not self._latest_features.empty:
                return self._latest_features.iloc[-num_bars:].copy(), self._next_event_info
            return self._latest_features.copy(), self._next_event_info

    def get_raw_price_data(self, num_bars=None):
        """Return OHLC rows with a UTC 'Datetime' column (empty frame when no data yet)."""
        with self._data_lock:
            if self._latest_features.empty:
                return pd.DataFrame()
            df_slice = self._latest_features[['open', 'high', 'low', 'close']].copy()
        if num_bars:
            df_slice = df_slice.iloc[-num_bars:]
        df_slice.reset_index(inplace=True)
        if df_slice['Datetime'].dt.tz is None:
            df_slice['Datetime'] = df_slice['Datetime'].dt.tz_localize('UTC')
        else:
            df_slice['Datetime'] = df_slice['Datetime'].dt.tz_convert('UTC')
        return df_slice
def download_models_from_hf(repo_id, hf_token):
    """Download ``multi_horizon_model.keras`` from ``repo_id`` and return its local path.

    A failure is printed and then re-raised to the caller.
    """
    print("Downloading base agent model from Hugging Face Hub...")
    try:
        model_path = hf_hub_download(
            repo_id=repo_id,
            filename="multi_horizon_model.keras",
            token=hf_token,
        )
    except Exception as e:
        print(f"FATAL: Failed to download base model: {e}")
        raise
    return model_path
def send_ntfy_notification(topic, trade_thesis):
    """Post a summary of ``trade_thesis`` to the given ntfy.sh topic.

    Does nothing when ``topic`` is empty. Delivery is best-effort: network
    errors, timeouts and HTTP error statuses are printed, never raised.
    """
    if not topic:
        return
    title = f"V20 Signal: {trade_thesis.get('action')} EUR/USD"
    message = (f"Strategy: {trade_thesis.get('strategy_type')} ({trade_thesis.get('confidence')})\n"
               f"Reason: {trade_thesis.get('reasoning')}\n"
               f"Entry: {trade_thesis.get('entry')} | SL: {trade_thesis.get('stop_loss')} | TP: {trade_thesis.get('take_profit')}")
    try:
        # Without a timeout a stalled connection would block the caller indefinitely.
        response = requests.post(
            f"https://ntfy.sh/{topic}",
            data=message.encode('utf-8'),
            headers={"Title": title},
            timeout=10,
        )
        # Report success only when the server actually accepted the message.
        response.raise_for_status()
        print("ntfy notification sent.")
    except requests.exceptions.RequestException as e:
        print(f"Failed to send ntfy notification: {e}")
def _evaluate_signal(key, signal, now_utc, price_history, firebase_manager, bandit, change_detector):
    """Score one pending signal against the bars that followed it and record the outcome."""
    # Stored timestamps may be tz-naive; utc=True makes them comparable with now_utc.
    signal_time = pd.to_datetime(signal['timestamp'], utc=True)
    # Give the trade at least five minutes before judging it.
    if now_utc < (signal_time + pd.Timedelta(minutes=5)):
        return
    entry, sl, tp = float(signal['entry']), float(signal['stop_loss']), float(signal['take_profit'])
    action = signal['action']
    relevant_bars = price_history[price_history['Datetime'] > signal_time]
    if relevant_bars.empty:
        return

    # Default: neither level was touched, exit at the latest close.
    outcome_reason, exit_price = "Time Exit", relevant_bars.iloc[-1]['close']
    for bar in relevant_bars.itertuples(index=False):
        # Within one bar the stop is checked before the target.
        if action == 'BUY':
            if bar.low <= sl:
                exit_price, outcome_reason = sl, "SL"
                break
            if bar.high >= tp:
                exit_price, outcome_reason = tp, "TP"
                break
        elif action == 'SELL':
            if bar.high >= sl:
                exit_price, outcome_reason = sl, "SL"
                break
            if bar.low <= tp:
                exit_price, outcome_reason = tp, "TP"
                break

    pnl = float((exit_price - entry) if action == 'BUY' else (entry - exit_price))
    # A 0.005 price move maps to the full +/-1 reward.
    reward = float(np.clip(pnl / 0.005, -1.0, 1.0))
    context_vector = np.array(signal['context_vector'])
    bandit.update(signal['strategy'], context_vector, reward)
    # The detector is fed the negated reward.
    if change_detector.update(-reward):
        print("! CONCEPT DRIFT DETECTED !")
        bandit.increase_exploration()
    firebase_manager.signals_ref.child(key).update({'pnl': pnl, 'reward': reward, 'outcome_reason': outcome_reason})
    print(f"EVALUATOR: Updated signal from {signal_time.isoformat()}. Outcome: {outcome_reason}, PnL: {pnl:.5f}")


def evaluate_pending_signals_v2(firebase_manager, bandit, change_detector, live_data_store):
    """Resolve every logged signal that has no reward yet.

    Each resolved signal updates the bandit, feeds the change detector and is
    written back with its pnl, reward and outcome reason. Errors are printed
    and never raised; a failure on one signal no longer prevents the others
    from being evaluated.
    """
    if not firebase_manager.signals_ref:
        return
    now_utc = pd.Timestamp.now(tz='UTC')
    try:
        all_signals = firebase_manager.signals_ref.get()
        if not all_signals:
            return
        live_price_history = live_data_store.get_raw_price_data(num_bars=288)
        if live_price_history.empty:
            return
    except Exception as e:
        print(f"EVALUATOR V2 ERROR: {e}")
        traceback.print_exc()
        return

    for key, signal in all_signals.items():
        if not isinstance(signal, dict) or signal.get('reward') is not None:
            continue
        try:
            _evaluate_signal(key, signal, now_utc, live_price_history, firebase_manager, bandit, change_detector)
        except Exception as e:
            print(f"EVALUATOR V2 ERROR: {e}")
            traceback.print_exc()
| # --- MAIN WORKER FUNCTION --- | |
def _write_status_file(status, path='status.json'):
    """Publish ``status`` as JSON at ``path`` without exposing partial writes.

    The payload is written to a sibling temp file and then swapped in with
    ``os.replace``, so a concurrent reader sees either the previous file or
    the complete new one, never a half-written document.
    """
    tmp_path = f"{path}.tmp"
    with open(tmp_path, 'w') as f:
        json.dump(status, f)
    os.replace(tmp_path, path)


def main_worker():
    """Run the agent's background loop: calibrate once, then trade-analyse forever.

    Start-up loads (or creates) calibrated artifacts for the prediction
    engine and context preprocessor, then builds the data store, strategy
    manager, bandit, drift detector and evolution helpers. Each loop pass
    picks a strategy with the bandit, logs and notifies on BUY/SELL signals,
    scores pending signals, publishes ``status.json`` and sleeps 300 seconds.
    Once every ``EVOLUTION_CYCLE_SECONDS`` it also runs the self-evolution
    step.

    Returns:
        None. Only returns early when calibration data cannot be fetched;
        otherwise the loop never exits. Exceptions inside the loop are
        printed and followed by a 60 second pause.
    """
    print("--- [Conscious Agent V20 - Definitive Fix] Worker Thread Started ---")
    api_key = os.environ.get('TWELVE_DATA_API_KEY')
    hf_token = os.environ.get('HF_TOKEN')
    ntfy_topic = os.environ.get('NTFY_TOPIC_V2')
    HF_REPO_ID = "Badumetsibb/conscious-trading-agent-models"

    if not NLP_LIBRARIES_AVAILABLE:
        print("CRITICAL: Transformers or PyTorch not found. Self-evolution will be disabled.")
    # NOTE(review): the FinBERT classes are used even when the flag above is
    # False; if the transformers import itself failed these names are
    # undefined and the worker stops here with a NameError -- confirm intent.
    finbert_tokenizer = BertTokenizer.from_pretrained('ProsusAI/finbert')
    finbert_model = TFBertModel.from_pretrained('ProsusAI/finbert', from_pt=True)

    prediction_engine = PredictionCoreTransformer()
    context_preprocessor = ContextVectorPreprocessor()
    if prediction_engine.is_calibrated() and context_preprocessor.is_calibrated():
        prediction_engine.load_calibrated_artifacts()
        context_preprocessor.load_calibrated_scaler()
    else:
        print("No calibrated artifacts found. Starting one-time production calibration...")
        base_model_path = download_models_from_hf(HF_REPO_ID, hf_token)
        calibration_price_data = fetch_twelvedata_prices(api_key, output_size=5000)
        if calibration_price_data.empty or len(calibration_price_data) < 500:
            print("FATAL: Could not fetch enough data for calibration. Worker stopping.")
            return
        calibration_events_data = fetch_live_events_with_cache()
        calibration_features, _ = create_feature_set_for_inference(
            calibration_price_data, calibration_events_data, finbert_tokenizer, finbert_model)
        prediction_engine.calibrate(base_model_path, calibration_features.copy())
        causal_engine_for_calibration = CausalReasoningNetwork(calibration_features.copy())
        calibration_features_with_regime = causal_engine_for_calibration.identify_volatility_regimes()
        context_preprocessor.calibrate(calibration_features_with_regime.copy())

    firebase_manager = FirebaseManager()
    live_data_store = LiveDataStore(api_key, finbert_tokenizer, finbert_model)
    live_data_store.start()
    situation_room = RuleBasedSituationRoom({'sl_atr_multiplier': 2.0, 'tp_atr_multiplier': 4.0})
    strategy_manager = StrategyManager(situation_room, prediction_engine)
    strategy_manager.load_evolved_strategies(firebase_manager)
    bandit = LinUCBBandit(strategy_manager.list_strategies().keys(), d=CONTEXT_DIMENSION, alpha=1.5)
    change_detector = PageHinkley()
    cognitive_core = CognitiveCore(firebase_manager)
    strategy_lab = LocalStrategyLab()
    last_evolution_check = time.time()
    EVOLUTION_CYCLE_SECONDS = 86400
    print("--- WORKER V20: Initialization Complete. Starting main adaptive loop. ---")

    while True:
        try:
            print(f"WORKER V20: [{pd.Timestamp.now(tz='UTC')}] Waking up...")
            features, next_event_info = live_data_store.get_latest_data()
            if features.empty or len(features) < prediction_engine.sequence_length:
                print("WORKER V20: Not enough data. Waiting...")
                time.sleep(300)
                continue

            # Build the model input window and let the bandit pick a strategy.
            causal_engine = CausalReasoningNetwork(features.copy())
            features_with_regime = causal_engine.identify_volatility_regimes()
            input_sequence = features_with_regime.iloc[-prediction_engine.sequence_length:]
            ctx_vec = context_preprocessor.build_context_vector(input_sequence)
            chosen_strategy_name = bandit.select(ctx_vec)
            trade_thesis, preds_str = strategy_manager.list_strategies()[chosen_strategy_name](input_sequence)
            current_regime = input_sequence.iloc[-1]['regime']
            is_tradeable = MarketRegimeFilter().should_trade(current_regime, trade_thesis)
            final_action = trade_thesis['action'] if is_tradeable else "NO TRADE"

            if final_action in ["BUY", "SELL"]:
                ts = pd.Timestamp.now(tz='UTC').isoformat()
                firebase_manager.log_signal(ts, chosen_strategy_name, final_action, trade_thesis['entry'],
                                            trade_thesis['stop_loss'], trade_thesis['take_profit'], ctx_vec,
                                            current_regime, trade_thesis['confidence'], preds_str)
                send_ntfy_notification(ntfy_topic, trade_thesis)

            # Score earlier signals on every pass, whether or not a new one fired.
            evaluate_pending_signals_v2(firebase_manager, bandit, change_detector, live_data_store)

            final_reasoning = f"Bandit chose '{chosen_strategy_name}'. Thesis: '{trade_thesis['reasoning']}'"
            if trade_thesis['action'] != 'NO TRADE':
                if is_tradeable:
                    verdict = '✅ EXECUTABLE.'
                else:
                    verdict = f'❌ REJECTED by Regime Filter for {current_regime}.'
                final_reasoning += f" -> {verdict}"

            status = {
                "last_checked": pd.Timestamp.now(tz='UTC').isoformat(),
                "market_price": f"{input_sequence.iloc[-1]['close']:.5f}",
                "market_regime": current_regime,
                "signal": final_action,
                "reasoning": final_reasoning,
                "predictions": preds_str,
                "next_event": f"{next_event_info['title']} ({next_event_info['time_str']})",
            }
            # The dashboard reads this file from another thread; swap it in
            # atomically so it never sees a truncated document.
            _write_status_file(status)
            print(f"WORKER V20: Analysis complete. Signal: {final_action}. Sleeping.")

            if (time.time() - last_evolution_check) > EVOLUTION_CYCLE_SECONDS:
                print("\n--- INITIATING NIGHTLY SELF-EVOLUTION CYCLE ---")
                memo = cognitive_core.analyze_performance_history()
                if memo:
                    new_code = strategy_lab.generate_new_strategy_code(memo)
                    if new_code:
                        new_strategy_name = f"evolved_strategy_{int(time.time())}"
                        firebase_manager.save_strategy(new_strategy_name, new_code)
                        print("EVOLUTION: New strategy saved. It will be active after the next restart.")
                # Restart the timer even when nothing was produced, so the
                # cycle is not retried on every pass.
                last_evolution_check = time.time()
                print("--- SELF-EVOLUTION CYCLE COMPLETE ---\n")

            time.sleep(300)
        except Exception as e:
            print(f"WORKER V20 CRITICAL ERROR: {e}")
            traceback.print_exc()
            time.sleep(60)
| # --- GRADIO DASHBOARD --- | |
def get_latest_status():
    """Read ``status.json`` and return the seven dashboard field values.

    Returns:
        A 7-tuple ordered as (status line, market price, market regime,
        signal, reasoning, predictions, next event). Missing keys are
        reported as ``'N/A'``. When the file does not exist yet the status
        line is ``"Agent is starting..."`` and the other fields are empty;
        when the file exists but cannot be read or parsed, a "please
        refresh" status line is returned instead of raising.
    """
    blanks = ("", "", "", "", "", "")
    try:
        # Open directly instead of checking existence first: the file may
        # appear, vanish or be rewritten between a check and the read.
        with open('status.json', 'r') as f:
            status = json.load(f)
    except FileNotFoundError:
        return ("Agent is starting...",) + blanks
    except (OSError, json.JSONDecodeError):
        # Most likely caught mid-write; the next refresh should succeed.
        return ("Status file is temporarily unreadable; please refresh.",) + blanks
    return (
        f"Status from worker at: {status.get('last_checked', 'N/A')}",
        status.get('market_price', 'N/A'),
        status.get('market_regime', 'N/A'),
        status.get('signal', 'N/A'),
        status.get('reasoning', 'N/A'),
        status.get('predictions', 'N/A'),
        status.get('next_event', 'N/A'),
    )
# Dashboard layout. Components are created in display order; `demo` is the
# Blocks app that the __main__ guard launches.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🧠 V20 Conscious Agent (Definitive Fix)")
    gr.Markdown("This agent uses a reliable calibration flow and a persistent, database-driven memory for self-evolution.")

    # Show whether every expected environment secret has a non-empty value.
    all_secrets = all(
        os.environ.get(k)
        for k in ['TWELVE_DATA_API_KEY', 'NTFY_TOPIC_V2', 'HF_TOKEN', 'FIRESTORE_SA_KEY', 'FIREBASE_DB_URL']
    )
    if all_secrets:
        secret_status = "✅ All required secrets appear to be set."
    else:
        secret_status = "❌ One or more secrets are MISSING. Check all required secrets."
    gr.Markdown(f"**Secrets Status:** {secret_status}")

    refresh_btn = gr.Button("Refresh Status", variant="primary")
    status_output = gr.Textbox(label="Status", interactive=False)

    gr.Markdown("## Agent's Last Analysis")
    with gr.Row():
        price_output = gr.Textbox(label="Last Market Price")
        regime_output = gr.Textbox(label="Last Market Regime")
    with gr.Row():
        predictions_output = gr.Textbox(label="DL Model Predictions (5m|15m|1h)")
        event_output = gr.Textbox(label="Next High-Impact Event")
    action_output = gr.Textbox(label="Final Signal / Action")
    reasoning_output = gr.Textbox(label="Full Reasoning", lines=3)

    # Output order must match the tuple order returned by get_latest_status.
    refresh_btn.click(
        fn=get_latest_status,
        inputs=[],
        outputs=[
            status_output,
            price_output,
            regime_output,
            action_output,
            reasoning_output,
            predictions_output,
            event_output,
        ],
    )
if __name__ == "__main__":
    # The worker needs both the market-data key and the Hub token; without
    # them only the dashboard is served.
    if all(os.environ.get(k) for k in ['TWELVE_DATA_API_KEY', 'HF_TOKEN']):
        # Daemon thread, so it does not keep the process alive on its own.
        worker_thread = threading.Thread(target=main_worker, daemon=True)
        worker_thread.start()
    else:
        print("FATAL: Core secrets (API Key, HF Token) are missing. Worker cannot start.")
    demo.launch()