"""Position manager: tracks open positions, exposure, and P&L. Maintains real-time state of all positions across venues with per-market and per-venue exposure limits. """ from __future__ import annotations import time from dataclasses import dataclass, field from typing import Dict, List, Optional from prediction_engine.edge_modules.base import MarketVenue, TradeSide @dataclass class Position: """A single open position.""" position_id: str market_id: str venue: MarketVenue side: TradeSide size: float # dollars entry_price: float entry_time: float = field(default_factory=time.time) current_price: Optional[float] = None source_edge: str = "" # which edge module created this @property def unrealized_pnl(self) -> float: if self.current_price is None: return 0.0 if self.side == TradeSide.YES: return (self.current_price - self.entry_price) * self.size return (self.entry_price - self.current_price) * self.size @property def is_profitable(self) -> bool: return self.unrealized_pnl > 0 @dataclass class ClosedPosition: """A resolved position with final P&L.""" position: Position exit_price: float exit_time: float realized_pnl: float outcome: Optional[str] = None # "YES" or "NO" for binary class PositionManager: """Track and manage all open and closed positions. Enforces per-market and per-venue exposure limits. Tracks real-time NAV (Net Asset Value). """ def __init__( self, max_per_market: float = 1000.0, max_per_venue: float = 5000.0, max_total: float = 10000.0, max_correlated: int = 5, ): self.max_per_market = max_per_market self.max_per_venue = max_per_venue self.max_total = max_total self.max_correlated = max_correlated self._positions: Dict[str, Position] = {} self._closed: List[ClosedPosition] = [] self._position_counter = 0 def open_position( self, market_id: str, venue: MarketVenue, side: TradeSide, size: float, price: float, source_edge: str = "", ) -> Optional[Position]: """Open a new position if within limits.""" # Check exposure limits if not self._check_limits(market_id, venue, size): return None self._position_counter += 1 pid = f"pos_{self._position_counter}" pos = Position( position_id=pid, market_id=market_id, venue=venue, side=side, size=size, entry_price=price, source_edge=source_edge, ) self._positions[pid] = pos return pos def close_position( self, position_id: str, exit_price: float, outcome: Optional[str] = None ) -> Optional[ClosedPosition]: """Close a position and record P&L.""" pos = self._positions.pop(position_id, None) if pos is None: return None pos.current_price = exit_price pnl = pos.unrealized_pnl # For binary outcomes: full win or loss if outcome is not None: if (outcome == "YES" and pos.side == TradeSide.YES) or \ (outcome == "NO" and pos.side == TradeSide.NO): pnl = (1.0 - pos.entry_price) * pos.size else: pnl = -pos.entry_price * pos.size closed = ClosedPosition( position=pos, exit_price=exit_price, exit_time=time.time(), realized_pnl=pnl, outcome=outcome, ) self._closed.append(closed) return closed def update_prices(self, prices: Dict[str, float]) -> None: """Update current prices for all open positions.""" for pos in self._positions.values(): key = f"{pos.market_id}_{pos.venue.value}" if key in prices: pos.current_price = prices[key] elif pos.market_id in prices: pos.current_price = prices[pos.market_id] @property def open_positions(self) -> List[Position]: return list(self._positions.values()) @property def total_exposure(self) -> float: return sum(p.size for p in self._positions.values()) @property def total_unrealized_pnl(self) -> float: return sum(p.unrealized_pnl for p in self._positions.values()) @property def total_realized_pnl(self) -> float: return sum(c.realized_pnl for c in self._closed) @property def nav(self) -> float: """Net Asset Value = realized + unrealized P&L.""" return self.total_realized_pnl + self.total_unrealized_pnl def exposure_by_market(self, market_id: str) -> float: return sum( p.size for p in self._positions.values() if p.market_id == market_id ) def exposure_by_venue(self, venue: MarketVenue) -> float: return sum( p.size for p in self._positions.values() if p.venue == venue ) def get_positions(self, market_id: str) -> List[Position]: """Get all open positions for a market.""" return [p for p in self._positions.values() if p.market_id == market_id] def positions_by_edge(self, edge_type: str) -> List[Position]: return [p for p in self._positions.values() if p.source_edge == edge_type] def _check_limits( self, market_id: str, venue: MarketVenue, size: float ) -> bool: """Check if new position is within all exposure limits.""" if self.total_exposure + size > self.max_total: return False if self.exposure_by_market(market_id) + size > self.max_per_market: return False if self.exposure_by_venue(venue) + size > self.max_per_venue: return False # Correlated positions check n_markets = len(set(p.market_id for p in self._positions.values())) if market_id not in {p.market_id for p in self._positions.values()}: if n_markets >= self.max_correlated: return False return True def summary(self) -> Dict[str, float]: """Summary of current portfolio state.""" return { "n_open": len(self._positions), "n_closed": len(self._closed), "total_exposure": self.total_exposure, "unrealized_pnl": self.total_unrealized_pnl, "realized_pnl": self.total_realized_pnl, "nav": self.nav, "win_rate": self._win_rate(), } def _win_rate(self) -> float: if not self._closed: return 0.0 wins = sum(1 for c in self._closed if c.realized_pnl > 0) return wins / len(self._closed)