# Page-scrape residue, not part of the module: "Spaces: Sleeping"
"""
Dynamic Structural Equation Modeling (DSEM) for CogNexa

Implements time-series structural equation modeling to capture dynamic relationships
between personality traits, cognitive states, task behavior, and outcomes.

This module provides:
- Lagged effect modeling (t, t-1, t-2)
- State-space representation of behavioral dynamics
- Personality-moderated effects
- Time-varying parameters
- Hypothesis testing (H1-H4)

Based on:
Hamaker et al. (2015) - A Critique of the Cross-Lagged Panel Model
Asparouhov et al. (2018) - Dynamic Structural Equation Models for Multivariate Time-Series Data

Version: 1.0.0
"""
| import logging | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple, Any | |
| from dataclasses import dataclass, asdict | |
| from enum import Enum | |
| import numpy as np | |
| import pandas as pd | |
| from scipy import stats | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.linear_model import LinearRegression | |
| from sklearn.metrics import r2_score, mean_squared_error | |
| try: | |
| import statsmodels.api as sm | |
| from statsmodels.tsa.api import VAR | |
| from statsmodels.genmod.generalized_estimating_equations import GEE | |
| from statsmodels.genmod.cov_struct import Exchangeable | |
| from statsmodels.genmod.families import Gaussian | |
| STATSMODELS_AVAILABLE = True | |
| except ImportError: | |
| STATSMODELS_AVAILABLE = False | |
| logger = logging.getLogger(__name__) | |
class PersonalityTrait(Enum):
    """Enumeration of the Big Five personality dimensions.

    Each member's value is the lowercase trait name as a string.
    """

    OPENNESS = "openness"
    CONSCIENTIOUSNESS = "conscientiousness"
    EXTRAVERSION = "extraversion"
    AGREEABLENESS = "agreeableness"
    NEUROTICISM = "neuroticism"
class CognitiveState(Enum):
    """Enumeration of the latent cognitive states named by this module.

    Each member's value is the snake_case state name as a string.
    """

    TASK_FOCUS = "task_focus"
    COGNITIVE_LOAD = "cognitive_load"
    MOTIVATION = "motivation"
    FATIGUE = "fatigue"
    FLOW_STATE = "flow_state"
@dataclass
class DSEMParameter:
    """Estimated DSEM parameter with uncertainty.

    The class must be a dataclass: it is constructed with keyword arguments
    and serialised through ``dataclasses.asdict``.

    Attributes:
        name: Identifier of the parameter.
        estimate: Point estimate of the coefficient.
        std_error: Standard error of the estimate.
        t_statistic: t-statistic of the estimate.
        p_value: p-value associated with the estimate.
        ci_lower: Lower bound of the confidence interval.
        ci_upper: Upper bound of the confidence interval.
    """

    name: str
    estimate: float
    std_error: float
    t_statistic: float
    p_value: float
    ci_lower: float
    ci_upper: float

    def is_significant(self, alpha: float = 0.05) -> bool:
        """Return True when ``p_value`` is strictly below ``alpha``.

        Args:
            alpha: Significance threshold; defaults to 0.05.

        Returns:
            A plain ``bool``. A NaN p-value compares as not significant.
        """
        return bool(self.p_value < alpha)

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a plain dictionary keyed by field name."""
        return asdict(self)
@dataclass
class DSEMHypothesis:
    """Testable research hypothesis together with its test outcome.

    The class must be a dataclass: it is constructed with keyword arguments
    and serialised through ``dataclasses.asdict``.

    Attributes:
        hypothesis_name: Short label (H1, H2, H3, H4).
        description: Human-readable statement of the hypothesis.
        prediction_parameters: Names of the parameters being tested.
        expected_direction: Maps parameter name to "positive" or "negative".
        test_statistic: Summary test statistic.
        p_value: Summary p-value.
        effect_size: Summary effect size.
        passed: Whether the hypothesis was judged supported.
    """

    hypothesis_name: str
    description: str
    prediction_parameters: List[str]
    expected_direction: Dict[str, str]
    test_statistic: float
    p_value: float
    effect_size: float
    passed: bool

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a plain dictionary keyed by field name."""
        return asdict(self)
