# Privacy_Auditor / app.py
# Uploaded by LeonceNsh via huggingface_hub (commit e287ecc, verified)
import gradio as gr
import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix
from scipy.special import gammaln
import plotly.graph_objects as go
from datetime import datetime
import json
import io
import zipfile
from typing import Dict, List, Tuple, Optional, Any
import logging
import traceback
from dataclasses import dataclass, asdict
from pathlib import Path
import warnings
import uuid
import os
import sys
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
# Suppress all Python warnings so pandas/numpy chatter does not flood the logs.
# NOTE(review): this also hides deprecation warnings — confirm that is intended.
warnings.filterwarnings('ignore')
def setup_logging():
    """Configure console + file logging and return this module's logger.

    Handlers installed on the root logger:
      * stdout                      -> INFO and up, simple format
      * logs/privacy_audit_detailed -> DEBUG and up, verbose format
      * logs/privacy_audit_errors   -> ERROR and up, verbose format

    The root logger level is set to DEBUG so that per-handler levels decide
    what gets emitted: with the previous root level of INFO, DEBUG records
    were dropped at the logger and never reached the "detailed" file
    handler at all.  Calling this function more than once no longer stacks
    duplicate handlers (which previously caused repeated log lines).
    """
    log_dir = Path("logs")
    log_dir.mkdir(exist_ok=True)
    formatters = {
        'detailed': logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'),
        'simple': logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    }
    root_logger = logging.getLogger()
    # DEBUG here; each handler applies its own, stricter level below.
    root_logger.setLevel(logging.DEBUG)
    if not root_logger.handlers:  # guard against duplicate handlers on re-call
        handlers = [
            (logging.StreamHandler(sys.stdout), logging.INFO, formatters['simple']),
            (logging.FileHandler(log_dir / 'privacy_audit_detailed.log'), logging.DEBUG, formatters['detailed']),
            (logging.FileHandler(log_dir / 'privacy_audit_errors.log'), logging.ERROR, formatters['detailed'])
        ]
        for handler, level, formatter in handlers:
            handler.setLevel(level)
            handler.setFormatter(formatter)
            root_logger.addHandler(handler)
    return logging.getLogger(__name__)
