Spaces:
Sleeping
Sleeping
| # app.py (Version FINAL - All Bugs Fixed) | |
| # This is the complete, final, and fully-functional code, built on a stable Keras 2 architecture. | |
| # --- Core Libraries & Setup --- | |
| import pandas as pd | |
| import numpy as np | |
| import warnings | |
| import joblib | |
| import json | |
| import os | |
| import gradio as gr | |
| import requests | |
| import time | |
| from datetime import datetime | |
| import pytz | |
| import threading | |
| import math | |
| from huggingface_hub import hf_hub_download | |
| import firebase_admin | |
| from firebase_admin import credentials, db | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.cluster import DBSCAN | |
| from groq import Groq | |
| # --- Environment Configuration --- | |
| os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # Suppress verbose logs | |
| warnings.filterwarnings("ignore", category=UserWarning, module='sklearn') | |
| warnings.filterwarnings("ignore", category=FutureWarning) | |
| # --- ML/DL & NLP Libraries (STABLE TENSORFLOW.KERAS) --- | |
| import tensorflow as tf | |
| from sklearn.preprocessing import MinMaxScaler | |
| from tensorflow.keras.models import Model, load_model | |
| from tensorflow.keras.utils import Sequence | |
| from transformers import BertTokenizer, TFBertModel | |
| # --- Live Data & Cache Configuration --- | |
| from twelvedata import TDClient | |
| EVENT_JSON_URL = "https://nfs.faireconomy.media/ff_calendar_thisweek.json" | |
| CACHE_DURATION_SECONDS = 600 | |
| _EVENT_CACHE = {"data": None, "timestamp": 0} | |
| # ============================================================================== | |
| # --- V17 EVOLUTION ENGINE --- | |
| # ============================================================================== | |
| class ConceptMiner: | |
| def __init__(self, log_path='signals_v17', min_trades_for_analysis=20): | |
| self.log_path = log_path; self.min_trades = min_trades_for_analysis; self.analyzed_trade_keys = set() | |
| def run_analysis(self): | |
| print(f"EVOLUTION_ENGINE: [ConceptMiner] Analyzing performance from '{self.log_path}'.") | |
| ref = db.reference(self.log_path); all_data = ref.get() | |
| if not all_data: print("EVOLUTION_ENGINE: [ConceptMiner] No trade logs found yet."); return [] | |
| new_trades = {k: v for k, v in all_data.items() if v.get('reward') is not None and k not in self.analyzed_trade_keys} | |
| if len(new_trades) < self.min_trades: print(f"EVOLUTION_ENGINE: [ConceptMiner] Not enough new trades ({len(new_trades)}/{self.min_trades})."); return [] | |
| print(f"EVOLUTION_ENGINE: [ConceptMiner] Analyzing {len(new_trades)} new trades.") | |
| df = pd.DataFrame.from_dict(new_trades, orient='index') | |
| context_df = pd.DataFrame(df['context_vector'].tolist(), index=df.index) | |
| feature_names = ['close', 'ATR', 'EMA_20', 'RSI', 'time_since_event', 'time_to_event', 'hour_of_day', 'regime_TRENDING', 'regime_BREAKOUT', 'regime_CHOPPY', 'regime_RANGING'] | |
| context_df.columns = feature_names[:len(context_df.columns)] | |
| full_df = pd.concat([df[['reward', 'strategy']], context_df], axis=1) | |
| features = full_df.drop(columns=['reward', 'strategy']).values | |
| scaled_features = StandardScaler().fit_transform(features) | |
| dbscan = DBSCAN(eps=1.0, min_samples=4); clusters = dbscan.fit_predict(scaled_features) | |
| full_df['cluster'] = clusters | |
| significant_clusters = full_df[full_df['cluster'] != -1] | |
| if significant_clusters.empty: self.analyzed_trade_keys.update(new_trades.keys()); return [] | |
| losing_clusters = significant_clusters.groupby('cluster')['reward'].mean() | |
| losing_clusters = losing_clusters[losing_clusters < -0.25] | |
| patterns = [] | |
| for cid in losing_clusters.index: | |
| c_data = significant_clusters[significant_clusters['cluster'] == cid] | |
| avg_ctx = c_data.drop(columns=['reward', 'strategy', 'cluster']).mean() | |
| patterns.append({"source_cluster_id": int(cid), "trade_count": len(c_data), "average_reward": c_data['reward'].mean(), "dominant_strategy": c_data['strategy'].mode()[0], "dominant_regime": avg_ctx.filter(like='regime').idxmax().split('_')[1], "avg_market_conditions": avg_ctx.to_dict()}) | |
| print(f"EVOLUTION_ENGINE: [ConceptMiner] π₯ Discovered new weakness (Cluster #{cid}).") | |
| self.analyzed_trade_keys.update(new_trades.keys()); return patterns | |
| class ConceptNamer: | |
| def __init__(self, api_key): self.client = Groq(api_key=api_key) | |
| def name_new_concept(self, pattern): | |
| print(f"EVOLUTION_ENGINE: [ConceptNamer] Naming weakness from Cluster #{pattern['source_cluster_id']}...") | |
| system_prompt = "You are an expert market strategist AI. Your job is to analyze a data pattern representing a trading agent's weakness and turn it into a memorable, actionable 'Concept'. The output must be a clean JSON object and nothing else." | |
| atr_threshold = round(pattern['avg_market_conditions']['ATR'] + 0.1, 2) | |
| human_prompt = f""" | |
| Analyze this data pattern where a trading agent consistently loses money. Create a new 'Concept' for it. | |
| **Pattern Fingerprint:** | |
| - **Dominant Market Regime:** {pattern['dominant_regime']} | |
| - **The Strategy That Fails Here:** {pattern['dominant_strategy']} | |
| - **Average Reward (from -1 to 1):** {pattern['average_reward']:.2f} | |
| - **Key Market Conditions (scaled values):** | |
| - Volatility (ATR): {pattern['avg_market_conditions']['ATR']:.4f} | |
| **Your Task:** | |
| Generate a JSON object with the following structure. | |
| {{ | |
| "concept_name": "A short, memorable name for this market trap (e.g., 'Momentum Mirage', 'Volatility Void').", | |
| "description_agent": "A concise, one-sentence explanation of this trap for the agent's logs.", | |
| "description_human": "A human-readable explanation of why this is a trap for the agent.", | |
| "actionable_rule": {{ | |
| "type": "veto_trade", | |
| "conditions": [ | |
| {{"variable": "regime", "operator": "==", "value": "{pattern['dominant_regime']}"}}, | |
| {{"variable": "ATR", "operator": "<", "value": {atr_threshold} }} | |
| ], | |
| "target_strategy": "{pattern['dominant_strategy']}" | |
| }} | |
| }} | |
| """ | |
| try: | |
| chat_completion = self.client.chat.completions.create(messages=[{"role": "system", "content": system_prompt},{"role": "user", "content": human_prompt}], model="llama3-8b-8192", temperature=0.6) | |
| response_text = chat_completion.choices[0].message.content | |
| if response_text.startswith("```json"): response_text = response_text[7:-3].strip() | |
| return json.loads(response_text) | |
| except Exception as e: print(f"EVOLUTION_ENGINE: [ConceptNamer] β Groq API Error: {e}"); return None | |
| class ConceptVetoSystem: | |
| def __init__(self): self.concepts = {}; self.lock = threading.Lock(); self.load_concepts_from_firebase() | |
| def load_concepts_from_firebase(self): | |
| with self.lock: | |
| print("--- V17: (Re)Loading concepts from Firebase... ---") | |
| try: | |
| if not firebase_admin._apps: return | |
| ref = db.reference('concepts/'); concepts_data = ref.get() | |
| if concepts_data: self.concepts = concepts_data; print(f"β V17: Loaded {len(self.concepts)} concepts.") | |
| else: print("π‘ V17: No concepts found in Firebase yet.") | |
| except Exception as e: print(f"β V17: ERROR - Could not load concepts: {e}") | |
| def check_for_veto(self, current_context, chosen_strategy_name): | |
| with self.lock: | |
| for concept_id, concept in self.concepts.items(): | |
| rule = concept.get('actionable_rule', {}) | |
| if rule.get('type') != 'veto_trade' or rule.get('target_strategy') != chosen_strategy_name: continue | |
| conditions_met = True | |
| for condition in rule.get('conditions', []): | |
| variable, operator, value = condition['variable'], condition['operator'], condition['value'] | |
| context_value = current_context.get(variable) | |
| if context_value is None: conditions_met = False; break | |
| if operator == '==' and not context_value == value: conditions_met = False; break | |
| if operator == '<' and not context_value < value: conditions_met = False; break | |
| if conditions_met: print(f"π¨ V17 VETO: Conditions for '{concept.get('concept_name')}' met."); return True, concept.get('concept_name') | |
| return False, None | |
| # ============================================================================== | |
| # --- CORE AGENT LOGIC (FIXED AND READABLE) --- | |
| # ============================================================================== | |
| class CausalReasoningNetwork: | |
| def __init__(self, processed_data): | |
| self.data = processed_data.copy() | |
| def identify_volatility_regimes(self, volatility_indicator='ATR', trend_indicator='EMA_20'): | |
| atr = self.data[volatility_indicator] | |
| low_vol_threshold, high_vol_threshold = atr.quantile(0.33), atr.quantile(0.66) | |
| ema_slope = self.data[trend_indicator].diff(periods=3) | |
| regimes = [] | |
| for i in range(len(self.data)): | |
| atr_val, slope_val = atr.iloc[i], ema_slope.iloc[i] if pd.notna(ema_slope.iloc[i]) else 0 | |
| if atr_val > high_vol_threshold: regimes.append('TRENDING' if abs(slope_val) > ema_slope.quantile(0.75) else 'BREAKOUT') | |
| elif atr_val < low_vol_threshold: regimes.append('RANGING') | |
| else: regimes.append('CHOPPY') | |
| self.data['regime'] = regimes | |
| return self.data | |
| class RuleBasedSituationRoom: | |
| def __init__(self, params): | |
| self.params = params | |
| def generate_thesis(self, predictions, sequence_df): | |
| latest_data = sequence_df.iloc[-1]; current_price = latest_data['close'] | |
| if not predictions or any(k not in predictions for k in ['5m', '15m', '1h']): | |
| action = "BUY" if current_price > latest_data['EMA_20'] else "SELL" | |
| confidence, strategy, reasoning = "LOW", "Trend Following", f"Simple EMA Crossover ({action})." | |
| else: | |
| dir_5m = "BUY" if predictions['5m'] > current_price else "SELL" | |
| dir_15m = "BUY" if predictions['15m'] > current_price else "SELL" | |
| dir_1h = "BUY" if predictions['1h'] > current_price else "SELL" | |
| if dir_5m == dir_15m == dir_1h: action, confidence, reasoning, strategy = dir_5m, "HIGH", f"Strong confluence across all horizons ({dir_5m}).", "Trend Following" | |
| elif dir_5m == dir_15m: action, confidence, reasoning, strategy = dir_5m, "MEDIUM", f"Short/Medium-term confluence ({dir_5m}).", "Scalp" | |
| elif dir_15m == dir_1h: action, confidence, reasoning, strategy = dir_15m, "LOW", f"Medium/Long-term confluence ({dir_15m}).", "Trend Following" | |
| else: action, confidence, reasoning, strategy = "NO_TRADE", "LOW", "Prediction horizons diverge.", "Range Play" | |
| if action == "NO_TRADE": return {"action": "NO_TRADE", "confidence": "LOW", "strategy_type": strategy, "reasoning": reasoning} | |
| atr = latest_data['ATR'] if pd.notna(latest_data['ATR']) and latest_data['ATR'] > 0 else 0.0001 | |
| sl_mult, tp_mult = self.params.get('sl_atr_multiplier', 2.0), self.params.get('tp_atr_multiplier', 4.0) | |
| if confidence == "MEDIUM": tp_mult *= 0.75 | |
| elif confidence == "LOW": tp_mult *= 0.5 | |
| if action == "BUY": entry, stop_loss, take_profit = current_price, current_price - (sl_mult * atr), current_price + (tp_mult * atr) | |
| else: entry, stop_loss, take_profit = current_price, current_price + (sl_mult * atr), current_price - (tp_mult * atr) | |
| return {"action":action, "entry":f"{entry:.5f}", "stop_loss":f"{stop_loss:.5f}", "take_profit":f"{take_profit:.5f}", "confidence":confidence, "reasoning":reasoning, "strategy_type":strategy} | |
| class MarketRegimeFilter: | |
| def __init__(self): | |
| self.allowed_strategies = {'TRENDING': ['Trend Following'], 'BREAKOUT': ['Trend Following', 'Scalp'], 'CHOPPY': ['Scalp'], 'RANGING': []} | |
| def should_trade(self, current_regime, trade_thesis): | |
| if trade_thesis['action'] == 'NO_TRADE': return False | |
| return trade_thesis['strategy_type'] in self.allowed_strategies.get(current_regime, []) | |
| class PredictionCoreTransformer: | |
| def __init__(self, sequence_length=48): | |
| self.scaler, self.model, self.sequence_length, self.feature_names = None, None, sequence_length, None | |
| self.calibrated_model_path, self.calibrated_scaler_path, self.calibrated_features_path = 'calibrated_model.keras', 'calibrated_scaler.joblib', 'calibrated_features.json' | |
| def is_calibrated(self): | |
| return all(os.path.exists(p) for p in [self.calibrated_model_path, self.calibrated_scaler_path, self.calibrated_features_path]) | |
| def load_calibrated_artifacts(self): | |
| print("--- Loading pre-calibrated artifacts ---") | |
| self.model = load_model(self.calibrated_model_path) | |
| self.scaler = joblib.load(self.calibrated_scaler_path) | |
| with open(self.calibrated_features_path, 'r') as f: self.feature_names = json.load(f) | |
| print("--- Instant startup successful ---"); return True | |
| def calibrate(self, base_model_path, calibration_data): | |
| print("--- STARTING ONE-TIME AGENT CALIBRATION ---") | |
| self.feature_names = [col for col in calibration_data.columns if col not in ['regime','close','open','high','low','event','impact','title','detail','country','date','currency','actual','forecast','previous']] | |
| self.scaler = MinMaxScaler(feature_range=(0,1)); self.scaler.fit(calibration_data[self.feature_names]) | |
| self.model = load_model(base_model_path); print("Pre-trained model loaded.") | |
| class CalibrationGenerator(Sequence): | |
| def __init__(self, data, scaler, feature_names, seq_len): | |
| self.data, self.scaler, self.feature_names, self.seq_len = data.copy(), scaler, feature_names, seq_len | |
| self.data['target_5m']=self.data['open'].shift(-1); self.data['target_15m']=self.data['open'].shift(-3); self.data['target_1h']=self.data['open'].shift(-12); self.data.dropna(inplace=True) | |
| self.features_df, self.targets_df = self.data[self.feature_names], self.data[['target_5m', 'target_15m', 'target_1h']] | |
| self.scaled_features = self.scaler.transform(self.features_df); self.n_samples = len(self.scaled_features) - self.seq_len | |
| def __len__(self): return self.n_samples | |
| def __getitem__(self, idx): | |
| seq_end = idx + self.seq_len; X = self.scaled_features[idx:seq_end].reshape(1, self.seq_len, len(self.feature_names)) | |
| y_5m, y_15m, y_1h = self.targets_df.iloc[seq_end - 1][['target_5m', 'target_15m', 'target_1h']] | |
| return X, {'5m_output': np.array([y_5m]), '15m_output': np.array([y_15m]), '1h_output': np.array([y_1h])} | |
| calibration_generator = CalibrationGenerator(calibration_data, self.scaler, self.feature_names, self.sequence_length) | |
| self.model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5), loss={'5m_output':'mean_squared_error', '15m_output':'mean_squared_error', '1h_output':'mean_squared_error'}) | |
| self.model.fit(calibration_generator, epochs=3, verbose=1) | |
| self.model.save(self.calibrated_model_path); joblib.dump(self.scaler, self.calibrated_scaler_path) | |
| with open(self.calibrated_features_path, 'w') as f: json.dump(self.feature_names, f) | |
| print("--- AGENT RE-CALIBRATION COMPLETE ---"); return True | |
| def predict_single(self, input_sequence): | |
| if not self.scaler: raise RuntimeError("Agent not calibrated.") | |
| features = input_sequence[self.feature_names]; scaled_features = self.scaler.transform(features) | |
| reshaped = scaled_features.reshape(1, self.sequence_length, len(self.feature_names)) | |
| predictions = self.model.predict(reshaped, verbose=0) | |
| pred_5m = predictions['5m_output'] | |
| pred_15m = predictions['15m_output'] | |
| pred_1h = predictions['1h_output'] | |
| preds_dict = {"5m": pred_5m, "15m": pred_15m, "1h": pred_1h} | |
| preds_str = f"5m: {pred_5m:.5f} | 15m: {pred_15m:.5f} | 1h: {pred_1h:.5f}" | |
| return preds_dict, preds_str | |
| class LinUCBBandit: | |
| def __init__(self, strategies, d, alpha=1.0, regularization=1.0): | |
| self.strategies, self.d, self.alpha, self.reg = list(strategies), d, alpha, regularization | |
| self.A = {s: (self.reg * np.eye(self.d)) for s in self.strategies}; self.b = {s: np.zeros(self.d) for s in self.strategies} | |
| def select(self, context_vector): | |
| scores = {} | |
| for s in self.strategies: | |
| A_inv = np.linalg.inv(self.A[s]); theta = A_inv.dot(self.b[s]) | |
| mean_reward = theta.dot(context_vector); uncertainty = self.alpha * math.sqrt(context_vector.dot(A_inv).dot(context_vector)) | |
| scores[s] = mean_reward + uncertainty | |
| return max(scores, key=scores.get) | |
| def update(self, strategy, context_vector, reward): | |
| x = context_vector.reshape(-1); self.A[strategy] += np.outer(x, x); self.b[strategy] += reward * x | |
| def increase_exploration(self): | |
| self.alpha *= 2.0 | |
| print(f"Bandit: Increased exploration alpha to {self.alpha}") | |
| class RTDBLoggerV2: | |
| def __init__(self, db_ref_name='signals_v2'): | |
| self.ref = None | |
| try: | |
| sa_key_json, db_url = os.environ.get('FIRESTORE_SA_KEY'), os.environ.get('FIREBASE_DB_URL') | |
| cred = credentials.Certificate(json.loads(sa_key_json)) | |
| if not firebase_admin._apps: firebase_admin.initialize_app(cred, {'databaseURL': db_url}) | |
| self.ref = db.reference(db_ref_name) | |
| except Exception as e: print(f"RTDB Logger Failed: {e}") | |
| def log_signal(self, ts, strategy, action, entry, sl, tp, context_vector): | |
| if self.ref: self.ref.push({"timestamp": ts, "strategy": strategy, "action": action, "entry": float(entry), "stop_loss": float(sl), "take_profit": float(tp), "context_vector": [float(x) for x in context_vector.tolist()], "pnl": None, "reward": None, "outcome_reason": None}) | |
| class PageHinkley: | |
| def __init__(self, delta=0.005, lambda_=50, alpha=1 - 1e-3): | |
| self.mean, self.delta, self.lambda_, self.alpha, self.cumulative = 0.0, delta, lambda_, alpha, 0.0 | |
| def update(self, x): | |
| self.mean = self.mean * self.alpha + x * (1 - self.alpha); self.cumulative = max(0, self.cumulative + x - self.mean - self.delta) | |
| if self.cumulative > self.lambda_: self.cumulative = 0; return True | |
| return False | |
| class StrategyManager: | |
| def __init__(self, situation_room, prediction_engine): | |
| self.situation_room, self.prediction_engine = situation_room, prediction_engine | |
| def list_strategies(self): | |
| def predictive_strategy(seq): | |
| preds, s = self.prediction_engine.predict_single(seq) | |
| return self.situation_room.generate_thesis(preds, seq), s | |
| def ema_crossover_strategy(seq): return self.situation_room.generate_thesis({}, seq), "N/A (EMA)" | |
| return {"predictive_rule_based": predictive_strategy, "ema_crossover": ema_crossover_strategy} | |
| CONTEXT_FEATURES=['close', 'ATR', 'EMA_20', 'RSI', 'time_since_event', 'time_to_event', 'hour_of_day']; REGIME_COLS=['regime_TRENDING', 'regime_BREAKOUT', 'regime_CHOPPY', 'regime_RANGING']; CONTEXT_DIMENSION=len(CONTEXT_FEATURES)+len(REGIME_COLS) | |
| class ContextVectorPreprocessor: | |
| def __init__(self): | |
| self.scaler = MinMaxScaler(feature_range=(-1, 1)); self.calibrated_context_scaler_path = 'calibrated_context_scaler.joblib' | |
| def is_calibrated(self): return os.path.exists(self.calibrated_context_scaler_path) | |
| def load_calibrated_scaler(self): self.scaler = joblib.load(self.calibrated_context_scaler_path); return True | |
| def calibrate(self, data): self.scaler.fit(data[CONTEXT_FEATURES]); joblib.dump(self.scaler, self.calibrated_context_scaler_path) | |
| def build_context_vector(self, df): | |
| df_copy = df.copy(); df_copy[CONTEXT_FEATURES] = df_copy[CONTEXT_FEATURES].astype(float) | |
| last_row = df_copy[CONTEXT_FEATURES].iloc[-1:].values; scaled_vec = self.scaler.transform(last_row).flatten() | |
| last_regime = df_copy.iloc[-1]['regime']; regime_vec = np.zeros(len(REGIME_COLS)) | |
| try: regime_vec[REGIME_COLS.index(f"regime_{last_regime}")] = 1 | |
| except ValueError: pass | |
| return np.concatenate([scaled_vec, regime_vec]) | |
| class LiveDataStore: | |
| def __init__(self, api_key, tokenizer, model, interval=120): | |
| self.api_key, self.tokenizer, self.model, self.interval = api_key, tokenizer, model, interval | |
| self._data_lock = threading.Lock(); self._latest_features = pd.DataFrame(); self._next_event_info = {"title": "N/A", "time_str": "N/A"} | |
| self._stop = threading.Event(); self._thread = None | |
| def _fetch_and_update(self): | |
| try: | |
| price_data = fetch_twelvedata_prices(self.api_key, output_size=500) | |
| if not price_data.empty: | |
| features, next_event = create_feature_set_for_inference(price_data, fetch_live_events_with_cache(), self.tokenizer, self.model) | |
| with self._data_lock: self._latest_features, self._next_event_info = features, next_event | |
| except Exception as e: print(f"LiveDataStore Error: {e}") | |
| def _update_loop(self): | |
| while not self._stop.is_set(): self._fetch_and_update(); self._stop.wait(self.interval) | |
| def start(self): | |
| self._stop.clear(); self._thread = threading.Thread(target=self._update_loop, daemon=True); self._thread.start(); self._fetch_and_update() | |
| def get_latest_data(self, num_bars=None): | |
| with self._data_lock: return (self._latest_features.iloc[-num_bars:].copy() if num_bars and not self._latest_features.empty else self._latest_features.copy()), self._next_event_info | |
| def get_raw_price_data(self, num_bars=None): | |
| with self._data_lock: | |
| if self._latest_features.empty: return pd.DataFrame() | |
| df = self._latest_features[['open', 'high', 'low', 'close']].copy() | |
| if num_bars: df = df.iloc[-num_bars:] | |
| df.reset_index(inplace=True); df['Datetime'] = df['Datetime'].dt.tz_localize('UTC') if df['Datetime'].dt.tz is None else df['Datetime'].dt.tz_convert('UTC'); return df | |
| def fetch_live_events_with_cache(): | |
| if _EVENT_CACHE.get("data") and (time.time() - _EVENT_CACHE.get("timestamp", 0) < CACHE_DURATION_SECONDS): return _EVENT_CACHE["data"] | |
| try: | |
| r = requests.get(EVENT_JSON_URL, headers={"User-Agent": "V17-Agent/1.0"}, timeout=10); r.raise_for_status(); data = r.json() | |
| _EVENT_CACHE["data"], _EVENT_CACHE["timestamp"] = data, time.time(); return data | |
| except requests.RequestException: return _EVENT_CACHE.get("data", []) | |
| def fetch_twelvedata_prices(api_key, symbol='EUR/USD', interval='5min', output_size=200): | |
| try: | |
| ts = TDClient(apikey=api_key).time_series(symbol=symbol, interval=interval, outputsize=output_size, timezone="UTC") | |
| df = ts.as_pandas().sort_index(ascending=True); df.index.name = 'Datetime'; df.reset_index(inplace=True); return df | |
| except Exception: return pd.DataFrame() | |
| def create_feature_set_for_inference(price_df, events_json, tokenizer, model): | |
| price_features = price_df.copy(); price_features['Datetime'] = pd.to_datetime(price_features['Datetime']); price_features.set_index('Datetime', inplace=True) | |
| price_features = price_features.tz_localize('UTC') if price_features.index.tz is None else price_features.tz_convert('UTC') | |
| price_features.rename(columns={'close': 'Price', 'open': 'Open', 'high': 'High', 'low': 'Low'}, inplace=True); delta = price_features['Price'].diff() | |
| gain = (delta.where(delta > 0, 0)).rolling(window=14).mean(); loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() | |
| price_features['RSI'] = 100 - (100 / (1 + (gain / loss))); price_features['EMA_20'] = price_features['Price'].ewm(span=20, adjust=False).mean() | |
| high_low = price_features['High'] - price_features['Low']; high_close = np.abs(price_features['High'] - price_features['Price'].shift()); low_close = np.abs(price_features['Low'] - price_features['Price'].shift()) | |
| tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1); price_features['ATR'] = tr.rolling(window=14).mean() | |
| price_features.rename(columns={'Price': 'close', 'Open': 'open', 'High': 'high', 'Low': 'low'}, inplace=True); events = pd.DataFrame(events_json) | |
| next_event_info = {"title": "None within 24h", "time_str": "N/A"}; processed_events = pd.DataFrame() | |
| if not events.empty and 'date' in events.columns: | |
| events = events[events['country'].isin(['USD', 'EUR'])].copy(); events['datetime'] = pd.to_datetime(events['date'], utc=True, errors='coerce'); events.dropna(subset=['datetime'], inplace=True); events.set_index('datetime', inplace=True); events.sort_index(inplace=True) | |
| inputs = tokenizer(events['title'].fillna('').tolist(), return_tensors='tf', padding=True, truncation=True, max_length=64); embeddings = model(inputs).last_hidden_state[:, 0, :].numpy() | |
| # FIXED BUG #1: Use embeddings.shape | |
| processed_events = pd.concat([events, pd.DataFrame(embeddings, columns=[f'finbert_{i}' for i in range(embeddings.shape)], index=events.index)], axis=1) | |
| merged_data = pd.merge_asof(left=price_features.sort_index(), right=processed_events.sort_index(), left_index=True, right_index=True, direction='backward', tolerance=pd.Timedelta(minutes=60)) | |
| all_high_impact = events[events['impact'] == 'High'] if 'impact' in events.columns and not events.empty else pd.DataFrame() | |
| if not all_high_impact.empty: | |
| upcoming = all_high_impact[all_high_impact.index > merged_data.index[-1]] | |
| if not upcoming.empty: | |
| # FIXED BUG #2: Use .iloc to select the first row as a Series | |
| next_event = upcoming.iloc | |
| time_to_next = (next_event.name - merged_data.index[-1]).total_seconds() / 3600.0 | |
| merged_data['time_to_event'] = time_to_next | |
| next_event_info = {"title": f"{next_event['country']} {next_event['title']}", "time_str": f"in {time_to_next:.2f}h"} | |
| else: merged_data['time_to_event'] = 9999 | |
| # FIXED BUG #3: Convert pandas index to numpy array before multi-dimensional indexing | |
| df_idx = merged_data.index.to_numpy().astype(np.int64) // 10**9 | |
| evt_times = all_high_impact.index.to_numpy().astype(np.int64) // 10**9 | |
| time_diffs = df_idx[:, None] - evt_times[None, :]; merged_data['time_since_event'] = np.min(np.where(time_diffs >= 0, time_diffs, np.inf), axis=1) / 3600 | |
| else: merged_data['time_since_event'] = 9999; merged_data['time_to_event'] = 9999 | |
| merged_data.replace([np.inf, -np.inf], 9999, inplace=True); merged_data['hour_of_day'] = merged_data.index.hour; merged_data['day_of_week'] = merged_data.index.dayofweek | |
| finbert_cols = [col for col in merged_data.columns if 'finbert_' in col]; merged_data[finbert_cols] = merged_data[finbert_cols].ffill(); merged_data.fillna(0, inplace=True); merged_data.dropna(subset=['open', 'close', 'RSI'], inplace=True); return merged_data, next_event_info | |
| def evaluate_pending_signals_v2(perf_logger, bandit, change_detector, live_data_store): | |
| if not perf_logger.ref: return; now_utc = pd.Timestamp.now(tz='UTC') | |
| try: | |
| all_signals = perf_logger.ref.get() | |
| if not all_signals: return; live_prices = live_data_store.get_raw_price_data(num_bars=288) | |
| if live_prices.empty: return | |
| for k, s in all_signals.items(): | |
| if s.get('reward') is not None: continue; sig_time = pd.to_datetime(s['timestamp']) | |
| if now_utc < (sig_time + pd.Timedelta(minutes=5)): continue | |
| entry, sl, tp, action = float(s['entry']), float(s['stop_loss']), float(s['take_profit']), s['action']; relevant_bars = live_prices[live_prices['Datetime'] > sig_time] | |
| if relevant_bars.empty: continue; outcome, exit_price = "Time Exit", relevant_bars.iloc[-1]['close'] | |
| for _, bar in relevant_bars.iterrows(): | |
| if (action == 'BUY' and bar['low'] <= sl) or (action == 'SELL' and bar['high'] >= sl): exit_price, outcome = sl, "SL"; break | |
| if (action == 'BUY' and bar['high'] >= tp) or (action == 'SELL' and bar['low'] <= tp): exit_price, outcome = tp, "TP"; break | |
| pnl = (exit_price - entry) if action == 'BUY' else (entry - exit_price); reward = np.clip(pnl / 0.005, -1.0, 1.0); ctx = np.array(s['context_vector']); bandit.update(s['strategy'], ctx, reward) | |
| if change_detector.update(-reward): bandit.increase_exploration() | |
| perf_logger.ref.child(k).update({'pnl': pnl, 'reward': reward, 'outcome_reason': outcome}) | |
| except Exception as e: print(f"Evaluator Error: {e}") | |
| def send_ntfy_notification(topic, thesis): | |
| if topic: | |
| message = (f"Strategy: {thesis.get('strategy_type')} ({thesis.get('confidence')})\n" | |
| f"Reason: {thesis.get('reasoning')}\n" | |
| f"Entry: {thesis.get('entry')} | SL: {thesis.get('stop_loss')} | TP: {thesis.get('take_profit')}") | |
| headers = {"Title": f"V17 Signal: {thesis.get('action')} EUR/USD"} | |
| try: requests.post(f"https://ntfy.sh/{topic}", data=message.encode('utf-8'), headers=headers) | |
| except requests.RequestException: pass | |
| def download_models_from_hf(repo_id, token): | |
| return hf_hub_download(repo_id=repo_id, filename="multi_horizon_model.keras", token=token) | |
| # ============================================================================== | |
| # --- MAIN WORKER & ORCHESTRATOR --- | |
| # ============================================================================== | |
| def main_worker(): | |
| print("--- [Persistent Agent V17 - The Evolution Agent] Worker Thread Started ---") | |
| api_key, hf_token, ntfy_topic, groq_key = os.environ.get('TWELVE_DATA_API_KEY'), os.environ.get('HF_TOKEN'), os.environ.get('NTFY_TOPIC_V17'), os.environ.get('GROQ_API_KEY') | |
| if not groq_key: print("FATAL: GROQ_API_KEY not set."); return | |
| tokenizer = BertTokenizer.from_pretrained('ProsusAI/finbert') | |
| model = TFBertModel.from_pretrained('ProsusAI/finbert', from_pt=True) | |
| pred_engine = PredictionCoreTransformer() | |
| ctx_preprocessor = ContextVectorPreprocessor() | |
| live_store = LiveDataStore(api_key, tokenizer, model) | |
| live_store.start() | |
| if not pred_engine.is_calibrated() or not ctx_preprocessor.is_calibrated(): | |
| print("Calibrating agent for the first time...") | |
| base_model_path = download_models_from_hf("Badumetsibb/conscious-trading-agent-models", hf_token) | |
| calib_prices = fetch_twelvedata_prices(api_key, output_size=5000) | |
| if len(calib_prices) < 500: print("FATAL: Not enough data for calibration."); return | |
| calib_features, _ = create_feature_set_for_inference(calib_prices, fetch_live_events_with_cache(), tokenizer, model) | |
| pred_engine.calibrate(base_model_path, calib_features.copy()) | |
| calib_with_regime = CausalReasoningNetwork(calib_features.copy()).identify_volatility_regimes() | |
| ctx_preprocessor.calibrate(calib_with_regime) | |
| else: | |
| pred_engine.load_calibrated_artifacts() | |
| ctx_preprocessor.load_calibrated_scaler() | |
| miner = ConceptMiner(log_path='signals_v17') | |
| namer = ConceptNamer(api_key=groq_key) | |
| veto_system = ConceptVetoSystem() | |
| logger = RTDBLoggerV2(db_ref_name='signals_v17') | |
| sit_room = RuleBasedSituationRoom({'sl_atr_multiplier': 2.0, 'tp_atr_multiplier': 4.0}) | |
| regime_filter = MarketRegimeFilter() | |
| strat_manager = StrategyManager(sit_room, pred_engine) | |
| bandit = LinUCBBandit(strat_manager.list_strategies().keys(), d=CONTEXT_DIMENSION, alpha=1.5) | |
| change_detector = PageHinkley() | |
| def evolution_cycle(): | |
| time.sleep(1800) | |
| while True: | |
| try: | |
| new_patterns = miner.run_analysis() | |
| if new_patterns: | |
| for p in new_patterns: | |
| if all(c.get('source_cluster_id') != p['source_cluster_id'] for c in veto_system.concepts.values()): | |
| new_concept = namer.name_new_concept(p) | |
| if new_concept: | |
| cid = new_concept['concept_name'].lower().replace(' ', '_') + f"_c{p['source_cluster_id']}" | |
| new_concept['source_cluster_id'] = p['source_cluster_id'] | |
| db.reference(f"concepts/{cid}").set(new_concept) | |
| print(f"EVOLUTION_ENGINE: β New concept '{new_concept['concept_name']}' learned!") | |
| veto_system.load_concepts_from_firebase() | |
| except Exception as e: print(f"Evolution Cycle Error: {e}") | |
| time.sleep(7200) | |
| threading.Thread(target=evolution_cycle, daemon=True).start() | |
| print("--- WORKER V17: Initialization Complete. ---") | |
| while True: | |
| try: | |
| features, next_event = live_store.get_latest_data(num_bars=pred_engine.sequence_length) | |
| if features.empty or len(features) < pred_engine.sequence_length: | |
| time.sleep(60); continue | |
| features_with_regime = CausalReasoningNetwork(features.copy()).identify_volatility_regimes() | |
| input_seq = features_with_regime.iloc[-pred_engine.sequence_length:] | |
| latest_ctx = input_seq.iloc[-1] | |
| ctx_vec = ctx_preprocessor.build_context_vector(input_seq) | |
| chosen_strat = bandit.select(ctx_vec) | |
| thesis, preds_str = strat_manager.list_strategies()[chosen_strat](input_seq) | |
| is_vetoed, veto_reason = veto_system.check_for_veto(latest_ctx, chosen_strat) | |
| is_tradeable = regime_filter.should_trade(latest_ctx['regime'], thesis) | |
| final_action, final_reasoning = thesis['action'], f"Bandit chose '{chosen_strat}'. Thesis: '{thesis['reasoning']}'" | |
| if is_vetoed: | |
| final_action, final_reasoning = "NO TRADE", final_reasoning + f" -> β VETOED by Concept: '{veto_reason}'." | |
| elif not is_tradeable and final_action != "NO TRADE": | |
| final_action, final_reasoning = "NO TRADE", final_reasoning + " -> β REJECTED by Regime Filter." | |
| elif final_action != "NO TRADE": | |
| final_reasoning += " -> β EXECUTABLE." | |
| if final_action in ["BUY", "SELL"]: | |
| ts = pd.Timestamp.now(tz='UTC').isoformat() | |
| logger.log_signal(ts, chosen_strat, final_action, thesis['entry'], thesis['stop_loss'], thesis['take_profit'], ctx_vec) | |
| send_ntfy_notification(ntfy_topic, thesis) | |
| evaluate_pending_signals_v2(logger, bandit, change_detector, live_store) | |
| status = {"last_checked": pd.Timestamp.now(tz='UTC').isoformat(), "market_price": f"{latest_ctx['close']:.5f}", "market_regime": latest_ctx['regime'], "signal": final_action, "reasoning": final_reasoning, "predictions": preds_str, "next_event": f"{next_event['title']} ({next_event['time_str']})"} | |
| with open('status_v17.json', 'w') as f: json.dump(status, f) | |
| print(f"WORKER V17: Cycle complete. Signal: {final_action}. Sleeping.") | |
| time.sleep(300) | |
| except Exception as e: | |
| print(f"Main Loop Error: {e}"); time.sleep(60) | |
| # ============================================================================== | |
| # --- GRADIO DASHBOARD AND STARTUP --- | |
| # ============================================================================== | |
| def get_latest_status_v17(): | |
| if not os.path.exists('status_v17.json'): | |
| return "Initializing...", "N/A", "N/A", "N/A", "Waiting for first cycle.", "N/A", "N/A" | |
| try: | |
| with open('status_v17.json', 'r') as f: s = json.load(f) | |
| return (f"Status at: {s.get('last_checked', 'N/A')}", s.get('market_price', 'N/A'), s.get('market_regime', 'N/A'), s.get('signal', 'N/A'), s.get('reasoning', 'N/A'), s.get('predictions', 'N/A'), s.get('next_event', 'N/A')) | |
| except (json.JSONDecodeError, IOError): | |
| return "Error reading status.", "N/A", "N/A", "N/A", "File may be in use.", "N/A", "N/A" | |
| if __name__ == "__main__": | |
| required_secrets = ['TWELVE_DATA_API_KEY', 'HF_TOKEN', 'GROQ_API_KEY', 'FIRESTORE_SA_KEY', 'FIREBASE_DB_URL', 'NTFY_TOPIC_V17'] | |
| if not all(os.environ.get(k) for k in required_secrets): | |
| print(f"FATAL: Missing secrets. Please set: {required_secrets}") | |
| exit() | |
| try: | |
| sa_key = json.loads(os.environ.get('FIRESTORE_SA_KEY')) | |
| cred = credentials.Certificate(sa_key) | |
| if not firebase_admin._apps: | |
| firebase_admin.initialize_app(cred, {'databaseURL': os.environ.get('FIREBASE_DB_URL')}) | |
| print("β Firebase connection established.") | |
| except Exception as e: | |
| print(f"FATAL: Firebase init failed: {e}") | |
| exit() | |
| worker_thread = threading.Thread(target=main_worker, daemon=True) | |
| worker_thread.start() | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# π§ V17 Evolution Agent") | |
| gr.Markdown("**Secrets Status:** β All required secrets appear to be set.") | |
| refresh_btn = gr.Button("Refresh Status", variant="primary") | |
| status_output = gr.Textbox(label="Status", interactive=False) | |
| gr.Markdown("## Agent's Last Analysis") | |
| with gr.Row(): | |
| price_output = gr.Textbox(label="Last Market Price") | |
| regime_output = gr.Textbox(label="Last Market Regime") | |
| with gr.Row(): | |
| predictions_output = gr.Textbox(label="DL Model Predictions (5m|15m|1h)") | |
| event_output = gr.Textbox(label="Next High-Impact Event") | |
| action_output = gr.Textbox(label="Final Signal / Action") | |
| reasoning_output = gr.Textbox(label="Full Reasoning", lines=3) | |
| refresh_btn.click(fn=get_latest_status_v17, inputs=[], outputs=[ | |
| status_output, price_output, regime_output, action_output, reasoning_output, predictions_output, event_output | |
| ]) | |
| demo.launch() |