""" Dataverse API Client Production-ready client for Harvard Dataverse following IQSS best practices. Based on official API documentation: https://guides.dataverse.org/en/latest/api/index.html Features: - API token authentication - Rate limiting with exponential backoff - Checksum verification - Version-aware caching - Comprehensive error handling - Pagination support - Retry logic Source: https://github.com/IQSS/dataverse """ import sys from pathlib import Path import hashlib import asyncio from typing import Optional, Dict, Any, List from datetime import datetime, timedelta from loguru import logger import json # Add project root to path project_root = Path(__file__).parent.parent if str(project_root) not in sys.path: sys.path.insert(0, str(project_root)) try: import httpx except ImportError: logger.error("httpx required. Install with: pip install httpx") httpx = None from config import settings class DataverseAPIError(Exception): """Custom exception for Dataverse API errors.""" pass class DataverseClient: """ Official Dataverse API client following IQSS best practices. Usage: client = DataverseClient(api_key="your-key") metadata = await client.get_dataset_metadata("doi:10.7910/DVN/NJTBEM") result = await client.download_dataset("doi:10.7910/DVN/NJTBEM") """ # API endpoints DATASET_ENDPOINT = "/api/datasets/:persistentId/" FILE_DOWNLOAD_ENDPOINT = "/api/access/datafile/{file_id}" SEARCH_ENDPOINT = "/api/search" # Rate limiting (requests per minute) DEFAULT_RATE_LIMIT = 100 RATE_LIMIT_PERIOD = 60 # seconds def __init__( self, base_url: str = "https://dataverse.harvard.edu", api_key: Optional[str] = None, timeout: int = 120, max_retries: int = 3, cache_enabled: bool = True ): """ Initialize Dataverse client. Args: base_url: Dataverse instance URL (default: Harvard Dataverse) api_key: API token for authentication (optional but recommended) timeout: Request timeout in seconds max_retries: Maximum retry attempts for failed requests cache_enabled: Enable version-aware file caching """ if not httpx: raise ImportError("httpx required. Install with: pip install httpx") self.base_url = base_url.rstrip("/") self.api_key = api_key or settings.dataverse_api_key self.timeout = timeout self.max_retries = max_retries self.cache_enabled = cache_enabled # Cache directory self.cache_dir = Path("data/cache/dataverse") self.cache_dir.mkdir(parents=True, exist_ok=True) # Metadata cache self.metadata_cache_dir = self.cache_dir / "metadata" self.metadata_cache_dir.mkdir(parents=True, exist_ok=True) # Rate limiting state self._request_times: List[datetime] = [] if self.api_key: logger.info("Dataverse client initialized with API key") else: logger.warning("Dataverse client initialized without API key (rate limits may apply)") def _get_headers(self) -> Dict[str, str]: """ Get HTTP headers for API requests. Returns: Headers dictionary with API key if available """ headers = { "Content-Type": "application/json", "User-Agent": "OralHealthPolicyPulse/1.0 (Civic Tech Research)" } if self.api_key: headers["X-Dataverse-key"] = self.api_key return headers async def _rate_limit_wait(self): """ Implement client-side rate limiting. Enforces maximum requests per minute to avoid 429 errors. """ now = datetime.now() # Remove requests older than the rate limit period self._request_times = [ t for t in self._request_times if (now - t).total_seconds() < self.RATE_LIMIT_PERIOD ] # Check if we've hit the limit if len(self._request_times) >= self.DEFAULT_RATE_LIMIT: oldest = min(self._request_times) wait_time = self.RATE_LIMIT_PERIOD - (now - oldest).total_seconds() if wait_time > 0: logger.warning(f"Rate limit reached. Waiting {wait_time:.1f}s...") await asyncio.sleep(wait_time) # Record this request self._request_times.append(now) async def _request_with_retry( self, method: str, url: str, **kwargs ) -> httpx.Response: """ Make HTTP request with retry logic and exponential backoff. Args: method: HTTP method (GET, POST, etc.) url: Full URL to request **kwargs: Additional arguments for httpx.request() Returns: HTTP response Raises: DataverseAPIError: If all retry attempts fail """ await self._rate_limit_wait() async with httpx.AsyncClient(timeout=self.timeout, follow_redirects=True) as client: for attempt in range(self.max_retries): try: response = await client.request(method, url, **kwargs) # Handle specific status codes if response.status_code == 200: return response elif response.status_code == 401: raise DataverseAPIError( "Unauthorized: API key required or invalid. " "Sign up at https://dataverse.harvard.edu/loginpage.xhtml" ) elif response.status_code == 404: raise DataverseAPIError(f"Not found: {url}") elif response.status_code == 429: # Rate limited by server retry_after = int(response.headers.get("Retry-After", 60)) logger.warning(f"Server rate limit hit. Retrying after {retry_after}s") await asyncio.sleep(retry_after) continue elif response.status_code >= 500: # Server error - retry with backoff if attempt < self.max_retries - 1: wait_time = 2 ** attempt logger.warning(f"Server error {response.status_code}. Retrying in {wait_time}s...") await asyncio.sleep(wait_time) continue else: raise DataverseAPIError(f"Server error: HTTP {response.status_code}") else: raise DataverseAPIError( f"API error: HTTP {response.status_code} - {response.text}" ) except httpx.TimeoutException: if attempt < self.max_retries - 1: wait_time = 2 ** attempt logger.warning(f"Request timeout. Retrying in {wait_time}s...") await asyncio.sleep(wait_time) continue else: raise DataverseAPIError("Request timed out after all retry attempts") except Exception as e: if attempt < self.max_retries - 1: wait_time = 2 ** attempt logger.warning(f"Request failed: {e}. Retrying in {wait_time}s...") await asyncio.sleep(wait_time) continue else: raise DataverseAPIError(f"Request failed: {e}") raise DataverseAPIError("All retry attempts exhausted") def _get_cached_metadata_path(self, persistent_id: str, version: str) -> Path: """Get path to cached metadata file.""" safe_id = persistent_id.replace(":", "_").replace("/", "_") return self.metadata_cache_dir / f"{safe_id}_{version}.json" async def get_dataset_metadata( self, persistent_id: str, version: str = ":latest", use_cache: bool = True ) -> Optional[Dict[str, Any]]: """ Get dataset metadata from Dataverse. Args: persistent_id: DOI or handle (e.g., "doi:10.7910/DVN/NJTBEM") version: Dataset version (":latest", ":draft", or specific version number) use_cache: Use cached metadata if available (for :latest version only) Returns: Dataset metadata dictionary or None if not found Example: metadata = await client.get_dataset_metadata("doi:10.7910/DVN/NJTBEM") files = metadata["data"]["latestVersion"]["files"] """ # Check cache if use_cache and self.cache_enabled and version == ":latest": cache_file = self._get_cached_metadata_path(persistent_id, version) if cache_file.exists(): # Check if cache is recent (less than 1 day old) cache_age = datetime.now() - datetime.fromtimestamp(cache_file.stat().st_mtime) if cache_age < timedelta(days=1): logger.info(f"Using cached metadata (age: {cache_age.total_seconds() / 3600:.1f}h)") with open(cache_file, 'r') as f: return json.load(f) # Fetch from API url = f"{self.base_url}{self.DATASET_ENDPOINT}" params = { "persistentId": persistent_id, } # Add version if not :latest if version != ":latest": params["version"] = version logger.info(f"Fetching metadata for {persistent_id} (version: {version})") try: response = await self._request_with_retry( "GET", url, params=params, headers=self._get_headers() ) metadata = response.json() # Cache the metadata if self.cache_enabled and version == ":latest": cache_file = self._get_cached_metadata_path(persistent_id, version) with open(cache_file, 'w') as f: json.dump(metadata, f, indent=2) logger.debug(f"Cached metadata to {cache_file}") return metadata except DataverseAPIError as e: logger.error(f"Failed to fetch metadata: {e}") return None def _verify_checksum(self, content: bytes, expected_md5: Optional[str]) -> bool: """ Verify file checksum. Args: content: File content bytes expected_md5: Expected MD5 checksum Returns: True if checksum matches or no checksum provided """ if not expected_md5: logger.warning("No checksum provided - skipping verification") return True actual_md5 = hashlib.md5(content).hexdigest() if actual_md5.lower() == expected_md5.lower(): logger.debug(f"✓ Checksum verified: {actual_md5}") return True else: logger.error(f"✗ Checksum mismatch! Expected: {expected_md5}, Got: {actual_md5}") return False async def download_file( self, file_id: int, output_path: Path, expected_checksum: Optional[str] = None, verify_checksum: bool = True ) -> bool: """ Download a file from Dataverse with checksum verification. Args: file_id: Dataverse file ID output_path: Where to save the file expected_checksum: Expected MD5 checksum (if known) verify_checksum: Whether to verify checksum Returns: True if download successful and checksum valid Example: success = await client.download_file( file_id=123456, output_path=Path("data/municipalities.csv"), expected_checksum="abc123..." ) """ url = f"{self.base_url}{self.FILE_DOWNLOAD_ENDPOINT.format(file_id=file_id)}" logger.info(f"Downloading file {file_id} to {output_path.name}") try: response = await self._request_with_retry( "GET", url, headers=self._get_headers() ) # Verify checksum if requested if verify_checksum and expected_checksum: if not self._verify_checksum(response.content, expected_checksum): logger.error("Checksum verification failed - file may be corrupted") return False # Save file output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_bytes(response.content) file_size_mb = len(response.content) / (1024 * 1024) logger.success(f"✓ Downloaded {output_path.name} ({file_size_mb:.2f} MB)") return True except DataverseAPIError as e: logger.error(f"Download failed: {e}") return False async def download_dataset( self, persistent_id: str, output_dir: Optional[Path] = None, file_types: Optional[List[str]] = None, verify_checksums: bool = True ) -> Dict[str, Any]: """ Download all files (or filtered subset) from a dataset. Args: persistent_id: Dataset DOI (e.g., "doi:10.7910/DVN/NJTBEM") output_dir: Where to save files (defaults to cache_dir/dataset_name) file_types: List of file extensions to download (e.g., [".csv", ".tab"]) If None, downloads all files verify_checksums: Whether to verify MD5 checksums Returns: Summary dictionary with download statistics Example: result = await client.download_dataset( "doi:10.7910/DVN/NJTBEM", file_types=[".csv", ".tab"] ) print(f"Downloaded {result['downloaded']} files to {result['output_dir']}") """ # Set output directory if output_dir is None: safe_id = persistent_id.replace(":", "_").replace("/", "_") output_dir = self.cache_dir / safe_id output_dir.mkdir(parents=True, exist_ok=True) # Get metadata logger.info(f"Fetching dataset metadata for {persistent_id}") metadata = await self.get_dataset_metadata(persistent_id) if not metadata: return { "status": "error", "message": "Failed to fetch dataset metadata", "downloaded": 0, "failed": 0, "files": [] } # Extract file list try: files = metadata["data"]["latestVersion"]["files"] logger.info(f"Found {len(files)} files in dataset") except KeyError: logger.error("Invalid metadata structure - cannot find files list") return { "status": "error", "message": "Invalid metadata structure", "downloaded": 0, "failed": 0, "files": [] } # Filter by file type if specified if file_types: original_count = len(files) files = [ f for f in files if any(f["dataFile"]["filename"].lower().endswith(ext.lower()) for ext in file_types) ] logger.info(f"Filtered to {len(files)} files matching {file_types} (from {original_count} total)") # Download each file downloaded = [] failed = [] for i, file_info in enumerate(files, 1): try: file_id = file_info["dataFile"]["id"] filename = file_info["dataFile"]["filename"] checksum = file_info["dataFile"].get("md5") output_path = output_dir / filename logger.info(f"[{i}/{len(files)}] Downloading {filename}...") success = await self.download_file( file_id, output_path, expected_checksum=checksum, verify_checksum=verify_checksums ) if success: downloaded.append(str(output_path)) else: failed.append(filename) except Exception as e: logger.error(f"Error downloading {filename}: {e}") failed.append(filename) # Summary status = "success" if not failed else ("partial" if downloaded else "error") logger.info("") logger.info("=" * 60) if status == "success": logger.success(f"✓ Successfully downloaded all {len(downloaded)} files") elif status == "partial": logger.warning(f"⚠ Downloaded {len(downloaded)} files, {len(failed)} failed") else: logger.error(f"✗ All downloads failed") logger.info("=" * 60) return { "status": status, "downloaded": len(downloaded), "failed": len(failed), "failed_files": failed, "files": downloaded, "output_dir": str(output_dir) } async def search_datasets( self, query: str, type: str = "dataset", per_page: int = 10, start: int = 0 ) -> Dict[str, Any]: """ Search for datasets in Dataverse. Args: query: Search query string type: Type of results ("dataset", "datafile", "all") per_page: Number of results per page start: Starting offset for pagination Returns: Search results dictionary Example: results = await client.search_datasets("municipal meetings") for item in results["data"]["items"]: print(item["name"], item["global_id"]) """ url = f"{self.base_url}{self.SEARCH_ENDPOINT}" params = { "q": query, "type": type, "per_page": per_page, "start": start } try: response = await self._request_with_retry( "GET", url, params=params, headers=self._get_headers() ) return response.json() except DataverseAPIError as e: logger.error(f"Search failed: {e}") return {"status": "error", "message": str(e)} # Convenience functions for common operations async def download_localview_dataset( api_key: Optional[str] = None, output_dir: Optional[Path] = None ) -> Dict[str, Any]: """ Download the LocalView dataset from Harvard Dataverse. This is the largest known database of municipal meeting videos. Args: api_key: Optional Dataverse API key (recommended) output_dir: Where to save files (defaults to data/cache/dataverse/localview) Returns: Download summary dictionary Example: result = await download_localview_dataset() print(f"Downloaded {result['downloaded']} files") """ client = DataverseClient(api_key=api_key) logger.info("=" * 60) logger.info("LocalView Dataset Download") logger.info("=" * 60) result = await client.download_dataset( persistent_id="doi:10.7910/DVN/NJTBEM", output_dir=output_dir or Path("data/cache/localview"), file_types=[".csv", ".tab", ".tsv"] # Only download data files ) return result # CLI for testing async def main(): """Test the Dataverse client.""" import argparse parser = argparse.ArgumentParser(description="Dataverse API Client") parser.add_argument("--api-key", help="Dataverse API key") parser.add_argument("--dataset", default="doi:10.7910/DVN/NJTBEM", help="Dataset DOI") parser.add_argument("--output", help="Output directory") parser.add_argument("--metadata-only", action="store_true", help="Only fetch metadata") args = parser.parse_args() client = DataverseClient(api_key=args.api_key) if args.metadata_only: # Just fetch metadata metadata = await client.get_dataset_metadata(args.dataset) if metadata: print(json.dumps(metadata, indent=2)) else: # Download full dataset output_dir = Path(args.output) if args.output else None result = await client.download_dataset(args.dataset, output_dir) print("\nDownload Summary:") print(f"Status: {result['status']}") print(f"Downloaded: {result['downloaded']} files") print(f"Failed: {result['failed']} files") print(f"Output: {result['output_dir']}") if __name__ == "__main__": asyncio.run(main())