# Aurora / app.py — V23 "Persistent Deep Mind" trading agent (Hugging Face Space).
# NOTE: the original paste carried Hugging Face web-page residue here
# ("Badumetsibb's picture", "Update app.py", commit 3862789 verified),
# which is not valid Python; it has been converted to this comment.
# app_v23_deep_mind.py - An RMRL agent with a persistent, deep-reasoning mind.
# --- Core Libraries ---
import pandas as pd
import numpy as np
import warnings
import joblib
import json
import os
import gradio as gr
import requests
import time
from datetime import datetime, timezone, timedelta
import pytz
import threading
import math
from huggingface_hub import hf_hub_download
import firebase_admin
from firebase_admin import credentials, db
from collections import defaultdict
import traceback
import itertools
# --- Environment and Dependencies ---
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
warnings.filterwarnings("ignore", category=UserWarning, module='sklearn')
# --- ML & DL Libraries ---
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.utils import Sequence
from transformers import BertTokenizer, TFBertModel
# --- Live Data Fetching ---
from twelvedata import TDClient
# URL of the ForexFactory weekly economic-calendar JSON feed.
EVENT_JSON_URL = "https://nfs.faireconomy.media/ff_calendar_thisweek.json"
# How long (seconds) a fetched calendar payload stays fresh.
CACHE_DURATION_SECONDS = 600
# Module-level cache: last payload plus the epoch time it was fetched.
_EVENT_CACHE = {"data": None, "timestamp": 0}
# --- UTILITY FUNCTIONS ---
def fetch_live_events_with_cache():
    """Fetch the weekly economic-calendar JSON, cached for CACHE_DURATION_SECONDS.

    Returns:
        The cached payload while fresh; on network failure, the last cached
        payload, or an empty list when nothing was ever fetched.
    """
    current_time = time.time()
    # Serve from cache while it is still fresh.
    if _EVENT_CACHE["data"] and (current_time - _EVENT_CACHE["timestamp"] < CACHE_DURATION_SECONDS):
        return _EVENT_CACHE["data"]
    try:
        response = requests.get(EVENT_JSON_URL, headers={"User-Agent": "V23-Agent/1.0"}, timeout=10)
        response.raise_for_status()
        data = response.json()
        _EVENT_CACHE["data"], _EVENT_CACHE["timestamp"] = data, current_time
        return data
    except requests.RequestException as e:
        print(f"Error fetching event data: {e}")
        # BUGFIX: the "data" key always exists (initialized to None), so the
        # old `_EVENT_CACHE.get("data", [])` could return None to callers
        # expecting a list. Fall back to [] explicitly.
        return _EVENT_CACHE["data"] or []
def fetch_twelvedata_prices(api_key, symbol='EUR/USD', interval='5min', output_size=200):
    """Pull recent OHLC bars from Twelve Data as a DataFrame with a 'Datetime' column.

    Best-effort: returns an empty DataFrame on any fetch or parsing error.
    """
    try:
        client = TDClient(apikey=api_key)
        series = client.time_series(symbol=symbol, interval=interval, outputsize=output_size, timezone="UTC")
        frame = series.as_pandas().sort_index(ascending=True)
        frame.index.name = 'Datetime'
        frame.reset_index(inplace=True)
        return frame
    except Exception as e:
        print(f"Error fetching price data: {e}")
        return pd.DataFrame()
def send_ntfy_notification(topic, trade_thesis):
    """Push a trade-signal notification to ntfy.sh; no-op when no topic is configured."""
    if not topic:
        return
    title = f"V23 Signal: {trade_thesis.get('action')} EUR/USD"
    message = (
        f"Strategy: {trade_thesis.get('strategy')} ({trade_thesis.get('confidence')})\n"
        f"Reason: {trade_thesis.get('reasoning')}\n"
        f"Entry: {trade_thesis.get('entry')} | SL: {trade_thesis.get('stop_loss')} | TP: {trade_thesis.get('take_profit')}"
    )
    try:
        requests.post(f"https://ntfy.sh/{topic}", data=message.encode('utf-8'), headers={"Title": title})
        print("ntfy notification sent.")
    except requests.exceptions.RequestException as e:
        print(f"Failed to send ntfy notification: {e}")
def sanitize_for_json(obj):
    """Recursively converts numpy types to native Python types for JSON serialization.

    Handles dicts, lists/tuples, numpy scalars (bool/int/float) and arrays;
    any other object is returned unchanged.
    """
    if isinstance(obj, dict):
        return {k: sanitize_for_json(v) for k, v in obj.items()}
    elif isinstance(obj, (list, tuple)):
        # Tuples normalize to lists, matching JSON's single array type.
        return [sanitize_for_json(elem) for elem in obj]
    elif isinstance(obj, np.bool_):
        # np.bool_ is not JSON-serializable either; convert explicitly.
        return bool(obj)
    elif isinstance(obj, np.integer):
        return int(obj)
    elif isinstance(obj, np.floating):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    return obj
