Spaces:
Running
Running
File size: 25,714 Bytes
11df203 2762e2a 11df203 2762e2a 43642a4 11df203 2762e2a 2b910cc 2762e2a 2b910cc 2a623ac 2762e2a 2b910cc 2762e2a ab96cfe 2762e2a c2830c1 2762e2a c2830c1 2762e2a aefe0b6 2a623ac 2762e2a 43642a4 2b910cc 43642a4 2762e2a 2a623ac 2762e2a 2a623ac 2762e2a 2a623ac 2762e2a 43642a4 2762e2a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 |
"""
Basic analysis tools for exploratory data analysis of HuggingFace datasets.
This module provides tools for performing exploratory data analysis including
feature statistics and missing value analysis.
"""
import logging
import statistics
import gradio as gr
from typing import Optional, Dict, Any, List
from collections import Counter
from hf_eda_mcp.services.dataset_service import get_dataset_service, DatasetServiceError
from hf_eda_mcp.integrations.hf_client import DatasetNotFoundError, AuthenticationError, NetworkError
from hf_eda_mcp.validation import (
validate_dataset_id,
validate_config_name,
validate_split_name,
validate_sample_size,
ValidationError,
format_validation_error,
)
from hf_eda_mcp.error_handling import format_error_response, log_error_with_context
logger = logging.getLogger(__name__)
# Default constants (can be overridden by config)
DEFAULT_ANALYSIS_SAMPLE_SIZE = 1000
MAX_UNIQUE_VALUES_TO_SHOW = 20
def analyze_dataset_features(
dataset_id: str,
split: str = "train",
sample_size: int = DEFAULT_ANALYSIS_SAMPLE_SIZE,
config_name: Optional[str] = None,
hf_api_token: gr.Header = "",
) -> Dict[str, Any]:
"""
Perform basic exploratory analysis on dataset features.
This function analyzes dataset features to provide insights into data types,
distributions, missing values, and data quality. It handles different data
types (numerical, categorical, text) appropriately and generates comprehensive
statistics for each feature.
Args:
dataset_id: HuggingFace dataset identifier (e.g., 'imdb', 'squad')
split: Dataset split to analyze (default: 'train')
sample_size: Number of samples to use for analysis (default: 1000, max: 50000)
config_name: Optional configuration name for multi-config datasets
hf_api_token: Header parsed by Gradio when hf_api_token is provided in MCP configuration headers
Returns:
Dictionary containing comprehensive feature analysis:
- dataset_info: Basic dataset information
- sample_info: Information about the sample used for analysis
- features: Dictionary with analysis for each feature including:
- feature_type: Detected type (numerical, categorical, text, etc.)
- missing_count: Number of missing/null values
- missing_percentage: Percentage of missing values
- unique_count: Number of unique values
- statistics: Type-specific statistics (mean, std for numerical; top values for categorical)
- summary: Overall analysis summary
Raises:
ValueError: If inputs are invalid
DatasetNotFoundError: If dataset or split doesn't exist
AuthenticationError: If dataset is private and authentication fails
DatasetServiceError: If analysis fails for other reasons
Example:
>>> analysis = analyze_dataset_features("imdb", sample_size=500)
>>> for feature_name, feature_analysis in analysis['features'].items():
... print(f"{feature_name}: {feature_analysis['feature_type']}")
... print(f" Missing: {feature_analysis['missing_percentage']:.1f}%")
"""
# Handle empty strings from Gradio (convert to None)
if config_name == "":
config_name = None
# Input validation using centralized validation
try:
dataset_id = validate_dataset_id(dataset_id)
config_name = validate_config_name(config_name)
split = validate_split_name(split)
sample_size = validate_sample_size(sample_size, "sample_size")
except ValidationError as e:
logger.error(f"Validation error: {format_validation_error(e)}")
raise ValueError(format_validation_error(e))
context = {
"dataset_id": dataset_id,
"split": split,
"sample_size": sample_size,
"config_name": config_name,
"operation": "analyze_dataset_features"
}
logger.info(
f"Analyzing features for dataset: {dataset_id}, split: {split}, "
f"sample_size: {sample_size}"
+ (f", config: {config_name}" if config_name else "")
)
try:
# Get dataset service
service = get_dataset_service(hf_api_token=hf_api_token)
# Try to get statistics from Dataset Viewer API first (more efficient and complete)
viewer_stats = service.get_dataset_statistics(
dataset_id=dataset_id,
split=split,
config_name=config_name
)
if viewer_stats is not None:
# Use Dataset Viewer statistics (full dataset, no sampling needed)
logger.info(f"Using Dataset Viewer statistics for {dataset_id}")
return _convert_viewer_statistics_to_analysis(
viewer_stats, dataset_id, config_name, split
)
# Fall back to sample-based analysis
logger.info("Dataset Viewer statistics not available, using sample-based analysis")
sample_data = service.load_dataset_sample(
dataset_id=dataset_id,
split=split,
num_samples=sample_size,
config_name=config_name,
streaming=True,
)
# Perform feature analysis
features_analysis = {}
data_samples = sample_data["data"]
if not data_samples:
raise DatasetServiceError("No data samples available for analysis")
# Determine feature names from first sample
first_sample = data_samples[0]
if not isinstance(first_sample, dict):
raise DatasetServiceError(
"Dataset samples are not in expected dictionary format"
)
feature_names = list(first_sample.keys())
# Analyze each feature
for feature_name in feature_names:
logger.debug(f"Analyzing feature: {feature_name}")
feature_analysis = _analyze_single_feature(feature_name, data_samples)
features_analysis[feature_name] = feature_analysis
# Generate overall analysis
analysis_result = {
"dataset_info": {
"dataset_id": dataset_id,
"config_name": config_name,
"split": split,
"total_features": len(feature_names),
"sample_size_used": len(data_samples),
"sample_size_requested": sample_size,
},
"sample_info": {
"sampling_method": "sequential_head",
"represents_full_dataset": len(data_samples) >= sample_size,
"analysis_timestamp": sample_data.get("_sampled_at"),
},
"features": features_analysis,
"summary": _generate_analysis_summary(features_analysis, len(data_samples)),
}
logger.info(
f"Successfully analyzed {len(feature_names)} features from {dataset_id}"
)
return analysis_result
except DatasetNotFoundError as e:
log_error_with_context(e, context, level=logging.WARNING)
error_response = format_error_response(e, context)
logger.info(f"Dataset/split not found suggestions: {error_response.get('suggestions', [])}")
raise
except AuthenticationError as e:
log_error_with_context(e, context, level=logging.WARNING)
error_response = format_error_response(e, context)
logger.info(f"Authentication error guidance: {error_response.get('suggestions', [])}")
raise
except NetworkError as e:
log_error_with_context(e, context)
error_response = format_error_response(e, context)
logger.info(f"Network error guidance: {error_response.get('suggestions', [])}")
raise
except Exception as e:
log_error_with_context(e, context)
raise DatasetServiceError(f"Failed to analyze dataset features: {str(e)}") from e
def _convert_viewer_statistics_to_analysis(
viewer_stats: Dict[str, Any],
dataset_id: str,
config_name: Optional[str],
split: str
) -> Dict[str, Any]:
"""
Convert Dataset Viewer API statistics to our analysis format.
Supports all Dataset Viewer column types:
- Numerical: int, float
- Categorical: class_label, string_label, bool
- Text: string_text
- Media: image, audio
- Structured: list
Args:
viewer_stats: Statistics from Dataset Viewer API
dataset_id: Dataset identifier
config_name: Configuration name
split: Split name
Returns:
Dictionary in our standard analysis format
"""
num_examples = viewer_stats.get('num_examples', 0)
statistics_list = viewer_stats.get('statistics', [])
features_analysis = {}
for col_stat in statistics_list:
column_name = col_stat.get('column_name', 'unknown')
column_type = col_stat.get('column_type', 'unknown')
column_statistics = col_stat.get('column_statistics', {})
# Convert to our format based on column type
if column_type == 'string_text':
# Text features: character length statistics
features_analysis[column_name] = {
'feature_type': 'text',
'missing_count': column_statistics.get('nan_count', 0),
'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100,
'unique_count': 0, # Not provided by viewer for text
'total_count': num_examples,
'non_missing_count': num_examples - column_statistics.get('nan_count', 0),
'statistics': {
'count': num_examples - column_statistics.get('nan_count', 0),
'min_length': column_statistics.get('min', 0),
'max_length': column_statistics.get('max', 0),
'mean_length': column_statistics.get('mean', 0),
'median_length': column_statistics.get('median', 0),
'std_length': column_statistics.get('std', 0),
'histogram': column_statistics.get('histogram', {}),
},
'sample_values': [],
}
elif column_type in ['class_label', 'string_label']:
# Categorical features: frequency distributions
frequencies = column_statistics.get('frequencies', {})
features_analysis[column_name] = {
'feature_type': 'categorical',
'missing_count': column_statistics.get('nan_count', 0),
'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100,
'unique_count': column_statistics.get('n_unique', len(frequencies)),
'total_count': num_examples,
'non_missing_count': num_examples - column_statistics.get('nan_count', 0),
'statistics': {
'count': num_examples - column_statistics.get('nan_count', 0),
'unique_count': column_statistics.get('n_unique', len(frequencies)),
'frequencies': frequencies,
'most_common': [(k, v) for k, v in sorted(frequencies.items(), key=lambda x: x[1], reverse=True)],
'top_value': max(frequencies.items(), key=lambda x: x[1]) if frequencies else None,
'no_label_count': column_statistics.get('no_label_count', 0),
'no_label_proportion': column_statistics.get('no_label_proportion', 0.0),
},
'sample_values': list(frequencies.keys())[:5],
}
elif column_type == 'bool':
# Boolean features: True/False frequencies
frequencies = column_statistics.get('frequencies', {})
features_analysis[column_name] = {
'feature_type': 'boolean',
'missing_count': column_statistics.get('nan_count', 0),
'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100,
'unique_count': len(frequencies),
'total_count': num_examples,
'non_missing_count': num_examples - column_statistics.get('nan_count', 0),
'statistics': {
'count': num_examples - column_statistics.get('nan_count', 0),
'frequencies': frequencies,
},
'sample_values': list(frequencies.keys()),
}
elif column_type in ['int', 'float']:
# Numerical features: statistical measures
features_analysis[column_name] = {
'feature_type': 'numerical',
'missing_count': column_statistics.get('nan_count', 0),
'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100,
'unique_count': 0, # Not always provided
'total_count': num_examples,
'non_missing_count': num_examples - column_statistics.get('nan_count', 0),
'statistics': {
'count': num_examples - column_statistics.get('nan_count', 0),
'mean': column_statistics.get('mean', 0),
'median': column_statistics.get('median', 0),
'min': column_statistics.get('min', 0),
'max': column_statistics.get('max', 0),
'std': column_statistics.get('std', 0),
'histogram': column_statistics.get('histogram', {}),
},
'sample_values': [],
}
elif column_type == 'image':
# Image features: dimension statistics
features_analysis[column_name] = {
'feature_type': 'image',
'missing_count': column_statistics.get('nan_count', 0),
'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100,
'unique_count': 0,
'total_count': num_examples,
'non_missing_count': num_examples - column_statistics.get('nan_count', 0),
'statistics': {
'count': num_examples - column_statistics.get('nan_count', 0),
'min_dimension': column_statistics.get('min', 0),
'max_dimension': column_statistics.get('max', 0),
'mean_dimension': column_statistics.get('mean', 0),
'median_dimension': column_statistics.get('median', 0),
'std_dimension': column_statistics.get('std', 0),
'histogram': column_statistics.get('histogram', {}),
},
'sample_values': [],
}
elif column_type == 'audio':
# Audio features: duration statistics (in seconds)
features_analysis[column_name] = {
'feature_type': 'audio',
'missing_count': column_statistics.get('nan_count', 0),
'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100,
'unique_count': 0,
'total_count': num_examples,
'non_missing_count': num_examples - column_statistics.get('nan_count', 0),
'statistics': {
'count': num_examples - column_statistics.get('nan_count', 0),
'min_duration': column_statistics.get('min', 0),
'max_duration': column_statistics.get('max', 0),
'mean_duration': column_statistics.get('mean', 0),
'median_duration': column_statistics.get('median', 0),
'std_duration': column_statistics.get('std', 0),
'histogram': column_statistics.get('histogram', {}),
},
'sample_values': [],
}
elif column_type == 'list':
# List features: length statistics
features_analysis[column_name] = {
'feature_type': 'list',
'missing_count': column_statistics.get('nan_count', 0),
'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100,
'unique_count': 0,
'total_count': num_examples,
'non_missing_count': num_examples - column_statistics.get('nan_count', 0),
'statistics': {
'count': num_examples - column_statistics.get('nan_count', 0),
'min_length': column_statistics.get('min', 0),
'max_length': column_statistics.get('max', 0),
'mean_length': column_statistics.get('mean', 0),
'median_length': column_statistics.get('median', 0),
'std_length': column_statistics.get('std', 0),
'histogram': column_statistics.get('histogram', {}),
},
'sample_values': [],
}
else:
# Unknown type - provide basic info with all available statistics
features_analysis[column_name] = {
'feature_type': column_type,
'missing_count': column_statistics.get('nan_count', 0),
'missing_percentage': column_statistics.get('nan_proportion', 0.0) * 100,
'unique_count': column_statistics.get('n_unique', 0),
'total_count': num_examples,
'non_missing_count': num_examples - column_statistics.get('nan_count', 0),
'statistics': column_statistics,
'sample_values': [],
}
# Generate overall analysis
analysis_result = {
"dataset_info": {
"dataset_id": dataset_id,
"config_name": viewer_stats.get('_config_used', config_name),
"split": split,
"total_features": len(features_analysis),
"sample_size_used": num_examples,
"sample_size_requested": num_examples,
},
"sample_info": {
"sampling_method": "dataset_viewer_api",
"represents_full_dataset": True,
"analysis_timestamp": viewer_stats.get('_cached_at'),
"partial": viewer_stats.get('partial', False),
},
"features": features_analysis,
"summary": _generate_analysis_summary(features_analysis, num_examples),
}
return analysis_result
def _analyze_single_feature(
feature_name: str, data_samples: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Analyze a single feature across all data samples.
Args:
feature_name: Name of the feature to analyze
data_samples: List of data sample dictionaries
Returns:
Dictionary containing feature analysis results
"""
# Extract values for this feature
values = []
missing_count = 0
for sample in data_samples:
value = sample.get(feature_name)
if (
value is None
or value == ""
or (isinstance(value, float) and str(value).lower() == "nan")
):
missing_count += 1
else:
values.append(value)
total_count = len(data_samples)
missing_percentage = (missing_count / total_count) * 100 if total_count > 0 else 0
# Determine feature type and compute statistics
feature_type, statistics_dict = _determine_feature_type_and_stats(values)
# Count unique values
unique_count = len(set(str(v) for v in values)) if values else 0
return {
"feature_type": feature_type,
"missing_count": missing_count,
"missing_percentage": missing_percentage,
"unique_count": unique_count,
"total_count": total_count,
"non_missing_count": len(values),
"statistics": statistics_dict,
"sample_values": values[:5] if values else [], # First 5 values as examples
}
def _determine_feature_type_and_stats(values: List[Any]) -> tuple[str, Dict[str, Any]]:
"""
Determine the type of a feature and compute appropriate statistics.
Args:
values: List of non-missing values for the feature
Returns:
Tuple of (feature_type, statistics_dict)
"""
if not values:
return "unknown", {}
# Check if all values are numeric
numeric_values = []
for value in values:
try:
if isinstance(value, (int, float)):
numeric_values.append(float(value))
elif isinstance(value, str):
# Try to convert string to number
numeric_values.append(float(value))
else:
# Not numeric
break
except (ValueError, TypeError):
# Not numeric
break
else:
# All values are numeric
if len(numeric_values) == len(values):
return "numerical", _compute_numerical_statistics(numeric_values)
# Check if values are boolean-like
boolean_values = set(str(v).lower() for v in values)
if boolean_values.issubset({"true", "false", "1", "0", "yes", "no"}):
return "boolean", _compute_categorical_statistics(values)
# Check if it's text (strings with average length > 10)
if all(isinstance(v, str) for v in values):
avg_length = sum(len(v) for v in values) / len(values)
if avg_length > 10:
return "text", _compute_text_statistics(values)
# Default to categorical
return "categorical", _compute_categorical_statistics(values)
def _compute_numerical_statistics(values: List[float]) -> Dict[str, Any]:
"""Compute statistics for numerical features."""
if not values:
return {}
try:
stats = {
"count": len(values),
"mean": statistics.mean(values),
"median": statistics.median(values),
"min": min(values),
"max": max(values),
"range": max(values) - min(values),
}
if len(values) > 1:
stats["std"] = statistics.stdev(values)
stats["variance"] = statistics.variance(values)
# Quartiles
sorted_values = sorted(values)
n = len(sorted_values)
if n >= 4:
stats["q1"] = sorted_values[n // 4]
stats["q3"] = sorted_values[3 * n // 4]
stats["iqr"] = stats["q3"] - stats["q1"]
return stats
except Exception as e:
logger.warning(f"Failed to compute numerical statistics: {e}")
return {"count": len(values), "error": str(e)}
def _compute_categorical_statistics(values: List[Any]) -> Dict[str, Any]:
"""Compute statistics for categorical features."""
if not values:
return {}
try:
# Convert all values to strings for consistent counting
str_values = [str(v) for v in values]
value_counts = Counter(str_values)
stats = {
"count": len(values),
"unique_count": len(value_counts),
"most_common": value_counts.most_common(MAX_UNIQUE_VALUES_TO_SHOW),
"top_value": value_counts.most_common(1)[0] if value_counts else None,
}
# Calculate entropy (measure of diversity)
if len(value_counts) > 1:
total = len(str_values)
entropy = -sum(
(count / total) * (count / total).bit_length()
for count in value_counts.values()
if count > 0
)
stats["entropy"] = entropy
return stats
except Exception as e:
logger.warning(f"Failed to compute categorical statistics: {e}")
return {"count": len(values), "error": str(e)}
def _compute_text_statistics(values: List[str]) -> Dict[str, Any]:
"""Compute statistics for text features."""
if not values:
return {}
try:
lengths = [len(v) for v in values]
word_counts = [len(v.split()) for v in values]
stats = {
"count": len(values),
"avg_length": statistics.mean(lengths),
"min_length": min(lengths),
"max_length": max(lengths),
"avg_word_count": statistics.mean(word_counts),
"min_word_count": min(word_counts),
"max_word_count": max(word_counts),
}
# Sample of values (first few)
stats["sample_texts"] = values[:3]
return stats
except Exception as e:
logger.warning(f"Failed to compute text statistics: {e}")
return {"count": len(values), "error": str(e)}
def _generate_analysis_summary(
features_analysis: Dict[str, Dict[str, Any]], sample_size: int
) -> str:
"""Generate a human-readable summary of the analysis."""
if not features_analysis:
return "No features analyzed"
total_features = len(features_analysis)
# Count feature types
type_counts = Counter(
analysis.get("feature_type", "unknown")
for analysis in features_analysis.values()
)
# Calculate average missing rate
missing_rates = [
analysis.get("missing_percentage", 0) for analysis in features_analysis.values()
]
avg_missing = statistics.mean(missing_rates) if missing_rates else 0
summary_parts = [f"Analyzed {total_features} features from {sample_size} samples"]
# Feature type breakdown
type_summary = []
for ftype, count in type_counts.most_common():
type_summary.append(f"{count} {ftype}")
if type_summary:
summary_parts.append(f"Types: {', '.join(type_summary)}")
# Missing data summary
if avg_missing > 0:
summary_parts.append(f"Avg missing: {avg_missing:.1f}%")
return " | ".join(summary_parts)
|