Spaces:
Sleeping
Sleeping
| # app_v23_deep_mind.py - An RMRL agent with a persistent, deep-reasoning mind. | |
| # --- Core Libraries --- | |
| import pandas as pd | |
| import numpy as np | |
| import warnings | |
| import joblib | |
| import json | |
| import os | |
| import gradio as gr | |
| import requests | |
| import time | |
| from datetime import datetime, timezone, timedelta | |
| import pytz | |
| import threading | |
| import math | |
| from huggingface_hub import hf_hub_download | |
| import firebase_admin | |
| from firebase_admin import credentials, db | |
| from collections import defaultdict | |
| import traceback | |
| import itertools | |
| # --- Environment and Dependencies --- | |
| os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' | |
| warnings.filterwarnings("ignore", category=UserWarning, module='sklearn') | |
| # --- ML & DL Libraries --- | |
| import tensorflow as tf | |
| from sklearn.preprocessing import MinMaxScaler | |
| from tensorflow.keras.models import Model, load_model | |
| from tensorflow.keras.utils import Sequence | |
| from transformers import BertTokenizer, TFBertModel | |
| # --- Live Data Fetching --- | |
| from twelvedata import TDClient | |
| EVENT_JSON_URL = "https://nfs.faireconomy.media/ff_calendar_thisweek.json" | |
| CACHE_DURATION_SECONDS = 600 | |
| _EVENT_CACHE = {"data": None, "timestamp": 0} | |
| # --- UTILITY FUNCTIONS --- | |
| def fetch_live_events_with_cache(): | |
| current_time = time.time() | |
| if _EVENT_CACHE["data"] and (current_time - _EVENT_CACHE["timestamp"] < CACHE_DURATION_SECONDS): return _EVENT_CACHE["data"] | |
| try: | |
| response = requests.get(EVENT_JSON_URL, headers={"User-Agent": "V23-Agent/1.0"}, timeout=10) | |
| response.raise_for_status(); data = response.json(); _EVENT_CACHE["data"], _EVENT_CACHE["timestamp"] = data, current_time; return data | |
| except requests.RequestException as e: print(f"Error fetching event data: {e}"); return _EVENT_CACHE.get("data", []) | |
def fetch_twelvedata_prices(api_key, symbol='EUR/USD', interval='5min', output_size=200):
    """Pull recent OHLC bars from Twelve Data as a DataFrame.

    The frame is sorted oldest-first with a flat 'Datetime' column; an
    empty DataFrame is returned on any fetch/parse failure.
    """
    try:
        client = TDClient(apikey=api_key)
        series = client.time_series(symbol=symbol, interval=interval, outputsize=output_size, timezone="UTC")
        frame = series.as_pandas().sort_index(ascending=True)
        frame.index.name = 'Datetime'
        frame.reset_index(inplace=True)
        return frame
    except Exception as e:
        print(f"Error fetching price data: {e}")
        return pd.DataFrame()
def send_ntfy_notification(topic, trade_thesis):
    """Push a trade-signal notification to ntfy.sh; no-op when topic is unset."""
    if not topic:
        return
    title = f"V23 Signal: {trade_thesis.get('action')} EUR/USD"
    body_lines = [
        f"Strategy: {trade_thesis.get('strategy')} ({trade_thesis.get('confidence')})",
        f"Reason: {trade_thesis.get('reasoning')}",
        f"Entry: {trade_thesis.get('entry')} | SL: {trade_thesis.get('stop_loss')} | TP: {trade_thesis.get('take_profit')}",
    ]
    message = "\n".join(body_lines)
    try:
        requests.post(f"https://ntfy.sh/{topic}", data=message.encode('utf-8'), headers={"Title": title})
        print("ntfy notification sent.")
    except requests.exceptions.RequestException as e:
        print(f"Failed to send ntfy notification: {e}")