def create_feature_set_for_inference(price_df, events_json, finbert_tokenizer, finbert_model):
    """Merge OHLC bars with economic-calendar events into one feature frame.

    Adds technical indicators (RSI-14, EMA-20, ATR-14), FinBERT CLS embeddings
    of event headlines, event timing features and trading-session flags.

    Args:
        price_df: DataFrame with a 'Datetime' column plus open/high/low/close.
        events_json: list of calendar-event dicts (feed schema: 'date',
            'country', 'title', 'impact', 'actual', 'forecast' — per the
            ForexFactory JSON; verify against the live feed).
        finbert_tokenizer: FinBERT tokenizer (transformers).
        finbert_model: FinBERT TF model used for embeddings.

    Returns:
        (merged_data, next_event_info): feature DataFrame indexed by UTC time,
        and a dict describing the next high-impact event (title / time string,
        with placeholders when none applies).
    """
    # Normalize the price frame onto a UTC DatetimeIndex.
    price_features = price_df.copy(); price_features['Datetime'] = pd.to_datetime(price_features['Datetime']); price_features.set_index('Datetime', inplace=True)
    if price_features.index.tz is None: price_features = price_features.tz_localize('UTC', ambiguous='infer')
    else: price_features = price_features.tz_convert('UTC')
    # Technical indicators on the close: RSI-14 (simple rolling means of
    # gains/losses), EMA-20, and ATR-14 where true range = max of high-low,
    # |high - prev close|, |low - prev close|. Columns are temporarily renamed
    # to 'Price'/'Open'/... during the computation and renamed back afterwards.
    price_features.rename(columns={'close': 'Price', 'open':'Open', 'high':'High', 'low':'Low'}, inplace=True); delta = price_features['Price'].diff(); gain = (delta.where(delta > 0, 0)).rolling(window=14).mean(); loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean(); price_features['RSI'] = 100 - (100 / (1 + (gain / loss))); price_features['EMA_20'] = price_features['Price'].ewm(span=20, adjust=False).mean(); high_low = price_features['High'] - price_features['Low']; high_close = np.abs(price_features['High'] - price_features['Price'].shift()); low_close = np.abs(price_features['Low'] - price_features['Price'].shift()); tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1); price_features['ATR'] = tr.rolling(window=14).mean(); price_features.rename(columns={'Price':'close', 'Open':'open', 'High':'high', 'Low':'low'}, inplace=True)
    events = pd.DataFrame(events_json); processed_events = pd.DataFrame(); next_event_info = {"title": "None within 24h", "time_str": "N/A"}
    if not events.empty and 'date' in events.columns:
        # Only events for currencies relevant to EUR/USD trading are kept.
        RELEVANT_CURRENCIES = ['USD', 'EUR', 'GBP', 'JPY']; events = events[events['country'].isin(RELEVANT_CURRENCIES)].copy()
        events['datetime'] = pd.to_datetime(events['date'], utc=True, errors='coerce'); events.dropna(subset=['datetime'], inplace=True); events.set_index('datetime', inplace=True); events.sort_index(inplace=True)
        def parse_financial_number(s):
            # Parse feed values like "1.2B", "350K" or "0.5%" into floats;
            # NaN for anything unparseable.
            if not isinstance(s, str) or not s: return np.nan
            s = s.strip().upper(); multipliers = {'B': 1e9, 'M': 1e6, 'K': 1e3, '%': 0.01}; val_str = s
            if s.endswith(tuple(multipliers.keys())): val_str = s[:-1]; multiplier = multipliers[s[-1]]
            else: multiplier = 1.0
            try: return float(val_str) * multiplier
            except (ValueError, TypeError): return np.nan
        # 'surprise' = actual minus forecast (0 when either side is missing).
        if 'actual' in events.columns and 'forecast' in events.columns: events['surprise'] = (events['actual'].apply(parse_financial_number) - events['forecast'].apply(parse_financial_number)).fillna(0)
        else: events['surprise'] = 0
        events['detail'] = events['title'].fillna('') + ' ' + events['country'].fillna('')
        if not events.empty:
            # CLS-token FinBERT embedding of "<title> <country>" per event.
            inputs = finbert_tokenizer(events['detail'].tolist(), return_tensors='tf', padding=True, truncation=True, max_length=64); embeddings = finbert_model(inputs).last_hidden_state[:, 0, :].numpy()
            processed_events = pd.concat([events, pd.DataFrame(embeddings, columns=[f'finbert_{i}' for i in range(embeddings.shape[1])], index=events.index)], axis=1)
    # Attach each bar to the most recent event within the past 60 minutes.
    merged_data = pd.merge_asof(left=price_features.sort_index(), right=processed_events.sort_index(), left_index=True, right_index=True, direction='backward', tolerance=pd.Timedelta(minutes=60))
    all_high_impact_events = events[(events['impact'] == 'High')] if 'impact' in events.columns and not events.empty else pd.DataFrame()
    if not all_high_impact_events.empty:
        # Hours until the next high-impact event after the latest bar;
        # 9999 acts as the "none upcoming" sentinel throughout this codebase.
        upcoming_events = all_high_impact_events[all_high_impact_events.index > merged_data.index[-1]]
        if not upcoming_events.empty:
            next_event = upcoming_events.iloc[0]; time_to_next = (next_event.name - merged_data.index[-1]).total_seconds() / 3600.0; merged_data['time_to_event'] = time_to_next; next_event_info = {"title": f"{next_event.country} {next_event.title}", "time_str": f"in {time_to_next:.2f}h"}
        else: merged_data['time_to_event'] = 9999
        # Per-bar hours since the most recent PAST high-impact event (vectorized
        # over a bars-by-events difference matrix; future events map to +inf).
        df_index_sec = merged_data.index.astype(np.int64).to_numpy() // 10**9; event_times_sec = all_high_impact_events.index.astype(np.int64).to_numpy() // 10**9
        time_diffs = df_index_sec.reshape(-1, 1) - event_times_sec[None, :]
        merged_data['time_since_event'] = np.min(np.where(time_diffs >= 0, time_diffs, np.inf), axis=1) / 3600
    else: merged_data['time_since_event'] = 9999; merged_data['time_to_event'] = 9999
    # Calendar/session features. Hours are UTC; window boundaries approximate
    # London 07-16, New York 12-21 and Asia 22-07 — confirm intended overlap.
    merged_data.replace([np.inf, -np.inf], 9999, inplace=True); merged_data['hour_of_day'] = merged_data.index.hour; merged_data['day_of_week'] = merged_data.index.dayofweek; merged_data['session_london'] = ((merged_data['hour_of_day'] >= 7) & (merged_data['hour_of_day'] <= 16)).astype(int); merged_data['session_ny'] = ((merged_data['hour_of_day'] >= 12) & (merged_data['hour_of_day'] <= 21)).astype(int); merged_data['session_asian'] = ((merged_data['hour_of_day'] >= 22) | (merged_data['hour_of_day'] <= 7)).astype(int)
    # Carry event-derived columns forward between events, then zero-fill what
    # remains (note: the fillna(0) makes the final dropna largely a no-op).
    finbert_cols = [col for col in merged_data.columns if 'finbert_' in col]; cols_to_ffill = ['surprise', 'impact', 'title'] + finbert_cols; merged_data[cols_to_ffill] = merged_data[cols_to_ffill].ffill(); merged_data.fillna(0, inplace=True); merged_data.dropna(subset=['open', 'close', 'RSI'], inplace=True)
    return merged_data, next_event_info
class CausalReasoningNetwork:
    """Tags each bar of a feature frame with a coarse volatility/trend regime."""

    def __init__(self, processed_data):
        # Work on a copy so the caller's frame is never mutated.
        self.data = processed_data.copy()

    def identify_volatility_regimes(self, volatility_indicator='ATR', trend_indicator='EMA_20'):
        """Label every row TRENDING / BREAKOUT / RANGING / CHOPPY / N/A.

        Thresholds are the 33rd/66th ATR quantiles of the window itself; a bar
        in the top tercile is TRENDING when the 3-bar EMA slope is steep
        (above its own 75th percentile) and BREAKOUT otherwise.

        Returns the internal frame with a new 'regime' column.
        """
        if len(self.data) < 20:
            # Too little history for meaningful quantiles.
            self.data['regime'] = 'N/A'
            return self.data
        atr = self.data[volatility_indicator]
        low_vol_threshold, high_vol_threshold = atr.quantile(0.33), atr.quantile(0.66)
        ema_slope = self.data[trend_indicator].diff(periods=3)
        # PERF FIX: the slope's 75th percentile is loop-invariant — compute it
        # once instead of on every iteration (was accidentally O(n^2 log n)).
        steep_slope_threshold = ema_slope.quantile(0.75)
        regimes = []
        for i in range(len(self.data)):
            atr_val = atr.iloc[i]
            slope_val = ema_slope.iloc[i] if pd.notna(ema_slope.iloc[i]) else 0
            if pd.isna(atr_val) or pd.isna(low_vol_threshold) or pd.isna(high_vol_threshold):
                regimes.append('N/A')
                continue
            if atr_val > high_vol_threshold:
                regime = 'TRENDING' if abs(slope_val) > steep_slope_threshold else 'BREAKOUT'
            elif atr_val < low_vol_threshold:
                regime = 'RANGING'
            else:
                regime = 'CHOPPY'
            regimes.append(regime)
        self.data['regime'] = regimes
        return self.data
