Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import numpy as np | |
| import pandas as pd | |
| from scipy.spatial import distance_matrix | |
| from scipy.special import gammaln | |
| import plotly.graph_objects as go | |
| from datetime import datetime | |
| import json | |
| import io | |
| import zipfile | |
| from typing import Dict, List, Tuple, Optional, Any | |
| import logging | |
| import traceback | |
| from dataclasses import dataclass, asdict | |
| from pathlib import Path | |
| import warnings | |
| import uuid | |
| import os | |
| import sys | |
| from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError | |
| warnings.filterwarnings('ignore') | |
def setup_logging():
    """Set up console and file logging; return this module's logger.

    Creates a ``logs/`` directory next to the app, attaches three handlers
    to the root logger (stdout at INFO, a detailed DEBUG file, an
    ERROR-only file) and returns ``logging.getLogger(__name__)``.
    """
    log_dir = Path("logs")
    log_dir.mkdir(exist_ok=True)

    detailed_fmt = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
    )
    simple_fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    # (handler, level, formatter) triples: console, full debug file, error file.
    handler_specs = (
        (logging.StreamHandler(sys.stdout), logging.INFO, simple_fmt),
        (logging.FileHandler(log_dir / 'privacy_audit_detailed.log'), logging.DEBUG, detailed_fmt),
        (logging.FileHandler(log_dir / 'privacy_audit_errors.log'), logging.ERROR, detailed_fmt),
    )
    for handler, level, formatter in handler_specs:
        handler.setLevel(level)
        handler.setFormatter(formatter)
        root_logger.addHandler(handler)

    return logging.getLogger(__name__)
