| """ |
| Multi-Agent Trading Environment built on Gymnasium. |
| Integrates MarketState, PortfolioState, RiskState with the agent interaction loop. |
| """ |
|
|
| import gymnasium as gym |
| from gymnasium import spaces |
| import numpy as np |
| import pandas as pd |
| from typing import Optional, Tuple, Dict, Any |
| from openenv.env import Env as OpenEnvBase |
|
|
| from env.state import MarketState, PortfolioState, RiskState, get_observation |
| from env.reward import compute_raw_reward, normalize_reward, compute_grade |
| from utils.indicators import compute_indicators |
|
|
|
|
| class TradingEnv(OpenEnvBase, gym.Env): |
| """ |
| A multi-agent RL trading environment. |
| |
| Observation: concatenated normalized features from market, portfolio, and risk state. |
| Action: Dict with 'direction' (0=Hold, 1=Buy, 2=Sell), 'size' [0, 1], 'sl' (price), 'tp' (price). |
| """ |
|
|
| metadata = {"render_modes": ["human"]} |
|
|
def __init__(
    self,
    df: Optional[pd.DataFrame] = None,
    initial_cash: float = 100_000.0,
    ticker: str = "default",
    commission: float = 0.001,
    reward_weights: Optional[Dict[str, float]] = None,
    max_steps: Optional[int] = None,
    difficulty: str = "hard",
):
    """
    Build the state objects, the Gymnasium spaces and the episode bookkeeping.

    Args:
        df: OHLCV DataFrame. When omitted, synthetic data is generated for
            the requested ``difficulty``.
        initial_cash: Starting cash.
        ticker: Asset identifier (key used in the portfolio dictionaries).
        commission: Proportional trading commission.
        reward_weights: Custom weights forwarded to the reward function.
        max_steps: Episode length cap. A falsy value means "use every row";
            a value beyond the data is clamped to the last valid row index.
        difficulty: 'easy', 'medium', or 'hard' for curriculum learning.
    """
    self.difficulty = difficulty

    if df is None:
        df = self._make_dummy_data(difficulty=self.difficulty)
    # Keep an untouched copy of the input next to the indicator-augmented frame.
    self.raw_df = df.copy()
    self.df = compute_indicators(df)
    self.ticker = ticker
    self.initial_cash = initial_cash
    self.commission = commission
    self.reward_weights = reward_weights

    # step() reads self.df[...].iloc[self.current_step] on every call, so an
    # episode must never be allowed to run past the last row of the data.
    last_index = len(self.df) - 1
    self.max_steps = min(max_steps, last_index) if max_steps else last_index

    # State containers.
    self.market = MarketState(prices=self.df)
    self.portfolio = PortfolioState(initial_cash=initial_cash, cash=initial_cash)
    self.risk = RiskState(peak_value=initial_cash)

    # Observation is the concatenation of the three state feature vectors.
    obs_size = (
        self.market.feature_size
        + self.portfolio.feature_size
        + self.risk.feature_size
    )
    self.observation_space = spaces.Box(
        low=-np.inf, high=np.inf, shape=(obs_size,), dtype=np.float32
    )
    self.action_space = spaces.Dict({
        "direction": spaces.Discrete(3),
        "size": spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32),
        "sl": spaces.Box(low=0.0, high=np.inf, shape=(1,), dtype=np.float32),
        "tp": spaces.Box(low=0.0, high=np.inf, shape=(1,), dtype=np.float32),
    })
    OpenEnvBase.__init__(
        self,
        name="TradingEnv",
        state_space=self.observation_space,
        action_space=self.action_space,
        episode_max_length=self.max_steps,
    )

    # Episode bookkeeping.
    self.current_step = 0
    self.done = False
    self.episode_rewards = []
    self.episode_values = []
    # Fraction of initial cash an unrealized short loss may reach before
    # step() force-liquidates the position.
    self.margin_call_threshold = 0.5

    # Governance bookkeeping.
    self.governance_log: list = []
    self.episode_interventions = 0
    self.episode_compliant_actions = 0
|
|
def _make_dummy_data(self, n=500, difficulty="hard") -> pd.DataFrame:
    """
    Produce a synthetic OHLCV frame for the given curriculum level.

    This is a thin alias that forwards ``n`` and ``difficulty`` to
    ``_generate_market_data``:
        Easy:   Trending (bull_steady, recovery).
        Medium: Sideways, mean-reverting, volatile bull.
        Hard:   Crashes, bubble pops, bear markets + regime switching.
    """
    synthetic_frame = self._generate_market_data(n=n, difficulty=difficulty)
    return synthetic_frame
