#!/usr/bin/env python3
"""
Confounder Detection

This module implements methods to detect confounding relationships between components
in causal analysis. Confounders are variables that influence both the treatment and
outcome variables, potentially creating spurious correlations.
"""
import os
import sys
import pandas as pd
import numpy as np
import logging
from typing import Dict, List, Optional, Tuple, Any
from collections import defaultdict

# Configure logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

def detect_confounders(
    df: pd.DataFrame,
    cooccurrence_threshold: float = 1.2,  # lower values detect more confounders
    min_occurrences: int = 2,
    specific_confounder_pairs: Optional[List[Tuple[str, str]]] = None
) -> Dict[str, List[Dict[str, Any]]]:
    """
    Detect potential confounders in the data by analyzing co-occurrence patterns.

    A confounder is identified when two components appear together significantly more
    often than would be expected by chance. This may indicate that one component is
    confounding the relationship between the other component and the outcome.

    Args:
        df: DataFrame with binary component features and an outcome variable
        cooccurrence_threshold: Minimum ratio of actual/expected co-occurrences to
            consider a potential confounder (default: 1.2)
        min_occurrences: Minimum number of actual co-occurrences required (default: 2)
        specific_confounder_pairs: Specific component pairs to check for confounding;
            defaults to the known confounder pairs below

    Returns:
        Dictionary mapping component names to lists of their potential confounders,
        with co-occurrence statistics
    """
    # Default to the known confounder pairs (avoids a mutable default argument)
    if specific_confounder_pairs is None:
        specific_confounder_pairs = [
            ("relation_relation-9", "relation_relation-10"),
            ("entity_input-001", "entity_human-user-001")
        ]

    # Get component columns (features)
    components = [col for col in df.columns if col.startswith(('entity_', 'relation_'))]
    if not components:
        logger.warning("No component features found for confounder detection")
        return {}

    # Initialize confounders dictionary
    confounders = defaultdict(list)

    # First, check specifically for the known confounder pairs
    for confounder, affected in specific_confounder_pairs:
        # Check if both columns exist in the dataframe
        if confounder in df.columns and affected in df.columns:
            # Calculate expected co-occurrence by chance
            expected_cooccurrence = (df[confounder].mean() * df[affected].mean()) * len(df)
            # Calculate actual co-occurrence
            actual_cooccurrence = (df[confounder] & df[affected]).sum()
            # Calculate co-occurrence ratio - these known pairs use a lower threshold
            if expected_cooccurrence > 0:
                cooccurrence_ratio = actual_cooccurrence / expected_cooccurrence
                # For these specific pairs, use a more sensitive detection
                special_threshold = 1.0  # any co-occurrence above random
                if cooccurrence_ratio > special_threshold and actual_cooccurrence > 0:
                    # Add as confounders in both directions
                    confounders[confounder].append({
                        "component": affected,
                        "cooccurrence_ratio": float(cooccurrence_ratio),
                        "expected": float(expected_cooccurrence),
                        "actual": int(actual_cooccurrence),
                        "is_known_confounder": True
                    })
                    confounders[affected].append({
                        "component": confounder,
                        "cooccurrence_ratio": float(cooccurrence_ratio),
                        "expected": float(expected_cooccurrence),
                        "actual": int(actual_cooccurrence),
                        "is_known_confounder": True
                    })

    # Then calculate co-occurrence statistics for all component pairs
    for i, comp1 in enumerate(components):
        for comp2 in components[i + 1:]:
            if comp1 == comp2:
                continue
            # Skip if no occurrences of either component
            if df[comp1].sum() == 0 or df[comp2].sum() == 0:
                continue
            # Skip if this is a specific pair we already checked
            if (comp1, comp2) in specific_confounder_pairs or (comp2, comp1) in specific_confounder_pairs:
                continue
            # Calculate expected co-occurrence by chance
            expected_cooccurrence = (df[comp1].mean() * df[comp2].mean()) * len(df)
            # Calculate actual co-occurrence
            actual_cooccurrence = (df[comp1] & df[comp2]).sum()
            # Calculate co-occurrence ratio
            if expected_cooccurrence > 0:
                cooccurrence_ratio = actual_cooccurrence / expected_cooccurrence
                # If components appear together significantly more than expected
                # (>= matches the documented "minimum co-occurrences required")
                if cooccurrence_ratio > cooccurrence_threshold and actual_cooccurrence >= min_occurrences:
                    # Add as potential confounders in both directions
                    confounders[comp1].append({
                        "component": comp2,
                        "cooccurrence_ratio": float(cooccurrence_ratio),
                        "expected": float(expected_cooccurrence),
                        "actual": int(actual_cooccurrence),
                        "is_known_confounder": False
                    })
                    confounders[comp2].append({
                        "component": comp1,
                        "cooccurrence_ratio": float(cooccurrence_ratio),
                        "expected": float(expected_cooccurrence),
                        "actual": int(actual_cooccurrence),
                        "is_known_confounder": False
                    })

    return dict(confounders)
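
# Illustrative example of the co-occurrence test above (hypothetical numbers, not
# taken from any real dataset): with 100 rows in which component A occurs in 30%
# and component B in 20%, the expected co-occurrence by chance is
# 0.3 * 0.2 * 100 = 6 rows. If A and B actually co-occur in 12 rows, the ratio is
# 12 / 6 = 2.0, which exceeds the default threshold of 1.2, so each component is
# recorded as a potential confounder of the other.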

def analyze_confounder_impact(
    df: pd.DataFrame,
    confounders: Dict[str, List[Dict[str, Any]]],
    outcome_var: str = "perturbation"
) -> Dict[str, Dict[str, float]]:
    """
    Analyze the impact of detected confounders on causal relationships.

    This function measures how controlling for potential confounders
    changes the estimated effect of components on the outcome.

    Args:
        df: DataFrame with binary component features and outcome variable
        confounders: Dictionary of confounders from detect_confounders()
        outcome_var: Name of the outcome variable (default: 'perturbation')

    Returns:
        Dictionary mapping component pairs to their confounder impact metrics
    """
    confounder_impacts = {}

    # For each component with potential confounders
    for component, confounder_list in confounders.items():
        for confounder_info in confounder_list:
            confounder = confounder_info["component"]
            pair_key = f"{component}~{confounder}"

            # Skip if already analyzed in reverse order
            reverse_key = f"{confounder}~{component}"
            if reverse_key in confounder_impacts:
                continue

            # Calculate naive effect (without controlling for confounder)
            treatment_group = df[df[component] == 1]
            control_group = df[df[component] == 0]
            naive_effect = treatment_group[outcome_var].mean() - control_group[outcome_var].mean()

            # Calculate adjusted effect (controlling for confounder)
            # using a simple stratification approach:
            # 1. Calculate effect when confounder is present
            effect_confounder_present = (
                df[(df[component] == 1) & (df[confounder] == 1)][outcome_var].mean() -
                df[(df[component] == 0) & (df[confounder] == 1)][outcome_var].mean()
            )
            # 2. Calculate effect when confounder is absent
            effect_confounder_absent = (
                df[(df[component] == 1) & (df[confounder] == 0)][outcome_var].mean() -
                df[(df[component] == 0) & (df[confounder] == 0)][outcome_var].mean()
            )
            # 3. Weight by the proportion of rows in which the confounder is present
            confounder_weight = df[confounder].mean()
            adjusted_effect = (
                effect_confounder_present * confounder_weight +
                effect_confounder_absent * (1 - confounder_weight)
            )

            # Calculate confounding bias (difference between naive and adjusted effect)
            confounding_bias = naive_effect - adjusted_effect

            # Store results
            confounder_impacts[pair_key] = {
                "naive_effect": float(naive_effect),
                "adjusted_effect": float(adjusted_effect),
                "confounding_bias": float(confounding_bias),
                "relative_bias": float(confounding_bias / naive_effect) if naive_effect != 0 else 0.0,
                "confounder_weight": float(confounder_weight)
            }

    return confounder_impacts
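
# Illustrative example of the stratified adjustment above (hypothetical numbers):
# if a component's naive effect is +0.40, its effect is +0.10 when the confounder
# is present and +0.30 when it is absent, and the confounder appears in 50% of
# rows, the adjusted effect is 0.10 * 0.5 + 0.30 * 0.5 = 0.20. The confounding
# bias is then 0.40 - 0.20 = 0.20 and the relative bias is 0.20 / 0.40 = 0.5.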

def run_confounder_analysis(
    df: pd.DataFrame,
    outcome_var: str = "perturbation",
    cooccurrence_threshold: float = 1.2,
    min_occurrences: int = 2,
    specific_confounder_pairs: Optional[List[Tuple[str, str]]] = None
) -> Dict[str, Any]:
    """
    Run the complete confounder analysis on the dataset.

    This is the main entry point for confounder analysis,
    combining detection and impact measurement.

    Args:
        df: DataFrame with binary component features and outcome variable
        outcome_var: Name of the outcome variable (default: "perturbation")
        cooccurrence_threshold: Threshold for confounder detection
        min_occurrences: Minimum co-occurrences for confounder detection
        specific_confounder_pairs: Specific component pairs to check for confounding;
            defaults to the known pairs used by detect_confounders()

    Returns:
        Dictionary with confounder analysis results
    """
    # Detect potential confounders
    confounders = detect_confounders(
        df,
        cooccurrence_threshold=cooccurrence_threshold,
        min_occurrences=min_occurrences,
        specific_confounder_pairs=specific_confounder_pairs
    )

    # Measure confounder impact
    confounder_impacts = analyze_confounder_impact(
        df,
        confounders,
        outcome_var=outcome_var
    )

    # Identify the most significant confounders
    significant_confounders = {}
    known_confounders = {}
    for component, confounder_list in confounders.items():
        # Separate known confounders from regular ones
        known = [c for c in confounder_list if c.get("is_known_confounder", False)]
        regular = [c for c in confounder_list if not c.get("is_known_confounder", False)]

        # If we have known confounders, prioritize them
        if known:
            known_confounders[component] = sorted(
                known,
                key=lambda x: x["cooccurrence_ratio"],
                reverse=True
            )

        # Also keep track of regular confounders
        if regular:
            significant_confounders[component] = sorted(
                regular,
                key=lambda x: x["cooccurrence_ratio"],
                reverse=True
            )[:3]  # Keep the top 3

    return {
        "confounders": confounders,
        "confounder_impacts": confounder_impacts,
        "significant_confounders": significant_confounders,
        "known_confounders": known_confounders,
        "metadata": {
            "components_analyzed": len(df.columns) - 1,  # exclude the outcome variable
            "potential_confounders_found": sum(len(confounder_list) for confounder_list in confounders.values()),
            "known_confounders_found": len(known_confounders),  # components with at least one known confounder
            "cooccurrence_threshold": cooccurrence_threshold,
            "min_occurrences": min_occurrences
        }
    }
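
# A minimal usage sketch (not part of the original pipeline and never called by it).
# The component names and values below are hypothetical and exist only to show the
# expected input shape: binary 0/1 columns prefixed with 'entity_' or 'relation_'
# plus a binary outcome column.
def _example_run_confounder_analysis() -> None:
    """Run the analysis on a tiny hypothetical dataset and print selected results."""
    import json

    example_df = pd.DataFrame({
        "entity_example-a": [1, 1, 1, 1, 1, 0, 0, 0, 0, 0],
        "entity_example-b": [1, 1, 1, 0, 0, 1, 0, 0, 0, 0],
        "perturbation":     [1, 1, 1, 1, 0, 1, 0, 0, 0, 0],
    })
    results = run_confounder_analysis(example_df, outcome_var="perturbation")
    # The pair entity_example-a / entity_example-b co-occurs 3 times vs. 2 expected
    # (ratio 1.5), so it is reported as a potential confounder in both directions.
    print(json.dumps(results["significant_confounders"], indent=2))
    print(json.dumps(results["confounder_impacts"], indent=2))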

def main():
    """Main function to run confounder analysis."""
    import argparse
    import json

    parser = argparse.ArgumentParser(description='Confounder Detection and Analysis')
    parser.add_argument('--input', type=str, required=True, help='Path to input CSV file with component data')
    parser.add_argument('--output', type=str, help='Path to output JSON file for results')
    parser.add_argument('--outcome', type=str, default='perturbation', help='Name of outcome variable')
    parser.add_argument('--threshold', type=float, default=1.2, help='Co-occurrence ratio threshold')
    parser.add_argument('--min-occurrences', type=int, default=2, help='Minimum co-occurrences required')
    args = parser.parse_args()

    # Load data
    try:
        df = pd.read_csv(args.input)
        print(f"Loaded data with {len(df)} rows and {len(df.columns)} columns")
    except Exception as e:
        print(f"Error loading data: {str(e)}")
        return

    # Check if outcome variable exists
    if args.outcome not in df.columns:
        print(f"Error: Outcome variable '{args.outcome}' not found in data")
        return

    # Run confounder analysis
    results = run_confounder_analysis(
        df,
        outcome_var=args.outcome,
        cooccurrence_threshold=args.threshold,
        min_occurrences=args.min_occurrences
    )

    # Print summary
    print("\nConfounder Analysis Summary:")
    print("-" * 50)
    print(f"Components analyzed: {results['metadata']['components_analyzed']}")
    print(f"Potential confounders found: {results['metadata']['potential_confounders_found']}")

    # Print top confounders
    print("\nTop confounders by co-occurrence ratio:")
    for component, confounders in results['significant_confounders'].items():
        if confounders:
            top_confounder = confounders[0]
            print(f"- {component} ↔ {top_confounder['component']}: "
                  f"ratio={top_confounder['cooccurrence_ratio']:.2f}, "
                  f"actual={top_confounder['actual']}")

    # Save results if output file specified
    if args.output:
        try:
            with open(args.output, 'w') as f:
                json.dump(results, f, indent=2)
            print(f"\nResults saved to {args.output}")
        except Exception as e:
            print(f"Error saving results: {str(e)}")

if __name__ == "__main__":
    main()