| """ |
| Privacy-Preserving Machine Learning Solution |
| ============================================= |
| Implements differential privacy and data encryption for healthcare data classification. |
| Designed for Hugging Face Spaces deployment (CPU-only, free tier compatible). |
| |
| Author: Data Science Assignment |
| """ |
|
|
| import pandas as pd |
| import numpy as np |
| import hashlib |
| import base64 |
| import warnings |
| from datetime import datetime |
| from typing import Tuple, Dict, Any |
|
|
| |
| from sklearn.model_selection import train_test_split |
| from sklearn.preprocessing import LabelEncoder, StandardScaler |
| from sklearn.ensemble import RandomForestClassifier |
| from sklearn.linear_model import LogisticRegression |
| from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report |
|
|
| |
| |
| try: |
| from diffprivlib.models import LogisticRegression as DPLogisticRegression |
| from diffprivlib.models import GaussianNB as DPGaussianNB |
| DIFFPRIVLIB_AVAILABLE = True |
| except ImportError: |
| DIFFPRIVLIB_AVAILABLE = False |
| print("Warning: diffprivlib not installed. Install with: pip install diffprivlib") |
|
|
| warnings.filterwarnings('ignore') |
|
|
|
|
| |
| |
| |
|
|
class DataPrivacyProcessor:
    """
    Handles multiple privacy-preserving transformations:
    1. Hashing (SHA-256) for direct identifiers like SSN
    2. K-anonymity style generalization for quasi-identifiers
    3. Data masking for names
    4. Noise addition (Laplace mechanism) for numerical values
    """

    def __init__(self, epsilon: float = 1.0, salt: str = "privacy_salt_2024"):
        """
        Args:
            epsilon: Privacy budget for differential privacy.
                Lower = more privacy, less utility.
                Typical range: 0.1 (high privacy) to 10 (low privacy)
            salt: String prepended to values before hashing. The default
                reproduces the historical hard-coded salt; a salt that lives
                in source code is not secret, so pass a private value when
                the hashes must resist guessing.

        Raises:
            ValueError: If epsilon is not a positive number.
        """
        # `not epsilon > 0` rejects zero, negatives and NaN in one test; any of
        # them would otherwise only fail later when the Laplace scale is computed.
        if not epsilon > 0:
            raise ValueError(f"epsilon must be a positive number, got {epsilon!r}")
        self.epsilon = epsilon
        self.salt = salt

    def hash_identifier(self, value: str) -> str:
        """
        One-way hash for direct identifiers (SSN, etc.).

        Returns the first 16 hex characters of SHA-256(salt + value), or the
        sentinel "HASH_NULL" for missing values.
        """
        if pd.isna(value):
            return "HASH_NULL"
        salted = f"{self.salt}{value}"
        return hashlib.sha256(salted.encode()).hexdigest()[:16]

    def mask_name(self, name: str) -> str:
        """
        Pseudonymizes names while keeping format for utility.
        Example: 'John Smith' -> 'P_A1B2C3'

        Returns "P_NULL" for missing values.
        """
        if pd.isna(name):
            return "P_NULL"

        # MD5 is kept so previously issued pseudonyms stay stable; only six
        # hex characters are used, so distinct names can collide.
        hash_val = hashlib.md5(f"{self.salt}{name}".encode()).hexdigest()[:6]
        return f"P_{hash_val.upper()}"

    def generalize_age(self, dob_str: str) -> str:
        """
        K-anonymity: Generalizes exact DOB to age ranges.

        Accepted formats, tried in order: MM/DD/YYYY, YYYY-MM-DD, DD/MM/YYYY
        (an ambiguous slash date is therefore read month-first).

        Returns one of "Under 18", "18-24", "25-34", "35-44", "45-54",
        "55-64", "65+", or "Unknown" when the value is missing, cannot be
        parsed, or lies in the future.
        """
        if pd.isna(dob_str):
            return "Unknown"

        text = str(dob_str).strip()
        dob = None
        for fmt in ('%m/%d/%Y', '%Y-%m-%d', '%d/%m/%Y'):
            try:
                dob = datetime.strptime(text, fmt)
                break
            except ValueError:
                continue
        if dob is None:
            return "Unknown"

        # Calendar-based age: subtract one if this year's birthday has not
        # happened yet. Dividing a day count by 365 drifts with leap years.
        today = datetime.now()
        had_birthday = (today.month, today.day) >= (dob.month, dob.day)
        age = today.year - dob.year - (0 if had_birthday else 1)

        if age < 0:
            # A birth date in the future is a data error, not a young patient.
            return "Unknown"
        if age < 18:
            return "Under 18"
        if age < 25:
            return "18-24"
        if age < 35:
            return "25-34"
        if age < 45:
            return "35-44"
        if age < 55:
            return "45-54"
        if age < 65:
            return "55-64"
        return "65+"

    def generalize_income(self, income: float) -> str:
        """
        K-anonymity: Buckets income into ranges.
        Prevents exact salary identification.

        Returns "Unknown" for missing or non-numeric values.
        """
        if pd.isna(income):
            return "Unknown"
        try:
            income = float(income)
        except (ValueError, TypeError):
            return "Unknown"

        if income < 30000:
            return "Low (<30K)"
        if income < 50000:
            return "Medium-Low (30-50K)"
        if income < 75000:
            return "Medium (50-75K)"
        if income < 100000:
            return "Medium-High (75-100K)"
        return "High (100K+)"

    def add_laplace_noise(self, value: float, sensitivity: float = 1.0) -> float:
        """
        Differential Privacy: Adds calibrated Laplace noise.

        Args:
            value: Original numeric value
            sensitivity: How much one person can affect the output

        Returns:
            value plus a draw from Laplace(0, sensitivity / epsilon);
            missing values are returned unchanged.
        """
        if pd.isna(value):
            return value
        scale = sensitivity / self.epsilon
        noise = np.random.laplace(0, scale)
        return value + noise

    def encrypt_dataset(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Applies appropriate privacy technique to each column type.

        Works on a copy; the input frame is not modified. Columns handled
        when present: SSN, Name, DOB, Income, Heart Rate. Each raw column is
        replaced by its protected counterpart(s); all other columns pass
        through untouched.

        Returns:
            The transformed copy of df.
        """
        encrypted_df = df.copy()

        print("Applying privacy-preserving transformations...")

        if 'SSN' in encrypted_df.columns:
            encrypted_df['SSN_Hash'] = encrypted_df['SSN'].apply(self.hash_identifier)
            encrypted_df = encrypted_df.drop(columns='SSN')
            print(" ✓ SSN hashed with SHA-256")

        if 'Name' in encrypted_df.columns:
            encrypted_df['Name_Pseudo'] = encrypted_df['Name'].apply(self.mask_name)
            encrypted_df = encrypted_df.drop(columns='Name')
            print(" ✓ Names pseudonymized")

        if 'DOB' in encrypted_df.columns:
            encrypted_df['Age_Range'] = encrypted_df['DOB'].apply(self.generalize_age)
            encrypted_df = encrypted_df.drop(columns='DOB')
            print(" ✓ DOB generalized to age ranges")

        if 'Income' in encrypted_df.columns:
            encrypted_df['Income_Noisy'] = encrypted_df['Income'].apply(
                lambda x: self.add_laplace_noise(x, sensitivity=5000)
            )
            encrypted_df['Income_Range'] = encrypted_df['Income'].apply(self.generalize_income)
            encrypted_df = encrypted_df.drop(columns='Income')
            print(" ✓ Income: noise added + generalized")

        numeric_noise_cols = ['Heart Rate']
        for col in numeric_noise_cols:
            if col in encrypted_df.columns:
                encrypted_df[f'{col}_Noisy'] = encrypted_df[col].apply(
                    lambda x: self.add_laplace_noise(x, sensitivity=5)
                )
                # Drop the raw measurement: leaving it next to the noisy copy
                # would publish the exact value the noise is meant to hide.
                encrypted_df = encrypted_df.drop(columns=col)
                print(f" ✓ {col}: Laplace noise added")

        print(f"\nPrivacy budget (epsilon) used: {self.epsilon}")
        return encrypted_df
|
|
|
|
| |
| |
| |
|
|
class HealthcareDataProcessor:
    """
    Prepares healthcare data for ML model training.
    Handles CSV loading/cleaning, label encoding and feature scaling.

    Attributes set by prepare_features:
        label_encoders: fitted LabelEncoder per encoded column, plus the
            key 'target' for the target column.
        scaler: the StandardScaler fitted on the feature matrix.
        feature_columns: feature column names, in matrix column order.
    """

    def __init__(self):
        self.label_encoders = {}
        self.scaler = StandardScaler()
        self.feature_columns = []

    def load_and_clean(self, filepath: str) -> pd.DataFrame:
        """
        Load CSV and perform basic cleaning.

        Drops columns that are entirely empty, drops duplicate rows and
        strips surrounding whitespace from column names.
        """
        df = pd.read_csv(filepath)
        df = df.dropna(axis=1, how='all')
        df = df.drop_duplicates()
        df.columns = df.columns.str.strip()

        print(f"Loaded {len(df)} records with {len(df.columns)} features")
        return df

    @staticmethod
    def _is_categorical_like(series: pd.Series) -> bool:
        """Return True for object, string or categorical columns."""
        dtype = series.dtype
        return (
            isinstance(dtype, pd.CategoricalDtype)
            or pd.api.types.is_object_dtype(dtype)
            or pd.api.types.is_string_dtype(dtype)
        )

    @staticmethod
    def _as_labels(series: pd.Series) -> pd.Series:
        """Return series as plain strings with missing values as 'Unknown'."""
        # Going through object dtype first avoids the error pandas raises when
        # filling a categorical column with a value that is not a category.
        return series.astype(object).where(series.notna(), 'Unknown').astype(str)

    def prepare_features(self, df: pd.DataFrame, target_col: str = 'Tumor Condition') -> Tuple[np.ndarray, np.ndarray]:
        """
        Encodes categorical features and prepares for ML.

        Identifier columns (Name, SSN, Name_Pseudo, SSN_Hash, DOB) are
        excluded from the features. Text-like columns are label-encoded with
        missing values mapped to 'Unknown'; remaining columns are coerced to
        numbers with missing values replaced by the column median. All
        features are then standardized.

        Args:
            df: Input records; not modified.
            target_col: Name of the column to predict.

        Returns:
            Feature matrix X (scaled) and label-encoded target vector y.

        Raises:
            ValueError: If target_col is not a column of df.
        """
        if target_col not in df.columns:
            raise ValueError(f"Target column '{target_col}' not found!")

        y = df[target_col]
        X_df = df.drop(columns=[target_col])

        cols_to_drop = ['Name', 'SSN', 'Name_Pseudo', 'SSN_Hash', 'DOB']
        X_df = X_df.drop(columns=[c for c in cols_to_drop if c in X_df.columns])

        # Only stringify the target when it has gaps: filling a numeric target
        # with 'Unknown' would otherwise mix floats and strings, which
        # LabelEncoder rejects. Complete targets keep their original values.
        if y.isna().any():
            y = self._as_labels(y)
        le_target = LabelEncoder()
        y_encoded = le_target.fit_transform(y)
        self.label_encoders['target'] = le_target

        for col in X_df.columns:
            column = X_df[col]
            if self._is_categorical_like(column):
                le = LabelEncoder()
                X_df[col] = le.fit_transform(self._as_labels(column))
                self.label_encoders[col] = le
            else:
                numeric = pd.to_numeric(column, errors='coerce')
                median = numeric.median()
                # A column with no usable numbers has a NaN median; fall back
                # to 0 so no NaN reaches the scaler or the models.
                X_df[col] = numeric.fillna(0.0 if pd.isna(median) else median)

        self.feature_columns = list(X_df.columns)

        X_scaled = self.scaler.fit_transform(X_df)

        print(f"Prepared {X_scaled.shape[1]} features for {X_scaled.shape[0]} samples")
        return X_scaled, y_encoded
|
|
|
|
| |
| |
| |
|
|
class PrivacyPreservingMLPipeline:
    """
    Trains and scores three classifiers so their metrics can be compared:
    1. Standard model (no privacy)
    2. Differentially private model
    3. Model trained on encrypted data

    Every evaluated model's metrics are kept in ``self.results`` under the
    model's display name.
    """

    def __init__(self, epsilon: float = 1.0):
        self.results = {}
        self.epsilon = epsilon

    @staticmethod
    def _banner(title: str) -> None:
        """Print a section title framed by two 60-character rules."""
        rule = "=" * 60
        print("\n" + rule)
        print(title)
        print(rule)

    def _score_and_report(self, y_test: np.ndarray, y_pred: np.ndarray, label: str) -> Dict[str, float]:
        """Evaluate predictions under label and print accuracy and F1."""
        scores = self.evaluate_model(y_test, y_pred, label)
        print(f"Accuracy: {scores['accuracy']:.4f}")
        print(f"F1 Score: {scores['f1']:.4f}")
        return scores

    def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray, model_name: str) -> Dict[str, float]:
        """Compute accuracy plus weighted precision/recall/F1 and record them."""
        weighted = {'average': 'weighted', 'zero_division': 0}
        scores = {'accuracy': accuracy_score(y_true, y_pred)}
        for key, scorer in (
            ('precision', precision_score),
            ('recall', recall_score),
            ('f1', f1_score),
        ):
            scores[key] = scorer(y_true, y_pred, **weighted)
        self.results[model_name] = scores
        return scores

    def train_standard_model(self, X_train: np.ndarray, X_test: np.ndarray,
                             y_train: np.ndarray, y_test: np.ndarray) -> Dict[str, float]:
        """Fit a plain logistic regression and score it on the test split."""
        self._banner("TRAINING STANDARD MODEL (No Privacy Protection)")

        classifier = LogisticRegression(max_iter=1000, random_state=42)
        classifier.fit(X_train, y_train)
        predictions = classifier.predict(X_test)

        return self._score_and_report(y_test, predictions, 'Standard LR')

    def train_dp_model(self, X_train: np.ndarray, X_test: np.ndarray,
                       y_train: np.ndarray, y_test: np.ndarray) -> Dict[str, float]:
        """
        Fit a differentially private logistic regression and score it.

        Returns an empty dict (and records nothing) when diffprivlib is
        not installed.
        """
        self._banner(f"TRAINING DP MODEL (Epsilon = {self.epsilon})")

        if not DIFFPRIVLIB_AVAILABLE:
            print("diffprivlib not available - skipping DP model")
            return {}

        # The bound handed to the DP model is the largest row L2 norm
        # observed in the training data.
        row_norms = np.linalg.norm(X_train, axis=1)
        classifier = DPLogisticRegression(
            epsilon=self.epsilon,
            data_norm=row_norms.max(),
            max_iter=1000,
            random_state=42,
        )
        classifier.fit(X_train, y_train)
        predictions = classifier.predict(X_test)

        return self._score_and_report(y_test, predictions, f'DP LR (ε={self.epsilon})')

    def train_on_encrypted_data(self, X_train: np.ndarray, X_test: np.ndarray,
                                y_train: np.ndarray, y_test: np.ndarray) -> Dict[str, float]:
        """Fit a random forest on the anonymized features and score it."""
        self._banner("TRAINING ON ENCRYPTED DATA")

        forest = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            random_state=42,
            n_jobs=-1,
        )
        forest.fit(X_train, y_train)
        predictions = forest.predict(X_test)

        return self._score_and_report(y_test, predictions, 'RF on Encrypted Data')

    def compare_results(self) -> pd.DataFrame:
        """
        Print and return a table with one row per evaluated model.

        Returns an empty DataFrame, printing nothing, when no model has
        been evaluated yet.
        """
        if not self.results:
            return pd.DataFrame()

        table = pd.DataFrame(self.results).transpose().round(4)

        self._banner("MODEL COMPARISON RESULTS")
        print(table.to_string())

        return table
|
|
|
|
| |
| |
| |
|
|
def run_complete_pipeline(data_path: str, epsilon: float = 1.0) -> Tuple[pd.DataFrame, pd.DataFrame, Dict]:
    """
    Execute the complete privacy-preserving ML pipeline.

    Steps: load and clean the CSV, build the anonymized copy and save it
    next to the input as ``<name>_encrypted<ext>``, prepare features for
    both versions, train the standard / DP / encrypted-data models and
    print a comparison table.

    Args:
        data_path: Path to the CSV dataset
        epsilon: Privacy budget for differential privacy

    Returns:
        - Original cleaned DataFrame
        - Encrypted DataFrame
        - Dictionary of all results
    """
    import os

    print("="*70)
    print("PRIVACY-PRESERVING MACHINE LEARNING PIPELINE")
    print("="*70)
    print(f"Privacy budget (epsilon): {epsilon}")
    print(f"Data file: {data_path}")
    print("="*70)

    processor = HealthcareDataProcessor()
    df_original = processor.load_and_clean(data_path)

    print("\n--- ORIGINAL DATA SAMPLE ---")
    print(df_original.head(3).to_string())

    privacy_processor = DataPrivacyProcessor(epsilon=epsilon)
    df_encrypted = privacy_processor.encrypt_dataset(df_original)

    print("\n--- ENCRYPTED DATA SAMPLE ---")
    print(df_encrypted.head(3).to_string())

    # Derive the output name from the path's extension. A plain
    # str.replace('.csv', ...) leaves paths such as 'data.CSV' or 'data.txt'
    # unchanged, which would overwrite the source dataset.
    root, ext = os.path.splitext(data_path)
    encrypted_path = f"{root}_encrypted{ext or '.csv'}"
    df_encrypted.to_csv(encrypted_path, index=False)
    print(f"\n✓ Encrypted dataset saved to: {encrypted_path}")

    X_orig, y_orig = processor.prepare_features(df_original.copy())
    X_train_orig, X_test_orig, y_train_orig, y_test_orig = train_test_split(
        X_orig, y_orig, test_size=0.2, random_state=42, stratify=y_orig
    )

    # A separate processor keeps the encoders/scaler fitted on the
    # anonymized data apart from those fitted on the original data.
    processor_enc = HealthcareDataProcessor()
    df_enc_clean = df_encrypted.copy()
    X_enc, y_enc = processor_enc.prepare_features(df_enc_clean)
    X_train_enc, X_test_enc, y_train_enc, y_test_enc = train_test_split(
        X_enc, y_enc, test_size=0.2, random_state=42, stratify=y_enc
    )

    pipeline = PrivacyPreservingMLPipeline(epsilon=epsilon)

    pipeline.train_standard_model(X_train_orig, X_test_orig, y_train_orig, y_test_orig)

    if DIFFPRIVLIB_AVAILABLE:
        pipeline.train_dp_model(X_train_orig, X_test_orig, y_train_orig, y_test_orig)

    pipeline.train_on_encrypted_data(X_train_enc, X_test_enc, y_train_enc, y_test_enc)

    comparison = pipeline.compare_results()

    results = {
        'original_shape': df_original.shape,
        'encrypted_shape': df_encrypted.shape,
        'epsilon': epsilon,
        'model_comparison': comparison.to_dict(),
        'privacy_techniques_applied': [
            'SHA-256 Hashing (SSN)',
            'Pseudonymization (Names)',
            'K-Anonymity Generalization (DOB, Income)',
            'Laplace Noise Addition (Numerical features)',
            f'Differential Privacy (ε={epsilon})'
        ]
    }

    print("\n" + "="*70)
    print("PIPELINE COMPLETED SUCCESSFULLY")
    print("="*70)

    return df_original, df_encrypted, results
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    import sys

    # Usage: script.py [data_file] [epsilon]
    # Either argument falls back to its default when omitted.
    cli_args = sys.argv[1:]
    data_file = cli_args[0] if cli_args else "Assignment2Dataset-1.csv"
    epsilon = float(cli_args[1]) if len(cli_args) > 1 else 1.0

    df_orig, df_enc, results = run_complete_pipeline(data_file, epsilon)

    record_count = results['original_shape'][0]
    technique_count = len(results['privacy_techniques_applied'])

    print("\n\nFinal Summary:")
    print("-" * 40)
    print(f"Original records: {record_count}")
    print(f"Privacy techniques applied: {technique_count}")
    print(f"Epsilon value: {results['epsilon']}")
|
|