# Module-wide logger, configured once at import time.
logger = setup_logging()
logger.info(f"Privacy Auditor Starting - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
@dataclass
class AuditConfig:
    """Enterprise configuration for privacy audit"""
    confidence_level: float = 0.95
    subsample_size: Optional[int] = None
    categorical_encoding: str = "onehot"
    numerical_scaling: str = "standard"
    distance_metric: str = "euclidean"
    enable_preprocessing_report: bool = True
    max_file_size_mb: int = 500
    timeout_seconds: int = 300
    enable_data_validation: bool = True
    chunk_size: int = 10000
    max_categories_onehot: int = 50

    def validate(self) -> List[str]:
        """Return human-readable problems; empty list means the config is valid."""
        checks = (
            (0.5 <= self.confidence_level <= 0.999,
             "Confidence level must be between 0.5 and 0.999"),
            (self.subsample_size is None or self.subsample_size >= 100,
             "Subsample size must be at least 100 if specified"),
            (self.max_file_size_mb >= 1,
             "Max file size must be at least 1 MB"),
            (self.timeout_seconds >= 10,
             "Timeout must be at least 10 seconds"),
        )
        try:
            problems = []
            for ok, message in checks:
                if not ok:
                    problems.append(message)
            return problems
        except Exception as e:
            logger.error(f"Configuration validation error: {e}")
            return [f"Configuration validation failed: {str(e)}"]
class SafeDataProcessor:
    """Safe data processing with comprehensive error handling"""
    SUPPORTED_ENCODINGS = ['utf-8', 'latin-1', 'iso-8859-1', 'cp1252']

    @classmethod
    def safe_read_csv(cls, file_path: str, max_rows: Optional[int] = None) -> Tuple[Optional[pd.DataFrame], str]:
        """Read a CSV file, trying several encodings in order.

        Returns (dataframe, "") on success or (None, error_message) on failure.
        The previous implementation caught ``(UnicodeDecodeError, Exception)``
        (Exception already subsumes UnicodeDecodeError) and only logged the
        failure when the *last* encoding happened to raise; this version
        records the last error unconditionally and logs it once after all
        encodings were exhausted.
        """
        # Resolve the module logger lazily so the class also works standalone.
        log = logging.getLogger(__name__)
        try:
            if not os.path.exists(file_path):
                return None, f"File not found: {file_path}"
            file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
            log.info(f"Reading CSV: {file_path} ({file_size_mb:.2f} MB)")
            last_error: Optional[Exception] = None
            for encoding in cls.SUPPORTED_ENCODINGS:
                try:
                    df = pd.read_csv(file_path, encoding=encoding, nrows=max_rows, low_memory=False)
                    log.info(f"Loaded dataset: {df.shape[0]} rows, {df.shape[1]} columns ({encoding})")
                    return df, ""
                except Exception as e:  # decode errors, parser errors, ...
                    last_error = e
            log.warning(f"All encodings failed, last error: {last_error}")
            return None, "Failed to read file with any supported encoding"
        except Exception as e:
            error_msg = f"Error reading CSV file: {str(e)}"
            log.error(error_msg)
            return None, error_msg

    @staticmethod
    def safe_json_convert(obj: Any) -> Any:
        """Convert *obj* into something ``json.dumps`` can serialize.

        numpy scalars/arrays and pandas objects become native Python types;
        dicts, lists, tuples and sets are converted recursively.  (The
        previous implementation stringified whole dicts — turning e.g. a
        null-count mapping into the useless string ``"{...}"`` — and even
        stringified plain ints/floats.)  Values that are already JSON-native
        pass through unchanged; anything else falls back to ``str()``.
        """
        try:
            # Already JSON-serializable scalars pass through untouched.
            if obj is None or isinstance(obj, (bool, int, float, str)):
                return obj
            if isinstance(obj, np.integer):
                return int(obj)
            if isinstance(obj, np.floating):
                return float(obj)
            if isinstance(obj, np.ndarray):
                return obj.tolist()
            if isinstance(obj, (pd.Series, pd.DataFrame)):
                # Recurse so nested numpy scalars inside the dict are converted too.
                return SafeDataProcessor.safe_json_convert(obj.to_dict())
            if isinstance(obj, dict):
                return {k: SafeDataProcessor.safe_json_convert(v) for k, v in obj.items()}
            if isinstance(obj, (list, tuple, set)):
                return [SafeDataProcessor.safe_json_convert(v) for v in obj]
            # numpy-like scalar types exposing a dtype attribute.
            if hasattr(obj, 'dtype'):
                dtype_str = str(obj.dtype)
                if 'int' in dtype_str:
                    return int(obj)
                if 'float' in dtype_str:
                    return float(obj)
            return str(obj)
        except Exception as e:
            logging.getLogger(__name__).warning(f"JSON conversion failed for {type(obj)}: {e}")
            return str(obj)
class DataValidator:
    """Enhanced data validation with detailed reporting."""

    @classmethod
    def validate_dataframe(cls, df: pd.DataFrame, name: str) -> Dict[str, Any]:
        """Validate *df* and return a report dict with a fixed schema.

        Keys always present: ``valid``, ``errors``, ``warnings``, ``shape``,
        ``memory_usage_mb``, ``column_types``, ``null_counts`` — regardless
        of which path produced the result.  (Previously the empty-dataset
        and error paths omitted some of these keys, forcing callers to use
        defensive ``.get()`` lookups.)
        """
        # Resolve the module logger lazily so the validator works standalone.
        logging.getLogger(__name__).info(f"Validating dataframe: {name}")
        if df.empty:
            return cls._empty_dataframe_result(name)
        issues, warnings_list = [], []
        if len(df) < 10:
            warnings_list.append(f"{name}: Very small dataset ({len(df)} rows)")
        try:
            memory_mb = cls._calculate_memory_usage(df)
            cls._validate_columns(df, name, warnings_list)
            cls._check_data_quality(df, name, warnings_list)
            column_types = cls._analyze_column_types(df)
            return cls._build_validation_result(df, issues, warnings_list, memory_mb, column_types)
        except Exception as e:
            return cls._handle_validation_error(e, name, df, warnings_list)

    @staticmethod
    def _empty_dataframe_result(name: str) -> Dict[str, Any]:
        """Report for an empty dataset, with the same key schema as the success path."""
        return {
            "valid": False,
            "errors": [f"{name}: Dataset is empty"],
            "warnings": [],
            "shape": (0, 0),
            "memory_usage_mb": 0,
            "column_types": {},
            "null_counts": {}
        }

    @staticmethod
    def _calculate_memory_usage(df: pd.DataFrame) -> float:
        """Deep memory usage of *df* in MB; 0.0 if pandas cannot compute it."""
        try:
            return df.memory_usage(deep=True).sum() / (1024 * 1024)
        except Exception:
            return 0.0

    @staticmethod
    def _validate_columns(df: pd.DataFrame, name: str, warnings_list: List[str]):
        """Append warnings for mixed-type object columns and constant columns."""
        for col in df.columns:
            try:
                if df[col].dtype == 'object':
                    # Only sample the first 100 non-null values to keep this cheap.
                    sample_data = df[col].dropna().head(100)
                    if len(sample_data) > 0:
                        sample_types = set(type(x).__name__ for x in sample_data)
                        if len(sample_types) > 2:
                            warnings_list.append(f"{name}: Column '{col}' has mixed data types")
                if df[col].nunique() <= 1:
                    warnings_list.append(f"{name}: Column '{col}' is constant")
            except Exception as e:
                warnings_list.append(f"{name}: Error analyzing column '{col}': {str(e)}")

    @staticmethod
    def _check_data_quality(df: pd.DataFrame, name: str, warnings_list: List[str]):
        """Append warnings for heavy missingness (>50% per column) and >10% duplicate rows."""
        try:
            missing_pct = (df.isnull().sum() / len(df)) * 100
            high_missing = missing_pct[missing_pct > 50]
            if not high_missing.empty:
                warnings_list.append(f"{name}: High missing values: {high_missing.to_dict()}")
        except Exception as e:
            warnings_list.append(f"{name}: Error checking missing values: {str(e)}")
        try:
            duplicates = df.duplicated().sum()
            if duplicates > len(df) * 0.1:
                warnings_list.append(f"{name}: High duplicate rows ({duplicates})")
        except Exception as e:
            warnings_list.append(f"{name}: Error checking duplicates: {str(e)}")

    @staticmethod
    def _analyze_column_types(df: pd.DataFrame) -> Dict[str, int]:
        """Map dtype name -> number of columns with that dtype; empty dict on failure."""
        try:
            return {str(k): int(v) for k, v in df.dtypes.value_counts().to_dict().items()}
        except Exception:
            return {}

    @staticmethod
    def _build_validation_result(df: pd.DataFrame, issues: List[str], warnings_list: List[str],
                                 memory_mb: float, column_types: Dict[str, int]) -> Dict[str, Any]:
        """Assemble the success-path report dict."""
        return {
            "valid": len(issues) == 0,
            "errors": issues,
            "warnings": warnings_list,
            "shape": df.shape,
            "memory_usage_mb": memory_mb,
            "column_types": column_types,
            "null_counts": SafeDataProcessor.safe_json_convert(df.isnull().sum().to_dict())
        }

    @staticmethod
    def _handle_validation_error(e: Exception, name: str, df: pd.DataFrame,
                                 warnings_list: List[str]) -> Dict[str, Any]:
        """Assemble the failure-path report dict, with the same key schema."""
        error_msg = f"Validation failed for {name}: {str(e)}"
        logging.getLogger(__name__).error(error_msg)
        return {
            "valid": False,
            "errors": [error_msg],
            "warnings": warnings_list,
            "shape": df.shape if hasattr(df, 'shape') else (0, 0),
            "memory_usage_mb": 0,
            "column_types": {},
            "null_counts": {}
        }
class EnhancedPrivacyAuditor:
"""Enhanced privacy auditor with comprehensive error handling and logging"""
def __init__(self, config: AuditConfig = None):
    """Create an auditor; *config* falls back to a default AuditConfig."""
    self.config = AuditConfig() if config is None else config
    self.session_id = str(uuid.uuid4())[:8]
    self.audit_history = []
    self.current_audit_id = None
    logger.info(f"Initialized Privacy Auditor - Session: {self.session_id}")
    logger.info(f"Configuration: {asdict(self.config)}")
def validate_inputs(self, real_data: pd.DataFrame, synthetic_data: pd.DataFrame) -> Dict[str, Any]:
    """Validate both datasets individually and against each other.

    Returns a dict with keys ``valid``, ``errors``, ``warnings``,
    ``real_dataset`` and ``synthetic_dataset`` (the two per-dataset
    validation reports).  Cross-dataset issues (column-set differences,
    dtype mismatches) are reported as warnings only — they never make the
    result invalid.  Never raises: a top-level failure is returned as an
    invalid result carrying the error message.
    """
    logger.info("Starting comprehensive input validation")
    try:
        validator = DataValidator()
        # Validate individual datasets
        real_validation = validator.validate_dataframe(real_data, "Real Dataset")
        synth_validation = validator.validate_dataframe(synthetic_data, "Synthetic Dataset")
        all_errors = real_validation["errors"] + synth_validation["errors"]
        all_warnings = real_validation["warnings"] + synth_validation["warnings"]
        # Cross-dataset validation — only worthwhile when both sides are
        # individually valid.
        if real_validation["valid"] and synth_validation["valid"]:
            try:
                real_cols = set(real_data.columns)
                synth_cols = set(synthetic_data.columns)
                missing_in_synth = real_cols - synth_cols
                missing_in_real = synth_cols - real_cols
                if missing_in_synth:
                    all_warnings.append(f"Columns missing in synthetic data: {list(missing_in_synth)}")
                if missing_in_real:
                    all_warnings.append(f"Extra columns in synthetic data: {list(missing_in_real)}")
                # Check data type compatibility on the shared columns.
                common_cols = real_cols & synth_cols
                for col in common_cols:
                    try:
                        real_type = real_data[col].dtype
                        synth_type = synthetic_data[col].dtype
                        if real_type != synth_type:
                            all_warnings.append(f"Type mismatch in column '{col}': {real_type} vs {synth_type}")
                    except Exception as e:
                        all_warnings.append(f"Error checking column '{col}': {str(e)}")
            except Exception as e:
                # Cross-dataset checks are best-effort; degrade to a warning.
                all_warnings.append(f"Cross-validation error: {str(e)}")
        result = {
            "valid": len(all_errors) == 0,
            "errors": all_errors,
            "warnings": all_warnings,
            "real_dataset": real_validation,
            "synthetic_dataset": synth_validation
        }
        logger.info(f"Validation completed - Valid: {result['valid']}, Errors: {len(all_errors)}, Warnings: {len(all_warnings)}")
        return result
    except Exception as e:
        error_msg = f"Input validation failed: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        # Uniform invalid result so callers can treat failure like any
        # other validation outcome.
        return {
            "valid": False,
            "errors": [error_msg],
            "warnings": [],
            "real_dataset": {"valid": False, "errors": [error_msg]},
            "synthetic_dataset": {"valid": False, "errors": [error_msg]}
        }
def safe_preprocess_data(self, df: pd.DataFrame, is_real: bool = True) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Preprocess a dataset into an all-numeric, finite frame.

    Pipeline: copy -> fill missing values -> split numeric/categorical ->
    encode categoricals (per config) -> scale numericals (per config) ->
    final cleanup.  Each step is individually guarded: a failing step is
    recorded in the report's "errors" list and the pipeline continues with
    a fallback where possible.

    Args:
        df: Raw input frame; never mutated (a copy is processed).
        is_real: Only affects the "real"/"synthetic" label used in
            logs and the report.

    Returns:
        (processed_df, report).  On a top-level failure the ORIGINAL df is
        returned unchanged and report["success"] is False; otherwise
        report["success"] is True and the report records shapes, columns
        and the steps completed.
    """
    dataset_type = "real" if is_real else "synthetic"
    logger.info(f"Starting preprocessing for {dataset_type} dataset")
    report = {
        "dataset_type": dataset_type,
        "original_shape": df.shape,
        "start_time": datetime.now().isoformat(),
        "success": False,
        "steps_completed": []
    }
    try:
        # Create working copy
        df_processed = df.copy()
        report["steps_completed"].append("data_copy")
        # Handle missing values
        try:
            missing_counts = df_processed.isnull().sum()
            if missing_counts.any():
                logger.info(f"Handling missing values in {len(missing_counts[missing_counts > 0])} columns")
                for col in df_processed.columns:
                    if missing_counts[col] > 0:
                        try:
                            # Numeric columns: fill with the median (0 when the
                            # median itself is NaN, i.e. an all-null column);
                            # other columns: fill with the mode, falling back
                            # to the literal 'unknown'.
                            if pd.api.types.is_numeric_dtype(df_processed[col]):
                                fill_value = df_processed[col].median()
                                if pd.isna(fill_value):
                                    fill_value = 0
                            else:
                                mode_values = df_processed[col].mode()
                                fill_value = mode_values[0] if len(mode_values) > 0 else 'unknown'
                            df_processed[col].fillna(fill_value, inplace=True)
                        except Exception as e:
                            logger.warning(f"Failed to fill missing values in column '{col}': {e}")
                            df_processed[col].fillna('unknown', inplace=True)
            report["missing_values_handled"] = SafeDataProcessor.safe_json_convert(missing_counts[missing_counts > 0].to_dict())
            report["steps_completed"].append("missing_values")
        except Exception as e:
            logger.error(f"Missing value handling failed: {e}")
            report["errors"] = report.get("errors", []) + [f"Missing value handling: {str(e)}"]
        # Identify column types
        try:
            numerical_cols = df_processed.select_dtypes(include=[np.number]).columns.tolist()
            categorical_cols = df_processed.select_dtypes(exclude=[np.number]).columns.tolist()
            logger.info(f"Identified {len(numerical_cols)} numerical and {len(categorical_cols)} categorical columns")
            report["numerical_columns"] = numerical_cols
            report["categorical_columns"] = categorical_cols
            report["steps_completed"].append("column_identification")
        except Exception as e:
            logger.error(f"Column type identification failed: {e}")
            # Fallback: treat every column as categorical.
            numerical_cols = []
            categorical_cols = list(df_processed.columns)
            report["errors"] = report.get("errors", []) + [f"Column identification: {str(e)}"]
        # Handle categorical encoding
        if categorical_cols and self.config.categorical_encoding != "none":
            try:
                logger.info(f"Applying {self.config.categorical_encoding} encoding to categorical columns")
                if self.config.categorical_encoding == "onehot":
                    # Limit categories to prevent explosion
                    for col in categorical_cols[:]:  # Copy list to modify during iteration
                        try:
                            unique_count = df_processed[col].nunique()
                            if unique_count > self.config.max_categories_onehot:
                                logger.warning(f"Column '{col}' has {unique_count} categories, limiting to top {self.config.max_categories_onehot - 1}")
                                top_categories = df_processed[col].value_counts().head(self.config.max_categories_onehot - 1).index
                                # Pool everything outside the top categories into 'other'.
                                df_processed[col] = df_processed[col].apply(
                                    lambda x: x if x in top_categories else 'other'
                                )
                        except Exception as e:
                            logger.warning(f"Error processing column '{col}': {e}")
                            categorical_cols.remove(col)
                    if categorical_cols:  # Only if we have categorical columns left
                        df_processed = pd.get_dummies(
                            df_processed,
                            columns=categorical_cols,
                            prefix=categorical_cols,
                            drop_first=True,
                            dummy_na=True
                        )
                elif self.config.categorical_encoding == "label":
                    for col in categorical_cols:
                        try:
                            # Simple label encoding: first-seen order -> 0..k-1.
                            unique_vals = df_processed[col].unique()
                            label_map = {val: idx for idx, val in enumerate(unique_vals)}
                            df_processed[col] = df_processed[col].map(label_map)
                        except Exception as e:
                            logger.warning(f"Label encoding failed for column '{col}': {e}")
                            # Fallback to categorical codes
                            df_processed[col] = pd.Categorical(df_processed[col]).codes
                report["categorical_encoding_applied"] = self.config.categorical_encoding
                report["steps_completed"].append("categorical_encoding")
            except Exception as e:
                logger.error(f"Categorical encoding failed: {e}")
                # Fallback to simple codes
                for col in categorical_cols:
                    try:
                        df_processed[col] = pd.Categorical(df_processed[col]).codes
                    except Exception:
                        df_processed[col] = 0
                report["categorical_encoding_fallback"] = "categorical_codes"
                report["errors"] = report.get("errors", []) + [f"Categorical encoding: {str(e)}"]
        # Update numerical columns after encoding (one-hot adds dummy columns).
        try:
            numerical_cols = df_processed.select_dtypes(include=[np.number]).columns.tolist()
            logger.info(f"After encoding: {len(numerical_cols)} numerical columns")
        except Exception:
            numerical_cols = []
        # Handle numerical scaling
        if numerical_cols and self.config.numerical_scaling != "none":
            try:
                logger.info(f"Applying {self.config.numerical_scaling} scaling to numerical columns")
                # Simple scaling implementations to avoid sklearn dependency
                if self.config.numerical_scaling == "standard":
                    for col in numerical_cols:
                        try:
                            mean_val = df_processed[col].mean()
                            std_val = df_processed[col].std()
                            if std_val > 0:  # constant columns are left untouched
                                df_processed[col] = (df_processed[col] - mean_val) / std_val
                        except Exception as e:
                            logger.warning(f"Standard scaling failed for column '{col}': {e}")
                elif self.config.numerical_scaling == "minmax":
                    for col in numerical_cols:
                        try:
                            min_val = df_processed[col].min()
                            max_val = df_processed[col].max()
                            if max_val > min_val:  # avoid divide-by-zero on constants
                                df_processed[col] = (df_processed[col] - min_val) / (max_val - min_val)
                        except Exception as e:
                            logger.warning(f"MinMax scaling failed for column '{col}': {e}")
                elif self.config.numerical_scaling == "robust":
                    for col in numerical_cols:
                        try:
                            median_val = df_processed[col].median()
                            q75 = df_processed[col].quantile(0.75)
                            q25 = df_processed[col].quantile(0.25)
                            iqr = q75 - q25
                            if iqr > 0:  # degenerate IQR -> leave column untouched
                                df_processed[col] = (df_processed[col] - median_val) / iqr
                        except Exception as e:
                            logger.warning(f"Robust scaling failed for column '{col}': {e}")
                report["numerical_scaling_applied"] = self.config.numerical_scaling
                report["steps_completed"].append("numerical_scaling")
            except Exception as e:
                logger.error(f"Numerical scaling failed: {e}")
                report["errors"] = report.get("errors", []) + [f"Numerical scaling: {str(e)}"]
        # Final cleanup
        try:
            # Replace infinite values
            df_processed = df_processed.replace([np.inf, -np.inf], np.nan)
            # Fill remaining NaN values
            df_processed = df_processed.fillna(0)
            # Ensure all data is numeric (downstream distance math needs floats).
            for col in df_processed.columns:
                if not pd.api.types.is_numeric_dtype(df_processed[col]):
                    try:
                        df_processed[col] = pd.to_numeric(df_processed[col], errors='coerce')
                        df_processed[col] = df_processed[col].fillna(0)
                    except Exception:
                        df_processed[col] = 0
            report["steps_completed"].append("final_cleanup")
        except Exception as e:
            logger.error(f"Final cleanup failed: {e}")
            report["errors"] = report.get("errors", []) + [f"Final cleanup: {str(e)}"]
        report.update({
            "final_shape": df_processed.shape,
            "processing_completed": datetime.now().isoformat(),
            "success": True
        })
        logger.info(f"Preprocessing completed successfully for {dataset_type} dataset: {df_processed.shape}")
        return df_processed, report
    except Exception as e:
        error_msg = f"Preprocessing failed for {dataset_type} dataset: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        report.update({
            "error": error_msg,
            "processing_completed": datetime.now().isoformat(),
            "success": False
        })
        # Return the ORIGINAL frame on total failure; caller checks report["success"].
        return df, report
def safe_compute_distances(self, X: np.ndarray, S: np.ndarray) -> Optional[np.ndarray]:
    """Pairwise distance matrix between X (rows) and S (columns), or None on failure."""
    logger.info(f"Computing {self.config.distance_metric} distances for {len(X)}x{len(S)} matrix")
    try:
        # Full float64 matrix would be rows * cols * 8 bytes; expressed in GB.
        estimated_memory = (len(X) * len(S) * 8) / (1024 ** 3)  # GB
        logger.info(f"Estimated memory requirement: {estimated_memory:.2f} GB")
        needs_chunking = estimated_memory > 4  # > 4GB
        if needs_chunking:
            logger.info("Using chunked computation for large distance matrix")
            return self._chunked_distance_computation(X, S)
        return self._direct_distance_computation(X, S)
    except Exception as e:
        logger.error(f"Distance computation failed: {e}")
        logger.error(traceback.format_exc())
        return None
def _direct_distance_computation(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
    """One-shot distance matrix for the configured metric; raises on failure."""
    try:
        metric = self.config.distance_metric
        if metric == "manhattan":
            return distance_matrix(X, S, p=1)
        if metric == "cosine":
            # Manual cosine distance to avoid an sklearn dependency; the
            # tiny epsilon guards against zero-norm rows.
            X_unit = X / (np.linalg.norm(X, axis=1, keepdims=True) + 1e-10)
            S_unit = S / (np.linalg.norm(S, axis=1, keepdims=True) + 1e-10)
            return 1 - np.dot(X_unit, S_unit.T)
        # "euclidean" — and any unrecognized metric — falls back to L2.
        return distance_matrix(X, S)
    except Exception as e:
        logger.error(f"Direct distance computation failed: {e}")
        raise
def _chunked_distance_computation(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
    """Row-chunked distance matrix to bound peak memory; chunk results are stacked."""
    try:
        step = min(self.config.chunk_size, len(X))
        total_chunks = (len(X) - 1) // step + 1
        pieces = []
        for chunk_no, start in enumerate(range(0, len(X), step), start=1):
            rows = X[start:start + step]  # slicing clamps at the end automatically
            logger.debug(f"Processing chunk {chunk_no}/{total_chunks}")
            pieces.append(self._direct_distance_computation(rows, S))
        return np.vstack(pieces)
    except Exception as e:
        logger.error(f"Chunked distance computation failed: {e}")
        raise
def safe_compute_epsilon(self, m: int, n: int, d: int, v: float, p: float = 0.05) -> float:
    """Log-space lower-bound epsilon estimate; 0.0 on invalid input or numerical failure.

    m/n are real/synthetic sample counts, d the dimensionality, v the sum
    of nearest-neighbour distances, p the significance level (1 - confidence).
    """
    try:
        # Reject non-positive sizes/dimensions, negative distance sums,
        # and probabilities outside the open interval (0, 1).
        inputs_ok = m > 0 and n > 0 and d > 0 and v >= 0 and 0 < p < 1
        if not inputs_ok:
            logger.warning(f"Invalid epsilon parameters: m={m}, n={n}, d={d}, v={v}, p={p}")
            return 0.0
        if v == 0:
            # log(v) would diverge; a zero distance sum yields no bound.
            logger.warning("Distance sum is zero, returning zero epsilon")
            return 0.0
        try:
            # All terms kept in log space for numerical stability.
            log_gamma_term = gammaln(d/2) - gammaln(d)
            log_md_factorial = gammaln(m * d + 1)
            log_top_terms = (np.log(p) + log_md_factorial) / m
            log_bottom_terms = (
                np.log(2) + (d / 2) * np.log(np.pi) +
                np.log(n) + d * np.log(v)
            )
            eps_lower = log_gamma_term + log_top_terms - log_bottom_terms
        except Exception as e:
            logger.warning(f"Epsilon computation numerical error: {e}")
            return 0.0
        if not np.isfinite(eps_lower):
            logger.warning("Non-finite epsilon computed")
            return 0.0
        # Epsilon is clamped to be non-negative.
        result = float(max(0, eps_lower))
        logger.debug(f"Computed epsilon: {result} for confidence {1-p}")
        return result
    except Exception as e:
        logger.error(f"Epsilon computation failed: {e}")
        return 0.0
def run_comprehensive_audit(self, real_data: pd.DataFrame, synthetic_data: pd.DataFrame) -> Dict[str, Any]:
    """Run the full 7-step privacy audit with a wall-clock timeout.

    Steps: validate inputs -> preprocess both frames -> align common
    columns -> convert to numpy -> optional subsampling -> nearest-neighbour
    distances -> epsilon bounds / statistics / risk assessment.

    The work runs in a single worker thread and is bounded by
    ``config.timeout_seconds``.  Always returns a dict: a full result with
    ``"success": True`` on success, otherwise an error payload carrying
    ``"error"`` and ``"step_failed"``.  Never raises.
    """
    self.current_audit_id = str(uuid.uuid4())[:12]
    start_time = datetime.now()
    logger.info(f"Starting comprehensive audit - ID: {self.current_audit_id}")
    def audit_worker():
        # Executed on the executor thread; every failure path returns an
        # error dict (never raises) so the caller sees a uniform shape.
        try:
            # Input validation
            logger.info("Step 1/7: Input validation")
            validation_result = self.validate_inputs(real_data, synthetic_data)
            if not validation_result["valid"]:
                return {
                    "audit_id": self.current_audit_id,
                    "error": "Input validation failed",
                    "validation_errors": validation_result["errors"],
                    "validation_warnings": validation_result["warnings"],
                    "timestamp": start_time.isoformat(),
                    "step_failed": "input_validation"
                }
            # Preprocessing
            logger.info("Step 2/7: Data preprocessing")
            X_processed, real_report = self.safe_preprocess_data(real_data, is_real=True)
            S_processed, synth_report = self.safe_preprocess_data(synthetic_data, is_real=False)
            if not real_report["success"] or not synth_report["success"]:
                return {
                    "audit_id": self.current_audit_id,
                    "error": "Data preprocessing failed",
                    "preprocessing_reports": {"real": real_report, "synthetic": synth_report},
                    "timestamp": start_time.isoformat(),
                    "step_failed": "preprocessing"
                }
            # Align columns — distances only make sense over the shared
            # feature set, sorted so both arrays use the same column order.
            logger.info("Step 3/7: Column alignment")
            try:
                common_cols = list(set(X_processed.columns) & set(S_processed.columns))
                if len(common_cols) == 0:
                    return {
                        "audit_id": self.current_audit_id,
                        "error": "No common columns between datasets after preprocessing",
                        "timestamp": start_time.isoformat(),
                        "step_failed": "column_alignment"
                    }
                X_processed = X_processed[common_cols].sort_index(axis=1)
                S_processed = S_processed[common_cols].sort_index(axis=1)
                logger.info(f"Using {len(common_cols)} common columns")
            except Exception as e:
                return {
                    "audit_id": self.current_audit_id,
                    "error": f"Column alignment failed: {str(e)}",
                    "timestamp": start_time.isoformat(),
                    "step_failed": "column_alignment"
                }
            # Convert to numpy arrays
            logger.info("Step 4/7: Array conversion")
            try:
                X = X_processed.astype(np.float64).values
                S = S_processed.astype(np.float64).values
                # Validate arrays — NaN/inf would poison the distance matrix.
                if not np.isfinite(X).all():
                    logger.warning("Non-finite values in real data, cleaning...")
                    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
                if not np.isfinite(S).all():
                    logger.warning("Non-finite values in synthetic data, cleaning...")
                    S = np.nan_to_num(S, nan=0.0, posinf=0.0, neginf=0.0)
            except Exception as e:
                return {
                    "audit_id": self.current_audit_id,
                    "error": f"Array conversion failed: {str(e)}",
                    "timestamp": start_time.isoformat(),
                    "step_failed": "array_conversion"
                }
            # Subsampling
            logger.info("Step 5/7: Subsampling (if needed)")
            original_m = len(X)
            if self.config.subsample_size and len(X) > self.config.subsample_size:
                try:
                    # Fixed seed -> reproducible subsample across runs.
                    np.random.seed(42)
                    indices = np.random.choice(len(X), self.config.subsample_size, replace=False)
                    X = X[indices]
                    logger.info(f"Subsampled real data from {original_m} to {len(X)} samples")
                except Exception as e:
                    # Best-effort: a failed subsample just means more work below.
                    logger.warning(f"Subsampling failed: {e}, using full dataset")
            m, d = X.shape
            n = len(S)
            logger.info(f"Final dataset sizes - Real: {m}x{d}, Synthetic: {n}x{d}")
            # Distance computation
            logger.info("Step 6/7: Distance computation")
            dist_matrix = self.safe_compute_distances(X, S)
            if dist_matrix is None:
                return {
                    "audit_id": self.current_audit_id,
                    "error": "Distance computation failed",
                    "timestamp": start_time.isoformat(),
                    "step_failed": "distance_computation"
                }
            # Compute statistics
            logger.info("Step 7/7: Statistical analysis")
            try:
                # Nearest synthetic neighbour for every real record.
                nearest_distances = np.min(dist_matrix, axis=1)
                v = np.sum(nearest_distances)
                # Epsilon bounds for multiple confidence levels
                confidence_levels = [0.90, 0.95, 0.99]
                epsilon_bounds = {}
                for conf in confidence_levels:
                    p = 1 - conf
                    eps_lb = self.safe_compute_epsilon(m, n, d, v, p)
                    epsilon_bounds[f"eps_lb_{int(conf*100)}"] = eps_lb
                # Distance statistics
                distance_stats = {
                    "mean_nearest_distance": float(np.mean(nearest_distances)),
                    "median_nearest_distance": float(np.median(nearest_distances)),
                    "std_nearest_distance": float(np.std(nearest_distances)),
                    "min_nearest_distance": float(np.min(nearest_distances)),
                    "max_nearest_distance": float(np.max(nearest_distances)),
                    "q25_nearest_distance": float(np.percentile(nearest_distances, 25)),
                    "q75_nearest_distance": float(np.percentile(nearest_distances, 75)),
                    "distance_sum": float(v),
                    # Exact duplicates of real records (possible memorization).
                    "zero_distance_count": int(np.sum(nearest_distances == 0)),
                    # Near-duplicates below a fixed 1e-6 threshold.
                    "small_distance_count": int(np.sum(nearest_distances < 1e-6))
                }
                # Risk assessment — the 95% bound is the headline number.
                primary_epsilon = epsilon_bounds["eps_lb_95"]
                risk_level = self.assess_privacy_risk(primary_epsilon)
            except Exception as e:
                return {
                    "audit_id": self.current_audit_id,
                    "error": f"Statistical analysis failed: {str(e)}",
                    "timestamp": start_time.isoformat(),
                    "step_failed": "statistical_analysis"
                }
            # Compile results
            duration = (datetime.now() - start_time).total_seconds()
            result = {
                "audit_id": self.current_audit_id,
                "session_id": self.session_id,
                "success": True,
                "audit_metadata": {
                    "timestamp": start_time.isoformat(),
                    "duration_seconds": round(duration, 2),
                    "distance_metric": self.config.distance_metric,
                    "configuration": asdict(self.config)
                },
                "dataset_info": {
                    "real_samples_original": original_m,
                    "real_samples_used": m,
                    "synthetic_samples": n,
                    "dimensions": d,
                    "common_features": len(common_cols),
                    "subsampling_applied": self.config.subsample_size is not None and original_m > m
                },
                "preprocessing_reports": {
                    "real_dataset": real_report,
                    "synthetic_dataset": synth_report
                },
                "validation_result": validation_result,
                "epsilon_bounds": epsilon_bounds,
                "distance_statistics": distance_stats,
                "privacy_assessment": {
                    "risk_level": risk_level,
                    "primary_epsilon": primary_epsilon,
                    "interpretation": self.get_risk_interpretation(risk_level, primary_epsilon),
                    "recommendations": self.get_recommendations(risk_level, distance_stats)
                },
                "data_quality": {
                    "potential_memorization": distance_stats["zero_distance_count"] > 0,
                    "very_close_matches": distance_stats["small_distance_count"],
                    "distance_distribution_skew": self._safe_compute_skewness(nearest_distances)
                }
            }
            self.audit_history.append(result)
            logger.info(f"Audit completed successfully - ID: {self.current_audit_id}, Risk: {risk_level}, Duration: {duration:.2f}s")
            return result
        except Exception as e:
            error_msg = f"Unexpected audit error: {str(e)}"
            logger.error(f"Audit failed - ID: {self.current_audit_id}: {error_msg}")
            logger.error(traceback.format_exc())
            return {
                "audit_id": self.current_audit_id,
                "error": error_msg,
                "timestamp": start_time.isoformat(),
                "traceback": traceback.format_exc(),
                "step_failed": "unexpected_error"
            }
    # Run with timeout.
    # NOTE(review): future.result(timeout=...) raises but does NOT stop the
    # worker thread — a timed-out audit keeps computing in the background.
    try:
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(audit_worker)
            result = future.result(timeout=self.config.timeout_seconds)
            return result
    except FutureTimeoutError:
        error_msg = f"Audit timed out after {self.config.timeout_seconds} seconds"
        logger.error(error_msg)
        return {
            "audit_id": self.current_audit_id,
            "error": error_msg,
            "timestamp": start_time.isoformat(),
            "step_failed": "timeout"
        }
    except Exception as e:
        error_msg = f"Audit execution failed: {str(e)}"
        logger.error(error_msg)
        return {
            "audit_id": self.current_audit_id,
            "error": error_msg,
            "timestamp": start_time.isoformat(),
            "step_failed": "execution_error"
        }
def _safe_compute_skewness(self, data: np.ndarray) -> float:
    """Third standardized moment of *data*; 0.0 for degenerate or failing input."""
    try:
        if len(data) < 3:
            return 0.0
        center = np.mean(data)
        spread = np.std(data)
        if spread == 0:
            # Constant data has no meaningful skew.
            return 0.0
        third_moment = np.mean(((data - center) / spread) ** 3)
        return float(third_moment) if np.isfinite(third_moment) else 0.0
    except Exception as e:
        logger.warning(f"Skewness computation failed: {e}")
        return 0.0
def assess_privacy_risk(self, epsilon: float) -> str:
    """Map an epsilon lower bound onto a categorical risk label."""
    # Ordered ascending; the first threshold at or above epsilon wins.
    thresholds = (
        (0.01, "EXCEPTIONAL"),
        (0.1, "VERY LOW"),
        (0.5, "LOW"),
        (1.0, "MEDIUM"),
        (2.0, "HIGH"),
        (5.0, "VERY HIGH"),
    )
    try:
        return next(
            (label for bound, label in thresholds if epsilon <= bound),
            "CRITICAL",
        )
    except Exception:
        return "UNKNOWN"
def get_risk_interpretation(self, risk_level: str, epsilon: float) -> str:
    """Human-readable explanation for *risk_level*, annotated with the epsilon value."""
    interpretations = {
        "EXCEPTIONAL": "Outstanding privacy preservation. Suitable for highly sensitive applications.",
        "VERY LOW": "Excellent privacy preservation. Strong guarantees for most sensitive data.",
        "LOW": "Good privacy preservation. Acceptable for most commercial applications.",
        "MEDIUM": "Moderate privacy risk. Consider additional privacy-enhancing techniques.",
        "HIGH": "High privacy risk. Significant leakage detected. Review methodology.",
        "VERY HIGH": "Very high privacy risk. Additional privacy measures strongly recommended.",
        "CRITICAL": "Critical privacy risk. Synthetic data not suitable for production use."
    }
    try:
        summary = interpretations.get(risk_level, "Unknown risk level")
        return f"{summary} (ε = {epsilon:.6f})"
    except Exception as e:
        logger.warning(f"Risk interpretation failed: {e}")
        return f"Risk interpretation unavailable (ε = {epsilon:.6f})"
def get_recommendations(self, risk_level: str, distance_stats: Dict[str, Any]) -> List[str]:
    """Actionable follow-ups derived from the risk level and distance statistics."""
    try:
        advice: List[str] = []
        # The three highest risk tiers all trigger the same remediation block.
        if risk_level in ("HIGH", "VERY HIGH", "CRITICAL"):
            advice.extend([
                "IMMEDIATE ACTION REQUIRED: Privacy risk unacceptable for production",
                "Consider stronger privacy-preserving methods (DP-SGD, PATE)",
                "Reduce model capacity or increase privacy budget",
                "Review data preprocessing and feature selection",
            ])
        self._add_distance_recommendations(advice, distance_stats)
        if risk_level in ["EXCEPTIONAL", "VERY LOW", "LOW"]:
            advice.append("Privacy level acceptable for most production applications")
        # Never return an empty list — callers display this directly.
        return advice or ["Review detailed analysis for specific insights"]
    except Exception as e:
        logger.warning(f"Recommendations generation failed: {e}")
        return ["Could not generate recommendations due to analysis error"]
def _add_distance_recommendations(self, recommendations: List[str], distance_stats: Dict[str, Any]):
    """Append memorization warnings based on zero / near-zero nearest distances (mutates the list)."""
    exact = distance_stats.get("zero_distance_count", 0)
    near = distance_stats.get("small_distance_count", 0)
    if exact > 0:
        recommendations.append(f"WARNING: {exact} exact matches - potential memorization")
    if near > exact:
        # Near matches that are not exact duplicates.
        recommendations.append(f"REVIEW: {near - exact} close matches - check near-memorization")
# Global auditor instance
# Single shared auditor used by the UI callbacks; None signals that
# initialization failed and handlers must degrade gracefully.
try:
    auditor = EnhancedPrivacyAuditor()
    logger.info("Privacy auditor initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize privacy auditor: {e}")
    auditor = None
def create_safe_distance_plot(result: Dict[str, Any]) -> go.Figure:
    """Build the audit dashboard figure, or an error figure when the audit (or plotting) failed."""
    try:
        if "error" not in result:
            return _create_comprehensive_dashboard(result)
        return _create_error_figure(result)
    except Exception as e:
        logger.error(f"Distance plot creation failed: {e}")
        return _create_error_figure({"error": str(e)})
def _create_error_figure(result: Dict[str, Any]) -> go.Figure:
    """Return a bare figure whose only content is a red error banner."""
    message = result.get('error', 'Unknown error')
    failed_step = result.get('step_failed', 'Unknown')
    banner = (
        f"<b>Audit Error</b><br>{message}"
        f"<br><span style='font-size:12px'>Step: {failed_step}</span>"
    )
    fig = go.Figure()
    fig.add_annotation(
        text=banner,
        x=0.5,
        y=0.5,
        showarrow=False,
        font=dict(size=16, color="#dc3545"),
        align="center",
        bgcolor="rgba(220, 53, 69, 0.1)",
        bordercolor="#dc3545",
        borderwidth=2,
    )
    # Hide both axes: the figure is a message panel, not a chart.
    fig.update_layout(
        title="Privacy Audit Failed",
        xaxis=dict(visible=False),
        yaxis=dict(visible=False),
        plot_bgcolor="white",
        paper_bgcolor="white",
    )
    return fig
def _create_comprehensive_dashboard(result: Dict[str, Any]) -> go.Figure:
    """Assemble the 2x2 summary dashboard: distances, risk, quality, metrics."""
    from plotly.subplots import make_subplots

    panel_titles = (
        "Distance Statistics",
        "Privacy Risk Level",
        "Data Quality Assessment",
        "Key Metrics Summary"
    )
    # Panel types must match the traces added below (bar/indicator/bar/table).
    panel_specs = [
        [{"type": "bar"}, {"type": "indicator"}],
        [{"type": "bar"}, {"type": "table"}]
    ]
    fig = make_subplots(
        rows=2,
        cols=2,
        subplot_titles=panel_titles,
        specs=panel_specs,
        vertical_spacing=0.2,
        horizontal_spacing=0.15
    )
    # Fill each quadrant in turn from the audit result.
    for populate in (
        _add_simplified_distance_analysis,
        _add_simplified_risk_assessment,
        _add_simplified_quality_assessment,
        _add_key_metrics_table,
    ):
        populate(fig, result)
    fig.update_layout(
        title={
            "text": "Privacy Audit Results",
            "x": 0.5,
            "xanchor": "center",
            "font": {"size": 18, "color": "#000000"}
        },
        height=600,
        showlegend=False,
        plot_bgcolor="white",
        paper_bgcolor="white",
        font=dict(family="Arial, sans-serif", size=12, color="#000000"),
        margin=dict(t=80, b=50, l=60, r=60)
    )
    return fig
def _add_simplified_distance_analysis(fig, result: Dict[str, Any]):
    """Bar chart (row 1, col 1) of the headline nearest-neighbor distances."""
    dist_stats = result.get("distance_statistics", {})
    labels = ["Mean", "Median", "Max"]
    stat_keys = ("mean_nearest_distance", "median_nearest_distance", "max_nearest_distance")
    stat_values = [dist_stats.get(key, 0) for key in stat_keys]
    bar_labels = [f"{v:.4f}" if v > 0 else "0.0000" for v in stat_values]
    fig.add_trace(
        go.Bar(
            x=labels,
            y=stat_values,
            marker_color='#2563eb',
            marker_line=dict(color='#1e40af', width=1),
            text=bar_labels,
            textposition='outside',
            textfont=dict(size=11, color="#000000"),
            hovertemplate="<b>%{x}</b><br>%{y:.6f}<extra></extra>",
            showlegend=False
        ),
        row=1, col=1
    )
    fig.update_xaxes(title_text="Distance Metric", row=1, col=1, title_font_size=12)
    fig.update_yaxes(title_text="Value", row=1, col=1, title_font_size=12)
def _add_simplified_risk_assessment(fig, result: Dict[str, Any]):
    """Gauge indicator (row 1, col 2) of epsilon, colored by risk level."""
    assessment = result.get("privacy_assessment", {})
    risk_level = assessment.get("risk_level", "UNKNOWN")
    epsilon = assessment.get("primary_epsilon", 0)
    # Green -> red palette; unknown levels fall back to neutral gray.
    palette = {
        "EXCEPTIONAL": "#059669", "VERY LOW": "#059669", "LOW": "#0891b2",
        "MEDIUM": "#ea580c", "HIGH": "#dc2626", "VERY HIGH": "#dc2626",
        "CRITICAL": "#991b1b", "UNKNOWN": "#6b7280"
    }
    risk_color = palette.get(risk_level, "#6b7280")
    fig.add_trace(
        go.Indicator(
            mode="number+gauge",
            value=epsilon,
            title={
                "text": f"Privacy Risk: {risk_level}<br>Epsilon Value",
                "font": {"size": 14, "color": "#000000"}
            },
            number={"font": {"size": 20, "color": risk_color}},
            gauge={
                "axis": {"range": [0, 5], "tickcolor": "#000000"},
                "bar": {"color": risk_color},
                "bgcolor": "white",
                "bordercolor": "#d1d5db",
                "borderwidth": 2
            }
        ),
        row=1, col=2
    )
def _add_simplified_quality_assessment(fig, result: Dict[str, Any]):
    """Bar chart (row 2, col 1) splitting samples into safe / near / exact match."""
    dist_stats = result.get("distance_statistics", {})
    sample_total = result.get("dataset_info", {}).get("real_samples_used", 1)
    exact = dist_stats.get("zero_distance_count", 0)
    small = dist_stats.get("small_distance_count", 0)
    labels = ["Safe", "Near Match", "Exact Match"]
    # NOTE(review): assumes small_distance_count includes exact matches, so the
    # three buckets partition the total — confirm against the auditor's stats.
    bucket_counts = [sample_total - small, small - exact, exact]
    fig.add_trace(
        go.Bar(
            x=labels,
            y=bucket_counts,
            marker_color=['#059669', '#ea580c', '#dc2626'],
            marker_line=dict(color='#000000', width=1),
            text=[f"{count:,}" for count in bucket_counts],
            textposition='outside',
            textfont=dict(size=11, color="#000000"),
            hovertemplate="<b>%{x}</b><br>Count: %{y:,}<extra></extra>",
            showlegend=False
        ),
        row=2, col=1
    )
    fig.update_xaxes(title_text="Sample Type", row=2, col=1, title_font_size=12)
    fig.update_yaxes(title_text="Count", row=2, col=1, title_font_size=12)
def _add_key_metrics_table(fig, result: Dict[str, Any]):
    """Two-column summary table (row 2, col 2) of the headline audit numbers."""
    dataset_info = result.get("dataset_info", {})
    dist_stats = result.get("distance_statistics", {})
    assessment = result.get("privacy_assessment", {})
    rows = [
        ("Real Samples", f"{dataset_info.get('real_samples_used', 0):,}"),
        ("Synthetic Samples", f"{dataset_info.get('synthetic_samples', 0):,}"),
        ("Dimensions", f"{dataset_info.get('dimensions', 0)}"),
        ("Risk Level", assessment.get("risk_level", "UNKNOWN")),
        ("Epsilon Value", f"{assessment.get('primary_epsilon', 0):.6f}"),
        ("Exact Matches", f"{dist_stats.get('zero_distance_count', 0):,}"),
    ]
    metric_names = [name for name, _ in rows]
    metric_values = [value for _, value in rows]
    fig.add_trace(
        go.Table(
            header=dict(
                values=["<b>Metric</b>", "<b>Value</b>"],
                fill_color="#f3f4f6",
                font=dict(size=12, color="#000000"),
                align="left",
                line_color="#d1d5db"
            ),
            cells=dict(
                values=[metric_names, metric_values],
                fill_color="white",
                font=dict(size=11, color="#000000"),
                align="left",
                line_color="#d1d5db",
                height=30
            )
        ),
        row=2, col=2
    )
def _add_privacy_bounds(fig, result: Dict[str, Any]):
    """Bar chart of epsilon lower bounds at 90/95/99% confidence.

    NOTE(review): not called by the current 2x2 dashboard builder, and it
    targets row=2, col=2 — now occupied by the metrics table. Confirm intent
    before wiring it back in.
    """
    epsilon_bounds = result.get("epsilon_bounds", {})
    labels = ["90%", "95%", "99%"]
    bound_keys = ("eps_lb_90", "eps_lb_95", "eps_lb_99")
    bounds = [epsilon_bounds.get(key, 0) for key in bound_keys]
    # Gradient colors to show increasing confidence.
    bar_colors = ['#52c41a', '#1890ff', '#722ed1']
    fig.add_trace(
        go.Bar(
            x=labels,
            y=bounds,
            marker_color=bar_colors,
            text=[f"ε = {v:.6f}" for v in bounds],
            textposition='outside',
            hovertemplate="<b>%{x} Confidence</b><br>ε Lower Bound: %{y:.6f}<extra></extra>",
            name="Privacy Bounds"
        ),
        row=2, col=2
    )
    fig.update_xaxes(title_text="Confidence Level", row=2, col=2)
    # Log scale only when there is a positive value (log of 0 is undefined).
    fig.update_yaxes(title_text="ε Lower Bound", row=2, col=2, type="log" if max(bounds) > 0 else "linear")
def _add_processing_status(fig, result: Dict[str, Any]):
    """Completion bars for the real/synthetic preprocessing pipelines.

    NOTE(review): not called by the current 2x2 dashboard builder, and it
    targets row=2, col=3 which does not exist in that grid — confirm before
    reusing.
    """
    reports = result.get("preprocessing_reports", {})
    total_steps = 6  # Expected number of processing steps
    completion = []
    for report_key in ("real_dataset", "synthetic_dataset"):
        done = len(reports.get(report_key, {}).get("steps_completed", []))
        completion.append(done / total_steps * 100)
    dataset_labels = ["Real Dataset", "Synthetic Dataset"]
    # Green when fully complete, amber otherwise.
    bar_colors = ['#28a745' if pct == 100 else '#ffc107' for pct in completion]
    fig.add_trace(
        go.Bar(
            x=dataset_labels,
            y=completion,
            marker_color=bar_colors,
            text=[f"{pct:.0f}%<br>({int(pct/100*total_steps)}/{total_steps})" for pct in completion],
            textposition='auto',
            hovertemplate="<b>%{x}</b><br>Processing: %{y:.0f}% Complete<extra></extra>",
            name="Processing Status"
        ),
        row=2, col=3
    )
    fig.update_xaxes(title_text="Dataset Type", row=2, col=3)
    fig.update_yaxes(title_text="Processing Completion %", row=2, col=3, range=[0, 100])
def create_safe_epsilon_plot(result: Dict[str, Any]) -> go.Figure:
    """Plot epsilon lower bounds per confidence level; error figure on failure."""
    try:
        if "error" in result:
            return _create_error_figure(result)
        epsilon_bounds = result.get("epsilon_bounds", {})
        levels = [90, 95, 99]
        bounds = [epsilon_bounds.get(f"eps_lb_{lvl}", 0) for lvl in levels]
        fig = go.Figure()
        fig.add_trace(go.Bar(
            x=[f"{lvl}%" for lvl in levels],
            y=bounds,
            marker_color='#2563eb',
            marker_line=dict(color='#1e40af', width=1),
            text=[f"{b:.6f}" for b in bounds],
            textposition='outside',
            textfont=dict(size=11, color="#000000"),
            hovertemplate="<b>%{x} Confidence</b><br>Epsilon: %{y:.6f}<extra></extra>",
            showlegend=False
        ))
        fig.update_layout(
            title="Privacy Budget Analysis",
            xaxis_title="Confidence Level",
            yaxis_title="Epsilon Lower Bound",
            plot_bgcolor="white",
            paper_bgcolor="white",
            font=dict(family="Arial, sans-serif", size=12, color="#000000"),
            height=400,
            margin=dict(t=80, b=50, l=60, r=60)
        )
        return fig
    except Exception as exc:
        logger.error(f"Epsilon plot creation failed: {exc}")
        return _create_error_figure({"error": str(exc)})
def generate_safe_report(result: Dict[str, Any]) -> str:
    """Generate the executive markdown report for an audit result.

    Returns a troubleshooting report when *result* carries an "error" key,
    otherwise the full executive summary. Never raises: any rendering
    exception is caught and a minimal fallback document (with the raw
    result embedded) is returned instead.

    Fix: the sample-count lines previously applied the ``:,`` format spec
    directly to ``dict.get(..., 'N/A')`` results; ``str`` does not support
    the ``,`` spec, so a missing key raised ValueError and silently
    collapsed the whole report to the fallback. Counts now go through a
    tolerant formatter.
    """
    def _fmt_count(value) -> str:
        # Thousands-separated for numbers; pass fallback strings through as-is.
        try:
            return f"{value:,}"
        except (ValueError, TypeError):
            return str(value)
    try:
        if "error" in result:
            return f"""
# Privacy Audit Failed
**Error:** {result.get('error', 'Unknown error')}
**Audit ID:** {result.get('audit_id', 'N/A')}
**Timestamp:** {result.get('timestamp', 'N/A')}
**Failed Step:** {result.get('step_failed', 'Unknown')}
## Troubleshooting
Please check the following:
- Both datasets are in CSV format with headers
- Files are not corrupted and can be opened
- Datasets have overlapping column names
- Data contains numeric values or categorical data that can be encoded
- File sizes are within limits
## Next Steps
1. Review the error message above
2. Check your data format and content
3. Try with smaller datasets if memory/timeout issues occur
4. Contact support if the issue persists
---
*Report generated by Enterprise Privacy Auditor*
"""
        # Extract key information
        risk_level = result.get("privacy_assessment", {}).get("risk_level", "UNKNOWN")
        epsilon = result.get("privacy_assessment", {}).get("primary_epsilon", 0)
        # Build comprehensive report
        report = f"""
# Privacy Audit Executive Summary
## Overall Assessment: {risk_level} RISK
**Audit ID:** {result.get('audit_id', 'N/A')}
**Session ID:** {result.get('session_id', 'N/A')}
**Conducted:** {result.get('audit_metadata', {}).get('timestamp', 'N/A')}
**Duration:** {result.get('audit_metadata', {}).get('duration_seconds', 'N/A')} seconds
---
## Key Findings
### Privacy Metrics
- **Primary ε-DP Bound (95% confidence):** {epsilon:.6f}
- **Risk Assessment:** {result.get('privacy_assessment', {}).get('interpretation', 'N/A')}
### Dataset Overview
- **Real Data Samples (Original):** {_fmt_count(result.get('dataset_info', {}).get('real_samples_original', 'N/A'))}
- **Real Data Samples (Used):** {_fmt_count(result.get('dataset_info', {}).get('real_samples_used', 'N/A'))}
- **Synthetic Data Samples:** {_fmt_count(result.get('dataset_info', {}).get('synthetic_samples', 'N/A'))}
- **Feature Dimensions:** {result.get('dataset_info', {}).get('dimensions', 'N/A')}
- **Common Features:** {result.get('dataset_info', {}).get('common_features', 'N/A')}
### Data Quality Indicators
- **Exact Matches (Memorization):** {result.get('distance_statistics', {}).get('zero_distance_count', 'N/A')}
- **Very Close Matches:** {result.get('distance_statistics', {}).get('small_distance_count', 'N/A')}
- **Mean Nearest Distance:** {result.get('distance_statistics', {}).get('mean_nearest_distance', 0):.6f}
---
## Recommendations
"""
        # Add recommendations
        recommendations = result.get('privacy_assessment', {}).get('recommendations', [])
        if recommendations:
            for i, rec in enumerate(recommendations, 1):
                report += f"{i}. {rec}\n"
        else:
            report += "No specific recommendations available.\n"
        report += f"""
---
## Detailed Analysis
### Multi-Confidence Privacy Bounds
| Confidence Level | ε Lower Bound | Risk Level |
|------------------|---------------|------------|"""
        # Add epsilon bounds table (risk classification delegated to the
        # global auditor when available).
        epsilon_bounds = result.get('epsilon_bounds', {})
        for conf in [90, 95, 99]:
            eps_val = epsilon_bounds.get(f'eps_lb_{conf}', 0)
            risk = auditor.assess_privacy_risk(eps_val) if auditor else "UNKNOWN"
            report += f"\n| {conf}% | {eps_val:.6f} | {risk} |"
        # Add distance statistics
        dist_stats = result.get('distance_statistics', {})
        report += f"""
### Distance Statistics Summary
- **Mean:** {dist_stats.get('mean_nearest_distance', 0):.6f}
- **Median:** {dist_stats.get('median_nearest_distance', 0):.6f}
- **Standard Deviation:** {dist_stats.get('std_nearest_distance', 0):.6f}
- **Range:** [{dist_stats.get('min_nearest_distance', 0):.6f}, {dist_stats.get('max_nearest_distance', 0):.6f}]
- **25th Percentile:** {dist_stats.get('q25_nearest_distance', 0):.6f}
- **75th Percentile:** {dist_stats.get('q75_nearest_distance', 0):.6f}
### Data Quality Assessment
- **Potential Memorization:** {"Yes" if result.get('data_quality', {}).get('potential_memorization', False) else "No"}
- **Distribution Skewness:** {result.get('data_quality', {}).get('distance_distribution_skew', 0):.4f}
---
## Configuration Used
**Preprocessing:**
- Categorical Encoding: {result.get('audit_metadata', {}).get('configuration', {}).get('categorical_encoding', 'N/A')}
- Numerical Scaling: {result.get('audit_metadata', {}).get('configuration', {}).get('numerical_scaling', 'N/A')}
- Distance Metric: {result.get('audit_metadata', {}).get('configuration', {}).get('distance_metric', 'N/A')}
**Audit Parameters:**
- Confidence Level: {result.get('audit_metadata', {}).get('configuration', {}).get('confidence_level', 'N/A')}
- Subsample Size: {result.get('audit_metadata', {}).get('configuration', {}).get('subsample_size', 'None (full dataset)')}
- Timeout: {result.get('audit_metadata', {}).get('configuration', {}).get('timeout_seconds', 'N/A')} seconds
---
## Methodology
This audit implements the state-of-the-art one-run nearest-neighbor ε-DP auditor. The method provides rigorous lower bounds on the privacy parameter ε, indicating the minimum privacy budget required under differential privacy guarantees.
**Key Benefits:**
- Single-run analysis (no multiple generations needed)
- Rigorous mathematical guarantees
- Suitable for enterprise environments
- Comprehensive preprocessing and validation
---
## Support Information
For questions about this audit or to report issues:
- Review the detailed technical logs
- Check the preprocessing reports for data quality issues
- Ensure your data meets the format requirements
---
*Report generated by Enterprise Privacy Auditor v2.0*
*Session: {result.get('session_id', 'N/A')} | Audit: {result.get('audit_id', 'N/A')}*
"""
        return report
    except Exception as e:
        logger.error(f"Report generation failed: {e}")
        return f"""
# Report Generation Failed
An error occurred while generating the executive report:
**Error:** {str(e)}
## Raw Audit Data
```json
{json.dumps(result, indent=2, default=str)}
```
---
*Please contact support for assistance*
"""
def safe_export_results(result: Dict[str, Any]) -> Optional[str]:
    """Bundle the audit result into a downloadable ZIP and return its path.

    Writes up to five artifacts (results JSON, executive report, technical
    report, key-metrics CSV, plain-text log) into a temp ZIP. Each artifact
    has its own try/except so one failure does not abort the others. On a
    total failure, falls back to a plain-text error file, or None if even
    that fails.
    """
    try:
        logger.info("Generating export package")
        # Create temporary file for export. delete=False is deliberate: the
        # path is handed to the UI for download after this function returns.
        import tempfile
        export_file = tempfile.NamedTemporaryFile(mode='wb', suffix='.zip', delete=False)
        with zipfile.ZipFile(export_file, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            # Core results (with safe JSON conversion)
            try:
                safe_result = {}
                for key, value in result.items():
                    # Convert numpy/pandas values into JSON-serializable forms.
                    safe_result[key] = SafeDataProcessor.safe_json_convert(value)
                zip_file.writestr(
                    "audit_results.json",
                    json.dumps(safe_result, indent=2, default=str)
                )
                logger.debug("Added audit results to export")
            except Exception as e:
                logger.warning(f"Failed to add audit results: {e}")
                # Best-effort: record the failure inside the ZIP itself.
                zip_file.writestr("audit_results_error.txt", f"Failed to export results: {str(e)}")
            # Executive report
            try:
                exec_report = generate_safe_report(result)
                zip_file.writestr("executive_summary.md", exec_report)
                logger.debug("Added executive report to export")
            except Exception as e:
                logger.warning(f"Failed to add executive report: {e}")
                zip_file.writestr("executive_summary_error.txt", f"Failed to generate report: {str(e)}")
            # Technical details
            try:
                tech_details = f"""
# Technical Privacy Audit Report
## Audit Metadata
- **Audit ID:** {result.get('audit_id', 'N/A')}
- **Session ID:** {result.get('session_id', 'N/A')}
- **Timestamp:** {result.get('audit_metadata', {}).get('timestamp', 'N/A')}
- **Duration:** {result.get('audit_metadata', {}).get('duration_seconds', 'N/A')} seconds
- **Success:** {result.get('success', False)}
## Configuration Details
{json.dumps(result.get('audit_metadata', {}).get('configuration', {}), indent=2, default=str)}
## Dataset Information
{json.dumps(result.get('dataset_info', {}), indent=2, default=str)}
## Validation Results
{json.dumps(result.get('validation_result', {}), indent=2, default=str)}
## Distance Statistics
{json.dumps(result.get('distance_statistics', {}), indent=2, default=str)}
## Privacy Assessment
{json.dumps(result.get('privacy_assessment', {}), indent=2, default=str)}
"""
                zip_file.writestr("technical_details.md", tech_details)
                logger.debug("Added technical details to export")
            except Exception as e:
                logger.warning(f"Failed to add technical details: {e}")
            # Key metrics CSV (only meaningful for successful audits)
            try:
                if "error" not in result:
                    metrics_data = {
                        'Metric': [
                            'Audit_ID', 'Risk_Level', 'Primary_Epsilon', 'Mean_Distance',
                            'Zero_Distances', 'Close_Matches', 'Duration_Seconds',
                            'Real_Samples', 'Synthetic_Samples', 'Dimensions'
                        ],
                        'Value': [
                            result.get('audit_id', ''),
                            result.get('privacy_assessment', {}).get('risk_level', ''),
                            result.get('privacy_assessment', {}).get('primary_epsilon', 0),
                            result.get('distance_statistics', {}).get('mean_nearest_distance', 0),
                            result.get('distance_statistics', {}).get('zero_distance_count', 0),
                            result.get('distance_statistics', {}).get('small_distance_count', 0),
                            result.get('audit_metadata', {}).get('duration_seconds', 0),
                            result.get('dataset_info', {}).get('real_samples_used', 0),
                            result.get('dataset_info', {}).get('synthetic_samples', 0),
                            result.get('dataset_info', {}).get('dimensions', 0)
                        ]
                    }
                    metrics_df = pd.DataFrame(metrics_data)
                    csv_buffer = io.StringIO()
                    metrics_df.to_csv(csv_buffer, index=False)
                    zip_file.writestr("key_metrics.csv", csv_buffer.getvalue())
                    logger.debug("Added metrics CSV to export")
            except Exception as e:
                logger.warning(f"Failed to add metrics CSV: {e}")
            # Audit log
            try:
                log_content = f"""
Privacy Audit Log - {result.get('audit_id', 'N/A')}
{'='*60}
Audit Started: {result.get('audit_metadata', {}).get('timestamp', 'N/A')}
Session ID: {result.get('session_id', 'N/A')}
Configuration:
{json.dumps(result.get('audit_metadata', {}).get('configuration', {}), indent=2, default=str)}
Dataset Information:
- Real samples (original): {result.get('dataset_info', {}).get('real_samples_original', 'N/A')}
- Real samples (used): {result.get('dataset_info', {}).get('real_samples_used', 'N/A')}
- Synthetic samples: {result.get('dataset_info', {}).get('synthetic_samples', 'N/A')}
- Dimensions: {result.get('dataset_info', {}).get('dimensions', 'N/A')}
{"Success: Audit completed successfully" if "error" not in result else f"Failed: {result.get('error', 'Unknown error')}"}
Duration: {result.get('audit_metadata', {}).get('duration_seconds', 'N/A')} seconds
Privacy Results:
- Risk Level: {result.get('privacy_assessment', {}).get('risk_level', 'N/A')}
- Primary ε: {result.get('privacy_assessment', {}).get('primary_epsilon', 'N/A')}
Validation Warnings:
{chr(10).join(result.get('validation_result', {}).get('warnings', ['None']))}
Export completed: {datetime.now().isoformat()}
"""
                zip_file.writestr("audit.log", log_content)
                logger.debug("Added audit log to export")
            except Exception as e:
                logger.warning(f"Failed to add audit log: {e}")
        export_file.close()
        logger.info("Export package generated successfully")
        return export_file.name
    except Exception as e:
        logger.error(f"Export generation failed: {e}")
        logger.error(traceback.format_exc())
        # Create minimal error export so the UI still has a file to serve.
        try:
            import tempfile
            error_file = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False)
            error_file.write(f"Export failed: {str(e)}\n\nTimestamp: {datetime.now().isoformat()}")
            if result:
                error_file.write(f"\n\nRaw result: {str(result)}")
            error_file.close()
            return error_file.name
        except Exception:
            # Truly nothing can be written; caller treats None as "no export".
            return None
def run_enhanced_audit(real_file, synthetic_file, confidence, subsample_size,
                       categorical_encoding, numerical_scaling, distance_metric):
    """Top-level audit pipeline wired to the Gradio "Run" button.

    Always returns a 5-tuple: (result dict, distance figure, epsilon figure,
    markdown report, export file path). Every failure path returns an error
    dict in slot 0 (with a "step_failed" marker), an "ERROR: ..." status in
    slot 3, and None in the remaining slots, so the UI never sees an
    exception.
    """
    # Input validation: both uploads are required before anything else runs.
    if not real_file or not synthetic_file:
        error_msg = "Please upload both real and synthetic datasets"
        logger.warning(error_msg)
        empty_result = {"error": error_msg, "step_failed": "file_upload"}
        return empty_result, None, None, f"ERROR: {error_msg}", None
    logger.info("Starting enhanced privacy audit")
    logger.info(f"Files: Real={real_file.name}, Synthetic={synthetic_file.name}")
    try:
        # Validate and update configuration on the shared global auditor.
        try:
            new_config = AuditConfig(
                confidence_level=confidence,
                # A subsample of 0 from the UI means "use the full dataset".
                subsample_size=int(subsample_size) if subsample_size > 0 else None,
                categorical_encoding=categorical_encoding,
                numerical_scaling=numerical_scaling,
                distance_metric=distance_metric
            )
            config_errors = new_config.validate()
            if config_errors:
                error_msg = f"Configuration errors: {'; '.join(config_errors)}"
                logger.error(error_msg)
                error_result = {"error": error_msg, "step_failed": "configuration"}
                return error_result, None, None, f"ERROR: {error_msg}", None
            if auditor:
                auditor.config = new_config
                logger.info("Configuration updated successfully")
            else:
                # Module-level init failed earlier; nothing to run against.
                error_msg = "Auditor not initialized"
                logger.error(error_msg)
                error_result = {"error": error_msg, "step_failed": "initialization"}
                return error_result, None, None, f"ERROR: {error_msg}", None
        except Exception as e:
            error_msg = f"Configuration error: {str(e)}"
            logger.error(error_msg)
            error_result = {"error": error_msg, "step_failed": "configuration"}
            return error_result, None, None, f"ERROR: {error_msg}", None
        # Load datasets with enhanced error handling; safe_read_csv returns
        # (DataFrame or None, error message).
        try:
            logger.info("Loading datasets...")
            real_df, real_error = SafeDataProcessor.safe_read_csv(real_file.name)
            if real_df is None:
                error_msg = f"Failed to load real dataset: {real_error}"
                logger.error(error_msg)
                error_result = {"error": error_msg, "step_failed": "data_loading"}
                return error_result, None, None, f"ERROR: {error_msg}", None
            synth_df, synth_error = SafeDataProcessor.safe_read_csv(synthetic_file.name)
            if synth_df is None:
                error_msg = f"Failed to load synthetic dataset: {synth_error}"
                logger.error(error_msg)
                error_result = {"error": error_msg, "step_failed": "data_loading"}
                return error_result, None, None, f"ERROR: {error_msg}", None
            logger.info(f"Datasets loaded successfully - Real: {real_df.shape}, Synthetic: {synth_df.shape}")
        except Exception as e:
            error_msg = f"Data loading error: {str(e)}"
            logger.error(error_msg)
            error_result = {"error": error_msg, "step_failed": "data_loading"}
            return error_result, None, None, f"ERROR: {error_msg}", None
        # Check in-memory sizes against the configured limit.
        try:
            real_size_mb = real_df.memory_usage(deep=True).sum() / 1024 / 1024
            synth_size_mb = synth_df.memory_usage(deep=True).sum() / 1024 / 1024
            logger.info(f"Memory usage - Real: {real_size_mb:.2f}MB, Synthetic: {synth_size_mb:.2f}MB")
            if real_size_mb > auditor.config.max_file_size_mb or synth_size_mb > auditor.config.max_file_size_mb:
                error_msg = f"File size exceeds limit ({auditor.config.max_file_size_mb}MB). Real: {real_size_mb:.1f}MB, Synthetic: {synth_size_mb:.1f}MB"
                logger.error(error_msg)
                error_result = {"error": error_msg, "step_failed": "size_check"}
                return error_result, None, None, f"ERROR: {error_msg}", None
        except Exception as e:
            logger.warning(f"Size check failed: {e}")
            # Continue anyway — the size check is advisory, not essential.
        # Run comprehensive audit
        logger.info("Starting comprehensive privacy audit...")
        result = auditor.run_comprehensive_audit(real_df, synth_df)
        # Check for audit errors reported by the auditor itself.
        if "error" in result:
            error_msg = result["error"]
            step_failed = result.get("step_failed", "unknown")
            logger.error(f"Audit failed at step '{step_failed}': {error_msg}")
            return result, None, None, f"ERROR: Audit failed at {step_failed}: {error_msg}", None
        # Generate visualizations safely — a plotting failure must not
        # discard an otherwise successful audit.
        dist_plot = None
        eps_plot = None
        try:
            logger.info("Generating visualizations...")
            dist_plot = create_safe_distance_plot(result)
            eps_plot = create_safe_epsilon_plot(result)
            logger.info("Visualizations generated successfully")
        except Exception as e:
            logger.warning(f"Visualization generation failed: {e}")
            # Continue without visualizations
        # Generate report safely
        try:
            logger.info("Generating executive report...")
            report = generate_safe_report(result)
            logger.info("Report generated successfully")
        except Exception as e:
            logger.warning(f"Report generation failed: {e}")
            report = f"ERROR: Report generation failed: {str(e)}"
        # Generate export safely
        export_data = None
        try:
            logger.info("Generating export package...")
            export_data = safe_export_results(result)
            if export_data:
                logger.info("Export package generated successfully")
            else:
                logger.warning("Export generation returned no data")
        except Exception as e:
            logger.warning(f"Export generation failed: {e}")
        # Log success
        risk_level = result.get("privacy_assessment", {}).get("risk_level", "UNKNOWN")
        logger.info(f"Audit completed - ID: {result.get('audit_id')}, Risk: {risk_level}")
        return result, dist_plot, eps_plot, report, export_data
    except Exception as e:
        # Last-resort catch: anything unanticipated still produces the
        # standard 5-tuple with traceback attached for debugging.
        error_msg = f"Unexpected error in audit interface: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        error_result = {
            "error": error_msg,
            "step_failed": "unexpected_error",
            "traceback": traceback.format_exc(),
            "timestamp": datetime.now().isoformat()
        }
        return error_result, None, None, f"ERROR: {error_msg}", None
def create_enhanced_interface():
"""Create the enhanced Gradio interface with improved UX"""
# Custom CSS for better UI
custom_css = """
.main-header {
text-align: center;
margin-bottom: 30px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 20px;
border-radius: 15px;
margin: 20px;
}
.config-section {
background-color: #f8f9fa;
padding: 20px;
border-radius: 10px;
margin: 10px 0;
border-left: 4px solid #007bff;
}
.results-section {
background-color: #e8f5e8;
padding: 20px;
border-radius: 10px;
margin: 10px 0;
border-left: 4px solid #28a745;
}
.error-section {
background-color: #ffe6e6;
padding: 20px;
border-radius: 10px;
margin: 10px 0;
border-left: 4px solid #dc3545;
}
.status-box {
padding: 15px;
border-radius: 8px;
margin: 10px 0;
font-weight: 500;
}
.upload-section {
border: 2px dashed #ccc;
border-radius: 10px;
padding: 20px;
margin: 10px 0;
background-color: #fafafa;
}
"""
with gr.Blocks(
title="Enterprise Privacy Auditor",
theme=gr.themes.Soft(),
css=custom_css
) as demo:
gr.HTML("""
<div class="main-header">
<h1>Privacy Auditor for Generative Models</h1>
<h3>Advanced Privacy Assessment Tool for Synthetic Data Generation</h3>
<p><em>Implementing state-of-the-art one-run nearest-neighbor ε-DP auditing with enterprise features</em></p>
<p>Secure • Comprehensive • Fast • Configurable</p>
</div>
""")
# Main audit interface
with gr.Tab("Privacy Audit", elem_id="audit-tab"):
with gr.Row():
# Left column - Configuration and upload
with gr.Column(scale=1):
gr.HTML('<div class="upload-section">')
gr.Markdown("### Dataset Upload")
real_file = gr.File(
label="Real/Original Dataset (CSV)",
file_types=[".csv"],
elem_id="real-file"
)
gr.Markdown("*Upload the original dataset used for training or reference*")
synth_file = gr.File(
label="Synthetic Dataset (CSV)",
file_types=[".csv"],
elem_id="synth-file"
)
gr.Markdown("*Upload the synthetic dataset to be audited for privacy*")
gr.HTML('</div>')
gr.HTML('<div class="config-section">')
gr.Markdown("### Advanced Configuration")
with gr.Group():
gr.Markdown("#### Privacy Parameters")
confidence = gr.Slider(
0.80, 0.999, value=0.95, step=0.001,
label="Primary Confidence Level",
info="Higher values provide more conservative privacy bounds"
)
subsample_size = gr.Number(
value=0, minimum=0, maximum=100000, step=1000,
label="Subsample Size (0 = use all data)",
info="Limit real data samples for faster computation on large datasets"
)
with gr.Group():
gr.Markdown("#### Data Processing")
categorical_encoding = gr.Dropdown(
choices=[
("One-Hot Encoding (recommended)", "onehot"),
("Label Encoding (memory efficient)", "label")
],
value="onehot",
label="Categorical Variable Encoding",
info="How to handle non-numeric categorical variables"
)
numerical_scaling = gr.Dropdown(
choices=[
("Standard Scaling (recommended)", "standard"),
("Min-Max Scaling", "minmax"),
("Robust Scaling (outlier resistant)", "robust"),
("No Scaling", "none")
],
value="standard",
label="Numerical Feature Scaling",
info="Normalization method for numerical features"
)
distance_metric = gr.Dropdown(
choices=[
("Euclidean (recommended)", "euclidean"),
("Manhattan (robust to outliers)", "manhattan"),
("Cosine (for high-dimensional data)", "cosine")
],
value="euclidean",
label="Distance Metric",
info="Method for computing distances between data points"
)
gr.HTML('</div>')
# Prominent run button
run_btn = gr.Button(
"Run Comprehensive Privacy Audit",
variant="primary",
size="lg",
elem_id="run-audit-btn"
)
gr.Markdown("""
### Quick Start Guide
1. Upload both datasets in CSV format with headers
2. Keep file sizes under 500MB for optimal performance
3. Review configuration settings (defaults work for most cases)
4. Run audit and review comprehensive results
""")
# Right column - Results and status
with gr.Column(scale=2):
gr.HTML('<div class="results-section">')
gr.Markdown("### Audit Results & Status")
# Status display
audit_status = gr.Markdown(
"**Ready to run audit**\n\nPlease upload your datasets and configure the audit parameters.",
elem_classes=["status-box"]
)
# Detailed results
with gr.Group():
audit_results = gr.JSON(
label="Detailed Audit Results",
elem_id="audit-results"
)
gr.HTML('</div>')
gr.Markdown("### Interactive Visualizations")
# Visualization tabs
with gr.Tabs():
with gr.Tab("Privacy Dashboard"):
distance_plot = gr.Plot(
label="Comprehensive Privacy Analysis",
elem_id="distance-plot"
)
with gr.Tab("Risk Analysis"):
epsilon_plot = gr.Plot(
label="Privacy Bounds & Risk Assessment",
elem_id="epsilon-plot"
)
# Executive report tab
with gr.Tab("Executive Report", elem_id="report-tab"):
gr.Markdown("### Executive Summary & Detailed Analysis")
gr.Markdown("*Complete report will be generated after running the audit*")
audit_report = gr.Markdown(
"""
**No audit completed yet**
Run a privacy audit to generate a comprehensive executive report including:
- Privacy risk assessment and recommendations
- Statistical analysis and data quality metrics
- Technical details and configuration summary
- Actionable insights for improving privacy
""",
elem_id="audit-report"
)
gr.Markdown("### Export & Download")
export_btn = gr.File(
label="Download Complete Audit Package",
elem_id="export-file",
visible=False
)
gr.Markdown("""
Complete audit package includes:
- Executive summary report (Markdown)
- Technical analysis report (Markdown)
- Key metrics spreadsheet (CSV)
- Audit configuration details (JSON)
- Comprehensive audit log (Text)
- Raw results data (JSON)
""")
# Documentation tab
with gr.Tab("Documentation", elem_id="docs-tab"):
gr.Markdown("""
## Enterprise Privacy Auditor
### Methodology
This tool implements the **state-of-the-art one-run nearest-neighbor ε-DP auditor** providing rigorous lower bounds on privacy parameters without requiring multiple dataset generations.
### Enterprise Features
- **Local Processing**: All data remains secure on your infrastructure
- **Comprehensive Logging**: Detailed audit trails and error reporting
- **Scalable Architecture**: Memory-efficient processing for large datasets
- **Configurable Pipeline**: Flexible preprocessing and analysis options
### Privacy Risk Framework
| Risk Level | ε Range | Interpretation | Action Required |
|------------|---------|----------------|-----------------|
| EXCEPTIONAL | ε ≤ 0.01 | Outstanding privacy | Suitable for highly sensitive data |
| VERY LOW | 0.01 < ε ≤ 0.1 | Excellent privacy | Good for most enterprise use |
| LOW | 0.1 < ε ≤ 0.5 | Acceptable privacy | Monitor for sensitive applications |
| MEDIUM | 0.5 < ε ≤ 1.0 | Moderate risk | Consider additional measures |
| HIGH | 1.0 < ε ≤ 2.0 | High risk | Review methodology |
| VERY HIGH | 2.0 < ε ≤ 5.0 | Very high risk | Additional privacy required |
| CRITICAL | ε > 5.0 | Critical risk | Immediate action required |
### Configuration Guide
#### Distance Metrics
- **Euclidean**: Best for continuous numerical data
- **Manhattan**: Robust to outliers, good for mixed data
- **Cosine**: Ideal for high-dimensional sparse data
#### Preprocessing Options
- **One-Hot Encoding**: Creates binary features (recommended for <50 categories)
- **Label Encoding**: Assigns integer codes (memory efficient)
- **Standard Scaling**: Zero mean, unit variance (recommended)
- **Min-Max Scaling**: Scale to [0,1] range
- **Robust Scaling**: Uses median and IQR (outlier resistant)
### Best Practices
1. **Data Preparation**: Ensure CSV format with headers, similar structure between datasets
2. **Memory Management**: Use subsampling for datasets >100K samples
3. **Configuration**: Start with defaults, adjust based on your data characteristics
4. **Interpretation**: Review both statistical results and actionable recommendations
### Support
- Review error logs for troubleshooting
- Check preprocessing reports for data quality issues
- Ensure data meets format requirements
""")
# Event handlers with enhanced error feedback - FIXED OUTPUT COUNT
def update_status_and_run(*args):
    """Run the privacy audit while streaming status updates to the UI.

    Generator handler for the Run button. Yields twice:
    1. An "in progress" status update (all other outputs left untouched,
       export button hidden).
    2. The final six outputs expected by the click wiring:
       (audit_status, audit_results, distance_plot, epsilon_plot,
        audit_report, export_btn).
    """
    try:
        # First yield: flip the status banner to "running"; bare
        # gr.update() leaves a component unchanged.
        yield (
            gr.update(value="Audit in progress. Processing your datasets and running privacy analysis."),
            gr.update(), gr.update(), gr.update(), gr.update(), gr.update(visible=False)
        )
        # Run the actual (blocking) audit pipeline.
        result = run_enhanced_audit(*args)
        # result[0] is the audit-results dict; an "error" key marks failure.
        if result[0] and "error" not in result[0]:
            assessment = result[0].get("privacy_assessment", {})
            risk_level = assessment.get("risk_level", "UNKNOWN")
            epsilon = assessment.get("primary_epsilon", 0)
            status_msg = (
                f"Audit completed successfully.\n\n"
                f"Risk Level: {risk_level}\nEpsilon-DP Bound: {epsilon:.6f}"
            )
        else:
            error_msg = result[0].get("error", "Unknown error") if result[0] else "Unknown error"
            status_msg = f"Audit failed: {error_msg}"
        # result[4] is the export artifact (file path or None); only show
        # the export button when something was actually produced.
        export_visible = result[4] is not None
        yield (
            gr.update(value=status_msg),
            result[0],  # audit_results
            result[1],  # distance_plot
            result[2],  # epsilon_plot
            result[3],  # audit_report
            gr.update(value=result[4], visible=export_visible) if export_visible else gr.update(visible=False)
        )
    except Exception as e:
        error_msg = f"Interface error: {str(e)}"
        # logger.exception records the full traceback; plain .error() here
        # silently dropped it, making UI failures hard to diagnose.
        logger.exception(error_msg)
        yield (
            gr.update(value=f"Interface Error: {error_msg}"),
            {"error": error_msg}, None, None, f"Error: {error_msg}", gr.update(visible=False)
        )
# Connect the interface - FIXED: Now returns 6 outputs
# Wire the Run button to the streaming handler. The generator yields
# 6-tuples, so exactly 6 output components are listed (status banner,
# results JSON, two plots, text report, export button).
run_btn.click(
fn=update_status_and_run,
# Inputs mirror the audit configuration controls defined earlier in
# this interface: the two dataset uploads plus preprocessing options.
inputs=[
real_file, synth_file, confidence, subsample_size,
categorical_encoding, numerical_scaling, distance_metric
],
outputs=[
audit_status, audit_results, distance_plot, epsilon_plot, audit_report, export_btn
]
)
# Hand the fully wired Blocks app back to the caller for launching.
return demo
# Launch the application
if __name__ == "__main__":
    try:
        logger.info("Creating enhanced Gradio interface...")
        demo = create_enhanced_interface()
        logger.info("Launching Privacy Auditor application...")
        demo.launch(
            server_name="0.0.0.0",  # bind all interfaces (needed in containers/Spaces)
            server_port=7860,
            share=True,
            show_error=True
        )
    except Exception as e:
        # Log the full traceback and echo to stdout for environments
        # where the log files are not visible.
        logger.error(f"Failed to launch application: {e}")
        logger.error(traceback.format_exc())
        print(f"Application failed to start: {e}")
        # Exit nonzero so supervisors/containers detect the startup
        # failure (previously the process still exited with status 0).
        sys.exit(1)