Spaces:
Build error
Build error
"""Position manager: tracks open positions, exposure, and P&L.

Maintains real-time state of all positions across venues with
per-market and per-venue exposure limits.
"""
| from __future__ import annotations | |
| import time | |
| from dataclasses import dataclass, field | |
| from typing import Dict, List, Optional | |
| from prediction_engine.edge_modules.base import MarketVenue, TradeSide | |
@dataclass
class Position:
    """A single open position.

    Instances are created with keyword arguments (see
    ``PositionManager.open_position``), so the class must be a dataclass;
    ``entry_time`` defaults to the wall-clock time at construction.
    """

    position_id: str
    market_id: str
    venue: MarketVenue
    side: TradeSide
    size: float  # dollars
    entry_price: float
    entry_time: float = field(default_factory=time.time)
    current_price: Optional[float] = None  # None until a price is recorded
    source_edge: str = ""  # which edge module created this

    @property
    def unrealized_pnl(self) -> float:
        """Return the P&L implied by ``current_price``.

        Returns 0.0 while no current price has been recorded. For a
        ``TradeSide.YES`` position the result is
        ``(current_price - entry_price) * size``; for any other side the
        sign of the price difference is flipped.
        """
        if self.current_price is None:
            return 0.0
        if self.side == TradeSide.YES:
            return (self.current_price - self.entry_price) * self.size
        return (self.entry_price - self.current_price) * self.size

    @property
    def is_profitable(self) -> bool:
        """Return True when ``unrealized_pnl`` is strictly positive."""
        return self.unrealized_pnl > 0
@dataclass
class ClosedPosition:
    """A resolved position with final P&L.

    Built with keyword arguments by ``PositionManager.close_position``,
    which is why the class needs the dataclass-generated ``__init__``.
    """

    position: Position  # the position as it was when it was closed
    exit_price: float
    exit_time: float
    realized_pnl: float
    outcome: Optional[str] = None  # "YES" or "NO" for binary