class RuleBasedSituationRoom:
    """Turns multi-horizon price predictions into a concrete trade thesis."""

    def __init__(self, params):
        # params may supply 'sl_atr_multiplier' / 'tp_atr_multiplier'.
        self.params = params

    def generate_thesis(self, predictions, sequence_df):
        """Produce an action / SL / TP thesis.

        Args:
            predictions: dict with '5m'/'15m'/'1h' predicted prices, or empty
                to fall back to a simple EMA-crossover rule.
            sequence_df: feature frame whose last row supplies close/EMA_20/ATR.

        Returns:
            A thesis dict; prices are formatted as 5-decimal strings.
        """
        latest = sequence_df.iloc[-1]
        current_price = latest['close']
        if not predictions:
            # Fallback: trade in the direction of price vs. EMA-20.
            side = "BUY" if current_price > latest['EMA_20'] else "SELL"
            action, confidence = side, "LOW"
            strategy, reasoning = "ema_crossover", f"Simple EMA Crossover ({side})."
        else:
            directions = {h: ("BUY" if predictions[h] > current_price else "SELL") for h in ('5m', '15m', '1h')}
            d5, d15, d1h = directions['5m'], directions['15m'], directions['1h']
            strategy = "predictive_rule_based"
            if d5 == d15 == d1h:
                action, confidence, reasoning = d5, "HIGH", f"Strong confluence ({d5})."
            elif d5 == d15:
                action, confidence, reasoning = d5, "MEDIUM", f"Short/Medium-term confluence ({d5})."
            elif d15 == d1h:
                action, confidence, reasoning = d15, "LOW", f"Medium/Long-term confluence ({d15})."
            else:
                # BUGFIX: original swapped reasoning/strategy here, so the
                # NO_TRADE thesis reported strategy="Prediction horizons
                # diverge." and reasoning="predictive_rule_based".
                action, confidence, reasoning = "NO_TRADE", "LOW", "Prediction horizons diverge."
        if action == "NO_TRADE":
            return {"action": "NO_TRADE", "confidence": "LOW", "strategy": strategy, "reasoning": reasoning}
        # ATR-scaled stops; a tiny floor guards against missing/zero ATR.
        atr = latest['ATR'] if pd.notna(latest['ATR']) and latest['ATR'] > 0 else 0.0001
        sl_mult = self.params.get('sl_atr_multiplier', 2.0)
        tp_mult = self.params.get('tp_atr_multiplier', 4.0)
        # Lower-confidence theses get tighter take-profits.
        if confidence == "MEDIUM":
            tp_mult *= 0.75
        if confidence == "LOW":
            tp_mult *= 0.5
        entry = current_price
        if action == "BUY":
            stop_loss, take_profit = entry - (sl_mult * atr), entry + (tp_mult * atr)
        else:
            stop_loss, take_profit = entry + (sl_mult * atr), entry - (tp_mult * atr)
        return {"action": action, "entry": f"{entry:.5f}", "stop_loss": f"{stop_loss:.5f}", "take_profit": f"{take_profit:.5f}", "confidence": confidence, "reasoning": reasoning, "strategy": strategy}