def sanitize_for_json(obj):
    """Recursively convert numpy types to native Python types for JSON serialization.

    Handles dicts, lists/tuples/sets, numpy bool/int/float scalars and
    ndarrays; any other object is returned unchanged.
    """
    if isinstance(obj, dict):
        return {k: sanitize_for_json(v) for k, v in obj.items()}
    # Generalized: tuples and sets are normalized to JSON-friendly lists too.
    if isinstance(obj, (list, tuple, set)):
        return [sanitize_for_json(elem) for elem in obj]
    # np.bool_ is not JSON-serializable either; check it before the numeric types.
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    return obj
def create_feature_set_for_inference(price_df, events_json, finbert_tokenizer, finbert_model):
    """Merge live OHLC prices with economic-calendar events into one feature frame.

    Adds technical indicators (RSI-14, EMA-20, ATR-14), FinBERT [CLS] embeddings
    of event headlines, event-timing features and trading-session flags.

    Returns:
        (merged_data, next_event_info): the UTC-indexed feature DataFrame and a
        dict describing the next upcoming high-impact event (title + time string).
    """
    # --- Price-side features ---
    price_features = price_df.copy(); price_features['Datetime'] = pd.to_datetime(price_features['Datetime']); price_features.set_index('Datetime', inplace=True)
    # Normalize the index to tz-aware UTC regardless of input tz state.
    if price_features.index.tz is None: price_features = price_features.tz_localize('UTC', ambiguous='infer')
    else: price_features = price_features.tz_convert('UTC')
    # RSI(14), EMA(20) and ATR(14); columns are renamed to 'Price'/'Open'/...
    # temporarily and restored to lowercase afterwards.
    price_features.rename(columns={'close': 'Price', 'open':'Open', 'high':'High', 'low':'Low'}, inplace=True); delta = price_features['Price'].diff(); gain = (delta.where(delta > 0, 0)).rolling(window=14).mean(); loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean(); price_features['RSI'] = 100 - (100 / (1 + (gain / loss))); price_features['EMA_20'] = price_features['Price'].ewm(span=20, adjust=False).mean(); high_low = price_features['High'] - price_features['Low']; high_close = np.abs(price_features['High'] - price_features['Price'].shift()); low_close = np.abs(price_features['Low'] - price_features['Price'].shift()); tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1); price_features['ATR'] = tr.rolling(window=14).mean(); price_features.rename(columns={'Price':'close', 'Open':'open', 'High':'high', 'Low':'low'}, inplace=True)
    # --- Event-side features ---
    events = pd.DataFrame(events_json); processed_events = pd.DataFrame(); next_event_info = {"title": "None within 24h", "time_str": "N/A"}
    if not events.empty and 'date' in events.columns:
        # Keep only events for the currencies relevant to EUR/USD flows.
        RELEVANT_CURRENCIES = ['USD', 'EUR', 'GBP', 'JPY']; events = events[events['country'].isin(RELEVANT_CURRENCIES)].copy()
        events['datetime'] = pd.to_datetime(events['date'], utc=True, errors='coerce'); events.dropna(subset=['datetime'], inplace=True); events.set_index('datetime', inplace=True); events.sort_index(inplace=True)
        def parse_financial_number(s):
            # Parse strings like '1.2M', '3K', '0.5%' into floats; NaN on failure.
            if not isinstance(s, str) or not s: return np.nan
            s = s.strip().upper(); multipliers = {'B': 1e9, 'M': 1e6, 'K': 1e3, '%': 0.01}; val_str = s
            if s.endswith(tuple(multipliers.keys())): val_str = s[:-1]; multiplier = multipliers[s[-1]]
            else: multiplier = 1.0
            try: return float(val_str) * multiplier
            except (ValueError, TypeError): return np.nan
        # 'surprise' = actual minus forecast (0 when either side is unparseable).
        if 'actual' in events.columns and 'forecast' in events.columns: events['surprise'] = (events['actual'].apply(parse_financial_number) - events['forecast'].apply(parse_financial_number)).fillna(0)
        else: events['surprise'] = 0
        events['detail'] = events['title'].fillna('') + ' ' + events['country'].fillna('')
        if not events.empty:
            # FinBERT [CLS]-token embedding of each event's "title country" text.
            inputs = finbert_tokenizer(events['detail'].tolist(), return_tensors='tf', padding=True, truncation=True, max_length=64); embeddings = finbert_model(inputs).last_hidden_state[:, 0, :].numpy()
            processed_events = pd.concat([events, pd.DataFrame(embeddings, columns=[f'finbert_{i}' for i in range(embeddings.shape[1])], index=events.index)], axis=1)
    # Attach to each price bar the most recent event within the preceding hour.
    merged_data = pd.merge_asof(left=price_features.sort_index(), right=processed_events.sort_index(), left_index=True, right_index=True, direction='backward', tolerance=pd.Timedelta(minutes=60))
    # --- Event-timing features (high-impact events only) ---
    all_high_impact_events = events[(events['impact'] == 'High')] if 'impact' in events.columns and not events.empty else pd.DataFrame()
    if not all_high_impact_events.empty:
        upcoming_events = all_high_impact_events[all_high_impact_events.index > merged_data.index[-1]]
        if not upcoming_events.empty:
            # Hours from the latest bar to the next high-impact event.
            next_event = upcoming_events.iloc[0]; time_to_next = (next_event.name - merged_data.index[-1]).total_seconds() / 3600.0; merged_data['time_to_event'] = time_to_next; next_event_info = {"title": f"{next_event.country} {next_event.title}", "time_str": f"in {time_to_next:.2f}h"}
        else: merged_data['time_to_event'] = 9999
        # Per-bar hours elapsed since the most recent past high-impact event.
        df_index_sec = merged_data.index.astype(np.int64).to_numpy() // 10**9; event_times_sec = all_high_impact_events.index.astype(np.int64).to_numpy() // 10**9
        time_diffs = df_index_sec.reshape(-1, 1) - event_times_sec[None, :]
        merged_data['time_since_event'] = np.min(np.where(time_diffs >= 0, time_diffs, np.inf), axis=1) / 3600
    else: merged_data['time_since_event'] = 9999; merged_data['time_to_event'] = 9999  # sentinel: no high-impact event in view
    # --- Calendar/session features + cleanup (session windows are UTC hours) ---
    merged_data.replace([np.inf, -np.inf], 9999, inplace=True); merged_data['hour_of_day'] = merged_data.index.hour; merged_data['day_of_week'] = merged_data.index.dayofweek; merged_data['session_london'] = ((merged_data['hour_of_day'] >= 7) & (merged_data['hour_of_day'] <= 16)).astype(int); merged_data['session_ny'] = ((merged_data['hour_of_day'] >= 12) & (merged_data['hour_of_day'] <= 21)).astype(int); merged_data['session_asian'] = ((merged_data['hour_of_day'] >= 22) | (merged_data['hour_of_day'] <= 7)).astype(int)
    # Forward-fill event columns between events, then zero any remaining gaps.
    finbert_cols = [col for col in merged_data.columns if 'finbert_' in col]; cols_to_ffill = ['surprise', 'impact', 'title'] + finbert_cols; merged_data[cols_to_ffill] = merged_data[cols_to_ffill].ffill(); merged_data.fillna(0, inplace=True); merged_data.dropna(subset=['open', 'close', 'RSI'], inplace=True)
    return merged_data, next_event_info
class CausalReasoningNetwork:
    """Labels each bar of a feature frame with a coarse volatility regime."""

    def __init__(self, processed_data):
        # Work on a private copy so callers' frames are never mutated.
        self.data = processed_data.copy()

    def identify_volatility_regimes(self, volatility_indicator='ATR', trend_indicator='EMA_20'):
        """Append a 'regime' column (TRENDING/BREAKOUT/RANGING/CHOPPY/N/A) and return the frame."""
        if len(self.data) < 20:
            self.data['regime'] = 'N/A'
            return self.data
        vol_series = self.data[volatility_indicator]
        low_cut = vol_series.quantile(0.33)
        high_cut = vol_series.quantile(0.66)
        trend_slope = self.data[trend_indicator].diff(periods=3)
        steep_cut = trend_slope.quantile(0.75)  # invariant: hoisted out of the loop
        labels = []
        for vol_val, raw_slope in zip(vol_series, trend_slope):
            slope_val = raw_slope if pd.notna(raw_slope) else 0
            if pd.isna(vol_val) or pd.isna(low_cut) or pd.isna(high_cut):
                labels.append('N/A')
                continue
            if vol_val > high_cut:
                labels.append('TRENDING' if abs(slope_val) > steep_cut else 'BREAKOUT')
            elif vol_val < low_cut:
                labels.append('RANGING')
            else:
                labels.append('CHOPPY')
        self.data['regime'] = labels
        return self.data
