import gradio as gr
import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix
from scipy.special import gammaln
import plotly.graph_objects as go
from datetime import datetime
import json
import io
import zipfile
from typing import Dict, List, Tuple, Optional, Any
import logging
import traceback
from dataclasses import dataclass, asdict
from pathlib import Path
import warnings
import uuid
import os
import sys
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError

warnings.filterwarnings('ignore')


def setup_logging():
    """Configure comprehensive logging system.

    Creates a ``logs/`` directory containing a full DEBUG trace log and a
    separate ERROR-only log, plus a simple INFO stream to stdout, then
    returns this module's logger.
    """
    log_dir = Path("logs")
    log_dir.mkdir(exist_ok=True)

    formatters = {
        'detailed': logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
        ),
        'simple': logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'),
    }

    root_logger = logging.getLogger()
    # Root must be DEBUG: handler levels only filter further, so with the
    # original INFO root level the DEBUG file handler below never received
    # the logger.debug(...) records emitted elsewhere in this file.
    root_logger.setLevel(logging.DEBUG)

    handlers = [
        (logging.StreamHandler(sys.stdout), logging.INFO, formatters['simple']),
        (logging.FileHandler(log_dir / 'privacy_audit_detailed.log'), logging.DEBUG, formatters['detailed']),
        (logging.FileHandler(log_dir / 'privacy_audit_errors.log'), logging.ERROR, formatters['detailed']),
    ]
    for handler, level, formatter in handlers:
        handler.setLevel(level)
        handler.setFormatter(formatter)
        root_logger.addHandler(handler)

    return logging.getLogger(__name__)


logger = setup_logging()
logger.info(f"Privacy Auditor Starting - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")


@dataclass
class AuditConfig:
    """Enterprise configuration for privacy audit"""
    confidence_level: float = 0.95
    subsample_size: Optional[int] = None
    categorical_encoding: str = "onehot"
    numerical_scaling: str = "standard"
    distance_metric: str = "euclidean"
    enable_preprocessing_report: bool = True
    max_file_size_mb: int = 500
    timeout_seconds: int = 300
    enable_data_validation: bool = True
    chunk_size: int = 10000
    max_categories_onehot: int = 50

    def validate(self) -> List[str]:
        """Validate configuration parameters.

        Returns a list of human-readable problem messages; an empty list
        means the configuration is valid.
        """
        validations = [
            (0.5 <= self.confidence_level <= 0.999,
             "Confidence level must be between 0.5 and 0.999"),
            (self.subsample_size is None or self.subsample_size >= 100,
             "Subsample size must be at least 100 if specified"),
            (self.max_file_size_mb >= 1, "Max file size must be at least 1 MB"),
            (self.timeout_seconds >= 10, "Timeout must be at least 10 seconds"),
        ]
        try:
            return [msg for valid, msg in validations if not valid]
        except Exception as e:
            logger.error(f"Configuration validation error: {e}")
            return [f"Configuration validation failed: {str(e)}"]


class SafeDataProcessor:
    """Safe data processing with comprehensive error handling"""

    SUPPORTED_ENCODINGS = ['utf-8', 'latin-1', 'iso-8859-1', 'cp1252']

    @classmethod
    def safe_read_csv(cls, file_path: str, max_rows: int = None) -> Tuple[Optional[pd.DataFrame], str]:
        """Safely read a CSV file, trying several encodings in order.

        Returns ``(dataframe, "")`` on success or ``(None, error_message)``
        on failure.
        """
        try:
            if not os.path.exists(file_path):
                return None, f"File not found: {file_path}"

            file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
            logger.info(f"Reading CSV: {file_path} ({file_size_mb:.2f} MB)")

            for encoding in cls.SUPPORTED_ENCODINGS:
                try:
                    df = pd.read_csv(file_path, encoding=encoding, nrows=max_rows, low_memory=False)
                    logger.info(f"Loaded dataset: {df.shape[0]} rows, {df.shape[1]} columns ({encoding})")
                    return df, ""
                except UnicodeDecodeError as e:
                    # Only decode failures justify retrying with another codec.
                    # Other errors (e.g. parse errors) propagate to the outer
                    # handler instead of being retried pointlessly per encoding.
                    if encoding == cls.SUPPORTED_ENCODINGS[-1]:
                        logger.warning(f"All encodings failed, last error: {e}")
                    continue
            return None, "Failed to read file with any supported encoding"
        except Exception as e:
            error_msg = f"Error reading CSV file: {str(e)}"
            logger.error(error_msg)
            return None, error_msg

    @staticmethod
    def safe_json_convert(obj: Any) -> Any:
        """Convert *obj* (possibly numpy/pandas) to a JSON-serializable value.

        JSON-native scalars pass through unchanged; dicts/lists/tuples are
        converted element-wise (the original returned ``str(dict)`` for plain
        dicts, corrupting e.g. ``null_counts``); numpy/pandas objects are
        converted to builtin equivalents; anything unknown falls back to
        ``str(obj)``.
        """
        try:
            # Already JSON-serializable: leave untouched.
            if obj is None or isinstance(obj, (bool, int, float, str)):
                return obj
            # Containers: recurse so nested numpy scalars are converted too.
            if isinstance(obj, dict):
                return {str(k): SafeDataProcessor.safe_json_convert(v) for k, v in obj.items()}
            if isinstance(obj, (list, tuple)):
                return [SafeDataProcessor.safe_json_convert(v) for v in obj]
            if isinstance(obj, np.integer):
                return int(obj)
            if isinstance(obj, np.floating):
                return float(obj)
            if isinstance(obj, np.ndarray):
                return obj.tolist()
            if isinstance(obj, (pd.Series, pd.DataFrame)):
                return SafeDataProcessor.safe_json_convert(obj.to_dict())
            # Last resort for other numpy-like scalars exposing a dtype.
            if hasattr(obj, 'dtype'):
                dtype_str = str(obj.dtype)
                if 'int' in dtype_str:
                    return int(obj)
                elif 'float' in dtype_str:
                    return float(obj)
            return str(obj)
        except Exception as e:
            logger.warning(f"JSON conversion failed for {type(obj)}: {e}")
            return str(obj)


class DataValidator:
    """Enhanced data validation with detailed reporting"""

    @classmethod
    def validate_dataframe(cls, df: pd.DataFrame, name: str) -> Dict[str, Any]:
        """Comprehensive dataframe validation.

        Runs memory, column and quality checks on *df* and returns a result
        dict with ``valid``, ``errors``, ``warnings`` and metadata keys.
        """
        logger.info(f"Validating dataframe: {name}")
        if df.empty:
            return cls._empty_dataframe_result(name)

        issues, warnings_list = [], []
        if len(df) < 10:
            warnings_list.append(f"{name}: Very small dataset ({len(df)} rows)")

        try:
            memory_mb = cls._calculate_memory_usage(df)
            cls._validate_columns(df, name, warnings_list)
            cls._check_data_quality(df, name, warnings_list)
            column_types = cls._analyze_column_types(df)
            return cls._build_validation_result(df, issues, warnings_list, memory_mb, column_types)
        except Exception as e:
            return cls._handle_validation_error(e, name, df, warnings_list)

    @staticmethod
    def _empty_dataframe_result(name: str) -> Dict[str, Any]:
        # Canonical failure result for an empty dataframe.
        return {
            "valid": False,
            "errors": [f"{name}: Dataset is empty"],
            "warnings": [],
            "shape": (0, 0),
            "memory_usage_mb": 0,
        }

    @staticmethod
    def _calculate_memory_usage(df: pd.DataFrame) -> float:
        # Deep memory usage in MB; 0.0 if pandas cannot compute it.
        try:
            return df.memory_usage(deep=True).sum() / (1024 * 1024)
        except Exception:
            return 0.0

    @staticmethod
    def _validate_columns(df: pd.DataFrame, name: str, warnings_list: List[str]):
        # Warn about object columns with mixed Python types (sampled from the
        # first 100 non-null values) and about constant columns.
        for col in df.columns:
            try:
                if df[col].dtype == 'object':
                    sample_data = df[col].dropna().head(100)
                    if len(sample_data) > 0:
                        sample_types = set(type(x).__name__ for x in sample_data)
                        if len(sample_types) > 2:
                            warnings_list.append(f"{name}: Column '{col}' has mixed data types")
                if df[col].nunique() <= 1:
                    warnings_list.append(f"{name}: Column '{col}' is constant")
            except Exception as e:
                warnings_list.append(f"{name}: Error analyzing column '{col}': {str(e)}")
