Spaces:
Running
Running
| # Propensity Score Matching Implementation | |
| import pandas as pd | |
| import numpy as np | |
| from sklearn.neighbors import NearestNeighbors | |
| import statsmodels.api as sm # For bias adjustment regression | |
| import logging # For logging fallback | |
| from typing import Dict, List, Optional, Any | |
| # Import DoWhy | |
| from dowhy import CausalModel | |
| from .base import estimate_propensity_scores, format_ps_results, select_propensity_model | |
| from .diagnostics import assess_balance #, plot_overlap, plot_balance # Import diagnostic functions | |
| # Remove determine_optimal_caliper, it will be replaced by a heuristic | |
| from .llm_assist import get_llm_parameters # Import LLM helpers | |
| logger = logging.getLogger(__name__) | |
| def _calculate_logit(pscore): | |
| """Calculate logit of propensity score, clipping to avoid inf.""" | |
| # Clip pscore to prevent log(0) or log(1) issues which lead to inf | |
| epsilon = 1e-7 | |
| pscore_clipped = np.clip(pscore, epsilon, 1 - epsilon) | |
| return np.log(pscore_clipped / (1 - pscore_clipped)) | |
def _perform_matching_and_get_att(
    df_sample: pd.DataFrame,
    treatment: str,
    outcome: str,
    covariates: List[str],
    propensity_model_type: str,
    n_neighbors: int,
    caliper: Optional[float],
    perform_bias_adjustment: bool,
    **kwargs
) -> float:
    """
    Helper to perform custom KNN propensity-score matching and compute the ATT.

    Steps: estimate propensity scores on `df_sample`, fit a nearest-neighbors
    model on the control scores, match each treated unit to up to `n_neighbors`
    controls within `caliper` (on the raw propensity-score scale), and average
    the treated-minus-matched-control outcome differences. When
    `perform_bias_adjustment` is True, an OLS regression of those differences
    on the treated-vs-matched propensity-score gap is fit and its intercept is
    returned as the (bias-adjusted) ATT.

    Returns the ATT estimate, or np.nan whenever any stage cannot proceed
    (failed PS estimation, empty groups, NaN scores, no matches). Callers
    rely on the NaN sentinel rather than exceptions.
    """
    df_ps = df_sample.copy()
    try:
        propensity_scores = estimate_propensity_scores(
            df_ps, treatment, covariates, model_type=propensity_model_type, **kwargs
        )
    except Exception as e:
        logger.warning(f"Propensity score estimation failed in helper: {e}")
        return np.nan  # Cannot proceed without propensity scores
    df_ps['propensity_score'] = propensity_scores
    treated = df_ps[df_ps[treatment] == 1]
    control = df_ps[df_ps[treatment] == 0]
    if treated.empty or control.empty:
        # Nothing to match against — one of the two arms is missing.
        return np.nan
    # NOTE: caliper enforcement happens manually below via the distance mask;
    # kneighbors() ignores `radius`, so it is only a defensive default here.
    nn = NearestNeighbors(n_neighbors=n_neighbors, radius=caliper if caliper is not None else np.inf, metric='minkowski', p=2)
    try:
        # Ensure control PS are valid before fitting
        control_ps_values = control[['propensity_score']].values
        if np.isnan(control_ps_values).any():
            logger.warning("NaN values found in control propensity scores before NN fitting.")
            return np.nan
        nn.fit(control_ps_values)
        # Ensure treated PS are valid before querying
        treated_ps_values = treated[['propensity_score']].values
        if np.isnan(treated_ps_values).any():
            logger.warning("NaN values found in treated propensity scores before NN query.")
            return np.nan
        distances, indices = nn.kneighbors(treated_ps_values)
    except ValueError as e:
        # Handles case where control group might be too small (n_neighbors >
        # available controls) or have NaN PS scores.
        logger.warning(f"NearestNeighbors fitting/query failed: {e}")
        return np.nan
    matched_outcomes_treated = []
    matched_outcomes_control_means = []
    propensity_diffs = []
    for i in range(len(treated)):
        treated_unit = treated.iloc[[i]]
        # Keep only neighbors inside the caliper on the propensity-score axis.
        valid_neighbors_mask = distances[i] <= (caliper if caliper is not None else np.inf)
        valid_neighbors_idx = indices[i][valid_neighbors_mask]
        if len(valid_neighbors_idx) > 0:
            # kneighbors indices are positional into the fitted control array,
            # so iloc (not loc) is the correct lookup here.
            matched_controls_for_this_treated = control.iloc[valid_neighbors_idx]
            if matched_controls_for_this_treated.empty:
                continue  # Should not happen with valid_neighbors_idx check, but safety
            matched_outcomes_treated.append(treated_unit[outcome].values[0])
            matched_outcomes_control_means.append(matched_controls_for_this_treated[outcome].mean())
            if perform_bias_adjustment:
                # Ensure PS scores are valid before calculating difference
                treated_ps = treated_unit['propensity_score'].values[0]
                control_ps_mean = matched_controls_for_this_treated['propensity_score'].mean()
                if np.isnan(treated_ps) or np.isnan(control_ps_mean):
                    logger.warning("NaN propensity score encountered during bias adjustment calculation.")
                    # Skip this unit's propensity diff only; its outcome pair was
                    # already appended above, so the lists desynchronize — the
                    # length check before the OLS below catches that and falls
                    # back to the raw ATT.
                    continue
                propensity_diff = treated_ps - control_ps_mean
                propensity_diffs.append(propensity_diff)
    if not matched_outcomes_treated:
        # No treated unit found any within-caliper match.
        return np.nan
    # Per-treated-unit outcome contrasts; their mean is the raw (unadjusted) ATT.
    raw_att_components = np.array(matched_outcomes_treated) - np.array(matched_outcomes_control_means)
    if perform_bias_adjustment:
        # Ensure lengths match *after* potential skips due to NaNs
        if not propensity_diffs or len(raw_att_components) != len(propensity_diffs):
            logger.warning("Bias adjustment skipped due to inconsistent data lengths after NaN checks.")
            return np.mean(raw_att_components)
        try:
            # Regress outcome contrasts on the PS gap; the intercept is the
            # contrast extrapolated to a zero propensity-score difference,
            # i.e. the bias-adjusted ATT.
            X_bias_adj = sm.add_constant(np.array(propensity_diffs))
            y_bias_adj = raw_att_components
            # Add check for NaNs/Infs in inputs to OLS
            if np.isnan(X_bias_adj).any() or np.isnan(y_bias_adj).any() or \
               np.isinf(X_bias_adj).any() or np.isinf(y_bias_adj).any():
                logger.warning("NaN/Inf values detected in OLS inputs for bias adjustment. Falling back.")
                return np.mean(raw_att_components)
            bias_model = sm.OLS(y_bias_adj, X_bias_adj).fit()
            bias_adjusted_att = bias_model.params[0]  # intercept term (the constant column)
            return bias_adjusted_att
        except Exception as e:
            logger.warning(f"OLS for bias adjustment failed: {e}. Falling back to raw ATT.")
            return np.mean(raw_att_components)
    else:
        return np.mean(raw_att_components)