class RuleBasedSituationRoom:
    """Turns model predictions (or a fallback EMA rule) into a trade-thesis dict."""

    def __init__(self, params):
        self.params = params

    def generate_thesis(self, predictions, sequence_df):
        """Build an action/SL/TP thesis from the latest bar and optional predictions."""
        latest_data = sequence_df.iloc[-1]
        current_price = latest_data['close']
        if not predictions:
            # No forecasts available: fall back to a plain EMA-crossover call.
            dir_ema = "BUY" if current_price > latest_data['EMA_20'] else "SELL"
            action, confidence = dir_ema, "LOW"
            strategy = "ema_crossover"
            reasoning = f"Simple EMA Crossover ({dir_ema})."
        else:
            horizons = [("BUY" if predictions[h] > current_price else "SELL") for h in ('5m', '15m', '1h')]
            dir_5m, dir_15m, dir_1h = horizons
            strategy = "predictive_rule_based"
            # Rank agreement across horizons: full confluence beats partial.
            if dir_5m == dir_15m == dir_1h:
                action, confidence, reasoning = dir_5m, "HIGH", f"Strong confluence ({dir_5m})."
            elif dir_5m == dir_15m:
                action, confidence, reasoning = dir_5m, "MEDIUM", f"Short/Medium-term confluence ({dir_5m})."
            elif dir_15m == dir_1h:
                action, confidence, reasoning = dir_15m, "LOW", f"Medium/Long-term confluence ({dir_15m})."
            else:
                action, confidence, reasoning = "NO_TRADE", "LOW", "Prediction horizons diverge."
        if action == "NO_TRADE":
            return {"action": "NO_TRADE", "confidence": "LOW", "strategy": strategy, "reasoning": reasoning}
        raw_atr = latest_data['ATR']
        atr = raw_atr if pd.notna(raw_atr) and raw_atr > 0 else 0.0001  # guard against zero/NaN ATR
        sl_mult = self.params.get('sl_atr_multiplier', 2.0)
        tp_mult = self.params.get('tp_atr_multiplier', 4.0)
        # Scale back the profit target when conviction is weaker.
        if confidence == "MEDIUM":
            tp_mult *= 0.75
        elif confidence == "LOW":
            tp_mult *= 0.5
        entry = current_price
        if action == "BUY":
            stop_loss = current_price - (sl_mult * atr)
            take_profit = current_price + (tp_mult * atr)
        else:
            stop_loss = current_price + (sl_mult * atr)
            take_profit = current_price - (tp_mult * atr)
        return {"action": action, "entry": f"{entry:.5f}", "stop_loss": f"{stop_loss:.5f}", "take_profit": f"{take_profit:.5f}", "confidence": confidence, "reasoning": reasoning, "strategy": strategy}
### V23 EVOLUTION: The PersistentDeepMind ###
class PersistentDeepMind:
    """Causal-rule memory layer: vets trades against learned rules and
    consolidates evaluated trade outcomes into new rules.

    Rules are persisted through the injected ``memory`` object
    (PersistentMemoryV5) so knowledge survives restarts.
    """

    # A failure rule must be at least this confident before it may veto a trade.
    FAILURE_VETO_CONFIDENCE = 0.8

    def __init__(self, memory):
        self.memory = memory
        self.causal_rulebook = self.load_verified_rules()
        print(f"✅ PersistentDeepMind initialized. Loaded {len(self.causal_rulebook)} verified causal rules.")

    def load_verified_rules(self):
        """Fetch the persisted rulebook; an empty dict when none exists."""
        verified_rules = self.memory.get_verified_rules()
        return verified_rules if verified_rules else {}

    def review_trade(self, trade_thesis, current_context_df):
        """Approve or veto a trade thesis against the causal rulebook.

        Returns:
            (is_tradeable, reasoning, causal_alignment_bonus)
        """
        action, strategy = trade_thesis.get('action'), trade_thesis.get('strategy')
        current_context = self._discretize_context(current_context_df.iloc[-1], current_context_df)
        current_context['strategy'] = strategy
        if action == 'NO_TRADE':
            return True, "Thesis is NO_TRADE.", 0.0
        if not current_context.get('regime') or current_context['regime'] == 'N/A':
            return False, "❌ VETOED: 'Regime Blindness'.", 0.0
        causal_alignment_bonus = 0.0
        for rule_id, rule in self.causal_rulebook.items():
            if not self._context_matches_rule(current_context, rule):
                continue
            if rule['outcome'] == 'success':
                causal_alignment_bonus = rule.get('confidence', 0.1)
                print(f"MIND: Aligning with positive rule: {rule['text']}")
            # BUG FIX: the original tested bare truthiness of the confidence,
            # so ANY nonzero-confidence failure rule vetoed the trade.  Only
            # genuinely high-confidence failure rules should veto.
            elif rule['outcome'] == 'failure' and rule.get('confidence', 0.8) >= self.FAILURE_VETO_CONFIDENCE:
                return False, f"❌ VETOED: Matches high-confidence failure rule: {rule['text']}", 0.0
        return True, "✅ APPROVED: No high-confidence negative rules matched.", causal_alignment_bonus

    def _context_matches_rule(self, current_context, rule):
        """True when the context's strategy and every rule condition match exactly."""
        if current_context.get('strategy') != rule.get('strategy'):
            return False
        for feature, condition in rule['conditions'].items():
            if current_context.get(feature) != condition.get('eq'):
                return False
        return True

    def _discretize_context(self, context_series, historical_df):
        """Convert the latest feature row into coarse categorical context keys."""
        context = context_series.to_dict()
        rsi = context.get('RSI', 50)
        context['rsi_state'] = "overbought" if rsi > 70 else "oversold" if rsi < 30 else "neutral"
        if not historical_df.empty and 'ATR' in historical_df.columns:
            # Bucket current ATR against its own recent distribution.
            low_thresh, high_thresh = historical_df['ATR'].quantile(0.33), historical_df['ATR'].quantile(0.66)
            current_atr = context.get('ATR', 0)
            if current_atr < low_thresh:
                context['atr_quantile'] = 'low'
            elif current_atr > high_thresh:
                context['atr_quantile'] = 'high'
            else:
                context['atr_quantile'] = 'medium'
        else:
            context['atr_quantile'] = 'unknown'
        context['is_trading_hours'] = 1 if context.get('session_ny', 0) == 1 or context.get('session_london', 0) == 1 else 0
        context['has_upcoming_event'] = 1 if context.get('time_to_event', 9999) < 1 else 0
        return context

    def introspect_episode(self, evaluated_signal, last_context_df):
        """Log the discretized context + pnl of an evaluated trade as a hypothesis."""
        if last_context_df.empty:
            return
        context_snapshot = self._discretize_context(last_context_df.iloc[-1], last_context_df)
        pnl, strategy = evaluated_signal.get('pnl'), evaluated_signal.get('strategy_chosen')
        if pnl is None or not strategy:
            return
        hypothesis_data = {"pnl": pnl, "strategy": strategy, **context_snapshot}
        self.memory.log_hypothesis(hypothesis_data)

    def test_hypotheses(self):
        """Mine logged episodes for significant context->pnl patterns, then
        reinforce/discover/decay/prune rules and persist the rulebook."""
        print("MIND: Starting Deep Knowledge Consolidation...")
        all_episodes = self.memory.get_all_hypotheses()
        if not all_episodes or len(all_episodes) < 30:  # higher data requirement for deep analysis
            print("MIND: Not enough historical data for robust deep analysis.")
            return
        df = pd.DataFrame(all_episodes)
        if 'strategy' not in df.columns or 'regime' not in df.columns:
            return
        df.dropna(subset=['strategy', 'regime', 'pnl'], inplace=True)
        INTERESTING_FEATURES = ['rsi_state', 'atr_quantile', 'is_trading_hours', 'has_upcoming_event']
        current_rules = self.load_verified_rules()
        newly_discovered_patterns = {}
        # --- Deep causal discovery: single features (r=1) and feature pairs (r=2) ---
        for r in range(1, 3):
            for combo in itertools.combinations(INTERESTING_FEATURES, r):
                grouping_keys = ['strategy', 'regime'] + list(combo)
                if not all(key in df.columns for key in grouping_keys):
                    continue
                for keys, group_df in df.groupby(grouping_keys):
                    if len(group_df) < 7:  # need sufficient evidence for multi-feature rules
                        continue
                    mean_pnl = group_df['pnl'].mean()
                    if abs(mean_pnl) > 0.00015:  # stricter threshold for significance
                        # Unique ID for this strategy/regime/feature combination.
                        rule_id = "|".join(map(str, keys))
                        newly_discovered_patterns[rule_id] = {
                            'mean_pnl': mean_pnl,
                            'evidence_count': len(group_df),
                            'keys': keys,
                            'combo': combo,
                        }
        now_iso = datetime.now(timezone.utc).isoformat()
        for rule_id, new_evidence in newly_discovered_patterns.items():
            impact = new_evidence['mean_pnl']
            # Principled confidence scaling into [0, 1].
            REASONABLE_MAX_IMPACT = 0.001  # A 5-pip move, a strong signal
            confidence = min(abs(impact) / REASONABLE_MAX_IMPACT, 1.0)
            if rule_id in current_rules:  # reinforce an existing rule
                rule = current_rules[rule_id]
                # BUG FIX: 'total_impact' is a running SUM (impact * evidence),
                # as defined at discovery time.  The original incremented
                # evidence_count first and then divided by the already-updated
                # count, producing a malformed average instead of a sum.
                rule['evidence_count'] += new_evidence['evidence_count']
                rule['total_impact'] += impact * new_evidence['evidence_count']
                rule['confidence'] = (rule['confidence'] * 0.8) + (confidence * 0.2)  # smoothed update
                rule['last_validated'] = now_iso
                print(f"MIND: REINFORCED rule: {rule['text']} (New Confidence: {rule['confidence']:.2f})")
            else:  # discover a new rule
                outcome = "success" if impact > 0 else "failure"
                strategy, regime = new_evidence['keys'][0], new_evidence['keys'][1]
                conditions = {'regime': {'eq': regime}}
                text_conditions = []
                for i, feature_name in enumerate(new_evidence['combo']):
                    feature_value = new_evidence['keys'][2 + i]
                    conditions[feature_name] = {'eq': feature_value}
                    text_conditions.append(f"'{feature_name}' is '{feature_value}'")
                rule_text = f"'{strategy}' in '{regime}' when {' AND '.join(text_conditions)} -> {outcome}"
                rule = {'rule_id': rule_id, 'text': rule_text, 'strategy': strategy, 'outcome': outcome, 'confidence': confidence, 'evidence_count': new_evidence['evidence_count'], 'total_impact': impact * new_evidence['evidence_count'], 'conditions': conditions, 'last_validated': now_iso}
                current_rules[rule_id] = rule
                print(f"MIND: DISCOVERED DEEP rule: {rule['text']}")
        ### Patient Forgetting: decay stale rules (~25-hour cycle) and prune the weak ###
        rules_to_prune = []
        CONFIDENCE_DECAY_PERIOD_SECONDS = 90000  # ~25 hours
        for rule_id, rule in current_rules.items():
            last_seen = datetime.fromisoformat(rule['last_validated'])
            if (datetime.now(timezone.utc) - last_seen).total_seconds() > CONFIDENCE_DECAY_PERIOD_SECONDS:
                rule['confidence'] *= 0.5  # harsher decay for truly old rules
                print(f"MIND: Decaying confidence for old rule: {rule['text']} (New Confidence: {rule['confidence']:.2f})")
            # Any rule whose confidence has fallen below the floor is dropped.
            if rule['confidence'] < 0.05:
                rules_to_prune.append(rule_id)
        for rule_id in rules_to_prune:
            print(f"MIND: PRUNED obsolete rule: {current_rules[rule_id]['text']}")
            del current_rules[rule_id]
        self.memory.save_verified_rules(current_rules)
        self.causal_rulebook = self.load_verified_rules()
        print(f"MIND: Deep Mind consolidated knowledge base ({len(current_rules)} rules).")
