"""
Dropbox data source.
This module provides data loading from Dropbox files,
supporting both public share links and authenticated access.
"""
import json
import logging
import re
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import urlparse, parse_qs
from potato.data_sources.base import DataSource, SourceConfig
logger = logging.getLogger(__name__)
def convert_share_link(url: str) -> str:
    """
    Convert a Dropbox share link to a direct download URL.

    The URL's host must be ``dropbox.com`` or one of its subdomains. The
    ``dl`` query parameter is forced to ``1``: an existing ``dl`` parameter
    is rewritten in place, otherwise ``dl=1`` is appended to the query.
    All other query parameters, the path and the fragment are left as-is.

    Args:
        url: Dropbox share link

    Returns:
        Direct download URL

    Raises:
        ValueError: If the URL's host is not dropbox.com or a subdomain.

    Examples:
        https://www.dropbox.com/s/xxx/file.json?dl=0
        -> https://www.dropbox.com/s/xxx/file.json?dl=1
    """
    parsed = urlparse(url)

    # Compare the actual hostname rather than searching the netloc for a
    # substring, so hosts like "dropbox.com.evil.example" are rejected.
    host = (parsed.hostname or "").lower()
    if host != "dropbox.com" and not host.endswith(".dropbox.com"):
        raise ValueError(f"Not a Dropbox URL: {url}")

    # Rewrite only the 'dl' query parameter. Working on the raw query parts
    # (instead of str.replace on the whole URL) keeps a "dl=0" that merely
    # appears in the path or inside another value untouched, and avoids
    # re-encoding the remaining parameters.
    params = []
    has_dl = False
    for param in parsed.query.split("&"):
        if not param:
            continue
        if param.partition("=")[0] == "dl":
            if not has_dl:
                params.append("dl=1")
                has_dl = True
            continue  # drop duplicate 'dl' parameters
        params.append(param)
    if not has_dl:
        params.append("dl=1")

    # Reassembling from the parsed parts keeps the query before any fragment.
    return parsed._replace(query="&".join(params)).geturl()