|
|
def _generate_market_data(
    self,
    n: int = 500,
    difficulty: str = "hard",
    seed: Optional[int] = None,
) -> pd.DataFrame:
    """Multi-regime synthetic market data generator.

    Supports 8 market regimes with per-regime drift/volatility parameters,
    jump diffusion, fat tails (Student-t noise), and volume spikes.

    Args:
        n: Number of bars to generate; must be at least 1.
        difficulty: 'easy' and 'medium' draw from restricted regime pools;
            any other value draws from all regimes. Only 'hard' and
            'medium' may chain several regimes in one series.
        seed: Optional seed for the random generator. ``None`` (default)
            gives a different series on every call.

    Returns:
        DataFrame with columns open/high/low/close/volume on an hourly
        ``date`` index, where ``high >= max(open, close)`` and
        ``low <= min(open, close)`` on every bar.

    Raises:
        ValueError: If ``n`` is smaller than 1.
    """
    if n < 1:
        raise ValueError(f"n must be a positive integer, got {n}")

    rng = np.random.default_rng(seed)
    # Bar length in years, assuming hourly bars (matches the hourly index below).
    dt = 1 / (24 * 365)

    # "df" is the Student-t degrees of freedom: lower means fatter tails.
    regimes = {
        "bull_steady": {"mu": 0.30, "sigma": 0.08, "jump_prob": 0.0, "jump_std": 0.0, "df": 30},
        "bull_volatile": {"mu": 0.40, "sigma": 0.35, "jump_prob": 0.02, "jump_std": 0.04, "df": 5},
        "bear_steady": {"mu": -0.20, "sigma": 0.15, "jump_prob": 0.01, "jump_std": 0.03, "df": 8},
        "crash": {"mu": -0.80, "sigma": 0.60, "jump_prob": 0.05, "jump_std": 0.10, "df": 3},
        "sideways_choppy": {"mu": 0.0, "sigma": 0.25, "jump_prob": 0.01, "jump_std": 0.03, "df": 6},
        "mean_revert": {"mu": 0.0, "sigma": 0.12, "jump_prob": 0.0, "jump_std": 0.0, "df": 15},
        "bubble_pop": {"mu": 1.00, "sigma": 0.50, "jump_prob": 0.0, "jump_std": 0.0, "df": 4},
        "recovery": {"mu": 0.50, "sigma": 0.20, "jump_prob": 0.01, "jump_std": 0.02, "df": 10},
    }

    # Regime pool per curriculum level.
    if difficulty == "easy":
        regime_pool = ["bull_steady", "recovery"]
    elif difficulty == "medium":
        regime_pool = ["sideways_choppy", "mean_revert", "bull_volatile", "recovery"]
    else:
        regime_pool = list(regimes.keys())

    # Number of consecutive regimes stitched into one series.
    if difficulty == "hard":
        num_regimes = int(rng.choice([1, 2, 3], p=[0.3, 0.4, 0.3]))
    elif difficulty == "medium":
        num_regimes = int(rng.choice([1, 2], p=[0.5, 0.5]))
    else:
        num_regimes = 1

    chosen_regimes = rng.choice(regime_pool, size=num_regimes)

    # Regime boundaries. Random split points keep a 50-bar margin at each
    # end; that is only possible when n > 100, so shorter series fall back
    # to evenly spaced boundaries instead of an invalid integers() range.
    num_splits = num_regimes - 1
    if num_splits == 0:
        splits = []
    elif n > 100:
        splits = sorted(int(s) for s in rng.integers(50, n - 50, size=num_splits))
    else:
        splits = [(n * k) // num_regimes for k in range(1, num_regimes)]
    boundaries = [0, *splits, n]

    all_returns = np.zeros(n)
    for i, regime_name in enumerate(chosen_regimes):
        start_idx, end_idx = boundaries[i], boundaries[i + 1]
        seg_len = end_idx - start_idx
        params = regimes[regime_name]

        # Fat-tailed diffusion term.
        noise = rng.standard_t(df=params["df"], size=seg_len) * params["sigma"] * np.sqrt(dt)

        # Log-return drift with the usual -sigma^2/2 correction.
        drift = (params["mu"] - 0.5 * params["sigma"] ** 2) * dt

        # Occasional Gaussian jumps.
        jump_mask = rng.random(seg_len) < params["jump_prob"]
        jumps = jump_mask * rng.normal(0, params["jump_std"], seg_len)

        if regime_name == "bubble_pop":
            midpoint = seg_len // 2

            # First half: accelerating drift with damped noise (the bubble).
            accel = np.linspace(1.0, 3.0, midpoint)
            noise[:midpoint] *= 0.5
            drift_arr = np.full(seg_len, drift)
            drift_arr[:midpoint] *= accel

            # Second half: negative drift, doubled noise and sparse
            # downward-biased jumps (the pop).
            drift_arr[midpoint:] = -abs(drift) * 2.5
            noise[midpoint:] *= 2.0
            tail_len = seg_len - midpoint
            jumps[midpoint:] += rng.normal(-0.05, 0.08, tail_len) * (rng.random(tail_len) > 0.9)
            all_returns[start_idx:end_idx] = drift_arr + noise + jumps
        elif regime_name == "mean_revert":
            # Small pull against the cumulative return of the segment.
            raw = drift + noise + jumps
            cumulative = np.cumsum(raw)
            reversion = -0.05 * cumulative * dt
            all_returns[start_idx:end_idx] = raw + reversion
        else:
            all_returns[start_idx:end_idx] = drift + noise + jumps

    # Log returns -> close prices.
    s0 = 50000.0
    prices = s0 * np.exp(np.cumsum(all_returns))

    # Volume scales with the size of the move, up to 11x the base volume.
    base_volume = rng.integers(100_000_000, 500_000_000, n).astype(float)
    abs_rets = np.abs(all_returns)
    vol_multiplier = 1.0 + 10.0 * (abs_rets / (abs_rets.max() + 1e-10))
    # Explicit int64: the scaled volume can exceed the 32-bit integer range.
    volume = (base_volume * vol_multiplier).astype(np.int64)

    intrabar_noise = rng.normal(0, 0.003, n)
    high_noise = np.abs(rng.normal(0, 0.008, n))
    low_noise = np.abs(rng.normal(0, 0.008, n))

    # Build the wicks around the candle body (open and close) so that the
    # open can never land outside the [low, high] range.
    open_prices = prices * (1 + intrabar_noise)
    body_top = np.maximum(open_prices, prices)
    body_bottom = np.minimum(open_prices, prices)

    df = pd.DataFrame({
        "open": open_prices,
        "high": body_top * (1 + high_noise),
        "low": body_bottom * (1 - low_noise),
        "close": prices,
        "volume": volume,
    }, index=pd.date_range("2024-01-01", periods=n, freq="h"))

    df.index.name = "date"
    return df
|
|
| def _make_dummy_data_from_profile( |
| self, |
| n: int = 500, |
| difficulty: str = "hard", |
| mu: float | None = None, |
| sigma: float | None = None, |
| ) -> pd.DataFrame: |
| """Generate data with explicit mu/sigma (for backward compatibility).""" |
| if mu is not None and sigma is not None: |
| rng = np.random.default_rng() |
| dt = 1 / (24 * 365) |
| Z = rng.standard_normal(n) |
| returns = np.exp((mu - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * Z) |
| s0 = 50000.0 |
| prices = s0 * np.cumprod(returns) |
| df = pd.DataFrame({ |
| "open": prices * (1 + np.random.randn(n) * 0.005), |
| "high": prices * (1 + abs(np.random.randn(n) * 0.01)), |
| "low": prices * (1 - abs(np.random.randn(n) * 0.01)), |
| "close": prices, |
| "volume": np.random.randint(100_000_000, 1_000_000_000, n), |
| }, index=pd.date_range("2024-01-01", periods=n, freq="h")) |
| df.index.name = "date" |
| return df |
| return self._generate_market_data(n=n, difficulty=difficulty) |
|
|
def reset(
    self, seed: Optional[int] = None, options: Optional[dict] = None
) -> Tuple[np.ndarray, dict]:
    """Start a fresh episode.

    Rewinds the step cursor, rebuilds the market, portfolio and risk state
    from ``self.df`` / ``self.initial_cash``, and clears every per-episode
    log and counter.

    Args:
        seed: Forwarded to the parent ``reset``.
        options: Accepted for API compatibility; not read by this method.

    Returns:
        ``(observation, info)`` for the first step of the new episode.
    """
    super().reset(seed=seed)

    starting_cash = self.initial_cash

    # Episode cursor and termination flag.
    self.current_step, self.done = 0, False

    # Fresh state objects.
    self.market = MarketState(prices=self.df, current_step=0)
    self.portfolio = PortfolioState(initial_cash=starting_cash, cash=starting_cash)
    self.risk = RiskState(peak_value=starting_cash)

    # Per-episode histories; the equity curve starts at the initial cash.
    self.episode_rewards = []
    self.episode_values = [starting_cash]

    # Governance log and counters.
    self.governance_log = []
    self.episode_interventions = 0
    self.episode_compliant_actions = 0

    observation = get_observation(self.market, self.portfolio, self.risk, self.ticker)
    return observation, self._get_info()
|
|
def _check_sl_tp(self, current_price: float):
    """Check if the open position hit SL or TP, after applying trailing updates.

    Long positions: SL triggers when price falls to SL; TP when price rises to TP.
    Short positions: SL triggers when price rises to SL; TP when price falls to TP.

    A triggered exit closes the whole ``self.ticker`` position at the SL/TP
    level itself (not at ``current_price``), settles cash, clears the
    per-position bookkeeping (including the holding-duration counter),
    appends a trade-history record and increments ``risk.trade_count``.

    Args:
        current_price: Price used both for the trailing-stop update and for
            the trigger comparison.

    Returns:
        True if an exit was executed, False otherwise.
    """
    portfolio = self.portfolio
    atr = self.df["atr"].iloc[self.current_step]

    # --- Trailing stop: follow the price at a 2-ATR distance, but only
    # while the position is in profit and only in the favourable direction.
    for ticker, position_qty in list(portfolio.positions.items()):
        if abs(position_qty) < 1e-8:
            continue

        sl = portfolio.stop_losses.get(ticker)
        if sl is None:
            continue

        entry_price = portfolio.avg_costs.get(ticker, current_price)
        if position_qty > 0:
            trailing_level = current_price - (atr * 2.0)
            if trailing_level > sl and current_price > entry_price:
                portfolio.stop_losses[ticker] = trailing_level
        elif position_qty < 0:
            trailing_level = current_price + (atr * 2.0)
            if trailing_level < sl and current_price < entry_price:
                portfolio.stop_losses[ticker] = trailing_level

    # --- Trigger check for the traded ticker (re-read: the stop may have
    # just been trailed above).
    qty = portfolio.positions.get(self.ticker, 0.0)
    sl = portfolio.stop_losses.get(self.ticker)
    tp = portfolio.take_profits.get(self.ticker)

    if qty > 0:
        if sl is not None and current_price <= sl:
            exit_price, reason = sl, "stop_loss"
        elif tp is not None and current_price >= tp:
            exit_price, reason = tp, "take_profit"
        else:
            return False

        revenue = qty * exit_price * (1 - self.commission)
        portfolio.cash += revenue
        trade_action, trade_qty = "sell", qty

    elif qty < 0:
        if sl is not None and current_price >= sl:
            exit_price, reason = sl, "stop_loss"
        elif tp is not None and current_price <= tp:
            exit_price, reason = tp, "take_profit"
        else:
            return False

        abs_qty = abs(qty)
        avg_cost = portfolio.avg_costs.get(self.ticker, exit_price)
        cover_cost = abs_qty * exit_price * (1 + self.commission)
        # The margin posted at entry (qty * average entry price) is released
        # and the buy-back cost is paid out of it.
        margin_return = abs_qty * avg_cost
        portfolio.cash += margin_return - cover_cost
        trade_action, trade_qty = "cover", abs_qty

    else:
        return False

    # Shared close-out bookkeeping. The duration counter is reset for long
    # exits as well as short ones; otherwise the next position would inherit
    # the old holding time and its time penalty.
    portfolio.positions[self.ticker] = 0.0
    portfolio.avg_costs[self.ticker] = 0.0
    portfolio.stop_losses[self.ticker] = None
    portfolio.take_profits[self.ticker] = None
    portfolio.trade_durations[self.ticker] = 0
    portfolio.trade_history.append({
        "step": self.current_step,
        "action": trade_action,
        "ticker": self.ticker,
        "price": exit_price,
        "quantity": trade_qty,
        "reason": reason
    })
    self.risk.trade_count += 1
    return True
|
|
def step(self, action: Dict[str, Any]) -> Tuple[np.ndarray, float, bool, bool, dict]:
    """
    Execute one step in the multi-agent governance environment.

    The environment acts as a governance framework: the agent proposes
    an action, and internal Risk/Compliance agents may modify or
    override it. Every intervention is logged so the agent can learn
    to self-regulate (propose compliant actions that pass governance
    without modification).

    Order of operations: SL/TP exits at the current price, holding costs,
    governance (auto SL, auto TP, size clamp), trade execution, advance one
    bar, reward, termination / margin-call check, governance logging.

    Args:
        action: Mapping with ``direction`` (0=Hold, 1=Buy, 2=Sell) and
            optional ``size`` in [0, 1], ``sl`` and ``tp`` price levels.
            Each value may be a scalar, a 0-d array or a 1-element
            sequence; a missing or NaN value counts as 0.

    Returns:
        ``(obs, reward, terminated, truncated, info)``. ``truncated`` is
        always False: reaching ``max_steps`` is reported as ``terminated``.
        ``info`` additionally carries this step's ``governance`` record and
        the running ``governance_stats``.
    """
    if self.done:
        obs = get_observation(self.market, self.portfolio, self.risk, self.ticker)
        return obs, 0.0, True, False, self._get_info()

    portfolio = self.portfolio
    ticker = self.ticker

    def _as_scalar(value: Any, default: float = 0.0) -> float:
        """Coerce a scalar, 0-d array or 1-element sequence to float; NaN/empty -> default."""
        flat = np.asarray(value, dtype=np.float64).ravel()
        if flat.size == 0 or np.isnan(flat[0]):
            return default
        return float(flat[0])

    def _flatten_position() -> None:
        """Clear all per-position bookkeeping once the position is fully closed."""
        portfolio.positions[ticker] = 0.0
        portfolio.avg_costs[ticker] = 0.0
        portfolio.stop_losses[ticker] = None
        portfolio.take_profits[ticker] = None
        portfolio.trade_durations[ticker] = 0

    current_price = self.market.current_price()
    prev_value = portfolio.total_value(current_price, ticker)

    # 1. Resting SL/TP orders are evaluated before the new action.
    sl_tp_hit = self._check_sl_tp(current_price)

    # 2. Parse the proposed action. NaN levels are treated as "not
    #    provided" so they cannot slip past the risk cap below.
    direction = int(_as_scalar(action["direction"]))
    size = float(np.clip(_as_scalar(action.get("size", 0.0)), 0.0, 1.0))
    sl_input = _as_scalar(action.get("sl", 0.0))
    tp_input = _as_scalar(action.get("tp", 0.0))

    # Snapshot of the proposal for the governance log.
    original_direction = direction
    original_size = size
    original_sl = sl_input
    original_tp = tp_input
    interventions: list = []

    # NOTE(review): every trade below is charged the flat self.commission;
    # no volatility-scaled slippage is applied. Confirm that is intended.

    # 3. Holding costs: age every open position, charge a borrow fee on
    #    shorts and accumulate a reward penalty that grows with duration.
    time_penalty = 0.0
    for held_ticker, pos_qty in list(portfolio.positions.items()):
        if abs(pos_qty) > 1e-8:
            dur = portfolio.trade_durations.get(held_ticker, 0) + 1
            portfolio.trade_durations[held_ticker] = dur

            if pos_qty < 0:
                borrow_fee = abs(pos_qty) * current_price * 0.00005
                portfolio.cash -= borrow_fee

            time_penalty += (dur * 0.0001)

    # A buy opens/extends a long unless a short is on the book; a sell
    # opens/extends a short unless a long is on the book.
    position_qty = portfolio.positions.get(ticker, 0.0)
    is_opening = (direction == 1 and position_qty >= 0) or (direction == 2 and position_qty <= 0)

    # 4a. Governance: stop-loss. A level on the wrong side of the entry
    #     price (at/above the market for a long, at/below it for a short)
    #     would trigger on the very next bar and be filled at that level,
    #     i.e. at a price the market never traded, so it is replaced just
    #     like a missing one.
    DEFAULT_SL_RATIO = 0.02
    sl_missing = sl_input <= 0
    sl_wrong_side = is_opening and not sl_missing and (
        (direction == 1 and sl_input >= current_price)
        or (direction == 2 and sl_input <= current_price)
    )
    if direction != 0 and (sl_missing or sl_wrong_side):
        if direction == 1:
            sl_input = current_price * (1.0 - DEFAULT_SL_RATIO)
        elif direction == 2:
            sl_input = current_price * (1.0 + DEFAULT_SL_RATIO)
        interventions.append({
            "agent": "RiskManager",
            "type": "auto_stop_loss",
            "reason": (
                "No stop-loss provided — governance auto-set 2% SL"
                if sl_missing
                else "Stop-loss on wrong side of entry price — governance auto-set 2% SL"
            ),
            "enforced_sl": float(sl_input),
        })

    # 4b. Governance: take-profit at twice the stop distance when missing
    #     or on the wrong side of the entry price.
    tp_missing = tp_input <= 0
    tp_wrong_side = is_opening and not tp_missing and (
        (direction == 1 and tp_input <= current_price)
        or (direction == 2 and tp_input >= current_price)
    )
    if direction != 0 and (tp_missing or tp_wrong_side) and sl_input > 0:
        sl_dist = abs(current_price - sl_input)
        if direction == 1:
            tp_input = current_price + sl_dist * 2.0
        elif direction == 2:
            tp_input = current_price - sl_dist * 2.0
        interventions.append({
            "agent": "RiskManager",
            "type": "auto_take_profit",
            "reason": (
                "No take-profit provided — governance auto-set 2:1 RRR"
                if tp_missing
                else "Take-profit on wrong side of entry price — governance auto-set 2:1 RRR"
            ),
            "enforced_tp": float(tp_input),
        })

    # 4c. Governance: cap the loss at the stop to 1% of portfolio value by
    #     clamping the position size of opening trades.
    HARD_RISK_CAP = 0.01
    if direction != 0 and sl_input > 0 and is_opening:
        portfolio_value = portfolio.total_value(current_price, ticker)
        sl_distance = abs(current_price - sl_input)
        if sl_distance > 1e-10:
            max_loss = portfolio_value * HARD_RISK_CAP
            max_qty = max_loss / sl_distance
            max_size = (max_qty * current_price) / (portfolio_value + 1e-10)
            if size > max_size:
                interventions.append({
                    "agent": "RiskManager",
                    "type": "size_clamp",
                    "original_size": float(size),
                    "enforced_size": float(max_size),
                    "reason": f"Position size {size:.2%} exceeded Kelly 1% risk cap — clamped to {max_size:.2%}",
                })
                size = min(size, max_size)

    # 5. Execute the (possibly modified) action.
    traded = False
    step_trade_count = int(sl_tp_hit)

    if direction == 1 and position_qty < 0:
        # Buy-to-cover: close `size` of the open short (size is in [0, 1]).
        cover_qty = abs(position_qty) * size
        # Same dust guard as the sell branch: a zero-size cover is not a trade.
        if cover_qty > 1e-8:
            avg_cost = portfolio.avg_costs.get(ticker, current_price)
            cover_cost = cover_qty * current_price * (1 + self.commission)
            margin_return = cover_qty * avg_cost
            portfolio.cash += margin_return - cover_cost
            remaining = position_qty + cover_qty
            if abs(remaining) <= 1e-8:
                _flatten_position()
            else:
                portfolio.positions[ticker] = remaining
            portfolio.trade_history.append({
                "step": self.current_step,
                "action": "cover",
                "ticker": ticker,
                "price": current_price,
                "quantity": cover_qty,
            })
            traded = True

    elif direction == 1:
        # Open or add to a long with `size` of the available cash.
        trade_qty = (portfolio.cash * size) / (current_price * (1 + self.commission) + 1e-10)
        if trade_qty > 1e-8:
            cost = trade_qty * current_price * (1 + self.commission)
            portfolio.cash -= cost
            prev_qty = position_qty
            prev_avg_cost = portfolio.avg_costs.get(ticker, 0.0)
            new_qty = prev_qty + trade_qty
            # Quantity-weighted average entry price.
            new_avg_cost = (
                ((prev_qty * prev_avg_cost) + (trade_qty * current_price)) / (new_qty + 1e-10)
            )
            portfolio.positions[ticker] = new_qty
            portfolio.avg_costs[ticker] = new_avg_cost

            if sl_input > 0:
                portfolio.stop_losses[ticker] = sl_input
            if tp_input > 0:
                portfolio.take_profits[ticker] = tp_input

            portfolio.trade_history.append({
                "step": self.current_step,
                "action": "buy",
                "ticker": ticker,
                "price": current_price,
                "quantity": trade_qty,
            })
            traded = True

    elif direction == 2 and position_qty > 0:
        # Sell `size` of the open long.
        sell_qty = min(position_qty, position_qty * size)
        if sell_qty > 1e-8:
            revenue = sell_qty * current_price * (1 - self.commission)
            portfolio.cash += revenue
            remaining_qty = position_qty - sell_qty
            if remaining_qty <= 1e-8:
                # Also resets the holding-duration counter so the next
                # position does not inherit this one's time penalty.
                _flatten_position()
            else:
                portfolio.positions[ticker] = remaining_qty

            portfolio.trade_history.append({
                "step": self.current_step,
                "action": "sell",
                "ticker": ticker,
                "price": current_price,
                "quantity": sell_qty,
            })
            traded = True

    elif direction == 2:
        # Open or add to a short, posting `size` of the cash as margin.
        # NOTE(review): the commission is reserved in the sizing divisor
        # but only the margin itself is deducted from cash here — confirm
        # against how PortfolioState values short positions.
        margin_available = portfolio.cash * size
        short_qty = margin_available / (current_price * (1 + self.commission) + 1e-10)
        if short_qty > 1e-8:
            margin_cost = short_qty * current_price
            portfolio.cash -= margin_cost
            prev_qty = abs(position_qty)
            prev_avg_cost = portfolio.avg_costs.get(ticker, 0.0)
            new_qty = prev_qty + short_qty
            new_avg_cost = (
                ((prev_qty * prev_avg_cost) + (short_qty * current_price)) / (new_qty + 1e-10)
            )
            portfolio.positions[ticker] = -(new_qty)
            portfolio.avg_costs[ticker] = new_avg_cost

            if sl_input > 0:
                portfolio.stop_losses[ticker] = sl_input
            if tp_input > 0:
                portfolio.take_profits[ticker] = tp_input

            portfolio.trade_history.append({
                "step": self.current_step,
                "action": "short",
                "ticker": ticker,
                "price": current_price,
                "quantity": short_qty,
            })
            traded = True

    if traded:
        self.risk.trade_count += 1
        step_trade_count += 1

    # 6. Advance one bar and mark the portfolio to the new price.
    self.current_step += 1
    self.market.current_step = self.current_step

    new_price = self.market.current_price()
    new_value = portfolio.total_value(new_price, ticker)
    self.risk.update(new_value)
    self.episode_values.append(new_value)

    # 7. Reward: raw reward minus the holding-time penalty, plus the
    #    compliance bonus or minus the intervention penalty.
    profit = (new_value - prev_value) / (self.initial_cash + 1e-10)
    price_trend = (new_price - current_price) / (current_price + 1e-10)
    raw_r = compute_raw_reward(
        profit=profit,
        drawdown=self.risk.current_drawdown,
        volatility=self.risk.return_volatility(),
        sharpe=self.risk.sharpe_ratio(),
        trade_count=step_trade_count,
        weights=self.reward_weights,
        direction=direction,
        price_trend=price_trend,
    )

    step_reward = raw_r
    step_reward -= time_penalty

    n_interventions = len(interventions)
    if n_interventions == 0 and direction != 0:
        step_reward += 0.15
        self.episode_compliant_actions += 1
    elif n_interventions > 0:
        step_reward -= 0.05 * n_interventions
        self.episode_interventions += n_interventions

    reward = normalize_reward(step_reward)
    self.episode_rewards.append(reward)

    # 8. Termination: out of steps, equity below 10% of the initial cash,
    #    or a margin call on an open short.
    terminated = self.current_step >= self.max_steps
    truncated = False
    if new_value < self.initial_cash * 0.1:
        terminated = True

    position_qty = portfolio.positions.get(ticker, 0.0)
    if position_qty < 0:
        short_pnl = portfolio.unrealized_pnl(new_price, ticker)
        if short_pnl < -(self.initial_cash * self.margin_call_threshold):
            # Forced liquidation of the whole short at the new price.
            abs_qty = abs(position_qty)
            avg_cost = portfolio.avg_costs.get(ticker, new_price)
            cover_cost = abs_qty * new_price * (1 + self.commission)
            margin_return = abs_qty * avg_cost
            portfolio.cash += margin_return - cover_cost
            _flatten_position()
            portfolio.trade_history.append({
                "step": self.current_step,
                "action": "margin_call",
                "ticker": ticker,
                "price": new_price,
                "quantity": abs_qty,
                "reason": "margin_call",
            })
            self.risk.trade_count += 1
            # Logged after the reward was computed, so it affects the
            # governance record and counters but not this step's reward.
            interventions.append({
                "agent": "ComplianceOfficer",
                "type": "margin_call",
                "reason": f"Unrealized short loss exceeded {self.margin_call_threshold:.0%} threshold — forced liquidation",
            })
            self.episode_interventions += 1
            terminated = True
    if terminated:
        self.done = True

    # 9. Governance record: what was proposed vs. what was executed.
    governance_record = {
        "step": self.current_step,
        "proposed": {
            "direction": original_direction,
            "size": original_size,
            "sl": original_sl,
            "tp": original_tp,
        },
        "executed": {
            "direction": direction,
            "size": size,
            "sl": sl_input,
            "tp": tp_input,
        },
        "interventions": interventions,
        "was_compliant": len(interventions) == 0,
    }
    self.governance_log.append(governance_record)

    obs = get_observation(self.market, portfolio, self.risk, ticker)
    info = self._get_info()
    info["governance"] = governance_record
    info["governance_stats"] = {
        "episode_interventions": self.episode_interventions,
        "episode_compliant_actions": self.episode_compliant_actions,
        "compliance_rate": (
            self.episode_compliant_actions / max(self.current_step, 1)
        ),
    }
    return obs, reward, terminated, truncated, info
|
|
def _get_info(self) -> dict:
    """Assemble the diagnostic info dict for the current step.

    Combines raw portfolio figures (value, cash, positions, PnL) with
    scores clipped to [0, 1] and the composite grade returned by
    ``compute_grade``.
    """
    risk = self.risk
    price_now = self.market.current_price()
    equity = self.portfolio.total_value(price_now, self.ticker)

    # PnL ratio in [-1, 1] and Sharpe in [-2, 2] are mapped linearly onto
    # [0, 1]; values outside those ranges are clipped.
    pnl_ratio = (equity - self.initial_cash) / (self.initial_cash + 1e-10)
    profit_score = np.clip((pnl_ratio + 1.0) / 2.0, 0.0, 1.0)
    sharpe_score = np.clip((risk.sharpe_ratio() + 2.0) / 4.0, 0.0, 1.0)

    # Consistency = share of steps with a positive return; 0.5 until at
    # least two equity points exist.
    if len(self.episode_values) <= 1:
        win_share = 0.5
    else:
        equity_curve = np.array(self.episode_values)
        step_returns = np.diff(equity_curve) / (equity_curve[:-1] + 1e-10)
        win_share = np.mean(step_returns > 0)

    grade_inputs = {
        "profit": float(profit_score),
        "sharpe": float(sharpe_score),
        "drawdown": float(risk.max_drawdown),
        "consistency": float(win_share),
    }
    grade = compute_grade(grade_inputs)

    info = {
        "step": self.current_step,
        "portfolio_value": float(equity),
        "cash": float(self.portfolio.cash),
        "positions": {
            name: float(held) for name, held in self.portfolio.positions.items()
        },
        "pnl": float(equity - self.initial_cash),
        "pnl_pct": float(pnl_ratio),
        "max_drawdown": float(risk.max_drawdown),
        "sharpe_ratio": float(risk.sharpe_ratio()),
        "normalized_profit": float(profit_score),
        "normalized_sharpe": float(sharpe_score),
        "normalized_drawdown_inverse": float(1.0 - np.clip(risk.max_drawdown, 0.0, 1.0)),
        "consistency": float(win_share),
        "trade_count": risk.trade_count,
        "grade": float(grade),
    }

    # Reward aggregates default to 0.0 before the first step.
    if self.episode_rewards:
        info["episode_reward_sum"] = float(sum(self.episode_rewards))
        info["episode_reward_mean"] = float(np.mean(self.episode_rewards))
    else:
        info["episode_reward_sum"] = 0.0
        info["episode_reward_mean"] = 0.0
    return info
|
|
def sample_action(self) -> dict:
    """Draw a random action (convenience method).

    ``direction`` and ``size`` are sampled from their sub-spaces, in that
    order. ``sl`` and ``tp`` are always a zero-valued float32 array of
    shape (1,).
    """
    sub_spaces: Any = self.action_space
    sampled_direction = sub_spaces["direction"].sample()
    sampled_size = sub_spaces["size"].sample()
    return {
        "direction": sampled_direction,
        "size": sampled_size,
        "sl": np.zeros(1, dtype=np.float32),
        "tp": np.zeros(1, dtype=np.float32),
    }
|
|