class PredictionCoreTransformer:
    """Wraps the calibrated Keras transformer plus its scaler/feature artifacts."""

    def __init__(self, sequence_length=48):
        self.scaler = None
        self.model = None
        self.sequence_length = sequence_length
        self.feature_names = None
        # On-disk artifact locations produced by the calibration step.
        self.calibrated_model_path = 'calibrated_model.keras'
        self.calibrated_scaler_path = 'calibrated_scaler.joblib'
        self.calibrated_features_path = 'calibrated_features.json'

    def is_calibrated(self):
        """True when every calibration artifact exists on disk."""
        artifact_paths = (self.calibrated_model_path, self.calibrated_scaler_path, self.calibrated_features_path)
        return all(os.path.exists(p) for p in artifact_paths)

    def load_calibrated_artifacts(self):
        """Load model, scaler and feature list from disk; returns True on success."""
        print("--- Loading pre-calibrated artifacts ---")
        self.model = load_model(self.calibrated_model_path)
        self.scaler = joblib.load(self.calibrated_scaler_path)
        with open(self.calibrated_features_path, 'r') as f:
            self.feature_names = json.load(f)
        print("--- Instant startup successful ---")
        return True

    def predict_single(self, input_sequence):
        """Run one multi-horizon forecast; returns ({'5m','15m','1h'} dict, display string)."""
        if not self.scaler:
            raise RuntimeError("Agent not calibrated.")
        features_for_model = input_sequence[self.feature_names]
        scaled_features = self.scaler.transform(features_for_model)
        reshaped = scaled_features.reshape(1, self.sequence_length, len(self.feature_names))
        predictions = self.model.predict(reshaped, verbose=0)
        preds = {"5m": predictions[0][0][0], "15m": predictions[1][0][0], "1h": predictions[2][0][0]}
        preds_str = f"5m: {preds['5m']:.5f} | 15m: {preds['15m']:.5f} | 1h: {preds['1h']:.5f}"
        return preds, preds_str
