File size: 9,046 Bytes
aec0295 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 | """
State management for the trading environment.
Defines MarketState, PortfolioState, RiskState, and observation construction.
"""
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
@dataclass
class MarketState:
    """Market data for the episode plus a cursor selecting the current bar.

    ``prices`` must provide, per row, every column read by
    ``observation_vector``: open, high, low, close, volume, rsi, ema_20,
    ema_50, macd, macd_signal, macd_hist, bb_upper, bb_lower, volatility, atr.
    """
    prices: pd.DataFrame  # OHLCV + indicator columns, one row per step
    current_step: int = 0  # positional (iloc) index of the current row

    def current_row(self) -> pd.Series:
        """Return the row at ``current_step`` (positional lookup)."""
        return self.prices.iloc[self.current_step]

    def current_price(self) -> float:
        """Return the current row's ``close`` as a Python float."""
        return float(self.current_row()["close"])

    def observation_vector(self) -> np.ndarray:
        """Return the 14 normalized market features for the current bar as float32.

        Layout: open/high/low/close relative to close (4), log volume (1),
        RSI / 100 (1), EMA-20 and EMA-50 relative to close (2), tanh-squashed
        MACD line/signal/histogram as a fraction of close (3), position of the
        close inside the Bollinger bands (1), volatility * 100 capped at 1 (1),
        and ATR relative to close (1).
        """
        bar = self.current_row()
        close = bar["close"]
        # Small epsilon guards against dividing by a zero close.
        scale = close + 1e-10

        ohlc_rel = [bar[name] / scale for name in ("open", "high", "low", "close")]
        macd_rel = [
            np.tanh(bar[name] / scale * 100)
            for name in ("macd", "macd_signal", "macd_hist")
        ]
        band_width = bar["bb_upper"] - bar["bb_lower"] + 1e-10

        vector = [
            *ohlc_rel,
            np.log1p(bar["volume"]) / 20.0,         # log-scaled volume
            bar["rsi"] / 100.0,                      # RSI rescaled to [0, 1]
            bar["ema_20"] / scale,
            bar["ema_50"] / scale,
            *macd_rel,
            (close - bar["bb_lower"]) / band_width,  # 0 at lower band, 1 at upper band
            min(bar["volatility"] * 100, 1.0),       # capped above only; no lower clip
            bar["atr"] / scale,
        ]
        return np.array(vector, dtype=np.float32)

    @property
    def feature_size(self) -> int:
        """Length of the vector returned by ``observation_vector()``."""
        return 14
@dataclass
class PortfolioState:
    """Tracks cash, signed per-ticker positions, and per-ticker risk levels.

    Position quantities are signed: positive for long, negative for short.
    """
    initial_cash: float = 100_000.0
    cash: float = 100_000.0
    positions: Dict[str, float] = field(default_factory=dict)  # ticker -> signed quantity
    avg_costs: Dict[str, float] = field(default_factory=dict)  # ticker -> average entry price
    trade_durations: Dict[str, int] = field(default_factory=dict)  # ticker -> steps held
    trade_history: List[Dict[str, Any]] = field(default_factory=list)
    # Stop-loss / take-profit trigger prices, keyed by ticker ({ticker: price}).
    stop_losses: Dict[str, Optional[float]] = field(default_factory=dict)
    take_profits: Dict[str, Optional[float]] = field(default_factory=dict)

    def reset(self) -> None:
        """Restore the starting cash balance and clear all per-episode bookkeeping."""
        self.cash = self.initial_cash
        self.positions = {}
        self.avg_costs = {}
        # Must be cleared too, otherwise holding periods leak into the next episode.
        self.trade_durations = {}
        self.trade_history = []
        self.stop_losses = {}
        self.take_profits = {}

    def total_value(self, current_price: float, ticker: str = "default") -> float:
        """Return cash plus the mark-to-market of ``ticker``'s position.

        Long or flat (qty >= 0): ``cash + qty * current_price``.
        Short (qty < 0): ``cash + |qty| * (avg_cost - current_price)``, i.e. cash
        plus the short's unrealized P&L. If no average cost is recorded,
        ``current_price`` is used, so the P&L term is zero.

        NOTE(review): no margin amount is added back for shorts. This is correct
        only if opening a short leaves ``cash`` unchanged. If the trade-execution
        code instead deducts ``|qty| * avg_cost`` from cash as margin, shorts are
        undervalued by that amount. Confirm against the execution code.
        """
        position_qty = self.positions.get(ticker, 0.0)
        if position_qty >= 0:
            return self.cash + position_qty * current_price
        avg_cost = self.avg_costs.get(ticker, current_price)
        unrealized = abs(position_qty) * (avg_cost - current_price)
        return self.cash + unrealized

    def unrealized_pnl(self, current_price: float, ticker: str = "default") -> float:
        """Return the unrealized profit/loss of ``ticker``'s open position.

        Longs profit when price rises above the average entry price; shorts
        profit when it falls below. Returns 0.0 for a (near-)zero position.
        If no average entry price is recorded, ``current_price`` is used, so the
        P&L is 0.0. This matches ``total_value``; a 0.0 fallback would instead
        report the whole notional as profit.
        """
        position_qty = self.positions.get(ticker, 0.0)
        if abs(position_qty) < 1e-10:
            return 0.0
        avg_entry = self.avg_costs.get(ticker, current_price)
        if position_qty > 0:
            return position_qty * (current_price - avg_entry)
        return abs(position_qty) * (avg_entry - current_price)

    def observation_vector(self, current_price: float, ticker: str = "default") -> np.ndarray:
        """Return the 5 normalized portfolio features for ``ticker`` as float32.

        Layout: cash / initial cash, long value / total value, total value /
        initial cash, tanh(10 * unrealized P&L / initial cash), and short value /
        initial cash.
        """
        total_val = self.total_value(current_price, ticker)
        position_qty = self.positions.get(ticker, 0.0)
        long_value = max(position_qty, 0.0) * current_price
        short_value = abs(min(position_qty, 0.0)) * current_price
        features = [
            self.cash / (self.initial_cash + 1e-10),  # cash ratio
            # Normalized by total value (not initial cash, unlike short exposure);
            # NOTE(review): unbounded / sign-flipping if total_val <= 0.
            long_value / (total_val + 1e-10),  # long exposure ratio
            total_val / (self.initial_cash + 1e-10),  # portfolio return ratio
            np.tanh(self.unrealized_pnl(current_price, ticker) / (self.initial_cash + 1e-10) * 10),  # squashed PnL
            short_value / (self.initial_cash + 1e-10),  # short exposure ratio
        ]
        return np.array(features, dtype=np.float32)

    @property
    def feature_size(self) -> int:
        """Length of the vector returned by ``observation_vector()``."""
        return 5
