Spaces:
Paused
Paused
| """ | |
| Base classes and types for data sources. | |
| This module defines the abstract base class for all data sources and | |
| common types used throughout the data sources subsystem. | |
| """ | |
| from abc import ABC, abstractmethod | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from typing import Any, Dict, Iterator, List, Optional | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| class SourceType(Enum): | |
| """Enumeration of supported data source types.""" | |
| FILE = "file" | |
| URL = "url" | |
| GOOGLE_DRIVE = "google_drive" | |
| DROPBOX = "dropbox" | |
| S3 = "s3" | |
| HUGGINGFACE = "huggingface" | |
| GOOGLE_SHEETS = "google_sheets" | |
| DATABASE = "database" | |
| class SourceConfig: | |
| """ | |
| Configuration for a data source. | |
| This dataclass holds the parsed configuration for a single data source, | |
| including type-specific settings and common options. | |
| Attributes: | |
| source_type: The type of data source | |
| source_id: Unique identifier for this source (auto-generated if not provided) | |
| config: The raw configuration dictionary for this source | |
| enabled: Whether this source is enabled | |
| """ | |
| source_type: SourceType | |
| source_id: str | |
| config: Dict[str, Any] | |
| enabled: bool = True | |
| def from_dict(cls, config_dict: Dict[str, Any], index: int = 0) -> "SourceConfig": | |
| """ | |
| Create a SourceConfig from a configuration dictionary. | |
| Args: | |
| config_dict: Dictionary containing source configuration | |
| index: Index in the sources list (used for auto-generated ID) | |
| Returns: | |
| SourceConfig instance | |
| Raises: | |
| ValueError: If the source type is invalid or required fields are missing | |
| """ | |
| type_str = config_dict.get("type") | |
| if not type_str: | |
| raise ValueError("Data source configuration must include 'type' field") | |
| try: | |
| source_type = SourceType(type_str) | |
| except ValueError: | |
| valid_types = [t.value for t in SourceType] | |
| raise ValueError( | |
| f"Invalid data source type '{type_str}'. " | |
| f"Valid types are: {', '.join(valid_types)}" | |
| ) | |
| # Generate source_id if not provided | |
| source_id = config_dict.get("id") or config_dict.get("source_id") | |
| if not source_id: | |
| # Generate ID based on type and index or key identifying info | |
| if source_type == SourceType.FILE: | |
| path = config_dict.get("path", "") | |
| source_id = f"file_{index}_{path.replace('/', '_').replace('.', '_')}" | |
| elif source_type == SourceType.URL: | |
| url = config_dict.get("url", "") | |
| # Use last part of URL path as identifier | |
| source_id = f"url_{index}_{url.split('/')[-1][:30]}" | |
| else: | |
| source_id = f"{source_type.value}_{index}" | |
| enabled = config_dict.get("enabled", True) | |
| return cls( | |
| source_type=source_type, | |
| source_id=source_id, | |
| config=config_dict, | |
| enabled=enabled | |
| ) | |
| class DataSource(ABC): | |
| """ | |
| Abstract base class for all data sources. | |
| Each data source implementation must provide methods for: | |
| - Identifying the source | |
| - Checking availability | |
| - Reading items (with optional partial reading support) | |
| - Reporting total item count | |
| Thread Safety: | |
| Implementations should be thread-safe for concurrent read operations. | |
| Write operations (if any) should be protected by appropriate locking. | |
| """ | |
| def __init__(self, config: SourceConfig): | |
| """ | |
| Initialize the data source. | |
| Args: | |
| config: Source configuration | |
| """ | |
| self._config = config | |
| self._source_id = config.source_id | |
| self._raw_config = config.config | |
| def source_id(self) -> str: | |
| """Get the unique identifier for this source.""" | |
| return self._source_id | |
| def source_type(self) -> SourceType: | |
| """Get the type of this source.""" | |
| return self._config.source_type | |
| def config(self) -> Dict[str, Any]: | |
| """Get the raw configuration dictionary.""" | |
| return self._raw_config | |
| def get_source_id(self) -> str: | |
| """ | |
| Get the unique identifier for this data source. | |
| Returns: | |
| String identifier unique within the DataSourceManager | |
| """ | |
| pass | |
| def is_available(self) -> bool: | |
| """ | |
| Check if the data source is available and accessible. | |
| This method should verify that: | |
| - Required dependencies are installed | |
| - Credentials are valid (if applicable) | |
| - The source location exists and is readable | |
| Returns: | |
| True if the source is ready to read from | |
| """ | |
| pass | |
| def read_items( | |
| self, | |
| start: int = 0, | |
| count: Optional[int] = None | |
| ) -> Iterator[Dict[str, Any]]: | |
| """ | |
| Read items from the data source. | |
| This method yields dictionaries representing annotation items. | |
| For sources that support partial reading, the start and count | |
| parameters allow fetching specific ranges of items. | |
| Args: | |
| start: Index of the first item to read (0-based) | |
| count: Maximum number of items to read (None = all remaining) | |
| Yields: | |
| Dictionary containing item data with at least id_key field | |
| Raises: | |
| RuntimeError: If the source is not available | |
| IOError: If reading fails | |
| """ | |
| pass | |
| def get_total_count(self) -> Optional[int]: | |
| """ | |
| Get the total number of items in the source. | |
| Returns: | |
| Total item count, or None if unknown (e.g., streaming source) | |
| """ | |
| pass | |
| def supports_partial_reading(self) -> bool: | |
| """ | |
| Check if this source supports reading partial ranges. | |
| Partial reading allows loading data in chunks, which is useful | |
| for large datasets or incremental annotation workflows. | |
| Returns: | |
| True if read_items() supports start/count parameters | |
| """ | |
| pass | |
| def validate_config(self) -> List[str]: | |
| """ | |
| Validate the source configuration. | |
| Override this method to add source-specific validation. | |
| Returns: | |
| List of validation error messages (empty if valid) | |
| """ | |
| return [] | |
| def get_status(self) -> Dict[str, Any]: | |
| """ | |
| Get the current status of this data source. | |
| Returns: | |
| Dictionary containing status information: | |
| - source_id: The source identifier | |
| - source_type: The source type | |
| - available: Whether the source is available | |
| - total_count: Total item count (or None) | |
| - supports_partial: Whether partial reading is supported | |
| """ | |
| return { | |
| "source_id": self.source_id, | |
| "source_type": self.source_type.value, | |
| "available": self.is_available(), | |
| "total_count": self.get_total_count(), | |
| "supports_partial": self.supports_partial_reading(), | |
| } | |
| def refresh(self) -> bool: | |
| """ | |
| Refresh the data source (re-fetch from remote, re-validate, etc.). | |
| Override this method for sources that cache data or need | |
| periodic refresh. The default implementation does nothing. | |
| Returns: | |
| True if refresh was successful | |
| """ | |
| return True | |
| def close(self) -> None: | |
| """ | |
| Close the data source and release any resources. | |
| Override this method for sources that hold resources like | |
| database connections or file handles. | |
| """ | |
| pass | |
| def __enter__(self) -> "DataSource": | |
| """Context manager entry.""" | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb) -> None: | |
| """Context manager exit.""" | |
| self.close() | |
| def __repr__(self) -> str: | |
| return f"{self.__class__.__name__}(source_id={self.source_id!r})" | |