### V23 EVOLUTION: The PersistentDeepMind ###
class PersistentDeepMind:
    """Causal-rule memory: vets trades against learned rules and mines new ones.

    Rules are dicts with keys: 'strategy', 'outcome' ('success'/'failure'),
    'confidence' in [0, 1], 'conditions' ({feature: {'eq': value}}),
    'evidence_count', 'total_impact' (running sum of impact * count),
    'text', 'last_validated' (ISO timestamp).
    """

    # A matching failure rule only vetoes a trade at/above this confidence.
    FAILURE_VETO_CONFIDENCE = 0.8

    def __init__(self, memory):
        self.memory = memory
        self.causal_rulebook = self.load_verified_rules()
        print(f"✅ PersistentDeepMind initialized. Loaded {len(self.causal_rulebook)} verified causal rules.")

    def load_verified_rules(self):
        """Return the persisted rulebook, or an empty dict when none exists."""
        verified_rules = self.memory.get_verified_rules()
        return verified_rules if verified_rules else {}

    def review_trade(self, trade_thesis, current_context_df):
        """Approve or veto a trade thesis against the rulebook.

        Returns:
            (is_tradeable, reasoning, causal_alignment_bonus).
        """
        action, strategy = trade_thesis.get('action'), trade_thesis.get('strategy')
        current_context = self._discretize_context(current_context_df.iloc[-1], current_context_df)
        current_context['strategy'] = strategy
        if action == 'NO_TRADE': return True, "Thesis is NO_TRADE.", 0.0
        # Refuse to trade when the regime is unknown ("Regime Blindness").
        if not current_context.get('regime') or current_context['regime'] == 'N/A': return False, f"❌ VETOED: 'Regime Blindness'.", 0.0
        causal_alignment_bonus = 0.0
        for rule_id, rule in self.causal_rulebook.items():
            if self._context_matches_rule(current_context, rule):
                if rule['outcome'] == 'success':
                    causal_alignment_bonus = rule.get('confidence', 0.1)
                    print(f"MIND: Aligning with positive rule: {rule['text']}")
                # BUGFIX: the old condition `and rule.get('confidence', 0.8)`
                # vetoed on ANY nonzero confidence (truthiness); only failure
                # rules at high confidence should veto, per the design intent.
                elif rule['outcome'] == 'failure' and rule.get('confidence', 0.8) >= self.FAILURE_VETO_CONFIDENCE:
                    return False, f"❌ VETOED: Matches high-confidence failure rule: {rule['text']}", 0.0
        return True, f"✅ APPROVED: No high-confidence negative rules matched.", causal_alignment_bonus

    def _context_matches_rule(self, current_context, rule):
        """True when the context's strategy and every rule condition match."""
        ### V23 FIX: Correctly check the top-level strategy ###
        if current_context.get('strategy') != rule.get('strategy'):
            return False
        for feature, condition in rule['conditions'].items():
            # Only equality conditions ('eq') are supported.
            if current_context.get(feature) != condition.get('eq'): return False
        return True

    def _discretize_context(self, context_series, historical_df):
        """Convert the latest feature row into coarse categorical features."""
        context = context_series.to_dict()
        context['rsi_state'] = "overbought" if context.get('RSI', 50) > 70 else "oversold" if context.get('RSI', 50) < 30 else "neutral"
        if not historical_df.empty and 'ATR' in historical_df.columns:
            # Tercile bucket of the current ATR relative to the given window.
            low_thresh, high_thresh = historical_df['ATR'].quantile(0.33), historical_df['ATR'].quantile(0.66)
            current_atr = context.get('ATR', 0)
            if current_atr < low_thresh: context['atr_quantile'] = 'low'
            elif current_atr > high_thresh: context['atr_quantile'] = 'high'
            else: context['atr_quantile'] = 'medium'
        else: context['atr_quantile'] = 'unknown'
        context['is_trading_hours'] = 1 if context.get('session_ny', 0) == 1 or context.get('session_london', 0) == 1 else 0
        context['has_upcoming_event'] = 1 if context.get('time_to_event', 9999) < 1 else 0
        return context

    def introspect_episode(self, evaluated_signal, last_context_df):
        """Log one evaluated trade (pnl + discretized context) as a hypothesis."""
        if last_context_df.empty: return
        context_snapshot = self._discretize_context(last_context_df.iloc[-1], last_context_df)
        pnl, strategy = evaluated_signal.get('pnl'), evaluated_signal.get('strategy_chosen')
        if pnl is None or not strategy: return
        hypothesis_data = {"pnl": pnl, "strategy": strategy, **context_snapshot}
        self.memory.log_hypothesis(hypothesis_data)

    def test_hypotheses(self):
        """Mine the hypothesis log for causal rules; reinforce, decay, prune."""
        print("MIND: Starting Deep Knowledge Consolidation..."); all_episodes = self.memory.get_all_hypotheses()
        if not all_episodes or len(all_episodes) < 30:  # Higher data requirement for deep analysis
            print("MIND: Not enough historical data for robust deep analysis."); return
        df = pd.DataFrame(all_episodes)
        if 'strategy' not in df.columns or 'regime' not in df.columns: return
        df.dropna(subset=['strategy', 'regime', 'pnl'], inplace=True)
        INTERESTING_FEATURES = ['rsi_state', 'atr_quantile', 'is_trading_hours', 'has_upcoming_event']
        current_rules = self.load_verified_rules()
        newly_discovered_patterns = {}
        # --- V23 EVOLUTION: Deep Causal Discovery (testing combinations) ---
        for r in range(1, 3):  # Test single features (r=1) and pairs of features (r=2)
            for combo in itertools.combinations(INTERESTING_FEATURES, r):
                grouping_keys = ['strategy', 'regime'] + list(combo)
                if not all(key in df.columns for key in grouping_keys): continue
                base_groups = df.groupby(grouping_keys)
                for keys, group_df in base_groups:
                    if len(group_df) < 7: continue  # Need sufficient evidence for multi-feature rules
                    mean_pnl = group_df['pnl'].mean()
                    if abs(mean_pnl) > 0.00015:  # Stricter threshold for significance
                        # Construct a unique ID for this combination
                        rule_id = "|".join(map(str, keys))
                        newly_discovered_patterns[rule_id] = {
                            'mean_pnl': mean_pnl,
                            'evidence_count': len(group_df),
                            'keys': keys,
                            'combo': combo
                        }
        now_iso = datetime.now(timezone.utc).isoformat()
        for rule_id, new_evidence in newly_discovered_patterns.items():
            impact = new_evidence['mean_pnl']
            ### V23 FIX: Principled Confidence Scaling [0, 1] ###
            REASONABLE_MAX_IMPACT = 0.001  # A 5-pip move, a strong signal
            confidence = min(abs(impact) / REASONABLE_MAX_IMPACT, 1.0)
            if rule_id in current_rules:  # Reinforce existing rule
                rule = current_rules[rule_id]
                # BUGFIX: the old code incremented 'evidence_count' first and
                # then used the already-updated count in the averaging math,
                # skewing the result. Keep 'total_impact' a plain running sum
                # of impact * count, matching how new rules initialize it.
                rule['total_impact'] = rule.get('total_impact', 0.0) + impact * new_evidence['evidence_count']
                rule['evidence_count'] += new_evidence['evidence_count']
                rule['confidence'] = (rule['confidence'] * 0.8) + (confidence * 0.2)  # Smoothed update
                rule['last_validated'] = now_iso
                print(f"MIND: REINFORCED rule: {rule['text']} (New Confidence: {rule['confidence']:.2f})")
            else:  # Discover new rule
                outcome = "success" if impact > 0 else "failure"
                strategy, regime = new_evidence['keys'][0], new_evidence['keys'][1]
                conditions = {'regime': {'eq': regime}}
                text_conditions = []
                for i, feature_name in enumerate(new_evidence['combo']):
                    feature_value = new_evidence['keys'][2+i]
                    conditions[feature_name] = {'eq': feature_value}
                    text_conditions.append(f"'{feature_name}' is '{feature_value}'")
                rule_text = f"'{strategy}' in '{regime}' when {' AND '.join(text_conditions)} -> {outcome}"
                rule = {'rule_id': rule_id, 'text': rule_text, 'strategy': strategy, 'outcome': outcome, 'confidence': confidence, 'evidence_count': new_evidence['evidence_count'], 'total_impact': impact * new_evidence['evidence_count'], 'conditions': conditions, 'last_validated': now_iso}
                current_rules[rule_id] = rule
                print(f"MIND: DISCOVERED DEEP rule: {rule['text']}")
        ### V23 FIX: Patient Forgetting (25-hour cycle) ###
        rules_to_prune = []
        CONFIDENCE_DECAY_PERIOD_SECONDS = 90000  # ~25 hours
        for rule_id, rule in current_rules.items():
            last_seen = datetime.fromisoformat(rule['last_validated'])
            if (datetime.now(timezone.utc) - last_seen).total_seconds() > CONFIDENCE_DECAY_PERIOD_SECONDS:
                rule['confidence'] *= 0.5  # Harsher decay for truly old rules
                print(f"MIND: Decaying confidence for old rule: {rule['text']} (New Confidence: {rule['confidence']:.2f})")
                if rule['confidence'] < 0.05:
                    rules_to_prune.append(rule_id)
        for rule_id in rules_to_prune:
            print(f"MIND: PRUNED obsolete rule: {current_rules[rule_id]['text']}")
            del current_rules[rule_id]
        self.memory.save_verified_rules(current_rules); self.causal_rulebook = self.load_verified_rules()
        print(f"MIND: Deep Mind consolidated knowledge base ({len(current_rules)} rules).")
class PredictionCoreTransformer:
    """Wraps the pre-calibrated Keras multi-horizon price model and its scaler."""

    def __init__(self, sequence_length=48):
        self.scaler = None
        self.model = None
        self.sequence_length = sequence_length
        self.feature_names = None
        # Artifact file names produced by the offline calibration step.
        self.calibrated_model_path = 'calibrated_model.keras'
        self.calibrated_scaler_path = 'calibrated_scaler.joblib'
        self.calibrated_features_path = 'calibrated_features.json'

    def is_calibrated(self):
        """True when every calibrated artifact file exists on disk."""
        required = (self.calibrated_model_path, self.calibrated_scaler_path, self.calibrated_features_path)
        return all(os.path.exists(path) for path in required)

    def load_calibrated_artifacts(self):
        """Load model, scaler and feature list from the calibrated artifacts."""
        print("--- Loading pre-calibrated artifacts ---")
        self.model = load_model(self.calibrated_model_path)
        self.scaler = joblib.load(self.calibrated_scaler_path)
        with open(self.calibrated_features_path, 'r') as f:
            self.feature_names = json.load(f)
        print("--- Instant startup successful ---")
        return True

    def predict_single(self, input_sequence):
        """Predict 5m/15m/1h close prices for one feature sequence.

        Returns:
            (predictions dict keyed '5m'/'15m'/'1h', human-readable summary).
        """
        if not self.scaler:
            raise RuntimeError("Agent not calibrated.")
        model_input = input_sequence[self.feature_names]
        scaled = self.scaler.transform(model_input)
        batch = scaled.reshape(1, self.sequence_length, len(self.feature_names))
        raw = self.model.predict(batch, verbose=0)
        preds = {"5m": raw[0][0][0], "15m": raw[1][0][0], "1h": raw[2][0][0]}
        summary = f"5m: {preds['5m']:.5f} | 15m: {preds['15m']:.5f} | 1h: {preds['1h']:.5f}"
        return preds, summary
