JohnGenetica's picture
Deploy ANE KAN runtime Space
201cf4d verified
"""Position manager: tracks open positions, exposure, and P&L.
Maintains real-time state of all positions across venues with
per-market and per-venue exposure limits.
"""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from prediction_engine.edge_modules.base import MarketVenue, TradeSide
@dataclass
class Position:
"""A single open position."""
position_id: str
market_id: str
venue: MarketVenue
side: TradeSide
size: float # dollars
entry_price: float
entry_time: float = field(default_factory=time.time)
current_price: Optional[float] = None
source_edge: str = "" # which edge module created this
@property
def unrealized_pnl(self) -> float:
if self.current_price is None:
return 0.0
if self.side == TradeSide.YES:
return (self.current_price - self.entry_price) * self.size
return (self.entry_price - self.current_price) * self.size
@property
def is_profitable(self) -> bool:
return self.unrealized_pnl > 0
@dataclass
class ClosedPosition:
"""A resolved position with final P&L."""
position: Position
exit_price: float
exit_time: float
realized_pnl: float
outcome: Optional[str] = None # "YES" or "NO" for binary
class PositionManager:
"""Track and manage all open and closed positions.
Enforces per-market and per-venue exposure limits.
Tracks real-time NAV (Net Asset Value).
"""
def __init__(
self,
max_per_market: float = 1000.0,
max_per_venue: float = 5000.0,
max_total: float = 10000.0,
max_correlated: int = 5,
):
self.max_per_market = max_per_market
self.max_per_venue = max_per_venue
self.max_total = max_total
self.max_correlated = max_correlated
self._positions: Dict[str, Position] = {}
self._closed: List[ClosedPosition] = []
self._position_counter = 0
def open_position(
self,
market_id: str,
venue: MarketVenue,
side: TradeSide,
size: float,
price: float,
source_edge: str = "",
) -> Optional[Position]:
"""Open a new position if within limits."""
# Check exposure limits
if not self._check_limits(market_id, venue, size):
return None
self._position_counter += 1
pid = f"pos_{self._position_counter}"
pos = Position(
position_id=pid,
market_id=market_id,
venue=venue,
side=side,
size=size,
entry_price=price,
source_edge=source_edge,
)
self._positions[pid] = pos
return pos
def close_position(
self, position_id: str, exit_price: float, outcome: Optional[str] = None
) -> Optional[ClosedPosition]:
"""Close a position and record P&L."""
pos = self._positions.pop(position_id, None)
if pos is None:
return None
pos.current_price = exit_price
pnl = pos.unrealized_pnl
# For binary outcomes: full win or loss
if outcome is not None:
if (outcome == "YES" and pos.side == TradeSide.YES) or \
(outcome == "NO" and pos.side == TradeSide.NO):
pnl = (1.0 - pos.entry_price) * pos.size
else:
pnl = -pos.entry_price * pos.size
closed = ClosedPosition(
position=pos,
exit_price=exit_price,
exit_time=time.time(),
realized_pnl=pnl,
outcome=outcome,
)
self._closed.append(closed)
return closed
def update_prices(self, prices: Dict[str, float]) -> None:
"""Update current prices for all open positions."""
for pos in self._positions.values():
key = f"{pos.market_id}_{pos.venue.value}"
if key in prices:
pos.current_price = prices[key]
elif pos.market_id in prices:
pos.current_price = prices[pos.market_id]
@property
def open_positions(self) -> List[Position]:
return list(self._positions.values())
@property
def total_exposure(self) -> float:
return sum(p.size for p in self._positions.values())
@property
def total_unrealized_pnl(self) -> float:
return sum(p.unrealized_pnl for p in self._positions.values())
@property
def total_realized_pnl(self) -> float:
return sum(c.realized_pnl for c in self._closed)
@property
def nav(self) -> float:
"""Net Asset Value = realized + unrealized P&L."""
return self.total_realized_pnl + self.total_unrealized_pnl
def exposure_by_market(self, market_id: str) -> float:
return sum(
p.size for p in self._positions.values() if p.market_id == market_id
)
def exposure_by_venue(self, venue: MarketVenue) -> float:
return sum(
p.size for p in self._positions.values() if p.venue == venue
)
def get_positions(self, market_id: str) -> List[Position]:
"""Get all open positions for a market."""
return [p for p in self._positions.values() if p.market_id == market_id]
def positions_by_edge(self, edge_type: str) -> List[Position]:
return [p for p in self._positions.values() if p.source_edge == edge_type]
def _check_limits(
self, market_id: str, venue: MarketVenue, size: float
) -> bool:
"""Check if new position is within all exposure limits."""
if self.total_exposure + size > self.max_total:
return False
if self.exposure_by_market(market_id) + size > self.max_per_market:
return False
if self.exposure_by_venue(venue) + size > self.max_per_venue:
return False
# Correlated positions check
n_markets = len(set(p.market_id for p in self._positions.values()))
if market_id not in {p.market_id for p in self._positions.values()}:
if n_markets >= self.max_correlated:
return False
return True
def summary(self) -> Dict[str, float]:
"""Summary of current portfolio state."""
return {
"n_open": len(self._positions),
"n_closed": len(self._closed),
"total_exposure": self.total_exposure,
"unrealized_pnl": self.total_unrealized_pnl,
"realized_pnl": self.total_realized_pnl,
"nav": self.nav,
"win_rate": self._win_rate(),
}
def _win_rate(self) -> float:
if not self._closed:
return 0.0
wins = sum(1 for c in self._closed if c.realized_pnl > 0)
return wins / len(self._closed)