# app_v23_deep_mind.py - An RMRL agent with a persistent, deep-reasoning mind.

# --- Core Libraries ---
import pandas as pd
import numpy as np
import warnings
import joblib
import json
import os
import gradio as gr
import requests
import time
from datetime import datetime, timezone, timedelta
import pytz
import threading
import math
from huggingface_hub import hf_hub_download
import firebase_admin
from firebase_admin import credentials, db
from collections import defaultdict
import traceback
import itertools

# --- Environment and Dependencies ---
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
warnings.filterwarnings("ignore", category=UserWarning, module='sklearn')

# --- ML & DL Libraries ---
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.utils import Sequence
from transformers import BertTokenizer, TFBertModel

# --- Live Data Fetching ---
from twelvedata import TDClient

EVENT_JSON_URL = "https://nfs.faireconomy.media/ff_calendar_thisweek.json"
CACHE_DURATION_SECONDS = 600
_EVENT_CACHE = {"data": None, "timestamp": 0}


# --- UTILITY FUNCTIONS ---
def fetch_live_events_with_cache():
    """Fetch this week's economic-calendar JSON, serving a 10-minute in-process cache.

    On a network failure, returns the last cached payload (or [] if none yet).
    """
    current_time = time.time()
    if _EVENT_CACHE["data"] and (current_time - _EVENT_CACHE["timestamp"] < CACHE_DURATION_SECONDS):
        return _EVENT_CACHE["data"]
    try:
        response = requests.get(EVENT_JSON_URL, headers={"User-Agent": "V23-Agent/1.0"}, timeout=10)
        response.raise_for_status()
        data = response.json()
        _EVENT_CACHE["data"], _EVENT_CACHE["timestamp"] = data, current_time
        return data
    except requests.RequestException as e:
        print(f"Error fetching event data: {e}")
        return _EVENT_CACHE.get("data", [])


def fetch_twelvedata_prices(api_key, symbol='EUR/USD', interval='5min', output_size=200):
    """Pull an OHLC time series from Twelve Data as an ascending DataFrame.

    Returns a DataFrame with a 'Datetime' column (UTC timestamps from the API),
    or an empty DataFrame on any error (best-effort by design).
    """
    try:
        td = TDClient(apikey=api_key)
        ts = td.time_series(symbol=symbol, interval=interval, outputsize=output_size, timezone="UTC")
        df = ts.as_pandas().sort_index(ascending=True)
        df.index.name = 'Datetime'
        df.reset_index(inplace=True)
        return df
    except Exception as e:
        print(f"Error fetching price data: {e}")
        return pd.DataFrame()


def send_ntfy_notification(topic, trade_thesis):
    """Push a trade-signal notification to ntfy.sh; no-op when topic is unset."""
    if not topic:
        return
    title = f"V23 Signal: {trade_thesis.get('action')} EUR/USD"
    message = (f"Strategy: {trade_thesis.get('strategy')} ({trade_thesis.get('confidence')})\n"
               f"Reason: {trade_thesis.get('reasoning')}\n"
               f"Entry: {trade_thesis.get('entry')} | SL: {trade_thesis.get('stop_loss')} | TP: {trade_thesis.get('take_profit')}")
    try:
        requests.post(f"https://ntfy.sh/{topic}", data=message.encode('utf-8'), headers={"Title": title})
        print("ntfy notification sent.")
    except requests.exceptions.RequestException as e:
        print(f"Failed to send ntfy notification: {e}")


