Spaces:
Running
Running
| """ | |
| Basic analysis tools for exploratory data analysis of HuggingFace datasets. | |
| This module provides tools for performing exploratory data analysis including | |
| feature statistics and missing value analysis. | |
| """ | |
import logging
import math
import statistics
from collections import Counter
from typing import Optional, Dict, Any, List

import gradio as gr

from hf_eda_mcp.services.dataset_service import get_dataset_service, DatasetServiceError
from hf_eda_mcp.integrations.hf_client import DatasetNotFoundError, AuthenticationError, NetworkError
from hf_eda_mcp.validation import (
    validate_dataset_id,
    validate_config_name,
    validate_split_name,
    validate_sample_size,
    ValidationError,
    format_validation_error,
)
from hf_eda_mcp.error_handling import format_error_response, log_error_with_context
| logger = logging.getLogger(__name__) | |
| # Default constants (can be overridden by config) | |
| DEFAULT_ANALYSIS_SAMPLE_SIZE = 1000 | |
| MAX_UNIQUE_VALUES_TO_SHOW = 20 | |
def analyze_dataset_features(
    dataset_id: str,
    split: str = "train",
    sample_size: int = DEFAULT_ANALYSIS_SAMPLE_SIZE,
    config_name: Optional[str] = None,
    hf_api_token: gr.Header = "",
) -> Dict[str, Any]:
    """
    Perform basic exploratory analysis on dataset features.

    This function analyzes dataset features to provide insights into data types,
    distributions, missing values, and data quality. It handles different data
    types (numerical, categorical, text) appropriately and generates comprehensive
    statistics for each feature.

    It first tries the Dataset Viewer statistics API (full-dataset statistics,
    no download); only if that is unavailable does it fall back to streaming a
    sample of rows and analyzing them locally.

    Args:
        dataset_id: HuggingFace dataset identifier (e.g., 'imdb', 'squad')
        split: Dataset split to analyze (default: 'train')
        sample_size: Number of samples to use for analysis (default: 1000, max: 50000)
        config_name: Optional configuration name for multi-config datasets
        hf_api_token: Header parsed by Gradio when hf_api_token is provided in
            MCP configuration headers

    Returns:
        Dictionary containing comprehensive feature analysis:
        - dataset_info: Basic dataset information
        - sample_info: Information about the sample used for analysis
        - features: Dictionary with analysis for each feature including:
            - feature_type: Detected type (numerical, categorical, text, etc.)
            - missing_count: Number of missing/null values
            - missing_percentage: Percentage of missing values
            - unique_count: Number of unique values
            - statistics: Type-specific statistics (mean, std for numerical;
              top values for categorical)
        - summary: Overall analysis summary

    Raises:
        ValueError: If inputs are invalid
        DatasetNotFoundError: If dataset or split doesn't exist
        AuthenticationError: If dataset is private and authentication fails
        DatasetServiceError: If analysis fails for other reasons

    Example:
        >>> analysis = analyze_dataset_features("imdb", sample_size=500)
        >>> for feature_name, feature_analysis in analysis['features'].items():
        ...     print(f"{feature_name}: {feature_analysis['feature_type']}")
        ...     print(f"  Missing: {feature_analysis['missing_percentage']:.1f}%")
    """
    # Handle empty strings from Gradio (convert to None)
    if config_name == "":
        config_name = None

    # Input validation using centralized validation; validators normalize
    # the values, so the validated results are assigned back.
    try:
        dataset_id = validate_dataset_id(dataset_id)
        config_name = validate_config_name(config_name)
        split = validate_split_name(split)
        sample_size = validate_sample_size(sample_size, "sample_size")
    except ValidationError as e:
        logger.error(f"Validation error: {format_validation_error(e)}")
        raise ValueError(format_validation_error(e))

    # Shared context reused by every error handler below for structured logging.
    context = {
        "dataset_id": dataset_id,
        "split": split,
        "sample_size": sample_size,
        "config_name": config_name,
        "operation": "analyze_dataset_features"
    }

    logger.info(
        f"Analyzing features for dataset: {dataset_id}, split: {split}, "
        f"sample_size: {sample_size}"
        + (f", config: {config_name}" if config_name else "")
    )

    try:
        # Get dataset service
        service = get_dataset_service(hf_api_token=hf_api_token)

        # Try to get statistics from Dataset Viewer API first (more efficient and complete)
        viewer_stats = service.get_dataset_statistics(
            dataset_id=dataset_id,
            split=split,
            config_name=config_name
        )

        if viewer_stats is not None:
            # Use Dataset Viewer statistics (full dataset, no sampling needed)
            logger.info(f"Using Dataset Viewer statistics for {dataset_id}")
            return _convert_viewer_statistics_to_analysis(
                viewer_stats, dataset_id, config_name, split
            )

        # Fall back to sample-based analysis (streams only the first
        # `sample_size` rows; does not download the full dataset).
        logger.info("Dataset Viewer statistics not available, using sample-based analysis")
        sample_data = service.load_dataset_sample(
            dataset_id=dataset_id,
            split=split,
            num_samples=sample_size,
            config_name=config_name,
            streaming=True,
        )

        # Perform feature analysis
        features_analysis = {}
        data_samples = sample_data["data"]

        if not data_samples:
            raise DatasetServiceError("No data samples available for analysis")

        # Determine feature names from first sample; all rows are assumed to
        # share the first row's keys — TODO confirm for ragged datasets.
        first_sample = data_samples[0]
        if not isinstance(first_sample, dict):
            raise DatasetServiceError(
                "Dataset samples are not in expected dictionary format"
            )

        feature_names = list(first_sample.keys())

        # Analyze each feature
        for feature_name in feature_names:
            logger.debug(f"Analyzing feature: {feature_name}")
            feature_analysis = _analyze_single_feature(feature_name, data_samples)
            features_analysis[feature_name] = feature_analysis

        # Generate overall analysis
        analysis_result = {
            "dataset_info": {
                "dataset_id": dataset_id,
                "config_name": config_name,
                "split": split,
                "total_features": len(feature_names),
                "sample_size_used": len(data_samples),
                "sample_size_requested": sample_size,
            },
            "sample_info": {
                "sampling_method": "sequential_head",
                # Fewer rows than requested means the split was exhausted,
                # so the sample covers the whole dataset.
                "represents_full_dataset": len(data_samples) >= sample_size,
                "analysis_timestamp": sample_data.get("_sampled_at"),
            },
            "features": features_analysis,
            "summary": _generate_analysis_summary(features_analysis, len(data_samples)),
        }

        logger.info(
            f"Successfully analyzed {len(feature_names)} features from {dataset_id}"
        )
        return analysis_result

    # Known error types are logged with context plus user-facing suggestions,
    # then re-raised unchanged so callers can catch the specific type.
    except DatasetNotFoundError as e:
        log_error_with_context(e, context, level=logging.WARNING)
        error_response = format_error_response(e, context)
        logger.info(f"Dataset/split not found suggestions: {error_response.get('suggestions', [])}")
        raise
    except AuthenticationError as e:
        log_error_with_context(e, context, level=logging.WARNING)
        error_response = format_error_response(e, context)
        logger.info(f"Authentication error guidance: {error_response.get('suggestions', [])}")
        raise
    except NetworkError as e:
        log_error_with_context(e, context)
        error_response = format_error_response(e, context)
        logger.info(f"Network error guidance: {error_response.get('suggestions', [])}")
        raise
    except Exception as e:
        # Anything unexpected is wrapped in the service's error type.
        log_error_with_context(e, context)
        raise DatasetServiceError(f"Failed to analyze dataset features: {str(e)}") from e