class LinUCBBandit:
    """Contextual multi-armed bandit (LinUCB) over named trading strategies."""

    def __init__(self, strategies, d, alpha=1.0, regularization=1.0):
        self.strategies = list(strategies)
        self.d = d                      # context-vector dimensionality
        self.alpha = alpha              # exploration weight
        self.reg = regularization
        # Per-strategy ridge-regression state: A (design matrix), b (reward vector).
        self.A = {s: (self.reg * np.eye(self.d)) for s in self.strategies}
        self.b = {s: np.zeros(self.d) for s in self.strategies}
        self._last_decision_info = {}

    def select(self, context_vector):
        """Pick the strategy maximizing mean reward plus exploration bonus."""
        scores, uncertainties, mean_rewards = {}, {}, {}
        for strat in self.strategies:
            A_inv = np.linalg.inv(self.A[strat])
            theta = A_inv.dot(self.b[strat])
            expected = theta.dot(context_vector)
            bonus = self.alpha * math.sqrt(context_vector.dot(A_inv).dot(context_vector))
            mean_rewards[strat] = expected
            uncertainties[strat] = bonus
            scores[strat] = expected + bonus
        chosen = max(scores, key=scores.get)
        self._last_decision_info = {'scores': scores, 'uncertainties': uncertainties, 'mean_rewards': mean_rewards, 'chosen': chosen}
        return chosen

    def get_last_decision_info(self):
        """Diagnostics captured by the most recent select() call."""
        return self._last_decision_info

    def update(self, strategy, context_vector, reward):
        """Fold an observed reward into the chosen strategy's linear model."""
        x = context_vector.reshape(-1)
        self.A[strategy] += np.outer(x, x)
        self.b[strategy] += reward * x

    def increase_exploration(self, factor=1.2):
        """Widen exploration in response to detected concept drift."""
        self.alpha *= factor
        print(f"CONCEPT DRIFT: Increased alpha to {self.alpha:.2f}")
class IntrinsicRewardGeneratorV2:
    """Derives a bounded intrinsic reward from bandit uncertainty, surprise and causal alignment."""

    def __init__(self, bandit):
        self.bandit = bandit
        print("✅ IntrinsicRewardGeneratorV2 initialized.")

    def calculate_reward(self, extrinsic_reward, chosen_strategy, causal_alignment_bonus):
        """Blend exploration bonus, prediction surprise and causal bonus; clipped to [0, 0.5]."""
        decision = self.bandit.get_last_decision_info()
        if not decision:
            return 0.0
        exploration_term = decision['uncertainties'].get(chosen_strategy, 0.0)
        expected = decision['mean_rewards'].get(chosen_strategy, 0.0)
        surprise_term = abs(extrinsic_reward - expected)
        blended = (0.2 * exploration_term) + (0.4 * surprise_term) + (0.4 * causal_alignment_bonus)
        return np.clip(blended, 0, 0.5)
class PersistentMemoryV5:
    """Firebase Realtime-Database-backed persistence for signals, hypotheses and rules."""

    def __init__(self, reflect_interval):
        self.db_refs = {}
        self.reflect_interval = reflect_interval
        try:
            sa_key_json = os.environ.get('FIRESTORE_SA_KEY')
            db_url = os.environ.get('FIREBASE_DB_URL')
            if not all([sa_key_json, db_url]):
                raise ValueError("Firebase secrets not set.")
            cred = credentials.Certificate(json.loads(sa_key_json))
            # Initialize the app exactly once per process.
            if not firebase_admin._apps:
                firebase_admin.initialize_app(cred, {'databaseURL': db_url})
            self.db_refs['signals'] = db.reference('signals_v5')
            self.db_refs['hypotheses'] = db.reference('knowledge_base_v5/hypotheses')
            self.db_refs['rules'] = db.reference('knowledge_base_v5/verified_rules')
            print("✅ PersistentMemoryV5 initialized at /signals_v5.")
        except Exception as e:
            # Degrade gracefully: an empty db_refs disables all persistence.
            print(f"CRITICAL ERROR - PersistentMemoryV5 failed: {e}")
            self.db_refs = {}

    def log_signal(self, ts, strategy, action, entry, sl, tp, context_vector, reasoning, context_snapshot):
        """Persist a new trade signal; returns its Firebase key, or None on failure."""
        if 'signals' not in self.db_refs:
            return None
        try:
            signal_data = {
                "timestamp_entry": ts,
                "strategy_chosen": strategy,
                "action": action,
                "entry_price": float(entry),
                "stop_loss": float(sl),
                "take_profit": float(tp),
                "reasoning_initial": reasoning,
                "context_vector": [float(x) for x in context_vector.tolist()],
                "context_snapshot": context_snapshot,
            }
            new_signal_ref = self.db_refs['signals'].push(signal_data)
            event_id = new_signal_ref.key
            new_signal_ref.update({"event_id": event_id})
            print(f"MEMORY V5: Logged '{action}' signal to signals_v5. ID: {event_id}")
            return event_id
        except Exception as e:
            print(f"MEMORY V5 ERROR: {e}")
            return None

    def log_hypothesis(self, hypothesis_data):
        """Append a causal-introspection hypothesis record (no-op when disconnected)."""
        if 'hypotheses' in self.db_refs:
            self.db_refs['hypotheses'].push(hypothesis_data)

    def get_all_hypotheses(self):
        """All stored hypotheses as a list (empty when none or disconnected)."""
        snapshot = self.db_refs.get('hypotheses').get() if 'hypotheses' in self.db_refs else None
        return list(snapshot.values()) if snapshot else []

    def get_all_signals(self):
        """Raw dict of stored signals keyed by Firebase id, or None."""
        return self.db_refs.get('signals').get() if 'signals' in self.db_refs else None

    def get_verified_rules(self):
        """The persisted causal rulebook dict, or None."""
        return self.db_refs.get('rules').get() if 'rules' in self.db_refs else None

    def save_verified_rules(self, rulebook):
        """Overwrite the persisted rulebook (numpy types sanitized first)."""
        if 'rules' in self.db_refs:
            # Firebase rejects numpy scalars/arrays, so convert before saving.
            sanitized_rulebook = sanitize_for_json(rulebook)
            self.db_refs['rules'].set(sanitized_rulebook)
            print("MEMORY V5: Saved persistent rules to knowledge_base_v5.")

    def get_signal_ref(self):
        """Firebase reference for the signals node, or None when disconnected."""
        return self.db_refs.get('signals')
class PageHinkley:
    """Page-Hinkley sequential change-point detector over a reward stream."""

    def __init__(self, delta=0.005, lambda_=50, alpha=1 - 1e-3):
        self.mean = 0.0          # exponentially-weighted running mean
        self.delta = delta       # tolerated deviation before accumulation
        self.lambda_ = lambda_   # detection threshold
        self.alpha = alpha       # mean smoothing factor
        self.cumulative = 0.0    # accumulated positive deviation

    def update(self, x):
        """Feed one observation; returns True when drift is detected."""
        self.mean = self.mean * self.alpha + x * (1 - self.alpha)
        self.cumulative = max(0, self.cumulative + x - self.mean - self.delta)
        return self.cumulative > self.lambda_
class StrategyManager:
    """Registry mapping strategy names to thesis-generating callables."""

    def __init__(self, situation_room, prediction_engine):
        self.situation_room = situation_room
        self.prediction_engine = prediction_engine

    def list_strategies(self):
        """Name -> bound-method mapping of every available strategy."""
        return {
            "predictive_rule_based": self.predictive_strategy,
            "ema_crossover": self.ema_crossover_strategy,
        }

    def predictive_strategy(self, seq):
        """Thesis from transformer forecasts; returns (thesis, predictions string)."""
        preds_dict, preds_str = self.prediction_engine.predict_single(seq)
        return self.situation_room.generate_thesis(preds_dict, seq), preds_str

    def ema_crossover_strategy(self, seq):
        """Fallback EMA-only thesis; returns (thesis, placeholder string)."""
        return self.situation_room.generate_thesis({}, seq), "N/A (EMA Strategy)"
