# (removed page-scrape residue: "Spaces: / Sleeping / Sleeping")
# ===============================================
# ADAPTIVE META-CONTROLLER MAIN LOOP (V1)
# Drop-in for app.py — replaces your main_worker.
# ===============================================
import os
import csv
import time
import math
import random
from collections import deque, defaultdict

import pandas as pd
# ------------------ Meta Components ------------------
class PerformanceLogger:
    """Append signals and outcomes to a CSV for meta-learning and replay.

    Columns: timestamp, strategy, action, entry, stop_loss, take_profit,
    price_at_signal, eval_time, pnl, reward, context_hash.  The pnl/reward
    cells start empty and are filled in later by ``update_outcome`` or by
    ``evaluate_pending_signals``.
    """

    def __init__(self, path="/mnt/data/agent_signals_log.csv"):
        self.path = path
        header = ["timestamp", "strategy", "action", "entry", "stop_loss",
                  "take_profit", "price_at_signal", "eval_time", "pnl",
                  "reward", "context_hash"]
        # Fix: create the parent directory first — on a fresh deployment the
        # original crashed with FileNotFoundError on the very first write.
        parent = os.path.dirname(self.path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        if not os.path.exists(self.path):
            with open(self.path, "w", newline='') as f:
                csv.writer(f).writerow(header)

    def log_signal(self, ts, strategy, action, entry, sl, tp, price, eval_time, context_hash):
        """Append one signal row; pnl and reward are left empty for later fill-in."""
        with open(self.path, "a", newline='') as f:
            csv.writer(f).writerow([ts, strategy, action, entry, sl, tp,
                                    price, eval_time, "", "", context_hash])

    def update_outcome(self, ts, pnl, reward):
        """Fill pnl/reward on the newest un-evaluated row whose timestamp is *ts*.

        Scans newest-first so the latest pending row for the timestamp wins;
        rewrites the whole file only when a row was actually filled.
        """
        with open(self.path, "r", newline='') as f:
            rows = list(csv.reader(f))
        filled = False
        for i in range(len(rows) - 1, 0, -1):
            row = rows[i]
            # Guard: skip malformed/short rows instead of raising IndexError.
            if len(row) >= 10 and row[0] == ts and row[8] == "":
                row[8] = f"{pnl:.6f}"
                row[9] = f"{reward:.6f}"
                filled = True
                break
        if filled:
            with open(self.path, "w", newline='') as f:
                csv.writer(f).writerows(rows)
class PageHinkley:
    """Page-Hinkley change detector for streaming losses/returns."""

    def __init__(self, delta=0.0001, lambda_=40, alpha=1 - 1e-3):
        # EWMA estimate of the stream mean; alpha is the forgetting factor.
        self.mean = 0.0
        self.delta = delta
        self.lambda_ = lambda_
        self.alpha = alpha
        # Clamped-to-zero cumulative deviation statistic.
        self.cumulative = 0.0

    def update(self, x):
        """Feed one observation (e.g. negative pnl or error).

        Returns True when a change is detected, resetting the statistic.
        """
        a = self.alpha
        self.mean = a * self.mean + (1 - a) * x
        candidate = self.cumulative + x - self.mean - self.delta
        # Clamp: only downward drift accumulates.
        self.cumulative = candidate if candidate < 0 else 0
        if self.cumulative < -self.lambda_:
            self.cumulative = 0
            return True
        return False
class ThompsonBandit:
    """Thompson sampling bandit with Bernoulli reward (win/loss)."""

    def __init__(self, strategies):
        self.strategies = list(strategies)
        # Beta(1,1) priors for each arm.
        self.success = dict.fromkeys(self.strategies, 1)
        self.fail = dict.fromkeys(self.strategies, 1)

    def select(self, context=None):
        """Draw one Beta sample per arm and return the arm with the largest draw."""
        draws = {}
        for name in self.strategies:
            draws[name] = random.betavariate(self.success[name], self.fail[name])
        return max(draws, key=draws.get)

    def update(self, strategy, reward_binary):
        """Credit a win (reward >= 1) or a loss to the chosen arm's posterior."""
        bucket = self.success if reward_binary >= 1 else self.fail
        bucket[strategy] += 1
class StrategyManager:
    """Wrap strategies with a uniform callable interface."""

    def __init__(self, situation_room, extra_strategies=None):
        self.situation_room = situation_room
        self.extra = extra_strategies or {}

    def list_strategies(self):
        """Return a ``{name: callable(seq) -> thesis}`` mapping of all strategies.

        Always includes the canonical rule-based strategy; entries from
        ``extra_strategies`` are merged on top (and may override it).
        """
        def rule_based(seq):
            # The exact generate_thesis signature/params may differ in your
            # code — customize how horizons/BEST_PARAMS are threaded through.
            return self.situation_room.generate_thesis({}, seq)

        return {"rule_based": rule_based, **self.extra}
# ------------------ Small helpers ------------------
def context_hash_from_df(df):
    """Join the last row's key feature values into a reproducible context string."""
    last = df.iloc[-1]
    parts = [
        f"{last[name]:.6f}"
        for name in ("close", "ATR", "EMA_20", "RSI", "session_london")
        if name in last.index
    ]
    if parts:
        return "_".join(parts)
    # None of the preferred columns present: fall back to close (or 0.0).
    return f"{float(last.get('close', 0.0)):.6f}"
def fetch_current_price_or_last(seq):
    """Return the latest close from *seq* as a float, trying both access orders."""
    try:
        # Row-first access (DataFrame.iloc then column).
        return float(seq.iloc[-1]['close'])
    except Exception:
        pass
    # Column-first fallback (e.g. a mapping of column -> Series).
    return float(seq['close'].iloc[-1])
# ------------------ Evaluation pass ------------------
def evaluate_pending_signals(perf_logger_path, bandit, change_detector, price_fetch_seq):
    """Score every pending signal whose evaluation horizon has passed.

    Reads the signal CSV written by PerformanceLogger, computes shadow P&L for
    each matured row against the current price, writes pnl/reward back, and
    feeds the outcome to the bandit and the change detector.

    Args:
        perf_logger_path: path to the PerformanceLogger CSV (header + rows).
        bandit: object exposing ``update(strategy, reward_binary)``.
        change_detector: object exposing ``update(x)``; fed ``-pnl`` so that
            losses accumulate as positive drift.
        price_fetch_seq: zero-arg callable returning the latest price window.
    """
    now = pd.Timestamp.now(tz='UTC')
    try:
        with open(perf_logger_path, "r", newline='') as f:
            rows = list(csv.reader(f))
    except FileNotFoundError:
        return  # nothing logged yet
    updated = False
    price_now = None  # fetched lazily, once, on the first matured signal
    for i in range(1, len(rows)):
        row = rows[i]
        # Guard malformed/short rows; skip rows already evaluated (pnl filled).
        if len(row) < 11 or row[8] != "":
            continue
        try:
            eval_time = pd.to_datetime(row[7])
        except Exception:
            continue  # unparseable horizon: leave the row pending
        # Fix: a tz-naive eval_time compared against tz-aware `now` raised
        # TypeError and aborted the whole pass; assume naive stamps are UTC.
        if eval_time.tzinfo is None:
            eval_time = eval_time.tz_localize('UTC')
        if eval_time > now:
            continue  # horizon not reached yet
        strategy, action = row[1], row[2]
        try:
            entry = float(row[3])
        except Exception:
            continue  # no usable entry price
        if price_now is None:
            # Fix: fetch the price window once per pass, not once per row.
            price_now = fetch_current_price_or_last(price_fetch_seq())
        # Directional shadow P&L; anything that isn't BUY is treated as SELL.
        pnl = (price_now - entry) if action == "BUY" else (entry - price_now)
        reward = 1.0 if pnl > 0 else 0.0
        row[8] = f"{pnl:.6f}"
        row[9] = f"{reward:.6f}"
        bandit.update(strategy, reward)
        change_detector.update(-pnl)
        updated = True
    if updated:
        with open(perf_logger_path, "w", newline='') as f:
            csv.writer(f).writerows(rows)
# ------------------ Bootstrap dependencies ------------------
def bootstrap_components(symbol):
    """Create or load your core app components.
    If your app constructs these elsewhere, replace this with imports/uses of your instances.

    Returns a (pred_engine, situation_room, regime_filter) triple; pred_engine
    may be None when no PredictionEngine is available.
    """
    # Prediction engine: assumes a class PredictionEngine() exists in your app
    try:
        pred_engine = PredictionEngine(symbol=symbol)
    except Exception:
        pred_engine = None  # If you don't have it or construct elsewhere
    # Situation room & regime filter
    # NOTE(review): the fallback below only covers a construction failure with
    # BEST_PARAMS. If RuleBasedSituationRoom itself is undefined, the except
    # handler raises the same NameError again and it escapes — confirm the
    # class is importable wherever this runs.
    try:
        sr = RuleBasedSituationRoom(BEST_PARAMS)
    except Exception:
        sr = RuleBasedSituationRoom({})
    try:
        rf = MarketRegimeFilter()
    except Exception:
        # Fallback stub: never filters a trade.
        class _DummyRF:
            def should_trade(self, regime, thesis): return True
        rf = _DummyRF()
    return pred_engine, sr, rf
# ------------------ NEW main_worker ------------------
def main_worker(symbol: str, ntfy_topic: str, poll_interval_seconds: int = 60, lookback_minutes: int = 240, eval_horizon_minutes: int = 30):
    """Adaptive, self-evaluating main loop.
    Replaces your existing main_worker. Safe to run in paper mode.

    Per iteration: fetch a data window -> build features -> optional model
    prediction -> pick a strategy via Thompson sampling -> regime-filter the
    signal -> log/notify actionable signals -> score matured signals -> sleep.
    The loop is resilient: any exception is printed and the loop retries after
    one poll interval; only KeyboardInterrupt stops it.

    NOTE(review): fetch_latest_sequence, create_feature_set_for_inference and
    send_ntfy_notification are assumed to be defined elsewhere in app.py —
    confirm before deploying.

    Args:
        symbol: instrument identifier passed to the data fetcher.
        ntfy_topic: ntfy topic used for push notifications.
        poll_interval_seconds: sleep between iterations (and after errors).
        lookback_minutes: size of the data window requested each iteration.
        eval_horizon_minutes: delay before a logged signal is scored.
    """
    pred_engine, situation_room, regime_filter = bootstrap_components(symbol)
    strategy_manager = StrategyManager(situation_room, extra_strategies={
        # Example alt strategy: a tiny scalp variant built on top of your situation room.
        "scalp": lambda seq: situation_room.generate_thesis({}, seq)
    })
    bandit = ThompsonBandit(strategy_manager.list_strategies().keys())
    perf_logger = PerformanceLogger()
    change_detector = PageHinkley(delta=0.0001, lambda_=40)
    def _price_seq_provider():
        # Replace with your data fetcher to get the latest window
        return fetch_latest_sequence(symbol, lookback_minutes)
    print("[Adaptive] main_worker started.")
    while True:
        try:
            # 1) Fetch latest window + build features
            input_sequence = _price_seq_provider()
            # Too little data to act on — skip this cycle.
            if input_sequence is None or len(input_sequence) < 10:
                time.sleep(poll_interval_seconds); continue
            features = create_feature_set_for_inference(input_sequence)
            # 2) Predict (optional): if you have a prediction_engine, use it to enrich features
            if pred_engine is not None and hasattr(pred_engine, "predict"):
                try:
                    _ = pred_engine.predict(features)
                except Exception as _e:
                    pass  # best-effort: a prediction failure must not kill the loop
            # 3) Regime classification (optional): if you have a function, call it; else set default
            current_regime = "normal"
            # 4) Strategy selection and signal
            available = strategy_manager.list_strategies()
            chosen_name = bandit.select(context=None)
            trade_thesis = available[chosen_name](features)
            is_tradeable = True
            try:
                is_tradeable = regime_filter.should_trade(current_regime, trade_thesis)
            except Exception:
                pass  # a broken filter defaults to "tradeable"
            final_action = trade_thesis.get('action', 'NO ACTION')
            if not is_tradeable:
                final_action = "NO TRADE (FILTERED)"
            # 5) Log signal for later evaluation
            ts = str(pd.Timestamp.now(tz='UTC'))
            context_hash = context_hash_from_df(features)
            if final_action in ["BUY", "SELL"]:
                # eval_time is UTC ISO-8601; evaluate_pending_signals scores
                # this row once the horizon has passed.
                perf_logger.log_signal(
                    ts, chosen_name, final_action,
                    trade_thesis.get('entry', features.iloc[-1]['close']),
                    trade_thesis.get('stop_loss', None),
                    trade_thesis.get('take_profit', None),
                    float(features.iloc[-1]['close']),
                    (pd.Timestamp.now(tz='UTC') + pd.Timedelta(minutes=eval_horizon_minutes)).isoformat(),
                    context_hash
                )
                # Notify
                try:
                    # NOTE: dict-union (|) requires Python 3.9+.
                    send_ntfy_notification(ntfy_topic, trade_thesis | {"strategy": chosen_name})
                except Exception:
                    pass  # notification is best-effort
            # 6) Evaluate pending signals (shadow P&L)
            evaluate_pending_signals(perf_logger.path, bandit, change_detector, _price_seq_provider)
            # 7) Optional: trigger fine-tune on drift
            # You can check the internal state of change_detector if you adapt the class to expose flags.
            time.sleep(poll_interval_seconds)
        except KeyboardInterrupt:
            print("[Adaptive] Stopping main_worker.")
            break
        except Exception as e:
            # Keep the loop resilient
            print(f"[Adaptive] Loop error: {e}")
            time.sleep(poll_interval_seconds)