# aurora_v3 / app.py
# (Hugging Face Space header: uploaded by Badumetsibb, commit a2d93a8, "Update app.py")
# app_v20_complete_and_final.py - The full, working script with the definitive Keras fix.
# --- Core Libraries ---
import pandas as pd
import numpy as np
import warnings
import joblib
import json
import os
import gradio as gr
import requests
import time
from datetime import datetime
import pytz
import threading
import math
from huggingface_hub import hf_hub_download
import firebase_admin
from firebase_admin import credentials, db
import traceback
# --- Environment and Dependencies ---
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
warnings.filterwarnings("ignore", category=UserWarning, module='sklearn')
# --- CRITICAL FIX: Force the entire environment to use the Keras 2 compatibility layer ---
# This line MUST come before the tensorflow and transformers imports.
os.environ['TF_USE_LEGACY_KERAS'] = '1'
# --- Machine Learning & Deep Learning Libraries ---
import tensorflow as tf
from tensorflow import keras
from keras import layers, Model
from sklearn.preprocessing import MinMaxScaler
from keras.utils import Sequence
# --- NLP & Self-Evolution Libraries ---
try:
from transformers import BertTokenizer, TFBertModel, pipeline
import torch
NLP_LIBRARIES_AVAILABLE = True
except ImportError:
NLP_LIBRARIES_AVAILABLE = False
# --- Live Data Fetching Configuration ---
from twelvedata import TDClient
# Forex Factory weekly economic-calendar feed (JSON).
EVENT_JSON_URL = "https://nfs.faireconomy.media/ff_calendar_thisweek.json"
# Re-fetch the calendar at most once per 10 minutes.
CACHE_DURATION_SECONDS = 600
# Module-level cache shared by fetch_live_events_with_cache().
_EVENT_CACHE = {"data": None, "timestamp": 0}
# --- ALL CLASS AND FUNCTION DEFINITIONS ---
class CausalReasoningNetwork:
    """Labels each bar of a processed feature DataFrame with a coarse market regime.

    Regimes are derived from the in-sample distribution of a volatility
    indicator (default ATR) and the 3-bar slope of a trend indicator
    (default EMA_20): TRENDING / BREAKOUT (high vol), RANGING (low vol),
    CHOPPY (everything in between).
    """
    def __init__(self, processed_data):
        # Work on a copy so the caller's DataFrame is never mutated.
        self.data = processed_data.copy()
    def identify_volatility_regimes(self, volatility_indicator='ATR', trend_indicator='EMA_20'):
        """Append a 'regime' column to self.data and return the DataFrame.

        Thresholds are quantiles of the supplied window, so labels are
        relative to this data slice, not absolute volatility levels.
        """
        atr = self.data[volatility_indicator]
        low_vol_threshold = atr.quantile(0.33)
        high_vol_threshold = atr.quantile(0.66)
        # 3-bar slope of the trend indicator; NaN for the first 3 rows.
        ema_slope = self.data[trend_indicator].diff(periods=3)
        # FIX: hoisted out of the loop — this quantile is loop-invariant, but
        # the original recomputed it on every high-volatility bar.
        slope_q75 = ema_slope.quantile(0.75)
        regimes = []
        for i in range(len(self.data)):
            atr_val = atr.iloc[i]
            slope_val = ema_slope.iloc[i] if pd.notna(ema_slope.iloc[i]) else 0
            if atr_val > high_vol_threshold:
                # High volatility: a strong slope means an established trend,
                # otherwise treat the expansion as a breakout condition.
                regimes.append('TRENDING' if abs(slope_val) > slope_q75 else 'BREAKOUT')
            elif atr_val < low_vol_threshold:
                regimes.append('RANGING')
            else:
                regimes.append('CHOPPY')
        self.data['regime'] = regimes
        return self.data
class RuleBasedSituationRoom:
    """Turns multi-horizon price predictions into a concrete, rule-based trade thesis."""
    def __init__(self, params):
        # params: dict providing 'sl_atr_multiplier' / 'tp_atr_multiplier'.
        self.params = params
    def generate_thesis(self, predictions, sequence_df):
        """Return a trade-thesis dict for the latest bar of sequence_df.

        Falls back to a plain EMA-crossover rule when predictions is empty.
        """
        latest_data = sequence_df.iloc[-1]
        current_price = latest_data['close']
        if not predictions:
            # No model output: degrade gracefully to a simple trend rule.
            dir_ema = "BUY" if current_price > latest_data['EMA_20'] else "SELL"
            action = dir_ema
            confidence = "LOW"
            strategy = "Trend Following"
            reasoning = f"Simple EMA Crossover ({dir_ema})."
        else:
            dir_5m = "BUY" if predictions['5m'] > current_price else "SELL"
            dir_15m = "BUY" if predictions['15m'] > current_price else "SELL"
            dir_1h = "BUY" if predictions['1h'] > current_price else "SELL"
            # Default verdict when the horizons fully disagree.
            action = "NO_TRADE"
            confidence = "LOW"
            reasoning = "Prediction horizons diverge."
            strategy = "Range Play"
            if dir_5m == dir_15m == dir_1h:
                action = dir_5m
                confidence = "HIGH"
                reasoning = f"Strong confluence across all horizons ({dir_5m})."
                strategy = "Trend Following"
            elif dir_5m == dir_15m:
                action = dir_5m
                confidence = "MEDIUM"
                reasoning = f"Short/Medium-term confluence ({dir_5m})."
                strategy = "Scalp"
            elif dir_15m == dir_1h:
                action = dir_15m
                confidence = "LOW"
                reasoning = f"Medium/Long-term confluence ({dir_15m})."
                strategy = "Trend Following"
        if action == "NO_TRADE":
            return {"action": "NO_TRADE", "confidence": "LOW", "strategy_type": strategy, "reasoning": reasoning}
        # Guard against missing/zero ATR so SL/TP distances stay non-zero.
        atr = latest_data['ATR'] if pd.notna(latest_data['ATR']) and latest_data['ATR'] > 0 else 0.0001
        sl_mult = self.params.get('sl_atr_multiplier', 2.0)
        tp_mult = self.params.get('tp_atr_multiplier', 4.0)
        # Lower-conviction trades take profit sooner.
        if confidence == "MEDIUM":
            tp_mult *= 0.75
        if confidence == "LOW":
            tp_mult *= 0.5
        entry = current_price
        sl_offset = sl_mult * atr
        tp_offset = tp_mult * atr
        if action == "BUY":
            stop_loss = entry - sl_offset
            take_profit = entry + tp_offset
        else:
            stop_loss = entry + sl_offset
            take_profit = entry - tp_offset
        return {"action":action, "entry":f"{entry:.5f}", "stop_loss":f"{stop_loss:.5f}", "take_profit":f"{take_profit:.5f}", "confidence":confidence, "reasoning":reasoning, "strategy_type":strategy}