class LinUCBBandit:
    """Contextual LinUCB bandit over a fixed set of strategy arms."""

    def __init__(self, strategies, d, alpha=1.0, regularization=1.0):
        self.strategies = list(strategies)
        self.d = d
        self.alpha = alpha
        self.reg = regularization
        # Per-arm ridge-regression statistics: A = reg*I + sum(x x^T), b = sum(r*x).
        self.A = {arm: (self.reg * np.eye(self.d)) for arm in self.strategies}
        self.b = {arm: np.zeros(self.d) for arm in self.strategies}
        self._last_decision_info = {}

    def select(self, context_vector):
        """Pick the arm maximizing mean reward + alpha * uncertainty (UCB)."""
        scores, uncertainties, mean_rewards = {}, {}, {}
        for arm in self.strategies:
            A_inv = np.linalg.inv(self.A[arm])
            theta = A_inv.dot(self.b[arm])
            expected = theta.dot(context_vector)
            bonus = self.alpha * math.sqrt(context_vector.dot(A_inv).dot(context_vector))
            mean_rewards[arm] = expected
            uncertainties[arm] = bonus
            scores[arm] = expected + bonus
        chosen = max(scores, key=scores.get)
        self._last_decision_info = {'scores': scores, 'uncertainties': uncertainties, 'mean_rewards': mean_rewards, 'chosen': chosen}
        return chosen

    def get_last_decision_info(self):
        """Diagnostics from the most recent select() call (empty before any)."""
        return self._last_decision_info

    def update(self, strategy, context_vector, reward):
        """Fold one observed (context, reward) pair into the arm's statistics."""
        x = context_vector.reshape(-1)
        self.A[strategy] += np.outer(x, x)
        self.b[strategy] += reward * x

    def increase_exploration(self, factor=1.2):
        """Widen the UCB exploration bonus (used on detected concept drift)."""
        self.alpha *= factor
        print(f"CONCEPT DRIFT: Increased alpha to {self.alpha:.2f}")
class IntrinsicRewardGeneratorV2:
    """Blends uncertainty, surprise and causal alignment into an intrinsic reward."""

    def __init__(self, bandit):
        self.bandit = bandit
        print("✅ IntrinsicRewardGeneratorV2 initialized.")

    def calculate_reward(self, extrinsic_reward, chosen_strategy, causal_alignment_bonus):
        """Return the intrinsic reward, clipped to [0, 0.5].

        Weights: 0.2 * decision uncertainty + 0.4 * prediction surprise
        + 0.4 * causal-alignment bonus; 0.0 when no decision has been made.
        """
        last_decision = self.bandit.get_last_decision_info()
        if not last_decision:
            return 0.0
        uncertainty = last_decision['uncertainties'].get(chosen_strategy, 0.0)
        expected = last_decision['mean_rewards'].get(chosen_strategy, 0.0)
        surprise = abs(extrinsic_reward - expected)
        blended = (0.2 * uncertainty) + (0.4 * surprise) + (0.4 * causal_alignment_bonus)
        return np.clip(blended, 0, 0.5)
class PersistentMemoryV5:
    """Firebase Realtime Database persistence for signals, hypotheses and rules.

    Degrades to a no-op when the Firebase secrets are missing or initialization
    fails: db_refs stays empty and every method becomes a safe no-op / None.
    """
    def __init__(self, reflect_interval):
        self.db_refs = {}; self.reflect_interval = reflect_interval
        try:
            # Secrets: FIRESTORE_SA_KEY holds the service-account JSON,
            # FIREBASE_DB_URL the realtime-database URL.
            sa_key_json, db_url = os.environ.get('FIRESTORE_SA_KEY'), os.environ.get('FIREBASE_DB_URL')
            if not all([sa_key_json, db_url]): raise ValueError("Firebase secrets not set.")
            cred = credentials.Certificate(json.loads(sa_key_json));
            # Avoid re-initializing the default app across reloads.
            if not firebase_admin._apps: firebase_admin.initialize_app(cred, {'databaseURL': db_url})
            self.db_refs['signals']=db.reference('signals_v5'); self.db_refs['hypotheses']=db.reference('knowledge_base_v5/hypotheses'); self.db_refs['rules']=db.reference('knowledge_base_v5/verified_rules')
            print("✅ PersistentMemoryV5 initialized at /signals_v5.")
        except Exception as e: print(f"CRITICAL ERROR - PersistentMemoryV5 failed: {e}"); self.db_refs = {}
    def log_signal(self, ts, strategy, action, entry, sl, tp, context_vector, reasoning, context_snapshot):
        """Push one trade signal; returns the generated Firebase key or None."""
        if 'signals' not in self.db_refs: return None
        try:
            signal_data = {"timestamp_entry":ts,"strategy_chosen":strategy,"action":action,"entry_price":float(entry),"stop_loss":float(sl),"take_profit":float(tp),"reasoning_initial":reasoning,"context_vector":[float(x) for x in context_vector.tolist()],"context_snapshot":context_snapshot}
            # Write the auto-generated push key back onto the record as event_id.
            new_signal_ref = self.db_refs['signals'].push(signal_data); event_id=new_signal_ref.key; new_signal_ref.update({"event_id":event_id})
            print(f"MEMORY V5: Logged '{action}' signal to signals_v5. ID: {event_id}"); return event_id
        except Exception as e: print(f"MEMORY V5 ERROR: {e}"); return None
    def log_hypothesis(self, hypothesis_data):
        # Best-effort append of one introspection episode.
        if 'hypotheses' in self.db_refs: self.db_refs['hypotheses'].push(hypothesis_data)
    def get_all_hypotheses(self):
        """Return all logged hypothesis records as a list (empty when none)."""
        data = self.db_refs.get('hypotheses').get() if 'hypotheses' in self.db_refs else None
        return list(data.values()) if data else []
    def get_all_signals(self):
        # Raw {push_key: signal_dict} mapping, or None when unavailable.
        return self.db_refs.get('signals').get() if 'signals' in self.db_refs else None
    def get_verified_rules(self):
        # Persisted causal rulebook ({rule_id: rule}) or None.
        return self.db_refs.get('rules').get() if 'rules' in self.db_refs else None
    def save_verified_rules(self, rulebook):
        """Overwrite the persisted rulebook (numpy types sanitized first)."""
        if 'rules' in self.db_refs:
            # --- FINAL FIX: Sanitize the rulebook before saving ---
            sanitized_rulebook = sanitize_for_json(rulebook)
            self.db_refs['rules'].set(sanitized_rulebook)
            print("MEMORY V5: Saved persistent rules to knowledge_base_v5.")
    def get_signal_ref(self):
        # Raw reference used by the bridge to update individual signals.
        return self.db_refs.get('signals')
