arbintel / src /models /kalman.py
AJAY KASU
Add root app.py for Streamlit GUI and dependencies
77fd2f6
import numpy as np
import logging
from typing import Tuple
logger = logging.getLogger(__name__)
class KalmanPriceSmoother:
def __init__(self, process_variance: float = 1e-5, measurement_variance: float = 1e-3):
"""
1D Kalman Filter for smoothing prediction market prices.
Args:
process_variance (Q): How fast we think the true probability changes.
measurement_variance (R): How much noise/bid-ask bounce there is in trade prices.
"""
self.Q = process_variance
self.R = measurement_variance
# State: [price_estimate]
self.x = None
# Uncertainty covariance
self.P = 1.0
def initialize(self, initial_price: float):
"""Initialize filter with the first observed price."""
self.x = initial_price
self.P = 1.0 # High initial uncertainty
logger.info(f"Kalman Filter initialized at {self.x:.4f}")
def update(self, measurement: float) -> Tuple[float, float]:
"""
Process a new price observation.
Returns: (smoothed_price, confidence_interval_width)
"""
if self.x is None:
self.initialize(measurement)
return self.x, np.sqrt(self.P)
# 1. Prediction Step
# Assume price stays the same (random walk model)
x_pred = self.x
P_pred = self.P + self.Q
# 2. Update Step
# Calculate Kalman Gain
K = P_pred / (P_pred + self.R)
# Update estimate with measurement
self.x = x_pred + K * (measurement - x_pred)
# Update covariance
self.P = (1 - K) * P_pred
return self.x, np.sqrt(self.P)
def batch_smooth(self, prices: np.ndarray) -> np.ndarray:
"""Smooth an array of historical prices."""
smoothed = []
for p in prices:
s, _ = self.update(p)
smoothed.append(s)
return np.array(smoothed)