class MarketRegimeFilter:
    """Gates trade theses so only strategy types suited to the current regime fire."""
    def __init__(self):
        # Map: market regime -> strategy types permitted to trade in it.
        self.allowed_strategies = {
            'TRENDING': ['Trend Following'],
            'BREAKOUT': ['Trend Following', 'Scalp'],
            'CHOPPY': ['Scalp', 'Mean Reversion'],
            'RANGING': ['Mean Reversion']
        }
    def should_trade(self, current_regime, trade_thesis):
        """Return True when the thesis is actionable in the given regime."""
        if trade_thesis['action'] == 'NO_TRADE':
            return False
        permitted = self.allowed_strategies.get(current_regime, [])
        return trade_thesis['strategy_type'] in permitted
def fetch_live_events_with_cache():
    """Fetch the weekly economic-calendar JSON, cached for CACHE_DURATION_SECONDS.

    On any request failure the previously cached payload (possibly empty)
    is returned instead of raising.
    """
    now = time.time()
    cache_is_fresh = _EVENT_CACHE["data"] and (now - _EVENT_CACHE["timestamp"] < CACHE_DURATION_SECONDS)
    if cache_is_fresh:
        return _EVENT_CACHE["data"]
    try:
        response = requests.get(EVENT_JSON_URL, headers={"User-Agent": "V20-Agent/1.0"}, timeout=10)
        response.raise_for_status()
        payload = response.json()
        _EVENT_CACHE["data"] = payload
        _EVENT_CACHE["timestamp"] = now
        return payload
    except requests.RequestException as e:
        print(f"Error fetching event data: {e}")
        return _EVENT_CACHE.get("data", [])
def fetch_twelvedata_prices(api_key, symbol='EUR/USD', interval='5min', output_size=200):
    """Pull OHLC bars from Twelve Data, oldest-first, with a 'Datetime' column.

    Returns an empty DataFrame on any failure so callers can test .empty.
    """
    try:
        client = TDClient(apikey=api_key)
        series = client.time_series(symbol=symbol, interval=interval, outputsize=output_size, timezone="UTC")
        frame = series.as_pandas().sort_index(ascending=True)
        frame.index.name = 'Datetime'
        frame.reset_index(inplace=True)
        return frame
    except Exception as e:
        print(f"Error fetching price data: {e}")
        return pd.DataFrame()
def create_feature_set_for_inference(price_df, events_json, finbert_tokenizer, finbert_model):
    """Merge live OHLC prices and calendar events into a model-ready feature frame.

    Args:
        price_df: OHLC DataFrame with a 'Datetime' column (fetch_twelvedata_prices output).
        events_json: list of event dicts (fetch_live_events_with_cache output).
        finbert_tokenizer / finbert_model: FinBERT pair used to embed event
            headlines (only exercised when NLP libraries are available).

    Returns:
        (merged_data, next_event_info): the feature-engineered DataFrame and a
        small dict describing the next upcoming high-impact event.
    """
    # --- Price-based technical features: RSI-14, EMA-20, ATR-14 ---
    price_features = price_df.copy(); price_features['Datetime'] = pd.to_datetime(price_features['Datetime']); price_features.set_index('Datetime', inplace=True)
    # Normalize to UTC so the merge with event timestamps is valid.
    if price_features.index.tz is None: price_features = price_features.tz_localize('UTC', ambiguous='infer')
    else: price_features = price_features.tz_convert('UTC')
    # Columns are temporarily renamed (close->Price etc.) while the indicators
    # are computed, then renamed back at the end of this statement chain.
    price_features.rename(columns={'close': 'Price', 'open':'Open', 'high':'High', 'low':'Low'}, inplace=True); delta = price_features['Price'].diff(); gain = (delta.where(delta > 0, 0)).rolling(window=14).mean(); loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean(); price_features['RSI'] = 100 - (100 / (1 + (gain / loss))); price_features['EMA_20'] = price_features['Price'].ewm(span=20, adjust=False).mean(); high_low = price_features['High'] - price_features['Low']; high_close = np.abs(price_features['High'] - price_features['Price'].shift()); low_close = np.abs(price_features['Low'] - price_features['Price'].shift()); tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1); price_features['ATR'] = tr.rolling(window=14).mean(); price_features.rename(columns={'Price':'close', 'Open':'open', 'High':'high', 'Low':'low'}, inplace=True)
    # --- Economic-calendar events ---
    events = pd.DataFrame(events_json); processed_events = pd.DataFrame(); next_event_info = {"title": "None within 24h", "time_str": "N/A"}
    if not events.empty and 'date' in events.columns:
        # Only currencies that can plausibly move EUR/USD are kept.
        RELEVANT_CURRENCIES = ['USD', 'EUR', 'GBP', 'JPY']; events = events[events['country'].isin(RELEVANT_CURRENCIES)].copy()
        events['datetime'] = pd.to_datetime(events['date'], utc=True, errors='coerce'); events.dropna(subset=['datetime'], inplace=True); events.set_index('datetime', inplace=True); events.sort_index(inplace=True)
        def parse_financial_number(s):
            # Convert strings like "1.2M", "3K", "0.5%" to floats; NaN on failure.
            if not isinstance(s, str) or not s: return np.nan
            s = s.strip().upper(); multipliers = {'B': 1e9, 'M': 1e6, 'K': 1e3, '%': 0.01}; val_str = s
            if s.endswith(tuple(multipliers.keys())): val_str = s[:-1]; multiplier = multipliers[s[-1]]
            else: multiplier = 1.0
            try: return float(val_str) * multiplier
            except (ValueError, TypeError): return np.nan
        # 'surprise' = actual minus forecast, 0 when either side is missing.
        if 'actual' in events.columns and 'forecast' in events.columns: events['surprise'] = (events['actual'].apply(parse_financial_number) - events['forecast'].apply(parse_financial_number)).fillna(0)
        else: events['surprise'] = 0
        events['detail'] = events['title'].fillna('') + ' ' + events['country'].fillna('')
        if not events.empty and NLP_LIBRARIES_AVAILABLE:
            # Embed each event headline with FinBERT's [CLS]-token vector.
            inputs = finbert_tokenizer(events['detail'].tolist(), return_tensors='tf', padding=True, truncation=True, max_length=64); embeddings = finbert_model(inputs).last_hidden_state[:, 0, :].numpy()
            processed_events = pd.concat([events, pd.DataFrame(embeddings, columns=[f'finbert_{i}' for i in range(embeddings.shape[1])], index=events.index)], axis=1)
    # As-of join: each price bar picks up the most recent event within 60 minutes.
    merged_data = pd.merge_asof(left=price_features.sort_index(), right=processed_events.sort_index(), left_index=True, right_index=True, direction='backward', tolerance=pd.Timedelta(minutes=60))
    # --- Time-to / time-since high-impact-event features (hours) ---
    all_high_impact_events = events[(events['impact'] == 'High')] if 'impact' in events.columns and not events.empty else pd.DataFrame()
    if not all_high_impact_events.empty:
        upcoming_events = all_high_impact_events[all_high_impact_events.index > merged_data.index[-1]]
        if not upcoming_events.empty:
            next_event = upcoming_events.iloc[0]; next_event_time = next_event.name; time_to_next = (next_event_time - merged_data.index[-1]).total_seconds() / 3600.0; merged_data['time_to_event'] = time_to_next; next_event_info = {"title": f"{next_event.country} {next_event.title}", "time_str": f"in {time_to_next:.2f}h"}
        else: merged_data['time_to_event'] = 9999
        # Vectorized: per bar, hours since the nearest PAST high-impact event.
        df_index_sec = merged_data.index.astype(np.int64).to_numpy() // 10**9; event_times_sec = all_high_impact_events.index.astype(np.int64).to_numpy() // 10**9; time_diffs = df_index_sec[:, None] - event_times_sec[None, :]; merged_data['time_since_event'] = np.min(np.where(time_diffs >= 0, time_diffs, np.inf), axis=1) / 3600
    else: merged_data['time_since_event'] = 9999; merged_data['time_to_event'] = 9999
    # --- Calendar/session features (9999 acts as the "no event" sentinel) ---
    merged_data.replace([np.inf, -np.inf], 9999, inplace=True); merged_data['hour_of_day'] = merged_data.index.hour; merged_data['day_of_week'] = merged_data.index.dayofweek; merged_data['session_london'] = ((merged_data['hour_of_day'] >= 7) & (merged_data['hour_of_day'] <= 16)).astype(int); merged_data['session_ny'] = ((merged_data['hour_of_day'] >= 12) & (merged_data['hour_of_day'] <= 21)).astype(int); merged_data['session_asian'] = ((merged_data['hour_of_day'] >= 22) | (merged_data['hour_of_day'] <= 7)).astype(int)
    # Forward-fill event-derived columns between events, then zero-fill the rest.
    finbert_cols = [col for col in merged_data.columns if 'finbert_' in col]; cols_to_ffill = ['surprise', 'impact', 'title'] + finbert_cols; merged_data[cols_to_ffill] = merged_data[cols_to_ffill].ffill(); merged_data.fillna(0, inplace=True); merged_data.dropna(subset=['open', 'close', 'RSI'], inplace=True)
    return merged_data, next_event_info
