QuantHive / _tmp_old_env_test /env /multi_agent_env.py
ARKAISW's picture
Update latest changes
aec0295
"""
Multi-Agent Trading Environment using PettingZoo AEC API.
Three independent RL agents operate in a decentralized governance framework:
- risk_manager_0: Rewarded for restricting dangerous trades. Penalized when Trader loses.
- portfolio_manager_0: Oversees capital allocation. Rewarded for portfolio growth + drawdown control.
- trader_0: Rewarded purely for PnL. Sees Risk/PM constraints as observations.
The AEC (Agent-Environment Cycle) loop alternates agent turns each step.
Agent Negotiation: Each agent's *output message* (constraints, allocations) becomes
part of the next agent's observation, creating an emergent negotiation dynamic.
"""
from __future__ import annotations
import functools
from typing import Dict, List, Optional, Tuple, Any
import numpy as np
import pandas as pd
from gymnasium import spaces
from pettingzoo import AECEnv
from pettingzoo.utils import agent_selector
from env.state import MarketState, PortfolioState, RiskState, get_observation
from env.reward import compute_raw_reward, normalize_reward, compute_grade
from utils.indicators import compute_indicators
# ─── Agent IDs ─────────────────────────────────────────────────────────────────
RISK_MANAGER = "risk_manager_0"
PORTFOLIO_MGR = "portfolio_manager_0"
TRADER = "trader_0"
ALL_AGENTS = [RISK_MANAGER, PORTFOLIO_MGR, TRADER]
# ─── Observation Sizes ──────────────────────────────────────────────────────────
# Base market+portfolio+risk obs size: 14 + 5 + 5 = 24
BASE_OBS_SIZE = 24
# Risk Manager message appended to PM and Trader observations: [size_limit, allow_new, force_reduce]
RM_MSG_SIZE = 3
# PM message appended to Trader observations: [cap_allocation, is_override_signaled]
PM_MSG_SIZE = 2
class MultiAgentTradingEnv(AECEnv):
"""
A PettingZoo AEC environment for decentralized multi-agent trading governance.
Turn order per step: risk_manager_0 → portfolio_manager_0 → trader_0
On each full cycle, the market advances by one candle.
Observations:
risk_manager_0: base_obs (24,)
portfolio_mgr_0: base_obs + rm_message (24 + 3 = 27,)
trader_0: base_obs + rm_message + pm_message (24 + 3 + 2 = 29,)
Actions:
risk_manager_0: Box(3,) — [size_limit, allow_new_positions, force_reduce] — continuous
portfolio_mgr_0: Box(2,) — [capital_allocation_fraction, override_flag] — continuous
trader_0: Dict — direction (Discrete 3), size (Box 1), sl (Box 1), tp (Box 1)
"""
metadata = {
"render_modes": ["human", "ansi"],
"name": "multi_agent_trading_v1",
"is_parallelizable": False,
}
def __init__(
self,
df: Optional[pd.DataFrame] = None,
initial_cash: float = 100_000.0,
ticker: str = "default",
commission: float = 0.001,
max_steps: Optional[int] = None,
difficulty: str = "hard",
):
super().__init__()
self.difficulty = difficulty
if df is None:
df = self._make_dummy_data(difficulty=difficulty)
self.raw_df = df.copy()
self.df = compute_indicators(df)
self.ticker = ticker
self.initial_cash = initial_cash
self.commission = commission
self.max_steps = max_steps or (len(self.df) - 1)
# ── PettingZoo required attributes ──────────────────────────────────
self.agents = ALL_AGENTS[:]
self.possible_agents = ALL_AGENTS[:]
# ── Observation spaces ──────────────────────────────────────────────
self.observation_spaces = {
RISK_MANAGER: spaces.Box(low=-np.inf, high=np.inf,
shape=(BASE_OBS_SIZE,), dtype=np.float32),
PORTFOLIO_MGR: spaces.Box(low=-np.inf, high=np.inf,
shape=(BASE_OBS_SIZE + RM_MSG_SIZE,), dtype=np.float32),
TRADER: spaces.Box(low=-np.inf, high=np.inf,
shape=(BASE_OBS_SIZE + RM_MSG_SIZE + PM_MSG_SIZE,), dtype=np.float32),
}
# ── Action spaces ───────────────────────────────────────────────────
self.action_spaces = {
RISK_MANAGER: spaces.Box(low=np.array([0.01, 0.0, 0.0], dtype=np.float32),
high=np.array([1.0, 1.0, 1.0], dtype=np.float32),
shape=(3,), dtype=np.float32),
PORTFOLIO_MGR: spaces.Box(low=np.array([0.0, 0.0], dtype=np.float32),
high=np.array([1.0, 1.0], dtype=np.float32),
shape=(2,), dtype=np.float32),
TRADER: spaces.Dict({
"direction": spaces.Discrete(3), # 0=Hold, 1=Buy, 2=Sell/Short
"size": spaces.Box(0.0, 1.0, shape=(1,), dtype=np.float32),
"sl": spaces.Box(0.0, np.inf, shape=(1,), dtype=np.float32),
"tp": spaces.Box(0.0, np.inf, shape=(1,), dtype=np.float32),
}),
}
# ── Internal state (reset before first use) ─────────────────────────
self._agent_selector = agent_selector(ALL_AGENTS)
self._reset_internal_state()
# ───────────────────────────────────────────────────────────────────────────
# PettingZoo required API
# ───────────────────────────────────────────────────────────────────────────
def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
if seed is not None:
np.random.seed(seed)
self.agents = ALL_AGENTS[:]
self._agent_selector.reinit(ALL_AGENTS)
self._reset_internal_state()
self._generate_observations()
self.agent_selection = self._agent_selector.reset()
# Zero-fill all rewards/terminations/truncations/infos for PZ compliance
self.rewards = {ag: 0.0 for ag in self.agents}
self._cumulative_rewards = {ag: 0.0 for ag in self.agents}
self.terminations = {ag: False for ag in self.agents}
self.truncations = {ag: False for ag in self.agents}
self.infos = {ag: {} for ag in self.agents}
def step(self, action):
"""Process one agent's action in the AEC turn order."""
agent = self.agent_selection
if self.terminations[agent] or self.truncations[agent]:
# Dead-step: PZ compliance requires we handle this
self._was_dead_step(action)
return
# ── Route action to the correct handler ────────────────────────────
if agent == RISK_MANAGER:
self._step_risk_manager(action)
elif agent == PORTFOLIO_MGR:
self._step_portfolio_manager(action)
elif agent == TRADER:
self._step_trader(action)
# After the trader acts, the market cycle is complete → advance step
self._advance_market()
# Advance to next agent
self._accumulate_rewards()
self.agent_selection = self._agent_selector.next()
def observe(self, agent: str) -> np.ndarray:
return self._observations[agent]
def observation_space(self, agent: str) -> spaces.Space:
return self.observation_spaces[agent]
def action_space(self, agent: str) -> spaces.Space:
return self.action_spaces[agent]
def render(self):
price = self._market.current_price()
val = self._portfolio.total_value(price, self.ticker)
print(
f"Step {self._current_step:4d} | "
f"Price: {price:10,.2f} | "
f"Value: {val:12,.2f} | "
f"Agent: {self.agent_selection}"
)
def close(self):
pass
# ───────────────────────────────────────────────────────────────────────────
# Per-Agent Step Handlers
# ───────────────────────────────────────────────────────────────────────────
def _step_risk_manager(self, action: np.ndarray):
"""
Risk Manager decides governance constraints.
action = [size_limit (0-1), allow_new_positions (0-1), force_reduce (0-1)]
Reward logic (adversarial):
+0.2 for restricting a dangerous action (high drawdown → low size_limit)
-0.3 for each $ portfolio value LOST since it last acted (it shares downside pain)
+0.05 for being compliant (not overriding a healthy portfolio)
"""
size_limit, allow_new_raw, force_reduce_raw = float(action[0]), float(action[1]), float(action[2])
allow_new = allow_new_raw > 0.5
force_reduce = force_reduce_raw > 0.5
# Store message to pass to PM and Trader
self._rm_message = np.array(
[size_limit, float(allow_new), float(force_reduce)], dtype=np.float32
)
# Compute RM's step reward
drawdown = self._risk.current_drawdown
rm_reward = 0.0
# Rewarded for restricting size when portfolio is underwater
if drawdown > 0.10 and size_limit < 0.30:
rm_reward += 0.20 # RM correctly capped risk during drawdown
if force_reduce and drawdown > 0.20:
rm_reward += 0.15 # Correct force-reduce under severe drawdown
# Penalize for allowing reckless sizing when at risk
if drawdown > 0.15 and size_limit > 0.70:
rm_reward -= 0.20 # RM being reckless during drawdown
# Shared downside: RM suffers when portfolio loses money this step
prev_val = self._prev_portfolio_value
curr_price = self._market.current_price()
curr_val = self._portfolio.total_value(curr_price, self.ticker)
portfolio_delta_pct = (curr_val - prev_val) / (self.initial_cash + 1e-10)
rm_reward += min(portfolio_delta_pct * 0.5, 0.0) # Only downside pain
self._pending_rewards[RISK_MANAGER] = rm_reward
def _step_portfolio_manager(self, action: np.ndarray):
"""
Portfolio Manager decides capital allocation and optionally signals override.
action = [capital_allocation (0-1), override_strength (0-1)]
Reward logic:
Aligned with overall portfolio performance (grade-based).
Penalized for excessive overrides that don't improve outcomes.
"""
cap_alloc = float(np.clip(action[0], 0.0, 1.0))
override_s = float(action[1])
self._pm_message = np.array([cap_alloc, override_s], dtype=np.float32)
self._pm_capital_allocation = cap_alloc
self._pm_override_strength = override_s
# PM reward deferred to after trader executes (knows the outcome)
self._pending_rewards[PORTFOLIO_MGR] = 0.0 # Will be updated in _advance_market
def _step_trader(self, action: Dict):
"""
Trader proposes a trade using the constrained action space.
Receives both RM and PM guidance in its observation.
Reward logic (adversarial):
Rewarded purely on PnL.
Penalized when governance overrides (RM size cap, PM force-close) are triggered.
Bonus for proposing compliant actions that need no governance intervention.
"""
direction = int(action["direction"])
size_raw = float(action["size"][0]) if hasattr(action["size"], "__len__") else float(action["size"])
sl_input = float(action["sl"][0]) if hasattr(action["sl"], "__len__") else float(action.get("sl", 0.0))
tp_input = float(action["tp"][0]) if hasattr(action["tp"], "__len__") else float(action.get("tp", 0.0))
size = float(np.clip(size_raw, 0.0, 1.0))
# ── Apply Risk Manager constraints ──────────────────────────────────
rm_size_limit = float(self._rm_message[0])
rm_allow_new = bool(self._rm_message[1] > 0.5)
rm_force_reduce = bool(self._rm_message[2] > 0.5)
interventions: List[Dict] = []
if direction != 0 and size > rm_size_limit:
interventions.append({
"agent": "RiskManager",
"type": "size_clamp",
"original_size": size,
"enforced_size": rm_size_limit,
})
size = rm_size_limit
if direction in (1, 2) and not rm_allow_new:
interventions.append({
"agent": "RiskManager",
"type": "no_new_positions",
"reason": "RM blocked new positions during drawdown",
})
direction = 0 # Force hold
if rm_force_reduce and direction == 1:
interventions.append({
"agent": "RiskManager",
"type": "force_reduce",
"reason": "RM signaling to reduce longs",
})
direction = 2 # Flip to reduce
# ── Apply Portfolio Manager override ────────────────────────────────
cap_alloc = self._pm_capital_allocation
if direction != 0 and size > cap_alloc:
interventions.append({
"agent": "PortfolioManager",
"type": "capital_cap",
"original_size": size,
"enforced_size": cap_alloc,
})
size = min(size, cap_alloc)
# PM strong override_strength >0.7 means PM wants to force hold
if self._pm_override_strength > 0.7 and direction != 0:
interventions.append({
"agent": "PortfolioManager",
"type": "pm_veto",
"reason": "PM vetoed trade (insufficient conviction signal)",
})
direction = 0
# ── Auto SL/TP (governance baseline) ───────────────────────────────
current_price = self._market.current_price()
DEFAULT_SL = 0.02
if direction != 0 and sl_input <= 0:
if direction == 1:
sl_input = current_price * (1 - DEFAULT_SL)
else:
sl_input = current_price * (1 + DEFAULT_SL)
interventions.append({"agent": "RiskManager", "type": "auto_sl"})
if direction != 0 and tp_input <= 0 and sl_input > 0:
sl_dist = abs(current_price - sl_input)
tp_input = (current_price + sl_dist * 2.0) if direction == 1 else (current_price - sl_dist * 2.0)
interventions.append({"agent": "RiskManager", "type": "auto_tp"})
# Store pending trade for market advance
self._pending_trade = {
"direction": direction,
"size": size,
"sl": sl_input,
"tp": tp_input,
"interventions": interventions,
"original_direction": int(action["direction"]),
"original_size": size_raw,
}
# Compliance reward/penalty — will be finalized after market moves
n_interventions = len(interventions)
compliance_bonus = 0.15 if (n_interventions == 0 and direction != 0) else (-0.05 * n_interventions)
self._trader_compliance_bonus = compliance_bonus
# ───────────────────────────────────────────────────────────────────────────
# Market Advance (called after Trader acts)
# ───────────────────────────────────────────────────────────────────────────
def _advance_market(self):
"""Execute the pending trade, advance market, compute final rewards."""
if not hasattr(self, "_pending_trade") or self._pending_trade is None:
# No trade was staged (edge case)
self._pending_trade = {"direction": 0, "size": 0.0, "sl": 0.0, "tp": 0.0,
"interventions": [], "original_direction": 0, "original_size": 0.0}
trade = self._pending_trade
direction = trade["direction"]
size = trade["size"]
sl_input = trade["sl"]
tp_input = trade["tp"]
current_price = self._market.current_price()
prev_value = self._portfolio.total_value(current_price, self.ticker)
# Check SL/TP before executing new action
self._check_sl_tp(current_price)
# Execute trade in portfolio state
traded = self._execute_trade(direction, size, sl_input, tp_input, current_price)
# Advance market step
self._current_step += 1
self._market.current_step = self._current_step
# Update risk state
new_price = self._market.current_price() if self._current_step < len(self.df) else current_price
new_value = self._portfolio.total_value(new_price, self.ticker)
self._risk.update(new_value)
self._episode_values.append(new_value)
# Compute portfolio delta
profit = (new_value - prev_value) / (self.initial_cash + 1e-10)
price_trend = (new_price - current_price) / (current_price + 1e-10)
raw_r = compute_raw_reward(
profit=profit,
drawdown=self._risk.current_drawdown,
volatility=self._risk.return_volatility(),
sharpe=self._risk.sharpe_ratio(),
trade_count=int(traded),
direction=direction,
price_trend=price_trend,
)
# ── Trader reward ───────────────────────────────────────────────────
trader_reward = normalize_reward(raw_r + self._trader_compliance_bonus)
self._pending_rewards[TRADER] = float(trader_reward)
self._episode_rewards.append(trader_reward)
# ── PM reward: grade-based portfolio performance ────────────────────
normalized_profit = float(np.clip((profit + 1.0) / 2.0, 0.0, 1.0))
normalized_sharpe = float(np.clip((self._risk.sharpe_ratio() + 2.0) / 4.0, 0.0, 1.0))
consistency = float(np.mean(np.diff(np.array(self._episode_values)) > 0)) if len(self._episode_values) > 2 else 0.5
grade = float(compute_grade({
"profit": normalized_profit,
"sharpe": normalized_sharpe,
"drawdown": float(self._risk.max_drawdown),
"consistency": consistency,
}))
pm_reward = (grade - 0.5) * 0.4 # Grade in [0,1] → centered reward
if self._risk.max_drawdown > 0.20:
pm_reward -= 0.15 # PM penalized for deep drawdown
self._pending_rewards[PORTFOLIO_MGR] = float(pm_reward)
# ── RM: shared downside with final portfolio value ──────────────────
# We ADD to whatever penalty was already set in _step_risk_manager
rm_pain = min(profit * 0.5, 0.0) # Only share downside
self._pending_rewards[RISK_MANAGER] = float(self._pending_rewards.get(RISK_MANAGER, 0.0) + rm_pain)
# ── Termination Check ───────────────────────────────────────────────
terminated = (
self._current_step >= self.max_steps or
new_value < self.initial_cash * 0.10 # Blowup condition
)
if terminated:
for ag in self.agents:
self.terminations[ag] = True
# Rebuild observations for the next cycle
self._generate_observations()
# Update governance log
gov_record = {
"step": self._current_step,
"proposed": {"direction": trade["original_direction"], "size": trade["original_size"]},
"executed": {"direction": direction, "size": size, "sl": sl_input, "tp": tp_input},
"interventions": trade["interventions"],
"was_compliant": len(trade["interventions"]) == 0,
"rm_message": self._rm_message.tolist(),
"pm_message": self._pm_message.tolist(),
}
self._governance_log.append(gov_record)
# Expose info for the Trader (most info-rich agent)
self.infos[TRADER] = {
"step": self._current_step,
"portfolio_value": float(new_value),
"cash": float(self._portfolio.cash),
"pnl": float(new_value - self.initial_cash),
"pnl_pct": float(profit),
"max_drawdown": float(self._risk.max_drawdown),
"sharpe_ratio": float(self._risk.sharpe_ratio()),
"grade": grade,
"governance": gov_record,
"rewards": dict(self._pending_rewards),
}
self.infos[RISK_MANAGER] = {"step": self._current_step, "drawdown": float(self._risk.max_drawdown)}
self.infos[PORTFOLIO_MGR] = {"step": self._current_step, "grade": grade}
self._prev_portfolio_value = new_value
self._pending_trade = None
# ───────────────────────────────────────────────────────────────────────────
# Observation Generation
# ───────────────────────────────────────────────────────────────────────────
def _generate_observations(self):
base_obs = get_observation(self._market, self._portfolio, self._risk, self.ticker)
self._observations = {
RISK_MANAGER: base_obs.copy(),
PORTFOLIO_MGR: np.concatenate([base_obs, self._rm_message]),
TRADER: np.concatenate([base_obs, self._rm_message, self._pm_message]),
}
# ───────────────────────────────────────────────────────────────────────────
# Internal Helpers
# ───────────────────────────────────────────────────────────────────────────
def _reset_internal_state(self):
self._market = MarketState(prices=self.df, current_step=0)
self._portfolio = PortfolioState(initial_cash=self.initial_cash, cash=self.initial_cash)
self._risk = RiskState(peak_value=self.initial_cash)
self._current_step = 0
# Inter-agent messages (start neutral)
self._rm_message = np.array([0.5, 1.0, 0.0], dtype=np.float32) # [size_limit=50%, allow=yes, force_reduce=no]
self._pm_message = np.array([0.5, 0.0], dtype=np.float32) # [cap_alloc=50%, override_strength=0]
self._pm_capital_allocation = 0.5
self._pm_override_strength = 0.0
self._pending_trade = None
self._pending_rewards = {ag: 0.0 for ag in ALL_AGENTS}
self._trader_compliance_bonus = 0.0
self._episode_values = [self.initial_cash]
self._episode_rewards = []
self._governance_log: List[Dict] = []
self._prev_portfolio_value = self.initial_cash
# PZ state dictionaries
self._observations = {ag: np.zeros(self.observation_spaces[ag].shape, dtype=np.float32)
for ag in ALL_AGENTS}
def _accumulate_rewards(self):
"""Move pending rewards into PZ cumulative reward tracking."""
for ag in self.agents:
self.rewards[ag] = self._pending_rewards.get(ag, 0.0)
self._cumulative_rewards[ag] += self.rewards[ag]
def _execute_trade(
self, direction: int, size: float, sl: float, tp: float, current_price: float
) -> bool:
"""Execute trade on portfolio state. Returns True if a trade was made."""
traded = False
if direction == 1: # BUY / Cover Short
pos = self._portfolio.positions.get(self.ticker, 0.0)
if pos < 0:
# Cover short
abs_qty = abs(pos)
cover_cost = abs_qty * current_price * (1 + self.commission)
margin_return = abs_qty * self._portfolio.avg_costs.get(self.ticker, current_price)
self._portfolio.cash += margin_return - cover_cost
self._portfolio.positions[self.ticker] = 0.0
self._portfolio.avg_costs[self.ticker] = 0.0
self._portfolio.stop_losses[self.ticker] = None
self._portfolio.take_profits[self.ticker] = None
traded = True
else:
trade_qty = (self._portfolio.cash * size) / (current_price * (1 + self.commission) + 1e-10)
if trade_qty > 1e-8:
cost = trade_qty * current_price * (1 + self.commission)
self._portfolio.cash -= cost
prev_qty = pos
prev_avg = self._portfolio.avg_costs.get(self.ticker, 0.0)
new_qty = prev_qty + trade_qty
new_avg = ((prev_qty * prev_avg) + (trade_qty * current_price)) / (new_qty + 1e-10)
self._portfolio.positions[self.ticker] = new_qty
self._portfolio.avg_costs[self.ticker] = new_avg
if sl > 0: self._portfolio.stop_losses[self.ticker] = sl
if tp > 0: self._portfolio.take_profits[self.ticker] = tp
traded = True
elif direction == 2: # SELL / Short
pos = self._portfolio.positions.get(self.ticker, 0.0)
if pos > 0:
sell_qty = min(pos, pos * size)
if sell_qty > 1e-8:
revenue = sell_qty * current_price * (1 - self.commission)
self._portfolio.cash += revenue
remaining = pos - sell_qty
self._portfolio.positions[self.ticker] = max(remaining, 0.0)
if remaining <= 1e-8:
self._portfolio.avg_costs[self.ticker] = 0.0
self._portfolio.stop_losses[self.ticker] = None
self._portfolio.take_profits[self.ticker] = None
traded = True
else:
margin = self._portfolio.cash * size
short_qty = margin / (current_price * (1 + self.commission) + 1e-10)
if short_qty > 1e-8:
self._portfolio.cash -= short_qty * current_price
prev_qty = abs(pos)
prev_avg = self._portfolio.avg_costs.get(self.ticker, 0.0)
new_qty = prev_qty + short_qty
new_avg = ((prev_qty * prev_avg) + (short_qty * current_price)) / (new_qty + 1e-10)
self._portfolio.positions[self.ticker] = -new_qty
self._portfolio.avg_costs[self.ticker] = new_avg
if sl > 0: self._portfolio.stop_losses[self.ticker] = sl
if tp > 0: self._portfolio.take_profits[self.ticker] = tp
traded = True
if traded:
self._risk.trade_count += 1
return traded
def _check_sl_tp(self, current_price: float):
"""Check and execute SL/TP orders."""
ticker = self.ticker
pos_qty = self._portfolio.positions.get(ticker, 0.0)
sl = self._portfolio.stop_losses.get(ticker)
tp = self._portfolio.take_profits.get(ticker)
if abs(pos_qty) < 1e-8:
return
hit = False
if pos_qty > 0:
if sl and current_price <= sl: hit = True
if tp and current_price >= tp: hit = True
if hit:
revenue = pos_qty * current_price * (1 - self.commission)
self._portfolio.cash += revenue
self._portfolio.positions[ticker] = 0.0
self._portfolio.avg_costs[ticker] = 0.0
self._portfolio.stop_losses[ticker] = None
self._portfolio.take_profits[ticker] = None
self._risk.trade_count += 1
elif pos_qty < 0:
abs_qty = abs(pos_qty)
if sl and current_price >= sl: hit = True
if tp and current_price <= tp: hit = True
if hit:
avg_cost = self._portfolio.avg_costs.get(ticker, current_price)
cover_cost = abs_qty * current_price * (1 + self.commission)
margin_ret = abs_qty * avg_cost
self._portfolio.cash += margin_ret - cover_cost
self._portfolio.positions[ticker] = 0.0
self._portfolio.avg_costs[ticker] = 0.0
self._portfolio.stop_losses[ticker] = None
self._portfolio.take_profits[ticker] = None
self._risk.trade_count += 1
def _make_dummy_data(self, n: int = 500, difficulty: str = "hard") -> pd.DataFrame:
"""Delegate to TradingEnv's proven synthetic data generator."""
from env.trading_env import TradingEnv
tmp = TradingEnv.__new__(TradingEnv)
return tmp._generate_market_data(n=n, difficulty=difficulty)
# ───────────────────────────────────────────────────────────────────────────
# Convenience
# ───────────────────────────────────────────────────────────────────────────
@functools.lru_cache(maxsize=None)
def _obs_space(self, agent: str) -> spaces.Space:
return self.observation_spaces[agent]
@functools.lru_cache(maxsize=None)
def _act_space(self, agent: str) -> spaces.Space:
return self.action_spaces[agent]
def state(self) -> Dict:
"""Return the full shared environment state (for visualization)."""
price = self._market.current_price()
return {
"step": self._current_step,
"price": float(price),
"portfolio_value": float(self._portfolio.total_value(price, self.ticker)),
"cash": float(self._portfolio.cash),
"positions": {k: float(v) for k, v in self._portfolio.positions.items()},
"max_drawdown": float(self._risk.max_drawdown),
"sharpe_ratio": float(self._risk.sharpe_ratio()),
"trade_count": self._risk.trade_count,
"rm_message": self._rm_message.tolist(),
"pm_message": self._pm_message.tolist(),
"governance_log": self._governance_log[-10:],
}