def _convert_viewer_statistics_to_analysis(
    viewer_stats: Dict[str, Any],
    dataset_id: str,
    config_name: Optional[str],
    split: str
) -> Dict[str, Any]:
    """
    Convert Dataset Viewer API statistics to our analysis format.

    Supports all Dataset Viewer column types:
    - Numerical: int, float
    - Categorical: class_label, string_label, bool
    - Text: string_text
    - Media: image, audio
    - Structured: list

    Args:
        viewer_stats: Statistics from Dataset Viewer API
        dataset_id: Dataset identifier
        config_name: Configuration name
        split: Split name

    Returns:
        Dictionary in our standard analysis format
    """
    num_examples = viewer_stats.get('num_examples', 0)

    # Column types whose viewer statistics are the numeric five-number summary
    # (min/max/mean/median/std + histogram), mapped to (feature_type, key suffix).
    # The suffix reflects what the numbers measure: character length for text,
    # element length for lists, pixel dimension for images, seconds for audio.
    numeric_summary_types = {
        'string_text': ('text', '_length'),
        'int': ('numerical', ''),
        'float': ('numerical', ''),
        'image': ('image', '_dimension'),
        'audio': ('audio', '_duration'),
        'list': ('list', '_length'),
    }

    features_analysis: Dict[str, Any] = {}
    for col_stat in viewer_stats.get('statistics', []):
        column_name = col_stat.get('column_name', 'unknown')
        column_type = col_stat.get('column_type', 'unknown')
        column_statistics = col_stat.get('column_statistics', {})

        nan_count = column_statistics.get('nan_count', 0)
        non_missing = num_examples - nan_count
        # Fields shared by every feature entry regardless of column type;
        # type-specific branches below override what they need.
        entry: Dict[str, Any] = {
            'feature_type': column_type,
            'missing_count': nan_count,
            'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100,
            'unique_count': 0,  # unique counts are not provided for most types
            'total_count': num_examples,
            'non_missing_count': non_missing,
            'statistics': {},
            'sample_values': [],
        }

        if column_type in numeric_summary_types:
            # Numeric-summary types: min/max/mean/median/std + histogram,
            # with keys suffixed per feature type (e.g. 'min_length').
            feature_type, suffix = numeric_summary_types[column_type]
            stats: Dict[str, Any] = {'count': non_missing}
            for measure in ('min', 'max', 'mean', 'median', 'std'):
                stats[measure + suffix] = column_statistics.get(measure, 0)
            stats['histogram'] = column_statistics.get('histogram', {})
            entry['feature_type'] = feature_type
            entry['statistics'] = stats
        elif column_type in ('class_label', 'string_label'):
            # Categorical features: frequency distributions
            frequencies = column_statistics.get('frequencies', {})
            unique_count = column_statistics.get('n_unique', len(frequencies))
            entry['feature_type'] = 'categorical'
            entry['unique_count'] = unique_count
            entry['statistics'] = {
                'count': non_missing,
                'unique_count': unique_count,
                'frequencies': frequencies,
                'most_common': sorted(
                    frequencies.items(), key=lambda item: item[1], reverse=True
                ),
                'top_value': max(frequencies.items(), key=lambda item: item[1]) if frequencies else None,
                'no_label_count': column_statistics.get('no_label_count', 0),
                'no_label_proportion': column_statistics.get('no_label_proportion', 0.0),
            }
            entry['sample_values'] = list(frequencies.keys())[:5]
        elif column_type == 'bool':
            # Boolean features: True/False frequencies
            frequencies = column_statistics.get('frequencies', {})
            entry['feature_type'] = 'boolean'
            entry['unique_count'] = len(frequencies)
            entry['statistics'] = {
                'count': non_missing,
                'frequencies': frequencies,
            }
            entry['sample_values'] = list(frequencies.keys())
        else:
            # Unknown type - pass through all available statistics unchanged
            entry['unique_count'] = column_statistics.get('n_unique', 0)
            entry['statistics'] = column_statistics

        features_analysis[column_name] = entry

    # Assemble the same top-level structure that sample-based analysis returns.
    return {
        "dataset_info": {
            "dataset_id": dataset_id,
            "config_name": viewer_stats.get('_config_used', config_name),
            "split": split,
            "total_features": len(features_analysis),
            "sample_size_used": num_examples,
            "sample_size_requested": num_examples,
        },
        "sample_info": {
            "sampling_method": "dataset_viewer_api",
            "represents_full_dataset": True,
            "analysis_timestamp": viewer_stats.get('_cached_at'),
            "partial": viewer_stats.get('partial', False),
        },
        "features": features_analysis,
        "summary": _generate_analysis_summary(features_analysis, num_examples),
    }
