import logging
from typing import Tuple

import numpy as np
from hmmlearn import hmm

logger = logging.getLogger(__name__)


class RegimeDetector:
    """Hidden Markov Model for detecting market regimes.

    Typical use is separating regimes such as low volatility vs. high
    volatility. The detector wraps a ``hmmlearn`` Gaussian HMM and feeds it
    three features per observation: log return, standardised volume and
    standardised spread.
    """

    def __init__(self, n_regimes: int = 2):
        """Create an unfitted detector.

        Args:
            n_regimes: Number of hidden states (regimes) of the HMM.
        """
        self.n_regimes = n_regimes
        # Gaussian HMM because we'll be looking at continuous variables like
        # returns and spread.
        self.model = hmm.GaussianHMM(
            n_components=n_regimes,
            covariance_type="diag",
            n_iter=100,
        )
        self.is_fitted = False

    def _prepare_features(
        self,
        prices: np.ndarray,
        volumes: np.ndarray,
        spreads: np.ndarray,
    ) -> np.ndarray:
        """Convert raw tick/candle data into stationary features for the HMM.

        Args:
            prices: Price series, one value per observation.
            volumes: Volume series aligned with ``prices``.
            spreads: Spread series aligned with ``prices``.

        Returns:
            Feature matrix of shape ``(n_samples, n_features)`` whose columns
            are log returns, normalised volumes and normalised spreads.

        Raises:
            ValueError: If ``prices`` is empty, or if the three series cannot
                be stacked column-wise (e.g. mismatched lengths).
        """
        # Accept any array-like (lists, tuples, ...) without copying arrays.
        prices = np.asarray(prices)
        volumes = np.asarray(volumes)
        spreads = np.asarray(spreads)

        if prices.size == 0:
            # Fail early with a readable message instead of NumPy
            # "mean of empty slice" warnings followed by a stacking error.
            raise ValueError("Cannot build HMM features from empty input arrays.")

        # Small epsilon avoids log(0) and division by a zero std-dev.
        epsilon = 1e-8

        # Log returns; the first observation has no predecessor, so it is
        # padded with a zero return to keep all columns the same length.
        returns = np.diff(np.log(np.maximum(prices, epsilon)))
        returns = np.insert(returns, 0, 0)

        # Normalize volumes and spreads over the whole window (simple
        # rolling standardization could also be used).
        vol_norm = (volumes - np.mean(volumes)) / (np.std(volumes) + epsilon)
        spread_norm = (spreads - np.mean(spreads)) / (np.std(spreads) + epsilon)

        # Feature matrix shape: (n_samples, n_features)
        return np.column_stack([returns, vol_norm, spread_norm])

    def fit(self, prices: np.ndarray, volumes: np.ndarray, spreads: np.ndarray):
        """Train the HMM on historical data.

        Fitting errors raised by the model are logged (with traceback) and
        swallowed; callers should check ``is_fitted`` afterwards. Errors from
        feature preparation propagate to the caller.

        Args:
            prices: Historical price series.
            volumes: Historical volume series aligned with ``prices``.
            spreads: Historical spread series aligned with ``prices``.
        """
        X = self._prepare_features(prices, volumes, spreads)

        # A refit may leave the model re-initialised or only partially
        # updated if it fails, so do not keep advertising an earlier fit.
        self.is_fitted = False
        try:
            self.model.fit(X)
        except Exception as e:
            # Best-effort by design: record the failure and stay unfitted.
            logger.exception("Failed to fit HMM: %s", e)
        else:
            self.is_fitted = True
            logger.info("HMM Regime Detector successfully fitted.")

    def predict_regime(
        self,
        prices: np.ndarray,
        volumes: np.ndarray,
        spreads: np.ndarray,
    ) -> np.ndarray:
        """Predict the regime sequence for the given data.

        Args:
            prices: Price series.
            volumes: Volume series aligned with ``prices``.
            spreads: Spread series aligned with ``prices``.

        Returns:
            The model's predicted regime label for each observation. If the
            detector has not been fitted, an all-zero integer array of
            length ``len(prices)`` is returned and a warning is logged.
        """
        if not self.is_fitted:
            logger.warning("HMM not fitted. Call fit() first.")
            return np.zeros(len(prices), dtype=int)

        X = self._prepare_features(prices, volumes, spreads)
        return self.model.predict(X)

    def get_current_regime_prob(
        self,
        recent_prices: np.ndarray,
        recent_volumes: np.ndarray,
        recent_spreads: np.ndarray,
    ) -> Tuple[int, np.ndarray]:
        """Get the regime probabilities for the most recent observation.

        Args:
            recent_prices: Recent price window.
            recent_volumes: Recent volume window aligned with the prices.
            recent_spreads: Recent spread window aligned with the prices.

        Returns:
            ``(most_likely_regime, array_of_probabilities)``. When the
            detector is not fitted, the fallback is regime ``0`` with a
            one-hot probability vector of length ``n_regimes``.
        """
        if not self.is_fitted:
            # Default fallback: all probability mass on regime 0, sized to
            # the configured number of regimes rather than assuming two.
            fallback = np.zeros(self.n_regimes)
            fallback[0] = 1.0
            return 0, fallback

        X = self._prepare_features(recent_prices, recent_volumes, recent_spreads)

        # predict_proba returns probs for all time steps, we want the last one
        probs = self.model.predict_proba(X)[-1]
        # Cast so the result is a plain int, matching the annotation and the
        # unfitted fallback above.
        regime = int(np.argmax(probs))
        return regime, probs