""" Basic analysis tools for exploratory data analysis of HuggingFace datasets. This module provides tools for performing exploratory data analysis including feature statistics and missing value analysis. """ import logging import statistics import gradio as gr from typing import Optional, Dict, Any, List from collections import Counter from hf_eda_mcp.services.dataset_service import get_dataset_service, DatasetServiceError from hf_eda_mcp.integrations.hf_client import DatasetNotFoundError, AuthenticationError, NetworkError from hf_eda_mcp.validation import ( validate_dataset_id, validate_config_name, validate_split_name, validate_sample_size, ValidationError, format_validation_error, ) from hf_eda_mcp.error_handling import format_error_response, log_error_with_context logger = logging.getLogger(__name__) # Default constants (can be overridden by config) DEFAULT_ANALYSIS_SAMPLE_SIZE = 1000 MAX_UNIQUE_VALUES_TO_SHOW = 20 def analyze_dataset_features( dataset_id: str, split: str = "train", sample_size: int = DEFAULT_ANALYSIS_SAMPLE_SIZE, config_name: Optional[str] = None, hf_api_token: gr.Header = "", ) -> Dict[str, Any]: """ Perform basic exploratory analysis on dataset features. This function analyzes dataset features to provide insights into data types, distributions, missing values, and data quality. It handles different data types (numerical, categorical, text) appropriately and generates comprehensive statistics for each feature. Args: dataset_id: HuggingFace dataset identifier (e.g., 'imdb', 'squad') split: Dataset split to analyze (default: 'train') sample_size: Number of samples to use for analysis (default: 1000, max: 50000) config_name: Optional configuration name for multi-config datasets hf_api_token: Header parsed by Gradio when hf_api_token is provided in MCP configuration headers Returns: Dictionary containing comprehensive feature analysis: - dataset_info: Basic dataset information - sample_info: Information about the sample used for analysis - features: Dictionary with analysis for each feature including: - feature_type: Detected type (numerical, categorical, text, etc.) - missing_count: Number of missing/null values - missing_percentage: Percentage of missing values - unique_count: Number of unique values - statistics: Type-specific statistics (mean, std for numerical; top values for categorical) - summary: Overall analysis summary Raises: ValueError: If inputs are invalid DatasetNotFoundError: If dataset or split doesn't exist AuthenticationError: If dataset is private and authentication fails DatasetServiceError: If analysis fails for other reasons Example: >>> analysis = analyze_dataset_features("imdb", sample_size=500) >>> for feature_name, feature_analysis in analysis['features'].items(): ... print(f"{feature_name}: {feature_analysis['feature_type']}") ... print(f" Missing: {feature_analysis['missing_percentage']:.1f}%") """ # Handle empty strings from Gradio (convert to None) if config_name == "": config_name = None # Input validation using centralized validation try: dataset_id = validate_dataset_id(dataset_id) config_name = validate_config_name(config_name) split = validate_split_name(split) sample_size = validate_sample_size(sample_size, "sample_size") except ValidationError as e: logger.error(f"Validation error: {format_validation_error(e)}") raise ValueError(format_validation_error(e)) context = { "dataset_id": dataset_id, "split": split, "sample_size": sample_size, "config_name": config_name, "operation": "analyze_dataset_features" } logger.info( f"Analyzing features for dataset: {dataset_id}, split: {split}, " f"sample_size: {sample_size}" + (f", config: {config_name}" if config_name else "") ) try: # Get dataset service service = get_dataset_service(hf_api_token=hf_api_token) # Try to get statistics from Dataset Viewer API first (more efficient and complete) viewer_stats = service.get_dataset_statistics( dataset_id=dataset_id, split=split, config_name=config_name ) if viewer_stats is not None: # Use Dataset Viewer statistics (full dataset, no sampling needed) logger.info(f"Using Dataset Viewer statistics for {dataset_id}") return _convert_viewer_statistics_to_analysis( viewer_stats, dataset_id, config_name, split ) # Fall back to sample-based analysis logger.info("Dataset Viewer statistics not available, using sample-based analysis") sample_data = service.load_dataset_sample( dataset_id=dataset_id, split=split, num_samples=sample_size, config_name=config_name, streaming=True, ) # Perform feature analysis features_analysis = {} data_samples = sample_data["data"] if not data_samples: raise DatasetServiceError("No data samples available for analysis") # Determine feature names from first sample first_sample = data_samples[0] if not isinstance(first_sample, dict): raise DatasetServiceError( "Dataset samples are not in expected dictionary format" ) feature_names = list(first_sample.keys()) # Analyze each feature for feature_name in feature_names: logger.debug(f"Analyzing feature: {feature_name}") feature_analysis = _analyze_single_feature(feature_name, data_samples) features_analysis[feature_name] = feature_analysis # Generate overall analysis analysis_result = { "dataset_info": { "dataset_id": dataset_id, "config_name": config_name, "split": split, "total_features": len(feature_names), "sample_size_used": len(data_samples), "sample_size_requested": sample_size, }, "sample_info": { "sampling_method": "sequential_head", "represents_full_dataset": len(data_samples) >= sample_size, "analysis_timestamp": sample_data.get("_sampled_at"), }, "features": features_analysis, "summary": _generate_analysis_summary(features_analysis, len(data_samples)), } logger.info( f"Successfully analyzed {len(feature_names)} features from {dataset_id}" ) return analysis_result except DatasetNotFoundError as e: log_error_with_context(e, context, level=logging.WARNING) error_response = format_error_response(e, context) logger.info(f"Dataset/split not found suggestions: {error_response.get('suggestions', [])}") raise except AuthenticationError as e: log_error_with_context(e, context, level=logging.WARNING) error_response = format_error_response(e, context) logger.info(f"Authentication error guidance: {error_response.get('suggestions', [])}") raise except NetworkError as e: log_error_with_context(e, context) error_response = format_error_response(e, context) logger.info(f"Network error guidance: {error_response.get('suggestions', [])}") raise except Exception as e: log_error_with_context(e, context) raise DatasetServiceError(f"Failed to analyze dataset features: {str(e)}") from e def _convert_viewer_statistics_to_analysis( viewer_stats: Dict[str, Any], dataset_id: str, config_name: Optional[str], split: str ) -> Dict[str, Any]: """ Convert Dataset Viewer API statistics to our analysis format. Supports all Dataset Viewer column types: - Numerical: int, float - Categorical: class_label, string_label, bool - Text: string_text - Media: image, audio - Structured: list Args: viewer_stats: Statistics from Dataset Viewer API dataset_id: Dataset identifier config_name: Configuration name split: Split name Returns: Dictionary in our standard analysis format """ num_examples = viewer_stats.get('num_examples', 0) statistics_list = viewer_stats.get('statistics', []) features_analysis = {} for col_stat in statistics_list: column_name = col_stat.get('column_name', 'unknown') column_type = col_stat.get('column_type', 'unknown') column_statistics = col_stat.get('column_statistics', {}) # Convert to our format based on column type if column_type == 'string_text': # Text features: character length statistics features_analysis[column_name] = { 'feature_type': 'text', 'missing_count': column_statistics.get('nan_count', 0), 'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100, 'unique_count': 0, # Not provided by viewer for text 'total_count': num_examples, 'non_missing_count': num_examples - column_statistics.get('nan_count', 0), 'statistics': { 'count': num_examples - column_statistics.get('nan_count', 0), 'min_length': column_statistics.get('min', 0), 'max_length': column_statistics.get('max', 0), 'mean_length': column_statistics.get('mean', 0), 'median_length': column_statistics.get('median', 0), 'std_length': column_statistics.get('std', 0), 'histogram': column_statistics.get('histogram', {}), }, 'sample_values': [], } elif column_type in ['class_label', 'string_label']: # Categorical features: frequency distributions frequencies = column_statistics.get('frequencies', {}) features_analysis[column_name] = { 'feature_type': 'categorical', 'missing_count': column_statistics.get('nan_count', 0), 'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100, 'unique_count': column_statistics.get('n_unique', len(frequencies)), 'total_count': num_examples, 'non_missing_count': num_examples - column_statistics.get('nan_count', 0), 'statistics': { 'count': num_examples - column_statistics.get('nan_count', 0), 'unique_count': column_statistics.get('n_unique', len(frequencies)), 'frequencies': frequencies, 'most_common': [(k, v) for k, v in sorted(frequencies.items(), key=lambda x: x[1], reverse=True)], 'top_value': max(frequencies.items(), key=lambda x: x[1]) if frequencies else None, 'no_label_count': column_statistics.get('no_label_count', 0), 'no_label_proportion': column_statistics.get('no_label_proportion', 0.0), }, 'sample_values': list(frequencies.keys())[:5], } elif column_type == 'bool': # Boolean features: True/False frequencies frequencies = column_statistics.get('frequencies', {}) features_analysis[column_name] = { 'feature_type': 'boolean', 'missing_count': column_statistics.get('nan_count', 0), 'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100, 'unique_count': len(frequencies), 'total_count': num_examples, 'non_missing_count': num_examples - column_statistics.get('nan_count', 0), 'statistics': { 'count': num_examples - column_statistics.get('nan_count', 0), 'frequencies': frequencies, }, 'sample_values': list(frequencies.keys()), } elif column_type in ['int', 'float']: # Numerical features: statistical measures features_analysis[column_name] = { 'feature_type': 'numerical', 'missing_count': column_statistics.get('nan_count', 0), 'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100, 'unique_count': 0, # Not always provided 'total_count': num_examples, 'non_missing_count': num_examples - column_statistics.get('nan_count', 0), 'statistics': { 'count': num_examples - column_statistics.get('nan_count', 0), 'mean': column_statistics.get('mean', 0), 'median': column_statistics.get('median', 0), 'min': column_statistics.get('min', 0), 'max': column_statistics.get('max', 0), 'std': column_statistics.get('std', 0), 'histogram': column_statistics.get('histogram', {}), }, 'sample_values': [], } elif column_type == 'image': # Image features: dimension statistics features_analysis[column_name] = { 'feature_type': 'image', 'missing_count': column_statistics.get('nan_count', 0), 'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100, 'unique_count': 0, 'total_count': num_examples, 'non_missing_count': num_examples - column_statistics.get('nan_count', 0), 'statistics': { 'count': num_examples - column_statistics.get('nan_count', 0), 'min_dimension': column_statistics.get('min', 0), 'max_dimension': column_statistics.get('max', 0), 'mean_dimension': column_statistics.get('mean', 0), 'median_dimension': column_statistics.get('median', 0), 'std_dimension': column_statistics.get('std', 0), 'histogram': column_statistics.get('histogram', {}), }, 'sample_values': [], } elif column_type == 'audio': # Audio features: duration statistics (in seconds) features_analysis[column_name] = { 'feature_type': 'audio', 'missing_count': column_statistics.get('nan_count', 0), 'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100, 'unique_count': 0, 'total_count': num_examples, 'non_missing_count': num_examples - column_statistics.get('nan_count', 0), 'statistics': { 'count': num_examples - column_statistics.get('nan_count', 0), 'min_duration': column_statistics.get('min', 0), 'max_duration': column_statistics.get('max', 0), 'mean_duration': column_statistics.get('mean', 0), 'median_duration': column_statistics.get('median', 0), 'std_duration': column_statistics.get('std', 0), 'histogram': column_statistics.get('histogram', {}), }, 'sample_values': [], } elif column_type == 'list': # List features: length statistics features_analysis[column_name] = { 'feature_type': 'list', 'missing_count': column_statistics.get('nan_count', 0), 'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100, 'unique_count': 0, 'total_count': num_examples, 'non_missing_count': num_examples - column_statistics.get('nan_count', 0), 'statistics': { 'count': num_examples - column_statistics.get('nan_count', 0), 'min_length': column_statistics.get('min', 0), 'max_length': column_statistics.get('max', 0), 'mean_length': column_statistics.get('mean', 0), 'median_length': column_statistics.get('median', 0), 'std_length': column_statistics.get('std', 0), 'histogram': column_statistics.get('histogram', {}), }, 'sample_values': [], } else: # Unknown type - provide basic info with all available statistics features_analysis[column_name] = { 'feature_type': column_type, 'missing_count': column_statistics.get('nan_count', 0), 'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100, 'unique_count': column_statistics.get('n_unique', 0), 'total_count': num_examples, 'non_missing_count': num_examples - column_statistics.get('nan_count', 0), 'statistics': column_statistics, 'sample_values': [], } # Generate overall analysis analysis_result = { "dataset_info": { "dataset_id": dataset_id, "config_name": viewer_stats.get('_config_used', config_name), "split": split, "total_features": len(features_analysis), "sample_size_used": num_examples, "sample_size_requested": num_examples, }, "sample_info": { "sampling_method": "dataset_viewer_api", "represents_full_dataset": True, "analysis_timestamp": viewer_stats.get('_cached_at'), "partial": viewer_stats.get('partial', False), }, "features": features_analysis, "summary": _generate_analysis_summary(features_analysis, num_examples), } return analysis_result def _analyze_single_feature( feature_name: str, data_samples: List[Dict[str, Any]] ) -> Dict[str, Any]: """ Analyze a single feature across all data samples. Args: feature_name: Name of the feature to analyze data_samples: List of data sample dictionaries Returns: Dictionary containing feature analysis results """ # Extract values for this feature values = [] missing_count = 0 for sample in data_samples: value = sample.get(feature_name) if ( value is None or value == "" or (isinstance(value, float) and str(value).lower() == "nan") ): missing_count += 1 else: values.append(value) total_count = len(data_samples) missing_percentage = (missing_count / total_count) * 100 if total_count > 0 else 0 # Determine feature type and compute statistics feature_type, statistics_dict = _determine_feature_type_and_stats(values) # Count unique values unique_count = len(set(str(v) for v in values)) if values else 0 return { "feature_type": feature_type, "missing_count": missing_count, "missing_percentage": missing_percentage, "unique_count": unique_count, "total_count": total_count, "non_missing_count": len(values), "statistics": statistics_dict, "sample_values": values[:5] if values else [], # First 5 values as examples } def _determine_feature_type_and_stats(values: List[Any]) -> tuple[str, Dict[str, Any]]: """ Determine the type of a feature and compute appropriate statistics. Args: values: List of non-missing values for the feature Returns: Tuple of (feature_type, statistics_dict) """ if not values: return "unknown", {} # Check if all values are numeric numeric_values = [] for value in values: try: if isinstance(value, (int, float)): numeric_values.append(float(value)) elif isinstance(value, str): # Try to convert string to number numeric_values.append(float(value)) else: # Not numeric break except (ValueError, TypeError): # Not numeric break else: # All values are numeric if len(numeric_values) == len(values): return "numerical", _compute_numerical_statistics(numeric_values) # Check if values are boolean-like boolean_values = set(str(v).lower() for v in values) if boolean_values.issubset({"true", "false", "1", "0", "yes", "no"}): return "boolean", _compute_categorical_statistics(values) # Check if it's text (strings with average length > 10) if all(isinstance(v, str) for v in values): avg_length = sum(len(v) for v in values) / len(values) if avg_length > 10: return "text", _compute_text_statistics(values) # Default to categorical return "categorical", _compute_categorical_statistics(values) def _compute_numerical_statistics(values: List[float]) -> Dict[str, Any]: """Compute statistics for numerical features.""" if not values: return {} try: stats = { "count": len(values), "mean": statistics.mean(values), "median": statistics.median(values), "min": min(values), "max": max(values), "range": max(values) - min(values), } if len(values) > 1: stats["std"] = statistics.stdev(values) stats["variance"] = statistics.variance(values) # Quartiles sorted_values = sorted(values) n = len(sorted_values) if n >= 4: stats["q1"] = sorted_values[n // 4] stats["q3"] = sorted_values[3 * n // 4] stats["iqr"] = stats["q3"] - stats["q1"] return stats except Exception as e: logger.warning(f"Failed to compute numerical statistics: {e}") return {"count": len(values), "error": str(e)} def _compute_categorical_statistics(values: List[Any]) -> Dict[str, Any]: """Compute statistics for categorical features.""" if not values: return {} try: # Convert all values to strings for consistent counting str_values = [str(v) for v in values] value_counts = Counter(str_values) stats = { "count": len(values), "unique_count": len(value_counts), "most_common": value_counts.most_common(MAX_UNIQUE_VALUES_TO_SHOW), "top_value": value_counts.most_common(1)[0] if value_counts else None, } # Calculate entropy (measure of diversity) if len(value_counts) > 1: total = len(str_values) entropy = -sum( (count / total) * (count / total).bit_length() for count in value_counts.values() if count > 0 ) stats["entropy"] = entropy return stats except Exception as e: logger.warning(f"Failed to compute categorical statistics: {e}") return {"count": len(values), "error": str(e)} def _compute_text_statistics(values: List[str]) -> Dict[str, Any]: """Compute statistics for text features.""" if not values: return {} try: lengths = [len(v) for v in values] word_counts = [len(v.split()) for v in values] stats = { "count": len(values), "avg_length": statistics.mean(lengths), "min_length": min(lengths), "max_length": max(lengths), "avg_word_count": statistics.mean(word_counts), "min_word_count": min(word_counts), "max_word_count": max(word_counts), } # Sample of values (first few) stats["sample_texts"] = values[:3] return stats except Exception as e: logger.warning(f"Failed to compute text statistics: {e}") return {"count": len(values), "error": str(e)} def _generate_analysis_summary( features_analysis: Dict[str, Dict[str, Any]], sample_size: int ) -> str: """Generate a human-readable summary of the analysis.""" if not features_analysis: return "No features analyzed" total_features = len(features_analysis) # Count feature types type_counts = Counter( analysis.get("feature_type", "unknown") for analysis in features_analysis.values() ) # Calculate average missing rate missing_rates = [ analysis.get("missing_percentage", 0) for analysis in features_analysis.values() ] avg_missing = statistics.mean(missing_rates) if missing_rates else 0 summary_parts = [f"Analyzed {total_features} features from {sample_size} samples"] # Feature type breakdown type_summary = [] for ftype, count in type_counts.most_common(): type_summary.append(f"{count} {ftype}") if type_summary: summary_parts.append(f"Types: {', '.join(type_summary)}") # Missing data summary if avg_missing > 0: summary_parts.append(f"Avg missing: {avg_missing:.1f}%") return " | ".join(summary_parts)