class PositionManager:
    """Track and manage all open and closed positions.

    Enforces per-market and per-venue exposure limits.
    Tracks real-time NAV (Net Asset Value).

    The aggregate accessors (``total_exposure``, ``total_unrealized_pnl``,
    ``total_realized_pnl``, ``nav``) are properties: the class itself reads
    them as attributes in ``nav``, ``_check_limits`` and ``summary``.
    """

    def __init__(
        self,
        max_per_market: float = 1000.0,
        max_per_venue: float = 5000.0,
        max_total: float = 10000.0,
        max_correlated: int = 5,
    ):
        """Store the exposure limits and start with no positions.

        Args:
            max_per_market: Cap on the summed size of open positions in
                one market.
            max_per_venue: Cap on the summed size of open positions on
                one venue.
            max_total: Cap on the summed size of all open positions.
            max_correlated: Maximum number of distinct markets that may
                have open positions at the same time.
        """
        self.max_per_market = max_per_market
        self.max_per_venue = max_per_venue
        self.max_total = max_total
        self.max_correlated = max_correlated
        self._positions: Dict[str, Position] = {}
        self._closed: List[ClosedPosition] = []
        self._position_counter = 0

    def open_position(
        self,
        market_id: str,
        venue: MarketVenue,
        side: TradeSide,
        size: float,
        price: float,
        source_edge: str = "",
    ) -> Optional[Position]:
        """Open a new position if within limits.

        Returns:
            The new ``Position`` (id ``"pos_<n>"`` from a running counter),
            or None when ``_check_limits`` rejects the request. The counter
            is only advanced for accepted positions.
        """
        # Check exposure limits
        if not self._check_limits(market_id, venue, size):
            return None
        self._position_counter += 1
        pid = f"pos_{self._position_counter}"
        pos = Position(
            position_id=pid,
            market_id=market_id,
            venue=venue,
            side=side,
            size=size,
            entry_price=price,
            source_edge=source_edge,
        )
        self._positions[pid] = pos
        return pos

    def close_position(
        self, position_id: str, exit_price: float, outcome: Optional[str] = None
    ) -> Optional[ClosedPosition]:
        """Close a position and record P&L.

        Args:
            position_id: Id of the open position to close.
            exit_price: Price stored on the closed record; it also becomes
                the position's ``current_price``.
            outcome: ``"YES"`` / ``"NO"`` for a binary resolution, or None
                for an ordinary exit at ``exit_price``.

        Returns:
            The ``ClosedPosition`` appended to the closed list, or None if
            ``position_id`` is not an open position.
        """
        pos = self._positions.pop(position_id, None)
        if pos is None:
            return None
        pos.current_price = exit_price
        pnl = pos.unrealized_pnl
        # For binary outcomes: full win or loss. ``exit_price`` is still
        # stored on the record but does not enter the P&L in this branch.
        # NOTE(review): for non-YES positions this settlement formula is not
        # what ``unrealized_pnl`` would yield at a settlement price of 0/1;
        # confirm whether ``entry_price`` is the YES quote or the price paid
        # for the side held before changing either formula.
        if outcome is not None:
            if (outcome == "YES" and pos.side == TradeSide.YES) or \
               (outcome == "NO" and pos.side == TradeSide.NO):
                pnl = (1.0 - pos.entry_price) * pos.size
            else:
                pnl = -pos.entry_price * pos.size
        closed = ClosedPosition(
            position=pos,
            exit_price=exit_price,
            exit_time=time.time(),
            realized_pnl=pnl,
            outcome=outcome,
        )
        self._closed.append(closed)
        return closed

    def update_prices(self, prices: Dict[str, float]) -> None:
        """Update current prices for all open positions.

        For each position the venue-specific key
        ``"<market_id>_<venue.value>"`` is preferred; the bare ``market_id``
        is the fallback. Positions matching neither keep their old price.
        """
        for pos in self._positions.values():
            key = f"{pos.market_id}_{pos.venue.value}"
            if key in prices:
                pos.current_price = prices[key]
            elif pos.market_id in prices:
                pos.current_price = prices[pos.market_id]

    # NOTE(review): exposed as a property for parity with the sibling
    # zero-argument accessors below; nothing in this module reads it, so
    # confirm against external callers.
    @property
    def open_positions(self) -> List[Position]:
        """Return a new list of all currently open positions."""
        return list(self._positions.values())

    @property
    def total_exposure(self) -> float:
        """Return the summed ``size`` of all open positions."""
        return sum(p.size for p in self._positions.values())

    @property
    def total_unrealized_pnl(self) -> float:
        """Return the summed ``unrealized_pnl`` of all open positions."""
        return sum(p.unrealized_pnl for p in self._positions.values())

    @property
    def total_realized_pnl(self) -> float:
        """Return the summed ``realized_pnl`` of all closed positions."""
        return sum(c.realized_pnl for c in self._closed)

    @property
    def nav(self) -> float:
        """Net Asset Value = realized + unrealized P&L."""
        return self.total_realized_pnl + self.total_unrealized_pnl

    def exposure_by_market(self, market_id: str) -> float:
        """Return the summed ``size`` of open positions in ``market_id``."""
        return sum(
            p.size for p in self._positions.values() if p.market_id == market_id
        )

    def exposure_by_venue(self, venue: MarketVenue) -> float:
        """Return the summed ``size`` of open positions on ``venue``."""
        return sum(
            p.size for p in self._positions.values() if p.venue == venue
        )

    def get_positions(self, market_id: str) -> List[Position]:
        """Get all open positions for a market."""
        return [p for p in self._positions.values() if p.market_id == market_id]

    def positions_by_edge(self, edge_type: str) -> List[Position]:
        """Return open positions whose ``source_edge`` equals ``edge_type``."""
        return [p for p in self._positions.values() if p.source_edge == edge_type]

    def _check_limits(
        self, market_id: str, venue: MarketVenue, size: float
    ) -> bool:
        """Check if new position is within all exposure limits.

        Returns False when adding ``size`` would push total, per-market or
        per-venue exposure above its cap, or when ``market_id`` would be a
        new market and ``max_correlated`` distinct markets are already open.
        """
        if self.total_exposure + size > self.max_total:
            return False
        if self.exposure_by_market(market_id) + size > self.max_per_market:
            return False
        if self.exposure_by_venue(venue) + size > self.max_per_venue:
            return False
        # Correlated positions check: adding to an already-open market never
        # increases the distinct-market count, so only new markets are gated.
        open_markets = {p.market_id for p in self._positions.values()}
        if market_id not in open_markets:
            if len(open_markets) >= self.max_correlated:
                return False
        return True

    def summary(self) -> Dict[str, float]:
        """Summary of current portfolio state."""
        return {
            "n_open": len(self._positions),
            "n_closed": len(self._closed),
            "total_exposure": self.total_exposure,
            "unrealized_pnl": self.total_unrealized_pnl,
            "realized_pnl": self.total_realized_pnl,
            "nav": self.nav,
            "win_rate": self._win_rate(),
        }

    def _win_rate(self) -> float:
        """Return the fraction of closed positions with positive realized P&L.

        Returns 0.0 when nothing has been closed yet.
        """
        if not self._closed:
            return 0.0
        wins = sum(1 for c in self._closed if c.realized_pnl > 0)
        return wins / len(self._closed)