Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import pandas as pd | |
| # Wrappers for Tigramite (PCMCI) | |
| # Note: Tigramite must be installed in environment | |
| try: | |
| from tigramite import data_processing as pp | |
| from tigramite.pcmci import PCMCI | |
| from tigramite.independence_tests.parcorr import ParCorr | |
| TIGRAMITE_AVAILABLE = True | |
| except ImportError: | |
| TIGRAMITE_AVAILABLE = False | |
| # Only print if imported directly, to avoid log spam | |
| if __name__ == "__main__": | |
| print("Warning: Tigramite not found. Using Placeholder.") | |
class CausalDiscovery:
    """
    Causal Discovery using Tigramite (PCMCI).

    Identifies causal links in time-series data using Partial Correlation
    (ParCorr) and derives per-feature weights from the resulting link
    strengths.

    Attributes:
        alpha: Significance level, used both as ``pc_alpha`` for PCMCI and
            as the threshold applied to the p/q-value matrix.
        max_lag: Maximum time lag passed to PCMCI as ``tau_max``.
        results: Dict returned by ``PCMCI.run_pcmci`` (None until fitted).
        graph: Boolean array, True where the p/q-value is below ``alpha``
            (None until fitted).
        var_names: Column names of the last DataFrame passed to ``fit``.
    """

    # Length of the fallback weight vector when no data has ever been seen.
    # Kept at 5 for backward compatibility with the previous fixed fallback.
    _DEFAULT_N_FEATURES = 5

    def __init__(self, alpha=0.05, max_lag=5):
        self.alpha = alpha
        self.max_lag = max_lag
        self.results = None
        self.graph = None
        self.var_names = None

    def fit(self, df: pd.DataFrame) -> "CausalDiscovery":
        """
        Fit PCMCI on the dataframe.

        Args:
            df: Pandas DataFrame (time series); rows are time steps and
                columns are variables.

        Returns:
            self, to allow call chaining. When Tigramite is not installed
            only ``var_names`` is recorded and no discovery is run.
        """
        # Record the feature names first so that get_feature_weights() can
        # size its fallback correctly even when Tigramite is unavailable.
        self.var_names = df.columns.tolist()

        if not TIGRAMITE_AVAILABLE:
            return self

        # 1. Prepare data. Tigramite requires a (T, N) numpy array.
        dataframe = pp.DataFrame(
            df.values,
            var_names=self.var_names,
            missing_flag=999,
        )

        # 2. Init PCMCI with ParCorr (linear partial correlation).
        #    For non-linear dependencies, use GPDC or CMIknn (slower).
        parcorr = ParCorr(significance='analytic')
        pcmci = PCMCI(dataframe=dataframe, cond_ind_test=parcorr, verbosity=0)

        # 3. Run PCMCI (PC phase, then MCI phase).
        self.results = pcmci.run_pcmci(tau_max=self.max_lag, pc_alpha=self.alpha)

        # 4. Extract graph. Prefer q_matrix (FDR-controlled) when present,
        #    otherwise fall back to p_matrix (depends on Tigramite
        #    version/settings).
        pval_matrix = self.results.get('q_matrix')
        if pval_matrix is None:
            pval_matrix = self.results['p_matrix']
        self.graph = pval_matrix < self.alpha
        return self

    def get_feature_weights(self) -> np.ndarray:
        """
        Calculate feature importance based on causal strength (val_matrix).

        Each feature's score is the sum of the absolute link values over all
        lags, counting both its outgoing and its incoming links. Scores are
        divided by the maximum score and floored at 0.2.

        Returns:
            1-D array with one weight per feature. If the model has not been
            fitted (or Tigramite is unavailable), an all-ones vector is
            returned whose length equals the number of columns last passed
            to ``fit``, or 5 if ``fit`` was never called.
        """
        if self.results is None or not TIGRAMITE_AVAILABLE:
            # Fallback: uniform weights sized to the known feature count.
            if self.var_names:
                n_features = len(self.var_names)
            else:
                n_features = self._DEFAULT_N_FEATURES
            return np.ones(n_features)

        # val_matrix is expected to be (N, N, Lags+1); entry [i, j, tau]
        # is the strength of the link i -> j at lag tau.
        val_matrix = np.abs(self.results['val_matrix'])

        # Collapse the lag axis: (N, N) total strength of i -> j.
        strength_matrix = np.sum(val_matrix, axis=2)

        # Outgoing strength (how much i influences others): sum over j.
        out_strength = np.sum(strength_matrix, axis=1)
        # Incoming strength (how much i is influenced): sum over sources.
        in_strength = np.sum(strength_matrix, axis=0)

        # Hybrid score: both drivers and driven variables count.
        score = out_strength + in_strength

        # No links at all: avoid dividing by zero, return uniform weights.
        if score.sum() == 0:
            return np.ones(len(score))

        weights = score / score.max()
        return np.maximum(weights, 0.2)  # Min weight
def get_causal_model():
    """Build the module's default CausalDiscovery instance (alpha=0.05, max_lag=3)."""
    default_settings = {"alpha": 0.05, "max_lag": 3}
    return CausalDiscovery(**default_settings)