class PredictionCoreTransformer:
    """Multi-horizon (5m / 15m / 1h) price forecaster built on a Keras model.

    Lifecycle: on first run, calibrate() fine-tunes a downloaded base model on
    fresh market data and persists model/scaler/feature-list to disk; later
    runs call load_calibrated_artifacts() for instant startup.
    """
    def __init__(self, sequence_length=48):
        # sequence_length: number of consecutive bars per model input sample.
        self.scaler = None; self.model = None; self.sequence_length = sequence_length; self.feature_names = None
        # On-disk artifact paths written by calibrate().
        self.calibrated_model_path = 'calibrated_model.keras'; self.calibrated_scaler_path = 'calibrated_scaler.joblib'; self.calibrated_features_path = 'calibrated_features.json'
    def is_calibrated(self):
        """True when all three calibration artifacts already exist on disk."""
        return all(os.path.exists(p) for p in [self.calibrated_model_path, self.calibrated_scaler_path, self.calibrated_features_path])
    def load_calibrated_artifacts(self):
        """Load the persisted model, scaler and feature list; returns True."""
        print("--- Loading pre-calibrated artifacts for instant startup ---")
        self.model = keras.models.load_model(self.calibrated_model_path)
        self.scaler = joblib.load(self.calibrated_scaler_path)
        with open(self.calibrated_features_path, 'r') as f: self.feature_names = json.load(f)
        print("--- Instant startup successful ---"); return True
    def calibrate(self, base_model_path, calibration_data):
        """Fine-tune the base model on calibration_data and persist artifacts.

        Targets are future 'open' prices shifted by -1, -3 and -12 rows —
        5m / 15m / 1h ahead on a 5-minute timeframe.
        """
        print("--- STARTING ONE-TIME AGENT CALIBRATION ---"); print(f"Loaded calibration data with {len(calibration_data)} rows.")
        # Everything except raw OHLC, labels and event metadata is a feature.
        self.feature_names = [col for col in calibration_data.columns if col not in ['regime', 'close', 'open', 'high', 'low', 'event', 'impact', 'title', 'detail', 'country', 'date', 'currency', 'actual', 'forecast', 'previous']]
        print(f"Identified {len(self.feature_names)} features for scaling and training.")
        self.scaler = MinMaxScaler(feature_range=(0, 1)); features_to_scale_df = calibration_data[self.feature_names]; self.scaler.fit(features_to_scale_df)
        print("New, compatible scaler has been fitted.")
        self.model = keras.models.load_model(base_model_path)
        print("Pre-trained model loaded successfully.")
        print("Starting fine-tuning process...");
        # Keras Sequence yielding one (window, targets) sample per index —
        # effective batch size 1.
        class CalibrationGenerator(Sequence):
            def __init__(self, data, scaler, feature_names, seq_len):
                self.data = data.copy(); self.scaler = scaler; self.feature_names = feature_names; self.seq_len = seq_len
                # Build look-ahead targets, then drop rows made NaN by the shift.
                self.data['target_5m'] = self.data['open'].shift(-1); self.data['target_15m'] = self.data['open'].shift(-3); self.data['target_1h'] = self.data['open'].shift(-12)
                self.data.dropna(inplace=True); self.features_df = self.data[self.feature_names]; self.targets_df = self.data[['target_5m', 'target_15m', 'target_1h']]; self.scaled_features = self.scaler.transform(self.features_df); self.n_samples = len(self.scaled_features) - self.seq_len
            def __len__(self): return self.n_samples
            def __getitem__(self, idx):
                # One window of scaled features ending at seq_end - 1 ...
                seq_end = idx + self.seq_len; X = self.scaled_features[idx:seq_end].reshape(1, self.seq_len, len(self.feature_names))
                # ... paired with the three horizon targets of its last row.
                y_5m, y_15m, y_1h = self.targets_df.iloc[seq_end - 1][['target_5m', 'target_15m', 'target_1h']]
                return X, {'5m_output': np.array([y_5m]), '15m_output': np.array([y_15m]), '1h_output': np.array([y_1h])}
        calibration_generator = CalibrationGenerator(calibration_data, self.scaler, self.feature_names, self.sequence_length)
        # Very small learning rate: this is a light fine-tune, not a retrain.
        self.model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-5), loss={'5m_output': 'mean_squared_error', '15m_output': 'mean_squared_error', '1h_output': 'mean_squared_error'})
        self.model.fit(calibration_generator, epochs=3, verbose=1)
        self.model.save(self.calibrated_model_path); joblib.dump(self.scaler, self.calibrated_scaler_path)
        with open(self.calibrated_features_path, 'w') as f: json.dump(self.feature_names, f)
        print("--- AGENT RE-CALIBRATION AND ARTIFACT SAVING COMPLETE ---"); return True
    def predict_single(self, input_sequence):
        """Predict the three horizon prices for one sequence_length-row window.

        Returns ({'5m': .., '15m': .., '1h': ..}, formatted_summary_string).
        Raises RuntimeError when called before calibration.
        """
        if not self.scaler or not self.model: raise RuntimeError("Agent has not been calibrated. Cannot make predictions.")
        features_for_model = input_sequence[self.feature_names]; scaled_features = self.scaler.transform(features_for_model); reshaped_sequence = scaled_features.reshape(1, self.sequence_length, len(self.feature_names)); predictions = self.model.predict(reshaped_sequence, verbose=0)
        # Model has three named outputs; each is a (1, 1) array.
        real_price_predictions = {"5m": predictions[0][0][0], "15m": predictions[1][0][0], "1h": predictions[2][0][0]}; preds_str = f"5m: {real_price_predictions['5m']:.5f} | 15m: {real_price_predictions['15m']:.5f} | 1h: {real_price_predictions['1h']:.5f}"
        return real_price_predictions, preds_str
