codebook / potato /data_sources /sources /dropbox_source.py
davidjurgens's picture
Deploy: Potato — Codebook Annotation
aceb1b2 verified
Raw
History Blame Contribute Delete
8.69 kB
"""
Dropbox data source.
This module provides data loading from Dropbox files,
supporting both public share links and authenticated access.
"""
import json
import logging
import re
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import urlparse, parse_qs
from potato.data_sources.base import DataSource, SourceConfig
logger = logging.getLogger(__name__)
def convert_share_link(url: str) -> str:
    """
    Convert a Dropbox share link to a direct download URL.

    The link is turned into a direct download by forcing the ``dl`` query
    parameter to ``1``. Only the query string is rewritten; the path, any
    other query parameters, and the fragment are left untouched.

    Args:
        url: Dropbox share link

    Returns:
        Direct download URL

    Raises:
        ValueError: If the URL's host is not ``dropbox.com`` or one of its
            subdomains.

    Examples:
        https://www.dropbox.com/s/xxx/file.json?dl=0
        -> https://www.dropbox.com/s/xxx/file.json?dl=1
    """
    parsed = urlparse(url)

    # Compare the parsed hostname (port and userinfo excluded) against the
    # Dropbox domain. A plain substring test on netloc would also accept
    # hosts such as "notdropbox.com" or "dropbox.com.evil.example".
    host = (parsed.hostname or "").rstrip(".").lower()
    if host != "dropbox.com" and not host.endswith(".dropbox.com"):
        raise ValueError(f"Not a Dropbox URL: {url}")

    # Rewrite the `dl` field in place so other parameters keep their
    # position and exact encoding; a "dl=0" that merely appears inside the
    # path or inside another parameter's value is not touched.
    fields = [field for field in parsed.query.split("&") if field]
    has_dl = False
    for idx, field in enumerate(fields):
        if field.partition("=")[0] == "dl":
            fields[idx] = "dl=1"
            has_dl = True
    if not has_dl:
        fields.append("dl=1")

    return parsed._replace(query="&".join(fields)).geturl()