class DropboxSource(DataSource):
    """
    Data source for Dropbox files.

    Supports both public share links (no authentication required)
    and private files with access token authentication.

    Configuration for public files:
        type: dropbox
        url: "https://www.dropbox.com/s/xxx/file.jsonl?dl=0"

    Configuration for private files:
        type: dropbox
        path: "/path/to/file.jsonl"  # Path in Dropbox
        access_token: "${DROPBOX_TOKEN}"

    When both 'url' and 'path' are configured, 'url' is the one fetched.

    Supported formats: JSON, JSONL, CSV, TSV
    """

    # Cached result of the Dropbox SDK import probe (None = not probed yet)
    _HAS_DROPBOX = None

    @classmethod
    def _check_dependencies(cls) -> bool:
        """Check if Dropbox SDK is available (probed once, then cached)."""
        if cls._HAS_DROPBOX is None:
            try:
                import dropbox  # noqa: F401 - imported only to probe availability
                cls._HAS_DROPBOX = True
            except ImportError:
                cls._HAS_DROPBOX = False
        return cls._HAS_DROPBOX

    def __init__(self, config: SourceConfig):
        """
        Initialize the Dropbox source.

        Args:
            config: Source configuration; 'url', 'path' and 'access_token'
                are read from ``config.config``.
        """
        super().__init__(config)
        self._url = config.config.get("url", "")
        self._path = config.config.get("path", "")
        self._access_token = config.config.get("access_token")
        # Parsed items from the last successful fetch; None until fetched
        self._cached_data: Optional[List[Dict]] = None
        # Dropbox SDK client, created on first authenticated fetch
        self._client = None

    def get_source_id(self) -> str:
        """Get unique identifier."""
        return self._source_id

    def validate_config(self) -> List[str]:
        """
        Validate source configuration.

        Returns:
            A list of human-readable error messages; empty when valid.
        """
        errors = []
        if not self._url and not self._path:
            errors.append(
                "Either 'url' or 'path' is required for Dropbox source"
            )
            return errors

        # If path is provided, token is required
        if self._path and not self._access_token:
            errors.append(
                "'access_token' is required when using 'path' for private files"
            )

        # Validate URL format if provided
        if self._url:
            try:
                convert_share_link(self._url)
            except ValueError as e:
                errors.append(str(e))
        return errors

    def is_available(self) -> bool:
        """
        Check if the source is available.

        Returns:
            False only when the authenticated code path would be used and
            the Dropbox SDK cannot be imported; True otherwise.
        """
        # The SDK is needed only for authenticated downloads. A configured
        # 'url' is always fetched over plain HTTP(S) by _fetch_data, so a
        # token alongside a url must not make the source unavailable.
        needs_sdk = bool(self._access_token) and not self._url
        if needs_sdk and not self._check_dependencies():
            logger.warning(
                "Dropbox SDK not installed. "
                "Install with: pip install dropbox"
            )
            return False
        return True

    def _get_client(self):
        """
        Get or create the Dropbox client.

        Returns:
            The cached or newly created client, or None when no access
            token is configured.
        """
        if self._client is not None:
            return self._client
        if not self._access_token:
            return None
        import dropbox
        self._client = dropbox.Dropbox(self._access_token)
        return self._client

    def _fetch_public_file(self, url: str) -> bytes:
        """
        Fetch a public file using direct download URL.

        Args:
            url: Dropbox share link; converted with convert_share_link.

        Returns:
            Raw bytes of the response body.

        Raises:
            ValueError: If the URL is not a Dropbox URL, or on HTTP 404.
            RuntimeError: On any other HTTP or URL error.
        """
        import urllib.request
        import urllib.error

        download_url = convert_share_link(url)
        request = urllib.request.Request(download_url)
        request.add_header('User-Agent', 'Potato-Annotation-Tool/1.0')
        try:
            with urllib.request.urlopen(request, timeout=60) as response:
                return response.read()
        except urllib.error.HTTPError as e:
            # HTTPError is a URLError subclass, so it must be handled first
            if e.code == 404:
                raise ValueError("File not found or link has expired") from e
            raise RuntimeError(f"HTTP error {e.code}: {e.reason}") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"URL error: {e.reason}") from e

    def _fetch_authenticated_file(self, path: str) -> bytes:
        """
        Fetch a file using authenticated API access.

        Args:
            path: Path of the file in Dropbox; a leading '/' is added
                when missing.

        Returns:
            Raw bytes of the downloaded file.

        Raises:
            ValueError: If the API reports a path error.
            RuntimeError: If no access token is configured or on any other
                Dropbox API error.
        """
        client = self._get_client()
        if not client:
            raise RuntimeError("No access token configured")
        import dropbox

        # Ensure path starts with /
        if not path.startswith('/'):
            path = '/' + path
        try:
            metadata, response = client.files_download(path)
            logger.debug(
                "Downloaded: %s (%s bytes)", metadata.name, metadata.size
            )
            return response.content
        except dropbox.exceptions.ApiError as e:
            if e.error.is_path():
                raise ValueError(f"File not found: {path}") from e
            raise RuntimeError(f"Dropbox API error: {e}") from e

    def _fetch_data(self) -> List[Dict[str, Any]]:
        """
        Fetch and parse data from Dropbox.

        Uses the public URL when one is configured, otherwise the
        authenticated path.
        """
        # Fetch the file
        if self._url:
            content = self._fetch_public_file(self._url)
        else:
            content = self._fetch_authenticated_file(self._path)

        # 'utf-8-sig' strips a leading byte-order mark when present; a BOM
        # would otherwise make json.loads fail and end up in the first CSV
        # column name.
        text = content.decode('utf-8-sig')
        return self._parse_content(text)

    def _parse_content(self, text: str) -> List[Dict[str, Any]]:
        """
        Parse file content.

        Formats are tried in order: a single JSON document, JSONL
        (one JSON value per line), then CSV/TSV with a header row.

        Args:
            text: Decoded file content.

        Returns:
            The parsed items.

        Raises:
            ValueError: If no format yields any items.
        """
        # Try a single JSON document first
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(data, list):
                return data
            if isinstance(data, dict):
                return [data]
            # Any other JSON value falls through to the line-based parsers

        # Try JSONL. Split on '\n' only: str.splitlines() would also break
        # on characters that may legally appear inside JSON strings.
        items: List[Any] = []
        skipped = 0
        for line in text.strip().split('\n'):
            line = line.strip()
            if not line:
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError:
                skipped += 1
                continue
            if isinstance(item, list):
                items.extend(item)
            else:
                items.append(item)
        if items:
            if skipped:
                # Parsing stays best-effort, but dropped lines are reported
                logger.warning(
                    "Skipped %d line(s) that are not valid JSON while "
                    "parsing JSONL content",
                    skipped,
                )
            return items

        # Try CSV / TSV
        import csv
        from io import StringIO

        # Treat the content as tab-separated only when the header row has
        # tabs and no commas; everything else keeps the comma default.
        header = next((ln for ln in text.split('\n') if ln.strip()), '')
        delimiter = '\t' if '\t' in header and ',' not in header else ','
        try:
            reader = csv.DictReader(StringIO(text), delimiter=delimiter)
            rows = [dict(row) for row in reader]
        except csv.Error:
            rows = []
        if rows:
            return rows

        raise ValueError("Could not parse file content as JSON, JSONL, or CSV")

    def read_items(
        self,
        start: int = 0,
        count: Optional[int] = None
    ) -> Iterator[Dict[str, Any]]:
        """
        Read items from Dropbox file.

        The file is fetched and parsed on first use and served from the
        in-memory cache afterwards.

        Args:
            start: Index of the first item to yield.
            count: Maximum number of items to yield; None for no limit.

        Yields:
            Parsed items.
        """
        if self._cached_data is None:
            self._cached_data = self._fetch_data()
        items = self._cached_data[start:]
        if count is not None:
            items = items[:count]
        yield from items

    def get_total_count(self) -> Optional[int]:
        """
        Get total number of items.

        Returns:
            The number of cached items, or None when fetching fails.
        """
        if self._cached_data is None:
            try:
                self._cached_data = self._fetch_data()
            except Exception as e:
                logger.error("Error fetching data: %s", e)
                return None
        return len(self._cached_data)

    def supports_partial_reading(self) -> bool:
        """Partial reading is supported after initial fetch."""
        return True

    def refresh(self) -> bool:
        """Refresh by clearing cached data."""
        self._cached_data = None
        return True

    def get_status(self) -> Dict[str, Any]:
        """Get source status, extending the base status with Dropbox fields."""
        status = super().get_status()
        status["url"] = self._url
        status["path"] = self._path
        # Truthiness matches validate_config/is_available: an empty token
        # does not count as authenticated.
        status["authenticated"] = bool(self._access_token)
        status["cached"] = self._cached_data is not None
        return status

    def close(self) -> None:
        """Close the source by dropping the client and cached data."""
        self._client = None
        self._cached_data = None
|