class LinUCBBandit:
    """Disjoint LinUCB contextual bandit over a fixed set of strategies.

    Per strategy s it maintains A = reg*I + sum(x x^T) and b = sum(r x); the
    UCB score is theta.x + alpha * sqrt(x^T A^-1 x) with theta = A^-1 b.
    """
    def __init__(self, strategies, d, alpha=1.0, regularization=1.0):
        self.strategies = list(strategies)
        self.d = d            # context-vector dimension
        self.alpha = alpha    # exploration weight
        self.reg = regularization  # ridge term keeping A invertible
        self.A = {s: (self.reg * np.eye(self.d)) for s in self.strategies}
        self.b = {s: np.zeros(self.d) for s in self.strategies}
    def select(self, context_vector):
        """Return the strategy with the highest UCB score for this context."""
        scores = {}
        for s in self.strategies:
            # FIX: solve the linear systems instead of forming an explicit
            # inverse (np.linalg.inv) — numerically more stable and cheaper.
            theta = np.linalg.solve(self.A[s], self.b[s])
            mean_reward = theta.dot(context_vector)
            uncertainty = self.alpha * math.sqrt(context_vector.dot(np.linalg.solve(self.A[s], context_vector)))
            scores[s] = mean_reward + uncertainty
        return max(scores, key=scores.get)
    def update(self, strategy, context_vector, reward):
        """Fold one observed (context, reward) pair into the strategy's stats."""
        x = context_vector.reshape(-1)
        if strategy not in self.A:
            # Lazily create stats for strategies seen only in the history log.
            # NOTE(review): such a strategy is NOT appended to self.strategies,
            # so select() will never pick it — confirm this is intended.
            self.A[strategy] = (self.reg * np.eye(self.d))
            self.b[strategy] = np.zeros(self.d)
        self.A[strategy] += np.outer(x, x)
        self.b[strategy] += reward * x
    def increase_exploration(self, factor=1.2):
        """Widen exploration in response to detected concept drift."""
        self.alpha *= factor
        print(f"CONCEPT DRIFT ACTION: Increased bandit exploration alpha to {self.alpha:.2f}")
class PageHinkley:
    """Page-Hinkley test for detecting upward drift in a stream of values."""
    def __init__(self, delta=0.005, lambda_=50, alpha=1 - 1e-3):
        # mean: exponentially-weighted running mean; cumulative: PH statistic.
        self.mean, self.delta, self.lambda_, self.alpha, self.cumulative = 0.0, delta, lambda_, alpha, 0.0
    def update(self, x):
        """Feed one observation; return True exactly when drift is detected."""
        self.mean = self.mean * self.alpha + x * (1 - self.alpha)
        deviation = x - self.mean - self.delta
        self.cumulative = max(0, self.cumulative + deviation)
        if self.cumulative <= self.lambda_:
            return False
        # Drift detected: reset the statistic so detection can re-arm.
        self.cumulative = 0
        return True
class FirebaseManager:
    """Thin wrapper around the Firebase Realtime Database used for logging.

    Connection is best-effort: when the FIRESTORE_SA_KEY / FIREBASE_DB_URL
    secrets are missing or initialization fails, both refs stay None and
    every method becomes a silent no-op.
    """
    def __init__(self, db_signals_ref_name='signals_v20', db_strategies_ref_name='generated_strategies_v20'):
        # None refs mean "logging disabled"; every method checks for this.
        self.signals_ref = None
        self.strategies_ref = None
        try:
            sa_key_json, db_url = os.environ.get('FIRESTORE_SA_KEY'), os.environ.get('FIREBASE_DB_URL')
            if not all([sa_key_json, db_url]):
                print("FIREBASE MANAGER: Secrets not set. Logger disabled.")
                return
            cred = credentials.Certificate(json.loads(sa_key_json))
            # initialize_app may only be called once per process; guard on the
            # SDK's private app registry.
            if not firebase_admin._apps:
                firebase_admin.initialize_app(cred, {'databaseURL': db_url})
            self.signals_ref = db.reference(db_signals_ref_name)
            self.strategies_ref = db.reference(db_strategies_ref_name)
            print(f"FIREBASE MANAGER: Successfully connected. Signals: '{db_signals_ref_name}', Strategies: '{db_strategies_ref_name}'.")
        except Exception as e:
            print(f"FIREBASE MANAGER: CRITICAL ERROR - Failed to initialize: {e}")
    def log_signal(self, ts, strategy, action, entry, sl, tp, context_vector, market_regime, confidence, predictions_str):
        """Persist one emitted trade signal; outcome fields start as None."""
        if not self.signals_ref: return
        try:
            signal_data = {
                "timestamp": ts, "strategy": strategy, "action": action, "entry": float(entry),
                "stop_loss": float(sl), "take_profit": float(tp), "context_vector": [float(x) for x in context_vector.tolist()],
                "market_regime_at_trade": market_regime, "confidence_at_trade": confidence,
                # pnl/reward/outcome_reason are filled in later by the evaluator.
                "predictions_at_trade": predictions_str, "pnl": None, "reward": None, "outcome_reason": None
            }
            self.signals_ref.push(signal_data)
        except Exception as e:
            print(f"FIREBASE MANAGER: CRITICAL ERROR - Could not write signal: {e}")
    def save_strategy(self, strategy_name, code_string):
        """Store generated strategy source code under its name with a timestamp."""
        if not self.strategies_ref: return
        try:
            self.strategies_ref.child(strategy_name).set({"code": code_string, "created_at": time.time()})
            print(f"FIREBASE MANAGER: Saved new strategy '{strategy_name}' to database.")
        except Exception as e:
            print(f"FIREBASE MANAGER: CRITICAL ERROR - Could not save strategy: {e}")
    def load_all_strategies(self):
        """Return {name: {"code": ..., ...}} of stored strategies; {} on failure."""
        if not self.strategies_ref: return {}
        try:
            return self.strategies_ref.get() or {}
        except Exception as e:
            print(f"FIREBASE MANAGER: CRITICAL ERROR - Could not load strategies: {e}")
            return {}
