# Source: Aurora / adaptive_meta_patch_v1.py
# (Hugging Face upload by Badumetsibb, commit 3a0e72b, verified)
# ===============================================
# ADAPTIVE META-CONTROLLER MAIN LOOP (V1)
# Drop-in for app.py — replaces your main_worker.
# ===============================================
import os
import csv
import time
import math
import random
from collections import deque, defaultdict
import pandas as pd
# ------------------ Meta Components ------------------
class PerformanceLogger:
    """Append signals and outcomes to a CSV for meta-learning and replay.

    Schema (one row per signal): timestamp, strategy, action, entry,
    stop_loss, take_profit, price_at_signal, eval_time, pnl, reward,
    context_hash.  The pnl/reward cells start empty and are filled in
    later by update_outcome() once the signal has been evaluated.
    """

    # Column layout shared by log_signal() / update_outcome().
    HEADER = ["timestamp", "strategy", "action", "entry", "stop_loss",
              "take_profit", "price_at_signal", "eval_time", "pnl",
              "reward", "context_hash"]

    def __init__(self, path="/mnt/data/agent_signals_log.csv"):
        self.path = path
        # Create the file with a header row only on first use.
        if not os.path.exists(self.path):
            with open(self.path, "w", newline='') as f:
                csv.writer(f).writerow(self.HEADER)

    def log_signal(self, ts, strategy, action, entry, sl, tp, price, eval_time, context_hash):
        """Append a new signal row with empty pnl/reward placeholders."""
        with open(self.path, "a", newline='') as f:
            csv.writer(f).writerow(
                [ts, strategy, action, entry, sl, tp, price, eval_time, "", "", context_hash])

    def update_outcome(self, ts, pnl, reward):
        """Fill in pnl/reward for the newest un-evaluated row matching *ts*.

        No-op when the log file does not exist yet or no matching pending
        row is found.  (The original raised FileNotFoundError / IndexError
        on a missing log or a malformed short row.)
        """
        try:
            with open(self.path, "r", newline='') as f:
                rows = list(csv.reader(f))
        except FileNotFoundError:
            return
        filled = False
        # Scan newest-first; row 0 is the header, short rows are skipped.
        for i in range(len(rows) - 1, 0, -1):
            row = rows[i]
            if len(row) > 9 and row[0] == ts and row[8] == "":
                row[8] = f"{pnl:.6f}"
                row[9] = f"{reward:.6f}"
                filled = True
                break
        if filled:
            # Rewrite the whole file in place with the outcome filled in.
            with open(self.path, "w", newline='') as f:
                csv.writer(f).writerows(rows)
class PageHinkley:
\"\"\"Page-Hinkley change detector for streaming losses/returns.\"\"\"
def __init__(self, delta=0.0001, lambda_=40, alpha=1-1e-3):
self.mean = 0.0
self.delta = delta
self.lambda_ = lambda_
self.alpha = alpha
self.cumulative = 0.0
def update(self, x):
# x: score (e.g., negative pnl or error)
self.mean = self.mean * self.alpha + x * (1 - self.alpha)
self.cumulative = min(self.cumulative + x - self.mean - self.delta, 0)
if -self.cumulative > self.lambda_:
self.cumulative = 0
return True
return False
class ThompsonBandit:
    """Thompson-sampling bandit with Bernoulli (win/loss) rewards."""

    def __init__(self, strategies):
        self.strategies = list(strategies)
        # Beta(1, 1) priors: one pseudo-win and one pseudo-loss per arm.
        self.success = dict.fromkeys(self.strategies, 1)
        self.fail = dict.fromkeys(self.strategies, 1)

    def select(self, context=None):
        """Draw one Beta sample per arm and return the best-drawing name."""
        draws = [random.betavariate(self.success[s], self.fail[s])
                 for s in self.strategies]
        best = max(range(len(draws)), key=draws.__getitem__)
        return self.strategies[best]

    def update(self, strategy, reward_binary):
        """Credit a win when reward_binary >= 1, otherwise a loss."""
        bucket = self.success if reward_binary >= 1 else self.fail
        bucket[strategy] += 1
class StrategyManager:
    """Expose all strategies behind a uniform name -> callable(seq) map."""

    def __init__(self, situation_room, extra_strategies=None):
        self.situation_room = situation_room
        self.extra = extra_strategies or {}

    def list_strategies(self):
        """Return {name: fn(seq) -> thesis}; always includes 'rule_based'.

        Extra strategies override 'rule_based' if they reuse the name.
        """
        room = self.situation_room

        def rule_based(seq):
            # Delegates to the situation room with empty params; adapt if
            # your generate_thesis signature differs.
            return room.generate_thesis({}, seq)

        table = {"rule_based": rule_based}
        table.update(self.extra)
        return table
