| import numpy as np |
| import logging |
| from typing import Tuple |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class KalmanPriceSmoother: |
| def __init__(self, process_variance: float = 1e-5, measurement_variance: float = 1e-3): |
| """ |
| 1D Kalman Filter for smoothing prediction market prices. |
| |
| Args: |
| process_variance (Q): How fast we think the true probability changes. |
| measurement_variance (R): How much noise/bid-ask bounce there is in trade prices. |
| """ |
| self.Q = process_variance |
| self.R = measurement_variance |
| |
| |
| self.x = None |
| |
| self.P = 1.0 |
| |
| def initialize(self, initial_price: float): |
| """Initialize filter with the first observed price.""" |
| self.x = initial_price |
| self.P = 1.0 |
| logger.info(f"Kalman Filter initialized at {self.x:.4f}") |
|
|
| def update(self, measurement: float) -> Tuple[float, float]: |
| """ |
| Process a new price observation. |
| Returns: (smoothed_price, confidence_interval_width) |
| """ |
| if self.x is None: |
| self.initialize(measurement) |
| return self.x, np.sqrt(self.P) |
|
|
| |
| |
| x_pred = self.x |
| P_pred = self.P + self.Q |
|
|
| |
| |
| K = P_pred / (P_pred + self.R) |
| |
| |
| self.x = x_pred + K * (measurement - x_pred) |
| |
| |
| self.P = (1 - K) * P_pred |
| |
| return self.x, np.sqrt(self.P) |
| |
| def batch_smooth(self, prices: np.ndarray) -> np.ndarray: |
| """Smooth an array of historical prices.""" |
| smoothed = [] |
| for p in prices: |
| s, _ = self.update(p) |
| smoothed.append(s) |
| return np.array(smoothed) |
|
|