class StrategyManager:
    """Owns the pool of callable strategies the agent can choose between."""
    def __init__(self, situation_room, prediction_engine):
        self.situation_room = situation_room
        self.prediction_engine = prediction_engine
        self._strategies = self._initialize_base_strategies()
    def _initialize_base_strategies(self):
        """Build the two hard-coded baseline strategies."""
        def predictive_strategy(seq):
            # Full pipeline: transformer predictions -> rule-based thesis.
            preds_dict, preds_str = self.prediction_engine.predict_single(seq)
            return self.situation_room.generate_thesis(preds_dict, seq), preds_str
        def ema_crossover_strategy(seq):
            # Prediction-free fallback: the situation room's EMA rule.
            return self.situation_room.generate_thesis({}, seq), "N/A (EMA Strategy)"
        return {"predictive_rule_based": predictive_strategy, "ema_crossover": ema_crossover_strategy}
    def list_strategies(self):
        """Return the live name -> callable mapping (not a copy)."""
        return self._strategies
    def add_strategy(self, name, function_obj):
        """Register (or replace) a strategy under the given name."""
        print(f"STRATEGY MANAGER: Adding new strategy '{name}' to the active pool.")
        self._strategies[name] = function_obj
    def load_evolved_strategies(self, firebase_manager):
        """Compile and register LLM-generated strategies stored in Firebase.

        SECURITY NOTE: exec() runs arbitrary code pulled from the database —
        only trusted writers must have access to that DB path.
        """
        print("STRATEGY MANAGER: Checking for evolved strategies in the database...")
        evolved_strategies = firebase_manager.load_all_strategies()
        count = 0
        for name, data in evolved_strategies.items():
            code = data.get("code")
            if not code:
                continue
            try:
                namespace = {}
                exec(code, globals(), namespace)
                strategy_func = namespace['generated_strategy']
                def make_wrapped(func):
                    # Bind func now so each iteration keeps its own strategy.
                    def wrapped(sequence_df):
                        return func(sequence_df), "N/A (Evolved)"
                    return wrapped
                self.add_strategy(name, make_wrapped(strategy_func))
                count += 1
            except Exception as e:
                print(f"STRATEGY MANAGER: ERROR - Failed to load evolved strategy '{name}'. Reason: {e}")
        print(f"STRATEGY MANAGER: Loaded {count} evolved strategies from the database.")
class CognitiveCore:
    """Mines the logged trade history for systematic weaknesses per regime."""
    def __init__(self, firebase_manager):
        self.firebase_manager = firebase_manager
    def analyze_performance_history(self):
        """Return a strategy-lab memo when a clearly losing regime is found, else None."""
        print("COGNITIVE CORE: Analyzing historical performance...")
        if not self.firebase_manager.signals_ref:
            return None
        all_signals = self.firebase_manager.signals_ref.get()
        if not all_signals:
            print("COGNITIVE CORE: No history to analyze.")
            return None
        history = pd.DataFrame.from_dict(all_signals, orient='index')
        history = history.dropna(subset=['reward', 'market_regime_at_trade'])
        # Require a minimum sample before drawing any conclusions.
        if history.empty or len(history) < 20:
            print(f"COGNITIVE CORE: Not enough completed trades ({len(history)}) to analyze. Need at least 20.")
            return None
        regime_performance = history.groupby('market_regime_at_trade')['reward'].mean()
        if regime_performance.empty:
            return None
        worst_regime = regime_performance.idxmin()
        worst_regime_score = regime_performance.min()
        # Only trigger evolution when the worst regime is clearly losing.
        if worst_regime_score >= -0.1:
            print(f"COGNITIVE CORE INSIGHT: Performance is acceptable. No evolution needed.")
            return None
        memo = f"""
**To:** Strategy Lab
**From:** Cognitive Core
**Subject:** Performance Analysis & Strategic Gaps
**Key Findings:** A critical weakness has been identified in the **'{worst_regime.upper()}'** market regime, with an average reward of {worst_regime_score:.2f}.
**Strategic Directive:** Generate a new **mean-reversion** strategy for **'{worst_regime.upper()}'** markets. The strategy should SELL when RSI is overbought (e.g., > 70) and BUY when RSI is oversold (e.g., < 30). Use a tight stop-loss of 1.5 times the ATR.
"""
        print(f"COGNITIVE CORE INSIGHT: Generated memo targeting poor performance in '{worst_regime}' regime.")
        return memo
class LocalStrategyLab:
    """Wraps a local instruction-tuned LLM that writes new strategy functions."""
    def __init__(self, model_name="microsoft/Phi-3-mini-4k-instruct"):
        # generator stays None when the LLM cannot be loaded; callers get None
        # from generate_new_strategy_code in that case.
        self.generator = None
        if not NLP_LIBRARIES_AVAILABLE:
            print("STRATEGY LAB: WARNING - Transformers/Torch not installed. Strategy generation disabled.")
            return
        print(f"STRATEGY LAB: Initializing local LLM: {model_name}...")
        try:
            self.generator = pipeline(
                "text-generation", model=model_name, torch_dtype=torch.bfloat16, device_map="auto"
            )
            print("STRATEGY LAB: LLM Initialized successfully.")
        except Exception as e:
            print(f"STRATEGY LAB: CRITICAL ERROR - Failed to initialize LLM pipeline: {e}")
    def _create_prompt(self, performance_memo):
        # The prompt pins the required function name, signature and return
        # contract so the generated code can be exec'd and called blindly.
        return f"""You are an expert quantitative trading strategist. Your task is to design a new, syntactically correct Python trading strategy function based on a performance analysis memo.
**Performance Memo:**
---
{performance_memo}
---
**Instructions:**
1. Write a single Python function. The function name MUST be `generated_strategy`.
2. The function must accept one argument: `sequence_df`, a pandas DataFrame. The last row (`sequence_df.iloc[-1]`) is the current timestep.
3. The function MUST return a Python dictionary with the trade thesis.
4. The dictionary keys must include: "action", "entry", "stop_loss", "take_profit", "confidence", "reasoning", and "strategy_type".
5. For "action", use "BUY", "SELL", or "NO_TRADE".
6. Base your strategy logic directly on the 'Strategic Directive' in the memo. Use standard indicators available in the DataFrame like 'RSI', 'EMA_20', and 'ATR'.
7. The value for "strategy_type" should be "Mean Reversion".
Begin the Python code block now.
```python
"""
    def generate_new_strategy_code(self, memo):
        """Return the Python source of a newly generated strategy, or None."""
        if not self.generator or not memo: return None
        prompt = self._create_prompt(memo)
        outputs = self.generator(prompt, max_new_tokens=512, do_sample=True, temperature=0.6, top_k=50, top_p=0.95)
        # BUG FIX: a transformers text-generation pipeline returns a LIST of
        # dicts; the original indexed the list with a string key and crashed.
        generated_text = outputs[0]['generated_text']
        try:
            # BUG FIX: str.split() returns a list, which has no .strip(); take
            # the text up to the closing fence, then strip that string.
            code = generated_text.split("```python")[1].split("```")[0].strip()
            print("STRATEGY LAB: Successfully generated new strategy code.")
            return code
        except IndexError:
            print("STRATEGY LAB: ERROR - Could not extract Python code from LLM response."); return None
