# Privacy_Preserving_Machine_Learning / privacy_ml_solution.py
# NOTE: Hugging Face Hub page chrome from the original scrape
# ("YashChowdhary's picture", "Upload 6 files", "5eb498a verified")
# has been commented out here so the module parses as valid Python.
"""
Privacy-Preserving Machine Learning Solution
=============================================
Implements differential privacy and data encryption for healthcare data classification.
Designed for Hugging Face Spaces deployment (CPU-only, free tier compatible).
Author: Data Science Assignment
"""
import pandas as pd
import numpy as np
import hashlib
import base64
import warnings
from datetime import datetime
from typing import Tuple, Dict, Any
# Core ML libraries
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
# Differential Privacy library - IBM's diffprivlib
# Lightweight, sklearn-compatible, works on CPU
try:
from diffprivlib.models import LogisticRegression as DPLogisticRegression
from diffprivlib.models import GaussianNB as DPGaussianNB
DIFFPRIVLIB_AVAILABLE = True
except ImportError:
DIFFPRIVLIB_AVAILABLE = False
print("Warning: diffprivlib not installed. Install with: pip install diffprivlib")
warnings.filterwarnings('ignore')
# ============================================================================
# SECTION 1: DATA ENCRYPTION UTILITIES
# ============================================================================
class DataPrivacyProcessor:
    """
    Applies layered privacy-preserving transformations to a dataset:

    1. Hashing (SHA-256, salted) for direct identifiers such as SSN
    2. K-anonymity style generalization for quasi-identifiers (DOB, income)
    3. Deterministic pseudonymization (masking) for names
    4. Noise addition (Laplace mechanism) for numerical values
    """

    def __init__(self, epsilon: float = 1.0):
        """
        Args:
            epsilon: Privacy budget for differential privacy.
                     Lower = more privacy, less utility.
                     Typical range: 0.1 (high privacy) to 10 (low privacy)
        """
        self.epsilon = epsilon
        # NOTE(review): a hard-coded salt only defeats precomputed rainbow
        # tables; in production the salt should come from secret config.
        self.salt = "privacy_salt_2024"  # Salt for hashing

    def hash_identifier(self, value: str) -> str:
        """
        One-way hash for direct identifiers (SSN, etc.).

        Salted SHA-256: identical inputs map to the same token (so joins
        still work) while the raw value is unrecoverable. Returns a
        16-hex-character digest prefix, or "HASH_NULL" for missing values.
        """
        if pd.isna(value):
            return "HASH_NULL"
        salted = f"{self.salt}{value}"
        return hashlib.sha256(salted.encode()).hexdigest()[:16]

    def mask_name(self, name: str) -> str:
        """
        Pseudonymizes names while keeping format for utility.
        Example: 'John Smith' -> 'P_A1B2C3'

        MD5 is acceptable here: the output is a deterministic pseudonym
        label, not a security control.
        """
        if pd.isna(name):
            return "P_NULL"
        # Create deterministic pseudonym from hash
        hash_val = hashlib.md5(f"{self.salt}{name}".encode()).hexdigest()[:6]
        return f"P_{hash_val.upper()}"

    def generalize_age(self, dob_str: str) -> str:
        """
        K-anonymity: generalizes an exact DOB to an age-range bucket.

        Reduces re-identification risk while preserving analytical value.
        Unparseable or missing dates map to "Unknown". Minors get their
        own "<18" bucket instead of being mislabeled as "18-24".
        """
        if pd.isna(dob_str):
            return "Unknown"
        try:
            # Handle multiple date formats; US month-first is tried before
            # day-first, so ambiguous dates resolve as US-style.
            for fmt in ['%m/%d/%Y', '%Y-%m-%d', '%d/%m/%Y']:
                try:
                    dob = datetime.strptime(str(dob_str), fmt)
                    break
                except ValueError:
                    continue
            else:
                return "Unknown"
            # Exact calendar age (avoids the leap-year drift of days // 365).
            now = datetime.now()
            age = now.year - dob.year - ((now.month, now.day) < (dob.month, dob.day))
            # Age buckets for k-anonymity
            if age < 18:
                return "<18"
            elif age < 25:
                return "18-24"
            elif age < 35:
                return "25-34"
            elif age < 45:
                return "35-44"
            elif age < 55:
                return "45-54"
            elif age < 65:
                return "55-64"
            else:
                return "65+"
        except Exception:
            return "Unknown"

    def generalize_income(self, income: float) -> str:
        """
        K-anonymity: buckets income into ranges.
        Prevents exact salary identification.
        """
        if pd.isna(income):
            return "Unknown"
        try:
            income = float(income)
            if income < 30000:
                return "Low (<30K)"
            elif income < 50000:
                return "Medium-Low (30-50K)"
            elif income < 75000:
                return "Medium (50-75K)"
            elif income < 100000:
                return "Medium-High (75-100K)"
            else:
                return "High (100K+)"
        except (ValueError, TypeError):
            return "Unknown"

    def add_laplace_noise(self, value: float, sensitivity: float = 1.0) -> float:
        """
        Differential Privacy: adds calibrated Laplace noise.
        Provides plausible deniability for individual records.

        NOTE: each call consumes the full epsilon budget for that value;
        no cross-column budget accounting is performed here.

        Args:
            value: Original numeric value (NaN is passed through unchanged)
            sensitivity: How much one person can affect the output
        """
        if pd.isna(value):
            return value
        scale = sensitivity / self.epsilon
        noise = np.random.laplace(0, scale)
        return value + noise

    def encrypt_dataset(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Applies the appropriate privacy technique to each known column.
        Returns a fully anonymized/encrypted copy; the input is untouched.
        """
        encrypted_df = df.copy()
        print("Applying privacy-preserving transformations...")
        # 1. Hash direct identifiers (SSN) - irreversible
        if 'SSN' in encrypted_df.columns:
            encrypted_df['SSN_Hash'] = encrypted_df['SSN'].apply(self.hash_identifier)
            encrypted_df.drop('SSN', axis=1, inplace=True)
            print(" ✓ SSN hashed with SHA-256")
        # 2. Pseudonymize names
        if 'Name' in encrypted_df.columns:
            encrypted_df['Name_Pseudo'] = encrypted_df['Name'].apply(self.mask_name)
            encrypted_df.drop('Name', axis=1, inplace=True)
            print(" ✓ Names pseudonymized")
        # 3. Generalize DOB to age ranges (k-anonymity)
        if 'DOB' in encrypted_df.columns:
            encrypted_df['Age_Range'] = encrypted_df['DOB'].apply(self.generalize_age)
            encrypted_df.drop('DOB', axis=1, inplace=True)
            print(" ✓ DOB generalized to age ranges")
        # 4. Generalize income (k-anonymity)
        if 'Income' in encrypted_df.columns:
            # Keep noisy version for ML, generalized for reporting
            encrypted_df['Income_Noisy'] = encrypted_df['Income'].apply(
                lambda x: self.add_laplace_noise(x, sensitivity=5000)
            )
            encrypted_df['Income_Range'] = encrypted_df['Income'].apply(self.generalize_income)
            encrypted_df.drop('Income', axis=1, inplace=True)
            print(" ✓ Income: noise added + generalized")
        # 5. Add noise to other numerical health metrics.
        # FIX: the raw column is dropped after adding noise — previously the
        # exact Heart Rate remained alongside its noisy copy, leaking the
        # very value being protected (now mirrors the Income handling).
        numeric_noise_cols = ['Heart Rate']
        for col in numeric_noise_cols:
            if col in encrypted_df.columns:
                encrypted_df[f'{col}_Noisy'] = encrypted_df[col].apply(
                    lambda x: self.add_laplace_noise(x, sensitivity=5)
                )
                encrypted_df.drop(col, axis=1, inplace=True)
                print(f" ✓ {col}: Laplace noise added")
        print(f"\nPrivacy budget (epsilon) used: {self.epsilon}")
        return encrypted_df
# ============================================================================
# SECTION 2: DATA PREPROCESSING
# ============================================================================
class HealthcareDataProcessor:
    """
    Prepares healthcare records for model training: CSV loading and
    hygiene, categorical encoding, missing-value imputation, and
    standard scaling of the feature matrix.
    """

    def __init__(self):
        # One fitted LabelEncoder per categorical column, plus 'target'.
        self.label_encoders = {}
        self.scaler = StandardScaler()
        self.feature_columns = []

    def load_and_clean(self, filepath: str) -> pd.DataFrame:
        """Read the CSV at *filepath* and apply basic cleaning steps."""
        frame = pd.read_csv(filepath)
        frame = frame.dropna(axis=1, how='all')    # drop fully-empty columns
        frame = frame.drop_duplicates()            # drop repeated rows
        frame.columns = frame.columns.str.strip()  # normalize header whitespace
        print(f"Loaded {len(frame)} records with {len(frame.columns)} features")
        return frame

    def prepare_features(self, df: pd.DataFrame, target_col: str = 'Tumor Condition') -> Tuple[np.ndarray, np.ndarray]:
        """
        Encode categorical features, impute missing values, and scale.

        Returns:
            (X, y): scaled feature matrix and label-encoded target vector.

        Raises:
            ValueError: if *target_col* is absent from *df*.
        """
        if target_col not in df.columns:
            raise ValueError(f"Target column '{target_col}' not found!")

        # Split off the target, then strip identifier columns that carry
        # no predictive signal (raw and pseudonymized forms alike).
        y = df[target_col].copy()
        features = df.drop(columns=[target_col])
        identifier_cols = ['Name', 'SSN', 'Name_Pseudo', 'SSN_Hash', 'DOB']
        features = features.drop(
            columns=[c for c in identifier_cols if c in features.columns],
            errors='ignore',
        )

        # Encode the target; keep the encoder for inverse lookups later.
        target_encoder = LabelEncoder()
        y_encoded = target_encoder.fit_transform(y.fillna('Unknown'))
        self.label_encoders['target'] = target_encoder

        # Column-by-column: label-encode categoricals, median-impute numerics.
        kept = []
        for column in features.columns:
            if features[column].dtype in ['object', 'category']:
                encoder = LabelEncoder()
                features[column] = encoder.fit_transform(
                    features[column].fillna('Unknown').astype(str)
                )
                self.label_encoders[column] = encoder
            else:
                numeric = pd.to_numeric(features[column], errors='coerce')
                features[column] = numeric.fillna(numeric.median())
            kept.append(column)
        self.feature_columns = kept

        # Zero-mean / unit-variance scaling across all features.
        X_scaled = self.scaler.fit_transform(features)
        print(f"Prepared {X_scaled.shape[1]} features for {X_scaled.shape[0]} samples")
        return X_scaled, y_encoded
# ============================================================================
# SECTION 3: MODEL TRAINING AND EVALUATION
# ============================================================================
class PrivacyPreservingMLPipeline:
    """
    End-to-end comparison harness for three training regimes:

    1. Standard model (no privacy)
    2. Differentially private model (diffprivlib)
    3. Model trained on the encrypted/anonymized dataset
    """

    def __init__(self, epsilon: float = 1.0):
        self.epsilon = epsilon  # privacy budget used by the DP model
        self.results = {}       # model name -> metric dict

    def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray, model_name: str) -> Dict[str, float]:
        """Compute weighted accuracy/precision/recall/F1 and cache them under *model_name*."""
        scores = {}
        scores['accuracy'] = accuracy_score(y_true, y_pred)
        scores['precision'] = precision_score(y_true, y_pred, average='weighted', zero_division=0)
        scores['recall'] = recall_score(y_true, y_pred, average='weighted', zero_division=0)
        scores['f1'] = f1_score(y_true, y_pred, average='weighted', zero_division=0)
        self.results[model_name] = scores
        return scores

    def train_standard_model(self, X_train: np.ndarray, X_test: np.ndarray,
                             y_train: np.ndarray, y_test: np.ndarray) -> Dict[str, float]:
        """Train a plain logistic regression with no privacy protection (baseline)."""
        print("\n" + "=" * 60)
        print("TRAINING STANDARD MODEL (No Privacy Protection)")
        print("=" * 60)
        clf = LogisticRegression(max_iter=1000, random_state=42)
        clf.fit(X_train, y_train)
        predictions = clf.predict(X_test)
        scores = self.evaluate_model(y_test, predictions, 'Standard LR')
        print(f"Accuracy: {scores['accuracy']:.4f}")
        print(f"F1 Score: {scores['f1']:.4f}")
        return scores

    def train_dp_model(self, X_train: np.ndarray, X_test: np.ndarray,
                       y_train: np.ndarray, y_test: np.ndarray) -> Dict[str, float]:
        """Train diffprivlib's DP logistic regression; returns {} if the library is absent."""
        print("\n" + "=" * 60)
        print(f"TRAINING DP MODEL (Epsilon = {self.epsilon})")
        print("=" * 60)
        if not DIFFPRIVLIB_AVAILABLE:
            print("diffprivlib not available - skipping DP model")
            return {}
        # diffprivlib requires an explicit bound on the data's row norms.
        row_norms = np.linalg.norm(X_train, axis=1)
        dp_clf = DPLogisticRegression(
            epsilon=self.epsilon,
            data_norm=row_norms.max(),
            max_iter=1000,
            random_state=42
        )
        dp_clf.fit(X_train, y_train)
        predictions = dp_clf.predict(X_test)
        scores = self.evaluate_model(y_test, predictions, f'DP LR (ε={self.epsilon})')
        print(f"Accuracy: {scores['accuracy']:.4f}")
        print(f"F1 Score: {scores['f1']:.4f}")
        return scores

    def train_on_encrypted_data(self, X_train: np.ndarray, X_test: np.ndarray,
                                y_train: np.ndarray, y_test: np.ndarray) -> Dict[str, float]:
        """Train on the anonymized dataset (Random Forest tolerates the added noise well)."""
        print("\n" + "=" * 60)
        print("TRAINING ON ENCRYPTED DATA")
        print("=" * 60)
        forest = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            random_state=42,
            n_jobs=-1
        )
        forest.fit(X_train, y_train)
        predictions = forest.predict(X_test)
        scores = self.evaluate_model(y_test, predictions, 'RF on Encrypted Data')
        print(f"Accuracy: {scores['accuracy']:.4f}")
        print(f"F1 Score: {scores['f1']:.4f}")
        return scores

    def compare_results(self) -> pd.DataFrame:
        """Print and return a table of all cached metrics (empty frame if none)."""
        if not self.results:
            return pd.DataFrame()
        table = pd.DataFrame(self.results).T.round(4)
        print("\n" + "=" * 60)
        print("MODEL COMPARISON RESULTS")
        print("=" * 60)
        print(table.to_string())
        return table