# Module-wide logger shared by every class below; configured once at import time.
logger = setup_logging()
logger.info(f"Privacy Auditor Starting - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
@dataclass
class AuditConfig:
    """Enterprise configuration for privacy audit.

    Declared as a ``@dataclass``: the auditor serializes its configuration
    with ``dataclasses.asdict(self.config)``, which raises ``TypeError``
    on a plain class instance, and the decorator also provides a keyword
    constructor for per-audit overrides.
    """
    confidence_level: float = 0.95            # confidence for the primary epsilon bound (0.5..0.999)
    subsample_size: Optional[int] = None      # cap on real rows used; None = use all rows
    categorical_encoding: str = "onehot"      # "onehot" | "label" | "none"
    numerical_scaling: str = "standard"       # "standard" | "minmax" | "robust" | "none"
    distance_metric: str = "euclidean"        # "euclidean" | "manhattan" | "cosine"
    enable_preprocessing_report: bool = True
    max_file_size_mb: int = 500
    timeout_seconds: int = 300                # wall-clock budget for a full audit run
    enable_data_validation: bool = True
    chunk_size: int = 10000                   # rows per chunk for large distance matrices
    max_categories_onehot: int = 50           # rare levels beyond this are bucketed as 'other'

    def validate(self) -> List[str]:
        """Return a list of human-readable problems; empty when the config is valid."""
        validations = [
            (0.5 <= self.confidence_level <= 0.999, "Confidence level must be between 0.5 and 0.999"),
            (self.subsample_size is None or self.subsample_size >= 100, "Subsample size must be at least 100 if specified"),
            (self.max_file_size_mb >= 1, "Max file size must be at least 1 MB"),
            (self.timeout_seconds >= 10, "Timeout must be at least 10 seconds")
        ]
        try:
            return [msg for valid, msg in validations if not valid]
        except Exception as e:
            logger.error(f"Configuration validation error: {e}")
            return [f"Configuration validation failed: {str(e)}"]
class SafeDataProcessor:
    """Safe data processing with comprehensive error handling."""

    # Candidate encodings tried in order when reading CSV files.
    SUPPORTED_ENCODINGS = ['utf-8', 'latin-1', 'iso-8859-1', 'cp1252']

    @classmethod
    def safe_read_csv(cls, file_path: str, max_rows: int = None) -> Tuple[Optional[pd.DataFrame], str]:
        """Read a CSV file defensively.

        Tries each encoding in ``SUPPORTED_ENCODINGS`` until one decodes
        the file.  Returns ``(dataframe, "")`` on success or
        ``(None, error_message)`` on failure.  ``@classmethod`` restored:
        the body reads ``cls.SUPPORTED_ENCODINGS``.
        """
        try:
            if not os.path.exists(file_path):
                return None, f"File not found: {file_path}"
            file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
            logger.info(f"Reading CSV: {file_path} ({file_size_mb:.2f} MB)")
            for encoding in cls.SUPPORTED_ENCODINGS:
                try:
                    df = pd.read_csv(file_path, encoding=encoding, nrows=max_rows, low_memory=False)
                    logger.info(f"Loaded dataset: {df.shape[0]} rows, {df.shape[1]} columns ({encoding})")
                    return df, ""
                except UnicodeDecodeError as e:
                    # Wrong encoding: try the next candidate.  Non-decode errors
                    # (parse errors, empty file, ...) are NOT retried — they fall
                    # through to the outer handler with a real diagnostic instead
                    # of the former blanket "any supported encoding" message.
                    if encoding == cls.SUPPORTED_ENCODINGS[-1]:
                        logger.warning(f"All encodings failed, last error: {e}")
                    continue
            return None, "Failed to read file with any supported encoding"
        except Exception as e:
            error_msg = f"Error reading CSV file: {str(e)}"
            logger.error(error_msg)
            return None, error_msg

    @staticmethod
    def safe_json_convert(obj: Any) -> Any:
        """Recursively convert *obj* to a JSON-serializable equivalent.

        Numpy scalars become Python numbers, arrays become lists,
        Series/DataFrames become dicts, and containers are converted
        element-wise.  Previously a dict argument (the common call:
        ``safe_json_convert(df.isnull().sum().to_dict())``) fell through
        to ``str(obj)`` and was stringified wholesale; containers and
        native JSON scalars are now handled explicitly.  Anything still
        unrecognized falls back to ``str(obj)``.
        """
        try:
            # Native JSON types pass through untouched.
            if obj is None or isinstance(obj, (bool, int, float, str)):
                return obj
            if isinstance(obj, np.integer):
                return int(obj)
            if isinstance(obj, np.floating):
                return float(obj)
            if isinstance(obj, np.ndarray):
                return obj.tolist()
            if isinstance(obj, pd.Series):
                return SafeDataProcessor.safe_json_convert(obj.to_dict())
            if isinstance(obj, pd.DataFrame):
                return SafeDataProcessor.safe_json_convert(obj.to_dict())
            # Containers are converted element-wise (keys coerced to str for JSON).
            if isinstance(obj, dict):
                return {str(k): SafeDataProcessor.safe_json_convert(v) for k, v in obj.items()}
            if isinstance(obj, (list, tuple, set)):
                return [SafeDataProcessor.safe_json_convert(v) for v in obj]
            # Duck-typed numpy-like scalars (anything exposing a dtype).
            if hasattr(obj, 'dtype'):
                dtype_str = str(obj.dtype)
                if 'int' in dtype_str:
                    return int(obj)
                elif 'float' in dtype_str:
                    return float(obj)
            return str(obj)
        except Exception as e:
            logger.warning(f"JSON conversion failed for {type(obj)}: {e}")
            return str(obj)
class DataValidator:
    """Enhanced data validation with detailed reporting.

    Decorators restored: ``validate_dataframe`` is written against ``cls``
    and the ``_helper`` methods take no self/cls parameter, so without
    ``@classmethod`` / ``@staticmethod`` an instance call such as
    ``cls._empty_dataframe_result(name)`` passed the instance as ``name``
    and raised ``TypeError``.
    """

    @classmethod
    def validate_dataframe(cls, df: pd.DataFrame, name: str) -> Dict[str, Any]:
        """Validate *df* and return a report dict.

        The report contains ``valid``, ``errors``, ``warnings``, ``shape``,
        ``memory_usage_mb``, ``column_types`` and ``null_counts``.  Any
        unexpected exception is converted into an invalid report rather
        than propagated.
        """
        logger.info(f"Validating dataframe: {name}")
        if df.empty:
            return cls._empty_dataframe_result(name)
        issues, warnings_list = [], []
        if len(df) < 10:
            warnings_list.append(f"{name}: Very small dataset ({len(df)} rows)")
        try:
            memory_mb = cls._calculate_memory_usage(df)
            cls._validate_columns(df, name, warnings_list)
            cls._check_data_quality(df, name, warnings_list)
            column_types = cls._analyze_column_types(df)
            return cls._build_validation_result(df, issues, warnings_list, memory_mb, column_types)
        except Exception as e:
            return cls._handle_validation_error(e, name, df, warnings_list)

    @staticmethod
    def _empty_dataframe_result(name: str) -> Dict[str, Any]:
        """Report for an empty dataframe (always invalid)."""
        return {
            "valid": False,
            "errors": [f"{name}: Dataset is empty"],
            "warnings": [],
            "shape": (0, 0),
            "memory_usage_mb": 0
        }

    @staticmethod
    def _calculate_memory_usage(df: pd.DataFrame) -> float:
        """Deep memory footprint in MB; 0.0 when it cannot be measured."""
        try:
            return df.memory_usage(deep=True).sum() / (1024 * 1024)
        except Exception:
            return 0.0

    @staticmethod
    def _validate_columns(df: pd.DataFrame, name: str, warnings_list: List[str]):
        """Append per-column warnings (mixed types, constant columns) to *warnings_list*."""
        for col in df.columns:
            try:
                if df[col].dtype == 'object':
                    # Sample the first 100 non-null values to spot mixed Python types.
                    sample_data = df[col].dropna().head(100)
                    if len(sample_data) > 0:
                        sample_types = set(type(x).__name__ for x in sample_data)
                        if len(sample_types) > 2:
                            warnings_list.append(f"{name}: Column '{col}' has mixed data types")
                if df[col].nunique() <= 1:
                    warnings_list.append(f"{name}: Column '{col}' is constant")
            except Exception as e:
                warnings_list.append(f"{name}: Error analyzing column '{col}': {str(e)}")

    @staticmethod
    def _check_data_quality(df: pd.DataFrame, name: str, warnings_list: List[str]):
        """Warn about >50% missing columns and >10% duplicate rows."""
        try:
            missing_pct = (df.isnull().sum() / len(df)) * 100
            high_missing = missing_pct[missing_pct > 50]
            if not high_missing.empty:
                warnings_list.append(f"{name}: High missing values: {high_missing.to_dict()}")
        except Exception as e:
            warnings_list.append(f"{name}: Error checking missing values: {str(e)}")
        try:
            duplicates = df.duplicated().sum()
            if duplicates > len(df) * 0.1:
                warnings_list.append(f"{name}: High duplicate rows ({duplicates})")
        except Exception as e:
            warnings_list.append(f"{name}: Error checking duplicates: {str(e)}")

    @staticmethod
    def _analyze_column_types(df: pd.DataFrame) -> Dict[str, int]:
        """Map dtype name -> number of columns of that dtype ({} on failure)."""
        try:
            return {str(k): int(v) for k, v in df.dtypes.value_counts().to_dict().items()}
        except Exception:
            return {}

    @staticmethod
    def _build_validation_result(df: pd.DataFrame, issues: List[str], warnings_list: List[str],
                                 memory_mb: float, column_types: Dict[str, int]) -> Dict[str, Any]:
        """Assemble the success-path report dict."""
        return {
            "valid": len(issues) == 0,
            "errors": issues,
            "warnings": warnings_list,
            "shape": df.shape,
            "memory_usage_mb": memory_mb,
            "column_types": column_types,
            "null_counts": SafeDataProcessor.safe_json_convert(df.isnull().sum().to_dict())
        }

    @staticmethod
    def _handle_validation_error(e: Exception, name: str, df: pd.DataFrame,
                                 warnings_list: List[str]) -> Dict[str, Any]:
        """Convert an unexpected validation exception into an invalid report."""
        error_msg = f"Validation failed for {name}: {str(e)}"
        logger.error(error_msg)
        return {
            "valid": False,
            "errors": [error_msg],
            "warnings": warnings_list,
            "shape": df.shape if hasattr(df, 'shape') else (0, 0),
            "memory_usage_mb": 0,
            "column_types": {}
        }
| class EnhancedPrivacyAuditor: | |
| """Enhanced privacy auditor with comprehensive error handling and logging""" | |
| def __init__(self, config: AuditConfig = None): | |
| self.config = config or AuditConfig() | |
| self.audit_history = [] | |
| self.session_id = str(uuid.uuid4())[:8] | |
| self.current_audit_id = None | |
| logger.info(f"Initialized Privacy Auditor - Session: {self.session_id}") | |
| logger.info(f"Configuration: {asdict(self.config)}") | |
| def validate_inputs(self, real_data: pd.DataFrame, synthetic_data: pd.DataFrame) -> Dict[str, Any]: | |
| """Comprehensive input validation with enhanced error handling""" | |
| logger.info("Starting comprehensive input validation") | |
| try: | |
| validator = DataValidator() | |
| # Validate individual datasets | |
| real_validation = validator.validate_dataframe(real_data, "Real Dataset") | |
| synth_validation = validator.validate_dataframe(synthetic_data, "Synthetic Dataset") | |
| all_errors = real_validation["errors"] + synth_validation["errors"] | |
| all_warnings = real_validation["warnings"] + synth_validation["warnings"] | |
| # Cross-dataset validation | |
| if real_validation["valid"] and synth_validation["valid"]: | |
| try: | |
| real_cols = set(real_data.columns) | |
| synth_cols = set(synthetic_data.columns) | |
| missing_in_synth = real_cols - synth_cols | |
| missing_in_real = synth_cols - real_cols | |
| if missing_in_synth: | |
| all_warnings.append(f"Columns missing in synthetic data: {list(missing_in_synth)}") | |
| if missing_in_real: | |
| all_warnings.append(f"Extra columns in synthetic data: {list(missing_in_real)}") | |
| # Check data type compatibility | |
| common_cols = real_cols & synth_cols | |
| for col in common_cols: | |
| try: | |
| real_type = real_data[col].dtype | |
| synth_type = synthetic_data[col].dtype | |
| if real_type != synth_type: | |
| all_warnings.append(f"Type mismatch in column '{col}': {real_type} vs {synth_type}") | |
| except Exception as e: | |
| all_warnings.append(f"Error checking column '{col}': {str(e)}") | |
| except Exception as e: | |
| all_warnings.append(f"Cross-validation error: {str(e)}") | |
| result = { | |
| "valid": len(all_errors) == 0, | |
| "errors": all_errors, | |
| "warnings": all_warnings, | |
| "real_dataset": real_validation, | |
| "synthetic_dataset": synth_validation | |
| } | |
| logger.info(f"Validation completed - Valid: {result['valid']}, Errors: {len(all_errors)}, Warnings: {len(all_warnings)}") | |
| return result | |
| except Exception as e: | |
| error_msg = f"Input validation failed: {str(e)}" | |
| logger.error(error_msg) | |
| logger.error(traceback.format_exc()) | |
| return { | |
| "valid": False, | |
| "errors": [error_msg], | |
| "warnings": [], | |
| "real_dataset": {"valid": False, "errors": [error_msg]}, | |
| "synthetic_dataset": {"valid": False, "errors": [error_msg]} | |
| } | |
    def safe_preprocess_data(self, df: pd.DataFrame, is_real: bool = True) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """Enhanced data preprocessing with comprehensive error handling

        Pipeline: copy -> impute missing values -> identify column types ->
        encode categoricals (per config: one-hot with category capping, or
        label encoding) -> rescale numericals (standard/minmax/robust) ->
        final numeric cleanup.  Every stage is individually guarded so a
        failure degrades to a fallback instead of aborting; problems are
        accumulated under ``report["errors"]``.

        Returns ``(processed_df, report)``.  On a fatal error the ORIGINAL
        dataframe is returned and ``report["success"]`` is False.
        """
        dataset_type = "real" if is_real else "synthetic"
        logger.info(f"Starting preprocessing for {dataset_type} dataset")
        report = {
            "dataset_type": dataset_type,
            "original_shape": df.shape,
            "start_time": datetime.now().isoformat(),
            "success": False,
            "steps_completed": []   # audit trail of which stages ran
        }
        try:
            # Create working copy (input frame is never mutated)
            df_processed = df.copy()
            report["steps_completed"].append("data_copy")
            # Handle missing values
            try:
                missing_counts = df_processed.isnull().sum()
                if missing_counts.any():
                    logger.info(f"Handling missing values in {len(missing_counts[missing_counts > 0])} columns")
                    for col in df_processed.columns:
                        if missing_counts[col] > 0:
                            try:
                                if pd.api.types.is_numeric_dtype(df_processed[col]):
                                    # Numeric: impute with the median (0 if the column is all-NaN).
                                    fill_value = df_processed[col].median()
                                    if pd.isna(fill_value):
                                        fill_value = 0
                                else:
                                    # Non-numeric: impute with the mode, else the literal 'unknown'.
                                    mode_values = df_processed[col].mode()
                                    fill_value = mode_values[0] if len(mode_values) > 0 else 'unknown'
                                # NOTE(review): inplace fillna on a column selection is
                                # deprecated under pandas copy-on-write — confirm the
                                # pinned pandas version still honours it.
                                df_processed[col].fillna(fill_value, inplace=True)
                            except Exception as e:
                                logger.warning(f"Failed to fill missing values in column '{col}': {e}")
                                df_processed[col].fillna('unknown', inplace=True)
                    report["missing_values_handled"] = SafeDataProcessor.safe_json_convert(missing_counts[missing_counts > 0].to_dict())
                    report["steps_completed"].append("missing_values")
            except Exception as e:
                logger.error(f"Missing value handling failed: {e}")
                report["errors"] = report.get("errors", []) + [f"Missing value handling: {str(e)}"]
            # Identify column types
            try:
                numerical_cols = df_processed.select_dtypes(include=[np.number]).columns.tolist()
                categorical_cols = df_processed.select_dtypes(exclude=[np.number]).columns.tolist()
                logger.info(f"Identified {len(numerical_cols)} numerical and {len(categorical_cols)} categorical columns")
                report["numerical_columns"] = numerical_cols
                report["categorical_columns"] = categorical_cols
                report["steps_completed"].append("column_identification")
            except Exception as e:
                logger.error(f"Column type identification failed: {e}")
                # Fallback: treat every column as categorical.
                numerical_cols = []
                categorical_cols = list(df_processed.columns)
                report["errors"] = report.get("errors", []) + [f"Column identification: {str(e)}"]
            # Handle categorical encoding
            if categorical_cols and self.config.categorical_encoding != "none":
                try:
                    logger.info(f"Applying {self.config.categorical_encoding} encoding to categorical columns")
                    if self.config.categorical_encoding == "onehot":
                        # Limit categories to prevent explosion of dummy columns
                        for col in categorical_cols[:]:  # Copy list to modify during iteration
                            try:
                                unique_count = df_processed[col].nunique()
                                if unique_count > self.config.max_categories_onehot:
                                    logger.warning(f"Column '{col}' has {unique_count} categories, limiting to top {self.config.max_categories_onehot - 1}")
                                    top_categories = df_processed[col].value_counts().head(self.config.max_categories_onehot - 1).index
                                    # Everything outside the most frequent levels becomes 'other'.
                                    df_processed[col] = df_processed[col].apply(
                                        lambda x: x if x in top_categories else 'other'
                                    )
                            except Exception as e:
                                logger.warning(f"Error processing column '{col}': {e}")
                                categorical_cols.remove(col)
                        if categorical_cols:  # Only if we have categorical columns left
                            df_processed = pd.get_dummies(
                                df_processed,
                                columns=categorical_cols,
                                prefix=categorical_cols,
                                drop_first=True,
                                dummy_na=True
                            )
                    elif self.config.categorical_encoding == "label":
                        for col in categorical_cols:
                            try:
                                # Simple label encoding: first-seen order -> 0..k-1
                                unique_vals = df_processed[col].unique()
                                label_map = {val: idx for idx, val in enumerate(unique_vals)}
                                df_processed[col] = df_processed[col].map(label_map)
                            except Exception as e:
                                logger.warning(f"Label encoding failed for column '{col}': {e}")
                                # Fallback to categorical codes
                                df_processed[col] = pd.Categorical(df_processed[col]).codes
                    report["categorical_encoding_applied"] = self.config.categorical_encoding
                    report["steps_completed"].append("categorical_encoding")
                except Exception as e:
                    logger.error(f"Categorical encoding failed: {e}")
                    # Fallback to simple codes for every categorical column
                    for col in categorical_cols:
                        try:
                            df_processed[col] = pd.Categorical(df_processed[col]).codes
                        except Exception:
                            df_processed[col] = 0
                    report["categorical_encoding_fallback"] = "categorical_codes"
                    report["errors"] = report.get("errors", []) + [f"Categorical encoding: {str(e)}"]
            # Update numerical columns after encoding (dummies are numeric now)
            try:
                numerical_cols = df_processed.select_dtypes(include=[np.number]).columns.tolist()
                logger.info(f"After encoding: {len(numerical_cols)} numerical columns")
            except Exception:
                numerical_cols = []
            # Handle numerical scaling
            if numerical_cols and self.config.numerical_scaling != "none":
                try:
                    logger.info(f"Applying {self.config.numerical_scaling} scaling to numerical columns")
                    # Simple scaling implementations to avoid sklearn dependency;
                    # zero-spread columns are deliberately left unscaled.
                    if self.config.numerical_scaling == "standard":
                        for col in numerical_cols:
                            try:
                                mean_val = df_processed[col].mean()
                                std_val = df_processed[col].std()
                                if std_val > 0:
                                    df_processed[col] = (df_processed[col] - mean_val) / std_val
                            except Exception as e:
                                logger.warning(f"Standard scaling failed for column '{col}': {e}")
                    elif self.config.numerical_scaling == "minmax":
                        for col in numerical_cols:
                            try:
                                min_val = df_processed[col].min()
                                max_val = df_processed[col].max()
                                if max_val > min_val:
                                    df_processed[col] = (df_processed[col] - min_val) / (max_val - min_val)
                            except Exception as e:
                                logger.warning(f"MinMax scaling failed for column '{col}': {e}")
                    elif self.config.numerical_scaling == "robust":
                        for col in numerical_cols:
                            try:
                                median_val = df_processed[col].median()
                                q75 = df_processed[col].quantile(0.75)
                                q25 = df_processed[col].quantile(0.25)
                                iqr = q75 - q25
                                if iqr > 0:
                                    df_processed[col] = (df_processed[col] - median_val) / iqr
                            except Exception as e:
                                logger.warning(f"Robust scaling failed for column '{col}': {e}")
                    report["numerical_scaling_applied"] = self.config.numerical_scaling
                    report["steps_completed"].append("numerical_scaling")
                except Exception as e:
                    logger.error(f"Numerical scaling failed: {e}")
                    report["errors"] = report.get("errors", []) + [f"Numerical scaling: {str(e)}"]
            # Final cleanup: guarantee an all-numeric, all-finite frame
            try:
                # Replace infinite values
                df_processed = df_processed.replace([np.inf, -np.inf], np.nan)
                # Fill remaining NaN values
                df_processed = df_processed.fillna(0)
                # Ensure all data is numeric (coerce or zero out anything left over)
                for col in df_processed.columns:
                    if not pd.api.types.is_numeric_dtype(df_processed[col]):
                        try:
                            df_processed[col] = pd.to_numeric(df_processed[col], errors='coerce')
                            df_processed[col] = df_processed[col].fillna(0)
                        except Exception:
                            df_processed[col] = 0
                report["steps_completed"].append("final_cleanup")
            except Exception as e:
                logger.error(f"Final cleanup failed: {e}")
                report["errors"] = report.get("errors", []) + [f"Final cleanup: {str(e)}"]
            report.update({
                "final_shape": df_processed.shape,
                "processing_completed": datetime.now().isoformat(),
                "success": True
            })
            logger.info(f"Preprocessing completed successfully for {dataset_type} dataset: {df_processed.shape}")
            return df_processed, report
        except Exception as e:
            error_msg = f"Preprocessing failed for {dataset_type} dataset: {str(e)}"
            logger.error(error_msg)
            logger.error(traceback.format_exc())
            report.update({
                "error": error_msg,
                "processing_completed": datetime.now().isoformat(),
                "success": False
            })
            # Fatal path: hand back the untouched input frame with success=False.
            return df, report
| def safe_compute_distances(self, X: np.ndarray, S: np.ndarray) -> Optional[np.ndarray]: | |
| """Safe distance computation with memory management""" | |
| logger.info(f"Computing {self.config.distance_metric} distances for {len(X)}x{len(S)} matrix") | |
| try: | |
| # Memory check | |
| estimated_memory = (len(X) * len(S) * 8) / (1024 ** 3) # GB | |
| logger.info(f"Estimated memory requirement: {estimated_memory:.2f} GB") | |
| if estimated_memory > 4: # > 4GB | |
| logger.info("Using chunked computation for large distance matrix") | |
| return self._chunked_distance_computation(X, S) | |
| else: | |
| return self._direct_distance_computation(X, S) | |
| except Exception as e: | |
| logger.error(f"Distance computation failed: {e}") | |
| logger.error(traceback.format_exc()) | |
| return None | |
| def _direct_distance_computation(self, X: np.ndarray, S: np.ndarray) -> np.ndarray: | |
| """Direct distance computation for smaller datasets""" | |
| try: | |
| if self.config.distance_metric == "euclidean": | |
| return distance_matrix(X, S) | |
| elif self.config.distance_metric == "manhattan": | |
| return distance_matrix(X, S, p=1) | |
| elif self.config.distance_metric == "cosine": | |
| # Manual cosine distance to avoid sklearn dependency | |
| X_norm = X / (np.linalg.norm(X, axis=1, keepdims=True) + 1e-10) | |
| S_norm = S / (np.linalg.norm(S, axis=1, keepdims=True) + 1e-10) | |
| cosine_sim = np.dot(X_norm, S_norm.T) | |
| return 1 - cosine_sim | |
| else: | |
| return distance_matrix(X, S) | |
| except Exception as e: | |
| logger.error(f"Direct distance computation failed: {e}") | |
| raise | |
| def _chunked_distance_computation(self, X: np.ndarray, S: np.ndarray) -> np.ndarray: | |
| """Chunked distance computation for large datasets""" | |
| try: | |
| chunk_size = min(self.config.chunk_size, len(X)) | |
| distances = [] | |
| for i in range(0, len(X), chunk_size): | |
| end_idx = min(i + chunk_size, len(X)) | |
| chunk_X = X[i:end_idx] | |
| logger.debug(f"Processing chunk {i//chunk_size + 1}/{(len(X)-1)//chunk_size + 1}") | |
| chunk_dist = self._direct_distance_computation(chunk_X, S) | |
| distances.append(chunk_dist) | |
| return np.vstack(distances) | |
| except Exception as e: | |
| logger.error(f"Chunked distance computation failed: {e}") | |
| raise | |
| def safe_compute_epsilon(self, m: int, n: int, d: int, v: float, p: float = 0.05) -> float: | |
| """Safe epsilon computation with enhanced error handling""" | |
| try: | |
| # Input validation | |
| if any(x <= 0 for x in [m, n, d]) or v < 0 or not 0 < p < 1: | |
| logger.warning(f"Invalid epsilon parameters: m={m}, n={n}, d={d}, v={v}, p={p}") | |
| return 0.0 | |
| # Handle edge cases | |
| if v == 0: | |
| logger.warning("Distance sum is zero, returning zero epsilon") | |
| return 0.0 | |
| # Compute with numerical stability | |
| try: | |
| log_gamma_term = gammaln(d/2) - gammaln(d) | |
| log_md_factorial = gammaln(m * d + 1) | |
| log_top_terms = (np.log(p) + log_md_factorial) / m | |
| log_bottom_terms = ( | |
| np.log(2) + (d / 2) * np.log(np.pi) + | |
| np.log(n) + d * np.log(v) | |
| ) | |
| eps_lower = log_gamma_term + log_top_terms - log_bottom_terms | |
| except Exception as e: | |
| logger.warning(f"Epsilon computation numerical error: {e}") | |
| return 0.0 | |
| # Ensure result is valid | |
| if not np.isfinite(eps_lower): | |
| logger.warning("Non-finite epsilon computed") | |
| return 0.0 | |
| result = float(max(0, eps_lower)) | |
| logger.debug(f"Computed epsilon: {result} for confidence {1-p}") | |
| return result | |
| except Exception as e: | |
| logger.error(f"Epsilon computation failed: {e}") | |
| return 0.0 | |
    def run_comprehensive_audit(self, real_data: pd.DataFrame, synthetic_data: pd.DataFrame) -> Dict[str, Any]:
        """Main audit function with comprehensive error handling and timeout

        Seven steps: validate inputs, preprocess both datasets, align the
        common columns, convert to float64 arrays, optionally subsample the
        real data, compute the real-to-synthetic distance matrix, and derive
        epsilon bounds, distance statistics and a categorical risk
        assessment.  The whole pipeline runs inside a worker thread bounded
        by ``config.timeout_seconds``; every failure path returns an error
        dict tagged with ``step_failed`` instead of raising.
        """
        self.current_audit_id = str(uuid.uuid4())[:12]
        start_time = datetime.now()
        logger.info(f"Starting comprehensive audit - ID: {self.current_audit_id}")
        def audit_worker():
            # Entire pipeline lives here so the caller can bound it with a timeout.
            try:
                # Input validation
                logger.info("Step 1/7: Input validation")
                validation_result = self.validate_inputs(real_data, synthetic_data)
                if not validation_result["valid"]:
                    return {
                        "audit_id": self.current_audit_id,
                        "error": "Input validation failed",
                        "validation_errors": validation_result["errors"],
                        "validation_warnings": validation_result["warnings"],
                        "timestamp": start_time.isoformat(),
                        "step_failed": "input_validation"
                    }
                # Preprocessing
                logger.info("Step 2/7: Data preprocessing")
                X_processed, real_report = self.safe_preprocess_data(real_data, is_real=True)
                S_processed, synth_report = self.safe_preprocess_data(synthetic_data, is_real=False)
                if not real_report["success"] or not synth_report["success"]:
                    return {
                        "audit_id": self.current_audit_id,
                        "error": "Data preprocessing failed",
                        "preprocessing_reports": {"real": real_report, "synthetic": synth_report},
                        "timestamp": start_time.isoformat(),
                        "step_failed": "preprocessing"
                    }
                # Align columns
                logger.info("Step 3/7: Column alignment")
                try:
                    common_cols = list(set(X_processed.columns) & set(S_processed.columns))
                    if len(common_cols) == 0:
                        return {
                            "audit_id": self.current_audit_id,
                            "error": "No common columns between datasets after preprocessing",
                            "timestamp": start_time.isoformat(),
                            "step_failed": "column_alignment"
                        }
                    # sort_index(axis=1) puts both frames in identical column order.
                    X_processed = X_processed[common_cols].sort_index(axis=1)
                    S_processed = S_processed[common_cols].sort_index(axis=1)
                    logger.info(f"Using {len(common_cols)} common columns")
                except Exception as e:
                    return {
                        "audit_id": self.current_audit_id,
                        "error": f"Column alignment failed: {str(e)}",
                        "timestamp": start_time.isoformat(),
                        "step_failed": "column_alignment"
                    }
                # Convert to numpy arrays
                logger.info("Step 4/7: Array conversion")
                try:
                    X = X_processed.astype(np.float64).values
                    S = S_processed.astype(np.float64).values
                    # Validate arrays: zero out NaN/inf leftovers rather than failing
                    if not np.isfinite(X).all():
                        logger.warning("Non-finite values in real data, cleaning...")
                        X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
                    if not np.isfinite(S).all():
                        logger.warning("Non-finite values in synthetic data, cleaning...")
                        S = np.nan_to_num(S, nan=0.0, posinf=0.0, neginf=0.0)
                except Exception as e:
                    return {
                        "audit_id": self.current_audit_id,
                        "error": f"Array conversion failed: {str(e)}",
                        "timestamp": start_time.isoformat(),
                        "step_failed": "array_conversion"
                    }
                # Subsampling (real rows only; synthetic set is always used in full)
                logger.info("Step 5/7: Subsampling (if needed)")
                original_m = len(X)
                if self.config.subsample_size and len(X) > self.config.subsample_size:
                    try:
                        # Fixed seed keeps repeated audits of the same data comparable.
                        np.random.seed(42)
                        indices = np.random.choice(len(X), self.config.subsample_size, replace=False)
                        X = X[indices]
                        logger.info(f"Subsampled real data from {original_m} to {len(X)} samples")
                    except Exception as e:
                        logger.warning(f"Subsampling failed: {e}, using full dataset")
                m, d = X.shape
                n = len(S)
                logger.info(f"Final dataset sizes - Real: {m}x{d}, Synthetic: {n}x{d}")
                # Distance computation
                logger.info("Step 6/7: Distance computation")
                dist_matrix = self.safe_compute_distances(X, S)
                if dist_matrix is None:
                    return {
                        "audit_id": self.current_audit_id,
                        "error": "Distance computation failed",
                        "timestamp": start_time.isoformat(),
                        "step_failed": "distance_computation"
                    }
                # Compute statistics
                logger.info("Step 7/7: Statistical analysis")
                try:
                    # For every real record: distance to its nearest synthetic record.
                    nearest_distances = np.min(dist_matrix, axis=1)
                    v = np.sum(nearest_distances)
                    # Epsilon bounds for multiple confidence levels
                    confidence_levels = [0.90, 0.95, 0.99]
                    epsilon_bounds = {}
                    for conf in confidence_levels:
                        p = 1 - conf
                        eps_lb = self.safe_compute_epsilon(m, n, d, v, p)
                        epsilon_bounds[f"eps_lb_{int(conf*100)}"] = eps_lb
                    # Distance statistics
                    distance_stats = {
                        "mean_nearest_distance": float(np.mean(nearest_distances)),
                        "median_nearest_distance": float(np.median(nearest_distances)),
                        "std_nearest_distance": float(np.std(nearest_distances)),
                        "min_nearest_distance": float(np.min(nearest_distances)),
                        "max_nearest_distance": float(np.max(nearest_distances)),
                        "q25_nearest_distance": float(np.percentile(nearest_distances, 25)),
                        "q75_nearest_distance": float(np.percentile(nearest_distances, 75)),
                        "distance_sum": float(v),
                        # Exact matches suggest memorized records; near-zero suggests leakage.
                        "zero_distance_count": int(np.sum(nearest_distances == 0)),
                        "small_distance_count": int(np.sum(nearest_distances < 1e-6))
                    }
                    # Risk assessment keyed off the 95% bound
                    primary_epsilon = epsilon_bounds["eps_lb_95"]
                    risk_level = self.assess_privacy_risk(primary_epsilon)
                except Exception as e:
                    return {
                        "audit_id": self.current_audit_id,
                        "error": f"Statistical analysis failed: {str(e)}",
                        "timestamp": start_time.isoformat(),
                        "step_failed": "statistical_analysis"
                    }
                # Compile results
                duration = (datetime.now() - start_time).total_seconds()
                result = {
                    "audit_id": self.current_audit_id,
                    "session_id": self.session_id,
                    "success": True,
                    "audit_metadata": {
                        "timestamp": start_time.isoformat(),
                        "duration_seconds": round(duration, 2),
                        "distance_metric": self.config.distance_metric,
                        "configuration": asdict(self.config)
                    },
                    "dataset_info": {
                        "real_samples_original": original_m,
                        "real_samples_used": m,
                        "synthetic_samples": n,
                        "dimensions": d,
                        "common_features": len(common_cols),
                        "subsampling_applied": self.config.subsample_size is not None and original_m > m
                    },
                    "preprocessing_reports": {
                        "real_dataset": real_report,
                        "synthetic_dataset": synth_report
                    },
                    "validation_result": validation_result,
                    "epsilon_bounds": epsilon_bounds,
                    "distance_statistics": distance_stats,
                    "privacy_assessment": {
                        "risk_level": risk_level,
                        "primary_epsilon": primary_epsilon,
                        "interpretation": self.get_risk_interpretation(risk_level, primary_epsilon),
                        "recommendations": self.get_recommendations(risk_level, distance_stats)
                    },
                    "data_quality": {
                        "potential_memorization": distance_stats["zero_distance_count"] > 0,
                        "very_close_matches": distance_stats["small_distance_count"],
                        "distance_distribution_skew": self._safe_compute_skewness(nearest_distances)
                    }
                }
                self.audit_history.append(result)
                logger.info(f"Audit completed successfully - ID: {self.current_audit_id}, Risk: {risk_level}, Duration: {duration:.2f}s")
                return result
            except Exception as e:
                error_msg = f"Unexpected audit error: {str(e)}"
                logger.error(f"Audit failed - ID: {self.current_audit_id}: {error_msg}")
                logger.error(traceback.format_exc())
                return {
                    "audit_id": self.current_audit_id,
                    "error": error_msg,
                    "timestamp": start_time.isoformat(),
                    "traceback": traceback.format_exc(),
                    "step_failed": "unexpected_error"
                }
        # Run with timeout
        try:
            # NOTE(review): the timeout abandons the worker's result but cannot
            # kill the thread, and the executor's shutdown on context exit waits
            # for the worker to finish — confirm this is acceptable for long runs.
            with ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(audit_worker)
                result = future.result(timeout=self.config.timeout_seconds)
                return result
        except FutureTimeoutError:
            error_msg = f"Audit timed out after {self.config.timeout_seconds} seconds"
            logger.error(error_msg)
            return {
                "audit_id": self.current_audit_id,
                "error": error_msg,
                "timestamp": start_time.isoformat(),
                "step_failed": "timeout"
            }
        except Exception as e:
            error_msg = f"Audit execution failed: {str(e)}"
            logger.error(error_msg)
            return {
                "audit_id": self.current_audit_id,
                "error": error_msg,
                "timestamp": start_time.isoformat(),
                "step_failed": "execution_error"
            }
| def _safe_compute_skewness(self, data: np.ndarray) -> float: | |
| """Safely compute skewness""" | |
| try: | |
| if len(data) < 3: | |
| return 0.0 | |
| mean = np.mean(data) | |
| std = np.std(data) | |
| if std == 0: | |
| return 0.0 | |
| skewness = np.mean(((data - mean) / std) ** 3) | |
| return float(skewness) if np.isfinite(skewness) else 0.0 | |
| except Exception as e: | |
| logger.warning(f"Skewness computation failed: {e}") | |
| return 0.0 | |
def assess_privacy_risk(self, epsilon: float) -> str:
    """Map an epsilon lower bound to a categorical risk level.

    Thresholds are inclusive upper bounds; values above 5.0 are CRITICAL.
    Non-numeric or non-finite epsilon yields "UNKNOWN".

    Fix: previously a NaN epsilon compared False against every threshold
    and silently fell through to "CRITICAL"; it now reports "UNKNOWN".
    """
    risk_thresholds = [
        (0.01, "EXCEPTIONAL"),
        (0.1, "VERY LOW"),
        (0.5, "LOW"),
        (1.0, "MEDIUM"),
        (2.0, "HIGH"),
        (5.0, "VERY HIGH")
    ]
    try:
        if not np.isfinite(epsilon):
            return "UNKNOWN"
        for threshold, level in risk_thresholds:
            if epsilon <= threshold:
                return level
        return "CRITICAL"
    except Exception:
        # Non-numeric input (e.g. None) raises in np.isfinite and lands here.
        return "UNKNOWN"
def get_risk_interpretation(self, risk_level: str, epsilon: float) -> str:
    """Return a human-readable explanation for *risk_level*, tagged with epsilon."""
    interpretations = {
        "EXCEPTIONAL": "Outstanding privacy preservation. Suitable for highly sensitive applications.",
        "VERY LOW": "Excellent privacy preservation. Strong guarantees for most sensitive data.",
        "LOW": "Good privacy preservation. Acceptable for most commercial applications.",
        "MEDIUM": "Moderate privacy risk. Consider additional privacy-enhancing techniques.",
        "HIGH": "High privacy risk. Significant leakage detected. Review methodology.",
        "VERY HIGH": "Very high privacy risk. Additional privacy measures strongly recommended.",
        "CRITICAL": "Critical privacy risk. Synthetic data not suitable for production use."
    }
    try:
        description = interpretations.get(risk_level, "Unknown risk level")
        return f"{description} (ε = {epsilon:.6f})"
    except Exception as e:
        logger.warning(f"Risk interpretation failed: {e}")
        return f"Risk interpretation unavailable (ε = {epsilon:.6f})"
def get_recommendations(self, risk_level: str, distance_stats: Dict[str, Any]) -> List[str]:
    """Build an ordered list of actionable recommendations for this audit.

    High-risk levels prepend urgent remediation advice; distance-based
    memorization warnings are appended next; low-risk levels close with a
    production-readiness note. Always returns a non-empty list.
    """
    try:
        recommendations: List[str] = []
        if risk_level in ("HIGH", "VERY HIGH", "CRITICAL"):
            # Same urgent guidance regardless of which high tier was hit.
            recommendations.append("IMMEDIATE ACTION REQUIRED: Privacy risk unacceptable for production")
            recommendations.append("Consider stronger privacy-preserving methods (DP-SGD, PATE)")
            recommendations.append("Reduce model capacity or increase privacy budget")
            recommendations.append("Review data preprocessing and feature selection")
        self._add_distance_recommendations(recommendations, distance_stats)
        if risk_level in ("EXCEPTIONAL", "VERY LOW", "LOW"):
            recommendations.append("Privacy level acceptable for most production applications")
        return recommendations or ["Review detailed analysis for specific insights"]
    except Exception as e:
        logger.warning(f"Recommendations generation failed: {e}")
        return ["Could not generate recommendations due to analysis error"]
| def _add_distance_recommendations(self, recommendations: List[str], distance_stats: Dict[str, Any]): | |
| """Add distance-based recommendations""" | |
| zero_distances = distance_stats.get("zero_distance_count", 0) | |
| small_distances = distance_stats.get("small_distance_count", 0) | |
| if zero_distances > 0: | |
| recommendations.append(f"WARNING: {zero_distances} exact matches - potential memorization") | |
| if small_distances > zero_distances: | |
| close_matches = small_distances - zero_distances | |
| recommendations.append(f"REVIEW: {close_matches} close matches - check near-memorization") | |
# Global auditor instance
# Module-level singleton shared by all Gradio handlers below. If construction
# fails we record None; entry points (e.g. run_enhanced_audit) check for None
# before use instead of crashing at import time.
try:
    auditor = EnhancedPrivacyAuditor()
    logger.info("Privacy auditor initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize privacy auditor: {e}")
    auditor = None
def create_safe_distance_plot(result: Dict[str, Any]) -> go.Figure:
    """Build the main audit dashboard figure; any failure yields an error figure."""
    try:
        builder = _create_error_figure if "error" in result else _create_comprehensive_dashboard
        return builder(result)
    except Exception as e:
        logger.error(f"Distance plot creation failed: {e}")
        return _create_error_figure({"error": str(e)})
def _create_error_figure(result: Dict[str, Any]) -> go.Figure:
    """Render a figure whose only content is a prominent error banner."""
    message = (
        f"<b>Audit Error</b><br>{result.get('error', 'Unknown error')}"
        f"<br><span style='font-size:12px'>Step: {result.get('step_failed', 'Unknown')}</span>"
    )
    fig = go.Figure()
    fig.add_annotation(
        text=message,
        x=0.5,
        y=0.5,
        showarrow=False,
        align="center",
        font=dict(size=16, color="#dc3545"),
        bgcolor="rgba(220, 53, 69, 0.1)",
        bordercolor="#dc3545",
        borderwidth=2,
    )
    # Hide axes entirely so only the banner is visible.
    fig.update_layout(
        title="Privacy Audit Failed",
        xaxis=dict(visible=False),
        yaxis=dict(visible=False),
        plot_bgcolor="white",
        paper_bgcolor="white",
    )
    return fig
def _create_comprehensive_dashboard(result: Dict[str, Any]) -> go.Figure:
    """Assemble the 2x2 summary dashboard: distances, risk gauge, quality, metrics table."""
    from plotly.subplots import make_subplots
    panel_titles = (
        "Distance Statistics",
        "Privacy Risk Level",
        "Data Quality Assessment",
        "Key Metrics Summary",
    )
    fig = make_subplots(
        rows=2,
        cols=2,
        subplot_titles=panel_titles,
        specs=[
            [{"type": "bar"}, {"type": "indicator"}],
            [{"type": "bar"}, {"type": "table"}],
        ],
        vertical_spacing=0.2,
        horizontal_spacing=0.15,
    )
    # Each helper draws into a fixed (row, col) quadrant of the figure.
    _add_simplified_distance_analysis(fig, result)
    _add_simplified_risk_assessment(fig, result)
    _add_simplified_quality_assessment(fig, result)
    _add_key_metrics_table(fig, result)
    fig.update_layout(
        title={
            "text": "Privacy Audit Results",
            "x": 0.5,
            "xanchor": "center",
            "font": {"size": 18, "color": "#000000"},
        },
        height=600,
        showlegend=False,
        plot_bgcolor="white",
        paper_bgcolor="white",
        font=dict(family="Arial, sans-serif", size=12, color="#000000"),
        margin=dict(t=80, b=50, l=60, r=60),
    )
    return fig
def _add_simplified_distance_analysis(fig, result: Dict[str, Any]):
    """Bar chart of headline nearest-neighbor distance statistics (row 1, col 1)."""
    stats = result.get("distance_statistics", {})
    labels = ["Mean", "Median", "Max"]
    stat_keys = ["mean_nearest_distance", "median_nearest_distance", "max_nearest_distance"]
    values = [stats.get(key, 0) for key in stat_keys]
    bar = go.Bar(
        x=labels,
        y=values,
        marker_color='#2563eb',
        marker_line=dict(color='#1e40af', width=1),
        # Non-positive values get a literal "0.0000" label.
        text=[f"{v:.4f}" if v > 0 else "0.0000" for v in values],
        textposition='outside',
        textfont=dict(size=11, color="#000000"),
        hovertemplate="<b>%{x}</b><br>%{y:.6f}<extra></extra>",
        showlegend=False,
    )
    fig.add_trace(bar, row=1, col=1)
    fig.update_xaxes(title_text="Distance Metric", row=1, col=1, title_font_size=12)
    fig.update_yaxes(title_text="Value", row=1, col=1, title_font_size=12)
def _add_simplified_risk_assessment(fig, result: Dict[str, Any]):
    """Gauge indicator of epsilon, colored by categorical risk level (row 1, col 2)."""
    assessment = result.get("privacy_assessment", {})
    risk_level = assessment.get("risk_level", "UNKNOWN")
    epsilon = assessment.get("primary_epsilon", 0)
    # Simple risk color mapping; grey for anything unrecognized.
    risk_colors = {
        "EXCEPTIONAL": "#059669", "VERY LOW": "#059669", "LOW": "#0891b2",
        "MEDIUM": "#ea580c", "HIGH": "#dc2626", "VERY HIGH": "#dc2626",
        "CRITICAL": "#991b1b", "UNKNOWN": "#6b7280"
    }
    accent = risk_colors.get(risk_level, "#6b7280")
    indicator = go.Indicator(
        mode="number+gauge",
        value=epsilon,
        title={
            "text": f"Privacy Risk: {risk_level}<br>Epsilon Value",
            "font": {"size": 14, "color": "#000000"},
        },
        number={"font": {"size": 20, "color": accent}},
        gauge={
            "axis": {"range": [0, 5], "tickcolor": "#000000"},
            "bar": {"color": accent},
            "bgcolor": "white",
            "bordercolor": "#d1d5db",
            "borderwidth": 2,
        },
    )
    fig.add_trace(indicator, row=1, col=2)
def _add_simplified_quality_assessment(fig, result: Dict[str, Any]):
    """Bar chart splitting samples into safe / near-match / exact-match (row 2, col 1)."""
    stats = result.get("distance_statistics", {})
    total = result.get("dataset_info", {}).get("real_samples_used", 1)
    exact = stats.get("zero_distance_count", 0)
    near_total = stats.get("small_distance_count", 0)
    # small_distance_count presumably includes exact matches (this mirrors
    # the subtraction in _add_distance_recommendations) — verify upstream.
    categories = ["Safe", "Near Match", "Exact Match"]
    counts = [total - near_total, near_total - exact, exact]
    quality_bar = go.Bar(
        x=categories,
        y=counts,
        marker_color=['#059669', '#ea580c', '#dc2626'],
        marker_line=dict(color='#000000', width=1),
        text=[f"{c:,}" for c in counts],
        textposition='outside',
        textfont=dict(size=11, color="#000000"),
        hovertemplate="<b>%{x}</b><br>Count: %{y:,}<extra></extra>",
        showlegend=False,
    )
    fig.add_trace(quality_bar, row=2, col=1)
    fig.update_xaxes(title_text="Sample Type", row=2, col=1, title_font_size=12)
    fig.update_yaxes(title_text="Count", row=2, col=1, title_font_size=12)
def _add_key_metrics_table(fig, result: Dict[str, Any]):
    """Two-column summary table of the headline audit numbers (row 2, col 2)."""
    dataset_info = result.get("dataset_info", {})
    stats = result.get("distance_statistics", {})
    assessment = result.get("privacy_assessment", {})
    # (label, formatted value) pairs in display order.
    rows = [
        ("Real Samples", f"{dataset_info.get('real_samples_used', 0):,}"),
        ("Synthetic Samples", f"{dataset_info.get('synthetic_samples', 0):,}"),
        ("Dimensions", f"{dataset_info.get('dimensions', 0)}"),
        ("Risk Level", assessment.get("risk_level", "UNKNOWN")),
        ("Epsilon Value", f"{assessment.get('primary_epsilon', 0):.6f}"),
        ("Exact Matches", f"{stats.get('zero_distance_count', 0):,}"),
    ]
    metric_names = [label for label, _ in rows]
    metric_values = [value for _, value in rows]
    table = go.Table(
        header=dict(
            values=["<b>Metric</b>", "<b>Value</b>"],
            fill_color="#f3f4f6",
            font=dict(size=12, color="#000000"),
            align="left",
            line_color="#d1d5db",
        ),
        cells=dict(
            values=[metric_names, metric_values],
            fill_color="white",
            font=dict(size=11, color="#000000"),
            align="left",
            line_color="#d1d5db",
            height=30,
        ),
    )
    fig.add_trace(table, row=2, col=2)
def _add_privacy_bounds(fig, result: Dict[str, Any]):
    """Bar chart comparing ε lower bounds across confidence levels (row 2, col 2).

    NOTE(review): not called by _create_comprehensive_dashboard in this
    section — appears to belong to an earlier, larger dashboard layout.
    """
    bounds = result.get("epsilon_bounds", {})
    levels = ["90%", "95%", "99%"]
    eps_values = [bounds.get(key, 0) for key in ("eps_lb_90", "eps_lb_95", "eps_lb_99")]
    # Gradient palette: higher confidence -> deeper color.
    palette = ['#52c41a', '#1890ff', '#722ed1']
    fig.add_trace(
        go.Bar(
            x=levels,
            y=eps_values,
            marker_color=palette,
            text=[f"ε = {v:.6f}" for v in eps_values],
            textposition='outside',
            hovertemplate="<b>%{x} Confidence</b><br>ε Lower Bound: %{y:.6f}<extra></extra>",
            name="Privacy Bounds",
        ),
        row=2,
        col=2,
    )
    fig.update_xaxes(title_text="Confidence Level", row=2, col=2)
    # Log axis only when all-positive data makes it valid.
    fig.update_yaxes(title_text="ε Lower Bound", row=2, col=2, type="log" if max(eps_values) > 0 else "linear")
def _add_processing_status(fig, result: Dict[str, Any]):
    """Bar chart of preprocessing pipeline completion per dataset (row 2, col 3).

    NOTE(review): not referenced by the simplified dashboard in this section;
    retained for the earlier multi-panel layout.
    """
    reports = result.get("preprocessing_reports", {})
    total_steps = 6  # Expected number of processing steps
    completion = []
    for report_key in ("real_dataset", "synthetic_dataset"):
        done = len(reports.get(report_key, {}).get("steps_completed", []))
        completion.append(done / total_steps * 100)
    datasets = ["Real Dataset", "Synthetic Dataset"]
    # Green when fully complete, amber otherwise.
    colors = ['#28a745' if pct == 100 else '#ffc107' for pct in completion]
    fig.add_trace(
        go.Bar(
            x=datasets,
            y=completion,
            marker_color=colors,
            text=[f"{pct:.0f}%<br>({int(pct/100*total_steps)}/{total_steps})" for pct in completion],
            textposition='auto',
            hovertemplate="<b>%{x}</b><br>Processing: %{y:.0f}% Complete<extra></extra>",
            name="Processing Status",
        ),
        row=2,
        col=3,
    )
    fig.update_xaxes(title_text="Dataset Type", row=2, col=3)
    fig.update_yaxes(title_text="Processing Completion %", row=2, col=3, range=[0, 100])
def create_safe_epsilon_plot(result: Dict[str, Any]) -> go.Figure:
    """Standalone bar chart of ε lower bounds at 90/95/99% confidence."""
    try:
        if "error" in result:
            return _create_error_figure(result)
        bounds = result.get("epsilon_bounds", {})
        levels = [90, 95, 99]
        eps_values = [bounds.get(f"eps_lb_{lvl}", 0) for lvl in levels]
        fig = go.Figure(
            go.Bar(
                x=[f"{lvl}%" for lvl in levels],
                y=eps_values,
                marker_color='#2563eb',
                marker_line=dict(color='#1e40af', width=1),
                text=[f"{eps:.6f}" for eps in eps_values],
                textposition='outside',
                textfont=dict(size=11, color="#000000"),
                hovertemplate="<b>%{x} Confidence</b><br>Epsilon: %{y:.6f}<extra></extra>",
                showlegend=False,
            )
        )
        fig.update_layout(
            title="Privacy Budget Analysis",
            xaxis_title="Confidence Level",
            yaxis_title="Epsilon Lower Bound",
            plot_bgcolor="white",
            paper_bgcolor="white",
            font=dict(family="Arial, sans-serif", size=12, color="#000000"),
            height=400,
            margin=dict(t=80, b=50, l=60, r=60),
        )
        return fig
    except Exception as e:
        logger.error(f"Epsilon plot creation failed: {e}")
        return _create_error_figure({"error": str(e)})
def generate_safe_report(result: Dict[str, Any]) -> str:
    """Render the executive Markdown report for an audit *result*.

    Three cases are handled: failed audits (result carries an "error" key),
    successful audits (full executive summary), and unexpected formatting
    failures (fallback report containing a raw JSON dump of *result*).

    Fix: the three sample-count fields formatted with a thousands separator
    now default to 0 instead of 'N/A' — applying the ',' format spec to the
    string 'N/A' raised ValueError and silently downgraded the entire report
    to the JSON fallback whenever those dataset_info keys were missing.
    """
    try:
        if "error" in result:
            return f"""
# Privacy Audit Failed
**Error:** {result.get('error', 'Unknown error')}
**Audit ID:** {result.get('audit_id', 'N/A')}
**Timestamp:** {result.get('timestamp', 'N/A')}
**Failed Step:** {result.get('step_failed', 'Unknown')}
## Troubleshooting
Please check the following:
- Both datasets are in CSV format with headers
- Files are not corrupted and can be opened
- Datasets have overlapping column names
- Data contains numeric values or categorical data that can be encoded
- File sizes are within limits
## Next Steps
1. Review the error message above
2. Check your data format and content
3. Try with smaller datasets if memory/timeout issues occur
4. Contact support if the issue persists
---
*Report generated by Enterprise Privacy Auditor*
"""
        # Extract key information
        risk_level = result.get("privacy_assessment", {}).get("risk_level", "UNKNOWN")
        epsilon = result.get("privacy_assessment", {}).get("primary_epsilon", 0)
        # Build comprehensive report. The three ':,'-formatted sample counts
        # default to 0 (not 'N/A') so missing keys cannot break formatting.
        report = f"""
# Privacy Audit Executive Summary
## Overall Assessment: {risk_level} RISK
**Audit ID:** {result.get('audit_id', 'N/A')}
**Session ID:** {result.get('session_id', 'N/A')}
**Conducted:** {result.get('audit_metadata', {}).get('timestamp', 'N/A')}
**Duration:** {result.get('audit_metadata', {}).get('duration_seconds', 'N/A')} seconds
---
## Key Findings
### Privacy Metrics
- **Primary ε-DP Bound (95% confidence):** {epsilon:.6f}
- **Risk Assessment:** {result.get('privacy_assessment', {}).get('interpretation', 'N/A')}
### Dataset Overview
- **Real Data Samples (Original):** {result.get('dataset_info', {}).get('real_samples_original', 0):,}
- **Real Data Samples (Used):** {result.get('dataset_info', {}).get('real_samples_used', 0):,}
- **Synthetic Data Samples:** {result.get('dataset_info', {}).get('synthetic_samples', 0):,}
- **Feature Dimensions:** {result.get('dataset_info', {}).get('dimensions', 'N/A')}
- **Common Features:** {result.get('dataset_info', {}).get('common_features', 'N/A')}
### Data Quality Indicators
- **Exact Matches (Memorization):** {result.get('distance_statistics', {}).get('zero_distance_count', 'N/A')}
- **Very Close Matches:** {result.get('distance_statistics', {}).get('small_distance_count', 'N/A')}
- **Mean Nearest Distance:** {result.get('distance_statistics', {}).get('mean_nearest_distance', 0):.6f}
---
## Recommendations
"""
        # Add recommendations as a numbered list
        recommendations = result.get('privacy_assessment', {}).get('recommendations', [])
        if recommendations:
            for i, rec in enumerate(recommendations, 1):
                report += f"{i}. {rec}\n"
        else:
            report += "No specific recommendations available.\n"
        report += f"""
---
## Detailed Analysis
### Multi-Confidence Privacy Bounds
| Confidence Level | ε Lower Bound | Risk Level |
|------------------|---------------|------------|"""
        # Add epsilon bounds table (uses the global auditor for risk labels)
        epsilon_bounds = result.get('epsilon_bounds', {})
        for conf in [90, 95, 99]:
            eps_val = epsilon_bounds.get(f'eps_lb_{conf}', 0)
            risk = auditor.assess_privacy_risk(eps_val) if auditor else "UNKNOWN"
            report += f"\n| {conf}% | {eps_val:.6f} | {risk} |"
        # Add distance statistics
        dist_stats = result.get('distance_statistics', {})
        report += f"""
### Distance Statistics Summary
- **Mean:** {dist_stats.get('mean_nearest_distance', 0):.6f}
- **Median:** {dist_stats.get('median_nearest_distance', 0):.6f}
- **Standard Deviation:** {dist_stats.get('std_nearest_distance', 0):.6f}
- **Range:** [{dist_stats.get('min_nearest_distance', 0):.6f}, {dist_stats.get('max_nearest_distance', 0):.6f}]
- **25th Percentile:** {dist_stats.get('q25_nearest_distance', 0):.6f}
- **75th Percentile:** {dist_stats.get('q75_nearest_distance', 0):.6f}
### Data Quality Assessment
- **Potential Memorization:** {"Yes" if result.get('data_quality', {}).get('potential_memorization', False) else "No"}
- **Distribution Skewness:** {result.get('data_quality', {}).get('distance_distribution_skew', 0):.4f}
---
## Configuration Used
**Preprocessing:**
- Categorical Encoding: {result.get('audit_metadata', {}).get('configuration', {}).get('categorical_encoding', 'N/A')}
- Numerical Scaling: {result.get('audit_metadata', {}).get('configuration', {}).get('numerical_scaling', 'N/A')}
- Distance Metric: {result.get('audit_metadata', {}).get('configuration', {}).get('distance_metric', 'N/A')}
**Audit Parameters:**
- Confidence Level: {result.get('audit_metadata', {}).get('configuration', {}).get('confidence_level', 'N/A')}
- Subsample Size: {result.get('audit_metadata', {}).get('configuration', {}).get('subsample_size', 'None (full dataset)')}
- Timeout: {result.get('audit_metadata', {}).get('configuration', {}).get('timeout_seconds', 'N/A')} seconds
---
## Methodology
This audit implements the state-of-the-art one-run nearest-neighbor ε-DP auditor. The method provides rigorous lower bounds on the privacy parameter ε, indicating the minimum privacy budget required under differential privacy guarantees.
**Key Benefits:**
- Single-run analysis (no multiple generations needed)
- Rigorous mathematical guarantees
- Suitable for enterprise environments
- Comprehensive preprocessing and validation
---
## Support Information
For questions about this audit or to report issues:
- Review the detailed technical logs
- Check the preprocessing reports for data quality issues
- Ensure your data meets the format requirements
---
*Report generated by Enterprise Privacy Auditor v2.0*
*Session: {result.get('session_id', 'N/A')} | Audit: {result.get('audit_id', 'N/A')}*
"""
        return report
    except Exception as e:
        logger.error(f"Report generation failed: {e}")
        return f"""
# Report Generation Failed
An error occurred while generating the executive report:
**Error:** {str(e)}
## Raw Audit Data
```json
{json.dumps(result, indent=2, default=str)}
```
---
*Please contact support for assistance*
"""
def safe_export_results(result: Dict[str, Any]) -> Optional[str]:
    """Safe export with comprehensive error handling.

    Packages the audit *result* into a ZIP on disk containing the raw JSON,
    executive Markdown report, technical details, a key-metrics CSV and a
    plain-text log. Each artifact is written best-effort: a failure in one
    section writes an *_error.txt placeholder instead of aborting the whole
    export. Returns the path to the ZIP (or a fallback error .txt), or None
    if even the fallback could not be written.

    NOTE(review): the temp file is created with delete=False so Gradio can
    serve it; nothing in view removes it afterwards — confirm cleanup policy.
    """
    try:
        logger.info("Generating export package")
        # Create temporary file for export
        import tempfile
        export_file = tempfile.NamedTemporaryFile(mode='wb', suffix='.zip', delete=False)
        with zipfile.ZipFile(export_file, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            # Core results (with safe JSON conversion)
            try:
                # Convert each top-level value to a JSON-safe form first;
                # default=str catches anything the converter missed.
                safe_result = {}
                for key, value in result.items():
                    safe_result[key] = SafeDataProcessor.safe_json_convert(value)
                zip_file.writestr(
                    "audit_results.json",
                    json.dumps(safe_result, indent=2, default=str)
                )
                logger.debug("Added audit results to export")
            except Exception as e:
                logger.warning(f"Failed to add audit results: {e}")
                zip_file.writestr("audit_results_error.txt", f"Failed to export results: {str(e)}")
            # Executive report
            try:
                exec_report = generate_safe_report(result)
                zip_file.writestr("executive_summary.md", exec_report)
                logger.debug("Added executive report to export")
            except Exception as e:
                logger.warning(f"Failed to add executive report: {e}")
                zip_file.writestr("executive_summary_error.txt", f"Failed to generate report: {str(e)}")
            # Technical details
            try:
                tech_details = f"""
# Technical Privacy Audit Report
## Audit Metadata
- **Audit ID:** {result.get('audit_id', 'N/A')}
- **Session ID:** {result.get('session_id', 'N/A')}
- **Timestamp:** {result.get('audit_metadata', {}).get('timestamp', 'N/A')}
- **Duration:** {result.get('audit_metadata', {}).get('duration_seconds', 'N/A')} seconds
- **Success:** {result.get('success', False)}
## Configuration Details
{json.dumps(result.get('audit_metadata', {}).get('configuration', {}), indent=2, default=str)}
## Dataset Information
{json.dumps(result.get('dataset_info', {}), indent=2, default=str)}
## Validation Results
{json.dumps(result.get('validation_result', {}), indent=2, default=str)}
## Distance Statistics
{json.dumps(result.get('distance_statistics', {}), indent=2, default=str)}
## Privacy Assessment
{json.dumps(result.get('privacy_assessment', {}), indent=2, default=str)}
"""
                zip_file.writestr("technical_details.md", tech_details)
                logger.debug("Added technical details to export")
            except Exception as e:
                logger.warning(f"Failed to add technical details: {e}")
            # Key metrics CSV (only meaningful for successful audits)
            try:
                if "error" not in result:
                    metrics_data = {
                        'Metric': [
                            'Audit_ID', 'Risk_Level', 'Primary_Epsilon', 'Mean_Distance',
                            'Zero_Distances', 'Close_Matches', 'Duration_Seconds',
                            'Real_Samples', 'Synthetic_Samples', 'Dimensions'
                        ],
                        'Value': [
                            result.get('audit_id', ''),
                            result.get('privacy_assessment', {}).get('risk_level', ''),
                            result.get('privacy_assessment', {}).get('primary_epsilon', 0),
                            result.get('distance_statistics', {}).get('mean_nearest_distance', 0),
                            result.get('distance_statistics', {}).get('zero_distance_count', 0),
                            result.get('distance_statistics', {}).get('small_distance_count', 0),
                            result.get('audit_metadata', {}).get('duration_seconds', 0),
                            result.get('dataset_info', {}).get('real_samples_used', 0),
                            result.get('dataset_info', {}).get('synthetic_samples', 0),
                            result.get('dataset_info', {}).get('dimensions', 0)
                        ]
                    }
                    metrics_df = pd.DataFrame(metrics_data)
                    csv_buffer = io.StringIO()
                    metrics_df.to_csv(csv_buffer, index=False)
                    zip_file.writestr("key_metrics.csv", csv_buffer.getvalue())
                    logger.debug("Added metrics CSV to export")
            except Exception as e:
                logger.warning(f"Failed to add metrics CSV: {e}")
            # Audit log (human-readable run summary)
            try:
                log_content = f"""
Privacy Audit Log - {result.get('audit_id', 'N/A')}
{'='*60}
Audit Started: {result.get('audit_metadata', {}).get('timestamp', 'N/A')}
Session ID: {result.get('session_id', 'N/A')}
Configuration:
{json.dumps(result.get('audit_metadata', {}).get('configuration', {}), indent=2, default=str)}
Dataset Information:
- Real samples (original): {result.get('dataset_info', {}).get('real_samples_original', 'N/A')}
- Real samples (used): {result.get('dataset_info', {}).get('real_samples_used', 'N/A')}
- Synthetic samples: {result.get('dataset_info', {}).get('synthetic_samples', 'N/A')}
- Dimensions: {result.get('dataset_info', {}).get('dimensions', 'N/A')}
{"Success: Audit completed successfully" if "error" not in result else f"Failed: {result.get('error', 'Unknown error')}"}
Duration: {result.get('audit_metadata', {}).get('duration_seconds', 'N/A')} seconds
Privacy Results:
- Risk Level: {result.get('privacy_assessment', {}).get('risk_level', 'N/A')}
- Primary ε: {result.get('privacy_assessment', {}).get('primary_epsilon', 'N/A')}
Validation Warnings:
{chr(10).join(result.get('validation_result', {}).get('warnings', ['None']))}
Export completed: {datetime.now().isoformat()}
"""
                zip_file.writestr("audit.log", log_content)
                logger.debug("Added audit log to export")
            except Exception as e:
                logger.warning(f"Failed to add audit log: {e}")
        export_file.close()
        logger.info("Export package generated successfully")
        return export_file.name
    except Exception as e:
        logger.error(f"Export generation failed: {e}")
        logger.error(traceback.format_exc())
        # Create minimal error export so the user still gets a downloadable artifact
        try:
            import tempfile
            error_file = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False)
            error_file.write(f"Export failed: {str(e)}\n\nTimestamp: {datetime.now().isoformat()}")
            if result:
                error_file.write(f"\n\nRaw result: {str(result)}")
            error_file.close()
            return error_file.name
        except Exception:
            return None
def run_enhanced_audit(real_file, synthetic_file, confidence, subsample_size,
                       categorical_encoding, numerical_scaling, distance_metric):
    """Gradio entry point: validate inputs, run the audit, build all outputs.

    Returns a 5-tuple (result_dict, distance_figure, epsilon_figure,
    report_markdown, export_path). On any failure the result_dict carries
    "error"/"step_failed" keys and the figure/export slots are None.

    Fix: an empty Gradio number field passes subsample_size=None; the
    original `subsample_size > 0` comparison raised TypeError, which
    surfaced to the user as a misleading "Configuration error". None is
    now treated as "no subsampling".
    """
    # Input validation
    if not real_file or not synthetic_file:
        error_msg = "Please upload both real and synthetic datasets"
        logger.warning(error_msg)
        empty_result = {"error": error_msg, "step_failed": "file_upload"}
        return empty_result, None, None, f"ERROR: {error_msg}", None
    logger.info("Starting enhanced privacy audit")
    logger.info(f"Files: Real={real_file.name}, Synthetic={synthetic_file.name}")
    try:
        # Validate and update configuration
        try:
            new_config = AuditConfig(
                confidence_level=confidence,
                # None/empty subsample field means "use the full dataset".
                subsample_size=int(subsample_size) if subsample_size and subsample_size > 0 else None,
                categorical_encoding=categorical_encoding,
                numerical_scaling=numerical_scaling,
                distance_metric=distance_metric
            )
            config_errors = new_config.validate()
            if config_errors:
                error_msg = f"Configuration errors: {'; '.join(config_errors)}"
                logger.error(error_msg)
                error_result = {"error": error_msg, "step_failed": "configuration"}
                return error_result, None, None, f"ERROR: {error_msg}", None
            if auditor:
                auditor.config = new_config
                logger.info("Configuration updated successfully")
            else:
                error_msg = "Auditor not initialized"
                logger.error(error_msg)
                error_result = {"error": error_msg, "step_failed": "initialization"}
                return error_result, None, None, f"ERROR: {error_msg}", None
        except Exception as e:
            error_msg = f"Configuration error: {str(e)}"
            logger.error(error_msg)
            error_result = {"error": error_msg, "step_failed": "configuration"}
            return error_result, None, None, f"ERROR: {error_msg}", None
        # Load datasets with enhanced error handling
        try:
            logger.info("Loading datasets...")
            real_df, real_error = SafeDataProcessor.safe_read_csv(real_file.name)
            if real_df is None:
                error_msg = f"Failed to load real dataset: {real_error}"
                logger.error(error_msg)
                error_result = {"error": error_msg, "step_failed": "data_loading"}
                return error_result, None, None, f"ERROR: {error_msg}", None
            synth_df, synth_error = SafeDataProcessor.safe_read_csv(synthetic_file.name)
            if synth_df is None:
                error_msg = f"Failed to load synthetic dataset: {synth_error}"
                logger.error(error_msg)
                error_result = {"error": error_msg, "step_failed": "data_loading"}
                return error_result, None, None, f"ERROR: {error_msg}", None
            logger.info(f"Datasets loaded successfully - Real: {real_df.shape}, Synthetic: {synth_df.shape}")
        except Exception as e:
            error_msg = f"Data loading error: {str(e)}"
            logger.error(error_msg)
            error_result = {"error": error_msg, "step_failed": "data_loading"}
            return error_result, None, None, f"ERROR: {error_msg}", None
        # Check in-memory size against the configured limit (best-effort:
        # a failure here is logged and the audit proceeds anyway)
        try:
            real_size_mb = real_df.memory_usage(deep=True).sum() / 1024 / 1024
            synth_size_mb = synth_df.memory_usage(deep=True).sum() / 1024 / 1024
            logger.info(f"Memory usage - Real: {real_size_mb:.2f}MB, Synthetic: {synth_size_mb:.2f}MB")
            if real_size_mb > auditor.config.max_file_size_mb or synth_size_mb > auditor.config.max_file_size_mb:
                error_msg = f"File size exceeds limit ({auditor.config.max_file_size_mb}MB). Real: {real_size_mb:.1f}MB, Synthetic: {synth_size_mb:.1f}MB"
                logger.error(error_msg)
                error_result = {"error": error_msg, "step_failed": "size_check"}
                return error_result, None, None, f"ERROR: {error_msg}", None
        except Exception as e:
            logger.warning(f"Size check failed: {e}")
            # Continue anyway
        # Run comprehensive audit
        logger.info("Starting comprehensive privacy audit...")
        result = auditor.run_comprehensive_audit(real_df, synth_df)
        # Check for audit errors
        if "error" in result:
            error_msg = result["error"]
            step_failed = result.get("step_failed", "unknown")
            logger.error(f"Audit failed at step '{step_failed}': {error_msg}")
            return result, None, None, f"ERROR: Audit failed at {step_failed}: {error_msg}", None
        # Generate visualizations safely (non-fatal on failure)
        dist_plot = None
        eps_plot = None
        try:
            logger.info("Generating visualizations...")
            dist_plot = create_safe_distance_plot(result)
            eps_plot = create_safe_epsilon_plot(result)
            logger.info("Visualizations generated successfully")
        except Exception as e:
            logger.warning(f"Visualization generation failed: {e}")
            # Continue without visualizations
        # Generate report safely (non-fatal on failure)
        try:
            logger.info("Generating executive report...")
            report = generate_safe_report(result)
            logger.info("Report generated successfully")
        except Exception as e:
            logger.warning(f"Report generation failed: {e}")
            report = f"ERROR: Report generation failed: {str(e)}"
        # Generate export safely (non-fatal on failure)
        export_data = None
        try:
            logger.info("Generating export package...")
            export_data = safe_export_results(result)
            if export_data:
                logger.info("Export package generated successfully")
            else:
                logger.warning("Export generation returned no data")
        except Exception as e:
            logger.warning(f"Export generation failed: {e}")
        # Log success
        risk_level = result.get("privacy_assessment", {}).get("risk_level", "UNKNOWN")
        logger.info(f"Audit completed - ID: {result.get('audit_id')}, Risk: {risk_level}")
        return result, dist_plot, eps_plot, report, export_data
    except Exception as e:
        error_msg = f"Unexpected error in audit interface: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        error_result = {
            "error": error_msg,
            "step_failed": "unexpected_error",
            "traceback": traceback.format_exc(),
            "timestamp": datetime.now().isoformat()
        }
        return error_result, None, None, f"ERROR: {error_msg}", None