@dataclass
class RiskState:
    """Tracks risk metrics (drawdown, Sharpe ratio, return volatility) over an episode."""
    peak_value: float = 100_000.0
    current_drawdown: float = 0.0
    max_drawdown: float = 0.0
    # Despite its name, this holds portfolio VALUES (one per update() call);
    # per-step returns are derived from it on demand.
    return_history: List[float] = field(default_factory=list)
    trade_count: int = 0  # not modified by this class except in reset(); presumably set by the caller

    def reset(self, initial_value: float = 100_000.0) -> None:
        """Clear all metrics; the drawdown peak starts at ``initial_value``.

        ``initial_value`` itself is not appended to ``return_history``.
        """
        self.peak_value = initial_value
        self.current_drawdown = 0.0
        self.max_drawdown = 0.0
        self.return_history = []
        self.trade_count = 0

    def update(self, portfolio_value: float) -> None:
        """Record ``portfolio_value`` and refresh peak, current and max drawdown."""
        self.return_history.append(portfolio_value)
        if portfolio_value > self.peak_value:
            self.peak_value = portfolio_value
        # Drawdown is the fractional decline from the running peak.
        self.current_drawdown = (self.peak_value - portfolio_value) / (self.peak_value + 1e-10)
        self.max_drawdown = max(self.max_drawdown, self.current_drawdown)

    def _step_returns(self) -> np.ndarray:
        """Simple per-step returns of the recorded value series (empty if < 2 values)."""
        values = np.array(self.return_history)
        return np.diff(values) / (values[:-1] + 1e-10)

    def sharpe_ratio(self, risk_free_rate: float = 0.0) -> float:
        """Return the per-step (non-annualized) Sharpe ratio of recorded returns.

        ``risk_free_rate`` is subtracted from the mean per-step return, so it
        must be expressed per step. Returns 0.0 with fewer than two recorded
        values or when the returns are (near-)constant.
        """
        if len(self.return_history) < 2:
            return 0.0
        returns = self._step_returns()
        spread = np.std(returns)
        if spread < 1e-10:
            return 0.0
        return float((np.mean(returns) - risk_free_rate) / (spread + 1e-10))

    def return_volatility(self) -> float:
        """Return the population std of per-step returns over the full history.

        This is not a rolling window: every recorded value contributes.
        Returns 0.0 with fewer than two recorded values.
        """
        if len(self.return_history) < 2:
            return 0.0
        return float(np.std(self._step_returns()))

    def observation_vector(self) -> np.ndarray:
        """Return the 5 normalized risk features as float32."""
        features = [
            min(self.current_drawdown, 1.0),  # current drawdown, capped at 1
            min(self.max_drawdown, 1.0),  # max drawdown, capped at 1
            np.tanh(self.sharpe_ratio()),  # Sharpe squashed into (-1, 1)
            min(self.return_volatility() * 100, 1.0),  # volatility, capped at 1
            min(self.trade_count / 100.0, 1.0),  # trade count / 100, capped at 1
        ]
        return np.array(features, dtype=np.float32)

    @property
    def feature_size(self) -> int:
        """Length of the vector returned by ``observation_vector()``."""
        return 5
def get_observation(market: MarketState, portfolio: PortfolioState,
                    risk: RiskState, ticker: str = "default") -> np.ndarray:
    """Build the agent's flat observation vector.

    The market features come first, then the portfolio features (valued at
    ``market.current_price()`` for ``ticker``), then the risk features,
    joined end to end.
    """
    price_now = market.current_price()
    segments = (
        market.observation_vector(),
        portfolio.observation_vector(price_now, ticker),
        risk.observation_vector(),
    )
    return np.concatenate(segments)
def get_observation_size(market: MarketState, portfolio: PortfolioState,
                         risk: RiskState) -> int:
    """Return the total observation size: the sum of each component's ``feature_size``."""
    return sum(component.feature_size for component in (market, portfolio, risk))
|