# ------------------ Small helpers ------------------
def context_hash_from_df(df):
    """Build a compact context key from the last row of *df*.

    Joins whichever of the indicator columns (close, ATR, EMA_20, RSI,
    session_london) are present as fixed-precision strings; falls back
    to the close price (or 0.0) when none of them exist.
    """
    last = df.iloc[-1]
    parts = []
    for col in ("close", "ATR", "EMA_20", "RSI", "session_london"):
        if col in last.index:
            parts.append(f"{last[col]:.6f}")
    if not parts:
        return f"{float(last.get('close', 0.0)):.6f}"
    return "_".join(parts)
def fetch_current_price_or_last(seq):
    """Return the latest close from *seq* as a float.

    Tries row-wise access first (``seq.iloc[-1]['close']``) and falls
    back to column-wise access (``seq['close'].iloc[-1]``) so both
    DataFrame-like layouts are tolerated.
    """
    try:
        last_row = seq.iloc[-1]
        return float(last_row['close'])
    except Exception:
        return float(seq['close'].iloc[-1])
# ------------------ Evaluation pass ------------------
def evaluate_pending_signals(perf_logger_path, bandit, change_detector, price_fetch_seq):
    """Shadow-evaluate logged signals whose eval_time has passed.

    For each pending row (empty pnl column) whose horizon has expired,
    compute a directional pnl against the current price, write pnl/reward
    back into the CSV, credit the bandit, and feed the change detector
    with -pnl so a losing streak raises the drift statistic.

    Parameters
    ----------
    perf_logger_path : str — CSV produced by PerformanceLogger.
    bandit : object with update(strategy, reward).
    change_detector : object with update(score).
    price_fetch_seq : zero-arg callable returning the latest price window.
    """
    now = pd.Timestamp.now(tz='UTC')
    try:
        with open(perf_logger_path, "r", newline='') as f:
            rows = list(csv.reader(f))
    except FileNotFoundError:
        return  # nothing logged yet

    updated = False
    price_now = None  # fetched lazily, once, when the first row is due
    for i in range(1, len(rows)):  # row 0 is the header
        row = rows[i]
        # Skip malformed short rows (would IndexError) and evaluated ones.
        if len(row) < 10 or row[8] != "":
            continue
        try:
            # utc=True guarantees a tz-aware timestamp, so the comparison
            # with the tz-aware `now` can never raise on naive datetimes.
            eval_time = pd.to_datetime(row[7], utc=True)
        except Exception:
            continue  # unparsable eval_time: leave pending
        if eval_time > now:
            continue  # horizon not reached yet
        strategy, action = row[1], row[2]
        try:
            entry = float(row[3])
        except Exception:
            continue  # entry price missing/corrupt
        if price_now is None:
            price_now = fetch_current_price_or_last(price_fetch_seq())
        # Directional shadow P&L: long gains when price rises, short inverts.
        pnl = (price_now - entry) if action == "BUY" else (entry - price_now)
        reward = 1.0 if pnl > 0 else 0.0
        row[8] = f"{pnl:.6f}"
        row[9] = f"{reward:.6f}"
        bandit.update(strategy, reward)
        # Negative pnl as the "loss" score: drift fires on sustained losses.
        _ = change_detector.update(-pnl)
        updated = True

    if updated:
        with open(perf_logger_path, "w", newline='') as f:
            csv.writer(f).writerows(rows)
# ------------------ Bootstrap dependencies ------------------
def bootstrap_components(symbol):
    """Create or load the core app components for *symbol*.

    If your app constructs these elsewhere, replace this with imports/uses
    of your instances.  Every dependency is constructed defensively so the
    worker can still start in degraded/paper mode:

    Returns
    -------
    tuple
        (pred_engine or None, situation_room, regime_filter)
    """
    # Prediction engine: optional; main_worker tolerates None.
    try:
        pred_engine = PredictionEngine(symbol=symbol)
    except Exception:
        pred_engine = None  # If you don't have it or construct elsewhere

    # Situation room: prefer tuned params, fall back to defaults, and —
    # unlike the original, which re-raised NameError here — also survive
    # the class itself being absent by installing a no-op stub.
    try:
        sr = RuleBasedSituationRoom(BEST_PARAMS)
    except Exception:
        try:
            sr = RuleBasedSituationRoom({})
        except Exception:
            class _DummySR:
                # Empty thesis -> main loop resolves to 'NO ACTION'.
                def generate_thesis(self, params, seq): return {}
            sr = _DummySR()

    # Regime filter: permissive stub if unavailable (mirrors _DummySR).
    try:
        rf = MarketRegimeFilter()
    except Exception:
        class _DummyRF:
            def should_trade(self, regime, thesis): return True
        rf = _DummyRF()

    return pred_engine, sr, rf
