KhalilGuetari's picture
change order of args for gr headers
c2830c1
"""
Basic analysis tools for exploratory data analysis of HuggingFace datasets.
This module provides tools for performing exploratory data analysis including
feature statistics and missing value analysis.
"""
import logging
import statistics
import gradio as gr
from typing import Optional, Dict, Any, List
from collections import Counter
from hf_eda_mcp.services.dataset_service import get_dataset_service, DatasetServiceError
from hf_eda_mcp.integrations.hf_client import DatasetNotFoundError, AuthenticationError, NetworkError
from hf_eda_mcp.validation import (
validate_dataset_id,
validate_config_name,
validate_split_name,
validate_sample_size,
ValidationError,
format_validation_error,
)
from hf_eda_mcp.error_handling import format_error_response, log_error_with_context
logger = logging.getLogger(__name__)
# Default constants (can be overridden by config)
DEFAULT_ANALYSIS_SAMPLE_SIZE = 1000
MAX_UNIQUE_VALUES_TO_SHOW = 20
def analyze_dataset_features(
    dataset_id: str,
    split: str = "train",
    sample_size: int = DEFAULT_ANALYSIS_SAMPLE_SIZE,
    config_name: Optional[str] = None,
    hf_api_token: gr.Header = "",
) -> Dict[str, Any]:
    """
    Perform basic exploratory analysis on dataset features.

    This function analyzes dataset features to provide insights into data types,
    distributions, missing values, and data quality. It handles different data
    types (numerical, categorical, text) appropriately and generates comprehensive
    statistics for each feature.

    Args:
        dataset_id: HuggingFace dataset identifier (e.g., 'imdb', 'squad')
        split: Dataset split to analyze (default: 'train')
        sample_size: Number of samples to use for analysis (default: 1000, max: 50000)
        config_name: Optional configuration name for multi-config datasets
        hf_api_token: Header parsed by Gradio when hf_api_token is provided in MCP configuration headers

    Returns:
        Dictionary containing comprehensive feature analysis:
        - dataset_info: Basic dataset information
        - sample_info: Information about the sample used for analysis
        - features: Dictionary with analysis for each feature including:
            - feature_type: Detected type (numerical, categorical, text, etc.)
            - missing_count: Number of missing/null values
            - missing_percentage: Percentage of missing values
            - unique_count: Number of unique values
            - statistics: Type-specific statistics (mean, std for numerical; top values for categorical)
        - summary: Overall analysis summary

    Raises:
        ValueError: If inputs are invalid
        DatasetNotFoundError: If dataset or split doesn't exist
        AuthenticationError: If dataset is private and authentication fails
        NetworkError: If the underlying service reports a network failure
        DatasetServiceError: If analysis fails for other reasons

    Example:
        >>> analysis = analyze_dataset_features("imdb", sample_size=500)
        >>> for feature_name, feature_analysis in analysis['features'].items():
        ...     print(f"{feature_name}: {feature_analysis['feature_type']}")
        ...     print(f"  Missing: {feature_analysis['missing_percentage']:.1f}%")
    """
    # Handle empty strings from Gradio (convert to None)
    if config_name == "":
        config_name = None

    # Input validation using centralized validation
    try:
        dataset_id = validate_dataset_id(dataset_id)
        config_name = validate_config_name(config_name)
        split = validate_split_name(split)
        sample_size = validate_sample_size(sample_size, "sample_size")
    except ValidationError as e:
        message = format_validation_error(e)
        logger.error(f"Validation error: {message}")
        # Chain the original error so the validation traceback is preserved
        # for whoever catches the ValueError.
        raise ValueError(message) from e

    # Context attached to every logged error below.
    context = {
        "dataset_id": dataset_id,
        "split": split,
        "sample_size": sample_size,
        "config_name": config_name,
        "operation": "analyze_dataset_features"
    }

    logger.info(
        f"Analyzing features for dataset: {dataset_id}, split: {split}, "
        f"sample_size: {sample_size}"
        + (f", config: {config_name}" if config_name else "")
    )

    try:
        # Get dataset service
        service = get_dataset_service(hf_api_token=hf_api_token)

        # Try to get statistics from Dataset Viewer API first (more efficient and complete)
        viewer_stats = service.get_dataset_statistics(
            dataset_id=dataset_id,
            split=split,
            config_name=config_name
        )

        if viewer_stats is not None:
            # Use Dataset Viewer statistics (full dataset, no sampling needed)
            logger.info(f"Using Dataset Viewer statistics for {dataset_id}")
            return _convert_viewer_statistics_to_analysis(
                viewer_stats, dataset_id, config_name, split
            )

        # Fall back to sample-based analysis
        logger.info("Dataset Viewer statistics not available, using sample-based analysis")
        sample_data = service.load_dataset_sample(
            dataset_id=dataset_id,
            split=split,
            num_samples=sample_size,
            config_name=config_name,
            streaming=True,
        )

        # Perform feature analysis
        features_analysis = {}
        data_samples = sample_data["data"]

        if not data_samples:
            raise DatasetServiceError("No data samples available for analysis")

        # Determine feature names from first sample
        first_sample = data_samples[0]
        if not isinstance(first_sample, dict):
            raise DatasetServiceError(
                "Dataset samples are not in expected dictionary format"
            )

        feature_names = list(first_sample.keys())

        # Analyze each feature
        for feature_name in feature_names:
            logger.debug(f"Analyzing feature: {feature_name}")
            feature_analysis = _analyze_single_feature(feature_name, data_samples)
            features_analysis[feature_name] = feature_analysis

        # Generate overall analysis
        analysis_result = {
            "dataset_info": {
                "dataset_id": dataset_id,
                "config_name": config_name,
                "split": split,
                "total_features": len(feature_names),
                "sample_size_used": len(data_samples),
                "sample_size_requested": sample_size,
            },
            "sample_info": {
                "sampling_method": "sequential_head",
                # NOTE(review): this is True when the requested number of rows
                # was returned; a shorter sample may instead mean the split
                # was exhausted — confirm the intended meaning of this flag.
                "represents_full_dataset": len(data_samples) >= sample_size,
                "analysis_timestamp": sample_data.get("_sampled_at"),
            },
            "features": features_analysis,
            "summary": _generate_analysis_summary(features_analysis, len(data_samples)),
        }

        logger.info(
            f"Successfully analyzed {len(feature_names)} features from {dataset_id}"
        )
        return analysis_result

    except DatasetNotFoundError as e:
        log_error_with_context(e, context, level=logging.WARNING)
        error_response = format_error_response(e, context)
        logger.info(f"Dataset/split not found suggestions: {error_response.get('suggestions', [])}")
        raise
    except AuthenticationError as e:
        log_error_with_context(e, context, level=logging.WARNING)
        error_response = format_error_response(e, context)
        logger.info(f"Authentication error guidance: {error_response.get('suggestions', [])}")
        raise
    except NetworkError as e:
        log_error_with_context(e, context)
        error_response = format_error_response(e, context)
        logger.info(f"Network error guidance: {error_response.get('suggestions', [])}")
        raise
    except Exception as e:
        # Anything else (including DatasetServiceError raised above) is
        # logged and surfaced as a DatasetServiceError with the cause chained.
        log_error_with_context(e, context)
        raise DatasetServiceError(f"Failed to analyze dataset features: {str(e)}") from e
