| import numpy as np |
| import logging |
| from hmmlearn import hmm |
| from typing import Tuple |
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class RegimeDetector:
    """Gaussian-HMM wrapper that labels observations with a market regime.

    Attributes:
        n_regimes: Number of hidden states requested from the HMM.
        model: The underlying ``hmm.GaussianHMM`` instance.
        is_fitted: True once ``fit`` has completed without raising.
    """

    def __init__(self, n_regimes: int = 2):
        """
        Hidden Markov Model for detecting market regimes (e.g., Low Volatility vs High Volatility)

        Args:
            n_regimes: Number of hidden states (HMM components).
        """
        self.n_regimes = n_regimes
        self.model = hmm.GaussianHMM(
            n_components=n_regimes,
            covariance_type="diag",
            n_iter=100,
        )
        self.is_fitted = False

    def _prepare_features(self, prices: np.ndarray, volumes: np.ndarray, spreads: np.ndarray) -> np.ndarray:
        """
        Convert raw tick/candle data into stationary features for the HMM.

        Args:
            prices: 1-D price series.
            volumes: 1-D volume series, same length as ``prices``.
            spreads: 1-D spread series, same length as ``prices``.

        Returns:
            Array of shape ``(len(prices), 3)`` whose columns are
            log-returns (first row is 0), z-scored volume and z-scored spread.

        Raises:
            ValueError: If the inputs are not 1-D, differ in length, or are empty.

        NOTE(review): the z-scores use the mean/std of the window passed in,
        not statistics remembered from training, so features computed on a
        short recent window are scaled differently from those seen by ``fit``.
        Confirm this is intended.
        """
        epsilon = 1e-8

        prices = np.asarray(prices, dtype=float)
        volumes = np.asarray(volumes, dtype=float)
        spreads = np.asarray(spreads, dtype=float)

        if not (prices.ndim == volumes.ndim == spreads.ndim == 1):
            raise ValueError("prices, volumes and spreads must be 1-D arrays")
        if not (prices.shape[0] == volumes.shape[0] == spreads.shape[0]):
            raise ValueError(
                "prices, volumes and spreads must have the same length, got "
                f"{prices.shape[0]}, {volumes.shape[0]} and {spreads.shape[0]}"
            )
        if prices.shape[0] == 0:
            raise ValueError("at least one observation is required")

        # Clamp to epsilon so that zero/negative prices do not reach log().
        log_prices = np.log(np.maximum(prices, epsilon))
        # Prepending the first value makes the first return 0 and keeps the
        # result aligned with the input length.
        returns = np.diff(log_prices, prepend=log_prices[0])

        # epsilon in the denominator guards against constant series (std == 0).
        vol_norm = (volumes - np.mean(volumes)) / (np.std(volumes) + epsilon)
        spread_norm = (spreads - np.mean(spreads)) / (np.std(spreads) + epsilon)

        return np.column_stack([returns, vol_norm, spread_norm])

    def fit(self, prices: np.ndarray, volumes: np.ndarray, spreads: np.ndarray):
        """Train the HMM on historical data.

        Failures raised by the underlying model are logged (with traceback)
        rather than propagated; ``is_fitted`` is only set on success.
        Invalid inputs still raise ``ValueError`` from feature preparation.
        """
        X = self._prepare_features(prices, volumes, spreads)
        try:
            self.model.fit(X)
        except Exception as e:
            # Best-effort by design: report the failure and leave the flag as is.
            logger.exception("Failed to fit HMM: %s", e)
        else:
            self.is_fitted = True
            logger.info("HMM Regime Detector successfully fitted.")

    def predict_regime(self, prices: np.ndarray, volumes: np.ndarray, spreads: np.ndarray) -> np.ndarray:
        """Predict the regime sequence for the given data.

        Returns:
            One regime label per observation. If the model has not been
            fitted, a warning is logged and an all-zero integer array of
            length ``len(prices)`` is returned.
        """
        if not self.is_fitted:
            logger.warning("HMM not fitted. Call fit() first.")
            return np.zeros(len(prices), dtype=int)

        X = self._prepare_features(prices, volumes, spreads)
        return self.model.predict(X)

    def get_current_regime_prob(self, recent_prices: np.ndarray, recent_volumes: np.ndarray, recent_spreads: np.ndarray) -> Tuple[int, np.ndarray]:
        """
        Get the probability distribution over regimes for the most recent observation.
        Returns: (most_likely_regime, array_of_probabilities)

        If the model has not been fitted, a warning is logged and the fallback
        ``(0, p)`` is returned, where ``p`` has length ``n_regimes`` with all
        mass on regime 0.
        """
        if not self.is_fitted:
            logger.warning("HMM not fitted. Call fit() first.")
            fallback = np.zeros(self.n_regimes, dtype=float)
            # Slice assignment is a no-op on an empty vector, unlike fallback[0].
            fallback[:1] = 1.0
            return 0, fallback

        X = self._prepare_features(recent_prices, recent_volumes, recent_spreads)

        # Posterior for the last row only, i.e. the most recent observation.
        probs = self.model.predict_proba(X)[-1]
        # Cast so the first element is a plain int, as the annotation promises.
        return int(np.argmax(probs)), probs
|
|