class DropboxSource(DataSource):
    """
    Data source for Dropbox files.

    Supports both public share links (no authentication required)
    and private files with access token authentication.

    Configuration for public files:
        type: dropbox
        url: "https://www.dropbox.com/s/xxx/file.jsonl?dl=0"

    Configuration for private files:
        type: dropbox
        path: "/path/to/file.jsonl"  # Path in Dropbox
        access_token: "${DROPBOX_TOKEN}"

    When both 'url' and 'path' are configured, the URL is used.

    Supported formats: JSON, JSONL, CSV, TSV
    """

    # Cached result of the optional-dependency probe; None = not probed yet.
    _HAS_DROPBOX: Optional[bool] = None

    @classmethod
    def _check_dependencies(cls) -> bool:
        """Check if Dropbox SDK is available (result is cached on the class)."""
        if cls._HAS_DROPBOX is None:
            try:
                import dropbox  # noqa: F401 -- imported only to probe availability
            except ImportError:
                cls._HAS_DROPBOX = False
            else:
                cls._HAS_DROPBOX = True
        return cls._HAS_DROPBOX

    def __init__(self, config: SourceConfig):
        """
        Initialize the Dropbox source.

        Args:
            config: Source configuration; the 'url', 'path' and
                'access_token' keys are read from ``config.config``.
        """
        super().__init__(config)
        self._url = config.config.get("url", "")
        self._path = config.config.get("path", "")
        self._access_token = config.config.get("access_token")
        # Parsed items of the last successful fetch; None = nothing fetched.
        self._cached_data: Optional[List[Dict]] = None
        # Lazily created Dropbox SDK client (authenticated access only).
        self._client = None

    def get_source_id(self) -> str:
        """Get unique identifier."""
        # NOTE(review): _source_id is not set in this class; presumably the
        # base class initializer provides it -- confirm.
        return self._source_id

    def validate_config(self) -> List[str]:
        """
        Validate source configuration.

        Returns:
            List of human-readable error messages; empty when valid.
        """
        errors = []

        if not self._url and not self._path:
            errors.append(
                "Either 'url' or 'path' is required for Dropbox source"
            )
            return errors

        # If path is provided, token is required
        if self._path and not self._access_token:
            errors.append(
                "'access_token' is required when using 'path' for private files"
            )

        # Validate URL format if provided
        if self._url:
            try:
                convert_share_link(self._url)
            except ValueError as e:
                errors.append(str(e))

        return errors

    def is_available(self) -> bool:
        """
        Check if the source is available.

        The Dropbox SDK is only needed for authenticated downloads. When a
        share URL is configured the file is fetched over plain HTTPS, so a
        missing SDK does not make the source unavailable in that case.
        """
        uses_authenticated_download = bool(self._access_token) and not self._url
        if uses_authenticated_download and not self._check_dependencies():
            logger.warning(
                "Dropbox SDK not installed. "
                "Install with: pip install dropbox"
            )
            return False
        return True

    def _get_client(self):
        """
        Get or create the Dropbox client.

        Returns:
            The cached SDK client, or None when no access token is configured.
        """
        if self._client:
            return self._client

        if not self._access_token:
            return None

        import dropbox

        self._client = dropbox.Dropbox(self._access_token)
        return self._client

    def _fetch_public_file(self, url: str) -> bytes:
        """
        Fetch a public file using direct download URL.

        Args:
            url: Dropbox share link; converted with convert_share_link().

        Returns:
            Raw file content.

        Raises:
            ValueError: If the URL is not a Dropbox URL, or the server
                answers 404.
            RuntimeError: On any other HTTP or URL error.
        """
        import urllib.error
        import urllib.request

        download_url = convert_share_link(url)
        request = urllib.request.Request(download_url)
        request.add_header('User-Agent', 'Potato-Annotation-Tool/1.0')

        try:
            with urllib.request.urlopen(request, timeout=60) as response:
                return response.read()
        except urllib.error.HTTPError as e:
            # HTTPError must be handled before URLError (it is a subclass).
            if e.code == 404:
                raise ValueError("File not found or link has expired") from e
            raise RuntimeError(f"HTTP error {e.code}: {e.reason}") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"URL error: {e.reason}") from e

    def _fetch_authenticated_file(self, path: str) -> bytes:
        """
        Fetch a file using authenticated API access.

        Args:
            path: File path inside Dropbox; a leading '/' is added if missing.

        Returns:
            Raw file content.

        Raises:
            ValueError: If the API reports a path error for the file.
            RuntimeError: If no access token is configured or the API
                reports any other error.
        """
        client = self._get_client()
        if not client:
            raise RuntimeError("No access token configured")

        import dropbox

        # Ensure path starts with /
        if not path.startswith('/'):
            path = '/' + path

        try:
            metadata, response = client.files_download(path)
            logger.debug(f"Downloaded: {metadata.name} ({metadata.size} bytes)")
            return response.content
        except dropbox.exceptions.ApiError as e:
            if e.error.is_path():
                raise ValueError(f"File not found: {path}") from e
            raise RuntimeError(f"Dropbox API error: {e}") from e

    def _fetch_data(self) -> List[Dict[str, Any]]:
        """
        Fetch and parse data from Dropbox.

        The share URL takes precedence over the authenticated path when both
        are configured.
        """
        if self._url:
            content = self._fetch_public_file(self._url)
        else:
            content = self._fetch_authenticated_file(self._path)

        # 'utf-8-sig' decodes plain UTF-8 and also drops a leading BOM, which
        # would otherwise break JSON parsing and pollute the first CSV header.
        text = content.decode('utf-8-sig')
        return self._parse_content(text)

    def _parse_content(self, text: str) -> List[Dict[str, Any]]:
        """
        Parse file content.

        Formats are tried in order: a single JSON document, JSONL (one JSON
        value per line), then delimited text (CSV or TSV).

        Args:
            text: Decoded file content.

        Returns:
            List of parsed items.

        Raises:
            ValueError: If no format yields any item.
        """
        # Try a single JSON document first
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(data, list):
                return data
            if isinstance(data, dict):
                return [data]

        # Try JSONL; lines that are not valid JSON are skipped
        items = []
        for line in text.strip().split('\n'):
            line = line.strip()
            if not line:
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(item, list):
                items.extend(item)
            else:
                items.append(item)

        if items:
            return items

        # Try CSV / TSV
        import csv
        from io import StringIO

        # Pick the delimiter from the header line: tab when tabs outnumber
        # commas (TSV), comma otherwise (CSV).
        header = next(
            (row for row in text.split('\n') if row.strip()), ""
        )
        delimiter = '\t' if header.count('\t') > header.count(',') else ','

        try:
            reader = csv.DictReader(StringIO(text), delimiter=delimiter)
            items = [dict(row) for row in reader]
        except csv.Error:
            items = []

        if items:
            return items

        raise ValueError("Could not parse file content as JSON, JSONL, or CSV")

    def read_items(
        self,
        start: int = 0,
        count: Optional[int] = None
    ) -> Iterator[Dict[str, Any]]:
        """
        Read items from Dropbox file.

        The file is fetched and parsed on first use and cached afterwards.

        Args:
            start: Index of the first item to yield.
            count: Maximum number of items to yield; None yields all
                remaining items.

        Yields:
            Parsed items in file order.
        """
        if self._cached_data is None:
            self._cached_data = self._fetch_data()

        items = self._cached_data[start:]
        if count is not None:
            items = items[:count]

        yield from items

    def get_total_count(self) -> Optional[int]:
        """
        Get total number of items.

        Returns:
            Number of cached items, or None when the data could not be
            fetched (the error is logged).
        """
        if self._cached_data is None:
            try:
                self._cached_data = self._fetch_data()
            except Exception as e:
                logger.error("Error fetching data: %s", e)
                return None

        return len(self._cached_data)

    def supports_partial_reading(self) -> bool:
        """Partial reading is supported after initial fetch."""
        return True

    def refresh(self) -> bool:
        """Refresh by clearing cached data; the next read fetches again."""
        self._cached_data = None
        return True

    def get_status(self) -> Dict[str, Any]:
        """Get source status, extending the base status with Dropbox fields."""
        status = super().get_status()
        status["url"] = self._url
        status["path"] = self._path
        status["authenticated"] = self._access_token is not None
        status["cached"] = self._cached_data is not None
        return status

    def close(self) -> None:
        """Close the source by dropping the client and cached data."""
        self._client = None
        self._cached_data = None