"""Privacy Auditor for Generative Models.

Advanced privacy assessment tool for synthetic data generation, implementing
state-of-the-art one-run nearest-neighbor ε-DP auditing with enterprise
features. Secure, comprehensive, fast, and configurable.
"""
import gradio as gr
import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix
from scipy.special import gammaln
import plotly.graph_objects as go
from datetime import datetime
import json
import io
import zipfile
from typing import Dict, List, Tuple, Optional, Any
import logging
import traceback
from dataclasses import dataclass, asdict
from pathlib import Path
import warnings
import uuid
import os
import sys
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
warnings.filterwarnings('ignore')
def setup_logging():
"""Configure comprehensive logging system"""
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
formatters = {
'detailed': logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'),
'simple': logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
}
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
handlers = [
(logging.StreamHandler(sys.stdout), logging.INFO, formatters['simple']),
(logging.FileHandler(log_dir / 'privacy_audit_detailed.log'), logging.DEBUG, formatters['detailed']),
(logging.FileHandler(log_dir / 'privacy_audit_errors.log'), logging.ERROR, formatters['detailed'])
]
for handler, level, formatter in handlers:
handler.setLevel(level)
handler.setFormatter(formatter)
root_logger.addHandler(handler)
return logging.getLogger(__name__)
logger = setup_logging()
logger.info(f"Privacy Auditor Starting - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
@dataclass
class AuditConfig:
"""Enterprise configuration for privacy audit"""
confidence_level: float = 0.95
subsample_size: Optional[int] = None
categorical_encoding: str = "onehot"
numerical_scaling: str = "standard"
distance_metric: str = "euclidean"
enable_preprocessing_report: bool = True
max_file_size_mb: int = 500
timeout_seconds: int = 300
enable_data_validation: bool = True
chunk_size: int = 10000
max_categories_onehot: int = 50
def validate(self) -> List[str]:
"""Validate configuration parameters"""
validations = [
(0.5 <= self.confidence_level <= 0.999, "Confidence level must be between 0.5 and 0.999"),
(self.subsample_size is None or self.subsample_size >= 100, "Subsample size must be at least 100 if specified"),
(self.max_file_size_mb >= 1, "Max file size must be at least 1 MB"),
(self.timeout_seconds >= 10, "Timeout must be at least 10 seconds")
]
try:
return [msg for valid, msg in validations if not valid]
except Exception as e:
logger.error(f"Configuration validation error: {e}")
return [f"Configuration validation failed: {str(e)}"]
class SafeDataProcessor:
"""Safe data processing with comprehensive error handling"""
SUPPORTED_ENCODINGS = ['utf-8', 'latin-1', 'iso-8859-1', 'cp1252']
@classmethod
def safe_read_csv(cls, file_path: str, max_rows: int = None) -> Tuple[Optional[pd.DataFrame], str]:
"""Safely read CSV file with error handling"""
try:
if not os.path.exists(file_path):
return None, f"File not found: {file_path}"
file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
logger.info(f"Reading CSV: {file_path} ({file_size_mb:.2f} MB)")
for encoding in cls.SUPPORTED_ENCODINGS:
try:
df = pd.read_csv(file_path, encoding=encoding, nrows=max_rows, low_memory=False)
logger.info(f"Loaded dataset: {df.shape[0]} rows, {df.shape[1]} columns ({encoding})")
return df, ""
except (UnicodeDecodeError, Exception) as e:
if encoding == cls.SUPPORTED_ENCODINGS[-1]:
logger.warning(f"All encodings failed, last error: {e}")
continue
return None, "Failed to read file with any supported encoding"
except Exception as e:
error_msg = f"Error reading CSV file: {str(e)}"
logger.error(error_msg)
return None, error_msg
@staticmethod
def safe_json_convert(obj: Any) -> Any:
"""Convert object to JSON-serializable format"""
conversion_map = {
np.integer: int,
np.floating: float,
np.ndarray: lambda x: x.tolist(),
pd.Series: lambda x: x.to_dict(),
pd.DataFrame: lambda x: x.to_dict()
}
try:
for obj_type, converter in conversion_map.items():
if isinstance(obj, obj_type):
return converter(obj)
if hasattr(obj, 'dtype'):
dtype_str = str(obj.dtype)
if 'int' in dtype_str:
return int(obj)
elif 'float' in dtype_str:
return float(obj)
return str(obj)
except Exception as e:
logger.warning(f"JSON conversion failed for {type(obj)}: {e}")
return str(obj)
class DataValidator:
"""Enhanced data validation with detailed reporting"""
@classmethod
def validate_dataframe(cls, df: pd.DataFrame, name: str) -> Dict[str, Any]:
"""Comprehensive dataframe validation"""
logger.info(f"Validating dataframe: {name}")
if df.empty:
return cls._empty_dataframe_result(name)
issues, warnings_list = [], []
if len(df) < 10:
warnings_list.append(f"{name}: Very small dataset ({len(df)} rows)")
try:
memory_mb = cls._calculate_memory_usage(df)
cls._validate_columns(df, name, warnings_list)
cls._check_data_quality(df, name, warnings_list)
column_types = cls._analyze_column_types(df)
return cls._build_validation_result(df, issues, warnings_list, memory_mb, column_types)
except Exception as e:
return cls._handle_validation_error(e, name, df, warnings_list)
@staticmethod
def _empty_dataframe_result(name: str) -> Dict[str, Any]:
return {
"valid": False,
"errors": [f"{name}: Dataset is empty"],
"warnings": [],
"shape": (0, 0),
"memory_usage_mb": 0
}
@staticmethod
def _calculate_memory_usage(df: pd.DataFrame) -> float:
try:
return df.memory_usage(deep=True).sum() / (1024 * 1024)
except Exception:
return 0.0
@staticmethod
def _validate_columns(df: pd.DataFrame, name: str, warnings_list: List[str]):
for col in df.columns:
try:
if df[col].dtype == 'object':
sample_data = df[col].dropna().head(100)
if len(sample_data) > 0:
sample_types = set(type(x).__name__ for x in sample_data)
if len(sample_types) > 2:
warnings_list.append(f"{name}: Column '{col}' has mixed data types")
if df[col].nunique() <= 1:
warnings_list.append(f"{name}: Column '{col}' is constant")
except Exception as e:
warnings_list.append(f"{name}: Error analyzing column '{col}': {str(e)}")
@staticmethod
def _check_data_quality(df: pd.DataFrame, name: str, warnings_list: List[str]):
try:
missing_pct = (df.isnull().sum() / len(df)) * 100
high_missing = missing_pct[missing_pct > 50]
if not high_missing.empty:
warnings_list.append(f"{name}: High missing values: {high_missing.to_dict()}")
except Exception as e:
warnings_list.append(f"{name}: Error checking missing values: {str(e)}")
try:
duplicates = df.duplicated().sum()
if duplicates > len(df) * 0.1:
warnings_list.append(f"{name}: High duplicate rows ({duplicates})")
except Exception as e:
warnings_list.append(f"{name}: Error checking duplicates: {str(e)}")
@staticmethod
def _analyze_column_types(df: pd.DataFrame) -> Dict[str, int]:
try:
return {str(k): int(v) for k, v in df.dtypes.value_counts().to_dict().items()}
except Exception:
return {}
@staticmethod
def _build_validation_result(df: pd.DataFrame, issues: List[str], warnings_list: List[str],
memory_mb: float, column_types: Dict[str, int]) -> Dict[str, Any]:
return {
"valid": len(issues) == 0,
"errors": issues,
"warnings": warnings_list,
"shape": df.shape,
"memory_usage_mb": memory_mb,
"column_types": column_types,
"null_counts": SafeDataProcessor.safe_json_convert(df.isnull().sum().to_dict())
}
@staticmethod
def _handle_validation_error(e: Exception, name: str, df: pd.DataFrame,
warnings_list: List[str]) -> Dict[str, Any]:
error_msg = f"Validation failed for {name}: {str(e)}"
logger.error(error_msg)
return {
"valid": False,
"errors": [error_msg],
"warnings": warnings_list,
"shape": df.shape if hasattr(df, 'shape') else (0, 0),
"memory_usage_mb": 0,
"column_types": {}
}
class EnhancedPrivacyAuditor:
"""Enhanced privacy auditor with comprehensive error handling and logging"""
def __init__(self, config: AuditConfig = None):
self.config = config or AuditConfig()
self.audit_history = []
self.session_id = str(uuid.uuid4())[:8]
self.current_audit_id = None
logger.info(f"Initialized Privacy Auditor - Session: {self.session_id}")
logger.info(f"Configuration: {asdict(self.config)}")
def validate_inputs(self, real_data: pd.DataFrame, synthetic_data: pd.DataFrame) -> Dict[str, Any]:
"""Comprehensive input validation with enhanced error handling"""
logger.info("Starting comprehensive input validation")
try:
validator = DataValidator()
# Validate individual datasets
real_validation = validator.validate_dataframe(real_data, "Real Dataset")
synth_validation = validator.validate_dataframe(synthetic_data, "Synthetic Dataset")
all_errors = real_validation["errors"] + synth_validation["errors"]
all_warnings = real_validation["warnings"] + synth_validation["warnings"]
# Cross-dataset validation
if real_validation["valid"] and synth_validation["valid"]:
try:
real_cols = set(real_data.columns)
synth_cols = set(synthetic_data.columns)
missing_in_synth = real_cols - synth_cols
missing_in_real = synth_cols - real_cols
if missing_in_synth:
all_warnings.append(f"Columns missing in synthetic data: {list(missing_in_synth)}")
if missing_in_real:
all_warnings.append(f"Extra columns in synthetic data: {list(missing_in_real)}")
# Check data type compatibility
common_cols = real_cols & synth_cols
for col in common_cols:
try:
real_type = real_data[col].dtype
synth_type = synthetic_data[col].dtype
if real_type != synth_type:
all_warnings.append(f"Type mismatch in column '{col}': {real_type} vs {synth_type}")
except Exception as e:
all_warnings.append(f"Error checking column '{col}': {str(e)}")
except Exception as e:
all_warnings.append(f"Cross-validation error: {str(e)}")
result = {
"valid": len(all_errors) == 0,
"errors": all_errors,
"warnings": all_warnings,
"real_dataset": real_validation,
"synthetic_dataset": synth_validation
}
logger.info(f"Validation completed - Valid: {result['valid']}, Errors: {len(all_errors)}, Warnings: {len(all_warnings)}")
return result
except Exception as e:
error_msg = f"Input validation failed: {str(e)}"
logger.error(error_msg)
logger.error(traceback.format_exc())
return {
"valid": False,
"errors": [error_msg],
"warnings": [],
"real_dataset": {"valid": False, "errors": [error_msg]},
"synthetic_dataset": {"valid": False, "errors": [error_msg]}
}
def safe_preprocess_data(self, df: pd.DataFrame, is_real: bool = True) -> Tuple[pd.DataFrame, Dict[str, Any]]:
"""Enhanced data preprocessing with comprehensive error handling"""
dataset_type = "real" if is_real else "synthetic"
logger.info(f"Starting preprocessing for {dataset_type} dataset")
report = {
"dataset_type": dataset_type,
"original_shape": df.shape,
"start_time": datetime.now().isoformat(),
"success": False,
"steps_completed": []
}
try:
# Create working copy
df_processed = df.copy()
report["steps_completed"].append("data_copy")
# Handle missing values
try:
missing_counts = df_processed.isnull().sum()
if missing_counts.any():
logger.info(f"Handling missing values in {len(missing_counts[missing_counts > 0])} columns")
for col in df_processed.columns:
if missing_counts[col] > 0:
try:
if pd.api.types.is_numeric_dtype(df_processed[col]):
fill_value = df_processed[col].median()
if pd.isna(fill_value):
fill_value = 0
else:
mode_values = df_processed[col].mode()
fill_value = mode_values[0] if len(mode_values) > 0 else 'unknown'
df_processed[col].fillna(fill_value, inplace=True)
except Exception as e:
logger.warning(f"Failed to fill missing values in column '{col}': {e}")
df_processed[col].fillna('unknown', inplace=True)
report["missing_values_handled"] = SafeDataProcessor.safe_json_convert(missing_counts[missing_counts > 0].to_dict())
report["steps_completed"].append("missing_values")
except Exception as e:
logger.error(f"Missing value handling failed: {e}")
report["errors"] = report.get("errors", []) + [f"Missing value handling: {str(e)}"]
# Identify column types
try:
numerical_cols = df_processed.select_dtypes(include=[np.number]).columns.tolist()
categorical_cols = df_processed.select_dtypes(exclude=[np.number]).columns.tolist()
logger.info(f"Identified {len(numerical_cols)} numerical and {len(categorical_cols)} categorical columns")
report["numerical_columns"] = numerical_cols
report["categorical_columns"] = categorical_cols
report["steps_completed"].append("column_identification")
except Exception as e:
logger.error(f"Column type identification failed: {e}")
numerical_cols = []
categorical_cols = list(df_processed.columns)
report["errors"] = report.get("errors", []) + [f"Column identification: {str(e)}"]
# Handle categorical encoding
if categorical_cols and self.config.categorical_encoding != "none":
try:
logger.info(f"Applying {self.config.categorical_encoding} encoding to categorical columns")
if self.config.categorical_encoding == "onehot":
# Limit categories to prevent explosion
for col in categorical_cols[:]: # Copy list to modify during iteration
try:
unique_count = df_processed[col].nunique()
if unique_count > self.config.max_categories_onehot:
logger.warning(f"Column '{col}' has {unique_count} categories, limiting to top {self.config.max_categories_onehot - 1}")
top_categories = df_processed[col].value_counts().head(self.config.max_categories_onehot - 1).index
df_processed[col] = df_processed[col].apply(
lambda x: x if x in top_categories else 'other'
)
except Exception as e:
logger.warning(f"Error processing column '{col}': {e}")
categorical_cols.remove(col)
if categorical_cols: # Only if we have categorical columns left
df_processed = pd.get_dummies(
df_processed,
columns=categorical_cols,
prefix=categorical_cols,
drop_first=True,
dummy_na=True
)
elif self.config.categorical_encoding == "label":
for col in categorical_cols:
try:
# Simple label encoding
unique_vals = df_processed[col].unique()
label_map = {val: idx for idx, val in enumerate(unique_vals)}
df_processed[col] = df_processed[col].map(label_map)
except Exception as e:
logger.warning(f"Label encoding failed for column '{col}': {e}")
# Fallback to categorical codes
df_processed[col] = pd.Categorical(df_processed[col]).codes
report["categorical_encoding_applied"] = self.config.categorical_encoding
report["steps_completed"].append("categorical_encoding")
except Exception as e:
logger.error(f"Categorical encoding failed: {e}")
# Fallback to simple codes
for col in categorical_cols:
try:
df_processed[col] = pd.Categorical(df_processed[col]).codes
except Exception:
df_processed[col] = 0
report["categorical_encoding_fallback"] = "categorical_codes"
report["errors"] = report.get("errors", []) + [f"Categorical encoding: {str(e)}"]
# Update numerical columns after encoding
try:
numerical_cols = df_processed.select_dtypes(include=[np.number]).columns.tolist()
logger.info(f"After encoding: {len(numerical_cols)} numerical columns")
except Exception:
numerical_cols = []
# Handle numerical scaling
if numerical_cols and self.config.numerical_scaling != "none":
try:
logger.info(f"Applying {self.config.numerical_scaling} scaling to numerical columns")
# Simple scaling implementations to avoid sklearn dependency
if self.config.numerical_scaling == "standard":
for col in numerical_cols:
try:
mean_val = df_processed[col].mean()
std_val = df_processed[col].std()
if std_val > 0:
df_processed[col] = (df_processed[col] - mean_val) / std_val
except Exception as e:
logger.warning(f"Standard scaling failed for column '{col}': {e}")
elif self.config.numerical_scaling == "minmax":
for col in numerical_cols:
try:
min_val = df_processed[col].min()
max_val = df_processed[col].max()
if max_val > min_val:
df_processed[col] = (df_processed[col] - min_val) / (max_val - min_val)
except Exception as e:
logger.warning(f"MinMax scaling failed for column '{col}': {e}")
elif self.config.numerical_scaling == "robust":
for col in numerical_cols:
try:
median_val = df_processed[col].median()
q75 = df_processed[col].quantile(0.75)
q25 = df_processed[col].quantile(0.25)
iqr = q75 - q25
if iqr > 0:
df_processed[col] = (df_processed[col] - median_val) / iqr
except Exception as e:
logger.warning(f"Robust scaling failed for column '{col}': {e}")
report["numerical_scaling_applied"] = self.config.numerical_scaling
report["steps_completed"].append("numerical_scaling")
except Exception as e:
logger.error(f"Numerical scaling failed: {e}")
report["errors"] = report.get("errors", []) + [f"Numerical scaling: {str(e)}"]
# Final cleanup
try:
# Replace infinite values
df_processed = df_processed.replace([np.inf, -np.inf], np.nan)
# Fill remaining NaN values
df_processed = df_processed.fillna(0)
# Ensure all data is numeric
for col in df_processed.columns:
if not pd.api.types.is_numeric_dtype(df_processed[col]):
try:
df_processed[col] = pd.to_numeric(df_processed[col], errors='coerce')
df_processed[col] = df_processed[col].fillna(0)
except Exception:
df_processed[col] = 0
report["steps_completed"].append("final_cleanup")
except Exception as e:
logger.error(f"Final cleanup failed: {e}")
report["errors"] = report.get("errors", []) + [f"Final cleanup: {str(e)}"]
report.update({
"final_shape": df_processed.shape,
"processing_completed": datetime.now().isoformat(),
"success": True
})
logger.info(f"Preprocessing completed successfully for {dataset_type} dataset: {df_processed.shape}")
return df_processed, report
except Exception as e:
error_msg = f"Preprocessing failed for {dataset_type} dataset: {str(e)}"
logger.error(error_msg)
logger.error(traceback.format_exc())
report.update({
"error": error_msg,
"processing_completed": datetime.now().isoformat(),
"success": False
})
return df, report
def safe_compute_distances(self, X: np.ndarray, S: np.ndarray) -> Optional[np.ndarray]:
"""Safe distance computation with memory management"""
logger.info(f"Computing {self.config.distance_metric} distances for {len(X)}x{len(S)} matrix")
try:
# Memory check
estimated_memory = (len(X) * len(S) * 8) / (1024 ** 3) # GB
logger.info(f"Estimated memory requirement: {estimated_memory:.2f} GB")
if estimated_memory > 4: # > 4GB
logger.info("Using chunked computation for large distance matrix")
return self._chunked_distance_computation(X, S)
else:
return self._direct_distance_computation(X, S)
except Exception as e:
logger.error(f"Distance computation failed: {e}")
logger.error(traceback.format_exc())
return None
def _direct_distance_computation(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
"""Direct distance computation for smaller datasets"""
try:
if self.config.distance_metric == "euclidean":
return distance_matrix(X, S)
elif self.config.distance_metric == "manhattan":
return distance_matrix(X, S, p=1)
elif self.config.distance_metric == "cosine":
# Manual cosine distance to avoid sklearn dependency
X_norm = X / (np.linalg.norm(X, axis=1, keepdims=True) + 1e-10)
S_norm = S / (np.linalg.norm(S, axis=1, keepdims=True) + 1e-10)
cosine_sim = np.dot(X_norm, S_norm.T)
return 1 - cosine_sim
else:
return distance_matrix(X, S)
except Exception as e:
logger.error(f"Direct distance computation failed: {e}")
raise
def _chunked_distance_computation(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
"""Chunked distance computation for large datasets"""
try:
chunk_size = min(self.config.chunk_size, len(X))
distances = []
for i in range(0, len(X), chunk_size):
end_idx = min(i + chunk_size, len(X))
chunk_X = X[i:end_idx]
logger.debug(f"Processing chunk {i//chunk_size + 1}/{(len(X)-1)//chunk_size + 1}")
chunk_dist = self._direct_distance_computation(chunk_X, S)
distances.append(chunk_dist)
return np.vstack(distances)
except Exception as e:
logger.error(f"Chunked distance computation failed: {e}")
raise
def safe_compute_epsilon(self, m: int, n: int, d: int, v: float, p: float = 0.05) -> float:
"""Safe epsilon computation with enhanced error handling"""
try:
# Input validation
if any(x <= 0 for x in [m, n, d]) or v < 0 or not 0 < p < 1:
logger.warning(f"Invalid epsilon parameters: m={m}, n={n}, d={d}, v={v}, p={p}")
return 0.0
# Handle edge cases
if v == 0:
logger.warning("Distance sum is zero, returning zero epsilon")
return 0.0
# Compute with numerical stability
try:
log_gamma_term = gammaln(d/2) - gammaln(d)
log_md_factorial = gammaln(m * d + 1)
log_top_terms = (np.log(p) + log_md_factorial) / m
log_bottom_terms = (
np.log(2) + (d / 2) * np.log(np.pi) +
np.log(n) + d * np.log(v)
)
eps_lower = log_gamma_term + log_top_terms - log_bottom_terms
except Exception as e:
logger.warning(f"Epsilon computation numerical error: {e}")
return 0.0
# Ensure result is valid
if not np.isfinite(eps_lower):
logger.warning("Non-finite epsilon computed")
return 0.0
result = float(max(0, eps_lower))
logger.debug(f"Computed epsilon: {result} for confidence {1-p}")
return result
except Exception as e:
logger.error(f"Epsilon computation failed: {e}")
return 0.0
def run_comprehensive_audit(self, real_data: pd.DataFrame, synthetic_data: pd.DataFrame) -> Dict[str, Any]:
"""Main audit function with comprehensive error handling and timeout"""
self.current_audit_id = str(uuid.uuid4())[:12]
start_time = datetime.now()
logger.info(f"Starting comprehensive audit - ID: {self.current_audit_id}")
def audit_worker():
try:
# Input validation
logger.info("Step 1/7: Input validation")
validation_result = self.validate_inputs(real_data, synthetic_data)
if not validation_result["valid"]:
return {
"audit_id": self.current_audit_id,
"error": "Input validation failed",
"validation_errors": validation_result["errors"],
"validation_warnings": validation_result["warnings"],
"timestamp": start_time.isoformat(),
"step_failed": "input_validation"
}
# Preprocessing
logger.info("Step 2/7: Data preprocessing")
X_processed, real_report = self.safe_preprocess_data(real_data, is_real=True)
S_processed, synth_report = self.safe_preprocess_data(synthetic_data, is_real=False)
if not real_report["success"] or not synth_report["success"]:
return {
"audit_id": self.current_audit_id,
"error": "Data preprocessing failed",
"preprocessing_reports": {"real": real_report, "synthetic": synth_report},
"timestamp": start_time.isoformat(),
"step_failed": "preprocessing"
}
# Align columns
logger.info("Step 3/7: Column alignment")
try:
common_cols = list(set(X_processed.columns) & set(S_processed.columns))
if len(common_cols) == 0:
return {
"audit_id": self.current_audit_id,
"error": "No common columns between datasets after preprocessing",
"timestamp": start_time.isoformat(),
"step_failed": "column_alignment"
}
X_processed = X_processed[common_cols].sort_index(axis=1)
S_processed = S_processed[common_cols].sort_index(axis=1)
logger.info(f"Using {len(common_cols)} common columns")
except Exception as e:
return {
"audit_id": self.current_audit_id,
"error": f"Column alignment failed: {str(e)}",
"timestamp": start_time.isoformat(),
"step_failed": "column_alignment"
}
# Convert to numpy arrays
logger.info("Step 4/7: Array conversion")
try:
X = X_processed.astype(np.float64).values
S = S_processed.astype(np.float64).values
# Validate arrays
if not np.isfinite(X).all():
logger.warning("Non-finite values in real data, cleaning...")
X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
if not np.isfinite(S).all():
logger.warning("Non-finite values in synthetic data, cleaning...")
S = np.nan_to_num(S, nan=0.0, posinf=0.0, neginf=0.0)
except Exception as e:
return {
"audit_id": self.current_audit_id,
"error": f"Array conversion failed: {str(e)}",
"timestamp": start_time.isoformat(),
"step_failed": "array_conversion"
}
# Subsampling
logger.info("Step 5/7: Subsampling (if needed)")
original_m = len(X)
if self.config.subsample_size and len(X) > self.config.subsample_size:
try:
np.random.seed(42)
indices = np.random.choice(len(X), self.config.subsample_size, replace=False)
X = X[indices]
logger.info(f"Subsampled real data from {original_m} to {len(X)} samples")
except Exception as e:
logger.warning(f"Subsampling failed: {e}, using full dataset")
m, d = X.shape
n = len(S)
logger.info(f"Final dataset sizes - Real: {m}x{d}, Synthetic: {n}x{d}")
# Distance computation
logger.info("Step 6/7: Distance computation")
dist_matrix = self.safe_compute_distances(X, S)
if dist_matrix is None:
return {
"audit_id": self.current_audit_id,
"error": "Distance computation failed",
"timestamp": start_time.isoformat(),
"step_failed": "distance_computation"
}
# Compute statistics
logger.info("Step 7/7: Statistical analysis")
try:
nearest_distances = np.min(dist_matrix, axis=1)
v = np.sum(nearest_distances)
# Epsilon bounds for multiple confidence levels
confidence_levels = [0.90, 0.95, 0.99]
epsilon_bounds = {}
for conf in confidence_levels:
p = 1 - conf
eps_lb = self.safe_compute_epsilon(m, n, d, v, p)
epsilon_bounds[f"eps_lb_{int(conf*100)}"] = eps_lb
# Distance statistics
distance_stats = {
"mean_nearest_distance": float(np.mean(nearest_distances)),
"median_nearest_distance": float(np.median(nearest_distances)),
"std_nearest_distance": float(np.std(nearest_distances)),
"min_nearest_distance": float(np.min(nearest_distances)),
"max_nearest_distance": float(np.max(nearest_distances)),
"q25_nearest_distance": float(np.percentile(nearest_distances, 25)),
"q75_nearest_distance": float(np.percentile(nearest_distances, 75)),
"distance_sum": float(v),
"zero_distance_count": int(np.sum(nearest_distances == 0)),
"small_distance_count": int(np.sum(nearest_distances < 1e-6))
}
# Risk assessment
primary_epsilon = epsilon_bounds["eps_lb_95"]
risk_level = self.assess_privacy_risk(primary_epsilon)
except Exception as e:
return {
"audit_id": self.current_audit_id,
"error": f"Statistical analysis failed: {str(e)}",
"timestamp": start_time.isoformat(),
"step_failed": "statistical_analysis"
}
# Compile results
duration = (datetime.now() - start_time).total_seconds()
result = {
"audit_id": self.current_audit_id,
"session_id": self.session_id,
"success": True,
"audit_metadata": {
"timestamp": start_time.isoformat(),
"duration_seconds": round(duration, 2),
"distance_metric": self.config.distance_metric,
"configuration": asdict(self.config)
},
"dataset_info": {
"real_samples_original": original_m,
"real_samples_used": m,
"synthetic_samples": n,
"dimensions": d,
"common_features": len(common_cols),
"subsampling_applied": self.config.subsample_size is not None and original_m > m
},
"preprocessing_reports": {
"real_dataset": real_report,
"synthetic_dataset": synth_report
},
"validation_result": validation_result,
"epsilon_bounds": epsilon_bounds,
"distance_statistics": distance_stats,
"privacy_assessment": {
"risk_level": risk_level,
"primary_epsilon": primary_epsilon,
"interpretation": self.get_risk_interpretation(risk_level, primary_epsilon),
"recommendations": self.get_recommendations(risk_level, distance_stats)
},
"data_quality": {
"potential_memorization": distance_stats["zero_distance_count"] > 0,
"very_close_matches": distance_stats["small_distance_count"],
"distance_distribution_skew": self._safe_compute_skewness(nearest_distances)
}
}
self.audit_history.append(result)
logger.info(f"Audit completed successfully - ID: {self.current_audit_id}, Risk: {risk_level}, Duration: {duration:.2f}s")
return result
except Exception as e:
error_msg = f"Unexpected audit error: {str(e)}"
logger.error(f"Audit failed - ID: {self.current_audit_id}: {error_msg}")
logger.error(traceback.format_exc())
return {
"audit_id": self.current_audit_id,
"error": error_msg,
"timestamp": start_time.isoformat(),
"traceback": traceback.format_exc(),
"step_failed": "unexpected_error"
}
# Run with timeout
try:
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(audit_worker)
result = future.result(timeout=self.config.timeout_seconds)
return result
except FutureTimeoutError:
error_msg = f"Audit timed out after {self.config.timeout_seconds} seconds"
logger.error(error_msg)
return {
"audit_id": self.current_audit_id,
"error": error_msg,
"timestamp": start_time.isoformat(),
"step_failed": "timeout"
}
except Exception as e:
error_msg = f"Audit execution failed: {str(e)}"
logger.error(error_msg)
return {
"audit_id": self.current_audit_id,
"error": error_msg,
"timestamp": start_time.isoformat(),
"step_failed": "execution_error"
}
def _safe_compute_skewness(self, data: np.ndarray) -> float:
"""Safely compute skewness"""
try:
if len(data) < 3:
return 0.0
mean = np.mean(data)
std = np.std(data)
if std == 0:
return 0.0
skewness = np.mean(((data - mean) / std) ** 3)
return float(skewness) if np.isfinite(skewness) else 0.0
except Exception as e:
logger.warning(f"Skewness computation failed: {e}")
return 0.0
def assess_privacy_risk(self, epsilon: float) -> str:
"""Enhanced privacy risk assessment"""
risk_thresholds = [
(0.01, "EXCEPTIONAL"),
(0.1, "VERY LOW"),
(0.5, "LOW"),
(1.0, "MEDIUM"),
(2.0, "HIGH"),
(5.0, "VERY HIGH")
]
try:
for threshold, level in risk_thresholds:
if epsilon <= threshold:
return level
return "CRITICAL"
except Exception:
return "UNKNOWN"
def get_risk_interpretation(self, risk_level: str, epsilon: float) -> str:
"""Detailed risk interpretation"""
interpretations = {
"EXCEPTIONAL": "Outstanding privacy preservation. Suitable for highly sensitive applications.",
"VERY LOW": "Excellent privacy preservation. Strong guarantees for most sensitive data.",
"LOW": "Good privacy preservation. Acceptable for most commercial applications.",
"MEDIUM": "Moderate privacy risk. Consider additional privacy-enhancing techniques.",
"HIGH": "High privacy risk. Significant leakage detected. Review methodology.",
"VERY HIGH": "Very high privacy risk. Additional privacy measures strongly recommended.",
"CRITICAL": "Critical privacy risk. Synthetic data not suitable for production use."
}
try:
base_msg = interpretations.get(risk_level, "Unknown risk level")
return f"{base_msg} (ε = {epsilon:.6f})"
except Exception as e:
logger.warning(f"Risk interpretation failed: {e}")
return f"Risk interpretation unavailable (ε = {epsilon:.6f})"
def get_recommendations(self, risk_level: str, distance_stats: Dict[str, Any]) -> List[str]:
"""Generate actionable recommendations"""
try:
recommendations = []
risk_actions = {
"HIGH": "IMMEDIATE ACTION REQUIRED: Privacy risk unacceptable for production",
"VERY HIGH": "IMMEDIATE ACTION REQUIRED: Privacy risk unacceptable for production",
"CRITICAL": "IMMEDIATE ACTION REQUIRED: Privacy risk unacceptable for production"
}
if risk_level in risk_actions:
recommendations.extend([
risk_actions[risk_level],
"Consider stronger privacy-preserving methods (DP-SGD, PATE)",
"Reduce model capacity or increase privacy budget",
"Review data preprocessing and feature selection"
])
self._add_distance_recommendations(recommendations, distance_stats)
if risk_level in ["EXCEPTIONAL", "VERY LOW", "LOW"]:
recommendations.append("Privacy level acceptable for most production applications")
return recommendations or ["Review detailed analysis for specific insights"]
except Exception as e:
logger.warning(f"Recommendations generation failed: {e}")
return ["Could not generate recommendations due to analysis error"]
def _add_distance_recommendations(self, recommendations: List[str], distance_stats: Dict[str, Any]):
"""Add distance-based recommendations"""
zero_distances = distance_stats.get("zero_distance_count", 0)
small_distances = distance_stats.get("small_distance_count", 0)
if zero_distances > 0:
recommendations.append(f"WARNING: {zero_distances} exact matches - potential memorization")
if small_distances > zero_distances:
close_matches = small_distances - zero_distances
recommendations.append(f"REVIEW: {close_matches} close matches - check near-memorization")
# Global auditor instance
try:
auditor = EnhancedPrivacyAuditor()
logger.info("Privacy auditor initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize privacy auditor: {e}")
auditor = None
def create_safe_distance_plot(result: Dict[str, Any]) -> go.Figure:
"""Create enhanced privacy audit dashboard with improved data visualization"""
try:
if "error" in result:
return _create_error_figure(result)
return _create_comprehensive_dashboard(result)
except Exception as e:
logger.error(f"Distance plot creation failed: {e}")
return _create_error_figure({"error": str(e)})
def _create_error_figure(result: Dict[str, Any]) -> go.Figure:
"""Create error visualization with clear messaging"""
fig = go.Figure()
fig.add_annotation(
text=f"Audit Error
{result.get('error', 'Unknown error')}
Step: {result.get('step_failed', 'Unknown')}",
x=0.5, y=0.5, showarrow=False,
font=dict(size=16, color="#dc3545"),
align="center",
bgcolor="rgba(220, 53, 69, 0.1)",
bordercolor="#dc3545",
borderwidth=2
)
fig.update_layout(
title="Privacy Audit Failed",
xaxis=dict(visible=False),
yaxis=dict(visible=False),
plot_bgcolor="white",
paper_bgcolor="white"
)
return fig
def _create_comprehensive_dashboard(result: Dict[str, Any]) -> go.Figure:
"""Create simplified privacy dashboard focused on key metrics"""
from plotly.subplots import make_subplots
# Simplified 2x2 layout focusing on essential information
fig = make_subplots(
rows=2, cols=2,
subplot_titles=(
"Distance Statistics",
"Privacy Risk Level",
"Data Quality Assessment",
"Key Metrics Summary"
),
specs=[
[{"type": "bar"}, {"type": "indicator"}],
[{"type": "bar"}, {"type": "table"}]
],
vertical_spacing=0.2,
horizontal_spacing=0.15
)
_add_simplified_distance_analysis(fig, result)
_add_simplified_risk_assessment(fig, result)
_add_simplified_quality_assessment(fig, result)
_add_key_metrics_table(fig, result)
# Clean, minimal layout
fig.update_layout(
title={
"text": "Privacy Audit Results",
"x": 0.5,
"xanchor": "center",
"font": {"size": 18, "color": "#000000"}
},
height=600,
showlegend=False,
plot_bgcolor="white",
paper_bgcolor="white",
font=dict(family="Arial, sans-serif", size=12, color="#000000"),
margin=dict(t=80, b=50, l=60, r=60)
)
return fig
def _add_simplified_distance_analysis(fig, result: Dict[str, Any]):
"""Add simplified distance analysis focusing on key metrics"""
stats = result.get("distance_statistics", {})
# Focus on most important metrics
metrics = ["Mean", "Median", "Max"]
values = [
stats.get("mean_nearest_distance", 0),
stats.get("median_nearest_distance", 0),
stats.get("max_nearest_distance", 0)
]
# Use simple, accessible colors
fig.add_trace(
go.Bar(
x=metrics,
y=values,
marker_color='#2563eb',
marker_line=dict(color='#1e40af', width=1),
text=[f"{v:.4f}" if v > 0 else "0.0000" for v in values],
textposition='outside',
textfont=dict(size=11, color="#000000"),
hovertemplate="%{x}
%{y:.6f}
Epsilon Value",
"font": {"size": 14, "color": "#000000"}
},
number={"font": {"size": 20, "color": risk_colors.get(risk_level, "#6b7280")}},
gauge={
"axis": {"range": [0, 5], "tickcolor": "#000000"},
"bar": {"color": risk_colors.get(risk_level, "#6b7280")},
"bgcolor": "white",
"bordercolor": "#d1d5db",
"borderwidth": 2
}
),
row=1, col=2
)
def _add_simplified_quality_assessment(fig, result: Dict[str, Any]):
"""Add simplified quality assessment"""
stats = result.get("distance_statistics", {})
total_samples = result.get("dataset_info", {}).get("real_samples_used", 1)
zero_distances = stats.get("zero_distance_count", 0)
small_distances = stats.get("small_distance_count", 0)
categories = ["Safe", "Near Match", "Exact Match"]
counts = [total_samples - small_distances, small_distances - zero_distances, zero_distances]
fig.add_trace(
go.Bar(
x=categories,
y=counts,
marker_color=['#059669', '#ea580c', '#dc2626'],
marker_line=dict(color='#000000', width=1),
text=[f"{c:,}" for c in counts],
textposition='outside',
textfont=dict(size=11, color="#000000"),
hovertemplate="%{x}
Count: %{y:,}
ε Lower Bound: %{y:.6f}
({int(c/100*total_steps)}/{total_steps})" for c in completion],
textposition='auto',
hovertemplate="%{x}
Processing: %{y:.0f}% Complete
Epsilon: %{y:.6f}
# Implementing state-of-the-art one-run nearest-neighbor ε-DP auditing with enterprise features
# Secure • Comprehensive • Fast • Configurable