@staticmethod def _check_data_quality(df: pd.DataFrame, name: str, warnings_list: List[str]): try: missing_pct = (df.isnull().sum() / len(df)) * 100 high_missing = missing_pct[missing_pct > 50] if not high_missing.empty: warnings_list.append(f"{name}: High missing values: {high_missing.to_dict()}") except Exception as e: warnings_list.append(f"{name}: Error checking missing values: {str(e)}") try: duplicates = df.duplicated().sum() if duplicates > len(df) * 0.1: warnings_list.append(f"{name}: High duplicate rows ({duplicates})") except Exception as e: warnings_list.append(f"{name}: Error checking duplicates: {str(e)}") @staticmethod def _analyze_column_types(df: pd.DataFrame) -> Dict[str, int]: try: return {str(k): int(v) for k, v in df.dtypes.value_counts().to_dict().items()} except Exception: return {} @staticmethod def _build_validation_result(df: pd.DataFrame, issues: List[str], warnings_list: List[str], memory_mb: float, column_types: Dict[str, int]) -> Dict[str, Any]: return { "valid": len(issues) == 0, "errors": issues, "warnings": warnings_list, "shape": df.shape, "memory_usage_mb": memory_mb, "column_types": column_types, "null_counts": SafeDataProcessor.safe_json_convert(df.isnull().sum().to_dict()) } @staticmethod def _handle_validation_error(e: Exception, name: str, df: pd.DataFrame, warnings_list: List[str]) -> Dict[str, Any]: error_msg = f"Validation failed for {name}: {str(e)}" logger.error(error_msg) return { "valid": False, "errors": [error_msg], "warnings": warnings_list, "shape": df.shape if hasattr(df, 'shape') else (0, 0), "memory_usage_mb": 0, "column_types": {} } class EnhancedPrivacyAuditor: """Enhanced privacy auditor with comprehensive error handling and logging""" def __init__(self, config: AuditConfig = None): self.config = config or AuditConfig() self.audit_history = [] self.session_id = str(uuid.uuid4())[:8] self.current_audit_id = None logger.info(f"Initialized Privacy Auditor - Session: {self.session_id}") 
logger.info(f"Configuration: {asdict(self.config)}") def validate_inputs(self, real_data: pd.DataFrame, synthetic_data: pd.DataFrame) -> Dict[str, Any]: """Comprehensive input validation with enhanced error handling""" logger.info("Starting comprehensive input validation") try: validator = DataValidator() # Validate individual datasets real_validation = validator.validate_dataframe(real_data, "Real Dataset") synth_validation = validator.validate_dataframe(synthetic_data, "Synthetic Dataset") all_errors = real_validation["errors"] + synth_validation["errors"] all_warnings = real_validation["warnings"] + synth_validation["warnings"] # Cross-dataset validation if real_validation["valid"] and synth_validation["valid"]: try: real_cols = set(real_data.columns) synth_cols = set(synthetic_data.columns) missing_in_synth = real_cols - synth_cols missing_in_real = synth_cols - real_cols if missing_in_synth: all_warnings.append(f"Columns missing in synthetic data: {list(missing_in_synth)}") if missing_in_real: all_warnings.append(f"Extra columns in synthetic data: {list(missing_in_real)}") # Check data type compatibility common_cols = real_cols & synth_cols for col in common_cols: try: real_type = real_data[col].dtype synth_type = synthetic_data[col].dtype if real_type != synth_type: all_warnings.append(f"Type mismatch in column '{col}': {real_type} vs {synth_type}") except Exception as e: all_warnings.append(f"Error checking column '{col}': {str(e)}") except Exception as e: all_warnings.append(f"Cross-validation error: {str(e)}") result = { "valid": len(all_errors) == 0, "errors": all_errors, "warnings": all_warnings, "real_dataset": real_validation, "synthetic_dataset": synth_validation } logger.info(f"Validation completed - Valid: {result['valid']}, Errors: {len(all_errors)}, Warnings: {len(all_warnings)}") return result except Exception as e: error_msg = f"Input validation failed: {str(e)}" logger.error(error_msg) logger.error(traceback.format_exc()) return { "valid": 
False, "errors": [error_msg], "warnings": [], "real_dataset": {"valid": False, "errors": [error_msg]}, "synthetic_dataset": {"valid": False, "errors": [error_msg]} } def safe_preprocess_data(self, df: pd.DataFrame, is_real: bool = True) -> Tuple[pd.DataFrame, Dict[str, Any]]: """Enhanced data preprocessing with comprehensive error handling""" dataset_type = "real" if is_real else "synthetic" logger.info(f"Starting preprocessing for {dataset_type} dataset") report = { "dataset_type": dataset_type, "original_shape": df.shape, "start_time": datetime.now().isoformat(), "success": False, "steps_completed": [] } try: # Create working copy df_processed = df.copy() report["steps_completed"].append("data_copy") # Handle missing values try: missing_counts = df_processed.isnull().sum() if missing_counts.any(): logger.info(f"Handling missing values in {len(missing_counts[missing_counts > 0])} columns") for col in df_processed.columns: if missing_counts[col] > 0: try: if pd.api.types.is_numeric_dtype(df_processed[col]): fill_value = df_processed[col].median() if pd.isna(fill_value): fill_value = 0 else: mode_values = df_processed[col].mode() fill_value = mode_values[0] if len(mode_values) > 0 else 'unknown' df_processed[col].fillna(fill_value, inplace=True) except Exception as e: logger.warning(f"Failed to fill missing values in column '{col}': {e}") df_processed[col].fillna('unknown', inplace=True) report["missing_values_handled"] = SafeDataProcessor.safe_json_convert(missing_counts[missing_counts > 0].to_dict()) report["steps_completed"].append("missing_values") except Exception as e: logger.error(f"Missing value handling failed: {e}") report["errors"] = report.get("errors", []) + [f"Missing value handling: {str(e)}"] # Identify column types try: numerical_cols = df_processed.select_dtypes(include=[np.number]).columns.tolist() categorical_cols = df_processed.select_dtypes(exclude=[np.number]).columns.tolist() logger.info(f"Identified {len(numerical_cols)} numerical and 
{len(categorical_cols)} categorical columns") report["numerical_columns"] = numerical_cols report["categorical_columns"] = categorical_cols report["steps_completed"].append("column_identification") except Exception as e: logger.error(f"Column type identification failed: {e}") numerical_cols = [] categorical_cols = list(df_processed.columns) report["errors"] = report.get("errors", []) + [f"Column identification: {str(e)}"] # Handle categorical encoding if categorical_cols and self.config.categorical_encoding != "none": try: logger.info(f"Applying {self.config.categorical_encoding} encoding to categorical columns") if self.config.categorical_encoding == "onehot": # Limit categories to prevent explosion for col in categorical_cols[:]: # Copy list to modify during iteration try: unique_count = df_processed[col].nunique() if unique_count > self.config.max_categories_onehot: logger.warning(f"Column '{col}' has {unique_count} categories, limiting to top {self.config.max_categories_onehot - 1}") top_categories = df_processed[col].value_counts().head(self.config.max_categories_onehot - 1).index df_processed[col] = df_processed[col].apply( lambda x: x if x in top_categories else 'other' ) except Exception as e: logger.warning(f"Error processing column '{col}': {e}") categorical_cols.remove(col) if categorical_cols: # Only if we have categorical columns left df_processed = pd.get_dummies( df_processed, columns=categorical_cols, prefix=categorical_cols, drop_first=True, dummy_na=True ) elif self.config.categorical_encoding == "label": for col in categorical_cols: try: # Simple label encoding unique_vals = df_processed[col].unique() label_map = {val: idx for idx, val in enumerate(unique_vals)} df_processed[col] = df_processed[col].map(label_map) except Exception as e: logger.warning(f"Label encoding failed for column '{col}': {e}") # Fallback to categorical codes df_processed[col] = pd.Categorical(df_processed[col]).codes report["categorical_encoding_applied"] = 
self.config.categorical_encoding report["steps_completed"].append("categorical_encoding") except Exception as e: logger.error(f"Categorical encoding failed: {e}") # Fallback to simple codes for col in categorical_cols: try: df_processed[col] = pd.Categorical(df_processed[col]).codes except Exception: df_processed[col] = 0 report["categorical_encoding_fallback"] = "categorical_codes" report["errors"] = report.get("errors", []) + [f"Categorical encoding: {str(e)}"] # Update numerical columns after encoding try: numerical_cols = df_processed.select_dtypes(include=[np.number]).columns.tolist() logger.info(f"After encoding: {len(numerical_cols)} numerical columns") except Exception: numerical_cols = [] # Handle numerical scaling if numerical_cols and self.config.numerical_scaling != "none": try: logger.info(f"Applying {self.config.numerical_scaling} scaling to numerical columns") # Simple scaling implementations to avoid sklearn dependency if self.config.numerical_scaling == "standard": for col in numerical_cols: try: mean_val = df_processed[col].mean() std_val = df_processed[col].std() if std_val > 0: df_processed[col] = (df_processed[col] - mean_val) / std_val except Exception as e: logger.warning(f"Standard scaling failed for column '{col}': {e}") elif self.config.numerical_scaling == "minmax": for col in numerical_cols: try: min_val = df_processed[col].min() max_val = df_processed[col].max() if max_val > min_val: df_processed[col] = (df_processed[col] - min_val) / (max_val - min_val) except Exception as e: logger.warning(f"MinMax scaling failed for column '{col}': {e}") elif self.config.numerical_scaling == "robust": for col in numerical_cols: try: median_val = df_processed[col].median() q75 = df_processed[col].quantile(0.75) q25 = df_processed[col].quantile(0.25) iqr = q75 - q25 if iqr > 0: df_processed[col] = (df_processed[col] - median_val) / iqr except Exception as e: logger.warning(f"Robust scaling failed for column '{col}': {e}") 
report["numerical_scaling_applied"] = self.config.numerical_scaling report["steps_completed"].append("numerical_scaling") except Exception as e: logger.error(f"Numerical scaling failed: {e}") report["errors"] = report.get("errors", []) + [f"Numerical scaling: {str(e)}"] # Final cleanup try: # Replace infinite values df_processed = df_processed.replace([np.inf, -np.inf], np.nan) # Fill remaining NaN values df_processed = df_processed.fillna(0) # Ensure all data is numeric for col in df_processed.columns: if not pd.api.types.is_numeric_dtype(df_processed[col]): try: df_processed[col] = pd.to_numeric(df_processed[col], errors='coerce') df_processed[col] = df_processed[col].fillna(0) except Exception: df_processed[col] = 0 report["steps_completed"].append("final_cleanup") except Exception as e: logger.error(f"Final cleanup failed: {e}") report["errors"] = report.get("errors", []) + [f"Final cleanup: {str(e)}"] report.update({ "final_shape": df_processed.shape, "processing_completed": datetime.now().isoformat(), "success": True }) logger.info(f"Preprocessing completed successfully for {dataset_type} dataset: {df_processed.shape}") return df_processed, report except Exception as e: error_msg = f"Preprocessing failed for {dataset_type} dataset: {str(e)}" logger.error(error_msg) logger.error(traceback.format_exc()) report.update({ "error": error_msg, "processing_completed": datetime.now().isoformat(), "success": False }) return df, report def safe_compute_distances(self, X: np.ndarray, S: np.ndarray) -> Optional[np.ndarray]: """Safe distance computation with memory management""" logger.info(f"Computing {self.config.distance_metric} distances for {len(X)}x{len(S)} matrix") try: # Memory check estimated_memory = (len(X) * len(S) * 8) / (1024 ** 3) # GB logger.info(f"Estimated memory requirement: {estimated_memory:.2f} GB") if estimated_memory > 4: # > 4GB logger.info("Using chunked computation for large distance matrix") return self._chunked_distance_computation(X, S) else: 
return self._direct_distance_computation(X, S) except Exception as e: logger.error(f"Distance computation failed: {e}") logger.error(traceback.format_exc()) return None def _direct_distance_computation(self, X: np.ndarray, S: np.ndarray) -> np.ndarray: """Direct distance computation for smaller datasets""" try: if self.config.distance_metric == "euclidean": return distance_matrix(X, S) elif self.config.distance_metric == "manhattan": return distance_matrix(X, S, p=1) elif self.config.distance_metric == "cosine": # Manual cosine distance to avoid sklearn dependency X_norm = X / (np.linalg.norm(X, axis=1, keepdims=True) + 1e-10) S_norm = S / (np.linalg.norm(S, axis=1, keepdims=True) + 1e-10) cosine_sim = np.dot(X_norm, S_norm.T) return 1 - cosine_sim else: return distance_matrix(X, S) except Exception as e: logger.error(f"Direct distance computation failed: {e}") raise def _chunked_distance_computation(self, X: np.ndarray, S: np.ndarray) -> np.ndarray: """Chunked distance computation for large datasets""" try: chunk_size = min(self.config.chunk_size, len(X)) distances = [] for i in range(0, len(X), chunk_size): end_idx = min(i + chunk_size, len(X)) chunk_X = X[i:end_idx] logger.debug(f"Processing chunk {i//chunk_size + 1}/{(len(X)-1)//chunk_size + 1}") chunk_dist = self._direct_distance_computation(chunk_X, S) distances.append(chunk_dist) return np.vstack(distances) except Exception as e: logger.error(f"Chunked distance computation failed: {e}") raise def safe_compute_epsilon(self, m: int, n: int, d: int, v: float, p: float = 0.05) -> float: """Safe epsilon computation with enhanced error handling""" try: # Input validation if any(x <= 0 for x in [m, n, d]) or v < 0 or not 0 < p < 1: logger.warning(f"Invalid epsilon parameters: m={m}, n={n}, d={d}, v={v}, p={p}") return 0.0 # Handle edge cases if v == 0: logger.warning("Distance sum is zero, returning zero epsilon") return 0.0 # Compute with numerical stability try: log_gamma_term = gammaln(d/2) - gammaln(d) 
log_md_factorial = gammaln(m * d + 1) log_top_terms = (np.log(p) + log_md_factorial) / m log_bottom_terms = ( np.log(2) + (d / 2) * np.log(np.pi) + np.log(n) + d * np.log(v) ) eps_lower = log_gamma_term + log_top_terms - log_bottom_terms except Exception as e: logger.warning(f"Epsilon computation numerical error: {e}") return 0.0 # Ensure result is valid if not np.isfinite(eps_lower): logger.warning("Non-finite epsilon computed") return 0.0 result = float(max(0, eps_lower)) logger.debug(f"Computed epsilon: {result} for confidence {1-p}") return result except Exception as e: logger.error(f"Epsilon computation failed: {e}") return 0.0 def run_comprehensive_audit(self, real_data: pd.DataFrame, synthetic_data: pd.DataFrame) -> Dict[str, Any]: """Main audit function with comprehensive error handling and timeout""" self.current_audit_id = str(uuid.uuid4())[:12] start_time = datetime.now() logger.info(f"Starting comprehensive audit - ID: {self.current_audit_id}") def audit_worker(): try: # Input validation logger.info("Step 1/7: Input validation") validation_result = self.validate_inputs(real_data, synthetic_data) if not validation_result["valid"]: return { "audit_id": self.current_audit_id, "error": "Input validation failed", "validation_errors": validation_result["errors"], "validation_warnings": validation_result["warnings"], "timestamp": start_time.isoformat(), "step_failed": "input_validation" } # Preprocessing logger.info("Step 2/7: Data preprocessing") X_processed, real_report = self.safe_preprocess_data(real_data, is_real=True) S_processed, synth_report = self.safe_preprocess_data(synthetic_data, is_real=False) if not real_report["success"] or not synth_report["success"]: return { "audit_id": self.current_audit_id, "error": "Data preprocessing failed", "preprocessing_reports": {"real": real_report, "synthetic": synth_report}, "timestamp": start_time.isoformat(), "step_failed": "preprocessing" } # Align columns logger.info("Step 3/7: Column alignment") try: 
common_cols = list(set(X_processed.columns) & set(S_processed.columns)) if len(common_cols) == 0: return { "audit_id": self.current_audit_id, "error": "No common columns between datasets after preprocessing", "timestamp": start_time.isoformat(), "step_failed": "column_alignment" } X_processed = X_processed[common_cols].sort_index(axis=1) S_processed = S_processed[common_cols].sort_index(axis=1) logger.info(f"Using {len(common_cols)} common columns") except Exception as e: return { "audit_id": self.current_audit_id, "error": f"Column alignment failed: {str(e)}", "timestamp": start_time.isoformat(), "step_failed": "column_alignment" } # Convert to numpy arrays logger.info("Step 4/7: Array conversion") try: X = X_processed.astype(np.float64).values S = S_processed.astype(np.float64).values # Validate arrays if not np.isfinite(X).all(): logger.warning("Non-finite values in real data, cleaning...") X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0) if not np.isfinite(S).all(): logger.warning("Non-finite values in synthetic data, cleaning...") S = np.nan_to_num(S, nan=0.0, posinf=0.0, neginf=0.0) except Exception as e: return { "audit_id": self.current_audit_id, "error": f"Array conversion failed: {str(e)}", "timestamp": start_time.isoformat(), "step_failed": "array_conversion" } # Subsampling logger.info("Step 5/7: Subsampling (if needed)") original_m = len(X) if self.config.subsample_size and len(X) > self.config.subsample_size: try: np.random.seed(42) indices = np.random.choice(len(X), self.config.subsample_size, replace=False) X = X[indices] logger.info(f"Subsampled real data from {original_m} to {len(X)} samples") except Exception as e: logger.warning(f"Subsampling failed: {e}, using full dataset") m, d = X.shape n = len(S) logger.info(f"Final dataset sizes - Real: {m}x{d}, Synthetic: {n}x{d}") # Distance computation logger.info("Step 6/7: Distance computation") dist_matrix = self.safe_compute_distances(X, S) if dist_matrix is None: return { "audit_id": 
self.current_audit_id, "error": "Distance computation failed", "timestamp": start_time.isoformat(), "step_failed": "distance_computation" } # Compute statistics logger.info("Step 7/7: Statistical analysis") try: nearest_distances = np.min(dist_matrix, axis=1) v = np.sum(nearest_distances) # Epsilon bounds for multiple confidence levels confidence_levels = [0.90, 0.95, 0.99] epsilon_bounds = {} for conf in confidence_levels: p = 1 - conf eps_lb = self.safe_compute_epsilon(m, n, d, v, p) epsilon_bounds[f"eps_lb_{int(conf*100)}"] = eps_lb # Distance statistics distance_stats = { "mean_nearest_distance": float(np.mean(nearest_distances)), "median_nearest_distance": float(np.median(nearest_distances)), "std_nearest_distance": float(np.std(nearest_distances)), "min_nearest_distance": float(np.min(nearest_distances)), "max_nearest_distance": float(np.max(nearest_distances)), "q25_nearest_distance": float(np.percentile(nearest_distances, 25)), "q75_nearest_distance": float(np.percentile(nearest_distances, 75)), "distance_sum": float(v), "zero_distance_count": int(np.sum(nearest_distances == 0)), "small_distance_count": int(np.sum(nearest_distances < 1e-6)) } # Risk assessment primary_epsilon = epsilon_bounds["eps_lb_95"] risk_level = self.assess_privacy_risk(primary_epsilon) except Exception as e: return { "audit_id": self.current_audit_id, "error": f"Statistical analysis failed: {str(e)}", "timestamp": start_time.isoformat(), "step_failed": "statistical_analysis" } # Compile results duration = (datetime.now() - start_time).total_seconds() result = { "audit_id": self.current_audit_id, "session_id": self.session_id, "success": True, "audit_metadata": { "timestamp": start_time.isoformat(), "duration_seconds": round(duration, 2), "distance_metric": self.config.distance_metric, "configuration": asdict(self.config) }, "dataset_info": { "real_samples_original": original_m, "real_samples_used": m, "synthetic_samples": n, "dimensions": d, "common_features": len(common_cols), 
"subsampling_applied": self.config.subsample_size is not None and original_m > m }, "preprocessing_reports": { "real_dataset": real_report, "synthetic_dataset": synth_report }, "validation_result": validation_result, "epsilon_bounds": epsilon_bounds, "distance_statistics": distance_stats, "privacy_assessment": { "risk_level": risk_level, "primary_epsilon": primary_epsilon, "interpretation": self.get_risk_interpretation(risk_level, primary_epsilon), "recommendations": self.get_recommendations(risk_level, distance_stats) }, "data_quality": { "potential_memorization": distance_stats["zero_distance_count"] > 0, "very_close_matches": distance_stats["small_distance_count"], "distance_distribution_skew": self._safe_compute_skewness(nearest_distances) } } self.audit_history.append(result) logger.info(f"Audit completed successfully - ID: {self.current_audit_id}, Risk: {risk_level}, Duration: {duration:.2f}s") return result except Exception as e: error_msg = f"Unexpected audit error: {str(e)}" logger.error(f"Audit failed - ID: {self.current_audit_id}: {error_msg}") logger.error(traceback.format_exc()) return { "audit_id": self.current_audit_id, "error": error_msg, "timestamp": start_time.isoformat(), "traceback": traceback.format_exc(), "step_failed": "unexpected_error" } # Run with timeout try: with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(audit_worker) result = future.result(timeout=self.config.timeout_seconds) return result except FutureTimeoutError: error_msg = f"Audit timed out after {self.config.timeout_seconds} seconds" logger.error(error_msg) return { "audit_id": self.current_audit_id, "error": error_msg, "timestamp": start_time.isoformat(), "step_failed": "timeout" } except Exception as e: error_msg = f"Audit execution failed: {str(e)}" logger.error(error_msg) return { "audit_id": self.current_audit_id, "error": error_msg, "timestamp": start_time.isoformat(), "step_failed": "execution_error" } def _safe_compute_skewness(self, data: 
np.ndarray) -> float: """Safely compute skewness""" try: if len(data) < 3: return 0.0 mean = np.mean(data) std = np.std(data) if std == 0: return 0.0 skewness = np.mean(((data - mean) / std) ** 3) return float(skewness) if np.isfinite(skewness) else 0.0 except Exception as e: logger.warning(f"Skewness computation failed: {e}") return 0.0 def assess_privacy_risk(self, epsilon: float) -> str: """Enhanced privacy risk assessment""" risk_thresholds = [ (0.01, "EXCEPTIONAL"), (0.1, "VERY LOW"), (0.5, "LOW"), (1.0, "MEDIUM"), (2.0, "HIGH"), (5.0, "VERY HIGH") ] try: for threshold, level in risk_thresholds: if epsilon <= threshold: return level return "CRITICAL" except Exception: return "UNKNOWN" def get_risk_interpretation(self, risk_level: str, epsilon: float) -> str: """Detailed risk interpretation""" interpretations = { "EXCEPTIONAL": "Outstanding privacy preservation. Suitable for highly sensitive applications.", "VERY LOW": "Excellent privacy preservation. Strong guarantees for most sensitive data.", "LOW": "Good privacy preservation. Acceptable for most commercial applications.", "MEDIUM": "Moderate privacy risk. Consider additional privacy-enhancing techniques.", "HIGH": "High privacy risk. Significant leakage detected. Review methodology.", "VERY HIGH": "Very high privacy risk. Additional privacy measures strongly recommended.", "CRITICAL": "Critical privacy risk. Synthetic data not suitable for production use." 
} try: base_msg = interpretations.get(risk_level, "Unknown risk level") return f"{base_msg} (ε = {epsilon:.6f})" except Exception as e: logger.warning(f"Risk interpretation failed: {e}") return f"Risk interpretation unavailable (ε = {epsilon:.6f})" def get_recommendations(self, risk_level: str, distance_stats: Dict[str, Any]) -> List[str]: """Generate actionable recommendations""" try: recommendations = [] risk_actions = { "HIGH": "IMMEDIATE ACTION REQUIRED: Privacy risk unacceptable for production", "VERY HIGH": "IMMEDIATE ACTION REQUIRED: Privacy risk unacceptable for production", "CRITICAL": "IMMEDIATE ACTION REQUIRED: Privacy risk unacceptable for production" } if risk_level in risk_actions: recommendations.extend([ risk_actions[risk_level], "Consider stronger privacy-preserving methods (DP-SGD, PATE)", "Reduce model capacity or increase privacy budget", "Review data preprocessing and feature selection" ]) self._add_distance_recommendations(recommendations, distance_stats) if risk_level in ["EXCEPTIONAL", "VERY LOW", "LOW"]: recommendations.append("Privacy level acceptable for most production applications") return recommendations or ["Review detailed analysis for specific insights"] except Exception as e: logger.warning(f"Recommendations generation failed: {e}") return ["Could not generate recommendations due to analysis error"] def _add_distance_recommendations(self, recommendations: List[str], distance_stats: Dict[str, Any]): """Add distance-based recommendations""" zero_distances = distance_stats.get("zero_distance_count", 0) small_distances = distance_stats.get("small_distance_count", 0) if zero_distances > 0: recommendations.append(f"WARNING: {zero_distances} exact matches - potential memorization") if small_distances > zero_distances: close_matches = small_distances - zero_distances recommendations.append(f"REVIEW: {close_matches} close matches - check near-memorization") # Global auditor instance try: auditor = EnhancedPrivacyAuditor() 
logger.info("Privacy auditor initialized successfully") except Exception as e: logger.error(f"Failed to initialize privacy auditor: {e}") auditor = None def create_safe_distance_plot(result: Dict[str, Any]) -> go.Figure: """Create enhanced privacy audit dashboard with improved data visualization""" try: if "error" in result: return _create_error_figure(result) return _create_comprehensive_dashboard(result) except Exception as e: logger.error(f"Distance plot creation failed: {e}") return _create_error_figure({"error": str(e)}) def _create_error_figure(result: Dict[str, Any]) -> go.Figure: """Create error visualization with clear messaging""" fig = go.Figure() fig.add_annotation( text=f"Audit Error
{result.get('error', 'Unknown error')}
Step: {result.get('step_failed', 'Unknown')}", x=0.5, y=0.5, showarrow=False, font=dict(size=16, color="#dc3545"), align="center", bgcolor="rgba(220, 53, 69, 0.1)", bordercolor="#dc3545", borderwidth=2 ) fig.update_layout( title="Privacy Audit Failed", xaxis=dict(visible=False), yaxis=dict(visible=False), plot_bgcolor="white", paper_bgcolor="white" ) return fig def _create_comprehensive_dashboard(result: Dict[str, Any]) -> go.Figure: """Create simplified privacy dashboard focused on key metrics""" from plotly.subplots import make_subplots # Simplified 2x2 layout focusing on essential information fig = make_subplots( rows=2, cols=2, subplot_titles=( "Distance Statistics", "Privacy Risk Level", "Data Quality Assessment", "Key Metrics Summary" ), specs=[ [{"type": "bar"}, {"type": "indicator"}], [{"type": "bar"}, {"type": "table"}] ], vertical_spacing=0.2, horizontal_spacing=0.15 ) _add_simplified_distance_analysis(fig, result) _add_simplified_risk_assessment(fig, result) _add_simplified_quality_assessment(fig, result) _add_key_metrics_table(fig, result) # Clean, minimal layout fig.update_layout( title={ "text": "Privacy Audit Results", "x": 0.5, "xanchor": "center", "font": {"size": 18, "color": "#000000"} }, height=600, showlegend=False, plot_bgcolor="white", paper_bgcolor="white", font=dict(family="Arial, sans-serif", size=12, color="#000000"), margin=dict(t=80, b=50, l=60, r=60) ) return fig def _add_simplified_distance_analysis(fig, result: Dict[str, Any]): """Add simplified distance analysis focusing on key metrics""" stats = result.get("distance_statistics", {}) # Focus on most important metrics metrics = ["Mean", "Median", "Max"] values = [ stats.get("mean_nearest_distance", 0), stats.get("median_nearest_distance", 0), stats.get("max_nearest_distance", 0) ] # Use simple, accessible colors fig.add_trace( go.Bar( x=metrics, y=values, marker_color='#2563eb', marker_line=dict(color='#1e40af', width=1), text=[f"{v:.4f}" if v > 0 else "0.0000" for v in values], 
textposition='outside', textfont=dict(size=11, color="#000000"), hovertemplate="%{x}
%{y:.6f}", showlegend=False ), row=1, col=1 ) fig.update_xaxes(title_text="Distance Metric", row=1, col=1, title_font_size=12) fig.update_yaxes(title_text="Value", row=1, col=1, title_font_size=12) def _add_simplified_risk_assessment(fig, result: Dict[str, Any]): """Add simplified risk assessment indicator""" risk_level = result.get("privacy_assessment", {}).get("risk_level", "UNKNOWN") epsilon = result.get("privacy_assessment", {}).get("primary_epsilon", 0) # Simple risk color mapping risk_colors = { "EXCEPTIONAL": "#059669", "VERY LOW": "#059669", "LOW": "#0891b2", "MEDIUM": "#ea580c", "HIGH": "#dc2626", "VERY HIGH": "#dc2626", "CRITICAL": "#991b1b", "UNKNOWN": "#6b7280" } fig.add_trace( go.Indicator( mode="number+gauge", value=epsilon, title={ "text": f"Privacy Risk: {risk_level}
Epsilon Value", "font": {"size": 14, "color": "#000000"} }, number={"font": {"size": 20, "color": risk_colors.get(risk_level, "#6b7280")}}, gauge={ "axis": {"range": [0, 5], "tickcolor": "#000000"}, "bar": {"color": risk_colors.get(risk_level, "#6b7280")}, "bgcolor": "white", "bordercolor": "#d1d5db", "borderwidth": 2 } ), row=1, col=2 ) def _add_simplified_quality_assessment(fig, result: Dict[str, Any]): """Add simplified quality assessment""" stats = result.get("distance_statistics", {}) total_samples = result.get("dataset_info", {}).get("real_samples_used", 1) zero_distances = stats.get("zero_distance_count", 0) small_distances = stats.get("small_distance_count", 0) categories = ["Safe", "Near Match", "Exact Match"] counts = [total_samples - small_distances, small_distances - zero_distances, zero_distances] fig.add_trace( go.Bar( x=categories, y=counts, marker_color=['#059669', '#ea580c', '#dc2626'], marker_line=dict(color='#000000', width=1), text=[f"{c:,}" for c in counts], textposition='outside', textfont=dict(size=11, color="#000000"), hovertemplate="%{x}
Count: %{y:,}", showlegend=False ), row=2, col=1 ) fig.update_xaxes(title_text="Sample Type", row=2, col=1, title_font_size=12) fig.update_yaxes(title_text="Count", row=2, col=1, title_font_size=12) def _add_key_metrics_table(fig, result: Dict[str, Any]): """Add key metrics summary table""" dataset_info = result.get("dataset_info", {}) stats = result.get("distance_statistics", {}) risk_level = result.get("privacy_assessment", {}).get("risk_level", "UNKNOWN") epsilon = result.get("privacy_assessment", {}).get("primary_epsilon", 0) metrics = [ "Real Samples", "Synthetic Samples", "Dimensions", "Risk Level", "Epsilon Value", "Exact Matches" ] values = [ f"{dataset_info.get('real_samples_used', 0):,}", f"{dataset_info.get('synthetic_samples', 0):,}", f"{dataset_info.get('dimensions', 0)}", risk_level, f"{epsilon:.6f}", f"{stats.get('zero_distance_count', 0):,}" ] fig.add_trace( go.Table( header=dict( values=["Metric", "Value"], fill_color="#f3f4f6", font=dict(size=12, color="#000000"), align="left", line_color="#d1d5db" ), cells=dict( values=[metrics, values], fill_color="white", font=dict(size=11, color="#000000"), align="left", line_color="#d1d5db", height=30 ) ), row=2, col=2 ) def _add_privacy_bounds(fig, result: Dict[str, Any]): """Add privacy bounds comparison across confidence levels""" epsilon_bounds = result.get("epsilon_bounds", {}) confidence_levels = ["90%", "95%", "99%"] epsilon_values = [ epsilon_bounds.get("eps_lb_90", 0), epsilon_bounds.get("eps_lb_95", 0), epsilon_bounds.get("eps_lb_99", 0) ] # Use gradient colors to show increasing confidence colors = ['#52c41a', '#1890ff', '#722ed1'] fig.add_trace( go.Bar( x=confidence_levels, y=epsilon_values, marker_color=colors, text=[f"ε = {v:.6f}" for v in epsilon_values], textposition='outside', hovertemplate="%{x} Confidence
ε Lower Bound: %{y:.6f}", name="Privacy Bounds" ), row=2, col=2 ) fig.update_xaxes(title_text="Confidence Level", row=2, col=2) fig.update_yaxes(title_text="ε Lower Bound", row=2, col=2, type="log" if max(epsilon_values) > 0 else "linear") def _add_processing_status(fig, result: Dict[str, Any]): """Add processing pipeline status visualization""" real_report = result.get("preprocessing_reports", {}).get("real_dataset", {}) synth_report = result.get("preprocessing_reports", {}).get("synthetic_dataset", {}) # Count completed processing steps real_steps = len(real_report.get("steps_completed", [])) synth_steps = len(synth_report.get("steps_completed", [])) total_steps = 6 # Expected number of processing steps datasets = ["Real Dataset", "Synthetic Dataset"] completion = [real_steps / total_steps * 100, synth_steps / total_steps * 100] colors = ['#28a745' if c == 100 else '#ffc107' for c in completion] fig.add_trace( go.Bar( x=datasets, y=completion, marker_color=colors, text=[f"{c:.0f}%
({int(c/100*total_steps)}/{total_steps})" for c in completion], textposition='auto', hovertemplate="%{x}
Processing: %{y:.0f}% Complete", name="Processing Status" ), row=2, col=3 ) fig.update_xaxes(title_text="Dataset Type", row=2, col=3) fig.update_yaxes(title_text="Processing Completion %", row=2, col=3, range=[0, 100]) def create_safe_epsilon_plot(result: Dict[str, Any]) -> go.Figure: """Create simplified epsilon analysis plot""" try: if "error" in result: return _create_error_figure(result) epsilon_bounds = result.get("epsilon_bounds", {}) confidence_levels = [90, 95, 99] epsilon_values = [epsilon_bounds.get(f"eps_lb_{conf}", 0) for conf in confidence_levels] fig = go.Figure() # Simple bar chart fig.add_trace(go.Bar( x=[f"{conf}%" for conf in confidence_levels], y=epsilon_values, marker_color='#2563eb', marker_line=dict(color='#1e40af', width=1), text=[f"{eps:.6f}" for eps in epsilon_values], textposition='outside', textfont=dict(size=11, color="#000000"), hovertemplate="%{x} Confidence
Epsilon: %{y:.6f}", showlegend=False )) fig.update_layout( title="Privacy Budget Analysis", xaxis_title="Confidence Level", yaxis_title="Epsilon Lower Bound", plot_bgcolor="white", paper_bgcolor="white", font=dict(family="Arial, sans-serif", size=12, color="#000000"), height=400, margin=dict(t=80, b=50, l=60, r=60) ) return fig except Exception as e: logger.error(f"Epsilon plot creation failed: {e}") return _create_error_figure({"error": str(e)}) def generate_safe_report(result: Dict[str, Any]) -> str: """Generate safe executive report with error handling""" try: if "error" in result: return f""" # Privacy Audit Failed **Error:** {result.get('error', 'Unknown error')} **Audit ID:** {result.get('audit_id', 'N/A')} **Timestamp:** {result.get('timestamp', 'N/A')} **Failed Step:** {result.get('step_failed', 'Unknown')} ## Troubleshooting Please check the following: - Both datasets are in CSV format with headers - Files are not corrupted and can be opened - Datasets have overlapping column names - Data contains numeric values or categorical data that can be encoded - File sizes are within limits ## Next Steps 1. Review the error message above 2. Check your data format and content 3. Try with smaller datasets if memory/timeout issues occur 4. 
Contact support if the issue persists --- *Report generated by Enterprise Privacy Auditor* """ # Extract key information risk_level = result.get("privacy_assessment", {}).get("risk_level", "UNKNOWN") epsilon = result.get("privacy_assessment", {}).get("primary_epsilon", 0) # Build comprehensive report report = f""" # Privacy Audit Executive Summary ## Overall Assessment: {risk_level} RISK **Audit ID:** {result.get('audit_id', 'N/A')} **Session ID:** {result.get('session_id', 'N/A')} **Conducted:** {result.get('audit_metadata', {}).get('timestamp', 'N/A')} **Duration:** {result.get('audit_metadata', {}).get('duration_seconds', 'N/A')} seconds --- ## Key Findings ### Privacy Metrics - **Primary ε-DP Bound (95% confidence):** {epsilon:.6f} - **Risk Assessment:** {result.get('privacy_assessment', {}).get('interpretation', 'N/A')} ### Dataset Overview - **Real Data Samples (Original):** {result.get('dataset_info', {}).get('real_samples_original', 'N/A'):,} - **Real Data Samples (Used):** {result.get('dataset_info', {}).get('real_samples_used', 'N/A'):,} - **Synthetic Data Samples:** {result.get('dataset_info', {}).get('synthetic_samples', 'N/A'):,} - **Feature Dimensions:** {result.get('dataset_info', {}).get('dimensions', 'N/A')} - **Common Features:** {result.get('dataset_info', {}).get('common_features', 'N/A')} ### Data Quality Indicators - **Exact Matches (Memorization):** {result.get('distance_statistics', {}).get('zero_distance_count', 'N/A')} - **Very Close Matches:** {result.get('distance_statistics', {}).get('small_distance_count', 'N/A')} - **Mean Nearest Distance:** {result.get('distance_statistics', {}).get('mean_nearest_distance', 0):.6f} --- ## Recommendations """ # Add recommendations recommendations = result.get('privacy_assessment', {}).get('recommendations', []) if recommendations: for i, rec in enumerate(recommendations, 1): report += f"{i}. 
{rec}\n" else: report += "No specific recommendations available.\n" report += f""" --- ## Detailed Analysis ### Multi-Confidence Privacy Bounds | Confidence Level | ε Lower Bound | Risk Level | |------------------|---------------|------------|""" # Add epsilon bounds table epsilon_bounds = result.get('epsilon_bounds', {}) for conf in [90, 95, 99]: eps_val = epsilon_bounds.get(f'eps_lb_{conf}', 0) risk = auditor.assess_privacy_risk(eps_val) if auditor else "UNKNOWN" report += f"\n| {conf}% | {eps_val:.6f} | {risk} |" # Add distance statistics dist_stats = result.get('distance_statistics', {}) report += f""" ### Distance Statistics Summary - **Mean:** {dist_stats.get('mean_nearest_distance', 0):.6f} - **Median:** {dist_stats.get('median_nearest_distance', 0):.6f} - **Standard Deviation:** {dist_stats.get('std_nearest_distance', 0):.6f} - **Range:** [{dist_stats.get('min_nearest_distance', 0):.6f}, {dist_stats.get('max_nearest_distance', 0):.6f}] - **25th Percentile:** {dist_stats.get('q25_nearest_distance', 0):.6f} - **75th Percentile:** {dist_stats.get('q75_nearest_distance', 0):.6f} ### Data Quality Assessment - **Potential Memorization:** {"Yes" if result.get('data_quality', {}).get('potential_memorization', False) else "No"} - **Distribution Skewness:** {result.get('data_quality', {}).get('distance_distribution_skew', 0):.4f} --- ## Configuration Used **Preprocessing:** - Categorical Encoding: {result.get('audit_metadata', {}).get('configuration', {}).get('categorical_encoding', 'N/A')} - Numerical Scaling: {result.get('audit_metadata', {}).get('configuration', {}).get('numerical_scaling', 'N/A')} - Distance Metric: {result.get('audit_metadata', {}).get('configuration', {}).get('distance_metric', 'N/A')} **Audit Parameters:** - Confidence Level: {result.get('audit_metadata', {}).get('configuration', {}).get('confidence_level', 'N/A')} - Subsample Size: {result.get('audit_metadata', {}).get('configuration', {}).get('subsample_size', 'None (full dataset)')} - 
Timeout: {result.get('audit_metadata', {}).get('configuration', {}).get('timeout_seconds', 'N/A')} seconds --- ## Methodology This audit implements the state-of-the-art one-run nearest-neighbor ε-DP auditor. The method provides rigorous lower bounds on the privacy parameter ε, indicating the minimum privacy budget required under differential privacy guarantees. **Key Benefits:** - Single-run analysis (no multiple generations needed) - Rigorous mathematical guarantees - Suitable for enterprise environments - Comprehensive preprocessing and validation --- ## Support Information For questions about this audit or to report issues: - Review the detailed technical logs - Check the preprocessing reports for data quality issues - Ensure your data meets the format requirements --- *Report generated by Enterprise Privacy Auditor v2.0* *Session: {result.get('session_id', 'N/A')} | Audit: {result.get('audit_id', 'N/A')}* """ return report except Exception as e: logger.error(f"Report generation failed: {e}") return f""" # Report Generation Failed An error occurred while generating the executive report: **Error:** {str(e)} ## Raw Audit Data ```json {json.dumps(result, indent=2, default=str)} ``` --- *Please contact support for assistance* """ def safe_export_results(result: Dict[str, Any]) -> Optional[str]: """Safe export with comprehensive error handling""" try: logger.info("Generating export package") # Create temporary file for export import tempfile export_file = tempfile.NamedTemporaryFile(mode='wb', suffix='.zip', delete=False) with zipfile.ZipFile(export_file, 'w', zipfile.ZIP_DEFLATED) as zip_file: # Core results (with safe JSON conversion) try: safe_result = {} for key, value in result.items(): safe_result[key] = SafeDataProcessor.safe_json_convert(value) zip_file.writestr( "audit_results.json", json.dumps(safe_result, indent=2, default=str) ) logger.debug("Added audit results to export") except Exception as e: logger.warning(f"Failed to add audit results: {e}") 
zip_file.writestr("audit_results_error.txt", f"Failed to export results: {str(e)}") # Executive report try: exec_report = generate_safe_report(result) zip_file.writestr("executive_summary.md", exec_report) logger.debug("Added executive report to export") except Exception as e: logger.warning(f"Failed to add executive report: {e}") zip_file.writestr("executive_summary_error.txt", f"Failed to generate report: {str(e)}") # Technical details try: tech_details = f""" # Technical Privacy Audit Report ## Audit Metadata - **Audit ID:** {result.get('audit_id', 'N/A')} - **Session ID:** {result.get('session_id', 'N/A')} - **Timestamp:** {result.get('audit_metadata', {}).get('timestamp', 'N/A')} - **Duration:** {result.get('audit_metadata', {}).get('duration_seconds', 'N/A')} seconds - **Success:** {result.get('success', False)} ## Configuration Details {json.dumps(result.get('audit_metadata', {}).get('configuration', {}), indent=2, default=str)} ## Dataset Information {json.dumps(result.get('dataset_info', {}), indent=2, default=str)} ## Validation Results {json.dumps(result.get('validation_result', {}), indent=2, default=str)} ## Distance Statistics {json.dumps(result.get('distance_statistics', {}), indent=2, default=str)} ## Privacy Assessment {json.dumps(result.get('privacy_assessment', {}), indent=2, default=str)} """ zip_file.writestr("technical_details.md", tech_details) logger.debug("Added technical details to export") except Exception as e: logger.warning(f"Failed to add technical details: {e}") # Key metrics CSV try: if "error" not in result: metrics_data = { 'Metric': [ 'Audit_ID', 'Risk_Level', 'Primary_Epsilon', 'Mean_Distance', 'Zero_Distances', 'Close_Matches', 'Duration_Seconds', 'Real_Samples', 'Synthetic_Samples', 'Dimensions' ], 'Value': [ result.get('audit_id', ''), result.get('privacy_assessment', {}).get('risk_level', ''), result.get('privacy_assessment', {}).get('primary_epsilon', 0), result.get('distance_statistics', {}).get('mean_nearest_distance', 
0), result.get('distance_statistics', {}).get('zero_distance_count', 0), result.get('distance_statistics', {}).get('small_distance_count', 0), result.get('audit_metadata', {}).get('duration_seconds', 0), result.get('dataset_info', {}).get('real_samples_used', 0), result.get('dataset_info', {}).get('synthetic_samples', 0), result.get('dataset_info', {}).get('dimensions', 0) ] } metrics_df = pd.DataFrame(metrics_data) csv_buffer = io.StringIO() metrics_df.to_csv(csv_buffer, index=False) zip_file.writestr("key_metrics.csv", csv_buffer.getvalue()) logger.debug("Added metrics CSV to export") except Exception as e: logger.warning(f"Failed to add metrics CSV: {e}") # Audit log try: log_content = f""" Privacy Audit Log - {result.get('audit_id', 'N/A')} {'='*60} Audit Started: {result.get('audit_metadata', {}).get('timestamp', 'N/A')} Session ID: {result.get('session_id', 'N/A')} Configuration: {json.dumps(result.get('audit_metadata', {}).get('configuration', {}), indent=2, default=str)} Dataset Information: - Real samples (original): {result.get('dataset_info', {}).get('real_samples_original', 'N/A')} - Real samples (used): {result.get('dataset_info', {}).get('real_samples_used', 'N/A')} - Synthetic samples: {result.get('dataset_info', {}).get('synthetic_samples', 'N/A')} - Dimensions: {result.get('dataset_info', {}).get('dimensions', 'N/A')} {"Success: Audit completed successfully" if "error" not in result else f"Failed: {result.get('error', 'Unknown error')}"} Duration: {result.get('audit_metadata', {}).get('duration_seconds', 'N/A')} seconds Privacy Results: - Risk Level: {result.get('privacy_assessment', {}).get('risk_level', 'N/A')} - Primary ε: {result.get('privacy_assessment', {}).get('primary_epsilon', 'N/A')} Validation Warnings: {chr(10).join(result.get('validation_result', {}).get('warnings', ['None']))} Export completed: {datetime.now().isoformat()} """ zip_file.writestr("audit.log", log_content) logger.debug("Added audit log to export") except Exception as e: 
logger.warning(f"Failed to add audit log: {e}") export_file.close() logger.info("Export package generated successfully") return export_file.name except Exception as e: logger.error(f"Export generation failed: {e}") logger.error(traceback.format_exc()) # Create minimal error export try: import tempfile error_file = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) error_file.write(f"Export failed: {str(e)}\n\nTimestamp: {datetime.now().isoformat()}") if result: error_file.write(f"\n\nRaw result: {str(result)}") error_file.close() return error_file.name except Exception: return None def run_enhanced_audit(real_file, synthetic_file, confidence, subsample_size, categorical_encoding, numerical_scaling, distance_metric): """Enhanced main audit interface with comprehensive error handling and status updates""" # Input validation if not real_file or not synthetic_file: error_msg = "Please upload both real and synthetic datasets" logger.warning(error_msg) empty_result = {"error": error_msg, "step_failed": "file_upload"} return empty_result, None, None, f"ERROR: {error_msg}", None logger.info("Starting enhanced privacy audit") logger.info(f"Files: Real={real_file.name}, Synthetic={synthetic_file.name}") try: # Validate and update configuration try: new_config = AuditConfig( confidence_level=confidence, subsample_size=int(subsample_size) if subsample_size > 0 else None, categorical_encoding=categorical_encoding, numerical_scaling=numerical_scaling, distance_metric=distance_metric ) config_errors = new_config.validate() if config_errors: error_msg = f"Configuration errors: {'; '.join(config_errors)}" logger.error(error_msg) error_result = {"error": error_msg, "step_failed": "configuration"} return error_result, None, None, f"ERROR: {error_msg}", None if auditor: auditor.config = new_config logger.info("Configuration updated successfully") else: error_msg = "Auditor not initialized" logger.error(error_msg) error_result = {"error": error_msg, "step_failed": 
"initialization"} return error_result, None, None, f"ERROR: {error_msg}", None except Exception as e: error_msg = f"Configuration error: {str(e)}" logger.error(error_msg) error_result = {"error": error_msg, "step_failed": "configuration"} return error_result, None, None, f"ERROR: {error_msg}", None # Load datasets with enhanced error handling try: logger.info("Loading datasets...") real_df, real_error = SafeDataProcessor.safe_read_csv(real_file.name) if real_df is None: error_msg = f"Failed to load real dataset: {real_error}" logger.error(error_msg) error_result = {"error": error_msg, "step_failed": "data_loading"} return error_result, None, None, f"ERROR: {error_msg}", None synth_df, synth_error = SafeDataProcessor.safe_read_csv(synthetic_file.name) if synth_df is None: error_msg = f"Failed to load synthetic dataset: {synth_error}" logger.error(error_msg) error_result = {"error": error_msg, "step_failed": "data_loading"} return error_result, None, None, f"ERROR: {error_msg}", None logger.info(f"Datasets loaded successfully - Real: {real_df.shape}, Synthetic: {synth_df.shape}") except Exception as e: error_msg = f"Data loading error: {str(e)}" logger.error(error_msg) error_result = {"error": error_msg, "step_failed": "data_loading"} return error_result, None, None, f"ERROR: {error_msg}", None # Check file sizes try: real_size_mb = real_df.memory_usage(deep=True).sum() / 1024 / 1024 synth_size_mb = synth_df.memory_usage(deep=True).sum() / 1024 / 1024 logger.info(f"Memory usage - Real: {real_size_mb:.2f}MB, Synthetic: {synth_size_mb:.2f}MB") if real_size_mb > auditor.config.max_file_size_mb or synth_size_mb > auditor.config.max_file_size_mb: error_msg = f"File size exceeds limit ({auditor.config.max_file_size_mb}MB). 
Real: {real_size_mb:.1f}MB, Synthetic: {synth_size_mb:.1f}MB" logger.error(error_msg) error_result = {"error": error_msg, "step_failed": "size_check"} return error_result, None, None, f"ERROR: {error_msg}", None except Exception as e: logger.warning(f"Size check failed: {e}") # Continue anyway # Run comprehensive audit logger.info("Starting comprehensive privacy audit...") result = auditor.run_comprehensive_audit(real_df, synth_df) # Check for audit errors if "error" in result: error_msg = result["error"] step_failed = result.get("step_failed", "unknown") logger.error(f"Audit failed at step '{step_failed}': {error_msg}") return result, None, None, f"ERROR: Audit failed at {step_failed}: {error_msg}", None # Generate visualizations safely dist_plot = None eps_plot = None try: logger.info("Generating visualizations...") dist_plot = create_safe_distance_plot(result) eps_plot = create_safe_epsilon_plot(result) logger.info("Visualizations generated successfully") except Exception as e: logger.warning(f"Visualization generation failed: {e}") # Continue without visualizations # Generate report safely try: logger.info("Generating executive report...") report = generate_safe_report(result) logger.info("Report generated successfully") except Exception as e: logger.warning(f"Report generation failed: {e}") report = f"ERROR: Report generation failed: {str(e)}" # Generate export safely export_data = None try: logger.info("Generating export package...") export_data = safe_export_results(result) if export_data: logger.info("Export package generated successfully") else: logger.warning("Export generation returned no data") except Exception as e: logger.warning(f"Export generation failed: {e}") # Log success risk_level = result.get("privacy_assessment", {}).get("risk_level", "UNKNOWN") logger.info(f"Audit completed - ID: {result.get('audit_id')}, Risk: {risk_level}") return result, dist_plot, eps_plot, report, export_data except Exception as e: error_msg = f"Unexpected error in 
audit interface: {str(e)}" logger.error(error_msg) logger.error(traceback.format_exc()) error_result = { "error": error_msg, "step_failed": "unexpected_error", "traceback": traceback.format_exc(), "timestamp": datetime.now().isoformat() } return error_result, None, None, f"ERROR: {error_msg}", None def create_enhanced_interface(): """Create the enhanced Gradio interface with improved UX""" # Custom CSS for better UI custom_css = """ .main-header { text-align: center; margin-bottom: 30px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 20px; border-radius: 15px; margin: 20px; } .config-section { background-color: #f8f9fa; padding: 20px; border-radius: 10px; margin: 10px 0; border-left: 4px solid #007bff; } .results-section { background-color: #e8f5e8; padding: 20px; border-radius: 10px; margin: 10px 0; border-left: 4px solid #28a745; } .error-section { background-color: #ffe6e6; padding: 20px; border-radius: 10px; margin: 10px 0; border-left: 4px solid #dc3545; } .status-box { padding: 15px; border-radius: 8px; margin: 10px 0; font-weight: 500; } .upload-section { border: 2px dashed #ccc; border-radius: 10px; padding: 20px; margin: 10px 0; background-color: #fafafa; } """ with gr.Blocks( title="Enterprise Privacy Auditor", theme=gr.themes.Soft(), css=custom_css ) as demo: gr.HTML("""

Privacy Auditor for Generative Models

Advanced Privacy Assessment Tool for Synthetic Data Generation

Implementing state-of-the-art one-run nearest-neighbor ε-DP auditing with enterprise features

Secure • Comprehensive • Fast • Configurable

""") # Main audit interface with gr.Tab("Privacy Audit", elem_id="audit-tab"): with gr.Row(): # Left column - Configuration and upload with gr.Column(scale=1): gr.HTML('
') gr.Markdown("### Dataset Upload") real_file = gr.File( label="Real/Original Dataset (CSV)", file_types=[".csv"], elem_id="real-file" ) gr.Markdown("*Upload the original dataset used for training or reference*") synth_file = gr.File( label="Synthetic Dataset (CSV)", file_types=[".csv"], elem_id="synth-file" ) gr.Markdown("*Upload the synthetic dataset to be audited for privacy*") gr.HTML('
') gr.HTML('
') gr.Markdown("### Advanced Configuration") with gr.Group(): gr.Markdown("#### Privacy Parameters") confidence = gr.Slider( 0.80, 0.999, value=0.95, step=0.001, label="Primary Confidence Level", info="Higher values provide more conservative privacy bounds" ) subsample_size = gr.Number( value=0, minimum=0, maximum=100000, step=1000, label="Subsample Size (0 = use all data)", info="Limit real data samples for faster computation on large datasets" ) with gr.Group(): gr.Markdown("#### Data Processing") categorical_encoding = gr.Dropdown( choices=[ ("One-Hot Encoding (recommended)", "onehot"), ("Label Encoding (memory efficient)", "label") ], value="onehot", label="Categorical Variable Encoding", info="How to handle non-numeric categorical variables" ) numerical_scaling = gr.Dropdown( choices=[ ("Standard Scaling (recommended)", "standard"), ("Min-Max Scaling", "minmax"), ("Robust Scaling (outlier resistant)", "robust"), ("No Scaling", "none") ], value="standard", label="Numerical Feature Scaling", info="Normalization method for numerical features" ) distance_metric = gr.Dropdown( choices=[ ("Euclidean (recommended)", "euclidean"), ("Manhattan (robust to outliers)", "manhattan"), ("Cosine (for high-dimensional data)", "cosine") ], value="euclidean", label="Distance Metric", info="Method for computing distances between data points" ) gr.HTML('
') # Prominent run button run_btn = gr.Button( "Run Comprehensive Privacy Audit", variant="primary", size="lg", elem_id="run-audit-btn" ) gr.Markdown(""" ### Quick Start Guide 1. Upload both datasets in CSV format with headers 2. Keep file sizes under 500MB for optimal performance 3. Review configuration settings (defaults work for most cases) 4. Run audit and review comprehensive results """) # Right column - Results and status with gr.Column(scale=2): gr.HTML('
') gr.Markdown("### Audit Results & Status") # Status display audit_status = gr.Markdown( "**Ready to run audit**\n\nPlease upload your datasets and configure the audit parameters.", elem_classes=["status-box"] ) # Detailed results with gr.Group(): audit_results = gr.JSON( label="Detailed Audit Results", elem_id="audit-results" ) gr.HTML('
') gr.Markdown("### Interactive Visualizations") # Visualization tabs with gr.Tabs(): with gr.Tab("Privacy Dashboard"): distance_plot = gr.Plot( label="Comprehensive Privacy Analysis", elem_id="distance-plot" ) with gr.Tab("Risk Analysis"): epsilon_plot = gr.Plot( label="Privacy Bounds & Risk Assessment", elem_id="epsilon-plot" ) # Executive report tab with gr.Tab("Executive Report", elem_id="report-tab"): gr.Markdown("### Executive Summary & Detailed Analysis") gr.Markdown("*Complete report will be generated after running the audit*") audit_report = gr.Markdown( """ **No audit completed yet** Run a privacy audit to generate a comprehensive executive report including: - Privacy risk assessment and recommendations - Statistical analysis and data quality metrics - Technical details and configuration summary - Actionable insights for improving privacy """, elem_id="audit-report" ) gr.Markdown("### Export & Download") export_btn = gr.File( label="Download Complete Audit Package", elem_id="export-file", visible=False ) gr.Markdown(""" Complete audit package includes: - Executive summary report (Markdown) - Technical analysis report (Markdown) - Key metrics spreadsheet (CSV) - Audit configuration details (JSON) - Comprehensive audit log (Text) - Raw results data (JSON) """) # Documentation tab with gr.Tab("Documentation", elem_id="docs-tab"): gr.Markdown(""" ## Enterprise Privacy Auditor ### Methodology This tool implements the **state-of-the-art one-run nearest-neighbor ε-DP auditor** providing rigorous lower bounds on privacy parameters without requiring multiple dataset generations. 
### Enterprise Features - **Local Processing**: All data remains secure on your infrastructure - **Comprehensive Logging**: Detailed audit trails and error reporting - **Scalable Architecture**: Memory-efficient processing for large datasets - **Configurable Pipeline**: Flexible preprocessing and analysis options ### Privacy Risk Framework | Risk Level | ε Range | Interpretation | Action Required | |------------|---------|----------------|-----------------| | EXCEPTIONAL | ε ≤ 0.01 | Outstanding privacy | Suitable for highly sensitive data | | VERY LOW | 0.01 < ε ≤ 0.1 | Excellent privacy | Good for most enterprise use | | LOW | 0.1 < ε ≤ 0.5 | Acceptable privacy | Monitor for sensitive applications | | MEDIUM | 0.5 < ε ≤ 1.0 | Moderate risk | Consider additional measures | | HIGH | 1.0 < ε ≤ 2.0 | High risk | Review methodology | | VERY HIGH | 2.0 < ε ≤ 5.0 | Very high risk | Additional privacy required | | CRITICAL | ε > 5.0 | Critical risk | Immediate action required | ### Configuration Guide #### Distance Metrics - **Euclidean**: Best for continuous numerical data - **Manhattan**: Robust to outliers, good for mixed data - **Cosine**: Ideal for high-dimensional sparse data #### Preprocessing Options - **One-Hot Encoding**: Creates binary features (recommended for <50 categories) - **Label Encoding**: Assigns integer codes (memory efficient) - **Standard Scaling**: Zero mean, unit variance (recommended) - **Min-Max Scaling**: Scale to [0,1] range - **Robust Scaling**: Uses median and IQR (outlier resistant) ### Best Practices 1. **Data Preparation**: Ensure CSV format with headers, similar structure between datasets 2. **Memory Management**: Use subsampling for datasets >100K samples 3. **Configuration**: Start with defaults, adjust based on your data characteristics 4. 
**Interpretation**: Review both statistical results and actionable recommendations ### Support - Review error logs for troubleshooting - Check preprocessing reports for data quality issues - Ensure data meets format requirements """)

        # Event handlers with enhanced error feedback - FIXED OUTPUT COUNT
        def update_status_and_run(*args):
            """Update status during audit execution.

            Generator-style Gradio handler: yields twice so the UI can show a
            "running" status before the (potentially long) audit finishes.
            Each yield is a 6-tuple matching the outputs wired in run_btn.click:
            (audit_status, audit_results, distance_plot, epsilon_plot,
             audit_report, export_btn).

            NOTE(review): assumes run_enhanced_audit (defined earlier in this
            file) returns a 5-tuple (results_dict, distance_fig, epsilon_fig,
            report_markdown, export_file_path) — verify against its definition.
            """
            try:
                # First yield: flip the status banner to "running" while leaving
                # every other output untouched; hide any stale export file.
                yield (
                    gr.update(value="Audit in progress. Processing your datasets and running privacy analysis."),
                    gr.update(),
                    gr.update(),
                    gr.update(),
                    gr.update(),
                    gr.update(visible=False)
                )
                # Run the actual audit (blocking; timeout handling is inside it)
                result = run_enhanced_audit(*args)
                # Update status based on result: success iff the results dict
                # exists and carries no "error" key.
                if result[0] and "error" not in result[0]:
                    risk_level = result[0].get("privacy_assessment", {}).get("risk_level", "UNKNOWN")
                    epsilon = result[0].get("privacy_assessment", {}).get("primary_epsilon", 0)
                    status_msg = f"Audit completed successfully.\n\nRisk Level: {risk_level}\nEpsilon-DP Bound: {epsilon:.6f}"
                else:
                    # Failure path: surface the audit's own error message if present.
                    error_msg = result[0].get("error", "Unknown error") if result[0] else "Unknown error"
                    status_msg = f"Audit failed: {error_msg}"
                # Make export visible only if the audit produced a package path
                export_visible = result[4] is not None
                # Second yield: final values for all six outputs.
                yield (
                    gr.update(value=status_msg),
                    result[0],  # audit_results (JSON component)
                    result[1],  # distance_plot figure
                    result[2],  # epsilon_plot figure
                    result[3],  # audit_report markdown
                    gr.update(value=result[4], visible=export_visible) if export_visible else gr.update(visible=False)
                )
            except Exception as e:
                # Last-resort guard: never let the handler raise into Gradio;
                # report the failure through the same 6 outputs instead.
                error_msg = f"Interface error: {str(e)}"
                logger.error(error_msg)
                yield (
                    gr.update(value=f"Interface Error: {error_msg}"),
                    {"error": error_msg},
                    None,
                    None,
                    f"Error: {error_msg}",
                    gr.update(visible=False)
                )

        # Connect the interface - FIXED: Now returns 6 outputs
        # (7 inputs from the configuration widgets, 6 outputs matching both
        # yield tuples above — counts must stay in sync with the handler)
        run_btn.click(
            fn=update_status_and_run,
            inputs=[
                real_file, synth_file, confidence, subsample_size,
                categorical_encoding, numerical_scaling, distance_metric
            ],
            outputs=[
                audit_status, audit_results, distance_plot, epsilon_plot,
                audit_report, export_btn
            ]
        )

    return demo
# Launch the application
if __name__ == "__main__":
    try:
        logger.info("Creating enhanced Gradio interface...")
        demo = create_enhanced_interface()
        logger.info("Launching Privacy Auditor application...")
        demo.launch(
            server_name="0.0.0.0",  # listen on all interfaces (container/LAN use)
            server_port=7860,
            # FIX: share=False. share=True opens a public *.gradio.live tunnel,
            # which contradicts this tool's own "Local Processing: all data
            # remains secure on your infrastructure" guarantee — unacceptable
            # for a privacy auditor handling sensitive datasets.
            share=False,
            show_error=True
        )
    except Exception as e:
        logger.error(f"Failed to launch application: {e}")
        logger.error(traceback.format_exc())
        print(f"Application failed to start: {e}")
        # FIX: exit nonzero so supervisors/CI detect the failed launch
        # (previously the script fell through and exited with status 0).
        sys.exit(1)