Spaces:
Paused
Paused
| """ | |
| Dropbox data source. | |
| This module provides data loading from Dropbox files, | |
| supporting both public share links and authenticated access. | |
| """ | |
| import json | |
| import logging | |
| import re | |
| from typing import Any, Dict, Iterator, List, Optional | |
| from urllib.parse import urlparse, parse_qs | |
| from potato.data_sources.base import DataSource, SourceConfig | |
| logger = logging.getLogger(__name__) | |
def convert_share_link(url: str) -> str:
    """
    Convert a Dropbox share link to a direct download URL.

    The host is validated against the parsed hostname (not a substring of
    the raw URL), and only the ``dl`` query parameter is rewritten; all
    other query parameters, the path, and any fragment are left untouched.

    Args:
        url: Dropbox share link

    Returns:
        Direct download URL (same link with ``dl=1`` in the query string)

    Raises:
        ValueError: If the URL's host is not ``dropbox.com`` or one of its
            subdomains.

    Examples:
        https://www.dropbox.com/s/xxx/file.json?dl=0
        -> https://www.dropbox.com/s/xxx/file.json?dl=1
    """
    parsed = urlparse(url)

    # Use the parsed hostname so that userinfo/port tricks such as
    # "https://dropbox.com@evil.example/" or look-alike domains such as
    # "notdropbox.com" are rejected instead of matching by substring.
    host = parsed.hostname or ""
    if host != "dropbox.com" and not host.endswith(".dropbox.com"):
        raise ValueError(f"Not a Dropbox URL: {url}")

    # Rewrite only a real "dl" parameter; a plain text replace on the whole
    # URL would also touch parameters like "rdl=0" or matching path text.
    raw_params = parsed.query.split("&") if parsed.query else []
    params = [
        "dl=1" if param.partition("=")[0] == "dl" else param
        for param in raw_params
    ]
    if "dl=1" not in params:
        params.append("dl=1")

    # Rebuilding from the parsed parts keeps any "#fragment" after the query.
    return parsed._replace(query="&".join(params)).geturl()