# Numeric features scaled into the contextual-bandit context vector.
CONTEXT_FEATURES = ['close', 'ATR', 'EMA_20', 'RSI', 'time_since_event', 'time_to_event', 'hour_of_day']
# One-hot columns for the regime labels produced by CausalReasoningNetwork.
REGIME_COLS = ['regime_TRENDING', 'regime_BREAKOUT', 'regime_CHOPPY', 'regime_RANGING']
# Total context dimension fed to LinUCBBandit (scaled numerics + one-hot regime).
CONTEXT_DIMENSION = len(CONTEXT_FEATURES) + len(REGIME_COLS)
class ContextVectorPreprocessor:
    """Scales numeric context features and one-hot encodes the market regime."""
    def __init__(self):
        self.scaler = MinMaxScaler(feature_range=(-1, 1))
        self.calibrated_context_scaler_path = 'calibrated_context_scaler.joblib'
        self.feature_names = CONTEXT_FEATURES
        self.regime_cols = REGIME_COLS
    def is_calibrated(self):
        """True when a fitted scaler has been persisted to disk."""
        return os.path.exists(self.calibrated_context_scaler_path)
    def load_calibrated_scaler(self):
        """Load the persisted scaler; return True on success, False otherwise."""
        if not self.is_calibrated():
            return False
        print(f"--- Loading pre-calibrated context scaler ---")
        self.scaler = joblib.load(self.calibrated_context_scaler_path)
        return True
    def calibrate(self, calibration_data_for_context):
        """Fit the scaler on historical context features and persist it."""
        print("--- Fitting Context Scaler ---")
        self.scaler.fit(calibration_data_for_context[self.feature_names])
        joblib.dump(self.scaler, self.calibrated_context_scaler_path)
        print("--- Context Scaler calibration complete ---")
    def build_context_vector(self, df_with_regime):
        """Return scaled numerics + one-hot regime for the LAST row of the frame."""
        if 'regime' not in df_with_regime.columns:
            raise ValueError("DataFrame must contain a 'regime' column.")
        frame = df_with_regime.copy()
        frame[self.feature_names] = frame[self.feature_names].astype(float)
        latest_features = frame[self.feature_names].iloc[-1:].values
        scaled_part = self.scaler.transform(latest_features).flatten()
        current_regime = frame.iloc[-1]['regime']
        one_hot = np.zeros(len(self.regime_cols))
        try:
            one_hot[self.regime_cols.index(f"regime_{current_regime}")] = 1
        except ValueError:
            # Unknown regime: leave the one-hot part all-zero rather than fail.
            print(f"Warning: Regime '{current_regime}' not found. Setting regime vector to zeros.")
        return np.concatenate([scaled_part, one_hot])
class LiveDataStore:
    """Background thread keeping a fresh, feature-engineered market snapshot.

    A daemon thread re-fetches prices + events every update_interval_seconds;
    readers obtain thread-safe copies via get_latest_data / get_raw_price_data.
    """
    def __init__(self, api_key, finbert_tokenizer, finbert_model, update_interval_seconds=120):
        self.api_key = api_key; self.finbert_tokenizer = finbert_tokenizer; self.finbert_model = finbert_model
        # _data_lock guards _latest_features / _next_event_info / _last_update_time.
        self.update_interval_seconds = update_interval_seconds; self._data_lock = threading.Lock()
        self._latest_features = pd.DataFrame(); self._next_event_info = {"title": "None within 24h", "time_str": "N/A"}
        self._stop_event = threading.Event(); self._update_thread = None; self._last_update_time = None
    def _fetch_and_update(self, output_size=500):
        """Fetch prices + events, rebuild features, then sleep out the interval."""
        start_time = time.time()
        try:
            price_data = fetch_twelvedata_prices(self.api_key, output_size=output_size)
            # NOTE(review): returning here skips the interval sleep below, so an
            # empty fetch makes _update_loop retry immediately — confirm intended.
            if price_data.empty: return
            events_data = fetch_live_events_with_cache()
            features, next_event_info = create_feature_set_for_inference(price_data, events_data, self.finbert_tokenizer, self.finbert_model)
            # Swap in the new snapshot atomically under the lock.
            with self._data_lock: self._latest_features, self._next_event_info, self._last_update_time = features, next_event_info, pd.Timestamp.now(tz='UTC')
        except Exception as e: print(f"LiveDataStore ERROR during update: {e}")
        # Sleep the remainder of the interval; stop() wakes us early via the event.
        elapsed_time = time.time() - start_time
        sleep_duration = max(0, self.update_interval_seconds - elapsed_time)
        if sleep_duration > 0: self._stop_event.wait(sleep_duration)
    def _update_loop(self):
        # Runs inside the daemon thread until _stop_event is set.
        while not self._stop_event.is_set(): self._fetch_and_update()
    def start(self):
        """Start the refresh thread (idempotent) and do one synchronous fetch."""
        if self._update_thread is None or not self._update_thread.is_alive():
            print("LiveDataStore: Starting background update thread.")
            self._stop_event.clear(); self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
            self._update_thread.start(); print("LiveDataStore: Performing initial data fetch."); self._fetch_and_update()
    def get_latest_data(self, num_bars=None):
        """Return (features_copy, next_event_info), optionally only the last num_bars rows."""
        with self._data_lock:
            if num_bars and not self._latest_features.empty: return self._latest_features.iloc[-num_bars:].copy(), self._next_event_info
            return self._latest_features.copy(), self._next_event_info
    def get_raw_price_data(self, num_bars=None):
        """Return OHLC bars as a copy with a UTC-aware 'Datetime' column."""
        with self._data_lock:
            if self._latest_features.empty: return pd.DataFrame()
            df_slice = self._latest_features[['open', 'high', 'low', 'close']].copy()
            if num_bars: df_slice = df_slice.iloc[-num_bars:]
            df_slice.reset_index(inplace=True)
            # Ensure the timestamp column is tz-aware UTC for downstream comparisons.
            if df_slice['Datetime'].dt.tz is None: df_slice['Datetime'] = df_slice['Datetime'].dt.tz_localize('UTC')
            else: df_slice['Datetime'] = df_slice['Datetime'].dt.tz_convert('UTC')
            return df_slice
def download_models_from_hf(repo_id, hf_token):
    """Download the base multi-horizon Keras model and return its local path.

    Re-raises on failure because the agent cannot run without the base model.
    """
    print("Downloading base agent model from Hugging Face Hub...")
    try:
        return hf_hub_download(repo_id=repo_id, filename="multi_horizon_model.keras", token=hf_token)
    except Exception as e:
        print(f"FATAL: Failed to download base model: {e}")
        raise
def send_ntfy_notification(topic, trade_thesis):
    """Push a trade-signal notification to ntfy.sh; no-op when topic is unset."""
    if not topic:
        return
    title = f"V20 Signal: {trade_thesis.get('action')} EUR/USD"
    message_lines = [
        f"Strategy: {trade_thesis.get('strategy_type')} ({trade_thesis.get('confidence')})",
        f"Reason: {trade_thesis.get('reasoning')}",
        f"Entry: {trade_thesis.get('entry')} | SL: {trade_thesis.get('stop_loss')} | TP: {trade_thesis.get('take_profit')}",
    ]
    message = "\n".join(message_lines)
    try:
        requests.post(f"https://ntfy.sh/{topic}", data=message.encode('utf-8'), headers={"Title": title})
        print("ntfy notification sent.")
    except requests.exceptions.RequestException as e:
        print(f"Failed to send ntfy notification: {e}")