# ============================================================================
# SECTION 4: MAIN EXECUTION
# ============================================================================
def run_complete_pipeline(data_path: str, epsilon: float = 1.0) -> Tuple[pd.DataFrame, pd.DataFrame, Dict]:
    """
    Execute the complete privacy-preserving ML pipeline.

    Args:
        data_path: Path to the CSV dataset
        epsilon: Privacy budget for differential privacy

    Returns:
        A 3-tuple of (original cleaned DataFrame, encrypted DataFrame,
        dictionary of all results).
    """
    banner = "=" * 70
    print(banner)
    print("PRIVACY-PRESERVING MACHINE LEARNING PIPELINE")
    print(banner)
    print(f"Privacy budget (epsilon): {epsilon}")
    print(f"Data file: {data_path}")
    print(banner)

    # Step 1: load and clean the raw data.
    cleaner = HealthcareDataProcessor()
    original_df = cleaner.load_and_clean(data_path)
    print("\n--- ORIGINAL DATA SAMPLE ---")
    print(original_df.head(3).to_string())

    # Step 2: apply the privacy transformations.
    anonymizer = DataPrivacyProcessor(epsilon=epsilon)
    encrypted_df = anonymizer.encrypt_dataset(original_df)
    print("\n--- ENCRYPTED DATA SAMPLE ---")
    print(encrypted_df.head(3).to_string())

    # Persist the anonymized copy next to the source file.
    encrypted_path = data_path.replace('.csv', '_encrypted.csv')
    encrypted_df.to_csv(encrypted_path, index=False)
    print(f"\n✓ Encrypted dataset saved to: {encrypted_path}")

    # Step 3: features from the ORIGINAL data (copy: prepare_features mutates).
    X_orig, y_orig = cleaner.prepare_features(original_df.copy())
    X_train_orig, X_test_orig, y_train_orig, y_test_orig = train_test_split(
        X_orig, y_orig, test_size=0.2, random_state=42, stratify=y_orig
    )

    # Step 4: features from the ENCRYPTED data, with its own fresh processor
    # so the encoders fitted on original data are not reused.
    enc_cleaner = HealthcareDataProcessor()
    X_enc, y_enc = enc_cleaner.prepare_features(encrypted_df.copy())
    X_train_enc, X_test_enc, y_train_enc, y_test_enc = train_test_split(
        X_enc, y_enc, test_size=0.2, random_state=42, stratify=y_enc
    )

    # Step 5: train and evaluate each model regime.
    pipeline = PrivacyPreservingMLPipeline(epsilon=epsilon)
    pipeline.train_standard_model(X_train_orig, X_test_orig, y_train_orig, y_test_orig)
    if DIFFPRIVLIB_AVAILABLE:
        pipeline.train_dp_model(X_train_orig, X_test_orig, y_train_orig, y_test_orig)
    pipeline.train_on_encrypted_data(X_train_enc, X_test_enc, y_train_enc, y_test_enc)

    # Step 6: side-by-side comparison table.
    comparison = pipeline.compare_results()

    # Step 7: machine-readable summary for the caller.
    results = {
        'original_shape': original_df.shape,
        'encrypted_shape': encrypted_df.shape,
        'epsilon': epsilon,
        'model_comparison': comparison.to_dict(),
        'privacy_techniques_applied': [
            'SHA-256 Hashing (SSN)',
            'Pseudonymization (Names)',
            'K-Anonymity Generalization (DOB, Income)',
            'Laplace Noise Addition (Numerical features)',
            f'Differential Privacy (ε={epsilon})'
        ]
    }

    print("\n" + banner)
    print("PIPELINE COMPLETED SUCCESSFULLY")
    print(banner)
    return original_df, encrypted_df, results
# ============================================================================
# SECTION 5: COMMAND LINE INTERFACE
# ============================================================================
if __name__ == "__main__":
    import sys

    # Defaults: bundled assignment dataset, balanced privacy/utility budget.
    data_file = "Assignment2Dataset-1.csv"
    epsilon = 1.0

    # Optional positional overrides: <csv_path> [epsilon]
    argv = sys.argv
    if len(argv) > 1:
        data_file = argv[1]
    if len(argv) > 2:
        epsilon = float(argv[2])

    # Run the full pipeline and print a short human-readable recap.
    df_orig, df_enc, results = run_complete_pipeline(data_file, epsilon)

    print("\n\nFinal Summary:")
    print("-" * 40)
    print(f"Original records: {results['original_shape'][0]}")
    print(f"Privacy techniques applied: {len(results['privacy_techniques_applied'])}")
    print(f"Epsilon value: {results['epsilon']}")