class PageHinkley:
    """Page–Hinkley concept-drift detector over a stream of values."""

    def __init__(self, delta=0.005, lambda_=50, alpha=1-1e-3):
        self.mean = 0.0
        self.delta = delta
        self.lambda_ = lambda_
        self.alpha = alpha
        self.cumulative = 0.0

    def update(self, x):
        """Feed one observation; True when accumulated drift exceeds lambda_."""
        # Exponentially-weighted running mean, then the PH cumulative statistic.
        self.mean = self.mean * self.alpha + x * (1 - self.alpha)
        self.cumulative = max(0, self.cumulative + x - self.mean - self.delta)
        return self.cumulative > self.lambda_
class StrategyManager:
    """Maps strategy names to callables producing (thesis, predictions-string)."""

    def __init__(self, situation_room, prediction_engine):
        self.situation_room = situation_room
        self.prediction_engine = prediction_engine

    def list_strategies(self):
        """Return the name -> strategy-callable registry."""
        return {"predictive_rule_based": self.predictive_strategy, "ema_crossover": self.ema_crossover_strategy}

    def predictive_strategy(self, seq):
        """Model-driven thesis: run the prediction engine, then the situation room."""
        preds_dict, preds_str = self.prediction_engine.predict_single(seq)
        return self.situation_room.generate_thesis(preds_dict, seq), preds_str

    def ema_crossover_strategy(self, seq):
        """Fallback thesis with no model predictions (EMA logic lives in the room)."""
        return self.situation_room.generate_thesis({}, seq), "N/A (EMA Strategy)"
# Numeric features condensed into the bandit context vector (scaled to [-1, 1]).
CONTEXT_FEATURES=['close','ATR','EMA_20','RSI','time_since_event','time_to_event','hour_of_day']
# One-hot regime columns appended after the scaled numeric features.
REGIME_COLS=['regime_TRENDING','regime_BREAKOUT','regime_CHOPPY','regime_RANGING']
# Total bandit context dimension (numeric block + one-hot regime block).
CONTEXT_DIMENSION=len(CONTEXT_FEATURES)+len(REGIME_COLS)
class ContextVectorPreprocessor:
    """Builds the bandit context vector: scaled numerics + one-hot regime."""

    def __init__(self):
        self.feature_names = CONTEXT_FEATURES
        self.regime_cols = REGIME_COLS

    def build_context_vector(self, df_with_regime):
        """Return the context vector for the most recent row of df_with_regime.

        Numeric features are min-max scaled to [-1, 1] over the given window;
        the last row's regime is appended as a one-hot block (all zeros when
        the regime is unknown, e.g. 'N/A').

        Raises:
            ValueError: when the frame lacks a 'regime' column.
        """
        if 'regime' not in df_with_regime.columns:
            raise ValueError("DF must have 'regime' column.")
        numeric = df_with_regime[self.feature_names]
        scaled = MinMaxScaler(feature_range=(-1, 1)).fit_transform(numeric)
        numeric_vec = scaled[-1].flatten()
        one_hot = np.zeros(len(self.regime_cols))
        latest_regime = df_with_regime.iloc[-1]['regime']
        try:
            one_hot[self.regime_cols.index(f"regime_{latest_regime}")] = 1
        except ValueError:
            # Unknown regime -> leave the one-hot block all zeros.
            pass
        return np.concatenate([numeric_vec, one_hot])
class LiveDataStore:
    """Background-refreshing store of the latest engineered feature frame.

    A daemon thread refreshes prices + events roughly every
    update_interval_seconds; readers receive copies taken under a lock.
    """
    def __init__(self, api_key, finbert_tokenizer, finbert_model, update_interval_seconds=120):
        self.api_key=api_key; self.finbert_tokenizer=finbert_tokenizer; self.finbert_model=finbert_model; self.update_interval_seconds=update_interval_seconds; self._data_lock=threading.Lock(); self._latest_features=pd.DataFrame(); self._next_event_info={"title": "N/A"}; self._stop_event=threading.Event(); self._update_thread=None
    def _fetch_and_update(self, output_size=1000):
        # One refresh: fetch bars + calendar, rebuild features, then swap them
        # in under the lock. Errors are logged and swallowed (best-effort).
        try:
            price_data = fetch_twelvedata_prices(self.api_key, output_size=output_size)
            if price_data.empty: return
            events_data = fetch_live_events_with_cache()
            features, next_event_info = create_feature_set_for_inference(price_data, events_data, self.finbert_tokenizer, self.finbert_model)
            with self._data_lock: self._latest_features=features; self._next_event_info=next_event_info
        except Exception as e: print(f"LiveDataStore ERROR: {e}")
        # NOTE(review): the inter-update sleep lives here rather than in
        # _update_loop, so the synchronous call from start() also blocks for a
        # full interval before returning — confirm this is intended.
        self._stop_event.wait(self.update_interval_seconds)
    def _update_loop(self):
        # Thread body: refresh (which includes the interval wait) until stopped.
        while not self._stop_event.is_set(): self._fetch_and_update()
    def start(self):
        """Start the refresh thread (idempotent) and do one synchronous fetch."""
        if self._update_thread is None or not self._update_thread.is_alive(): print("LiveDataStore: Starting background thread."); self._stop_event.clear(); self._update_thread = threading.Thread(target=self._update_loop, daemon=True); self._update_thread.start()
        print("LiveDataStore: Performing initial data fetch..."); self._fetch_and_update()
    def stop(self):
        """Signal the refresh thread to stop and wait for it to exit."""
        if self._update_thread and self._update_thread.is_alive(): print("LiveDataStore: Stopping background thread."); self._stop_event.set(); self._update_thread.join()
    def get_latest_data(self, num_bars=None):
        """Return (features copy, next-event info), optionally only the last num_bars rows."""
        with self._data_lock:
            if num_bars and not self._latest_features.empty: return self._latest_features.iloc[-num_bars:].copy(), self._next_event_info
            return self._latest_features.copy(), self._next_event_info
    def get_raw_price_data(self, num_bars=None):
        """Return a copy of the OHLC columns with a tz-aware UTC 'Datetime' column."""
        with self._data_lock:
            if self._latest_features.empty: return pd.DataFrame()
            df_slice = self._latest_features[['open', 'high', 'low', 'close']].copy()
            if num_bars: df_slice = df_slice.iloc[-num_bars:]
            # Ensure the Datetime column is tz-aware UTC regardless of source tz.
            df_slice.reset_index(inplace=True); df_slice['Datetime'] = pd.to_datetime(df_slice['Datetime']).dt.tz_localize('UTC') if df_slice['Datetime'].dt.tz is None else df_slice['Datetime'].dt.tz_convert('UTC')
            return df_slice
