""" Privacy-Preserving Machine Learning Solution ============================================= Implements differential privacy and data encryption for healthcare data classification. Designed for Hugging Face Spaces deployment (CPU-only, free tier compatible). Author: Data Science Assignment """ import pandas as pd import numpy as np import hashlib import base64 import warnings from datetime import datetime from typing import Tuple, Dict, Any # Core ML libraries from sklearn.model_selection import train_test_split from sklearn.preprocessing import LabelEncoder, StandardScaler from sklearn.ensemble import RandomForestClassifier from sklearn.linear_model import LogisticRegression from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report # Differential Privacy library - IBM's diffprivlib # Lightweight, sklearn-compatible, works on CPU try: from diffprivlib.models import LogisticRegression as DPLogisticRegression from diffprivlib.models import GaussianNB as DPGaussianNB DIFFPRIVLIB_AVAILABLE = True except ImportError: DIFFPRIVLIB_AVAILABLE = False print("Warning: diffprivlib not installed. Install with: pip install diffprivlib") warnings.filterwarnings('ignore') # ============================================================================ # SECTION 1: DATA ENCRYPTION UTILITIES # ============================================================================ class DataPrivacyProcessor: """ Handles multiple privacy-preserving transformations: 1. Hashing (SHA-256) for direct identifiers like SSN 2. K-anonymity style generalization for quasi-identifiers 3. Data masking for names 4. Noise addition (Laplace mechanism) for numerical values """ def __init__(self, epsilon: float = 1.0): """ Args: epsilon: Privacy budget for differential privacy. Lower = more privacy, less utility. Typical range: 0.1 (high privacy) to 10 (low privacy) """ self.epsilon = epsilon self.salt = "privacy_salt_2024" # Salt for hashing def hash_identifier(self, value: str) -> str: """ One-way hash for direct identifiers (SSN, etc.). Uses SHA-256 with salt to prevent rainbow table attacks. """ if pd.isna(value): return "HASH_NULL" salted = f"{self.salt}{value}" return hashlib.sha256(salted.encode()).hexdigest()[:16] def mask_name(self, name: str) -> str: """ Pseudonymizes names while keeping format for utility. Example: 'John Smith' -> 'P_A1B2C3' """ if pd.isna(name): return "P_NULL" # Create deterministic pseudonym from hash hash_val = hashlib.md5(f"{self.salt}{name}".encode()).hexdigest()[:6] return f"P_{hash_val.upper()}" def generalize_age(self, dob_str: str) -> str: """ K-anonymity: Generalizes exact DOB to age ranges. Reduces re-identification risk while preserving analytical value. """ if pd.isna(dob_str): return "Unknown" try: # Handle multiple date formats for fmt in ['%m/%d/%Y', '%Y-%m-%d', '%d/%m/%Y']: try: dob = datetime.strptime(str(dob_str), fmt) break except ValueError: continue else: return "Unknown" age = (datetime.now() - dob).days // 365 # Create age buckets (5-year ranges for k-anonymity) if age < 25: return "18-24" elif age < 35: return "25-34" elif age < 45: return "35-44" elif age < 55: return "45-54" elif age < 65: return "55-64" else: return "65+" except Exception: return "Unknown" def generalize_income(self, income: float) -> str: """ K-anonymity: Buckets income into ranges. Prevents exact salary identification. """ if pd.isna(income): return "Unknown" try: income = float(income) if income < 30000: return "Low (<30K)" elif income < 50000: return "Medium-Low (30-50K)" elif income < 75000: return "Medium (50-75K)" elif income < 100000: return "Medium-High (75-100K)" else: return "High (100K+)" except (ValueError, TypeError): return "Unknown" def add_laplace_noise(self, value: float, sensitivity: float = 1.0) -> float: """ Differential Privacy: Adds calibrated Laplace noise. Provides plausible deniability for individual records. Args: value: Original numeric value sensitivity: How much one person can affect the output """ if pd.isna(value): return value scale = sensitivity / self.epsilon noise = np.random.laplace(0, scale) return value + noise def encrypt_dataset(self, df: pd.DataFrame) -> pd.DataFrame: """ Applies appropriate privacy technique to each column type. Returns fully anonymized/encrypted dataset. """ encrypted_df = df.copy() print("Applying privacy-preserving transformations...") # 1. Hash direct identifiers (SSN) - irreversible if 'SSN' in encrypted_df.columns: encrypted_df['SSN_Hash'] = encrypted_df['SSN'].apply(self.hash_identifier) encrypted_df.drop('SSN', axis=1, inplace=True) print(" ✓ SSN hashed with SHA-256") # 2. Pseudonymize names if 'Name' in encrypted_df.columns: encrypted_df['Name_Pseudo'] = encrypted_df['Name'].apply(self.mask_name) encrypted_df.drop('Name', axis=1, inplace=True) print(" ✓ Names pseudonymized") # 3. Generalize DOB to age ranges (k-anonymity) if 'DOB' in encrypted_df.columns: encrypted_df['Age_Range'] = encrypted_df['DOB'].apply(self.generalize_age) encrypted_df.drop('DOB', axis=1, inplace=True) print(" ✓ DOB generalized to age ranges") # 4. Generalize income (k-anonymity) if 'Income' in encrypted_df.columns: # Keep noisy version for ML, generalized for reporting encrypted_df['Income_Noisy'] = encrypted_df['Income'].apply( lambda x: self.add_laplace_noise(x, sensitivity=5000) ) encrypted_df['Income_Range'] = encrypted_df['Income'].apply(self.generalize_income) encrypted_df.drop('Income', axis=1, inplace=True) print(" ✓ Income: noise added + generalized") # 5. Add noise to other numerical health metrics numeric_noise_cols = ['Heart Rate'] for col in numeric_noise_cols: if col in encrypted_df.columns: encrypted_df[f'{col}_Noisy'] = encrypted_df[col].apply( lambda x: self.add_laplace_noise(x, sensitivity=5) ) print(f" ✓ {col}: Laplace noise added") print(f"\nPrivacy budget (epsilon) used: {self.epsilon}") return encrypted_df # ============================================================================ # SECTION 2: DATA PREPROCESSING # ============================================================================ class HealthcareDataProcessor: """ Prepares healthcare data for ML model training. Handles encoding, scaling, and feature engineering. """ def __init__(self): self.label_encoders = {} self.scaler = StandardScaler() self.feature_columns = [] def load_and_clean(self, filepath: str) -> pd.DataFrame: """Load CSV and perform basic cleaning.""" df = pd.read_csv(filepath) # Remove completely empty columns df = df.dropna(axis=1, how='all') # Remove duplicate rows df = df.drop_duplicates() # Clean column names df.columns = df.columns.str.strip() print(f"Loaded {len(df)} records with {len(df.columns)} features") return df def prepare_features(self, df: pd.DataFrame, target_col: str = 'Tumor Condition') -> Tuple[np.ndarray, np.ndarray]: """ Encodes categorical features and prepares for ML. Returns feature matrix X and target vector y. """ # Identify target if target_col not in df.columns: raise ValueError(f"Target column '{target_col}' not found!") # Separate features and target y = df[target_col].copy() X_df = df.drop(columns=[target_col]) # Remove non-predictive columns (identifiers) cols_to_drop = ['Name', 'SSN', 'Name_Pseudo', 'SSN_Hash', 'DOB'] X_df = X_df.drop(columns=[c for c in cols_to_drop if c in X_df.columns], errors='ignore') # Encode target variable le_target = LabelEncoder() y_encoded = le_target.fit_transform(y.fillna('Unknown')) self.label_encoders['target'] = le_target # Process each column processed_cols = [] for col in X_df.columns: if X_df[col].dtype in ['object', 'category']: # Categorical: label encode le = LabelEncoder() X_df[col] = le.fit_transform(X_df[col].fillna('Unknown').astype(str)) self.label_encoders[col] = le else: # Numeric: fill NaN with median X_df[col] = pd.to_numeric(X_df[col], errors='coerce') X_df[col] = X_df[col].fillna(X_df[col].median()) processed_cols.append(col) self.feature_columns = processed_cols # Scale features X_scaled = self.scaler.fit_transform(X_df) print(f"Prepared {X_scaled.shape[1]} features for {X_scaled.shape[0]} samples") return X_scaled, y_encoded # ============================================================================ # SECTION 3: MODEL TRAINING AND EVALUATION # ============================================================================ class PrivacyPreservingMLPipeline: """ Complete ML pipeline comparing: 1. Standard model (no privacy) 2. Differentially private model 3. Model trained on encrypted data """ def __init__(self, epsilon: float = 1.0): self.epsilon = epsilon self.results = {} def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray, model_name: str) -> Dict[str, float]: """Calculate and store standard metrics.""" metrics = { 'accuracy': accuracy_score(y_true, y_pred), 'precision': precision_score(y_true, y_pred, average='weighted', zero_division=0), 'recall': recall_score(y_true, y_pred, average='weighted', zero_division=0), 'f1': f1_score(y_true, y_pred, average='weighted', zero_division=0) } self.results[model_name] = metrics return metrics def train_standard_model(self, X_train: np.ndarray, X_test: np.ndarray, y_train: np.ndarray, y_test: np.ndarray) -> Dict[str, float]: """Train standard logistic regression (no privacy).""" print("\n" + "="*60) print("TRAINING STANDARD MODEL (No Privacy Protection)") print("="*60) model = LogisticRegression(max_iter=1000, random_state=42) model.fit(X_train, y_train) y_pred = model.predict(X_test) metrics = self.evaluate_model(y_test, y_pred, 'Standard LR') print(f"Accuracy: {metrics['accuracy']:.4f}") print(f"F1 Score: {metrics['f1']:.4f}") return metrics def train_dp_model(self, X_train: np.ndarray, X_test: np.ndarray, y_train: np.ndarray, y_test: np.ndarray) -> Dict[str, float]: """Train differentially private logistic regression.""" print("\n" + "="*60) print(f"TRAINING DP MODEL (Epsilon = {self.epsilon})") print("="*60) if not DIFFPRIVLIB_AVAILABLE: print("diffprivlib not available - skipping DP model") return {} # Calculate data bounds for DP (required by diffprivlib) data_norm = np.linalg.norm(X_train, axis=1).max() dp_model = DPLogisticRegression( epsilon=self.epsilon, data_norm=data_norm, max_iter=1000, random_state=42 ) dp_model.fit(X_train, y_train) y_pred = dp_model.predict(X_test) metrics = self.evaluate_model(y_test, y_pred, f'DP LR (ε={self.epsilon})') print(f"Accuracy: {metrics['accuracy']:.4f}") print(f"F1 Score: {metrics['f1']:.4f}") return metrics def train_on_encrypted_data(self, X_train: np.ndarray, X_test: np.ndarray, y_train: np.ndarray, y_test: np.ndarray) -> Dict[str, float]: """Train model on encrypted/anonymized dataset.""" print("\n" + "="*60) print("TRAINING ON ENCRYPTED DATA") print("="*60) # The data passed here is already encrypted/anonymized # We use Random Forest as it handles noisy data better model = RandomForestClassifier( n_estimators=100, max_depth=10, random_state=42, n_jobs=-1 ) model.fit(X_train, y_train) y_pred = model.predict(X_test) metrics = self.evaluate_model(y_test, y_pred, 'RF on Encrypted Data') print(f"Accuracy: {metrics['accuracy']:.4f}") print(f"F1 Score: {metrics['f1']:.4f}") return metrics def compare_results(self) -> pd.DataFrame: """Generate comparison table of all models.""" if not self.results: return pd.DataFrame() comparison = pd.DataFrame(self.results).T comparison = comparison.round(4) print("\n" + "="*60) print("MODEL COMPARISON RESULTS") print("="*60) print(comparison.to_string()) return comparison # ============================================================================ # SECTION 4: MAIN EXECUTION # ============================================================================ def run_complete_pipeline(data_path: str, epsilon: float = 1.0) -> Tuple[pd.DataFrame, pd.DataFrame, Dict]: """ Execute the complete privacy-preserving ML pipeline. Args: data_path: Path to the CSV dataset epsilon: Privacy budget for differential privacy Returns: - Original cleaned DataFrame - Encrypted DataFrame - Dictionary of all results """ print("="*70) print("PRIVACY-PRESERVING MACHINE LEARNING PIPELINE") print("="*70) print(f"Privacy budget (epsilon): {epsilon}") print(f"Data file: {data_path}") print("="*70) # Step 1: Load and clean data processor = HealthcareDataProcessor() df_original = processor.load_and_clean(data_path) print("\n--- ORIGINAL DATA SAMPLE ---") print(df_original.head(3).to_string()) # Step 2: Apply privacy transformations privacy_processor = DataPrivacyProcessor(epsilon=epsilon) df_encrypted = privacy_processor.encrypt_dataset(df_original) print("\n--- ENCRYPTED DATA SAMPLE ---") print(df_encrypted.head(3).to_string()) # Save encrypted dataset encrypted_path = data_path.replace('.csv', '_encrypted.csv') df_encrypted.to_csv(encrypted_path, index=False) print(f"\n✓ Encrypted dataset saved to: {encrypted_path}") # Step 3: Prepare features from ORIGINAL data X_orig, y_orig = processor.prepare_features(df_original.copy()) X_train_orig, X_test_orig, y_train_orig, y_test_orig = train_test_split( X_orig, y_orig, test_size=0.2, random_state=42, stratify=y_orig ) # Step 4: Prepare features from ENCRYPTED data processor_enc = HealthcareDataProcessor() df_enc_clean = df_encrypted.copy() X_enc, y_enc = processor_enc.prepare_features(df_enc_clean) X_train_enc, X_test_enc, y_train_enc, y_test_enc = train_test_split( X_enc, y_enc, test_size=0.2, random_state=42, stratify=y_enc ) # Step 5: Train and evaluate models pipeline = PrivacyPreservingMLPipeline(epsilon=epsilon) # Model 1: Standard (no privacy) pipeline.train_standard_model(X_train_orig, X_test_orig, y_train_orig, y_test_orig) # Model 2: Differential Privacy if DIFFPRIVLIB_AVAILABLE: pipeline.train_dp_model(X_train_orig, X_test_orig, y_train_orig, y_test_orig) # Model 3: Trained on encrypted data pipeline.train_on_encrypted_data(X_train_enc, X_test_enc, y_train_enc, y_test_enc) # Step 6: Generate comparison comparison = pipeline.compare_results() # Step 7: Summary results = { 'original_shape': df_original.shape, 'encrypted_shape': df_encrypted.shape, 'epsilon': epsilon, 'model_comparison': comparison.to_dict(), 'privacy_techniques_applied': [ 'SHA-256 Hashing (SSN)', 'Pseudonymization (Names)', 'K-Anonymity Generalization (DOB, Income)', 'Laplace Noise Addition (Numerical features)', f'Differential Privacy (ε={epsilon})' ] } print("\n" + "="*70) print("PIPELINE COMPLETED SUCCESSFULLY") print("="*70) return df_original, df_encrypted, results # ============================================================================ # SECTION 5: COMMAND LINE INTERFACE # ============================================================================ if __name__ == "__main__": import sys # Default settings data_file = "Assignment2Dataset-1.csv" epsilon = 1.0 # Balance between privacy and utility # Allow command line arguments if len(sys.argv) > 1: data_file = sys.argv[1] if len(sys.argv) > 2: epsilon = float(sys.argv[2]) # Run the complete pipeline df_orig, df_enc, results = run_complete_pipeline(data_file, epsilon) print("\n\nFinal Summary:") print("-" * 40) print(f"Original records: {results['original_shape'][0]}") print(f"Privacy techniques applied: {len(results['privacy_techniques_applied'])}") print(f"Epsilon value: {results['epsilon']}")