def estimate_effect(df: pd.DataFrame, treatment: str, outcome: str,
                    covariates: List[str], **kwargs) -> Dict[str, Any]:
    '''Estimate the ATT using Propensity Score Matching.

    Strategy:
      1. Derive parameters (caliper, n_neighbors, PS model) from kwargs,
         falling back to LLM suggestions, then to heuristics.
      2. Try DoWhy's backdoor.propensity_score_matching for the point estimate;
         if it raises, fall back to the custom KNN matcher
         (_perform_matching_and_get_att) with bias adjustment.
      3. Compute a bootstrap SE using the custom matcher (bias adjustment
         disabled for speed) regardless of which method produced the point
         estimate.
      4. Re-run the matching on the original data to build balance diagnostics.

    Recognized kwargs: query, n_bootstraps (default 100), caliper, n_neighbors,
    propensity_model_type; remaining kwargs are forwarded to
    estimate_propensity_scores.

    Returns the dict produced by format_ps_results (estimate, SE, diagnostics,
    method details, parameters). Raises ValueError only if both DoWhy and the
    custom fallback fail to produce an estimate.
    '''
    query = kwargs.get('query')
    n_bootstraps = kwargs.get('n_bootstraps', 100)
    # --- Parameter Setup ---
    llm_params = get_llm_parameters(df, query, "PS.Matching")
    llm_suggested_params = llm_params.get("parameters", {})
    caliper = kwargs.get('caliper', llm_suggested_params.get('caliper'))
    temp_propensity_scores_for_caliper = None
    try:
        # Estimate propensity scores once up front: used for the caliper
        # heuristic here and reused for diagnostics later.
        temp_propensity_scores_for_caliper = estimate_propensity_scores(
            df, treatment, covariates,
            model_type=llm_suggested_params.get('propensity_model_type', 'logistic'),
            **kwargs
        )
        if caliper is None and temp_propensity_scores_for_caliper is not None:
            # Standard heuristic: caliper = 0.2 * SD of the logit of the PS
            # (Austin 2011). Note the caliper is later applied on the raw PS
            # scale, not the logit scale.
            logit_ps = _calculate_logit(temp_propensity_scores_for_caliper)
            if not np.isnan(logit_ps).all():  # Check if logit calculation was successful
                caliper = 0.2 * np.nanstd(logit_ps)  # Use nanstd for robustness
            else:
                logger.warning("Logit of propensity scores resulted in NaNs, cannot calculate heuristic caliper.")
                caliper = None
        elif caliper is None:
            logger.warning("Could not estimate propensity scores for caliper heuristic.")
            caliper = None
    except Exception as e:
        logger.warning(f"Failed to estimate initial propensity scores for caliper heuristic: {e}. Caliper set to None.")
        caliper = None  # Proceed without caliper if heuristic fails
    n_neighbors = kwargs.get('n_neighbors', llm_suggested_params.get('n_neighbors', 1))
    # Precedence: explicit kwarg > LLM suggestion > data-driven selection.
    propensity_model_type = kwargs.get('propensity_model_type',
                                       llm_suggested_params.get('propensity_model_type',
                                                                select_propensity_model(df, treatment, covariates, query)))
    # --- Attempt DoWhy PSM for Point Estimate ---
    att_estimate = np.nan
    method_used_for_att = "Fallback Custom PSM"
    dowhy_model = None
    identified_estimand = None
    try:
        logger.info("Attempting estimation using DoWhy Propensity Score Matching...")
        dowhy_model = CausalModel(
            data=df,
            treatment=treatment,
            outcome=outcome,
            common_causes=covariates,  # list of covariate column names
            estimand_type='nonparametric-ate'
        )
        # Identify estimand (optional step, but good practice)
        identified_estimand = dowhy_model.identify_effect(proceed_when_unidentifiable=True)
        logger.info(f"DoWhy identified estimand: {identified_estimand}")
        # Estimate effect using DoWhy's PSM
        estimate = dowhy_model.estimate_effect(
            identified_estimand,
            method_name="backdoor.propensity_score_matching",
            target_units="att",
            method_params={}
        )
        att_estimate = estimate.value
        method_used_for_att = "DoWhy PSM"
        logger.info(f"DoWhy PSM successful. ATT Estimate: {att_estimate}")
    except Exception as e:
        logger.warning(f"DoWhy PSM failed: {e}. Falling back to custom PSM implementation.")
        # Fallback is triggered implicitly if att_estimate remains NaN
    # --- Fallback or if DoWhy failed ---
    if np.isnan(att_estimate):
        logger.info("Calculating ATT estimate using fallback custom PSM...")
        att_estimate = _perform_matching_and_get_att(
            df, treatment, outcome, covariates,
            propensity_model_type, n_neighbors, caliper,
            perform_bias_adjustment=True, **kwargs  # Bias adjust the fallback
        )
        method_used_for_att = "Fallback Custom PSM"  # Confirm it's fallback
        if np.isnan(att_estimate):
            raise ValueError("Fallback custom PSM estimation also failed. Cannot proceed.")
        logger.info(f"Fallback Custom PSM successful. ATT Estimate: {att_estimate}")
    # --- Bootstrap SE (using custom helper for consistency) ---
    logger.info(f"Calculating Bootstrap SE using custom helper ({n_bootstraps} iterations)...")
    bootstrap_atts = []
    for i in range(n_bootstraps):
        try:
            # Resample rows with replacement; +i decorrelates seeds across iterations.
            df_boot = df.sample(n=len(df), replace=True, random_state=np.random.randint(1000000) + i)
            # Bias adjustment in bootstrap can be slow, so it is disabled here.
            boot_att = _perform_matching_and_get_att(
                df_boot, treatment, outcome, covariates,
                propensity_model_type, n_neighbors, caliper,
                perform_bias_adjustment=False, **kwargs
            )
            if not np.isnan(boot_att):
                bootstrap_atts.append(boot_att)
        except Exception as boot_e:
            logger.warning(f"Bootstrap iteration {i+1} failed: {boot_e}")
            continue  # Skip failed bootstrap iteration
    att_se = np.nanstd(bootstrap_atts) if bootstrap_atts else np.nan  # Use nanstd
    actual_bootstrap_iterations = len(bootstrap_atts)
    logger.info(f"Bootstrap SE calculated: {att_se} from {actual_bootstrap_iterations} successful iterations.")
    # --- Diagnostics (using custom matching logic for consistency) ---
    logger.info("Performing diagnostic checks using custom matching logic...")
    diagnostics = {"error": "Diagnostics failed to run."}
    propensity_scores_orig = temp_propensity_scores_for_caliper  # Reuse if available and not None
    if propensity_scores_orig is None:
        try:
            propensity_scores_orig = estimate_propensity_scores(
                df, treatment, covariates, model_type=propensity_model_type, **kwargs
            )
        except Exception as e:
            logger.error(f"Failed to estimate propensity scores for diagnostics: {e}")
            propensity_scores_orig = None
    if propensity_scores_orig is not None and not np.isnan(propensity_scores_orig).all():
        df_ps_orig = df.copy()
        df_ps_orig['propensity_score'] = propensity_scores_orig
        treated_orig = df_ps_orig[df_ps_orig[treatment] == 1]
        control_orig = df_ps_orig[df_ps_orig[treatment] == 0]
        unmatched_treated_count = 0
        # Drop rows with NaN propensity scores before diagnostics
        treated_orig = treated_orig.dropna(subset=['propensity_score'])
        control_orig = control_orig.dropna(subset=['propensity_score'])
        if not treated_orig.empty and not control_orig.empty:
            try:
                # Re-run the same KNN matching as the estimator so the
                # balance assessment reflects the matched sample actually used.
                nn_diag = NearestNeighbors(n_neighbors=n_neighbors, radius=caliper if caliper is not None else np.inf, metric='minkowski', p=2)
                nn_diag.fit(control_orig[['propensity_score']].values)
                distances_diag, indices_diag = nn_diag.kneighbors(treated_orig[['propensity_score']].values)
                matched_treated_indices_diag = []
                matched_control_indices_diag = []
                for i in range(len(treated_orig)):
                    valid_neighbors_mask_diag = distances_diag[i] <= (caliper if caliper is not None else np.inf)
                    valid_neighbors_idx_diag = indices_diag[i][valid_neighbors_mask_diag]
                    if len(valid_neighbors_idx_diag) > 0:
                        # Get original DataFrame indices from control_orig based on iloc indices
                        selected_control_original_indices = control_orig.index[valid_neighbors_idx_diag]
                        matched_treated_indices_diag.extend([treated_orig.index[i]] * len(selected_control_original_indices))
                        matched_control_indices_diag.extend(selected_control_original_indices)
                    else:
                        unmatched_treated_count += 1
                if matched_control_indices_diag:
                    # Use unique indices for creating the diagnostic dataframe
                    unique_matched_control_indices = list(set(matched_control_indices_diag))
                    unique_matched_treated_indices = list(set(matched_treated_indices_diag))
                    matched_control_df_diag = df.loc[unique_matched_control_indices]
                    matched_treated_df_for_diag = df.loc[unique_matched_treated_indices]
                    matched_df_diag = pd.concat([matched_treated_df_for_diag, matched_control_df_diag]).drop_duplicates()
                    # Retrieve propensity scores for the specific units in matched_df_diag.
                    # NOTE(review): .loc here assumes estimate_propensity_scores returns a
                    # pandas Series indexed like df (a plain ndarray would raise and be
                    # caught by the except below) — confirm against base.py.
                    ps_matched_for_diag = propensity_scores_orig.loc[matched_df_diag.index]
                    diagnostics = assess_balance(df, matched_df_diag, treatment, covariates,
                                                 method="PSM",
                                                 propensity_scores_original=propensity_scores_orig,
                                                 propensity_scores_matched=ps_matched_for_diag)
                else:
                    diagnostics = {"message": "No units could be matched for diagnostic assessment."}
                    # If no controls were matched, all treated were unmatched
                    unmatched_treated_count = len(treated_orig) if not treated_orig.empty else 0
            except Exception as diag_e:
                logger.error(f"Error during diagnostic matching/balance assessment: {diag_e}")
                diagnostics = {"error": f"Diagnostics failed: {diag_e}"}
        else:
            diagnostics = {"message": "Treatment or control group empty after dropping NaN PS, diagnostics skipped."}
            unmatched_treated_count = len(treated_orig) if not treated_orig.empty else 0
        # Ensure unmatched count calculation is safe (defensive; it is assigned
        # above before any branch that could skip it)
        if 'unmatched_treated_count' not in locals():
            unmatched_treated_count = 0  # Initialize if loop didn't run
        diagnostics["unmatched_treated_count"] = unmatched_treated_count
        diagnostics["percent_treated_matched"] = (len(treated_orig) - unmatched_treated_count) / len(treated_orig) * 100 if len(treated_orig) > 0 else 0
    else:
        diagnostics = {"error": "Propensity scores could not be estimated for diagnostics."}
    # Add final details to diagnostics
    diagnostics["att_estimation_method"] = method_used_for_att
    diagnostics["propensity_score_model"] = propensity_model_type
    diagnostics["bootstrap_iterations_for_se"] = actual_bootstrap_iterations
    diagnostics["final_caliper_used"] = caliper
    # --- Format and return results ---
    logger.info(f"Formatting results. ATT Estimate: {att_estimate}, SE: {att_se}, Method: {method_used_for_att}")
    return format_ps_results(att_estimate, att_se, diagnostics,
                             method_details=f"PSM ({method_used_for_att})",
                             parameters={"caliper": caliper,
                                         "n_neighbors": n_neighbors,  # n_neighbors used in fallback/bootstrap/diag
                                         "propensity_model": propensity_model_type,
                                         "n_bootstraps_config": n_bootstraps})