def _viewer_column_to_feature(
    col_stat: Dict[str, Any], num_examples: int
) -> Dict[str, Any]:
    """Translate one Dataset Viewer column entry into a per-feature analysis dict."""
    column_type = col_stat.get('column_type', 'unknown')
    column_statistics = col_stat.get('column_statistics', {})
    missing_count = column_statistics.get('nan_count', 0)
    non_missing_count = num_examples - missing_count

    # Column types summarised by min/max/mean/median/std of one measured
    # quantity: column_type -> (feature_type, suffix of the statistic keys).
    measured_types = {
        'string_text': ('text', 'length'),       # character length
        'image': ('image', 'dimension'),         # image dimension
        'audio': ('audio', 'duration'),          # duration (in seconds)
        'list': ('list', 'length'),              # list length
    }

    # Defaults for the types where the viewer gives no unique count/samples.
    unique_count = 0
    sample_values: List[Any] = []

    if column_type in measured_types:
        feature_type, suffix = measured_types[column_type]
        feature_statistics = {'count': non_missing_count}
        for measure in ('min', 'max', 'mean', 'median', 'std'):
            feature_statistics[f'{measure}_{suffix}'] = column_statistics.get(measure, 0)
        feature_statistics['histogram'] = column_statistics.get('histogram', {})
    elif column_type in ('class_label', 'string_label'):
        # Categorical features: frequency distributions
        frequencies = column_statistics.get('frequencies', {})
        # Stable descending sort, so the first entry is also the first
        # maximum in the original iteration order.
        most_common = sorted(frequencies.items(), key=lambda item: item[1], reverse=True)
        feature_type = 'categorical'
        unique_count = column_statistics.get('n_unique', len(frequencies))
        feature_statistics = {
            'count': non_missing_count,
            'unique_count': unique_count,
            'frequencies': frequencies,
            'most_common': most_common,
            'top_value': most_common[0] if most_common else None,
            'no_label_count': column_statistics.get('no_label_count', 0),
            'no_label_proportion': column_statistics.get('no_label_proportion', 0.0),
        }
        sample_values = list(frequencies.keys())[:5]
    elif column_type == 'bool':
        # Boolean features: True/False frequencies
        frequencies = column_statistics.get('frequencies', {})
        feature_type = 'boolean'
        unique_count = len(frequencies)
        feature_statistics = {
            'count': non_missing_count,
            'frequencies': frequencies,
        }
        sample_values = list(frequencies.keys())
    elif column_type in ('int', 'float'):
        # Numerical features: statistical measures
        feature_type = 'numerical'
        feature_statistics = {
            'count': non_missing_count,
            'mean': column_statistics.get('mean', 0),
            'median': column_statistics.get('median', 0),
            'min': column_statistics.get('min', 0),
            'max': column_statistics.get('max', 0),
            'std': column_statistics.get('std', 0),
            'histogram': column_statistics.get('histogram', {}),
        }
    else:
        # Unknown type - provide basic info with all available statistics
        feature_type = column_type
        unique_count = column_statistics.get('n_unique', 0)
        feature_statistics = column_statistics

    return {
        'feature_type': feature_type,
        'missing_count': missing_count,
        'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100,
        'unique_count': unique_count,
        'total_count': num_examples,
        'non_missing_count': non_missing_count,
        'statistics': feature_statistics,
        'sample_values': sample_values,
    }