class AutonomyBridgeV3:
    """Main autonomous loop: act every few minutes, reflect roughly hourly.

    Wires together the live data store, bandit, strategy manager, deep mind
    and persistent memory — all injected via the `components` dict.
    """
    def __init__(self, components, act_interval_sec=300, reflect_interval_sec=3600):
        # components keys used below: 'memory', 'live_data', 'intrinsic_rewards',
        # 'bandit', 'change_detector', 'mind', 'pred_engine',
        # 'ctx_preprocessor', 'strat_manager'.
        self.comps=components; self.act_interval=act_interval_sec; self.reflect_interval=reflect_interval_sec; self.last_reflect_time=0; self.ntfy_topic=os.environ.get('NTFY_TOPIC_V5'); self._last_context_df=pd.DataFrame()
    def evaluate_and_introspect(self):
        """Score every still-open logged signal against recent price action.

        Simulates SL/TP hits bar-by-bar, updates the bandit and drift detector,
        writes the outcome back onto the signal record, and feeds the episode
        to the deep mind for rule mining.
        """
        print("BRIDGE V3: Evaluating signals for causal introspection..."); db_ref = self.comps['memory'].get_signal_ref()
        if not db_ref: return
        now_utc = pd.Timestamp.now(tz='UTC')
        try:
            all_signals = self.comps['memory'].get_all_signals();
            if not all_signals: return
            # 288 five-minute bars ~= the last 24 hours of prices.
            live_price_history = self.comps['live_data'].get_raw_price_data(num_bars=288)
            if live_price_history.empty: return
            for key, signal in all_signals.items():
                # Skip already-evaluated signals and those younger than 5 minutes.
                if signal.get('reward') is not None: continue
                if not (ts_str := signal.get('timestamp_entry')): continue
                signal_time = pd.to_datetime(ts_str)
                if now_utc < (signal_time + pd.Timedelta(minutes=5)): continue
                entry,sl,tp,action = float(signal['entry_price']),float(signal['stop_loss']),float(signal['take_profit']),signal['action']
                relevant_bars = live_price_history[live_price_history['Datetime'] > signal_time]
                if relevant_bars.empty: continue
                # Walk forward until SL or TP is touched; otherwise exit at the
                # last available close ("Time Exit"). SL is checked before TP
                # within a bar, so ambiguous bars resolve pessimistically.
                outcome_reason, exit_price = "Time Exit", relevant_bars.iloc[-1]['close']
                for _, bar in relevant_bars.iterrows():
                    if (action == 'BUY' and bar['low'] <= sl) or (action == 'SELL' and bar['high'] >= sl): exit_price, outcome_reason = sl, "SL"; break
                    if (action == 'BUY' and bar['high'] >= tp) or (action == 'SELL' and bar['low'] <= tp): exit_price, outcome_reason = tp, "TP"; break
                # PnL normalized by 0.005 price units (≈50 pips on EUR/USD —
                # presumably the "full reward" move; confirm intended scale).
                extrinsic_reward = np.clip(((exit_price - entry) if action == 'BUY' else (entry - exit_price)) / 0.005, -1.0, 1.0)
                intrinsic_reward = self.comps['intrinsic_rewards'].calculate_reward(extrinsic_reward, signal['strategy_chosen'], 0.0)
                total_reward = extrinsic_reward + intrinsic_reward
                self.comps['bandit'].update(signal['strategy_chosen'], np.array(signal['context_vector']), total_reward)
                # Drift detector watches losses (negated reward); on drift the
                # bandit widens its exploration bonus.
                if self.comps['change_detector'].update(-extrinsic_reward): self.comps['bandit'].increase_exploration()
                pnl = (exit_price - entry) if action == 'BUY' else (entry - exit_price)
                update_payload = {'pnl':pnl,'reward':total_reward,'outcome_reason':outcome_reason,'exit_price':exit_price,'timestamp_exit':now_utc.isoformat()}
                db_ref.child(key).update(update_payload)
                context_df_for_introspection = pd.DataFrame([signal['context_snapshot']]) if 'context_snapshot' in signal else pd.DataFrame()
                self.comps['mind'].introspect_episode({**signal, **update_payload}, context_df_for_introspection)
        except Exception as e: print(f"BRIDGE V3 EVALUATOR ERROR: {e}"); traceback.print_exc()
    def act_cycle(self):
        """One decision cycle: build context, pick a strategy, vet and log a signal."""
        print(f"BODY: [{pd.Timestamp.now(tz='UTC')}] Waking up..."); c = self.comps
        features, next_event_info = c['live_data'].get_latest_data(num_bars=c['pred_engine'].sequence_length + 50)
        if features.empty or len(features) < c['pred_engine'].sequence_length: print("BODY: Not enough data."); return
        # Regime-tag the window, then condense it into the bandit context vector.
        causal_engine = CausalReasoningNetwork(features.copy()); features_with_regime = causal_engine.identify_volatility_regimes()
        ctx_vec = c['ctx_preprocessor'].build_context_vector(features_with_regime); self._last_context_df = features_with_regime.copy()
        input_sequence = features_with_regime.iloc[-c['pred_engine'].sequence_length:]
        chosen_strategy_name = c['bandit'].select(ctx_vec)
        trade_thesis, preds_str = c['strat_manager'].list_strategies()[chosen_strategy_name](input_sequence)
        # The deep mind may veto the thesis or grant a causal-alignment bonus.
        is_tradeable, mind_reasoning, causal_bonus = c['mind'].review_trade(trade_thesis, self._last_context_df)
        final_action = trade_thesis['action'] if is_tradeable and trade_thesis['action'] != 'NO_TRADE' else "NO TRADE"
        if final_action in ["BUY", "SELL"]:
            ts = pd.Timestamp.now(tz='UTC').isoformat(); context_snapshot = self._last_context_df.iloc[-1].to_dict()
            c['memory'].log_signal(ts, chosen_strategy_name, final_action, trade_thesis['entry'], trade_thesis['stop_loss'], trade_thesis['take_profit'], ctx_vec, trade_thesis['reasoning'], context_snapshot)
            send_ntfy_notification(self.ntfy_topic, trade_thesis)
        final_reasoning = f"Bandit chose '{chosen_strategy_name}'. Thesis: '{trade_thesis['reasoning']}' -> Mind Verdict: {mind_reasoning}"
        # status.json is the UI-facing snapshot of the latest cycle.
        status = {"last_checked": pd.Timestamp.now(tz='UTC').isoformat(), "market_price": f"{input_sequence.iloc[-1]['close']:.5f}", "market_regime": input_sequence.iloc[-1]['regime'], "signal": final_action, "reasoning": final_reasoning, "predictions": preds_str, "next_event": f"{next_event_info['title']} ({next_event_info.get('time_str', 'N/A')})"}
        with open('status.json', 'w') as f: json.dump(status, f)
        print(f"BODY: Analysis complete. Signal: {final_action}.")
    def reflect_cycle(self):
        """Reflection pass: evaluate past signals, then consolidate causal rules."""
        print(f"MIND: [{pd.Timestamp.now(tz='UTC')}] Waking up for reflection cycle..."); self.evaluate_and_introspect()
        self.comps['mind'].test_hypotheses(); print("MIND: Reflection cycle complete."); self.last_reflect_time = time.time()
    def run(self):
        """Blocking forever-loop: act, reflect when due, sleep, retry on errors."""
        print("--- AUTONOMY BRIDGE V3 (RMRL): Starting main adaptive loop. ---")
        while True:
            try:
                self.act_cycle();
                if (time.time() - self.last_reflect_time) > self.reflect_interval: self.reflect_cycle()
                print(f"BRIDGE V3: Sleeping for {self.act_interval} seconds...")
                time.sleep(self.act_interval)
            except Exception as e: print(f"AUTONOMY BRIDGE V3 CRITICAL ERROR: {e}"); traceback.print_exc(); time.sleep(60)