# Numeric features fed into the bandit's context vector, plus a one-hot
# encoding of the current volatility regime (built by ContextVectorPreprocessor).
CONTEXT_FEATURES=['close','ATR','EMA_20','RSI','time_since_event','time_to_event','hour_of_day']
REGIME_COLS=['regime_TRENDING','regime_BREAKOUT','regime_CHOPPY','regime_RANGING']
CONTEXT_DIMENSION=len(CONTEXT_FEATURES)+len(REGIME_COLS)  # total context-vector size
class ContextVectorPreprocessor:
    """Builds the bandit's context vector: scaled numeric features + one-hot regime."""

    def __init__(self):
        self.feature_names = CONTEXT_FEATURES
        self.regime_cols = REGIME_COLS

    def build_context_vector(self, df_with_regime):
        """Scale the numeric features to [-1, 1] and append the latest regime one-hot.

        Raises:
            ValueError: when the frame has no 'regime' column.
        """
        if 'regime' not in df_with_regime.columns:
            raise ValueError("DF must have 'regime' column.")
        numeric_frame = df_with_regime[self.feature_names]
        scaler = MinMaxScaler(feature_range=(-1, 1))
        scaled = scaler.fit_transform(numeric_frame)
        latest_scaled = scaled[-1].flatten()
        last_regime = df_with_regime.iloc[-1]['regime']
        one_hot = np.zeros(len(self.regime_cols))
        try:
            one_hot[self.regime_cols.index(f"regime_{last_regime}")] = 1
        except ValueError:
            # Unknown regime (e.g. 'N/A') -> all-zero one-hot block.
            pass
        return np.concatenate([latest_scaled, one_hot])
class LiveDataStore:
    """Thread-safe cache of live prices merged with event features, refreshed by a daemon thread."""
    # The latest engineered frame is guarded by _data_lock; _stop_event both
    # stops the loop and serves as an interruptible sleep.
    def __init__(self, api_key, finbert_tokenizer, finbert_model, update_interval_seconds=120): self.api_key=api_key; self.finbert_tokenizer=finbert_tokenizer; self.finbert_model=finbert_model; self.update_interval_seconds=update_interval_seconds; self._data_lock=threading.Lock(); self._latest_features=pd.DataFrame(); self._next_event_info={"title": "N/A"}; self._stop_event=threading.Event(); self._update_thread=None
    def _fetch_and_update(self, output_size=1000):
        """Fetch prices + events, rebuild the feature set, then sleep one interval."""
        try:
            price_data = fetch_twelvedata_prices(self.api_key, output_size=output_size)
            # NOTE(review): this early return skips the wait() below, so an
            # empty price fetch makes _update_loop retry immediately — verify
            # that is intended (it can hammer the API on outages).
            if price_data.empty: return
            events_data = fetch_live_events_with_cache()
            features, next_event_info = create_feature_set_for_inference(price_data, events_data, self.finbert_tokenizer, self.finbert_model)
            with self._data_lock: self._latest_features=features; self._next_event_info=next_event_info
        except Exception as e: print(f"LiveDataStore ERROR: {e}")
        # Throttle: doubles as the loop's sleep, interruptible via stop().
        self._stop_event.wait(self.update_interval_seconds)
    def _update_loop(self):
        # Runs in the daemon thread until stop() sets the event.
        while not self._stop_event.is_set(): self._fetch_and_update()
    def start(self):
        """Start the refresh thread (idempotent) and perform one blocking initial fetch."""
        # NOTE(review): the initial _fetch_and_update() here also waits one
        # full interval before returning (see the wait() above) — confirm the
        # startup delay is acceptable.
        if self._update_thread is None or not self._update_thread.is_alive(): print("LiveDataStore: Starting background thread."); self._stop_event.clear(); self._update_thread = threading.Thread(target=self._update_loop, daemon=True); self._update_thread.start()
        print("LiveDataStore: Performing initial data fetch..."); self._fetch_and_update()
    def stop(self):
        """Signal the refresh thread to exit and wait for it to finish."""
        if self._update_thread and self._update_thread.is_alive(): print("LiveDataStore: Stopping background thread."); self._stop_event.set(); self._update_thread.join()
    def get_latest_data(self, num_bars=None):
        """Copy of the latest feature frame (optionally the last num_bars rows) plus next-event info."""
        with self._data_lock:
            if num_bars and not self._latest_features.empty: return self._latest_features.iloc[-num_bars:].copy(), self._next_event_info
            return self._latest_features.copy(), self._next_event_info
    def get_raw_price_data(self, num_bars=None):
        """OHLC slice of the cached features as a flat DataFrame with a tz-aware UTC 'Datetime' column."""
        with self._data_lock:
            if self._latest_features.empty: return pd.DataFrame()
            df_slice = self._latest_features[['open', 'high', 'low', 'close']].copy()
            if num_bars: df_slice = df_slice.iloc[-num_bars:]
            # Normalize the Datetime column to tz-aware UTC regardless of source tz.
            df_slice.reset_index(inplace=True); df_slice['Datetime'] = pd.to_datetime(df_slice['Datetime']).dt.tz_localize('UTC') if df_slice['Datetime'].dt.tz is None else df_slice['Datetime'].dt.tz_convert('UTC')
            return df_slice