def _convert_viewer_statistics_to_analysis(
    viewer_stats: Dict[str, Any],
    dataset_id: str,
    config_name: Optional[str],
    split: str
) -> Dict[str, Any]:
    """
    Convert Dataset Viewer API statistics to our analysis format.

    Supports all Dataset Viewer column types:
    - Numerical: int, float
    - Categorical: class_label, string_label, bool
    - Text: string_text
    - Media: image, audio
    - Structured: list

    Any other column type is passed through with its raw statistics.

    Args:
        viewer_stats: Statistics from Dataset Viewer API. Reads the keys
            'num_examples', 'statistics', 'partial', '_config_used' and
            '_cached_at'.
        dataset_id: Dataset identifier
        config_name: Configuration name (used when '_config_used' is absent)
        split: Split name

    Returns:
        Dictionary in our standard analysis format. ``represents_full_dataset``
        is False when the viewer flags the statistics as ``partial``.
    """
    num_examples = viewer_stats.get('num_examples', 0)

    features_analysis = {}
    for col_stat in viewer_stats.get('statistics', []):
        column_name = col_stat.get('column_name', 'unknown')
        features_analysis[column_name] = _viewer_column_to_feature(col_stat, num_examples)

    partial = viewer_stats.get('partial', False)

    # Generate overall analysis
    return {
        "dataset_info": {
            "dataset_id": dataset_id,
            "config_name": viewer_stats.get('_config_used', config_name),
            "split": split,
            "total_features": len(features_analysis),
            "sample_size_used": num_examples,
            "sample_size_requested": num_examples,
        },
        "sample_info": {
            "sampling_method": "dataset_viewer_api",
            # Partial viewer statistics do not cover the whole split, so the
            # two flags must not contradict each other.
            "represents_full_dataset": not partial,
            "analysis_timestamp": viewer_stats.get('_cached_at'),
            "partial": partial,
        },
        "features": features_analysis,
        "summary": _generate_analysis_summary(features_analysis, num_examples),
    }