# --- MAIN WORKER LOOP ---
def main_worker():
    """Wire up every agent component and hand control to the autonomy bridge.

    Downloads the calibrated model artifacts from the Hugging Face Hub,
    starts the live data feed, builds the strategy / bandit / memory /
    mind stack, and then blocks forever inside AutonomyBridgeV3.run().
    Returns early (aborting the worker) if artifacts are unavailable.
    """
    print("--- [Persistent Deep Mind Agent V23 (RMRL)] Worker Thread Started ---")
    api_key = os.environ.get('TWELVE_DATA_API_KEY')
    hf_token = os.environ.get('HF_TOKEN')
    REFLECT_INTERVAL_SECONDS = 3600  # 1 hour
    HF_REPO_ID = "Badumetsibb/conscious-trading-agent-models"
    print("--- Downloading agent artifacts from Hugging Face Hub ---")
    artifact_names = ('calibrated_model.keras', 'calibrated_scaler.joblib', 'calibrated_features.json')
    try:
        for artifact in artifact_names:
            hf_hub_download(repo_id=HF_REPO_ID, filename=artifact, token=hf_token, local_dir='.', local_dir_use_symlinks=False)
        print("--- All artifacts downloaded successfully. ---")
    except Exception as e:
        # Without the calibrated artifacts the agent cannot trade at all.
        print(f"FATAL: Failed to download artifacts: {e}")
        return
    print("Initializing FinBERT...")
    finbert_tokenizer = BertTokenizer.from_pretrained('ProsusAI/finbert')
    finbert_model = TFBertModel.from_pretrained('ProsusAI/finbert', from_pt=True)
    prediction_engine = PredictionCoreTransformer()
    context_preprocessor = ContextVectorPreprocessor()
    # The live feed runs on its own thread; it must be stopped on abort.
    live_data_store = LiveDataStore(api_key, finbert_tokenizer, finbert_model, update_interval_seconds=120)
    live_data_store.start()
    if not prediction_engine.is_calibrated():
        print("CRITICAL: Calibrated artifacts not found. Agent cannot run.")
        live_data_store.stop()
        return
    prediction_engine.load_calibrated_artifacts()
    situation_room = RuleBasedSituationRoom({'sl_atr_multiplier': 2.0, 'tp_atr_multiplier': 4.0})
    strategy_manager = StrategyManager(situation_room, prediction_engine)
    memory = PersistentMemoryV5(reflect_interval=REFLECT_INTERVAL_SECONDS)
    mind = PersistentDeepMind(memory)
    bandit = LinUCBBandit(strategy_manager.list_strategies().keys(), d=CONTEXT_DIMENSION, alpha=1.5)
    change_detector = PageHinkley()
    intrinsic_reward_gen = IntrinsicRewardGeneratorV2(bandit)
    components = {
        "live_data": live_data_store,
        "pred_engine": prediction_engine,
        "ctx_preprocessor": context_preprocessor,
        "strat_manager": strategy_manager,
        "bandit": bandit,
        "mind": mind,
        "memory": memory,
        "change_detector": change_detector,
        "intrinsic_rewards": intrinsic_reward_gen,
    }
    # run() never returns; this thread lives for the process lifetime.
    AutonomyBridgeV3(components, reflect_interval_sec=REFLECT_INTERVAL_SECONDS).run()
# --- GRADIO DASHBOARD AND STARTUP ---
def get_latest_status():
    """Read the agent's last status snapshot for the Gradio dashboard.

    Returns a 7-tuple of display strings:
    (status line, market price, market regime, signal, reasoning,
    predictions, next event).

    Falls back to a "calibrating" placeholder when status.json is absent
    or unreadable — the worker thread writes the file concurrently, so a
    partial/garbled read must not crash the dashboard callback.
    """
    placeholder = ("Agent is calibrating...", "", "", "", "", "", "")
    if not os.path.exists('status.json'):
        return placeholder
    try:
        with open('status.json', 'r') as f:
            status = json.load(f)
    except (json.JSONDecodeError, OSError):
        # Mid-write race or corrupted file: treat as "no status yet".
        return placeholder
    return (
        f"Status: {status.get('last_checked', 'N/A')}",
        status.get('market_price', 'N/A'),
        status.get('market_regime', 'N/A'),
        status.get('signal', 'N/A'),
        status.get('reasoning', 'N/A'),
        status.get('predictions', 'N/A'),
        status.get('next_event', 'N/A'),
    )
# Declarative Gradio dashboard; built at import time, launched in __main__.
# Read-only view over status.json — the worker thread produces the data.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🧠 V23 Persistent Deep Mind (RMRL)")
    gr.Markdown("This agent has a **persistent memory** and discovers **deep causal links** by testing combinations of features. Its knowledge compounds, and it gracefully forgets outdated information over a 24-hour cycle.")
    # Presence-only check of the env secrets (values are never displayed).
    # NOTE(review): this list includes NTFY_TOPIC_V5, which the __main__
    # guard does NOT require — intentional soft requirement, presumably.
    all_secrets = all([os.environ.get(k) for k in ['TWELVE_DATA_API_KEY', 'NTFY_TOPIC_V5', 'HF_TOKEN', 'FIRESTORE_SA_KEY', 'FIREBASE_DB_URL']])
    secret_status = "✅ All required secrets appear to be set." if all_secrets else "❌ One or more secrets are MISSING (check NTFY_TOPIC_V5)."
    gr.Markdown(f"**Secrets Status:** {secret_status}")
    # Manual refresh: re-reads status.json on click (no auto-polling).
    refresh_btn = gr.Button("Refresh Status", variant="primary")
    status_output = gr.Textbox(label="Status", interactive=False)
    gr.Markdown("## Agent's Last Analysis (Body)")
    with gr.Row(): price_output = gr.Textbox(label="Last Market Price"); regime_output = gr.Textbox(label="Last Market Regime")
    with gr.Row(): predictions_output = gr.Textbox(label="DL Model Predictions (5m|15m|1h)"); event_output = gr.Textbox(label="Next High-Impact Event")
    action_output = gr.Textbox(label="Final Signal / Action")
    reasoning_output = gr.Textbox(label="Full Reasoning (incl. Causal Mind's Verdict)", lines=4)
    # Output order must match get_latest_status()'s 7-tuple ordering.
    refresh_btn.click(fn=get_latest_status, inputs=[], outputs=[status_output, price_output, regime_output, action_output, reasoning_output, predictions_output, event_output])
if __name__ == "__main__":
    # Hard requirements: without these the worker thread cannot function,
    # so refuse to start it (the NTFY topic is only a soft requirement).
    required_keys = ['TWELVE_DATA_API_KEY', 'HF_TOKEN', 'FIRESTORE_SA_KEY', 'FIREBASE_DB_URL']
    missing = [k for k in required_keys if not os.environ.get(k)]
    if missing:
        print("FATAL: Core secrets are missing. Worker cannot start.")
    else:
        # Daemon thread: the agent loop dies with the Gradio process.
        threading.Thread(target=main_worker, daemon=True).start()
        demo.launch()