@dataclass
class DSEMModel:
    """Fitted DSEM model with results.

    The class must be a dataclass: it is constructed with keyword arguments.
    Project types are written as forward references so the class does not
    depend on definition order.

    Attributes:
        model_name: Label of the fitted model.
        n_observations: Number of observations used.
        n_lags: Lag order of the model.
        n_traits: Number of personality traits supplied.
        n_cognitive_states: Number of endogenous variables.
        parameters: Estimated parameters keyed by parameter name.
        model_fit_indices: Fit indices keyed by name.
        residual_diagnostics: Residual summary statistics keyed by name.
        hypotheses_results: Outcome of each tested hypothesis.
        time_varying_effects: Parameter trajectories keyed by parameter name.
        predictions: Fitted values, if available.
        prediction_errors: Residuals, if available.
    """

    model_name: str
    n_observations: int
    n_lags: int
    n_traits: int
    n_cognitive_states: int
    parameters: Dict[str, "DSEMParameter"]
    model_fit_indices: Dict[str, float]
    residual_diagnostics: Dict[str, Any]
    hypotheses_results: List["DSEMHypothesis"]
    time_varying_effects: Dict[str, np.ndarray]
    predictions: Optional[np.ndarray] = None
    prediction_errors: Optional[np.ndarray] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a dictionary summary of the model.

        The array-valued fields (``time_varying_effects``, ``predictions``,
        ``prediction_errors``) are not included in the result.
        """
        return {
            "model_name": self.model_name,
            "n_observations": self.n_observations,
            "n_lags": self.n_lags,
            "n_traits": self.n_traits,
            "n_cognitive_states": self.n_cognitive_states,
            "parameters": {k: v.to_dict() for k, v in self.parameters.items()},
            "model_fit_indices": self.model_fit_indices,
            "residual_diagnostics": self.residual_diagnostics,
            "hypotheses_results": [h.to_dict() for h in self.hypotheses_results],
        }
class DSEMEstimator:
    """Estimates Dynamic Structural Equation Models.

    The pipeline standardises a user's behavioral time series, fits a
    statsmodels VAR on it, extracts lag-1/lag-2 cross-lagged coefficients,
    evaluates hypotheses H1-H4 and stores the resulting ``DSEMModel`` per user.
    """

    def __init__(self):
        """Initialize DSEM estimator.

        Raises:
            ImportError: If statsmodels could not be imported.
        """
        if not STATSMODELS_AVAILABLE:
            raise ImportError("statsmodels required. Install with: pip install statsmodels")
        self.logger = logging.getLogger(__name__)
        # The scaler is refit on every call to prepare_longitudinal_data.
        self.scaler = StandardScaler()
        self.fitted_models: Dict[str, Any] = {}

    def prepare_longitudinal_data(
        self,
        user_behavioral_data: pd.DataFrame,
        personality_traits: Dict[str, float],
        n_lags: int = 3
    ) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """
        Prepare data for DSEM estimation.

        Standardizes every numeric column, appends ``<col>_lag<k>`` columns for
        k = 1..n_lags, drops rows containing NaN, then appends
        ``<col>_x_<trait>`` columns (standardized column times trait value).

        Args:
            user_behavioral_data: Time-series of behavioral measurements; only
                numeric columns are used.
            personality_traits: Mapping of trait name to trait score.
            n_lags: Number of lagged copies to create (must be >= 1).

        Returns:
            prepared_data: Standardized data with lag and interaction columns.
            model_spec: Model specification dictionary.

        Raises:
            ValueError: If ``n_lags`` < 1, if there are fewer than
                ``n_lags + 10`` rows, or if no numeric column is present.
        """
        if n_lags < 1:
            raise ValueError("n_lags must be at least 1")
        if len(user_behavioral_data) < (n_lags + 10):
            raise ValueError(f"Need at least {n_lags + 10} observations")

        behavioral_cols = user_behavioral_data.select_dtypes(include=[np.number]).columns
        if len(behavioral_cols) == 0:
            raise ValueError("No numeric behavioral columns found")

        # Standardize behavioral data. The result gets a fresh RangeIndex.
        X_standardized = self.scaler.fit_transform(user_behavioral_data[behavioral_cols])
        data_std = pd.DataFrame(X_standardized, columns=behavioral_cols)

        # Build all lag columns in one concat rather than inserting them one
        # by one, which fragments the frame when there are many columns.
        frames = [data_std]
        for lag in range(1, n_lags + 1):
            frames.append(data_std.shift(lag).add_suffix(f"_lag{lag}"))
        # dropna removes the first n_lags rows and any row with missing input.
        lagged_data = pd.concat(frames, axis=1).dropna()

        # Personality moderation terms: standardized column times trait score.
        base = lagged_data[behavioral_cols]
        interaction_frames = [
            (base * trait_value).add_suffix(f"_x_{trait_name}")
            for trait_name, trait_value in personality_traits.items()
        ]
        if interaction_frames:
            lagged_data = pd.concat([lagged_data] + interaction_frames, axis=1)

        model_spec = {
            "endogenous_variables": list(behavioral_cols),
            "exogenous_variables": list(personality_traits.keys()),
            "interaction_terms": [
                f"{col}_x_{trait}"
                for col in behavioral_cols
                for trait in personality_traits.keys()
            ],
            "n_lags": n_lags,
            "personality_traits": personality_traits,
            "n_observations": len(lagged_data),
        }
        self.logger.info(
            "Prepared DSEM data: %d obs, %d vars",
            len(lagged_data),
            len(lagged_data.columns),
        )
        return lagged_data, model_spec

    def estimate_var_model(
        self,
        data: pd.DataFrame,
        n_lags: int = 3
    ) -> Tuple[Any, Dict[str, Any]]:
        """
        Estimate a Vector Autoregression (VAR) model as basis for DSEM.

        Every numeric column of ``data`` becomes an endogenous VAR variable,
        and VAR creates its own lags. Callers should therefore pass only the
        base series, not pre-lagged copies of them.

        Args:
            data: Longitudinal data; non-numeric columns are ignored.
            n_lags: Lag order passed to ``VAR.fit``.

        Returns:
            var_results: Fitted VAR results object.
            diagnostics: Dict with aic, bic, det_sigma_u, log_likelihood,
                n_obs and lags_used.
        """
        numeric_cols = data.select_dtypes(include=[np.number]).columns
        var_data = data[numeric_cols]

        var_model = VAR(var_data)
        var_results = var_model.fit(n_lags)

        diagnostics = {
            "aic": var_results.aic,
            "bic": var_results.bic,
            "det_sigma_u": var_results.det_sigma_u,
            "log_likelihood": var_results.llf,
            "n_obs": var_results.nobs,
            "lags_used": n_lags,
        }
        self.logger.info(
            "VAR Model AIC: %.2f, BIC: %.2f", diagnostics["aic"], diagnostics["bic"]
        )
        return var_results, diagnostics

    def extract_cross_lagged_effects(
        self,
        var_results: Any,
        variable_names: List[str],
        personality_traits: Dict[str, float]
    ) -> Dict[str, DSEMParameter]:
        """
        Extract lag-1 and lag-2 cross-lagged effects from a fitted VAR model.

        For a VAR fitted on a DataFrame, statsmodels exposes ``params``,
        ``pvalues``, ``tvalues`` and ``bse`` as DataFrames. Rows are labelled
        ``L<lag>.<variable>`` and columns are the dependent variables. Each
        effect is looked up by that (row, column) pair and stored under the
        key ``<dep>_from_<indep>_lag<lag>``.

        Args:
            var_results: Fitted VAR results (fitted on a DataFrame).
            variable_names: Names of the endogenous variables to report.
            personality_traits: Unused; kept for interface compatibility.

        Returns:
            Dictionary of DSEMParameter objects keyed by effect name. Effects
            whose row or column is absent from the results are skipped.
        """
        effects: Dict[str, DSEMParameter] = {}
        params = var_results.params
        pvalues = var_results.pvalues
        tvalues = var_results.tvalues
        bse = var_results.bse

        for dep_var in variable_names:
            if dep_var not in params.columns:
                continue
            for indep_var in variable_names:
                for lag in (1, 2):
                    row_label = f"L{lag}.{indep_var}"
                    # Absent when the model has fewer lags than requested.
                    if row_label not in params.index:
                        continue
                    param_name = f"{dep_var}_from_{indep_var}_lag{lag}"
                    estimate = float(params.loc[row_label, dep_var])
                    stderror = float(bse.loc[row_label, dep_var])
                    half_width = 1.96 * stderror  # normal-approximation 95% CI
                    effects[param_name] = DSEMParameter(
                        name=param_name,
                        estimate=estimate,
                        std_error=stderror,
                        t_statistic=float(tvalues.loc[row_label, dep_var]),
                        p_value=float(pvalues.loc[row_label, dep_var]),
                        ci_lower=estimate - half_width,
                        ci_upper=estimate + half_width,
                    )

        self.logger.info("Extracted %d cross-lagged effects", len(effects))
        return effects

    @staticmethod
    def _effects_hypothesis(
        name: str,
        description: str,
        params: List[DSEMParameter],
        direction: str,
        effect_size: float,
    ) -> DSEMHypothesis:
        """Build a hypothesis result from a list of selected parameters."""
        return DSEMHypothesis(
            hypothesis_name=name,
            description=description,
            prediction_parameters=[p.name for p in params],
            expected_direction={p.name: direction for p in params},
            # Averages are descriptive summaries, not a formal pooled test.
            test_statistic=float(np.mean([p.t_statistic for p in params])) if params else 0,
            p_value=float(np.mean([p.p_value for p in params])) if params else 1.0,
            effect_size=effect_size,
            # is_significant is a method and must be called: the bare bound
            # method is always truthy.
            passed=any(p.is_significant() for p in params),
        )

    def test_hypotheses(
        self,
        cross_lagged_effects: Dict[str, DSEMParameter],
        behavioral_data: pd.DataFrame,
        personality_traits: Dict[str, float]
    ) -> List[DSEMHypothesis]:
        """
        Test research hypotheses H1-H4.

        As implemented:
            H1: any significant effect whose name contains "conscientiousness".
            H2: any significant effect whose name contains "neuroticism".
            H3: any significant effect whose name contains both
                "conscientiousness" and "completion".
            H4: the linear trend of ``procrastination_score`` is below -0.01.

        NOTE(review): effect names are built from behavioral column names, so
        H1-H3 only find parameters when a column name contains the trait
        name. ``expected_direction`` is recorded but not used in the pass
        decision. Confirm both against the study design.

        Args:
            cross_lagged_effects: Extracted effects from DSEM.
            behavioral_data: Time-series behavioral data.
            personality_traits: User's personality scores.

        Returns:
            List of four DSEMHypothesis objects, in order H1..H4.
        """
        def select(*needles: str) -> List[DSEMParameter]:
            return [
                p for param_name, p in cross_lagged_effects.items()
                if all(needle in param_name.lower() for needle in needles)
            ]

        hypotheses: List[DSEMHypothesis] = []

        h1_params = select("conscientiousness")
        hypotheses.append(self._effects_hypothesis(
            "H1",
            "Personality-augmented models improve task-outcome prediction accuracy",
            h1_params,
            "positive",
            float(np.mean([abs(p.estimate) for p in h1_params])) if h1_params else 0,
        ))

        hypotheses.append(self._effects_hypothesis(
            "H2",
            "Personality-tailored interventions show higher compliance patterns",
            select("neuroticism"),
            "negative",
            abs(personality_traits.get("neuroticism", 0.5)),
        ))

        hypotheses.append(self._effects_hypothesis(
            "H3",
            "High conscientiousness predicts task adherence over time",
            select("conscientiousness", "completion"),
            "positive",
            personality_traits.get("conscientiousness", 0.5),
        ))

        # H4: slope of procrastination_score over the observation index. The
        # p-value is the regression's two-sided slope p-value, not a placeholder.
        trend = 0.0
        h4_p_value = 1.0
        if "procrastination_score" in behavioral_data.columns:
            scores = behavioral_data["procrastination_score"].to_numpy(dtype=float)
            # Need at least three points for a meaningful slope test.
            if len(scores) >= 3:
                fit = stats.linregress(np.arange(len(scores)), scores)
                trend = float(fit.slope)
                h4_p_value = float(fit.pvalue)
        # Pass rule is the slope threshold only; a NaN slope compares as False.
        h4_passed = bool(trend < -0.01)
        hypotheses.append(DSEMHypothesis(
            hypothesis_name="H4",
            description="Closed-loop adaptation reduces procrastination and cognitive overload",
            prediction_parameters=["procrastination_trend"],
            expected_direction={"procrastination_trend": "negative"},
            test_statistic=abs(trend),
            p_value=h4_p_value,
            effect_size=abs(trend),
            passed=h4_passed
        ))

        self.logger.info(
            "Tested 4 hypotheses. Passed: %d/4", sum(1 for h in hypotheses if h.passed)
        )
        return hypotheses

    def fit_dsem(
        self,
        user_id: str,
        behavioral_data: pd.DataFrame,
        personality_traits: Dict[str, float],
        n_lags: int = 3
    ) -> DSEMModel:
        """
        Complete DSEM pipeline: prepare data, fit model, extract effects, test hypotheses.

        Args:
            user_id: Unique user identifier; also the key under which the
                fitted model is stored.
            behavioral_data: Time-series behavioral measurements.
            personality_traits: Mapping of trait name to trait score.
            n_lags: Lag order for both data preparation and the VAR fit.

        Returns:
            The fitted DSEMModel.

        Raises:
            ValueError: Propagated from ``prepare_longitudinal_data``.
        """
        self.logger.info("Fitting DSEM for user %s", user_id)

        prepared_data, model_spec = self.prepare_longitudinal_data(
            behavioral_data,
            personality_traits,
            n_lags
        )
        endogenous_vars = model_spec["endogenous_variables"]

        # Fit the VAR on the base series only. VAR builds its own lags, so
        # the pre-lagged and constant-scaled interaction columns would be
        # exactly collinear with its regressors and make the system singular.
        var_results, var_diagnostics = self.estimate_var_model(
            prepared_data[endogenous_vars], n_lags
        )

        cross_lagged_effects = self.extract_cross_lagged_effects(
            var_results,
            endogenous_vars,
            personality_traits
        )

        hypotheses = self.test_hypotheses(
            cross_lagged_effects,
            behavioral_data,
            personality_traits
        )

        predictions = var_results.fittedvalues.values
        residuals = var_results.resid.values if hasattr(var_results, 'resid') else None

        residual_diagnostics = {}
        if residuals is not None:
            # Durbin-Watson approximated as 2 * (1 - r1), with r1 the lag-1
            # correlation pooled over all residual columns.
            dw = 2 - 2 * np.corrcoef(residuals[:-1].flatten(), residuals[1:].flatten())[0, 1]
            residual_diagnostics["durbin_watson"] = float(dw)
            # Location/scale summary only; no formal normality test is run.
            residual_diagnostics["mean"] = float(np.mean(residuals))
            residual_diagnostics["std"] = float(np.std(residuals))

        model = DSEMModel(
            model_name=f"DSEM_user_{user_id}",
            n_observations=len(prepared_data),
            n_lags=n_lags,
            n_traits=len(personality_traits),
            n_cognitive_states=len(endogenous_vars),
            parameters=cross_lagged_effects,
            model_fit_indices={
                "aic": var_diagnostics["aic"],
                "bic": var_diagnostics["bic"],
                "det_sigma_u": var_diagnostics["det_sigma_u"],
                "log_likelihood": var_diagnostics["log_likelihood"],
            },
            residual_diagnostics=residual_diagnostics,
            hypotheses_results=hypotheses,
            time_varying_effects={},
            predictions=predictions,
            prediction_errors=residuals
        )
        self.fitted_models[user_id] = model
        self.logger.info(
            "DSEM fitted successfully. Hypotheses passed: %d/4",
            sum(1 for h in hypotheses if h.passed),
        )
        return model

    def get_user_model(self, user_id: str) -> Optional[DSEMModel]:
        """Return the stored model for ``user_id``, or None if none was fitted."""
        return self.fitted_models.get(user_id)
# Module-level cache for the shared estimator; filled on first access.
_dsem_estimator = None


def get_dsem_estimator() -> DSEMEstimator:
    """Return the shared DSEMEstimator, creating it on the first call."""
    global _dsem_estimator
    if _dsem_estimator is not None:
        return _dsem_estimator
    _dsem_estimator = DSEMEstimator()
    return _dsem_estimator