# ------------------ NEW main_worker ------------------
def main_worker(symbol: str, ntfy_topic: str, poll_interval_seconds: int = 60, lookback_minutes: int = 240, eval_horizon_minutes: int = 30):
    """Adaptive, self-evaluating main loop.
    Replaces your existing main_worker. Safe to run in paper mode.

    Per iteration: fetch the latest window -> build features -> optional
    prediction pass -> pick a strategy by Thompson sampling -> log
    BUY/SELL signals for later shadow evaluation -> evaluate expired
    signals, feeding outcomes back into the bandit and drift detector.

    NOTE(review): relies on host-app globals `fetch_latest_sequence`,
    `create_feature_set_for_inference` and `send_ntfy_notification`
    (not defined in this patch) — confirm they exist in app.py.
    Runs until KeyboardInterrupt; any other exception is printed and
    the loop resumes after one poll interval.
    """
    pred_engine, situation_room, regime_filter = bootstrap_components(symbol)
    strategy_manager = StrategyManager(situation_room, extra_strategies={
        # Example alt strategy: a tiny scalp variant built on top of your situation room.
        "scalp": lambda seq: situation_room.generate_thesis({}, seq)
    })
    bandit = ThompsonBandit(strategy_manager.list_strategies().keys())
    perf_logger = PerformanceLogger()
    change_detector = PageHinkley(delta=0.0001, lambda_=40)

    def _price_seq_provider():
        # Replace with your data fetcher to get the latest window
        return fetch_latest_sequence(symbol, lookback_minutes)

    print("[Adaptive] main_worker started.")
    while True:
        try:
            # 1) Fetch latest window + build features
            input_sequence = _price_seq_provider()
            # Too little data to act on: sleep one interval and retry.
            if input_sequence is None or len(input_sequence) < 10:
                time.sleep(poll_interval_seconds); continue
            features = create_feature_set_for_inference(input_sequence)
            # 2) Predict (optional): if you have a prediction_engine, use it to enrich features
            if pred_engine is not None and hasattr(pred_engine, "predict"):
                try:
                    _ = pred_engine.predict(features)
                except Exception as _e:
                    # Best-effort enrichment: prediction failures never stop the loop.
                    pass
            # 3) Regime classification (optional): if you have a function, call it; else set default
            current_regime = "normal"
            # 4) Strategy selection and signal
            available = strategy_manager.list_strategies()
            chosen_name = bandit.select(context=None)
            trade_thesis = available[chosen_name](features)
            is_tradeable = True
            try:
                is_tradeable = regime_filter.should_trade(current_regime, trade_thesis)
            except Exception:
                # Filter errors default to "tradeable" rather than blocking signals.
                pass
            final_action = trade_thesis.get('action', 'NO ACTION')
            if not is_tradeable:
                final_action = "NO TRADE (FILTERED)"
            # 5) Log signal for later evaluation
            ts = str(pd.Timestamp.now(tz='UTC'))
            context_hash = context_hash_from_df(features)
            if final_action in ["BUY", "SELL"]:
                # eval_time = now + horizon; evaluate_pending_signals scores it later.
                perf_logger.log_signal(
                    ts, chosen_name, final_action,
                    trade_thesis.get('entry', features.iloc[-1]['close']),
                    trade_thesis.get('stop_loss', None),
                    trade_thesis.get('take_profit', None),
                    float(features.iloc[-1]['close']),
                    (pd.Timestamp.now(tz='UTC') + pd.Timedelta(minutes=eval_horizon_minutes)).isoformat(),
                    context_hash
                )
                # Notify
                try:
                    send_ntfy_notification(ntfy_topic, trade_thesis | {"strategy": chosen_name})
                except Exception:
                    # Fire-and-forget: delivery failures are ignored.
                    pass
            # 6) Evaluate pending signals (shadow P&L)
            evaluate_pending_signals(perf_logger.path, bandit, change_detector, _price_seq_provider)
            # 7) Optional: trigger fine-tune on drift
            # You can check the internal state of change_detector if you adapt the class to expose flags.
            time.sleep(poll_interval_seconds)
        except KeyboardInterrupt:
            print("[Adaptive] Stopping main_worker.")
            break
        except Exception as e:
            # Keep the loop resilient
            print(f"[Adaptive] Loop error: {e}")
            time.sleep(poll_interval_seconds)