def _analyze_single_feature(
    feature_name: str, data_samples: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """
    Build the analysis entry for one feature from the sampled rows.

    A value counts as missing when it is None, equal to the empty string,
    or a float whose string form is "nan" (case-insensitive). Rows that do
    not contain the feature at all are treated as missing as well.

    Args:
        feature_name: Name of the feature to analyze
        data_samples: List of data sample dictionaries

    Returns:
        Dictionary with the detected type, missing/unique/total counts,
        type-specific statistics and up to five example values.
    """

    def is_missing(item: Any) -> bool:
        if item is None or item == "":
            return True
        return isinstance(item, float) and str(item).lower() == "nan"

    # Keep only the values that are actually present, in row order.
    present = [
        item
        for item in (row.get(feature_name) for row in data_samples)
        if not is_missing(item)
    ]

    row_count = len(data_samples)
    absent = row_count - len(present)
    if row_count > 0:
        absent_pct = (absent / row_count) * 100
    else:
        absent_pct = 0

    # Type detection and statistics only look at the present values.
    detected_type, type_statistics = _determine_feature_type_and_stats(present)

    # Uniqueness is judged on the string form so unhashable values work too.
    distinct = len({str(item) for item in present})

    return {
        "feature_type": detected_type,
        "missing_count": absent,
        "missing_percentage": absent_pct,
        "unique_count": distinct,
        "total_count": row_count,
        "non_missing_count": len(present),
        "statistics": type_statistics,
        "sample_values": present[:5],  # First 5 values as examples
    }
def _determine_feature_type_and_stats(values: List[Any]) -> tuple[str, Dict[str, Any]]:
    """
    Determine the type of a feature and compute appropriate statistics.

    Detection order: numerical (every value is a non-bool number or a string
    that parses as a float), boolean (every value's lower-cased string form is
    one of true/false/1/0/yes/no), text (all strings with average length
    above 10), otherwise categorical.

    Args:
        values: List of non-missing values for the feature

    Returns:
        Tuple of (feature_type, statistics_dict); ("unknown", {}) for an
        empty list.
    """
    if not values:
        return "unknown", {}

    # Check if all values are numeric
    numeric_values = []
    for value in values:
        # bool is a subclass of int: keep True/False out of the numeric path
        # so genuine boolean columns reach the boolean check below.
        if isinstance(value, bool) or not isinstance(value, (int, float, str)):
            break
        try:
            numeric_values.append(float(value))
        except (ValueError, TypeError, OverflowError):
            # Unparseable string, or an int too large for a float.
            break
    else:
        # The loop finished without a break, so every value converted.
        return "numerical", _compute_numerical_statistics(numeric_values)

    # Check if values are boolean-like
    boolean_values = {str(v).lower() for v in values}
    if boolean_values.issubset({"true", "false", "1", "0", "yes", "no"}):
        return "boolean", _compute_categorical_statistics(values)

    # Check if it's text (strings with average length > 10)
    if all(isinstance(v, str) for v in values):
        avg_length = sum(len(v) for v in values) / len(values)
        if avg_length > 10:
            return "text", _compute_text_statistics(values)

    # Default to categorical
    return "categorical", _compute_categorical_statistics(values)
def _compute_numerical_statistics(values: List[float]) -> Dict[str, Any]:
"""Compute statistics for numerical features."""
if not values:
return {}
try:
stats = {
"count": len(values),
"mean": statistics.mean(values),
"median": statistics.median(values),
"min": min(values),
"max": max(values),
"range": max(values) - min(values),
}
if len(values) > 1:
stats["std"] = statistics.stdev(values)
stats["variance"] = statistics.variance(values)
# Quartiles
sorted_values = sorted(values)
n = len(sorted_values)
if n >= 4:
stats["q1"] = sorted_values[n // 4]
stats["q3"] = sorted_values[3 * n // 4]
stats["iqr"] = stats["q3"] - stats["q1"]
return stats
except Exception as e:
logger.warning(f"Failed to compute numerical statistics: {e}")
return {"count": len(values), "error": str(e)}
def _compute_categorical_statistics(values: List[Any]) -> Dict[str, Any]:
    """
    Compute statistics for categorical features.

    Values are counted by their string form. The result holds the number of
    values, the number of distinct values, the most common values (capped at
    MAX_UNIQUE_VALUES_TO_SHOW) and the single top value. When more than one
    distinct value exists, the Shannon entropy of the distribution (in bits)
    is added under "entropy".

    Args:
        values: Non-missing values of the feature

    Returns:
        Statistics dictionary; {} for empty input, or a {"count", "error"}
        dict if the computation fails.
    """
    # Function-scope import keeps this block self-contained; the module's
    # top-level imports do not include math.
    import math

    if not values:
        return {}

    try:
        # Convert all values to strings for consistent counting
        value_counts = Counter(str(v) for v in values)
        total = len(values)

        stats = {
            "count": total,
            "unique_count": len(value_counts),
            "most_common": value_counts.most_common(MAX_UNIQUE_VALUES_TO_SHOW),
            "top_value": value_counts.most_common(1)[0] if value_counts else None,
        }

        # Calculate entropy (measure of diversity): H = -sum(p * log2(p)).
        # Counter entries built from data are always >= 1, so p > 0.
        if len(value_counts) > 1:
            stats["entropy"] = -sum(
                (count / total) * math.log2(count / total)
                for count in value_counts.values()
            )

        return stats
    except Exception as e:
        logger.warning(f"Failed to compute categorical statistics: {e}")
        return {"count": len(values), "error": str(e)}
def _compute_text_statistics(values: List[str]) -> Dict[str, Any]:
"""Compute statistics for text features."""
if not values:
return {}
try:
lengths = [len(v) for v in values]
word_counts = [len(v.split()) for v in values]
stats = {
"count": len(values),
"avg_length": statistics.mean(lengths),
"min_length": min(lengths),
"max_length": max(lengths),
"avg_word_count": statistics.mean(word_counts),
"min_word_count": min(word_counts),
"max_word_count": max(word_counts),
}
# Sample of values (first few)
stats["sample_texts"] = values[:3]
return stats
except Exception as e:
logger.warning(f"Failed to compute text statistics: {e}")
return {"count": len(values), "error": str(e)}
def _generate_analysis_summary(
features_analysis: Dict[str, Dict[str, Any]], sample_size: int
) -> str:
"""Generate a human-readable summary of the analysis."""
if not features_analysis:
return "No features analyzed"
total_features = len(features_analysis)
# Count feature types
type_counts = Counter(
analysis.get("feature_type", "unknown")
for analysis in features_analysis.values()
)
# Calculate average missing rate
missing_rates = [
analysis.get("missing_percentage", 0) for analysis in features_analysis.values()
]
avg_missing = statistics.mean(missing_rates) if missing_rates else 0
summary_parts = [f"Analyzed {total_features} features from {sample_size} samples"]
# Feature type breakdown
type_summary = []
for ftype, count in type_counts.most_common():
type_summary.append(f"{count} {ftype}")
if type_summary:
summary_parts.append(f"Types: {', '.join(type_summary)}")
# Missing data summary
if avg_missing > 0:
summary_parts.append(f"Avg missing: {avg_missing:.1f}%")
return " | ".join(summary_parts)