def evaluate_pending_signals_v2(firebase_manager, bandit, change_detector, live_data_store):
    """Score every unevaluated logged signal against live price history.

    For each signal older than 5 minutes with no recorded reward, replays the
    subsequent bars to decide SL-hit / TP-hit / time exit, converts PnL into a
    clipped reward, feeds the reward to the bandit and the drift detector,
    and writes the outcome back to Firebase.
    """
    if not firebase_manager.signals_ref: return
    now_utc = pd.Timestamp.now(tz='UTC')
    try:
        all_signals = firebase_manager.signals_ref.get();
        if not all_signals: return
        # 288 five-minute bars ~= one day of history for outcome replay.
        live_price_history = live_data_store.get_raw_price_data(num_bars=288)
        if live_price_history.empty: return
        for key, signal in all_signals.items():
            # Skip signals that were already evaluated.
            if signal.get('reward') is not None: continue
            signal_time = pd.to_datetime(signal['timestamp'])
            # Give the trade at least one 5-minute bar to play out.
            if now_utc < (signal_time + pd.Timedelta(minutes=5)): continue
            entry, sl, tp, action = float(signal['entry']), float(signal['stop_loss']), float(signal['take_profit']), signal['action']
            relevant_bars = live_price_history[live_price_history['Datetime'] > signal_time]
            if relevant_bars.empty: continue
            # Default outcome: exit at the close of the last available bar.
            outcome_reason, exit_price = "Time Exit", relevant_bars.iloc[-1]['close']
            for _, bar in relevant_bars.iterrows():
                # NOTE(review): when SL and TP both fall inside one bar, the SL
                # is assumed to fill first (conservative) — confirm intended.
                if action == 'BUY':
                    if bar['low'] <= sl: exit_price, outcome_reason = sl, "SL"; break
                    if bar['high'] >= tp: exit_price, outcome_reason = tp, "TP"; break
                elif action == 'SELL':
                    if bar['high'] >= sl: exit_price, outcome_reason = sl, "SL"; break
                    if bar['low'] <= tp: exit_price, outcome_reason = tp, "TP"; break
            pnl = (exit_price - entry) if action == 'BUY' else (entry - exit_price)
            # Normalize PnL by 0.005 (presumably ~50 pips on EUR/USD — confirm)
            # and clip to [-1, 1] for the bandit.
            reward = np.clip(pnl / 0.005, -1.0, 1.0)
            context_vector = np.array(signal['context_vector'])
            bandit.update(signal['strategy'], context_vector, reward)
            # Drift detector watches NEGATIVE reward: rising losses => drift.
            if change_detector.update(-reward): print("! CONCEPT DRIFT DETECTED !"); bandit.increase_exploration()
            firebase_manager.signals_ref.child(key).update({'pnl': pnl, 'reward': reward, 'outcome_reason': outcome_reason})
            print(f"EVALUATOR: Updated signal from {signal_time.isoformat()}. Outcome: {outcome_reason}, PnL: {pnl:.5f}")
    except Exception as e: print(f"EVALUATOR V2 ERROR: {e}"); traceback.print_exc()