class AutonomyBridgeV3:
    """Main autonomy loop coupling a fast 'act' cycle (produce/log trade signals)
    with a slower 'reflect' cycle (score past signals and update the learners).

    `components` is a dict wiring together the agent's subsystems; the keys read
    here are: 'memory', 'live_data', 'bandit', 'change_detector',
    'intrinsic_rewards', 'mind', 'pred_engine', 'ctx_preprocessor',
    'strat_manager' (see main_worker for construction).
    """
    def __init__(self, components, act_interval_sec=300, reflect_interval_sec=3600):
        # last_reflect_time=0 makes the first pass of run() trigger an immediate
        # reflection. NTFY_TOPIC_V5 may be unset; send_ntfy_notification is
        # presumably tolerant of a None topic — TODO confirm.
        self.comps=components; self.act_interval=act_interval_sec; self.reflect_interval=reflect_interval_sec; self.last_reflect_time=0; self.ntfy_topic=os.environ.get('NTFY_TOPIC_V5'); self._last_context_df=pd.DataFrame()
    def evaluate_and_introspect(self):
        """Score every still-open logged signal against recent price bars, write
        the outcome back to the signal DB, and feed the resulting reward into
        the bandit, the change detector, and the mind's introspection."""
        print("BRIDGE V3: Evaluating signals for causal introspection..."); db_ref = self.comps['memory'].get_signal_ref()
        if not db_ref: return
        now_utc = pd.Timestamp.now(tz='UTC')
        try:
            all_signals = self.comps['memory'].get_all_signals();
            if not all_signals: return
            # 288 bars of raw prices; at the 5-minute interval used elsewhere in
            # this file that is ~24h of history — TODO confirm bar interval.
            live_price_history = self.comps['live_data'].get_raw_price_data(num_bars=288)
            if live_price_history.empty: return
            for key, signal in all_signals.items():
                # A non-null 'reward' marks an already-evaluated signal.
                if signal.get('reward') is not None: continue
                if not (ts_str := signal.get('timestamp_entry')): continue
                signal_time = pd.to_datetime(ts_str)
                # Grace period: give the trade at least 5 minutes before judging it.
                if now_utc < (signal_time + pd.Timedelta(minutes=5)): continue
                entry,sl,tp,action = float(signal['entry_price']),float(signal['stop_loss']),float(signal['take_profit']),signal['action']
                relevant_bars = live_price_history[live_price_history['Datetime'] > signal_time]
                if relevant_bars.empty: continue
                # Default outcome if neither level is touched: exit at the last close.
                outcome_reason, exit_price = "Time Exit", relevant_bars.iloc[-1]['close']
                for _, bar in relevant_bars.iterrows():
                    # SL is tested before TP, so a single bar spanning both levels
                    # resolves conservatively as a stop-out.
                    if (action == 'BUY' and bar['low'] <= sl) or (action == 'SELL' and bar['high'] >= sl): exit_price, outcome_reason = sl, "SL"; break
                    if (action == 'BUY' and bar['high'] >= tp) or (action == 'SELL' and bar['low'] <= tp): exit_price, outcome_reason = tp, "TP"; break
                # Directional PnL normalized by 0.005 price units (50 pips on
                # EUR/USD — TODO confirm), clipped to [-1, 1].
                extrinsic_reward = np.clip(((exit_price - entry) if action == 'BUY' else (entry - exit_price)) / 0.005, -1.0, 1.0)
                intrinsic_reward = self.comps['intrinsic_rewards'].calculate_reward(extrinsic_reward, signal['strategy_chosen'], 0.0)
                total_reward = extrinsic_reward + intrinsic_reward
                self.comps['bandit'].update(signal['strategy_chosen'], np.array(signal['context_vector']), total_reward)
                # Page-Hinkley watches losses (negated reward); on detected drift,
                # widen the bandit's exploration.
                if self.comps['change_detector'].update(-extrinsic_reward): self.comps['bandit'].increase_exploration()
                pnl = (exit_price - entry) if action == 'BUY' else (entry - exit_price)
                update_payload = {'pnl':pnl,'reward':total_reward,'outcome_reason':outcome_reason,'exit_price':exit_price,'timestamp_exit':now_utc.isoformat()}
                db_ref.child(key).update(update_payload)
                # Older signals may predate the 'context_snapshot' field.
                context_df_for_introspection = pd.DataFrame([signal['context_snapshot']]) if 'context_snapshot' in signal else pd.DataFrame()
                self.comps['mind'].introspect_episode({**signal, **update_payload}, context_df_for_introspection)
        except Exception as e: print(f"BRIDGE V3 EVALUATOR ERROR: {e}"); traceback.print_exc()
    def act_cycle(self):
        """One 'body' pass: fetch features, pick a strategy via the contextual
        bandit, let the mind veto, log/notify BUY/SELL signals, and dump a
        status.json snapshot for the dashboard."""
        print(f"BODY: [{pd.Timestamp.now(tz='UTC')}] Waking up..."); c = self.comps
        features, next_event_info = c['live_data'].get_latest_data(num_bars=c['pred_engine'].sequence_length + 50)
        if features.empty or len(features) < c['pred_engine'].sequence_length: print("BODY: Not enough data."); return
        causal_engine = CausalReasoningNetwork(features.copy()); features_with_regime = causal_engine.identify_volatility_regimes()
        # Keep the regime-annotated frame for later introspection/review calls.
        ctx_vec = c['ctx_preprocessor'].build_context_vector(features_with_regime); self._last_context_df = features_with_regime.copy()
        input_sequence = features_with_regime.iloc[-c['pred_engine'].sequence_length:]
        chosen_strategy_name = c['bandit'].select(ctx_vec)
        trade_thesis, preds_str = c['strat_manager'].list_strategies()[chosen_strategy_name](input_sequence)
        # NOTE(review): causal_bonus is returned by review_trade but never used here.
        is_tradeable, mind_reasoning, causal_bonus = c['mind'].review_trade(trade_thesis, self._last_context_df)
        # The thesis uses 'NO_TRADE'; the displayed/logged value is "NO TRADE".
        final_action = trade_thesis['action'] if is_tradeable and trade_thesis['action'] != 'NO_TRADE' else "NO TRADE"
        if final_action in ["BUY", "SELL"]:
            ts = pd.Timestamp.now(tz='UTC').isoformat(); context_snapshot = self._last_context_df.iloc[-1].to_dict()
            c['memory'].log_signal(ts, chosen_strategy_name, final_action, trade_thesis['entry'], trade_thesis['stop_loss'], trade_thesis['take_profit'], ctx_vec, trade_thesis['reasoning'], context_snapshot)
            send_ntfy_notification(self.ntfy_topic, trade_thesis)
        final_reasoning = f"Bandit chose '{chosen_strategy_name}'. Thesis: '{trade_thesis['reasoning']}' -> Mind Verdict: {mind_reasoning}"
        status = {"last_checked": pd.Timestamp.now(tz='UTC').isoformat(), "market_price": f"{input_sequence.iloc[-1]['close']:.5f}", "market_regime": input_sequence.iloc[-1]['regime'], "signal": final_action, "reasoning": final_reasoning, "predictions": preds_str, "next_event": f"{next_event_info['title']} ({next_event_info.get('time_str', 'N/A')})"}
        # status.json is read by the Gradio dashboard (get_latest_status).
        with open('status.json', 'w') as f: json.dump(status, f)
        print(f"BODY: Analysis complete. Signal: {final_action}.")
    def reflect_cycle(self):
        """One 'mind' pass: evaluate open signals, then test causal hypotheses."""
        print(f"MIND: [{pd.Timestamp.now(tz='UTC')}] Waking up for reflection cycle..."); self.evaluate_and_introspect()
        self.comps['mind'].test_hypotheses(); print("MIND: Reflection cycle complete."); self.last_reflect_time = time.time()
    def run(self):
        """Blocking main loop: act every act_interval seconds, reflect when the
        reflect interval has elapsed; any exception backs off for 60s."""
        print("--- AUTONOMY BRIDGE V3 (RMRL): Starting main adaptive loop. ---")
        while True:
            try:
                self.act_cycle();
                if (time.time() - self.last_reflect_time) > self.reflect_interval: self.reflect_cycle()
                print(f"BRIDGE V3: Sleeping for {self.act_interval} seconds...")
                time.sleep(self.act_interval)
            except Exception as e: print(f"AUTONOMY BRIDGE V3 CRITICAL ERROR: {e}"); traceback.print_exc(); time.sleep(60)