class DropboxSource(DataSource):
    """
    Data source for Dropbox files.

    Supports both public share links (no authentication required)
    and private files with access token authentication.

    Configuration for public files:
        type: dropbox
        url: "https://www.dropbox.com/s/xxx/file.jsonl?dl=0"

    Configuration for private files:
        type: dropbox
        path: "/path/to/file.jsonl"  # Path in Dropbox
        access_token: "${DROPBOX_TOKEN}"

    Supported formats: JSON, JSONL, CSV, TSV

    The format is detected from the content itself (not the file name):
    whole-document JSON is tried first, then JSONL, then delimited text.
    The downloaded file is parsed once and kept in memory until
    ``refresh()`` or ``close()`` is called.
    """

    # Cached result of probing for the optional Dropbox SDK.
    # None means "not probed yet"; True/False is the probe result.
    _HAS_DROPBOX: Optional[bool] = None

    @classmethod
    def _check_dependencies(cls) -> bool:
        """
        Check if Dropbox SDK is available.

        The import is attempted once and the result is stored on the class,
        so later calls do not repeat the probe.

        Returns:
            True if ``import dropbox`` succeeds, False otherwise.
        """
        if cls._HAS_DROPBOX is None:
            try:
                import dropbox  # noqa: F401  (imported only to test availability)
            except ImportError:
                cls._HAS_DROPBOX = False
            else:
                cls._HAS_DROPBOX = True
        return cls._HAS_DROPBOX

    def __init__(self, config: SourceConfig):
        """
        Initialize the Dropbox source.

        Args:
            config: Source configuration; ``config.config`` is read for the
                optional keys ``url``, ``path`` and ``access_token``.
        """
        super().__init__(config)
        self._url = config.config.get("url", "")
        self._path = config.config.get("path", "")
        self._access_token = config.config.get("access_token")
        # Parsed items of the remote file; None until the first fetch.
        self._cached_data: Optional[List[Dict]] = None
        # Lazily created SDK client (authenticated access only).
        self._client = None

    def get_source_id(self) -> str:
        """Get unique identifier."""
        # NOTE(review): _source_id is not assigned in this class; presumably
        # the DataSource base initializer sets it - confirm.
        return self._source_id

    def validate_config(self) -> List[str]:
        """
        Validate source configuration.

        Returns:
            A list of human-readable error messages; empty if the
            configuration is valid.
        """
        errors: List[str] = []

        if not self._url and not self._path:
            errors.append(
                "Either 'url' or 'path' is required for Dropbox source"
            )
            # Nothing else can be checked without a location.
            return errors

        # If path is provided, token is required
        if self._path and not self._access_token:
            errors.append(
                "'access_token' is required when using 'path' for private files"
            )

        # Validate URL format if provided
        if self._url:
            try:
                convert_share_link(self._url)
            except ValueError as e:
                errors.append(str(e))

        return errors

    def is_available(self) -> bool:
        """
        Check if the source is available.

        Returns:
            False only when an access token is configured but the Dropbox
            SDK cannot be imported; True otherwise.
        """
        # For authenticated access, check dependencies
        if self._access_token and not self._check_dependencies():
            logger.warning(
                "Dropbox SDK not installed. "
                "Install with: pip install dropbox"
            )
            return False
        return True

    def _get_client(self):
        """
        Get or create the Dropbox client.

        Returns:
            The cached ``dropbox.Dropbox`` client, a newly created one, or
            None when no access token is configured.
        """
        if self._client:
            return self._client

        if not self._access_token:
            return None

        # Imported lazily so that public-link usage works without the SDK.
        import dropbox

        self._client = dropbox.Dropbox(self._access_token)
        return self._client

    def _fetch_public_file(self, url: str) -> bytes:
        """
        Fetch a public file using direct download URL.

        Args:
            url: Dropbox share link; it is converted with
                ``convert_share_link`` before the request is made.

        Returns:
            Raw bytes of the response body.

        Raises:
            ValueError: If the URL is not a Dropbox URL, or the server
                answers with HTTP 404.
            RuntimeError: For any other HTTP error or a URL/network error.
        """
        import urllib.request
        import urllib.error

        download_url = convert_share_link(url)

        request = urllib.request.Request(download_url)
        request.add_header('User-Agent', 'Potato-Annotation-Tool/1.0')

        try:
            with urllib.request.urlopen(request, timeout=60) as response:
                return response.read()
        except urllib.error.HTTPError as e:
            # HTTPError is a subclass of URLError, so it must be handled first.
            if e.code == 404:
                raise ValueError("File not found or link has expired") from e
            raise RuntimeError(f"HTTP error {e.code}: {e.reason}") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"URL error: {e.reason}") from e

    def _fetch_authenticated_file(self, path: str) -> bytes:
        """
        Fetch a file using authenticated API access.

        Args:
            path: File path inside Dropbox; a leading ``/`` is added if
                missing.

        Returns:
            Raw bytes of the downloaded file.

        Raises:
            ValueError: If the API reports a path error for ``path``.
            RuntimeError: If no access token is configured, or for any
                other Dropbox API error.
        """
        client = self._get_client()
        if not client:
            raise RuntimeError("No access token configured")

        import dropbox

        # Ensure path starts with /
        if not path.startswith('/'):
            path = '/' + path

        try:
            metadata, response = client.files_download(path)
        except dropbox.exceptions.ApiError as e:
            if e.error.is_path():
                raise ValueError(f"File not found: {path}") from e
            raise RuntimeError(f"Dropbox API error: {e}") from e

        logger.debug(f"Downloaded: {metadata.name} ({metadata.size} bytes)")
        return response.content

    def _fetch_data(self) -> List[Dict[str, Any]]:
        """
        Fetch and parse data from Dropbox.

        The public ``url`` takes precedence over ``path`` when both are
        configured.

        Returns:
            Parsed items of the remote file.
        """
        # Fetch the file
        if self._url:
            content = self._fetch_public_file(self._url)
        else:
            content = self._fetch_authenticated_file(self._path)

        # "utf-8-sig" decodes plain UTF-8 identically but also strips a
        # leading byte-order mark, which would otherwise break JSON parsing
        # and end up inside the first CSV column name.
        text = content.decode('utf-8-sig')
        return self._parse_content(text)

    def _parse_content(self, text: str) -> List[Dict[str, Any]]:
        """
        Parse file content.

        Formats are tried in order: a single JSON document, JSON Lines,
        then delimited text with a header row (tab-separated when the
        header contains more tabs than commas, comma-separated otherwise).

        Args:
            text: Decoded file content.

        Returns:
            List of parsed items.

        Raises:
            ValueError: If none of the formats yields any item.
        """
        # Try JSON array first
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(data, list):
                return data
            if isinstance(data, dict):
                return [data]
            # Any other JSON value falls through to the line-based parser.

        # Try JSONL
        items: List[Any] = []
        skipped = 0
        for line in text.strip().split('\n'):
            line = line.strip()
            if not line:
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError:
                # Best effort: keep the lines that do parse, but count the
                # rest so the data loss is visible in the log.
                skipped += 1
                continue
            if isinstance(item, list):
                items.extend(item)
            else:
                items.append(item)

        if items:
            if skipped:
                logger.warning(
                    "Skipped %d line(s) that are not valid JSON while "
                    "parsing JSONL content",
                    skipped,
                )
            return items

        # Try CSV / TSV
        import csv
        from io import StringIO

        header = text.split('\n', 1)[0]
        delimiter = '\t' if header.count('\t') > header.count(',') else ','
        try:
            reader = csv.DictReader(StringIO(text), delimiter=delimiter)
            items = [dict(row) for row in reader]
        except csv.Error:
            items = []

        if items:
            return items

        raise ValueError("Could not parse file content as JSON, JSONL, or CSV")

    def read_items(
        self,
        start: int = 0,
        count: Optional[int] = None
    ) -> Iterator[Dict[str, Any]]:
        """
        Read items from Dropbox file.

        This is a generator: the file is fetched (if not already cached)
        when iteration starts, not when the method is called.

        Args:
            start: Index of the first item to yield.
            count: Maximum number of items to yield; None means all
                remaining items.

        Yields:
            Parsed items in file order.
        """
        if self._cached_data is None:
            self._cached_data = self._fetch_data()

        items = self._cached_data[start:]
        if count is not None:
            items = items[:count]

        yield from items

    def get_total_count(self) -> Optional[int]:
        """
        Get total number of items.

        Returns:
            Number of items in the file, or None if the file could not be
            fetched or parsed (the error is logged).
        """
        if self._cached_data is None:
            try:
                self._cached_data = self._fetch_data()
            except Exception as e:
                logger.error(f"Error fetching data: {e}")
                return None
        return len(self._cached_data)

    def supports_partial_reading(self) -> bool:
        """Partial reading is supported after initial fetch."""
        return True

    def refresh(self) -> bool:
        """
        Refresh by clearing cached data.

        The next read re-downloads the file.

        Returns:
            Always True.
        """
        self._cached_data = None
        return True

    def get_status(self) -> Dict[str, Any]:
        """
        Get source status.

        Returns:
            The base-class status extended with ``url``, ``path``,
            ``authenticated`` and ``cached``.
        """
        status = super().get_status()
        status["url"] = self._url
        status["path"] = self._path
        # An empty token is treated as "no token" everywhere else in this
        # class, so report it the same way here.
        status["authenticated"] = bool(self._access_token)
        status["cached"] = self._cached_data is not None
        return status

    def close(self) -> None:
        """Close the source by dropping the client and the cached data."""
        self._client = None
        self._cached_data = None