import numpy as np import logging from typing import Tuple logger = logging.getLogger(__name__) class KalmanPriceSmoother: def __init__(self, process_variance: float = 1e-5, measurement_variance: float = 1e-3): """ 1D Kalman Filter for smoothing prediction market prices. Args: process_variance (Q): How fast we think the true probability changes. measurement_variance (R): How much noise/bid-ask bounce there is in trade prices. """ self.Q = process_variance self.R = measurement_variance # State: [price_estimate] self.x = None # Uncertainty covariance self.P = 1.0 def initialize(self, initial_price: float): """Initialize filter with the first observed price.""" self.x = initial_price self.P = 1.0 # High initial uncertainty logger.info(f"Kalman Filter initialized at {self.x:.4f}") def update(self, measurement: float) -> Tuple[float, float]: """ Process a new price observation. Returns: (smoothed_price, confidence_interval_width) """ if self.x is None: self.initialize(measurement) return self.x, np.sqrt(self.P) # 1. Prediction Step # Assume price stays the same (random walk model) x_pred = self.x P_pred = self.P + self.Q # 2. Update Step # Calculate Kalman Gain K = P_pred / (P_pred + self.R) # Update estimate with measurement self.x = x_pred + K * (measurement - x_pred) # Update covariance self.P = (1 - K) * P_pred return self.x, np.sqrt(self.P) def batch_smooth(self, prices: np.ndarray) -> np.ndarray: """Smooth an array of historical prices.""" smoothed = [] for p in prices: s, _ = self.update(p) smoothed.append(s) return np.array(smoothed)