def sanitize_for_json(obj):
    """Recursively converts numpy types to native Python types for JSON serialization."""
    if isinstance(obj, dict):
        return {k: sanitize_for_json(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [sanitize_for_json(elem) for elem in obj]
    elif isinstance(obj, np.integer):
        return int(obj)
    elif isinstance(obj, np.floating):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    return obj


def create_feature_set_for_inference(price_df, events_json, finbert_tokenizer, finbert_model):
    """Build the model feature frame from raw prices + calendar events.

    Computes RSI(14), EMA(20) and ATR(14) on the price series, embeds event
    headlines with FinBERT, as-of merges events onto bars (60-min tolerance),
    and derives event-proximity and session features.

    Returns:
        (merged_data, next_event_info): the feature DataFrame (UTC-indexed)
        and a dict describing the next upcoming high-impact event.
    """
    price_features = price_df.copy()
    price_features['Datetime'] = pd.to_datetime(price_features['Datetime'])
    price_features.set_index('Datetime', inplace=True)
    if price_features.index.tz is None:
        price_features = price_features.tz_localize('UTC', ambiguous='infer')
    else:
        price_features = price_features.tz_convert('UTC')

    # --- Technical indicators (computed under temporary capitalized names) ---
    price_features.rename(columns={'close': 'Price', 'open': 'Open', 'high': 'High', 'low': 'Low'}, inplace=True)
    delta = price_features['Price'].diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    price_features['RSI'] = 100 - (100 / (1 + (gain / loss)))
    price_features['EMA_20'] = price_features['Price'].ewm(span=20, adjust=False).mean()
    high_low = price_features['High'] - price_features['Low']
    high_close = np.abs(price_features['High'] - price_features['Price'].shift())
    low_close = np.abs(price_features['Low'] - price_features['Price'].shift())
    tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
    price_features['ATR'] = tr.rolling(window=14).mean()
    price_features.rename(columns={'Price': 'close', 'Open': 'open', 'High': 'high', 'Low': 'low'}, inplace=True)

    # --- Event processing ---
    events = pd.DataFrame(events_json)
    processed_events = pd.DataFrame()
    next_event_info = {"title": "None within 24h", "time_str": "N/A"}
    if not events.empty and 'date' in events.columns:
        RELEVANT_CURRENCIES = ['USD', 'EUR', 'GBP', 'JPY']
        events = events[events['country'].isin(RELEVANT_CURRENCIES)].copy()
        events['datetime'] = pd.to_datetime(events['date'], utc=True, errors='coerce')
        events.dropna(subset=['datetime'], inplace=True)
        events.set_index('datetime', inplace=True)
        events.sort_index(inplace=True)

        def parse_financial_number(s):
            """Parse strings like '1.2M', '0.5%' into floats; NaN on failure."""
            if not isinstance(s, str) or not s:
                return np.nan
            s = s.strip().upper()
            multipliers = {'B': 1e9, 'M': 1e6, 'K': 1e3, '%': 0.01}
            val_str = s
            if s.endswith(tuple(multipliers.keys())):
                val_str = s[:-1]
                multiplier = multipliers[s[-1]]
            else:
                multiplier = 1.0
            try:
                return float(val_str) * multiplier
            except (ValueError, TypeError):
                return np.nan

        if 'actual' in events.columns and 'forecast' in events.columns:
            events['surprise'] = (events['actual'].apply(parse_financial_number)
                                  - events['forecast'].apply(parse_financial_number)).fillna(0)
        else:
            events['surprise'] = 0
        events['detail'] = events['title'].fillna('') + ' ' + events['country'].fillna('')
        if not events.empty:
            inputs = finbert_tokenizer(events['detail'].tolist(), return_tensors='tf',
                                       padding=True, truncation=True, max_length=64)
            # [CLS] token embedding as a sentence-level representation.
            embeddings = finbert_model(inputs).last_hidden_state[:, 0, :].numpy()
            processed_events = pd.concat(
                [events, pd.DataFrame(embeddings,
                                      columns=[f'finbert_{i}' for i in range(embeddings.shape[1])],
                                      index=events.index)], axis=1)

    merged_data = pd.merge_asof(left=price_features.sort_index(), right=processed_events.sort_index(),
                                left_index=True, right_index=True, direction='backward',
                                tolerance=pd.Timedelta(minutes=60))

    # --- Event-proximity features (high-impact events only) ---
    all_high_impact_events = events[(events['impact'] == 'High')] if 'impact' in events.columns and not events.empty else pd.DataFrame()
    if not all_high_impact_events.empty:
        upcoming_events = all_high_impact_events[all_high_impact_events.index > merged_data.index[-1]]
        if not upcoming_events.empty:
            next_event = upcoming_events.iloc[0]
            time_to_next = (next_event.name - merged_data.index[-1]).total_seconds() / 3600.0
            merged_data['time_to_event'] = time_to_next
            next_event_info = {"title": f"{next_event.country} {next_event.title}",
                               "time_str": f"in {time_to_next:.2f}h"}
        else:
            merged_data['time_to_event'] = 9999
        # Hours since the most recent past high-impact event for each bar.
        df_index_sec = merged_data.index.astype(np.int64).to_numpy() // 10**9
        event_times_sec = all_high_impact_events.index.astype(np.int64).to_numpy() // 10**9
        time_diffs = df_index_sec.reshape(-1, 1) - event_times_sec[None, :]
        merged_data['time_since_event'] = np.min(np.where(time_diffs >= 0, time_diffs, np.inf), axis=1) / 3600
    else:
        merged_data['time_since_event'] = 9999
        merged_data['time_to_event'] = 9999

    # --- Calendar/session features ---
    merged_data.replace([np.inf, -np.inf], 9999, inplace=True)
    merged_data['hour_of_day'] = merged_data.index.hour
    merged_data['day_of_week'] = merged_data.index.dayofweek
    merged_data['session_london'] = ((merged_data['hour_of_day'] >= 7) & (merged_data['hour_of_day'] <= 16)).astype(int)
    merged_data['session_ny'] = ((merged_data['hour_of_day'] >= 12) & (merged_data['hour_of_day'] <= 21)).astype(int)
    merged_data['session_asian'] = ((merged_data['hour_of_day'] >= 22) | (merged_data['hour_of_day'] <= 7)).astype(int)

    finbert_cols = [col for col in merged_data.columns if 'finbert_' in col]
    # FIX: only ffill columns that actually exist — when the event feed failed,
    # 'surprise'/'impact'/'title' are absent and the unconditional selection raised KeyError.
    cols_to_ffill = [c for c in (['surprise', 'impact', 'title'] + finbert_cols) if c in merged_data.columns]
    if cols_to_ffill:
        merged_data[cols_to_ffill] = merged_data[cols_to_ffill].ffill()
    merged_data.fillna(0, inplace=True)
    merged_data.dropna(subset=['open', 'close', 'RSI'], inplace=True)
    return merged_data, next_event_info


class CausalReasoningNetwork:
    """Labels each bar with a coarse volatility regime derived from ATR/EMA."""

    def __init__(self, processed_data):
        self.data = processed_data.copy()

    def identify_volatility_regimes(self, volatility_indicator='ATR', trend_indicator='EMA_20'):
        """Classify bars as TRENDING / BREAKOUT / RANGING / CHOPPY (or N/A).

        High ATR splits into TRENDING (steep EMA slope) vs BREAKOUT; low ATR is
        RANGING; the middle band is CHOPPY. Returns the annotated DataFrame.
        """
        if len(self.data) < 20:
            self.data['regime'] = 'N/A'
            return self.data
        atr = self.data[volatility_indicator]
        low_vol_threshold, high_vol_threshold = atr.quantile(0.33), atr.quantile(0.66)
        ema_slope = self.data[trend_indicator].diff(periods=3)
        # FIX: hoist the loop-invariant quantile out of the per-bar loop.
        slope_threshold = ema_slope.quantile(0.75)
        regimes = []
        for i in range(len(self.data)):
            atr_val = atr.iloc[i]
            slope_val = ema_slope.iloc[i] if pd.notna(ema_slope.iloc[i]) else 0
            if pd.isna(atr_val) or pd.isna(low_vol_threshold) or pd.isna(high_vol_threshold):
                regimes.append('N/A')
                continue
            if atr_val > high_vol_threshold:
                regime = 'TRENDING' if abs(slope_val) > slope_threshold else 'BREAKOUT'
            elif atr_val < low_vol_threshold:
                regime = 'RANGING'
            else:
                regime = 'CHOPPY'
            regimes.append(regime)
        self.data['regime'] = regimes
        return self.data


class RuleBasedSituationRoom:
    """Turns horizon predictions (or an EMA fallback) into a concrete trade thesis."""

    def __init__(self, params):
        self.params = params

    def generate_thesis(self, predictions, sequence_df):
        """Produce an action dict with entry/SL/TP sized from ATR.

        Args:
            predictions: dict with '5m'/'15m'/'1h' price forecasts, or empty
                to fall back to a simple EMA-crossover rule.
            sequence_df: feature DataFrame; only the last row is used.
        """
        latest_data = sequence_df.iloc[-1]
        current_price = latest_data['close']
        if not predictions:
            dir_ema = "BUY" if current_price > latest_data['EMA_20'] else "SELL"
            action, confidence, strategy, reasoning = dir_ema, "LOW", "ema_crossover", f"Simple EMA Crossover ({dir_ema})."
        else:
            dir_5m = "BUY" if predictions['5m'] > current_price else "SELL"
            dir_15m = "BUY" if predictions['15m'] > current_price else "SELL"
            dir_1h = "BUY" if predictions['1h'] > current_price else "SELL"
            # FIX: reasoning and strategy were swapped in the original default
            # assignment (strategy got the diverge message and vice versa).
            action, confidence, reasoning, strategy = "NO_TRADE", "LOW", "Prediction horizons diverge.", "predictive_rule_based"
            if dir_5m == dir_15m == dir_1h:
                action, confidence, reasoning, strategy = dir_5m, "HIGH", f"Strong confluence ({dir_5m}).", "predictive_rule_based"
            elif dir_5m == dir_15m:
                action, confidence, reasoning, strategy = dir_5m, "MEDIUM", f"Short/Medium-term confluence ({dir_5m}).", "predictive_rule_based"
            elif dir_15m == dir_1h:
                action, confidence, reasoning, strategy = dir_15m, "LOW", f"Medium/Long-term confluence ({dir_15m}).", "predictive_rule_based"

        if action == "NO_TRADE":
            return {"action": "NO_TRADE", "confidence": "LOW", "strategy": strategy, "reasoning": reasoning}

        # ATR-based risk sizing; floor ATR to avoid zero-width stops.
        atr = latest_data['ATR'] if pd.notna(latest_data['ATR']) and latest_data['ATR'] > 0 else 0.0001
        sl_mult, tp_mult = self.params.get('sl_atr_multiplier', 2.0), self.params.get('tp_atr_multiplier', 4.0)
        if confidence == "MEDIUM":
            tp_mult *= 0.75
        if confidence == "LOW":
            tp_mult *= 0.5
        if action == "BUY":
            entry, stop_loss, take_profit = current_price, current_price - (sl_mult * atr), current_price + (tp_mult * atr)
        else:
            entry, stop_loss, take_profit = current_price, current_price + (sl_mult * atr), current_price - (tp_mult * atr)
        return {"action": action, "entry": f"{entry:.5f}", "stop_loss": f"{stop_loss:.5f}",
                "take_profit": f"{take_profit:.5f}", "confidence": confidence,
                "reasoning": reasoning, "strategy": strategy}


### V23 EVOLUTION: The PersistentDeepMind ###
class PersistentDeepMind:
    """Persistent causal rulebook: vets trades against learned rules and mines new ones."""

    def __init__(self, memory):
        self.memory = memory
        self.causal_rulebook = self.load_verified_rules()
        print(f"✅ PersistentDeepMind initialized. Loaded {len(self.causal_rulebook)} verified causal rules.")

    def load_verified_rules(self):
        """Return the persisted rulebook dict, or {} when none exists."""
        verified_rules = self.memory.get_verified_rules()
        return verified_rules if verified_rules else {}

    def review_trade(self, trade_thesis, current_context_df):
        """Approve or veto a thesis against the rulebook.

        Returns (is_tradeable, reasoning, causal_alignment_bonus).
        """
        action, strategy = trade_thesis.get('action'), trade_thesis.get('strategy')
        current_context = self._discretize_context(current_context_df.iloc[-1], current_context_df)
        current_context['strategy'] = strategy
        if action == 'NO_TRADE':
            return True, "Thesis is NO_TRADE.", 0.0
        if not current_context.get('regime') or current_context['regime'] == 'N/A':
            return False, f"❌ VETOED: 'Regime Blindness'.", 0.0
        causal_alignment_bonus = 0.0
        for rule_id, rule in self.causal_rulebook.items():
            if self._context_matches_rule(current_context, rule):
                if rule['outcome'] == 'success':
                    causal_alignment_bonus = rule.get('confidence', 0.1)
                    print(f"MIND: Aligning with positive rule: {rule['text']}")
                # FIX: the original truthiness test `rule.get('confidence', 0.8)`
                # vetoed on ANY nonzero-confidence failure rule; compare against
                # the intended high-confidence threshold instead.
                elif rule['outcome'] == 'failure' and rule.get('confidence', 0.0) >= 0.8:
                    return False, f"❌ VETOED: Matches high-confidence failure rule: {rule['text']}", 0.0
        return True, f"✅ APPROVED: No high-confidence negative rules matched.", causal_alignment_bonus

    def _context_matches_rule(self, current_context, rule):
        """True when the discretized context satisfies every 'eq' condition of the rule."""
        ### V23 FIX: Correctly check the top-level strategy ###
        if current_context.get('strategy') != rule.get('strategy'):
            return False
        for feature, condition in rule['conditions'].items():
            if current_context.get(feature) != condition.get('eq'):
                return False
        return True

    def _discretize_context(self, context_series, historical_df):
        """Map a raw feature row to categorical context features (rsi_state, atr_quantile, ...)."""
        context = context_series.to_dict()
        context['rsi_state'] = ("overbought" if context.get('RSI', 50) > 70
                                else "oversold" if context.get('RSI', 50) < 30
                                else "neutral")
        if not historical_df.empty and 'ATR' in historical_df.columns:
            low_thresh, high_thresh = historical_df['ATR'].quantile(0.33), historical_df['ATR'].quantile(0.66)
            current_atr = context.get('ATR', 0)
            if current_atr < low_thresh:
                context['atr_quantile'] = 'low'
            elif current_atr > high_thresh:
                context['atr_quantile'] = 'high'
            else:
                context['atr_quantile'] = 'medium'
        else:
            context['atr_quantile'] = 'unknown'
        context['is_trading_hours'] = 1 if context.get('session_ny', 0) == 1 or context.get('session_london', 0) == 1 else 0
        context['has_upcoming_event'] = 1 if context.get('time_to_event', 9999) < 1 else 0
        return context

    def introspect_episode(self, evaluated_signal, last_context_df):
        """Log one evaluated trade (pnl + discretized context) as a hypothesis."""
        if last_context_df.empty:
            return
        context_snapshot = self._discretize_context(last_context_df.iloc[-1], last_context_df)
        pnl, strategy = evaluated_signal.get('pnl'), evaluated_signal.get('strategy_chosen')
        if pnl is None or not strategy:
            return
        hypothesis_data = {"pnl": pnl, "strategy": strategy, **context_snapshot}
        self.memory.log_hypothesis(hypothesis_data)

    def test_hypotheses(self):
        """Mine single- and pair-feature causal rules from logged episodes,
        reinforce/decay existing rules, and persist the consolidated rulebook."""
        print("MIND: Starting Deep Knowledge Consolidation...")
        all_episodes = self.memory.get_all_hypotheses()
        if not all_episodes or len(all_episodes) < 30:  # Higher data requirement for deep analysis
            print("MIND: Not enough historical data for robust deep analysis.")
            return
        df = pd.DataFrame(all_episodes)
        if 'strategy' not in df.columns or 'regime' not in df.columns:
            return
        df.dropna(subset=['strategy', 'regime', 'pnl'], inplace=True)
        INTERESTING_FEATURES = ['rsi_state', 'atr_quantile', 'is_trading_hours', 'has_upcoming_event']
        current_rules = self.load_verified_rules()
        newly_discovered_patterns = {}

        # --- V23 EVOLUTION: Deep Causal Discovery (testing combinations) ---
        for r in range(1, 3):  # Test single features (r=1) and pairs of features (r=2)
            for combo in itertools.combinations(INTERESTING_FEATURES, r):
                grouping_keys = ['strategy', 'regime'] + list(combo)
                if not all(key in df.columns for key in grouping_keys):
                    continue
                base_groups = df.groupby(grouping_keys)
                for keys, group_df in base_groups:
                    if len(group_df) < 7:  # Need sufficient evidence for multi-feature rules
                        continue
                    mean_pnl = group_df['pnl'].mean()
                    if abs(mean_pnl) > 0.00015:  # Stricter threshold for significance
                        rule_id = "|".join(map(str, keys))  # Unique ID for this combination
                        newly_discovered_patterns[rule_id] = {
                            'mean_pnl': mean_pnl, 'evidence_count': len(group_df),
                            'keys': keys, 'combo': combo
                        }

        now_iso = datetime.now(timezone.utc).isoformat()
        for rule_id, new_evidence in newly_discovered_patterns.items():
            impact = new_evidence['mean_pnl']
            ### V23 FIX: Principled Confidence Scaling [0, 1] ###
            REASONABLE_MAX_IMPACT = 0.001
            confidence = min(abs(impact) / REASONABLE_MAX_IMPACT, 1.0)
            if rule_id in current_rules:
                # Reinforce existing rule.
                rule = current_rules[rule_id]
                # FIX: 'total_impact' is stored as a SUM (impact * count) at
                # discovery time; the original reinforcement incremented the
                # count first and then divided the sum by the new count,
                # corrupting the statistic. Accumulate the sum, then the count.
                rule['total_impact'] += impact * new_evidence['evidence_count']
                rule['evidence_count'] += new_evidence['evidence_count']
                rule['confidence'] = (rule['confidence'] * 0.8) + (confidence * 0.2)  # Smoothed update
                rule['last_validated'] = now_iso
                print(f"MIND: REINFORCED rule: {rule['text']} (New Confidence: {rule['confidence']:.2f})")
            else:
                # Discover new rule.
                outcome = "success" if impact > 0 else "failure"
                strategy, regime = new_evidence['keys'][0], new_evidence['keys'][1]
                conditions = {'regime': {'eq': regime}}
                text_conditions = []
                for i, feature_name in enumerate(new_evidence['combo']):
                    feature_value = new_evidence['keys'][2 + i]
                    conditions[feature_name] = {'eq': feature_value}
                    text_conditions.append(f"'{feature_name}' is '{feature_value}'")
                rule_text = f"'{strategy}' in '{regime}' when {' AND '.join(text_conditions)} -> {outcome}"
                rule = {'rule_id': rule_id, 'text': rule_text, 'strategy': strategy, 'outcome': outcome,
                        'confidence': confidence, 'evidence_count': new_evidence['evidence_count'],
                        'total_impact': impact * new_evidence['evidence_count'],
                        'conditions': conditions, 'last_validated': now_iso}
                current_rules[rule_id] = rule
                print(f"MIND: DISCOVERED DEEP rule: {rule['text']}")

        ### V23 FIX: Patient Forgetting (25-hour cycle) ###
        rules_to_prune = []
        CONFIDENCE_DECAY_PERIOD_SECONDS = 90000  # ~25 hours
        for rule_id, rule in current_rules.items():
            last_seen = datetime.fromisoformat(rule['last_validated'])
            if (datetime.now(timezone.utc) - last_seen).total_seconds() > CONFIDENCE_DECAY_PERIOD_SECONDS:
                rule['confidence'] *= 0.5  # Harsher decay for truly old rules
                print(f"MIND: Decaying confidence for old rule: {rule['text']} (New Confidence: {rule['confidence']:.2f})")
                if rule['confidence'] < 0.05:
                    rules_to_prune.append(rule_id)
        for rule_id in rules_to_prune:
            print(f"MIND: PRUNED obsolete rule: {current_rules[rule_id]['text']}")
            del current_rules[rule_id]

        self.memory.save_verified_rules(current_rules)
        self.causal_rulebook = self.load_verified_rules()
        print(f"MIND: Deep Mind consolidated knowledge base ({len(current_rules)} rules).")


class PredictionCoreTransformer:
    """Loads the pre-calibrated Keras model + scaler and serves multi-horizon predictions."""

    def __init__(self, sequence_length=48):
        self.scaler = None
        self.model = None
        self.sequence_length = sequence_length
        self.feature_names = None
        self.calibrated_model_path = 'calibrated_model.keras'
        self.calibrated_scaler_path = 'calibrated_scaler.joblib'
        self.calibrated_features_path = 'calibrated_features.json'

    def is_calibrated(self):
        """True when all three calibrated artifact files are present on disk."""
        return all(os.path.exists(p) for p in [self.calibrated_model_path,
                                               self.calibrated_scaler_path,
                                               self.calibrated_features_path])

    def load_calibrated_artifacts(self):
        """Load model, scaler and feature-name list from disk. Returns True."""
        print("--- Loading pre-calibrated artifacts ---")
        self.model = load_model(self.calibrated_model_path)
        self.scaler = joblib.load(self.calibrated_scaler_path)
        with open(self.calibrated_features_path, 'r') as f:
            self.feature_names = json.load(f)
        print("--- Instant startup successful ---")
        return True

    def predict_single(self, input_sequence):
        """Predict 5m/15m/1h prices for one sequence.

        Raises RuntimeError when called before load_calibrated_artifacts().
        Returns (preds_dict, formatted_string).
        """
        if not self.scaler:
            raise RuntimeError("Agent not calibrated.")
        features_for_model = input_sequence[self.feature_names]
        scaled_features = self.scaler.transform(features_for_model)
        reshaped = scaled_features.reshape(1, self.sequence_length, len(self.feature_names))
        predictions = self.model.predict(reshaped, verbose=0)
        # Model has three output heads: 5m, 15m and 1h horizons.
        preds = {"5m": predictions[0][0][0], "15m": predictions[1][0][0], "1h": predictions[2][0][0]}
        preds_str = f"5m: {preds['5m']:.5f} | 15m: {preds['15m']:.5f} | 1h: {preds['1h']:.5f}"
        return preds, preds_str


class LinUCBBandit:
    """Contextual LinUCB bandit over strategy names with per-arm ridge statistics."""

    def __init__(self, strategies, d, alpha=1.0, regularization=1.0):
        self.strategies = list(strategies)
        self.d = d
        self.alpha = alpha  # exploration weight
        self.reg = regularization
        self.A = {s: (self.reg * np.eye(self.d)) for s in self.strategies}
        self.b = {s: np.zeros(self.d) for s in self.strategies}
        self._last_decision_info = {}

    def select(self, context_vector):
        """Pick the strategy maximizing mean reward + UCB exploration bonus."""
        scores, uncertainties, mean_rewards = {}, {}, {}
        for s in self.strategies:
            A_inv = np.linalg.inv(self.A[s])
            theta = A_inv.dot(self.b[s])
            mean_reward = theta.dot(context_vector)
            uncertainty = self.alpha * math.sqrt(context_vector.dot(A_inv).dot(context_vector))
            scores[s] = mean_reward + uncertainty
            uncertainties[s] = uncertainty
            mean_rewards[s] = mean_reward
        chosen = max(scores, key=scores.get)
        self._last_decision_info = {'scores': scores, 'uncertainties': uncertainties,
                                    'mean_rewards': mean_rewards, 'chosen': chosen}
        return chosen

    def get_last_decision_info(self):
        """Return diagnostics of the most recent select() call."""
        return self._last_decision_info

    def update(self, strategy, context_vector, reward):
        """Standard LinUCB update: A += x xᵀ, b += r x."""
        x = context_vector.reshape(-1)
        self.A[strategy] += np.outer(x, x)
        self.b[strategy] += reward * x

    def increase_exploration(self, factor=1.2):
        """Widen the exploration bonus (called on detected concept drift)."""
        self.alpha *= factor
        print(f"CONCEPT DRIFT: Increased alpha to {self.alpha:.2f}")


class IntrinsicRewardGeneratorV2:
    """Blends uncertainty, surprise and causal-alignment into a bounded intrinsic reward."""

    def __init__(self, bandit):
        self.bandit = bandit
        print("✅ IntrinsicRewardGeneratorV2 initialized.")

    def calculate_reward(self, extrinsic_reward, chosen_strategy, causal_alignment_bonus):
        """Return clipped intrinsic reward in [0, 0.5]; 0.0 before any bandit decision."""
        last_decision = self.bandit.get_last_decision_info()
        if not last_decision:
            return 0.0
        uncertainty_reward = last_decision['uncertainties'].get(chosen_strategy, 0.0)
        predicted_reward = last_decision['mean_rewards'].get(chosen_strategy, 0.0)
        surprise_reward = abs(extrinsic_reward - predicted_reward)
        intrinsic_reward = (0.2 * uncertainty_reward) + (0.4 * surprise_reward) + (0.4 * causal_alignment_bonus)
        return np.clip(intrinsic_reward, 0, 0.5)


class PersistentMemoryV5:
    """Firebase-backed persistence for signals, hypotheses and verified rules.

    All methods degrade to no-ops when Firebase initialization fails.
    """

    def __init__(self, reflect_interval):
        self.db_refs = {}
        self.reflect_interval = reflect_interval
        try:
            sa_key_json, db_url = os.environ.get('FIRESTORE_SA_KEY'), os.environ.get('FIREBASE_DB_URL')
            if not all([sa_key_json, db_url]):
                raise ValueError("Firebase secrets not set.")
            cred = credentials.Certificate(json.loads(sa_key_json))
            if not firebase_admin._apps:  # initialize at most once per process
                firebase_admin.initialize_app(cred, {'databaseURL': db_url})
            self.db_refs['signals'] = db.reference('signals_v5')
            self.db_refs['hypotheses'] = db.reference('knowledge_base_v5/hypotheses')
            self.db_refs['rules'] = db.reference('knowledge_base_v5/verified_rules')
            print("✅ PersistentMemoryV5 initialized at /signals_v5.")
        except Exception as e:
            print(f"CRITICAL ERROR - PersistentMemoryV5 failed: {e}")
            self.db_refs = {}

    def log_signal(self, ts, strategy, action, entry, sl, tp, context_vector, reasoning, context_snapshot):
        """Push a new signal record; returns its Firebase key or None on failure."""
        if 'signals' not in self.db_refs:
            return None
        try:
            signal_data = {"timestamp_entry": ts, "strategy_chosen": strategy, "action": action,
                           "entry_price": float(entry), "stop_loss": float(sl), "take_profit": float(tp),
                           "reasoning_initial": reasoning,
                           "context_vector": [float(x) for x in context_vector.tolist()],
                           "context_snapshot": context_snapshot}
            new_signal_ref = self.db_refs['signals'].push(signal_data)
            event_id = new_signal_ref.key
            new_signal_ref.update({"event_id": event_id})
            print(f"MEMORY V5: Logged '{action}' signal to signals_v5. ID: {event_id}")
            return event_id
        except Exception as e:
            print(f"MEMORY V5 ERROR: {e}")
            return None

    def log_hypothesis(self, hypothesis_data):
        if 'hypotheses' in self.db_refs:
            self.db_refs['hypotheses'].push(hypothesis_data)

    def get_all_hypotheses(self):
        """Return all logged hypotheses as a list (empty when unavailable)."""
        data = self.db_refs.get('hypotheses').get() if 'hypotheses' in self.db_refs else None
        return list(data.values()) if data else []

    def get_all_signals(self):
        return self.db_refs.get('signals').get() if 'signals' in self.db_refs else None

    def get_verified_rules(self):
        return self.db_refs.get('rules').get() if 'rules' in self.db_refs else None

    def save_verified_rules(self, rulebook):
        if 'rules' in self.db_refs:
            # --- FINAL FIX: Sanitize the rulebook before saving ---
            sanitized_rulebook = sanitize_for_json(rulebook)
            self.db_refs['rules'].set(sanitized_rulebook)
            print("MEMORY V5: Saved persistent rules to knowledge_base_v5.")

    def get_signal_ref(self):
        return self.db_refs.get('signals')


class PageHinkley:
    """Page-Hinkley change detector over a streamed loss signal."""

    def __init__(self, delta=0.005, lambda_=50, alpha=1 - 1e-3):
        self.mean, self.delta, self.lambda_, self.alpha, self.cumulative = 0.0, delta, lambda_, alpha, 0.0

    def update(self, x):
        """Feed one observation; True when accumulated drift exceeds lambda_."""
        self.mean = self.mean * self.alpha + x * (1 - self.alpha)
        self.cumulative = max(0, self.cumulative + x - self.mean - self.delta)
        return self.cumulative > self.lambda_


class StrategyManager:
    """Registry mapping strategy names to callables returning (thesis, preds_str)."""

    def __init__(self, situation_room, prediction_engine):
        self.situation_room = situation_room
        self.prediction_engine = prediction_engine

    def list_strategies(self):
        return {"predictive_rule_based": self.predictive_strategy,
                "ema_crossover": self.ema_crossover_strategy}

    def predictive_strategy(self, seq):
        preds_dict, preds_str = self.prediction_engine.predict_single(seq)
        return self.situation_room.generate_thesis(preds_dict, seq), preds_str

    def ema_crossover_strategy(self, seq):
        return self.situation_room.generate_thesis({}, seq), "N/A (EMA Strategy)"


# Numeric features fed into the bandit context vector (plus one-hot regime).
CONTEXT_FEATURES = ['close', 'ATR', 'EMA_20', 'RSI', 'time_since_event', 'time_to_event', 'hour_of_day']
REGIME_COLS = ['regime_TRENDING', 'regime_BREAKOUT', 'regime_CHOPPY', 'regime_RANGING']
CONTEXT_DIMENSION = len(CONTEXT_FEATURES) + len(REGIME_COLS)


class ContextVectorPreprocessor:
    """Builds the bandit context vector: scaled numeric features + one-hot regime."""

    def __init__(self):
        self.feature_names = CONTEXT_FEATURES
        self.regime_cols = REGIME_COLS

    def build_context_vector(self, df_with_regime):
        """Return a 1-D vector of length CONTEXT_DIMENSION for the latest bar.

        Raises ValueError when the DataFrame lacks a 'regime' column.
        NOTE(review): the MinMaxScaler is refit on each window, so scaling is
        relative to the current window — confirm this is intended.
        """
        if 'regime' not in df_with_regime.columns:
            raise ValueError("DF must have 'regime' column.")
        context_df = df_with_regime[self.feature_names]
        scaler = MinMaxScaler(feature_range=(-1, 1))
        scaled_context_data = scaler.fit_transform(context_df)
        scaled_numeric_vec = scaled_context_data[-1].flatten()
        last_regime = df_with_regime.iloc[-1]['regime']
        regime_vec = np.zeros(len(self.regime_cols))
        try:
            regime_idx = self.regime_cols.index(f"regime_{last_regime}")
            regime_vec[regime_idx] = 1
        except ValueError:
            # Unknown regime (e.g. 'N/A') -> all-zeros one-hot.
            pass
        return np.concatenate([scaled_numeric_vec, regime_vec])


class LiveDataStore:
    """Thread-backed cache of engineered features built from live prices and events."""

    def __init__(self, api_key, finbert_tokenizer, finbert_model, update_interval_seconds=120):
        self.api_key = api_key
        self.finbert_tokenizer = finbert_tokenizer
        self.finbert_model = finbert_model
        self.update_interval_seconds = update_interval_seconds
        self._data_lock = threading.Lock()
        self._latest_features = pd.DataFrame()
        self._next_event_info = {"title": "N/A"}
        self._stop_event = threading.Event()
        self._update_thread = None

    def _fetch_and_update(self, output_size=1000):
        """Fetch prices + events, rebuild the feature frame, and swap it in under the lock."""
        try:
            price_data = fetch_twelvedata_prices(self.api_key, output_size=output_size)
            if price_data.empty:
                return
            events_data = fetch_live_events_with_cache()
            features, next_event_info = create_feature_set_for_inference(
                price_data, events_data, self.finbert_tokenizer, self.finbert_model)
            with self._data_lock:
                self._latest_features = features
                self._next_event_info = next_event_info
        except Exception as e:
            print(f"LiveDataStore ERROR: {e}")

    def _update_loop(self):
        while not self._stop_event.is_set():
            self._fetch_and_update()
            # FIX: the inter-cycle wait was inside _fetch_and_update, which made
            # the synchronous initial fetch in start() block the caller for the
            # whole update interval. Waiting here preserves the thread's cadence
            # while letting start() return promptly.
            self._stop_event.wait(self.update_interval_seconds)

    def start(self):
        """Start the background refresh thread and perform one synchronous fetch."""
        if self._update_thread is None or not self._update_thread.is_alive():
            print("LiveDataStore: Starting background thread.")
            self._stop_event.clear()
            self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
            self._update_thread.start()
        print("LiveDataStore: Performing initial data fetch...")
        self._fetch_and_update()

    def stop(self):
        """Signal the refresh thread to stop and join it."""
        if self._update_thread and self._update_thread.is_alive():
            print("LiveDataStore: Stopping background thread.")
            self._stop_event.set()
            self._update_thread.join()

    def get_latest_data(self, num_bars=None):
        """Return (features_copy, next_event_info); optionally only the last num_bars rows."""
        with self._data_lock:
            if num_bars and not self._latest_features.empty:
                return self._latest_features.iloc[-num_bars:].copy(), self._next_event_info
            return self._latest_features.copy(), self._next_event_info

    def get_raw_price_data(self, num_bars=None):
        """Return OHLC bars with a UTC-aware 'Datetime' column (empty DF when no data yet)."""
        with self._data_lock:
            if self._latest_features.empty:
                return pd.DataFrame()
            df_slice = self._latest_features[['open', 'high', 'low', 'close']].copy()
        if num_bars:
            df_slice = df_slice.iloc[-num_bars:]
        df_slice.reset_index(inplace=True)
        df_slice['Datetime'] = (pd.to_datetime(df_slice['Datetime']).dt.tz_localize('UTC')
                                if df_slice['Datetime'].dt.tz is None
                                else df_slice['Datetime'].dt.tz_convert('UTC'))
        return df_slice


class AutonomyBridgeV3:
    """Main agent loop: act every few minutes, reflect (evaluate + consolidate) hourly."""

    def __init__(self, components, act_interval_sec=300, reflect_interval_sec=3600):
        self.comps = components
        self.act_interval = act_interval_sec
        self.reflect_interval = reflect_interval_sec
        self.last_reflect_time = 0
        self.ntfy_topic = os.environ.get('NTFY_TOPIC_V5')
        self._last_context_df = pd.DataFrame()

    def evaluate_and_introspect(self):
        """Settle open signals against price history, update the bandit and log hypotheses."""
        print("BRIDGE V3: Evaluating signals for causal introspection...")
        db_ref = self.comps['memory'].get_signal_ref()
        if not db_ref:
            return
        now_utc = pd.Timestamp.now(tz='UTC')
        try:
            all_signals = self.comps['memory'].get_all_signals()
            if not all_signals:
                return
            live_price_history = self.comps['live_data'].get_raw_price_data(num_bars=288)
            if live_price_history.empty:
                return
            for key, signal in all_signals.items():
                if signal.get('reward') is not None:  # already evaluated
                    continue
                if not (ts_str := signal.get('timestamp_entry')):
                    continue
                signal_time = pd.to_datetime(ts_str)
                if now_utc < (signal_time + pd.Timedelta(minutes=5)):  # let at least one bar close
                    continue
                entry, sl, tp, action = (float(signal['entry_price']), float(signal['stop_loss']),
                                         float(signal['take_profit']), signal['action'])
                relevant_bars = live_price_history[live_price_history['Datetime'] > signal_time]
                if relevant_bars.empty:
                    continue
                # Walk bars in order; SL is checked before TP within each bar.
                outcome_reason, exit_price = "Time Exit", relevant_bars.iloc[-1]['close']
                for _, bar in relevant_bars.iterrows():
                    if (action == 'BUY' and bar['low'] <= sl) or (action == 'SELL' and bar['high'] >= sl):
                        exit_price, outcome_reason = sl, "SL"
                        break
                    if (action == 'BUY' and bar['high'] >= tp) or (action == 'SELL' and bar['low'] <= tp):
                        exit_price, outcome_reason = tp, "TP"
                        break
                # Normalize P&L by 50 pips and clip into [-1, 1] for the bandit.
                extrinsic_reward = np.clip(((exit_price - entry) if action == 'BUY'
                                            else (entry - exit_price)) / 0.005, -1.0, 1.0)
                intrinsic_reward = self.comps['intrinsic_rewards'].calculate_reward(
                    extrinsic_reward, signal['strategy_chosen'], 0.0)
                total_reward = extrinsic_reward + intrinsic_reward
                self.comps['bandit'].update(signal['strategy_chosen'],
                                            np.array(signal['context_vector']), total_reward)
                if self.comps['change_detector'].update(-extrinsic_reward):
                    self.comps['bandit'].increase_exploration()
                pnl = (exit_price - entry) if action == 'BUY' else (entry - exit_price)
                update_payload = {'pnl': pnl, 'reward': total_reward, 'outcome_reason': outcome_reason,
                                  'exit_price': exit_price, 'timestamp_exit': now_utc.isoformat()}
                db_ref.child(key).update(update_payload)
                context_df_for_introspection = (pd.DataFrame([signal['context_snapshot']])
                                                if 'context_snapshot' in signal else pd.DataFrame())
                self.comps['mind'].introspect_episode({**signal, **update_payload},
                                                      context_df_for_introspection)
        except Exception as e:
            print(f"BRIDGE V3 EVALUATOR ERROR: {e}")
            traceback.print_exc()

    def act_cycle(self):
        """One decision cycle: build context, pick a strategy, vet the thesis, log/notify."""
        print(f"BODY: [{pd.Timestamp.now(tz='UTC')}] Waking up...")
        c = self.comps
        features, next_event_info = c['live_data'].get_latest_data(
            num_bars=c['pred_engine'].sequence_length + 50)
        if features.empty or len(features) < c['pred_engine'].sequence_length:
            print("BODY: Not enough data.")
            return
        causal_engine = CausalReasoningNetwork(features.copy())
        features_with_regime = causal_engine.identify_volatility_regimes()
        ctx_vec = c['ctx_preprocessor'].build_context_vector(features_with_regime)
        self._last_context_df = features_with_regime.copy()
        input_sequence = features_with_regime.iloc[-c['pred_engine'].sequence_length:]
        chosen_strategy_name = c['bandit'].select(ctx_vec)
        trade_thesis, preds_str = c['strat_manager'].list_strategies()[chosen_strategy_name](input_sequence)
        is_tradeable, mind_reasoning, causal_bonus = c['mind'].review_trade(trade_thesis, self._last_context_df)
        final_action = trade_thesis['action'] if is_tradeable and trade_thesis['action'] != 'NO_TRADE' else "NO TRADE"
        if final_action in ["BUY", "SELL"]:
            ts = pd.Timestamp.now(tz='UTC').isoformat()
            # FIX: sanitize the snapshot before persisting — .to_dict() yields
            # numpy scalars, which the Firebase JSON encoder cannot serialize
            # (consistent with save_verified_rules' sanitize step).
            context_snapshot = sanitize_for_json(self._last_context_df.iloc[-1].to_dict())
            c['memory'].log_signal(ts, chosen_strategy_name, final_action, trade_thesis['entry'],
                                   trade_thesis['stop_loss'], trade_thesis['take_profit'], ctx_vec,
                                   trade_thesis['reasoning'], context_snapshot)
            send_ntfy_notification(self.ntfy_topic, trade_thesis)
        final_reasoning = (f"Bandit chose '{chosen_strategy_name}'. "
                           f"Thesis: '{trade_thesis['reasoning']}' -> Mind Verdict: {mind_reasoning}")
        status = {"last_checked": pd.Timestamp.now(tz='UTC').isoformat(),
                  "market_price": f"{input_sequence.iloc[-1]['close']:.5f}",
                  "market_regime": input_sequence.iloc[-1]['regime'],
                  "signal": final_action,
                  "reasoning": final_reasoning,
                  "predictions": preds_str,
                  "next_event": f"{next_event_info['title']} ({next_event_info.get('time_str', 'N/A')})"}
        with open('status.json', 'w') as f:
            json.dump(status, f)
        print(f"BODY: Analysis complete. Signal: {final_action}.")

    def reflect_cycle(self):
        """Hourly: settle trades, consolidate learned rules, and reset the timer."""
        print(f"MIND: [{pd.Timestamp.now(tz='UTC')}] Waking up for reflection cycle...")
        self.evaluate_and_introspect()
        self.comps['mind'].test_hypotheses()
        print("MIND: Reflection cycle complete.")
        self.last_reflect_time = time.time()

    def run(self):
        """Blocking main loop; recovers from any exception with a 60 s back-off."""
        print("--- AUTONOMY BRIDGE V3 (RMRL): Starting main adaptive loop. ---")
        while True:
            try:
                self.act_cycle()
                if (time.time() - self.last_reflect_time) > self.reflect_interval:
                    self.reflect_cycle()
                print(f"BRIDGE V3: Sleeping for {self.act_interval} seconds...")
                time.sleep(self.act_interval)
            except Exception as e:
                print(f"AUTONOMY BRIDGE V3 CRITICAL ERROR: {e}")
                traceback.print_exc()
                time.sleep(60)


# --- MAIN WORKER LOOP ---
def main_worker():
    """Bootstrap: download artifacts, build all components, and run the bridge forever."""
    print("--- [Persistent Deep Mind Agent V23 (RMRL)] Worker Thread Started ---")
    api_key, hf_token = os.environ.get('TWELVE_DATA_API_KEY'), os.environ.get('HF_TOKEN')
    REFLECT_INTERVAL_SECONDS = 3600  # 1 hour
    HF_REPO_ID = "Badumetsibb/conscious-trading-agent-models"
    print("--- Downloading agent artifacts from Hugging Face Hub ---")
    try:
        for filename in ['calibrated_model.keras', 'calibrated_scaler.joblib', 'calibrated_features.json']:
            hf_hub_download(repo_id=HF_REPO_ID, filename=filename, token=hf_token,
                            local_dir='.', local_dir_use_symlinks=False)
        print("--- All artifacts downloaded successfully. ---")
    except Exception as e:
        print(f"FATAL: Failed to download artifacts: {e}")
        return
    print("Initializing FinBERT...")
    finbert_tokenizer = BertTokenizer.from_pretrained('ProsusAI/finbert')
    finbert_model = TFBertModel.from_pretrained('ProsusAI/finbert', from_pt=True)
    prediction_engine = PredictionCoreTransformer()
    context_preprocessor = ContextVectorPreprocessor()
    live_data_store = LiveDataStore(api_key, finbert_tokenizer, finbert_model, update_interval_seconds=120)
    live_data_store.start()
    if not prediction_engine.is_calibrated():
        print("CRITICAL: Calibrated artifacts not found. Agent cannot run.")
        live_data_store.stop()
        return
    prediction_engine.load_calibrated_artifacts()
    situation_room = RuleBasedSituationRoom({'sl_atr_multiplier': 2.0, 'tp_atr_multiplier': 4.0})
    strategy_manager = StrategyManager(situation_room, prediction_engine)
    memory = PersistentMemoryV5(reflect_interval=REFLECT_INTERVAL_SECONDS)
    mind = PersistentDeepMind(memory)
    bandit = LinUCBBandit(strategy_manager.list_strategies().keys(), d=CONTEXT_DIMENSION, alpha=1.5)
    change_detector = PageHinkley()
    intrinsic_reward_gen = IntrinsicRewardGeneratorV2(bandit)
    components = {"live_data": live_data_store, "pred_engine": prediction_engine,
                  "ctx_preprocessor": context_preprocessor, "strat_manager": strategy_manager,
                  "bandit": bandit, "mind": mind, "memory": memory,
                  "change_detector": change_detector, "intrinsic_rewards": intrinsic_reward_gen}
    bridge = AutonomyBridgeV3(components, reflect_interval_sec=REFLECT_INTERVAL_SECONDS)
    bridge.run()


# --- GRADIO DASHBOARD AND STARTUP ---
def get_latest_status():
    """Read status.json and return the 7-tuple the dashboard widgets expect."""
    if not os.path.exists('status.json'):
        return "Agent is calibrating...", "", "", "", "", "", ""
    with open('status.json', 'r') as f:
        status = json.load(f)
    return (f"Status: {status.get('last_checked', 'N/A')}",
            status.get('market_price', 'N/A'),
            status.get('market_regime', 'N/A'),
            status.get('signal', 'N/A'),
            status.get('reasoning', 'N/A'),
            status.get('predictions', 'N/A'),
            status.get('next_event', 'N/A'))


with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🧠 V23 Persistent Deep Mind (RMRL)")
    gr.Markdown("This agent has a **persistent memory** and discovers **deep causal links** by testing combinations of features. Its knowledge compounds, and it gracefully forgets outdated information over a 24-hour cycle.")
    all_secrets = all([os.environ.get(k) for k in ['TWELVE_DATA_API_KEY', 'NTFY_TOPIC_V5', 'HF_TOKEN', 'FIRESTORE_SA_KEY', 'FIREBASE_DB_URL']])
    secret_status = "✅ All required secrets appear to be set."
if all_secrets else "❌ One or more secrets are MISSING (check NTFY_TOPIC_V5)." gr.Markdown(f"**Secrets Status:** {secret_status}") refresh_btn = gr.Button("Refresh Status", variant="primary") status_output = gr.Textbox(label="Status", interactive=False) gr.Markdown("## Agent's Last Analysis (Body)") with gr.Row(): price_output = gr.Textbox(label="Last Market Price"); regime_output = gr.Textbox(label="Last Market Regime") with gr.Row(): predictions_output = gr.Textbox(label="DL Model Predictions (5m|15m|1h)"); event_output = gr.Textbox(label="Next High-Impact Event") action_output = gr.Textbox(label="Final Signal / Action") reasoning_output = gr.Textbox(label="Full Reasoning (incl. Causal Mind's Verdict)", lines=4) refresh_btn.click(fn=get_latest_status, inputs=[], outputs=[status_output, price_output, regime_output, action_output, reasoning_output, predictions_output, event_output]) if __name__ == "__main__": if not all([os.environ.get(k) for k in ['TWELVE_DATA_API_KEY', 'HF_TOKEN', 'FIRESTORE_SA_KEY', 'FIREBASE_DB_URL']]): print("FATAL: Core secrets are missing. Worker cannot start.") else: worker_thread = threading.Thread(target=main_worker, daemon=True); worker_thread.start() demo.launch()