# open-navigator / discovery / dataverse_client.py
# Last commit 61d29fc by jcbowyer: "Clean HuggingFace deployment without binary files"
"""
Dataverse API Client
Production-ready client for Harvard Dataverse following IQSS best practices.
Based on official API documentation: https://guides.dataverse.org/en/latest/api/index.html
Features:
- API token authentication
- Rate limiting with exponential backoff
- Checksum verification
- Version-aware caching
- Comprehensive error handling
- Pagination support
- Retry logic
Source: https://github.com/IQSS/dataverse
"""
import sys
from pathlib import Path
import hashlib
import asyncio
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta
from loguru import logger
import json
# Add project root to path.
# The project root is the parent of this file's directory. Putting it on
# sys.path lets `from config import settings` below resolve when this module
# is executed directly as a script. This must run before that import.
project_root = Path(__file__).parent.parent
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# httpx is imported defensively: when it is missing we log an error and bind
# `httpx = None`. DataverseClient.__init__ checks for this and raises ImportError.
try:
    import httpx
except ImportError:
    logger.error("httpx required. Install with: pip install httpx")
    httpx = None

# Project settings. DataverseClient falls back to `settings.dataverse_api_key`
# when no api_key is passed. (The config module is not visible here; presumably
# it lives at the project root added above — verify.)
from config import settings
class DataverseAPIError(Exception):
    """Error raised by DataverseClient when a Dataverse API request does not succeed."""
