| """ | |
| Dataset service for centralized dataset operations and caching. | |
| This module provides a centralized service for dataset operations including | |
| metadata caching, dataset loading, and sampling functionality. | |
| """ | |

import logging
import os
import json
import time
from typing import Optional, Dict, Any
from pathlib import Path

from datasets import load_dataset
from datasets.utils.logging import disable_progress_bar

from hf_eda_mcp.config import get_config
from hf_eda_mcp.integrations.hf_client import (
    HfClient,
    DatasetNotFoundError,
    AuthenticationError,
    NetworkError
)
from hf_eda_mcp.integrations.dataset_viewer_adapter import DatasetViewerAdapter
from hf_eda_mcp.error_handling import (
    retry_with_backoff,
    RetryConfig,
    log_error_with_context,
)

logger = logging.getLogger(__name__)

# Disable datasets progress bars for cleaner logging
disable_progress_bar()


class DatasetServiceError(Exception):
    """Base exception for dataset service errors."""
    pass


class CacheError(DatasetServiceError):
    """Raised when cache operations fail."""
    pass


class DatasetNotParquetError(DatasetServiceError):
    """Raised when a dataset is not in parquet format but parquet is required."""
    pass


class NoTextColumnsError(DatasetServiceError):
    """Raised when a dataset has no text columns for search."""
    pass


class DatasetService:
    """
    Centralized service for dataset operations with caching support.

    Provides metadata caching, dataset loading, and sampling functionality
    while managing authentication and error handling.
    """

    def __init__(
        self,
        cache_dir: Optional[str] = None,
        token: Optional[str] = None,
        cache_ttl: int = 3600  # 1 hour default TTL
    ):
        """
        Initialize dataset service with optional caching and authentication.

        Args:
            cache_dir: Directory for caching metadata and samples
            token: HuggingFace authentication token
            cache_ttl: Cache time-to-live in seconds (default: 1 hour)
        """
        self.hf_client = HfClient(token=token)
        self.dataset_viewer = DatasetViewerAdapter(token=token)
        self.cache_ttl = cache_ttl

        # Set up cache directory
        if cache_dir is None:
            cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "hf_eda_mcp")
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Cache subdirectories
        self.metadata_cache_dir = self.cache_dir / "metadata"
        self.sample_cache_dir = self.cache_dir / "samples"
        self.statistics_cache_dir = self.cache_dir / "statistics"
        self.metadata_cache_dir.mkdir(exist_ok=True)
        self.sample_cache_dir.mkdir(exist_ok=True)
        self.statistics_cache_dir.mkdir(exist_ok=True)

        logger.info(f"DatasetService initialized with cache dir: {self.cache_dir}")

    def _get_cache_key(self, dataset_id: str, config_name: Optional[str] = None) -> str:
        """Generate cache key for dataset metadata."""
        if config_name:
            return f"{dataset_id}_{config_name}".replace("/", "_")
        return dataset_id.replace("/", "_")

    def _get_sample_cache_key(
        self,
        dataset_id: str,
        split: str,
        num_samples: int,
        config_name: Optional[str] = None
    ) -> str:
        """Generate cache key for dataset samples."""
        base_key = self._get_cache_key(dataset_id, config_name)
        return f"{base_key}_{split}_{num_samples}"

    def _get_statistics_cache_key(
        self,
        dataset_id: str,
        split: str,
        config_name: Optional[str] = None
    ) -> str:
        """Generate cache key for dataset statistics."""
        base_key = self._get_cache_key(dataset_id, config_name)
        return f"{base_key}_{split}_stats"

    def _is_cache_valid(self, cache_file: Path) -> bool:
        """Check if cache file exists and is within TTL."""
        if not cache_file.exists():
            return False
        # Check if cache is within TTL
        cache_age = time.time() - cache_file.stat().st_mtime
        return cache_age < self.cache_ttl

    def _save_to_cache(self, cache_file: Path, data: Dict[str, Any]) -> None:
        """Save data to cache file."""
        try:
            cache_file.parent.mkdir(parents=True, exist_ok=True)
            with open(cache_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            logger.debug(f"Saved data to cache: {cache_file}")
        except Exception as e:
            logger.warning(f"Failed to save cache file {cache_file}: {e}")
            raise CacheError(f"Failed to save cache: {e}") from e

    def _load_from_cache(self, cache_file: Path) -> Optional[Dict[str, Any]]:
        """Load data from cache file, returning None on a miss, expiry, or error."""
        try:
            if not self._is_cache_valid(cache_file):
                return None
            with open(cache_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            logger.debug(f"Loaded data from cache: {cache_file}")
            return data
        except Exception as e:
            logger.warning(f"Failed to load cache file {cache_file}: {e}")
            return None

    def _merge_viewer_data(
        self,
        hub_metadata: Dict[str, Any],
        viewer_data: Dict[str, Any],
        config_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Merge Dataset Viewer API data into Hub metadata.

        Enriches the basic Hub metadata with detailed information from the
        Dataset Viewer API including split sizes, features schema, and byte sizes.
        When no config is specified, returns detailed information for all configs.

        Args:
            hub_metadata: Basic metadata from Hub API
            viewer_data: Detailed data from Dataset Viewer API
            config_name: Optional configuration name to extract

        Returns:
            Merged metadata dictionary
        """
        merged = hub_metadata.copy()

        # Extract dataset_info from viewer response
        dataset_info = viewer_data.get('dataset_info', {})
        if not dataset_info:
            logger.warning("No dataset_info in viewer data")
            return merged

        # Handle two response formats:
        # 1. When config is specified in API call: dataset_info is the config data directly
        # 2. When no config specified: dataset_info is a dict with config names as keys
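        #
        # Illustrative shapes (hypothetical config names, for orientation only):
        #   Format 1: {'config_name': 'default', 'splits': {...}, 'features': {...}}
        #   Format 2: {'default': {'splits': {...}, ...},
        #              'extended': {'splits': {...}, ...}}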
        if isinstance(dataset_info, dict) and 'config_name' in dataset_info:
            # Format 1: Single config data (config was specified in API call)
            config_data = dataset_info
            self._enrich_with_single_config(merged, config_data)
        elif config_name:
            # Format 2: Specific config requested
            if config_name in dataset_info:
                config_data = dataset_info[config_name]
                self._enrich_with_single_config(merged, config_data)
            else:
                logger.warning(f"Config '{config_name}' not found in viewer data")
                return merged
        else:
            # No config specified
            if len(dataset_info) == 1:
                # Only one config - use single config format for consistency
                config_data = next(iter(dataset_info.values()))
                self._enrich_with_single_config(merged, config_data)
            else:
                # Multiple configs - return all configs with detailed information
                self._enrich_with_all_configs(merged, dataset_info)

        return merged

    def _enrich_with_single_config(self, merged: Dict[str, Any], config_data: Dict[str, Any]) -> None:
        """
        Enrich metadata with a single config's data.

        Args:
            merged: Metadata dictionary to enrich (modified in place)
            config_data: Configuration data from Dataset Viewer API
        """
        # Enrich features with detailed schema from viewer
        if 'features' in config_data:
            merged['features'] = config_data['features']

        # Enrich splits with actual sizes
        if 'splits' in config_data:
            viewer_splits = config_data['splits']
            enriched_splits = {}
            for split_name, split_info in viewer_splits.items():
                enriched_splits[split_name] = {
                    'num_examples': split_info.get('num_examples', 0),
                    'num_bytes': split_info.get('num_bytes', 0)
                }
            merged['splits'] = enriched_splits
            merged['total_splits'] = len(enriched_splits)

        # Add dataset size information
        if 'dataset_size' in config_data:
            merged['dataset_size'] = config_data['dataset_size']
            merged['size_bytes'] = config_data['dataset_size']
            # Update human-readable size
            size_bytes = config_data['dataset_size']
            if size_bytes > 0:
                merged['size_human'] = self._format_bytes(size_bytes)

        if 'download_size' in config_data:
            merged['download_size'] = config_data['download_size']
            merged['download_size_human'] = self._format_bytes(config_data['download_size'])

        # Add builder and version info
        if 'builder_name' in config_data:
            merged['builder_name'] = config_data['builder_name']
        if 'version' in config_data:
            merged['version'] = config_data['version']

        # Update summary with enriched information
        if 'splits' in merged and merged['splits']:
            total_examples = sum(s.get('num_examples', 0) for s in merged['splits'].values())
            merged['total_examples'] = total_examples

            # Update summary string
            split_names = ', '.join(merged['splits'].keys())
            size_str = merged.get('size_human', 'Unknown')
            merged['summary'] = (
                f"Dataset: {merged['id']} | "
                f"Author: {merged.get('author', 'Unknown')} | "
                f"Size: {size_str} | "
                f"Examples: {total_examples:,} | "
                f"Downloads: {merged.get('downloads', 0):,} | "
                f"Likes: {merged.get('likes', 0)} | "
                f"Splits: {split_names}"
            )

    def _enrich_with_all_configs(self, merged: Dict[str, Any], dataset_info: Dict[str, Any]) -> None:
        """
        Enrich metadata with all configs' data.

        Creates a detailed 'config_details' list with information for each config.

        Args:
            merged: Metadata dictionary to enrich (modified in place)
            dataset_info: Dict mapping config names to their data
        """
        config_details = []
        total_dataset_size = 0
        total_download_size = 0
        total_examples_all_configs = 0

        for cfg_name, cfg_data in dataset_info.items():
            config_detail = {
                'config_name': cfg_name,
                'features': cfg_data.get('features', {}),
                'splits': {},
                'dataset_size': cfg_data.get('dataset_size', 0),
                'download_size': cfg_data.get('download_size', 0),
                'builder_name': cfg_data.get('builder_name', ''),
                'version': cfg_data.get('version', {}),
            }

            # Process splits for this config
            if 'splits' in cfg_data:
                for split_name, split_info in cfg_data['splits'].items():
                    config_detail['splits'][split_name] = {
                        'num_examples': split_info.get('num_examples', 0),
                        'num_bytes': split_info.get('num_bytes', 0)
                    }

            # Calculate totals for this config
            config_total_examples = sum(
                s.get('num_examples', 0) for s in config_detail['splits'].values()
            )
            config_detail['total_examples'] = config_total_examples
            config_detail['dataset_size_human'] = self._format_bytes(config_detail['dataset_size'])
            config_detail['download_size_human'] = self._format_bytes(config_detail['download_size'])

            config_details.append(config_detail)

            # Accumulate totals across all configs
            total_dataset_size += config_detail['dataset_size']
            total_download_size += config_detail['download_size']
            total_examples_all_configs += config_total_examples

        # Add detailed config information
        merged['config_details'] = config_details

        # Remove redundant top-level fields since they're in config_details
        merged.pop('splits', None)
        merged.pop('features', None)

        # Add aggregate information
        merged['total_dataset_size'] = total_dataset_size
        merged['total_dataset_size_human'] = self._format_bytes(total_dataset_size)
        merged['total_download_size'] = total_download_size
        merged['total_download_size_human'] = self._format_bytes(total_download_size)
        merged['total_examples'] = total_examples_all_configs

        # Update summary for multi-config datasets
        merged['summary'] = (
            f"Dataset: {merged['id']} | "
            f"Author: {merged.get('author', 'Unknown')} | "
            f"Configs: {len(config_details)} | "
            f"Total Size: {merged['total_dataset_size_human']} | "
            f"Total Examples: {total_examples_all_configs:,} | "
            f"Downloads: {merged.get('downloads', 0):,} | "
            f"Likes: {merged.get('likes', 0)}"
        )

    def _format_bytes(self, size_bytes: int) -> str:
        """Format bytes into human-readable string."""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size_bytes < 1024.0:
                return f"{size_bytes:.2f} {unit}"
            size_bytes /= 1024.0
        return f"{size_bytes:.2f} PB"

    def load_dataset_info(self, dataset_id: str, config_name: Optional[str] = None) -> Dict[str, Any]:
        """
        Load dataset information from HuggingFace Hub with caching.

        Combines data from both the Hub API and Dataset Viewer API to provide
        comprehensive metadata including split sizes, features schema, and more.
        Includes automatic retry logic for transient failures and comprehensive
        error handling with helpful suggestions.

        Args:
            dataset_id: HuggingFace dataset identifier
            config_name: Optional configuration name

        Returns:
            Dictionary containing dataset metadata

        Raises:
            DatasetNotFoundError: If dataset doesn't exist
            AuthenticationError: If dataset is private and authentication fails
            NetworkError: If network operations fail after retries
        """
        context = {
            "dataset_id": dataset_id,
            "config_name": config_name,
            "operation": "load_dataset_info"
        }

        cache_key = self._get_cache_key(dataset_id, config_name)
        cache_file = self.metadata_cache_dir / f"{cache_key}.json"

        # Try to load from cache first
        cached_data = self._load_from_cache(cache_file)
        if cached_data is not None:
            logger.debug(f"Using cached metadata for {dataset_id}")
            return cached_data

        # Fetch from HuggingFace Hub with retry logic
        try:
            logger.info(f"Fetching metadata for dataset: {dataset_id}")

            # Get basic metadata from Hub API
            metadata = self.hf_client.get_dataset_info(dataset_id, config_name)

            # Try to enrich with Dataset Viewer API data
            # Use the full dataset ID from the metadata response
            try:
                full_dataset_id = metadata.get('id', dataset_id)
                viewer_data = self.dataset_viewer.get_dataset_information(full_dataset_id, config_name)
                metadata = self._merge_viewer_data(metadata, viewer_data, config_name)
                logger.debug("Successfully enriched metadata with Dataset Viewer API")
            except Exception as e:
                # Log but don't fail if viewer API fails - we still have basic metadata
                logger.warning(f"Failed to fetch Dataset Viewer data, using basic metadata only: {e}")

            # Add cache timestamp
            metadata['_cached_at'] = time.time()

            # Save to cache (don't fail if caching fails)
            try:
                self._save_to_cache(cache_file, metadata)
            except CacheError as e:
                logger.warning(f"Failed to cache metadata, continuing anyway: {e}")

            return metadata

        except (DatasetNotFoundError, AuthenticationError, NetworkError) as e:
            # Re-raise these specific errors with context
            log_error_with_context(e, context, level=logging.WARNING)
            raise
        except Exception as e:
            # Unexpected error
            log_error_with_context(e, context)
            raise DatasetServiceError(f"Unexpected error loading dataset info: {str(e)}") from e

    def load_dataset_sample(
        self,
        dataset_id: str,
        split: str = "train",
        num_samples: int = 10,
        config_name: Optional[str] = None,
        streaming: bool = True
    ) -> Dict[str, Any]:
        """
        Load samples from the specified dataset with caching.

        Includes automatic retry logic for transient failures and comprehensive
        error handling.

        Args:
            dataset_id: HuggingFace dataset identifier
            split: Dataset split to sample from
            num_samples: Number of samples to retrieve
            config_name: Optional configuration name
            streaming: Whether to use streaming mode for large datasets

        Returns:
            Dictionary containing sampled data and metadata

        Raises:
            DatasetNotFoundError: If dataset or split doesn't exist
            AuthenticationError: If dataset is private and authentication fails
            NetworkError: If network operations fail after retries
            DatasetServiceError: If sampling fails for other reasons
        """
        context = {
            "dataset_id": dataset_id,
            "split": split,
            "num_samples": num_samples,
            "config_name": config_name,
            "operation": "load_dataset_sample"
        }

        # For small samples, check cache first
        if num_samples <= 100:  # Only cache small samples
            cache_key = self._get_sample_cache_key(dataset_id, split, num_samples, config_name)
            cache_file = self.sample_cache_dir / f"{cache_key}.json"
            cached_data = self._load_from_cache(cache_file)
            if cached_data is not None:
                logger.debug(f"Using cached sample for {dataset_id}")
                return cached_data

        try:
            logger.info(f"Loading sample from dataset: {dataset_id}, split: {split}")

            # Load dataset with streaming for efficiency
            dataset = load_dataset(
                dataset_id,
                name=config_name,
                split=split,
                streaming=streaming,
                token=self.hf_client.token
            )

            # Take the requested number of samples
            if streaming:
                # For streaming datasets, take samples from iterator
                samples = []
                for i, sample in enumerate(dataset):
                    if i >= num_samples:
                        break
                    samples.append(sample)
            else:
                # For non-streaming datasets, use select
                max_samples = min(num_samples, len(dataset))
                samples = dataset.select(range(max_samples))
                samples = [samples[i] for i in range(len(samples))]

            # Get dataset info for schema
            dataset_info = self.load_dataset_info(dataset_id, config_name)

            # Prepare response
            sample_data = {
                'dataset_id': dataset_id,
                'config_name': config_name,
                'split': split,
                'num_samples': len(samples),
                'requested_samples': num_samples,
                'data': samples,
                'schema': dataset_info.get('features', {}),
                '_sampled_at': time.time()
            }

            # Cache small samples (don't fail if caching fails)
            if num_samples <= 100:
                try:
                    self._save_to_cache(cache_file, sample_data)
                except CacheError as e:
                    logger.warning(f"Failed to cache sample, continuing anyway: {e}")

            return sample_data
        except DatasetNotFoundError as e:
            # Re-raise as-is
            log_error_with_context(e, context, level=logging.WARNING)
            raise
        except AuthenticationError as e:
            # Re-raise as-is
            log_error_with_context(e, context, level=logging.WARNING)
            raise
        except Exception as e:
            log_error_with_context(e, context)

            # Try to provide more specific error messages
            error_str = str(e).lower()
            if "not found" in error_str or "doesn't exist" in error_str:
                if "split" in error_str or split.lower() in error_str:
                    raise DatasetNotFoundError(
                        f"Split '{split}' not found in dataset '{dataset_id}'. "
                        f"Available splits may be different."
                    ) from e
                else:
                    raise DatasetNotFoundError(
                        f"Dataset '{dataset_id}' not found on HuggingFace Hub."
                    ) from e
            elif "gated" in error_str or "private" in error_str or "authentication" in error_str:
                raise AuthenticationError(
                    f"Authentication required for dataset '{dataset_id}'. "
                    "Please provide a valid HuggingFace token."
                ) from e
            elif "timeout" in error_str or "connection" in error_str:
                raise NetworkError(
                    f"Network error while loading dataset sample: {str(e)}"
                ) from e
            else:
                raise DatasetServiceError(
                    f"Failed to load dataset sample: {str(e)}"
                ) from e
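
    # Illustrative use (assumes network access; "imdb" is just an example id):
    #   sample = service.load_dataset_sample("imdb", split="train", num_samples=5)
    #   for row in sample["data"]:
    #       print(row)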

    def get_dataset_statistics(
        self,
        dataset_id: str,
        split: str = "train",
        config_name: Optional[str] = None,
        use_cache: bool = True
    ) -> Optional[Dict[str, Any]]:
        """
        Get detailed statistics from Dataset Viewer API with caching.

        This method provides comprehensive statistics directly from HuggingFace's
        Dataset Viewer API, which is more efficient and complete than sampling.
        Statistics are only available for datasets with builder_name="parquet".
        If statistics are not available, returns None and the caller should fall
        back to sample-based analysis.

        Args:
            dataset_id: HuggingFace dataset identifier
            split: Dataset split to get statistics for
            config_name: Optional configuration name
            use_cache: Whether to use cached statistics (default: True)

        Returns:
            Dictionary containing statistics, or None if unavailable:
                - num_examples: Total number of examples
                - statistics: List of column statistics
                - partial: Whether response is partial
                - _cached_at: Cache timestamp

        Raises:
            DatasetServiceError: If the API request fails unexpectedly
        """
        context = {
            "dataset_id": dataset_id,
            "split": split,
            "config_name": config_name,
            "operation": "get_dataset_statistics"
        }

        # Check cache first if enabled
        if use_cache:
            cache_key = self._get_statistics_cache_key(dataset_id, split, config_name)
            cache_file = self.statistics_cache_dir / f"{cache_key}.json"
            cached_data = self._load_from_cache(cache_file)
            if cached_data is not None:
                logger.debug(f"Using cached statistics for {dataset_id}/{split}")
                return cached_data

        try:
            # First, check if statistics are available for this dataset
            logger.info(f"Checking statistics availability for {dataset_id}")
            availability = self._check_statistics_availability(dataset_id, config_name)

            if not availability['available']:
                logger.info(
                    f"Statistics not available for {dataset_id}: {availability['reason']}"
                )
                return None

            # Determine which config to use
            if config_name is None:
                # Use first available config
                available_configs = availability['configs']
                if not available_configs:
                    logger.warning(f"No configs with statistics found for {dataset_id}")
                    return None
                config_name = available_configs[0]
                logger.info(f"Using config '{config_name}' for statistics")
            elif config_name not in availability['configs']:
                logger.warning(
                    f"Config '{config_name}' does not support statistics. "
                    f"Available configs: {availability['configs']}"
                )
                return None

            # Fetch statistics from API
            logger.info(f"Fetching statistics for {dataset_id}/{config_name}/{split}")
            statistics = self.dataset_viewer.get_dataset_statistics(
                dataset_name=availability.get("full_dataset_id", dataset_id),
                config=config_name,
                split_name=split
            )

            # Add metadata
            statistics['_cached_at'] = time.time()
            statistics['_config_used'] = config_name
            statistics['_dataset_id'] = dataset_id
            statistics['_split'] = split

            # Cache the results
            if use_cache:
                try:
                    self._save_to_cache(cache_file, statistics)
                except CacheError as e:
                    logger.warning(f"Failed to cache statistics: {e}")

            logger.info(
                f"Successfully fetched statistics for {dataset_id}: "
                f"{statistics.get('num_examples', 0)} examples, "
                f"{len(statistics.get('statistics', []))} columns"
            )
            return statistics

        except Exception as e:
            # Log but don't fail - caller can fall back to sampling
            log_error_with_context(e, context, level=logging.WARNING)
            logger.info(
                f"Could not fetch statistics for {dataset_id}, "
                "caller should use sample-based analysis"
            )
            return None
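
    # Illustrative fallback pattern (statistics are None for non-parquet datasets):
    #   stats = service.get_dataset_statistics("imdb", split="train")
    #   if stats is None:
    #       sample = service.load_dataset_sample("imdb", split="train")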

    def get_cached_metadata(self, dataset_id: str, config_name: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """
        Retrieve cached metadata without making API calls.

        Args:
            dataset_id: HuggingFace dataset identifier
            config_name: Optional configuration name

        Returns:
            Cached metadata dictionary or None if not cached/expired
        """
        cache_key = self._get_cache_key(dataset_id, config_name)
        cache_file = self.metadata_cache_dir / f"{cache_key}.json"
        return self._load_from_cache(cache_file)

    def clear_cache(self, dataset_id: Optional[str] = None) -> None:
        """
        Clear cached data for a specific dataset or all datasets.

        Args:
            dataset_id: Optional dataset ID to clear. If None, clears all cache.
        """
        try:
            if dataset_id is None:
                # Clear all cache
                for cache_file in self.metadata_cache_dir.glob("*.json"):
                    cache_file.unlink()
                for cache_file in self.sample_cache_dir.glob("*.json"):
                    cache_file.unlink()
                for cache_file in self.statistics_cache_dir.glob("*.json"):
                    cache_file.unlink()
                logger.info("Cleared all cache")
            else:
                # Clear cache for specific dataset
                cache_key = self._get_cache_key(dataset_id)

                # Clear metadata cache
                for cache_file in self.metadata_cache_dir.glob(f"{cache_key}*.json"):
                    cache_file.unlink()

                # Clear sample cache
                for cache_file in self.sample_cache_dir.glob(f"{cache_key}*.json"):
                    cache_file.unlink()

                # Clear statistics cache
                for cache_file in self.statistics_cache_dir.glob(f"{cache_key}*.json"):
                    cache_file.unlink()

                logger.info(f"Cleared cache for dataset: {dataset_id}")
        except Exception as e:
            logger.warning(f"Failed to clear cache: {e}")
            raise CacheError(f"Failed to clear cache: {e}") from e

    def get_cache_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the current cache.

        Returns:
            Dictionary with cache statistics
        """
        try:
            metadata_files = list(self.metadata_cache_dir.glob("*.json"))
            sample_files = list(self.sample_cache_dir.glob("*.json"))
            statistics_files = list(self.statistics_cache_dir.glob("*.json"))

            # Calculate cache sizes
            metadata_size = sum(f.stat().st_size for f in metadata_files)
            sample_size = sum(f.stat().st_size for f in sample_files)
            statistics_size = sum(f.stat().st_size for f in statistics_files)

            return {
                'cache_dir': str(self.cache_dir),
                'metadata_files': len(metadata_files),
                'sample_files': len(sample_files),
                'statistics_files': len(statistics_files),
                'total_files': len(metadata_files) + len(sample_files) + len(statistics_files),
                'metadata_size_bytes': metadata_size,
                'sample_size_bytes': sample_size,
                'statistics_size_bytes': statistics_size,
                'total_size_bytes': metadata_size + sample_size + statistics_size,
                'cache_ttl_seconds': self.cache_ttl
            }
        except Exception as e:
            logger.warning(f"Failed to get cache stats: {e}")
            return {'error': str(e)}

    def validate_dataset_access(
        self,
        dataset_id: str,
        config_name: Optional[str] = None
    ) -> bool:
        """
        Validate that a dataset can be accessed with current authentication.

        Args:
            dataset_id: HuggingFace dataset identifier
            config_name: Optional configuration name

        Returns:
            True if dataset is accessible, False otherwise
        """
        return self.hf_client.validate_dataset_access(dataset_id, config_name)

    def _check_statistics_availability(
        self,
        dataset_name: str,
        config_name: Optional[str] = None
    ) -> dict:
        """
        Check if statistics are available for a dataset.

        Statistics are only available for datasets with builder_name="parquet".
        This method checks the dataset information to determine availability.

        Args:
            dataset_name: HuggingFace dataset identifier
            config_name: Optional configuration name

        Returns:
            Dictionary with availability information:
                - available: Boolean indicating if statistics are available
                - full_dataset_id: Fully qualified dataset identifier
                - configs: List of configs with statistics support
                - reason: Explanation if statistics are not available

        Raises:
            DatasetServiceError: If the availability check fails
        """
        try:
            dataset_info = self.load_dataset_info(dataset_name, config_name)
            full_dataset_id = dataset_info.get('id', dataset_name)

            if len(dataset_info.get("configs", [])) == 1:
                # Single config format
                builder_name = dataset_info.get('builder_name', '')
                is_parquet = builder_name == 'parquet'
                configs = [dataset_info["configs"][0]] if is_parquet else []
                reason = (
                    'Statistics available' if is_parquet
                    else f'Statistics only available for parquet datasets (found: {builder_name})'
                )
            else:
                # Multiple configs format
                if config_name is None:
                    # Consider every config with a parquet builder
                    configs = []
                    for cfg_data in dataset_info.get("config_details", []):
                        if cfg_data.get('builder_name') == 'parquet':
                            configs.append(cfg_data.get("config_name"))
                    is_parquet = len(configs) > 0
                    reason = (
                        f'Statistics available for {len(configs)} config(s)'
                        if configs else 'No parquet configs found'
                    )
                else:
                    configs = [config_name]
                    builder_name = dataset_info.get('builder_name', '')
                    is_parquet = builder_name == 'parquet'
                    reason = (
                        f'Statistics available for provided config {config_name}'
                        if is_parquet else f'No parquet found for config {config_name}'
                    )

            return {
                "available": is_parquet,
                "full_dataset_id": full_dataset_id,
                "configs": configs,
                "reason": reason
            }
        except Exception as e:
            error_msg = f"Unexpected error checking statistics availability: {str(e)}"
            logger.error(error_msg)
            raise DatasetServiceError(error_msg) from e
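
    # Illustrative return value (hypothetical identifiers):
    #   {'available': True, 'full_dataset_id': 'user/dataset',
    #    'configs': ['default'], 'reason': 'Statistics available'}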

    def search_text_in_dataset(
        self,
        dataset_id: str,
        config_name: str,
        split_name: str,
        query: str,
        offset: int = 0,
        length: int = 50
    ) -> Dict[str, Any]:
        """
        Search for text in text columns of a dataset using the Dataset Viewer API.

        This method delegates to the DatasetViewerAdapter to perform the search.
        Only text columns are searched and only parquet datasets are supported.

        Args:
            dataset_id: HuggingFace dataset identifier
            config_name: Configuration name (required)
            split_name: Split name (required)
            query: Search query (required)
            offset: Offset for pagination (default: 0)
            length: Number of examples to return (default: 50)

        Returns:
            Dictionary containing search results from the Dataset Viewer API

        Raises:
            DatasetNotParquetError: If the dataset is not in parquet format
            NoTextColumnsError: If the dataset has no text columns
            DatasetServiceError: If the search operation fails
        """
        try:
            # Check if dataset is in parquet format and has text columns
            dataset_info = self.load_dataset_info(dataset_id, config_name)

            # Check builder_name for parquet format
            # Also check tags as a fallback since builder_name might not be available
            builder_name = dataset_info.get('builder_name', '')
            tags = dataset_info.get('tags', [])
            is_parquet = builder_name == 'parquet' or 'format:parquet' in tags

            if not is_parquet:
                error_msg = (
                    f"Search is only supported for parquet datasets. "
                    f"Dataset '{dataset_id}' has builder_name='{builder_name}' "
                    f"and tags={tags}. "
                    f"Please use a dataset in parquet format."
                )
                logger.warning(error_msg)
                raise DatasetNotParquetError(error_msg)

            # Check if dataset has text columns
            features = dataset_info.get('features', {})
            if not features:
                error_msg = f"No features found for dataset '{dataset_id}'"
                logger.warning(error_msg)
                raise DatasetServiceError(error_msg)

            # Check for text/string columns
            has_text_columns = False
            for _, feature_info in features.items():
                # Check for various text types
                if isinstance(feature_info, dict):
                    feature_type = feature_info.get('dtype', '')
                elif isinstance(feature_info, str):
                    feature_type = feature_info
                else:
                    continue

                # Check if it's a text column (string, text, or Value with string dtype)
                if any(text_type in str(feature_type).lower() for text_type in ['string', 'text']):
                    has_text_columns = True
                    break

            if not has_text_columns:
                error_msg = (
                    f"No text columns found in dataset '{dataset_id}'. "
                    f"Search requires at least one text/string column. "
                    f"Available features: {list(features.keys())}"
                )
                logger.warning(error_msg)
                raise NoTextColumnsError(error_msg)

            # Perform the search
            return self.dataset_viewer.search_text_in_dataset(
                dataset_name=dataset_id,
                config_name=config_name,
                split_name=split_name,
                query=query,
                offset=offset,
                length=length
            )

        except (DatasetNotParquetError, NoTextColumnsError):
            # Re-raise our custom exceptions
            raise
        except Exception as e:
            error_msg = f"Failed to search in dataset: {str(e)}"
            logger.error(error_msg)
            raise DatasetServiceError(error_msg) from e
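
    # Illustrative use (hypothetical config/split names):
    #   results = service.search_text_in_dataset(
    #       "imdb", config_name="plain_text", split_name="train",
    #       query="great movie", length=10,
    #   )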


def get_dataset_service(hf_api_token: Optional[str] = None) -> DatasetService:
    """Create a dataset service instance using the current config.

    Falls back to the token from the configuration when no token is provided.
    """
    config = get_config()
    if hf_api_token is None or len(hf_api_token) == 0:
        hf_api_token = config.hf_token
    dataset_service = DatasetService(
        cache_dir=config.cache_dir,
        token=hf_api_token
    )
    return dataset_service
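

if __name__ == "__main__":
    # Minimal smoke-test sketch: assumes network access and a valid hf_eda_mcp
    # config; the public "imdb" dataset is used purely as an example identifier.
    logging.basicConfig(level=logging.INFO)
    service = get_dataset_service(os.environ.get("HF_TOKEN"))
    info = service.load_dataset_info("imdb")
    print(info.get("summary", info.get("id")))
    print(service.get_cache_stats())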