| # --- MAIN WORKER LOOP --- | |
def main_worker():
    """Bootstrap the agent end-to-end.

    Downloads the calibrated model artifacts from the Hugging Face Hub, builds
    every subsystem (FinBERT, prediction engine, live data feed, strategies,
    memory, mind, bandit), wires them into an AutonomyBridgeV3, and enters its
    blocking run loop. Returns early only on a startup failure.
    """
    print("--- [Persistent Deep Mind Agent V23 (RMRL)] Worker Thread Started ---")
    td_key = os.environ.get('TWELVE_DATA_API_KEY')
    hub_token = os.environ.get('HF_TOKEN')
    reflect_secs = 3600  # 1 hour
    repo_id = "Badumetsibb/conscious-trading-agent-models"
    print("--- Downloading agent artifacts from Hugging Face Hub ---")
    artifact_names = ('calibrated_model.keras', 'calibrated_scaler.joblib', 'calibrated_features.json')
    try:
        for artifact in artifact_names:
            hf_hub_download(repo_id=repo_id, filename=artifact, token=hub_token, local_dir='.', local_dir_use_symlinks=False)
        print("--- All artifacts downloaded successfully. ---")
    except Exception as e:
        print(f"FATAL: Failed to download artifacts: {e}")
        return
    # Sentiment backbone shared by the live data store.
    print("Initializing FinBERT...")
    finbert_tokenizer = BertTokenizer.from_pretrained('ProsusAI/finbert')
    finbert_model = TFBertModel.from_pretrained('ProsusAI/finbert', from_pt=True)
    prediction_engine = PredictionCoreTransformer()
    context_preprocessor = ContextVectorPreprocessor()
    live_data_store = LiveDataStore(td_key, finbert_tokenizer, finbert_model, update_interval_seconds=120)
    live_data_store.start()
    # The downloaded artifacts must be present before the engine can run.
    if not prediction_engine.is_calibrated():
        print("CRITICAL: Calibrated artifacts not found. Agent cannot run.")
        live_data_store.stop()
        return
    prediction_engine.load_calibrated_artifacts()
    situation_room = RuleBasedSituationRoom({'sl_atr_multiplier': 2.0, 'tp_atr_multiplier': 4.0})
    strategy_manager = StrategyManager(situation_room, prediction_engine)
    memory = PersistentMemoryV5(reflect_interval=reflect_secs)
    mind = PersistentDeepMind(memory)
    # One bandit arm per registered strategy.
    bandit = LinUCBBandit(strategy_manager.list_strategies().keys(), d=CONTEXT_DIMENSION, alpha=1.5)
    change_detector = PageHinkley()
    intrinsic_reward_gen = IntrinsicRewardGeneratorV2(bandit)
    components = {
        "live_data": live_data_store,
        "pred_engine": prediction_engine,
        "ctx_preprocessor": context_preprocessor,
        "strat_manager": strategy_manager,
        "bandit": bandit,
        "mind": mind,
        "memory": memory,
        "change_detector": change_detector,
        "intrinsic_rewards": intrinsic_reward_gen,
    }
    bridge = AutonomyBridgeV3(components, reflect_interval_sec=reflect_secs)
    bridge.run()
| # --- GRADIO DASHBOARD AND STARTUP --- | |
def get_latest_status():
    """Read the worker's latest status.json snapshot for the dashboard.

    Returns a 7-tuple of display strings:
    (status line, market price, market regime, signal, reasoning,
    predictions, next event).

    Falls back to the calibrating placeholder when the file is absent,
    unreadable, or not yet valid JSON. The worker thread rewrites
    status.json concurrently with this read, so a torn/partial file must
    not crash the UI callback.
    """
    placeholder = ("Agent is calibrating...", "", "", "", "", "", "")
    if not os.path.exists('status.json'):
        return placeholder
    try:
        with open('status.json', 'r') as f:
            status = json.load(f)
    except (OSError, json.JSONDecodeError):
        # Mid-write or corrupted file: treat the same as "no status yet".
        return placeholder
    return (f"Status: {status.get('last_checked', 'N/A')}", status.get('market_price', 'N/A'), status.get('market_regime', 'N/A'), status.get('signal', 'N/A'), status.get('reasoning', 'N/A'), status.get('predictions', 'N/A'), status.get('next_event', 'N/A'))
# Dashboard layout. Widget creation order defines the on-page order, so the
# sequence below mirrors the intended layout exactly.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🧠 V23 Persistent Deep Mind (RMRL)")
    gr.Markdown("This agent has a **persistent memory** and discovers **deep causal links** by testing combinations of features. Its knowledge compounds, and it gracefully forgets outdated information over a 24-hour cycle.")
    # Banner reporting whether every secret the agent relies on is present.
    _required = ['TWELVE_DATA_API_KEY', 'NTFY_TOPIC_V5', 'HF_TOKEN', 'FIRESTORE_SA_KEY', 'FIREBASE_DB_URL']
    if all(os.environ.get(name) for name in _required):
        secret_status = "✅ All required secrets appear to be set."
    else:
        secret_status = "❌ One or more secrets are MISSING (check NTFY_TOPIC_V5)."
    gr.Markdown(f"**Secrets Status:** {secret_status}")
    refresh_btn = gr.Button("Refresh Status", variant="primary")
    status_output = gr.Textbox(label="Status", interactive=False)
    gr.Markdown("## Agent's Last Analysis (Body)")
    with gr.Row():
        price_output = gr.Textbox(label="Last Market Price")
        regime_output = gr.Textbox(label="Last Market Regime")
    with gr.Row():
        predictions_output = gr.Textbox(label="DL Model Predictions (5m|15m|1h)")
        event_output = gr.Textbox(label="Next High-Impact Event")
    action_output = gr.Textbox(label="Final Signal / Action")
    reasoning_output = gr.Textbox(label="Full Reasoning (incl. Causal Mind's Verdict)", lines=4)
    # Output order must match get_latest_status()'s return tuple order.
    refresh_btn.click(
        fn=get_latest_status,
        inputs=[],
        outputs=[status_output, price_output, regime_output, action_output, reasoning_output, predictions_output, event_output],
    )
if __name__ == "__main__":
    # The worker refuses to start unless these four secrets are present;
    # the UI still reports missing secrets either way via the dashboard banner.
    core_secrets = ['TWELVE_DATA_API_KEY', 'HF_TOKEN', 'FIRESTORE_SA_KEY', 'FIREBASE_DB_URL']
    if all(os.environ.get(name) for name in core_secrets):
        # Agent runs in a background daemon thread; Gradio owns the main thread.
        worker_thread = threading.Thread(target=main_worker, daemon=True)
        worker_thread.start()
        demo.launch()
    else:
        print("FATAL: Core secrets are missing. Worker cannot start.")