class DataverseClient:
    """
    Async client for a Dataverse installation (Harvard Dataverse by default).

    Sends the API token in the ``X-Dataverse-key`` header and throttles itself
    client-side. Transient failures are retried with exponential backoff.
    Downloads have their MD5 checksums verified, and ``:latest`` dataset
    metadata is cached on disk for ``METADATA_CACHE_TTL``.

    Usage:
        client = DataverseClient(api_key="your-key")
        metadata = await client.get_dataset_metadata("doi:10.7910/DVN/NJTBEM")
        result = await client.download_dataset("doi:10.7910/DVN/NJTBEM")
    """

    # API endpoints (appended to base_url)
    DATASET_ENDPOINT = "/api/datasets/:persistentId/"
    DATASET_VERSION_ENDPOINT = "/api/datasets/:persistentId/versions/{version}"
    FILE_DOWNLOAD_ENDPOINT = "/api/access/datafile/{file_id}"
    SEARCH_ENDPOINT = "/api/search"

    # Client-side rate limiting: at most DEFAULT_RATE_LIMIT requests in any
    # sliding window of RATE_LIMIT_PERIOD seconds.
    DEFAULT_RATE_LIMIT = 100
    RATE_LIMIT_PERIOD = 60  # seconds

    # Wait used for HTTP 429 when Retry-After is missing or not delta-seconds.
    DEFAULT_RETRY_AFTER = 60.0

    # Cached :latest metadata older than this is refetched.
    METADATA_CACHE_TTL = timedelta(days=1)

    def __init__(
        self,
        base_url: str = "https://dataverse.harvard.edu",
        api_key: Optional[str] = None,
        timeout: int = 120,
        max_retries: int = 3,
        cache_enabled: bool = True,
        cache_dir: Optional[Path] = None,
    ):
        """
        Initialize Dataverse client.

        Args:
            base_url: Dataverse instance URL (default: Harvard Dataverse)
            api_key: API token; falls back to ``settings.dataverse_api_key``
            timeout: Request timeout in seconds
            max_retries: Maximum attempts per request (values below 1 act as 1)
            cache_enabled: Enable on-disk caching of ``:latest`` dataset metadata
            cache_dir: Root cache directory (default: ``data/cache/dataverse``)

        Raises:
            ImportError: If httpx is not installed.
        """
        if not httpx:
            raise ImportError("httpx required. Install with: pip install httpx")

        self.base_url = base_url.rstrip("/")
        self.api_key = api_key or getattr(settings, "dataverse_api_key", None)
        self.timeout = timeout
        # range(max_retries) drives the attempt loop; 0 would never send anything.
        self.max_retries = max(1, max_retries)
        self.cache_enabled = cache_enabled

        # Cache directories
        self.cache_dir = Path(cache_dir) if cache_dir is not None else Path("data/cache/dataverse")
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.metadata_cache_dir = self.cache_dir / "metadata"
        self.metadata_cache_dir.mkdir(parents=True, exist_ok=True)

        # Event-loop (monotonic) timestamps of recent requests, for _rate_limit_wait.
        self._request_times: List[float] = []

        if self.api_key:
            logger.info("Dataverse client initialized with API key")
        else:
            logger.warning("Dataverse client initialized without API key (rate limits may apply)")

    def _get_headers(self) -> Dict[str, str]:
        """
        Get HTTP headers for API requests.

        Returns:
            Headers dictionary, including ``X-Dataverse-key`` when an API key is set.
        """
        headers = {
            "Content-Type": "application/json",
            "User-Agent": "OralHealthPolicyPulse/1.0 (Civic Tech Research)",
        }
        if self.api_key:
            headers["X-Dataverse-key"] = self.api_key
        return headers

    @staticmethod
    def _safe_name(value: str) -> str:
        """
        Turn an identifier (DOI, handle, version) into a filesystem-safe name.

        Every character other than letters, digits, ``.``, ``_`` and ``-``
        becomes ``_``. For example, ``doi:10.7910/DVN/X`` becomes
        ``doi_10.7910_DVN_X``, and ``:latest`` becomes ``_latest``.
        """
        return "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in value)

    @classmethod
    def _parse_retry_after(cls, value: Optional[str]) -> float:
        """
        Return the delay in seconds requested by a ``Retry-After`` header.

        Only the delta-seconds form is understood. A missing, HTTP-date,
        negative or non-finite value falls back to DEFAULT_RETRY_AFTER.
        """
        if value is None:
            return cls.DEFAULT_RETRY_AFTER
        try:
            delay = float(value.strip())
        except (AttributeError, ValueError):
            return cls.DEFAULT_RETRY_AFTER
        # Reject negative, NaN and infinite values (NaN fails both comparisons).
        if not 0.0 <= delay < float("inf"):
            return cls.DEFAULT_RETRY_AFTER
        return delay

    async def _rate_limit_wait(self):
        """
        Throttle to at most DEFAULT_RATE_LIMIT requests per RATE_LIMIT_PERIOD.

        Uses the event loop's monotonic clock, so wall-clock changes cannot
        distort the window. It records the time the request is actually
        released, after any sleep, so the window is not under-counted.
        """
        loop = asyncio.get_running_loop()
        now = loop.time()
        window = self.RATE_LIMIT_PERIOD

        # Forget requests that have left the sliding window.
        self._request_times = [t for t in self._request_times if now - t < window]

        if len(self._request_times) >= self.DEFAULT_RATE_LIMIT:
            wait_time = window - (now - min(self._request_times))
            if wait_time > 0:
                logger.warning(f"Rate limit reached. Waiting {wait_time:.1f}s...")
                await asyncio.sleep(wait_time)

        self._request_times.append(loop.time())

    async def _request_with_retry(
        self,
        method: str,
        url: str,
        **kwargs
    ) -> "httpx.Response":  # quoted: httpx is None when not installed
        """
        Make an HTTP request, retrying transient failures.

        Each attempt passes through the client-side rate limiter.
        Retry behaviour by failure type:
        - Timeouts, transport errors and HTTP 5xx: retried with exponential
          backoff (1s, 2s, 4s, ...).
        - HTTP 429: waits for the server's Retry-After.
        - Any other non-2xx status: fails immediately without retrying.

        Args:
            method: HTTP method (GET, POST, etc.)
            url: Full URL to request
            **kwargs: Additional arguments for ``httpx.AsyncClient.request()``

        Returns:
            The successful (2xx) HTTP response.

        Raises:
            DataverseAPIError: On a non-retryable status, or when every attempt fails.
        """
        async with httpx.AsyncClient(timeout=self.timeout, follow_redirects=True) as client:
            for attempt in range(self.max_retries):
                is_last_attempt = attempt == self.max_retries - 1
                backoff = 2 ** attempt
                await self._rate_limit_wait()

                # Only the network call is guarded. The DataverseAPIError raised
                # below for non-retryable statuses must not be caught and retried.
                try:
                    response = await client.request(method, url, **kwargs)
                except httpx.TimeoutException as e:
                    if is_last_attempt:
                        raise DataverseAPIError("Request timed out after all retry attempts") from e
                    logger.warning(f"Request timeout. Retrying in {backoff}s...")
                    await asyncio.sleep(backoff)
                    continue
                except Exception as e:
                    if is_last_attempt:
                        raise DataverseAPIError(f"Request failed: {e}") from e
                    logger.warning(f"Request failed: {e}. Retrying in {backoff}s...")
                    await asyncio.sleep(backoff)
                    continue

                status = response.status_code
                if 200 <= status < 300:
                    return response
                if status == 401:
                    raise DataverseAPIError(
                        "Unauthorized: API key required or invalid. "
                        f"Sign up at {self.base_url}/loginpage.xhtml"
                    )
                if status == 404:
                    raise DataverseAPIError(f"Not found: {url}")
                if status == 429:
                    # Rate limited by server. Sleeping is pointless on the last attempt.
                    if is_last_attempt:
                        raise DataverseAPIError(
                            "Rate limited by server (HTTP 429) after all retry attempts"
                        )
                    retry_after = self._parse_retry_after(response.headers.get("Retry-After"))
                    logger.warning(f"Server rate limit hit. Retrying after {retry_after:g}s")
                    await asyncio.sleep(retry_after)
                    continue
                if status >= 500:
                    # Server error: retry with backoff.
                    if is_last_attempt:
                        raise DataverseAPIError(f"Server error: HTTP {status}")
                    logger.warning(f"Server error {status}. Retrying in {backoff}s...")
                    await asyncio.sleep(backoff)
                    continue
                raise DataverseAPIError(f"API error: HTTP {status} - {response.text}")

        # Safety net: only reachable if max_retries was set below 1 after __init__.
        raise DataverseAPIError("All retry attempts exhausted")

    def _get_cached_metadata_path(self, persistent_id: str, version: str) -> Path:
        """
        Get the path of the cached metadata file for ``persistent_id``/``version``.

        Both parts are sanitized, so e.g. the ``:`` in ``:latest`` never
        reaches the filename (it is invalid on Windows).
        """
        return self.metadata_cache_dir / f"{self._safe_name(persistent_id)}_{self._safe_name(version)}.json"

    def _read_metadata_cache(self, cache_file: Path) -> Optional[Dict[str, Any]]:
        """
        Return cached metadata if it is younger than METADATA_CACHE_TTL and readable.

        Returns None when the cache is missing, stale or corrupt, so the
        caller refetches from the API instead of failing.
        """
        try:
            mtime = cache_file.stat().st_mtime
        except OSError:
            return None  # no cache entry yet
        cache_age = datetime.now() - datetime.fromtimestamp(mtime)
        if cache_age >= self.METADATA_CACHE_TTL:
            return None
        try:
            with open(cache_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:  # JSONDecodeError is a ValueError
            logger.warning(f"Ignoring unreadable metadata cache {cache_file}: {e}")
            return None
        logger.info(f"Using cached metadata (age: {cache_age.total_seconds() / 3600:.1f}h)")
        return data

    async def get_dataset_metadata(
        self,
        persistent_id: str,
        version: str = ":latest",
        use_cache: bool = True
    ) -> Optional[Dict[str, Any]]:
        """
        Get dataset metadata from Dataverse.

        ``:latest`` uses the dataset endpoint (``/api/datasets/:persistentId/``).
        Any other version uses ``/api/datasets/:persistentId/versions/{version}``.
        NOTE(review): per the Dataverse native API guide, the version endpoint
        returns the version object itself (files under ``data["files"]``, not
        ``data["latestVersion"]["files"]``). Confirm against your instance.

        Args:
            persistent_id: DOI or handle (e.g., "doi:10.7910/DVN/NJTBEM")
            version: Dataset version (":latest", ":draft", or a version number such as "1.0")
            use_cache: Use cached metadata if available (for :latest version only)

        Returns:
            Dataset metadata dictionary, or None if the request failed or the
            response was not valid JSON.

        Example:
            metadata = await client.get_dataset_metadata("doi:10.7910/DVN/NJTBEM")
            files = metadata["data"]["latestVersion"]["files"]
        """
        cache_file = self._get_cached_metadata_path(persistent_id, version)
        disk_cache_applies = self.cache_enabled and version == ":latest"

        # Check cache
        if use_cache and disk_cache_applies:
            cached = self._read_metadata_cache(cache_file)
            if cached is not None:
                return cached

        # Fetch from API
        if version == ":latest":
            url = f"{self.base_url}{self.DATASET_ENDPOINT}"
        else:
            url = f"{self.base_url}{self.DATASET_VERSION_ENDPOINT.format(version=version)}"
        params = {"persistentId": persistent_id}

        logger.info(f"Fetching metadata for {persistent_id} (version: {version})")
        try:
            response = await self._request_with_retry(
                "GET",
                url,
                params=params,
                headers=self._get_headers()
            )
            metadata = response.json()
        except DataverseAPIError as e:
            logger.error(f"Failed to fetch metadata: {e}")
            return None
        except ValueError as e:
            logger.error(f"Failed to fetch metadata: invalid JSON response ({e})")
            return None

        # Cache the metadata (best effort: a write failure must not lose the result)
        if disk_cache_applies:
            try:
                with open(cache_file, "w", encoding="utf-8") as f:
                    json.dump(metadata, f, indent=2)
                logger.debug(f"Cached metadata to {cache_file}")
            except OSError as e:
                logger.warning(f"Could not write metadata cache {cache_file}: {e}")

        return metadata

    def _verify_checksum(self, content: bytes, expected_md5: Optional[str]) -> bool:
        """
        Verify file checksum.

        Args:
            content: File content bytes
            expected_md5: Expected MD5 checksum (hex, case-insensitive)

        Returns:
            True if checksum matches or no checksum provided
        """
        if not expected_md5:
            logger.warning("No checksum provided - skipping verification")
            return True

        actual_md5 = hashlib.md5(content).hexdigest()
        if actual_md5.lower() == expected_md5.lower():
            logger.debug(f"✓ Checksum verified: {actual_md5}")
            return True
        logger.error(f"✗ Checksum mismatch! Expected: {expected_md5}, Got: {actual_md5}")
        return False

    async def download_file(
        self,
        file_id: int,
        output_path: Path,
        expected_checksum: Optional[str] = None,
        verify_checksum: bool = True
    ) -> bool:
        """
        Download a file from Dataverse with checksum verification.

        The content is written to ``<name>.part`` and then renamed into
        place, so a failed write never leaves a truncated file under the
        final name.

        Args:
            file_id: Dataverse file ID
            output_path: Where to save the file
            expected_checksum: Expected MD5 checksum (if known)
            verify_checksum: Whether to verify checksum

        Returns:
            True if the file was downloaded, its checksum is valid (or not
            checked) and it was saved. Otherwise False.

        Example:
            success = await client.download_file(
                file_id=123456,
                output_path=Path("data/municipalities.csv"),
                expected_checksum="abc123..."
            )
        """
        output_path = Path(output_path)
        url = f"{self.base_url}{self.FILE_DOWNLOAD_ENDPOINT.format(file_id=file_id)}"
        logger.info(f"Downloading file {file_id} to {output_path.name}")

        try:
            response = await self._request_with_retry(
                "GET",
                url,
                headers=self._get_headers()
            )
        except DataverseAPIError as e:
            logger.error(f"Download failed: {e}")
            return False

        content = response.content

        # Verify checksum if requested
        if verify_checksum and expected_checksum:
            if not self._verify_checksum(content, expected_checksum):
                logger.error("Checksum verification failed - file may be corrupted")
                return False

        # Save file via a temporary sibling + rename.
        tmp_path = output_path.with_name(output_path.name + ".part")
        try:
            output_path.parent.mkdir(parents=True, exist_ok=True)
            tmp_path.write_bytes(content)
            tmp_path.replace(output_path)
        except OSError as e:
            logger.error(f"Download failed: could not write {output_path}: {e}")
            try:
                tmp_path.unlink()
            except OSError:
                pass  # best-effort cleanup; the write error above is what matters
            return False

        file_size_mb = len(content) / (1024 * 1024)
        logger.success(f"✓ Downloaded {output_path.name} ({file_size_mb:.2f} MB)")
        return True

    @staticmethod
    def _data_file(file_info: Any) -> Dict[str, Any]:
        """Return ``file_info["dataFile"]``, or an empty dict if the entry is malformed."""
        if isinstance(file_info, dict):
            data_file = file_info.get("dataFile")
            if isinstance(data_file, dict):
                return data_file
        return {}

    @staticmethod
    def _extract_md5(data_file: Dict[str, Any]) -> Optional[str]:
        """
        Return the MD5 checksum recorded for a file, if any.

        Reads the ``md5`` field first. It then falls back to a ``checksum``
        object of type MD5; the assumed shape is ``{"type": ..., "value": ...}``
        — verify on your instance. Returns None for other algorithms, which
        _verify_checksum cannot check.
        """
        md5 = data_file.get("md5")
        if md5:
            return md5
        checksum = data_file.get("checksum")
        if isinstance(checksum, dict) and str(checksum.get("type", "")).upper() == "MD5":
            return checksum.get("value") or None
        return None

    @staticmethod
    def _error_result(message: str, output_dir: Path) -> Dict[str, Any]:
        """Build a download_dataset() summary for a run that failed before downloading."""
        return {
            "status": "error",
            "message": message,
            "downloaded": 0,
            "failed": 0,
            "failed_files": [],
            "files": [],
            "output_dir": str(output_dir),
        }

    async def download_dataset(
        self,
        persistent_id: str,
        output_dir: Optional[Path] = None,
        file_types: Optional[List[str]] = None,
        verify_checksums: bool = True
    ) -> Dict[str, Any]:
        """
        Download all files (or filtered subset) from a dataset.

        Args:
            persistent_id: Dataset DOI (e.g., "doi:10.7910/DVN/NJTBEM")
            output_dir: Where to save files (defaults to cache_dir/<sanitized persistent_id>)
            file_types: List of file extensions to download (e.g., [".csv", ".tab"]),
                        matched case-insensitively. If None, downloads all files.
            verify_checksums: Whether to verify MD5 checksums

        Returns:
            Summary dictionary with keys ``status`` ("success", "partial" or
            "error"), ``downloaded``, ``failed``, ``failed_files``, ``files``
            and ``output_dir``. Early failures also include ``message``.

        Example:
            result = await client.download_dataset(
                "doi:10.7910/DVN/NJTBEM",
                file_types=[".csv", ".tab"]
            )
            print(f"Downloaded {result['downloaded']} files to {result['output_dir']}")
        """
        # Set output directory
        if output_dir is None:
            output_dir = self.cache_dir / self._safe_name(persistent_id)
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        # Get metadata
        logger.info(f"Fetching dataset metadata for {persistent_id}")
        metadata = await self.get_dataset_metadata(persistent_id)
        if not metadata:
            return self._error_result("Failed to fetch dataset metadata", output_dir)

        # Extract file list
        try:
            files = metadata["data"]["latestVersion"]["files"]
        except (KeyError, TypeError):
            logger.error("Invalid metadata structure - cannot find files list")
            return self._error_result("Invalid metadata structure", output_dir)
        logger.info(f"Found {len(files)} files in dataset")

        # Filter by file type if specified (str.endswith accepts a tuple)
        if file_types:
            wanted = tuple(ext.lower() for ext in file_types)
            original_count = len(files)
            files = [
                f for f in files
                if str(self._data_file(f).get("filename", "")).lower().endswith(wanted)
            ]
            logger.info(f"Filtered to {len(files)} files matching {file_types} (from {original_count} total)")

        # Download each file
        downloaded: List[str] = []
        failed: List[str] = []
        total = len(files)
        for i, file_info in enumerate(files, 1):
            data_file = self._data_file(file_info)
            # Bound before the try so the error handler can always name the file.
            label = str(data_file.get("filename") or f"file #{i}")
            try:
                file_id = data_file["id"]
                # Filenames come from remote metadata: keep only the final path
                # component so an entry cannot escape output_dir.
                safe_name = Path(str(data_file["filename"])).name
                if safe_name in ("", ".", ".."):
                    raise ValueError(f"unsafe filename {label!r}")
                output_path = output_dir / safe_name

                logger.info(f"[{i}/{total}] Downloading {label}...")
                success = await self.download_file(
                    file_id,
                    output_path,
                    expected_checksum=self._extract_md5(data_file),
                    verify_checksum=verify_checksums
                )
            except Exception as e:
                # Best effort per file: record the failure and keep going.
                logger.error(f"Error downloading {label}: {e}")
                failed.append(label)
                continue

            if success:
                downloaded.append(str(output_path))
            else:
                failed.append(label)

        # Summary
        status = "success" if not failed else ("partial" if downloaded else "error")
        logger.info("")
        logger.info("=" * 60)
        if status == "success":
            logger.success(f"✓ Successfully downloaded all {len(downloaded)} files")
        elif status == "partial":
            logger.warning(f"⚠ Downloaded {len(downloaded)} files, {len(failed)} failed")
        else:
            logger.error("✗ All downloads failed")
        logger.info("=" * 60)

        return {
            "status": status,
            "downloaded": len(downloaded),
            "failed": len(failed),
            "failed_files": failed,
            "files": downloaded,
            "output_dir": str(output_dir)
        }

    async def search_datasets(
        self,
        query: str,
        type: str = "dataset",  # shadows the builtin, but is part of the public keyword API
        per_page: int = 10,
        start: int = 0
    ) -> Dict[str, Any]:
        """
        Search for datasets in Dataverse.

        Args:
            query: Search query string
            type: Type of results ("dataset", "datafile", "all")
            per_page: Number of results per page
            start: Starting offset for pagination

        Returns:
            Search results dictionary, or ``{"status": "error", "message": ...}``
            on failure.

        Example:
            results = await client.search_datasets("municipal meetings")
            for item in results["data"]["items"]:
                print(item["name"], item["global_id"])
        """
        url = f"{self.base_url}{self.SEARCH_ENDPOINT}"
        params = {
            "q": query,
            "type": type,
            "per_page": per_page,
            "start": start
        }

        try:
            response = await self._request_with_retry(
                "GET",
                url,
                params=params,
                headers=self._get_headers()
            )
            return response.json()
        except DataverseAPIError as e:
            logger.error(f"Search failed: {e}")
            return {"status": "error", "message": str(e)}
        except ValueError as e:
            logger.error(f"Search failed: invalid JSON response ({e})")
            return {"status": "error", "message": f"Invalid JSON response: {e}"}
# Convenience functions for common operations
async def download_localview_dataset(
    api_key: Optional[str] = None,
    output_dir: Optional[Path] = None,
    verify_checksums: bool = True,
) -> Dict[str, Any]:
    """
    Download the LocalView dataset (doi:10.7910/DVN/NJTBEM) from Harvard Dataverse.

    Only tabular data files (.csv, .tab, .tsv) are downloaded.

    Args:
        api_key: Optional Dataverse API key (recommended)
        output_dir: Where to save files (defaults to data/cache/localview)
        verify_checksums: Whether to verify MD5 checksums of downloaded files

    Returns:
        Download summary dictionary as returned by DataverseClient.download_dataset

    Example:
        result = await download_localview_dataset()
        print(f"Downloaded {result['downloaded']} files")
    """
    client = DataverseClient(api_key=api_key)

    logger.info("=" * 60)
    logger.info("LocalView Dataset Download")
    logger.info("=" * 60)

    return await client.download_dataset(
        persistent_id="doi:10.7910/DVN/NJTBEM",
        output_dir=output_dir or Path("data/cache/localview"),
        file_types=[".csv", ".tab", ".tsv"],  # Only download data files
        verify_checksums=verify_checksums,
    )
# CLI for testing
async def main():
    """
    Command-line entry point for exercising the Dataverse client.

    With ``--metadata-only`` the dataset metadata JSON is printed. Otherwise
    the dataset is downloaded and a summary is printed.

    Returns:
        Process exit code: 0 on success, 1 if metadata could not be fetched
        or any download failed.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Dataverse API Client")
    parser.add_argument("--api-key", help="Dataverse API key")
    parser.add_argument("--dataset", default="doi:10.7910/DVN/NJTBEM", help="Dataset DOI")
    parser.add_argument("--output", help="Output directory")
    parser.add_argument("--metadata-only", action="store_true", help="Only fetch metadata")
    args = parser.parse_args()

    client = DataverseClient(api_key=args.api_key)

    if args.metadata_only:
        # Just fetch metadata
        metadata = await client.get_dataset_metadata(args.dataset)
        if not metadata:
            print(f"Failed to fetch metadata for {args.dataset}", file=sys.stderr)
            return 1
        print(json.dumps(metadata, indent=2))
        return 0

    # Download full dataset
    output_dir = Path(args.output) if args.output else None
    result = await client.download_dataset(args.dataset, output_dir)

    print("\nDownload Summary:")
    print(f"Status: {result['status']}")
    print(f"Downloaded: {result['downloaded']} files")
    print(f"Failed: {result['failed']} files")
    # Early-error results may not carry every key; don't crash while reporting.
    print(f"Output: {result.get('output_dir', 'n/a')}")
    if result.get("message"):
        print(f"Message: {result['message']}")

    return 0 if result.get("status") == "success" else 1


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))