def _analyze_single_feature(
    feature_name: str, data_samples: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """
    Analyze a single feature across all data samples.

    Args:
        feature_name: Name of the feature to analyze
        data_samples: List of data sample dictionaries

    Returns:
        Dictionary containing feature analysis results: detected type,
        missing counts/percentage, unique count, type-specific statistics,
        and up to five example values.
    """
    # Partition samples into present values and missing values. A value is
    # missing if it is None, an empty string, or a float NaN (math.isnan
    # replaces the old str(value).lower() == "nan" round-trip).
    values = []
    missing_count = 0
    for sample in data_samples:
        value = sample.get(feature_name)
        if value is None or value == "" or (isinstance(value, float) and math.isnan(value)):
            missing_count += 1
        else:
            values.append(value)

    total_count = len(data_samples)
    missing_percentage = (missing_count / total_count) * 100 if total_count > 0 else 0

    # Determine feature type and compute type-specific statistics
    feature_type, statistics_dict = _determine_feature_type_and_stats(values)

    # Unique values are counted on string representations so that unhashable
    # values (lists, dicts) do not raise TypeError.
    unique_count = len(set(str(v) for v in values)) if values else 0

    return {
        "feature_type": feature_type,
        "missing_count": missing_count,
        "missing_percentage": missing_percentage,
        "unique_count": unique_count,
        "total_count": total_count,
        "non_missing_count": len(values),
        "statistics": statistics_dict,
        "sample_values": values[:5] if values else [],  # First 5 values as examples
    }
def _determine_feature_type_and_stats(values: List[Any]) -> tuple[str, Dict[str, Any]]:
    """
    Determine the type of a feature and compute appropriate statistics.

    Args:
        values: List of non-missing values for the feature

    Returns:
        Tuple of (feature_type, statistics_dict)
    """
    if not values:
        return "unknown", {}

    # bool is a subclass of int, so without this explicit check boolean
    # features would be coerced to 0.0/1.0 below and misreported as
    # "numerical", making the boolean branch unreachable for real bools.
    if all(isinstance(v, bool) for v in values):
        return "boolean", _compute_categorical_statistics(values)

    # Check if all values are numeric (ints, floats, or numeric strings)
    numeric_values = []
    for value in values:
        if isinstance(value, bool) or not isinstance(value, (int, float, str)):
            break  # non-numeric type
        try:
            numeric_values.append(float(value))
        except ValueError:
            break  # string that does not parse as a number
    else:
        # Loop finished without break: every value converted successfully
        if len(numeric_values) == len(values):
            return "numerical", _compute_numerical_statistics(numeric_values)

    # Check if values are boolean-like ("true"/"false", "yes"/"no", "1"/"0")
    boolean_values = set(str(v).lower() for v in values)
    if boolean_values.issubset({"true", "false", "1", "0", "yes", "no"}):
        return "boolean", _compute_categorical_statistics(values)

    # Check if it's text (strings with average length > 10)
    if all(isinstance(v, str) for v in values):
        avg_length = sum(len(v) for v in values) / len(values)
        if avg_length > 10:
            return "text", _compute_text_statistics(values)

    # Default to categorical
    return "categorical", _compute_categorical_statistics(values)
| def _compute_numerical_statistics(values: List[float]) -> Dict[str, Any]: | |
| """Compute statistics for numerical features.""" | |
| if not values: | |
| return {} | |
| try: | |
| stats = { | |
| "count": len(values), | |
| "mean": statistics.mean(values), | |
| "median": statistics.median(values), | |
| "min": min(values), | |
| "max": max(values), | |
| "range": max(values) - min(values), | |
| } | |
| if len(values) > 1: | |
| stats["std"] = statistics.stdev(values) | |
| stats["variance"] = statistics.variance(values) | |
| # Quartiles | |
| sorted_values = sorted(values) | |
| n = len(sorted_values) | |
| if n >= 4: | |
| stats["q1"] = sorted_values[n // 4] | |
| stats["q3"] = sorted_values[3 * n // 4] | |
| stats["iqr"] = stats["q3"] - stats["q1"] | |
| return stats | |
| except Exception as e: | |
| logger.warning(f"Failed to compute numerical statistics: {e}") | |
| return {"count": len(values), "error": str(e)} | |
def _compute_categorical_statistics(values: List[Any]) -> Dict[str, Any]:
    """
    Compute statistics for categorical features.

    Values are compared by their string representation so mixed and
    unhashable types can be counted. Returns count, unique count, the most
    common values, the single top value, and (when more than one unique
    value exists) the Shannon entropy of the distribution in bits.
    """
    if not values:
        return {}
    try:
        # Convert all values to strings for consistent counting
        str_values = [str(v) for v in values]
        value_counts = Counter(str_values)
        stats = {
            "count": len(values),
            "unique_count": len(value_counts),
            "most_common": value_counts.most_common(MAX_UNIQUE_VALUES_TO_SHOW),
            "top_value": value_counts.most_common(1)[0] if value_counts else None,
        }
        # Shannon entropy (bits) as a measure of diversity. The previous
        # implementation called (count/total).bit_length() — floats have no
        # bit_length, so it raised AttributeError and the whole stats dict
        # was silently replaced by an error entry whenever more than one
        # unique value was present.
        if len(value_counts) > 1:
            total = len(str_values)
            stats["entropy"] = -sum(
                (count / total) * math.log2(count / total)
                for count in value_counts.values()
                if count > 0
            )
        return stats
    except Exception as e:
        logger.warning(f"Failed to compute categorical statistics: {e}")
        return {"count": len(values), "error": str(e)}
| def _compute_text_statistics(values: List[str]) -> Dict[str, Any]: | |
| """Compute statistics for text features.""" | |
| if not values: | |
| return {} | |
| try: | |
| lengths = [len(v) for v in values] | |
| word_counts = [len(v.split()) for v in values] | |
| stats = { | |
| "count": len(values), | |
| "avg_length": statistics.mean(lengths), | |
| "min_length": min(lengths), | |
| "max_length": max(lengths), | |
| "avg_word_count": statistics.mean(word_counts), | |
| "min_word_count": min(word_counts), | |
| "max_word_count": max(word_counts), | |
| } | |
| # Sample of values (first few) | |
| stats["sample_texts"] = values[:3] | |
| return stats | |
| except Exception as e: | |
| logger.warning(f"Failed to compute text statistics: {e}") | |
| return {"count": len(values), "error": str(e)} | |
| def _generate_analysis_summary( | |
| features_analysis: Dict[str, Dict[str, Any]], sample_size: int | |
| ) -> str: | |
| """Generate a human-readable summary of the analysis.""" | |
| if not features_analysis: | |
| return "No features analyzed" | |
| total_features = len(features_analysis) | |
| # Count feature types | |
| type_counts = Counter( | |
| analysis.get("feature_type", "unknown") | |
| for analysis in features_analysis.values() | |
| ) | |
| # Calculate average missing rate | |
| missing_rates = [ | |
| analysis.get("missing_percentage", 0) for analysis in features_analysis.values() | |
| ] | |
| avg_missing = statistics.mean(missing_rates) if missing_rates else 0 | |
| summary_parts = [f"Analyzed {total_features} features from {sample_size} samples"] | |
| # Feature type breakdown | |
| type_summary = [] | |
| for ftype, count in type_counts.most_common(): | |
| type_summary.append(f"{count} {ftype}") | |
| if type_summary: | |
| summary_parts.append(f"Types: {', '.join(type_summary)}") | |
| # Missing data summary | |
| if avg_missing > 0: | |
| summary_parts.append(f"Avg missing: {avg_missing:.1f}%") | |
| return " | ".join(summary_parts) | |