def create_enhanced_interface():
    """Create the enhanced Gradio interface with improved UX.

    Builds the full Blocks app: an audit tab (uploads + configuration +
    results/plots), an executive-report tab, and a documentation tab, then
    wires the run button to a two-phase (progress, then results) generator
    handler.

    Returns:
        gr.Blocks: the assembled (un-launched) Gradio application.
    """
    # Custom CSS for better UI; injected via the Blocks `css=` argument below.
    custom_css = """
    .main-header {
        text-align: center;
        margin-bottom: 30px;
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white;
        padding: 20px;
        border-radius: 15px;
        margin: 20px;
    }
    .config-section {
        background-color: #f8f9fa;
        padding: 20px;
        border-radius: 10px;
        margin: 10px 0;
        border-left: 4px solid #007bff;
    }
    .results-section {
        background-color: #e8f5e8;
        padding: 20px;
        border-radius: 10px;
        margin: 10px 0;
        border-left: 4px solid #28a745;
    }
    .error-section {
        background-color: #ffe6e6;
        padding: 20px;
        border-radius: 10px;
        margin: 10px 0;
        border-left: 4px solid #dc3545;
    }
    .status-box {
        padding: 15px;
        border-radius: 8px;
        margin: 10px 0;
        font-weight: 500;
    }
    .upload-section {
        border: 2px dashed #ccc;
        border-radius: 10px;
        padding: 20px;
        margin: 10px 0;
        background-color: #fafafa;
    }
    """
    with gr.Blocks(
        title="Enterprise Privacy Auditor",
        theme=gr.themes.Soft(),
        css=custom_css
    ) as demo:
        # Page header banner (styled by .main-header above).
        gr.HTML("""
        <div class="main-header">
        <h1>Privacy Auditor for Generative Models</h1>
        <h3>Advanced Privacy Assessment Tool for Synthetic Data Generation</h3>
        <p><em>Implementing state-of-the-art one-run nearest-neighbor ε-DP auditing with enterprise features</em></p>
        <p>Secure • Comprehensive • Fast • Configurable</p>
        </div>
        """)
        # Main audit interface
        with gr.Tab("Privacy Audit", elem_id="audit-tab"):
            with gr.Row():
                # Left column - Configuration and upload
                with gr.Column(scale=1):
                    # NOTE(review): the open/close <div> tags below are emitted as
                    # SEPARATE gr.HTML components; Gradio may sanitize/auto-close
                    # each fragment, so they may not actually wrap the neighboring
                    # components as intended — verify rendering.
                    gr.HTML('<div class="upload-section">')
                    gr.Markdown("### Dataset Upload")
                    real_file = gr.File(
                        label="Real/Original Dataset (CSV)",
                        file_types=[".csv"],
                        elem_id="real-file"
                    )
                    gr.Markdown("*Upload the original dataset used for training or reference*")
                    synth_file = gr.File(
                        label="Synthetic Dataset (CSV)",
                        file_types=[".csv"],
                        elem_id="synth-file"
                    )
                    gr.Markdown("*Upload the synthetic dataset to be audited for privacy*")
                    gr.HTML('</div>')
                    gr.HTML('<div class="config-section">')
                    gr.Markdown("### Advanced Configuration")
                    with gr.Group():
                        gr.Markdown("#### Privacy Parameters")
                        # Confidence for the epsilon lower bound; higher = more
                        # conservative privacy bounds.
                        confidence = gr.Slider(
                            0.80, 0.999, value=0.95, step=0.001,
                            label="Primary Confidence Level",
                            info="Higher values provide more conservative privacy bounds"
                        )
                        # 0 is the sentinel for "no subsampling" (use all rows).
                        subsample_size = gr.Number(
                            value=0, minimum=0, maximum=100000, step=1000,
                            label="Subsample Size (0 = use all data)",
                            info="Limit real data samples for faster computation on large datasets"
                        )
                    with gr.Group():
                        gr.Markdown("#### Data Processing")
                        # Dropdown choices are (display label, internal value) pairs;
                        # the internal value is what gets passed to the audit.
                        categorical_encoding = gr.Dropdown(
                            choices=[
                                ("One-Hot Encoding (recommended)", "onehot"),
                                ("Label Encoding (memory efficient)", "label")
                            ],
                            value="onehot",
                            label="Categorical Variable Encoding",
                            info="How to handle non-numeric categorical variables"
                        )
                        numerical_scaling = gr.Dropdown(
                            choices=[
                                ("Standard Scaling (recommended)", "standard"),
                                ("Min-Max Scaling", "minmax"),
                                ("Robust Scaling (outlier resistant)", "robust"),
                                ("No Scaling", "none")
                            ],
                            value="standard",
                            label="Numerical Feature Scaling",
                            info="Normalization method for numerical features"
                        )
                        distance_metric = gr.Dropdown(
                            choices=[
                                ("Euclidean (recommended)", "euclidean"),
                                ("Manhattan (robust to outliers)", "manhattan"),
                                ("Cosine (for high-dimensional data)", "cosine")
                            ],
                            value="euclidean",
                            label="Distance Metric",
                            info="Method for computing distances between data points"
                        )
                    gr.HTML('</div>')
                    # Prominent run button
                    run_btn = gr.Button(
                        "Run Comprehensive Privacy Audit",
                        variant="primary",
                        size="lg",
                        elem_id="run-audit-btn"
                    )
                    gr.Markdown("""
                    ### Quick Start Guide
                    1. Upload both datasets in CSV format with headers
                    2. Keep file sizes under 500MB for optimal performance
                    3. Review configuration settings (defaults work for most cases)
                    4. Run audit and review comprehensive results
                    """)
                # Right column - Results and status
                with gr.Column(scale=2):
                    gr.HTML('<div class="results-section">')
                    gr.Markdown("### Audit Results & Status")
                    # Status display — updated by both yields of the event handler.
                    audit_status = gr.Markdown(
                        "**Ready to run audit**\n\nPlease upload your datasets and configure the audit parameters.",
                        elem_classes=["status-box"]
                    )
                    # Detailed results
                    with gr.Group():
                        audit_results = gr.JSON(
                            label="Detailed Audit Results",
                            elem_id="audit-results"
                        )
                    gr.HTML('</div>')
                    gr.Markdown("### Interactive Visualizations")
                    # Visualization tabs
                    with gr.Tabs():
                        with gr.Tab("Privacy Dashboard"):
                            distance_plot = gr.Plot(
                                label="Comprehensive Privacy Analysis",
                                elem_id="distance-plot"
                            )
                        with gr.Tab("Risk Analysis"):
                            epsilon_plot = gr.Plot(
                                label="Privacy Bounds & Risk Assessment",
                                elem_id="epsilon-plot"
                            )
        # Executive report tab
        with gr.Tab("Executive Report", elem_id="report-tab"):
            gr.Markdown("### Executive Summary & Detailed Analysis")
            gr.Markdown("*Complete report will be generated after running the audit*")
            audit_report = gr.Markdown(
                """
                **No audit completed yet**
                Run a privacy audit to generate a comprehensive executive report including:
                - Privacy risk assessment and recommendations
                - Statistical analysis and data quality metrics
                - Technical details and configuration summary
                - Actionable insights for improving privacy
                """,
                elem_id="audit-report"
            )
            gr.Markdown("### Export & Download")
            # Hidden until an export package is successfully generated.
            export_btn = gr.File(
                label="Download Complete Audit Package",
                elem_id="export-file",
                visible=False
            )
            gr.Markdown("""
            Complete audit package includes:
            - Executive summary report (Markdown)
            - Technical analysis report (Markdown)
            - Key metrics spreadsheet (CSV)
            - Audit configuration details (JSON)
            - Comprehensive audit log (Text)
            - Raw results data (JSON)
            """)
        # Documentation tab
        with gr.Tab("Documentation", elem_id="docs-tab"):
            gr.Markdown("""
            ## Enterprise Privacy Auditor
            ### Methodology
            This tool implements the **state-of-the-art one-run nearest-neighbor ε-DP auditor** providing rigorous lower bounds on privacy parameters without requiring multiple dataset generations.
            ### Enterprise Features
            - **Local Processing**: All data remains secure on your infrastructure
            - **Comprehensive Logging**: Detailed audit trails and error reporting
            - **Scalable Architecture**: Memory-efficient processing for large datasets
            - **Configurable Pipeline**: Flexible preprocessing and analysis options
            ### Privacy Risk Framework
            | Risk Level | ε Range | Interpretation | Action Required |
            |------------|---------|----------------|-----------------|
            | EXCEPTIONAL | ε ≤ 0.01 | Outstanding privacy | Suitable for highly sensitive data |
            | VERY LOW | 0.01 < ε ≤ 0.1 | Excellent privacy | Good for most enterprise use |
            | LOW | 0.1 < ε ≤ 0.5 | Acceptable privacy | Monitor for sensitive applications |
            | MEDIUM | 0.5 < ε ≤ 1.0 | Moderate risk | Consider additional measures |
            | HIGH | 1.0 < ε ≤ 2.0 | High risk | Review methodology |
            | VERY HIGH | 2.0 < ε ≤ 5.0 | Very high risk | Additional privacy required |
            | CRITICAL | ε > 5.0 | Critical risk | Immediate action required |
            ### Configuration Guide
            #### Distance Metrics
            - **Euclidean**: Best for continuous numerical data
            - **Manhattan**: Robust to outliers, good for mixed data
            - **Cosine**: Ideal for high-dimensional sparse data
            #### Preprocessing Options
            - **One-Hot Encoding**: Creates binary features (recommended for <50 categories)
            - **Label Encoding**: Assigns integer codes (memory efficient)
            - **Standard Scaling**: Zero mean, unit variance (recommended)
            - **Min-Max Scaling**: Scale to [0,1] range
            - **Robust Scaling**: Uses median and IQR (outlier resistant)
            ### Best Practices
            1. **Data Preparation**: Ensure CSV format with headers, similar structure between datasets
            2. **Memory Management**: Use subsampling for datasets >100K samples
            3. **Configuration**: Start with defaults, adjust based on your data characteristics
            4. **Interpretation**: Review both statistical results and actionable recommendations
            ### Support
            - Review error logs for troubleshooting
            - Check preprocessing reports for data quality issues
            - Ensure data meets format requirements
            """)
        # Event handlers with enhanced error feedback - FIXED OUTPUT COUNT
        def update_status_and_run(*args):
            """Update status during audit execution.

            Generator handler: the first yield immediately shows a "running"
            status, the second delivers the final results. Every yield must
            supply exactly 6 values, matching the `outputs=` list registered
            on run_btn.click below.
            """
            try:
                # Update status to running; other outputs left untouched except
                # hiding the export file component until a package exists.
                yield (
                    gr.update(value="Audit in progress. Processing your datasets and running privacy analysis."),
                    gr.update(), gr.update(), gr.update(), gr.update(), gr.update(visible=False)
                )
                # Run the actual audit (returns a 5-tuple:
                # result dict, distance plot, epsilon plot, report text, export file).
                result = run_enhanced_audit(*args)
                # Update status based on result
                if result[0] and "error" not in result[0]:
                    risk_level = result[0].get("privacy_assessment", {}).get("risk_level", "UNKNOWN")
                    epsilon = result[0].get("privacy_assessment", {}).get("primary_epsilon", 0)
                    status_msg = f"Audit completed successfully.\n\nRisk Level: {risk_level}\nEpsilon-DP Bound: {epsilon:.6f}"
                else:
                    error_msg = result[0].get("error", "Unknown error") if result[0] else "Unknown error"
                    status_msg = f"Audit failed: {error_msg}"
                # Make export visible if successful
                export_visible = result[4] is not None
                yield (
                    gr.update(value=status_msg),
                    result[0],  # audit_results
                    result[1],  # distance_plot
                    result[2],  # epsilon_plot
                    result[3],  # audit_report
                    gr.update(value=result[4], visible=export_visible) if export_visible else gr.update(visible=False)
                )
            except Exception as e:
                # Last-resort guard so the UI always reports a failure instead
                # of hanging in the "in progress" state.
                error_msg = f"Interface error: {str(e)}"
                logger.error(error_msg)
                yield (
                    gr.update(value=f"Interface Error: {error_msg}"),
                    {"error": error_msg}, None, None, f"Error: {error_msg}", gr.update(visible=False)
                )
        # Connect the interface - FIXED: Now returns 6 outputs
        # Input order must match run_enhanced_audit's parameter order.
        run_btn.click(
            fn=update_status_and_run,
            inputs=[
                real_file, synth_file, confidence, subsample_size,
                categorical_encoding, numerical_scaling, distance_metric
            ],
            outputs=[
                audit_status, audit_results, distance_plot, epsilon_plot, audit_report, export_btn
            ]
        )
    return demo
# Launch the application
if __name__ == "__main__":
    try:
        logger.info("Creating enhanced Gradio interface...")
        demo = create_enhanced_interface()
        logger.info("Launching Privacy Auditor application...")
        demo.launch(
            server_name="0.0.0.0",  # bind all interfaces (required in containers / HF Spaces)
            server_port=7860,
            share=True,
            show_error=True
        )
    except Exception as e:
        # Log the full traceback for the error log, echo a short message to
        # stdout, and exit nonzero so process supervisors (Docker, systemd,
        # Spaces) can detect the startup failure — previously the process
        # exited with status 0 even on a crash.
        logger.error(f"Failed to launch application: {e}")
        logger.error(traceback.format_exc())
        print(f"Application failed to start: {e}")
        sys.exit(1)