# --- MAIN WORKER FUNCTION ---
def main_worker():
    """Long-running background worker for the trading agent.

    One-time setup: load FinBERT, calibrate (or reload) the prediction engine
    and context preprocessor, connect Firebase, start the live data feed and
    build the bandit/strategy stack. Then loop forever: analyze the latest
    bars, optionally emit a trade signal, evaluate past signals, and once a
    day run the self-evolution cycle. Progress is mirrored to status.json,
    which the Gradio dashboard polls.
    """
    print("--- [Conscious Agent V20 - Definitive Fix] Worker Thread Started ---")
    api_key, hf_token, ntfy_topic = os.environ.get('TWELVE_DATA_API_KEY'), os.environ.get('HF_TOKEN'), os.environ.get('NTFY_TOPIC_V2')
    HF_REPO_ID = "Badumetsibb/conscious-trading-agent-models"
    if not NLP_LIBRARIES_AVAILABLE:
        # BUG FIX: previously this branch only printed a warning and fell
        # through to BertTokenizer below, crashing with a NameError. FinBERT
        # is required for feature creation, so stop the worker cleanly.
        print("CRITICAL: Transformers or PyTorch not found. Worker cannot start.")
        return
    # FinBERT supplies sentiment features for news/event processing.
    finbert_tokenizer = BertTokenizer.from_pretrained('ProsusAI/finbert')
    finbert_model = TFBertModel.from_pretrained('ProsusAI/finbert', from_pt=True)
    prediction_engine = PredictionCoreTransformer()
    context_preprocessor = ContextVectorPreprocessor()
    if prediction_engine.is_calibrated() and context_preprocessor.is_calibrated():
        # Warm restart: reuse artifacts persisted by a previous run.
        prediction_engine.load_calibrated_artifacts()
        context_preprocessor.load_calibrated_scaler()
    else:
        print("No calibrated artifacts found. Starting one-time production calibration...")
        base_model_path = download_models_from_hf(HF_REPO_ID, hf_token)
        calibration_price_data = fetch_twelvedata_prices(api_key, output_size=5000)
        if calibration_price_data.empty or len(calibration_price_data) < 500:
            print("FATAL: Could not fetch enough data for calibration. Worker stopping.")
            return
        calibration_events_data = fetch_live_events_with_cache()
        calibration_features, _ = create_feature_set_for_inference(calibration_price_data, calibration_events_data, finbert_tokenizer, finbert_model)
        prediction_engine.calibrate(base_model_path, calibration_features.copy())
        # Regime labels must exist before the context scaler can be fitted.
        causal_engine_for_calibration = CausalReasoningNetwork(calibration_features.copy())
        calibration_features_with_regime = causal_engine_for_calibration.identify_volatility_regimes()
        context_preprocessor.calibrate(calibration_features_with_regime.copy())
    firebase_manager = FirebaseManager()
    live_data_store = LiveDataStore(api_key, finbert_tokenizer, finbert_model)
    live_data_store.start()
    situation_room = RuleBasedSituationRoom({'sl_atr_multiplier': 2.0, 'tp_atr_multiplier': 4.0})
    strategy_manager = StrategyManager(situation_room, prediction_engine)
    strategy_manager.load_evolved_strategies(firebase_manager)
    bandit = LinUCBBandit(strategy_manager.list_strategies().keys(), d=CONTEXT_DIMENSION, alpha=1.5)
    change_detector = PageHinkley()
    cognitive_core = CognitiveCore(firebase_manager)
    strategy_lab = LocalStrategyLab()
    last_evolution_check = time.time()
    EVOLUTION_CYCLE_SECONDS = 86400  # self-evolution runs once per day
    print("--- WORKER V20: Initialization Complete. Starting main adaptive loop. ---")
    while True:
        try:
            print(f"WORKER V20: [{pd.Timestamp.now(tz='UTC')}] Waking up...")
            features, next_event_info = live_data_store.get_latest_data()
            if features.empty or len(features) < prediction_engine.sequence_length:
                print("WORKER V20: Not enough data. Waiting...")
                time.sleep(300)
                continue
            causal_engine = CausalReasoningNetwork(features.copy())
            features_with_regime = causal_engine.identify_volatility_regimes()
            input_sequence = features_with_regime.iloc[-prediction_engine.sequence_length:]
            ctx_vec = context_preprocessor.build_context_vector(input_sequence)
            # The contextual bandit picks which strategy forms the thesis.
            chosen_strategy_name = bandit.select(ctx_vec)
            trade_thesis, preds_str = strategy_manager.list_strategies()[chosen_strategy_name](input_sequence)
            current_regime = input_sequence.iloc[-1]['regime']
            is_tradeable = MarketRegimeFilter().should_trade(current_regime, trade_thesis)
            final_action = trade_thesis['action'] if is_tradeable else "NO TRADE"
            if final_action in ["BUY", "SELL"]:
                ts = pd.Timestamp.now(tz='UTC').isoformat()
                firebase_manager.log_signal(ts, chosen_strategy_name, final_action, trade_thesis['entry'],
                                            trade_thesis['stop_loss'], trade_thesis['take_profit'], ctx_vec,
                                            current_regime, trade_thesis['confidence'], preds_str)
                send_ntfy_notification(ntfy_topic, trade_thesis)
            # Close the learning loop on signals that are now old enough.
            evaluate_pending_signals_v2(firebase_manager, bandit, change_detector, live_data_store)
            final_reasoning = f"Bandit chose '{chosen_strategy_name}'. Thesis: '{trade_thesis['reasoning']}'"
            if trade_thesis['action'] != 'NO TRADE':
                final_reasoning += f" -> {'✅ EXECUTABLE.' if is_tradeable else f'❌ REJECTED by Regime Filter for {current_regime}.'}"
            status = {"last_checked": pd.Timestamp.now(tz='UTC').isoformat(), "market_price": f"{input_sequence.iloc[-1]['close']:.5f}", "market_regime": current_regime, "signal": final_action, "reasoning": final_reasoning, "predictions": preds_str, "next_event": f"{next_event_info['title']} ({next_event_info['time_str']})"}
            # BUG FIX: write-then-rename so the dashboard can never read a
            # half-written status.json (the old in-place write raced readers).
            with open('status.json.tmp', 'w') as f:
                json.dump(status, f)
            os.replace('status.json.tmp', 'status.json')
            print(f"WORKER V20: Analysis complete. Signal: {final_action}. Sleeping.")
            if (time.time() - last_evolution_check) > EVOLUTION_CYCLE_SECONDS:
                print("\n--- INITIATING NIGHTLY SELF-EVOLUTION CYCLE ---")
                memo = cognitive_core.analyze_performance_history()
                if memo:
                    new_code = strategy_lab.generate_new_strategy_code(memo)
                    if new_code:
                        new_strategy_name = f"evolved_strategy_{int(time.time())}"
                        firebase_manager.save_strategy(new_strategy_name, new_code)
                        print("EVOLUTION: New strategy saved. It will be active after the next restart.")
                last_evolution_check = time.time()
                print("--- SELF-EVOLUTION CYCLE COMPLETE ---\n")
            time.sleep(300)
        except Exception as e:
            print(f"WORKER V20 CRITICAL ERROR: {e}")
            traceback.print_exc()
            time.sleep(60)
# --- GRADIO DASHBOARD ---
def get_latest_status():
    """Read the worker's latest status.json and unpack it for the dashboard.

    Returns a 7-tuple: (headline, price, regime, signal, reasoning,
    predictions, next_event). Falls back to a "starting" placeholder when the
    file is missing or momentarily unreadable (e.g. while the worker is
    mid-write), instead of raising and breaking the refresh button.
    """
    placeholder = ("Agent is starting...", "", "", "", "", "", "")
    if not os.path.exists('status.json'):
        return placeholder
    try:
        with open('status.json', 'r') as f:
            status = json.load(f)
    except (OSError, json.JSONDecodeError):
        # BUG FIX: the worker may be writing the file right now; treat a
        # truncated/corrupt read the same as "not ready yet".
        return placeholder
    return (f"Status from worker at: {status.get('last_checked', 'N/A')}",
            status.get('market_price', 'N/A'),
            status.get('market_regime', 'N/A'),
            status.get('signal', 'N/A'),
            status.get('reasoning', 'N/A'),
            status.get('predictions', 'N/A'),
            status.get('next_event', 'N/A'))
# Dashboard layout. Read-only control panel: every field is populated from
# status.json via the refresh button; nothing here talks to the worker
# thread directly.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🧠 V20 Conscious Agent (Definitive Fix)")
    gr.Markdown("This agent uses a reliable calibration flow and a persistent, database-driven memory for self-evolution.")
    # Quick sanity check that every required environment secret is present.
    required_keys = ['TWELVE_DATA_API_KEY', 'NTFY_TOPIC_V2', 'HF_TOKEN', 'FIRESTORE_SA_KEY', 'FIREBASE_DB_URL']
    if all(os.environ.get(key) for key in required_keys):
        secret_status = "✅ All required secrets appear to be set."
    else:
        secret_status = "❌ One or more secrets are MISSING. Check all required secrets."
    gr.Markdown(f"**Secrets Status:** {secret_status}")
    btn_refresh = gr.Button("Refresh Status", variant="primary")
    box_status = gr.Textbox(label="Status", interactive=False)
    gr.Markdown("## Agent's Last Analysis")
    with gr.Row():
        box_price = gr.Textbox(label="Last Market Price")
        box_regime = gr.Textbox(label="Last Market Regime")
    with gr.Row():
        box_predictions = gr.Textbox(label="DL Model Predictions (5m|15m|1h)")
        box_event = gr.Textbox(label="Next High-Impact Event")
    box_action = gr.Textbox(label="Final Signal / Action")
    box_reasoning = gr.Textbox(label="Full Reasoning", lines=3)
    # Output order must match get_latest_status()'s 7-tuple.
    btn_refresh.click(fn=get_latest_status, inputs=[],
                      outputs=[box_status, box_price, box_regime, box_action,
                               box_reasoning, box_predictions, box_event])
if __name__ == "__main__":
    # Only start the background worker when the core credentials are present;
    # the dashboard is launched either way so the operator can see why.
    if not all(os.environ.get(key) for key in ['TWELVE_DATA_API_KEY', 'HF_TOKEN']):
        print("FATAL: Core secrets (API Key, HF Token) are missing. Worker cannot start.")
    else:
        # Daemon thread: dies together with the main (Gradio) process.
        background_worker = threading.Thread(target=main_worker, daemon=True)
        background_worker.start()
    demo.launch()