Spaces:
Runtime error
Runtime error
| import logging | |
| import json | |
| from typing import Any, List, Optional, Dict, Tuple | |
| import requests | |
| from huggingface_hub import HfApi | |
| from app.core.config import settings | |
| from app.schemas.dataset_common import ImpactLevel | |
| from app.services.redis_client import sync_cache_set, sync_cache_get, generate_cache_key, get_redis_sync | |
| import time | |
| import asyncio | |
| import redis | |
| import gzip | |
| from datetime import datetime, timezone | |
| import os | |
| from app.schemas.dataset import ImpactAssessment | |
| from app.schemas.dataset_common import DatasetMetrics | |
| import httpx | |
| import redis.asyncio as aioredis | |
# Module-level logger and Hugging Face Hub client shared by the helpers below.
log = logging.getLogger(__name__)
api = HfApi()
# Synchronous Redis client constructed when the module is imported.
# NOTE(review): the functions visible in this file build their own clients
# instead of using this one — confirm whether it is still needed.
redis_client = redis.Redis(host="redis", port=6379, decode_responses=True)
# Thresholds for impact categorization
# NOTE(review): determine_impact_level_by_criteria uses inline literals rather
# than these names — confirm which set of values is authoritative.
SIZE_THRESHOLD_LOW = 100 * 1024 * 1024  # 100 MiB, in bytes
SIZE_THRESHOLD_MEDIUM = 1024 * 1024 * 1024  # 1 GiB, in bytes
DOWNLOADS_THRESHOLD_LOW = 1000
DOWNLOADS_THRESHOLD_MEDIUM = 10000
LIKES_THRESHOLD_LOW = 10
LIKES_THRESHOLD_MEDIUM = 100
# Base URL of the Hugging Face datasets listing endpoint.
HF_API_URL = "https://huggingface.co/api/datasets"
DATASET_CACHE_TTL = 60 * 60  # 1 hour, in seconds
# Redis and HuggingFace API setup
# Key under which the gzip-compressed JSON list of all datasets is stored.
REDIS_KEY = "hf:datasets:all:compressed"
REDIS_META_KEY = "hf:datasets:meta"
REDIS_TTL = 60 * 60  # 1 hour, in seconds
# Impact thresholds (in bytes)
# Same values as SIZE_THRESHOLD_LOW / SIZE_THRESHOLD_MEDIUM above.
SIZE_LOW = 100 * 1024 * 1024
SIZE_MEDIUM = 1024 * 1024 * 1024
def get_hf_token():
    """Return the Hugging Face API token read from the environment.

    Returns:
        The value of the ``HUGGINGFACEHUB_API_TOKEN`` environment variable.

    Raises:
        RuntimeError: If the variable is unset or empty.
    """
    hf_token = os.environ.get("HUGGINGFACEHUB_API_TOKEN")
    if hf_token:
        return hf_token
    raise RuntimeError("HUGGINGFACEHUB_API_TOKEN environment variable is not set. Please set it securely.")
def get_dataset_commits(dataset_id: str, limit: int = 20):
    """Return the first ``limit`` commits of a dataset repo as plain dicts.

    Args:
        dataset_id: Hub repo id of the dataset.
        limit: Maximum number of commits to return (the full list is fetched
            and then sliced).

    Returns:
        A list of dicts with keys ``id``, ``title``, ``message``, ``author``
        (``{"name": ..., "email": ""}``) and ``date``. ``date`` is the
        ``isoformat()`` of the commit's ``created_at`` when available,
        ``str(created_at)`` otherwise, or ``""`` when it is missing.

    Raises:
        Exception: Whatever the Hub call raises; it is logged and re-raised.
            A commit that fails to parse is logged and skipped instead.
    """
    from huggingface_hub import HfApi
    import logging

    log = logging.getLogger(__name__)
    api = HfApi()
    log.info(f"[get_dataset_commits] Fetching commits for dataset_id={dataset_id}")
    try:
        commits = api.list_repo_commits(repo_id=dataset_id, repo_type="dataset")
        log.info(f"[get_dataset_commits] Received {len(commits)} commits for {dataset_id}")
    except Exception as e:
        log.error(f"[get_dataset_commits] Error fetching commits for {dataset_id}: {e}", exc_info=True)
        # Propagate so the API layer decides how to report the failure.
        raise

    parsed_commits = []
    for commit in commits[:limit]:
        try:
            commit_id = getattr(commit, "commit_id", "")
            title = getattr(commit, "title", "")
            message = getattr(commit, "message", title)
            authors = getattr(commit, "authors", [])
            # Only the first author is reported, and only when authors is a list.
            first_author = authors[0] if authors and isinstance(authors, list) else ""
            created_at = getattr(commit, "created_at", None)
            if not created_at:
                date = ""
            elif hasattr(created_at, "isoformat"):
                date = created_at.isoformat()
            else:
                date = str(created_at)
            entry = {
                "id": commit_id or "",
                "title": title or message or "",
                "message": message or title or "",
                "author": {"name": first_author, "email": ""},
                "date": date,
            }
            parsed_commits.append(entry)
        except Exception as e:
            log.error(f"[get_dataset_commits] Error parsing commit: {e} | Commit: {getattr(commit, '__dict__', str(commit))}", exc_info=True)
    log.info(f"[get_dataset_commits] Returning {len(parsed_commits)} parsed commits for {dataset_id}")
    return parsed_commits
def get_dataset_files(dataset_id: str) -> List[str]:
    """Return the file paths in the dataset repo ``dataset_id``.

    Uses the module-level ``api`` client and returns its
    ``list_repo_files`` result unchanged.
    """
    repo_files = api.list_repo_files(repo_id=dataset_id, repo_type="dataset")
    return repo_files
def get_file_url(dataset_id: str, filename: str, revision: Optional[str] = None) -> str:
    """Return the Hub URL of ``filename`` inside the dataset repo ``dataset_id``.

    Args:
        dataset_id: Hub repo id of the dataset.
        filename: Path of the file within the repo.
        revision: Optional revision passed through to ``hf_hub_url``.
    """
    from huggingface_hub import hf_hub_url

    file_url = hf_hub_url(
        repo_id=dataset_id,
        filename=filename,
        repo_type="dataset",
        revision=revision,
    )
    return file_url
def get_datasets_page_from_zset(offset: int = 0, limit: int = 10, search: Optional[str] = None) -> dict:
    """Return one page of cached datasets from the Redis zset/hash pair.

    Dataset ids are read from the sorted set ``hf:datasets:all:zset`` by rank,
    then their JSON payloads are read from the hash ``hf:datasets:all:hash``.

    Args:
        offset: Zero-based rank of the first dataset of the page.
        limit: Maximum number of datasets in the page.
        search: Optional case-insensitive substring matched against each
            item's ``id``. The filter is applied only to the fetched page.

    Returns:
        ``{"items": [...], "count": total}`` where ``count`` is the size of
        the whole sorted set (not the number of filtered items). A
        non-positive ``limit`` or a negative ``offset`` yields an empty page.
    """
    import redis
    import json

    zset_key = "hf:datasets:all:zset"
    hash_key = "hf:datasets:all:hash"
    # The context manager closes the client (and its connections) on exit.
    with redis.Redis(host="redis", port=6379, db=0, decode_responses=True) as client:
        total = client.zcard(zset_key)
        # ZRANGE treats negative indexes as end-relative: limit=0 at offset=0
        # would give stop=-1 and return the entire set, so reject such ranges.
        if limit <= 0 or offset < 0:
            return {"items": [], "count": total}
        ids = client.zrange(zset_key, offset, offset + limit - 1)
        if not ids:
            return {"items": [], "count": total}
        raw_items = client.hmget(hash_key, ids)

    parsed = []
    for raw in raw_items:
        if not raw:
            # Id present in the zset but missing from the hash.
            continue
        try:
            parsed.append(json.loads(raw))
        except (ValueError, TypeError):
            # Skip entries that are not valid JSON.
            continue
    if search:
        needle = search.lower()
        parsed = [d for d in parsed if needle in (d.get("id") or "").lower()]
    return {"items": parsed, "count": total}
async def _fetch_size(session: httpx.AsyncClient, dataset_id: str) -> Optional[int]:
    """Look up a dataset's original-file size on the datasets server.

    Returns the ``size.dataset.num_bytes_original_files`` field of the JSON
    response, or ``None`` when the response status is not 200 or the request
    fails (failures are logged as warnings, never raised).
    """
    endpoint = f"https://datasets-server.huggingface.co/size?dataset={dataset_id}"
    try:
        response = await session.get(endpoint, timeout=30)
        if response.status_code != 200:
            return None
        payload = response.json()
        dataset_section = payload.get("size", {}).get("dataset", {})
        return dataset_section.get("num_bytes_original_files")
    except Exception as e:
        log.warning(f"Could not fetch size for {dataset_id}: {e}")
    return None
async def _fetch_sizes(dataset_ids: List[str]) -> Dict[str, Optional[int]]:
    """Fetch the sizes of several datasets concurrently.

    All lookups share one ``httpx.AsyncClient`` and are started together.

    Returns:
        A mapping from dataset id to the value returned by ``_fetch_size``.
    """
    async with httpx.AsyncClient() as session:
        lookups = [_fetch_size(session, ds_id) for ds_id in dataset_ids]
        sizes = await asyncio.gather(*lookups)
    return dict(zip(dataset_ids, sizes))
def process_datasets_page(offset, limit):
    """
    Fetch and process a single page of datasets from Hugging Face and cache them in Redis.

    The page is requested from ``HF_API_URL`` with ``limit``/``offset``/``full``
    query parameters, sizes are looked up through ``_fetch_sizes``, and an impact
    level is computed with ``determine_impact_level_by_criteria``. Each dataset is
    then written to three Redis structures: a stream (``hf:datasets:all:stream``),
    a sorted set of ids (``hf:datasets:all:zset``) and a hash of JSON payloads
    (``hf:datasets:all:hash``).

    Args:
        offset: Offset sent to the API; also used as the zset score of every
            dataset in this page.
        limit: Page size sent to the API.

    Returns:
        The number of datasets in the fetched page.

    Raises:
        RuntimeError: If ``HUGGINGFACEHUB_API_TOKEN`` is not set.
        Exception: Any error raised while fetching or caching is logged and re-raised.
    """
    import redis
    import os
    import json
    import asyncio
    log = logging.getLogger(__name__)
    log.info(f"[process_datasets_page] ENTRY: offset={offset}, limit={limit}")
    token = os.environ.get("HUGGINGFACEHUB_API_TOKEN")
    if not token:
        log.error("[process_datasets_page] HUGGINGFACEHUB_API_TOKEN environment variable is not set.")
        raise RuntimeError("HUGGINGFACEHUB_API_TOKEN environment variable is not set. Please set it securely.")
    headers = {
        "Authorization": f"Bearer {token}",
        "User-Agent": "Mozilla/5.0 (compatible; CollinearTool/1.0; +https://yourdomain.com)"
    }
    params = {"limit": limit, "offset": offset, "full": "True"}
    # A new client per call; this local name shadows the module-level redis_client.
    redis_client = redis.Redis(host="redis", port=6379, db=0, decode_responses=True)
    stream_key = "hf:datasets:all:stream"
    zset_key = "hf:datasets:all:zset"
    hash_key = "hf:datasets:all:hash"
    try:
        log.info(f"[process_datasets_page] Requesting {HF_API_URL} with params={params}")
        response = requests.get(HF_API_URL, headers=headers, params=params, timeout=120)
        response.raise_for_status()
        page_items = response.json()
        log.info(f"[process_datasets_page] Received {len(page_items)} datasets at offset {offset}")
        dataset_ids = [ds.get("id") for ds in page_items]
        # Runs the async size lookups to completion from this synchronous function.
        size_map = asyncio.run(_fetch_sizes(dataset_ids))
        for ds in page_items:
            dataset_id = ds.get("id")
            size_bytes = size_map.get(dataset_id)
            downloads = ds.get("downloads")
            likes = ds.get("likes")
            impact_level, assessment_method = determine_impact_level_by_criteria(size_bytes, downloads, likes)
            metrics = DatasetMetrics(size_bytes=size_bytes, downloads=downloads, likes=likes)
            # Size thresholds in bytes (100 MiB / 1 GiB / 10 GiB), stored as strings.
            thresholds = {
                "size_bytes": {
                    "low": str(100 * 1024 * 1024),
                    "medium": str(1 * 1024 * 1024 * 1024),
                    "high": str(10 * 1024 * 1024 * 1024)
                }
            }
            impact_assessment = ImpactAssessment(
                dataset_id=dataset_id,
                impact_level=impact_level,
                assessment_method=assessment_method,
                metrics=metrics,
                thresholds=thresholds
            ).model_dump()
            item = {
                "id": dataset_id,
                "name": ds.get("name"),
                "description": ds.get("description"),
                "size_bytes": size_bytes,
                "impact_level": impact_level.value if isinstance(impact_level, ImpactLevel) else impact_level,
                "downloads": downloads,
                "likes": likes,
                "tags": ds.get("tags", []),
                # Stored as a JSON string, so it is encoded again when the
                # whole item is dumped for the hash below.
                "impact_assessment": json.dumps(impact_assessment)
            }
            # Flatten every field to a string for the stream entry:
            # lists/dicts become JSON, None becomes 'null', the rest str().
            final_item = {}
            for k, v in item.items():
                if isinstance(v, list) or isinstance(v, dict):
                    final_item[k] = json.dumps(v)
                elif v is None:
                    final_item[k] = 'null'
                else:
                    final_item[k] = str(v)
            redis_client.xadd(stream_key, final_item)
            # Every dataset of this page gets the same score (the page offset).
            redis_client.zadd(zset_key, {dataset_id: offset})
            redis_client.hset(hash_key, dataset_id, json.dumps(item))
        log.info(f"[process_datasets_page] EXIT: Cached {len(page_items)} datasets at offset {offset}")
        return len(page_items)
    except Exception as exc:
        log.error(f"[process_datasets_page] ERROR: offset={offset}, limit={limit}, exc={exc}", exc_info=True)
        raise
def refresh_datasets_cache():
    """
    Orchestrator: enqueue one Celery task per page of Hugging Face datasets.

    Asks the HF API for a single item to read the ``X-Total-Count`` response
    header, splits that total into pages of 500, and dispatches a
    ``fetch_datasets_page`` signature for each page as a Celery group.

    Raises:
        RuntimeError: If ``HUGGINGFACEHUB_API_TOKEN`` is not set.
        requests.RequestException: If the count request fails.
        ValueError: If the header is missing or is not an integer.
    """
    import requests

    log.info("[refresh_datasets_cache] Orchestrating dataset fetch tasks using direct HF API calls.")
    token = os.environ.get("HUGGINGFACEHUB_API_TOKEN")
    if not token:
        log.error("[refresh_datasets_cache] HUGGINGFACEHUB_API_TOKEN environment variable is not set.")
        raise RuntimeError("HUGGINGFACEHUB_API_TOKEN environment variable is not set. Please set it securely.")

    page_size = 500
    request_headers = {
        "Authorization": f"Bearer {token}",
        "User-Agent": "Mozilla/5.0 (compatible; CollinearTool/1.0; +https://yourdomain.com)",
    }
    try:
        # One-item request: only the total reported in the header is needed.
        probe = requests.get(
            HF_API_URL,
            headers=request_headers,
            params={"limit": 1, "offset": 0},
            timeout=120,
        )
        probe.raise_for_status()
        raw_total = probe.headers.get('X-Total-Count')
        if not raw_total:
            log.error("[refresh_datasets_cache] 'X-Total-Count' header not found in HF API response.")
            raise ValueError("'X-Total-Count' header missing from Hugging Face API response.")
        total = int(raw_total)
        log.info(f"[refresh_datasets_cache] Total datasets reported by HF API: {total}")
    except requests.RequestException as e:
        log.error(f"[refresh_datasets_cache] Error fetching total dataset count from HF API: {e}")
        raise
    except ValueError as e:
        log.error(f"[refresh_datasets_cache] Error parsing total dataset count: {e}")
        raise

    from app.tasks.dataset_tasks import fetch_datasets_page
    from celery import group

    signatures = []
    # Stepping by page_size yields the same offsets as ceil(total / page_size) pages.
    for offset in range(0, total, page_size):
        signatures.append(fetch_datasets_page.s(offset, page_size))
        log.info(f"[refresh_datasets_cache] Scheduled page at offset {offset}, limit {page_size}.")

    if not signatures:
        log.warning("[refresh_datasets_cache] No dataset pages found to schedule.")
        return
    group(signatures).apply_async()
    log.info(f"[refresh_datasets_cache] Enqueued {len(signatures)} fetch tasks.")
def determine_impact_level_by_criteria(size_bytes, downloads=None, likes=None):
    """Classify a dataset's impact from its size, downloads, and likes.

    Size is checked first (10 GiB -> high, 1 GiB -> medium, 100 MiB -> low).
    If the size is missing, unparsable, or below 100 MiB, downloads are
    checked (100000 / 10000 / 1000), and then likes (1000 / 100 / 10).
    A criterion that is ``None``, cannot be converted with ``int()``, or is
    below its lowest threshold falls through to the next one.

    Returns:
        A ``(level, method)`` tuple of strings; ``("not_available",
        "size_and_downloads_and_likes_unknown")`` when nothing matches.
    """
    mib = 1024 * 1024
    gib = 1024 * mib

    try:
        size = 0 if size_bytes in (None, 'null') else int(size_bytes)
    except Exception:
        size = 0

    size_rules = (
        (10 * gib, "high", "large_size"),
        (1 * gib, "medium", "medium_size"),
        (100 * mib, "low", "small_size"),
    )
    for floor, level, method in size_rules:
        if size >= floor:
            return (level, method)

    levels = ("high", "medium", "low")
    count_rules = (
        (downloads, "downloads", (100000, 10000, 1000)),
        (likes, "likes", (1000, 100, 10)),
    )
    for raw_value, method, floors in count_rules:
        if raw_value is None:
            continue
        try:
            count = int(raw_value)
        except Exception:
            # An unparsable value is treated like a missing one.
            continue
        for floor, level in zip(floors, levels):
            if count >= floor:
                return (level, method)

    return ("not_available", "size_and_downloads_and_likes_unknown")
def get_dataset_size(dataset: dict, dataset_id: Optional[str] = None):
    """
    Extract the size in bytes from a dataset dictionary.

    Looks, in order, at the top-level ``size_bytes`` key, at the nested
    ``size -> dataset -> num_bytes_original_files`` path, and at a
    ``size_bytes`` key inside the ``metrics`` and ``info`` sub-dictionaries.
    A value of ``None`` or the string ``'null'`` counts as missing. Nested
    containers that are present but are not dictionaries are skipped rather
    than raising.

    Args:
        dataset: Dataset metadata dictionary.
        dataset_id: Unused; kept for backward compatibility with callers.

    Returns:
        The first size value found, or ``None`` when none is present.
    """
    missing = (None, 'null')

    # Try top-level key
    size_bytes = dataset.get("size_bytes")
    if size_bytes not in missing:
        return size_bytes

    # Try nested structure from the size API. "size" (or "size"."dataset") may
    # be present but None or a scalar, so check the type before descending.
    size_section = dataset.get("size")
    dataset_section = size_section.get("dataset") if isinstance(size_section, dict) else None
    if isinstance(dataset_section, dict):
        size_bytes = dataset_section.get("num_bytes_original_files")
        if size_bytes not in missing:
            return size_bytes

    # Try metrics or info sub-dictionaries if present
    for key in ("metrics", "info"):
        sub = dataset.get(key)
        if isinstance(sub, dict):
            size_bytes = sub.get("size_bytes")
            if size_bytes not in missing:
                return size_bytes

    # Not found
    return None
async def get_datasets_page_from_zset_async(offset: int = 0, limit: int = 10, search: Optional[str] = None) -> dict:
    """Return one page of cached datasets from the Redis zset/hash pair (async).

    Dataset ids are read from the sorted set ``hf:datasets:all:zset`` by rank,
    then their JSON payloads are read from the hash ``hf:datasets:all:hash``.

    Args:
        offset: Zero-based rank of the first dataset of the page.
        limit: Maximum number of datasets in the page.
        search: Optional case-insensitive substring matched against each
            item's ``id``. The filter is applied only to the fetched page.

    Returns:
        ``{"items": [...], "count": total}`` where ``count`` is the size of
        the whole sorted set (not the number of filtered items). A
        non-positive ``limit`` or a negative ``offset`` yields an empty page.
    """
    zset_key = "hf:datasets:all:zset"
    hash_key = "hf:datasets:all:hash"
    # The async context manager closes the client (and its connections) on exit.
    async with aioredis.Redis(host="redis", port=6379, db=0, decode_responses=True) as client:
        total = await client.zcard(zset_key)
        # ZRANGE treats negative indexes as end-relative: limit=0 at offset=0
        # would give stop=-1 and return the entire set, so reject such ranges.
        if limit <= 0 or offset < 0:
            return {"items": [], "count": total}
        ids = await client.zrange(zset_key, offset, offset + limit - 1)
        if not ids:
            return {"items": [], "count": total}
        raw_items = await client.hmget(hash_key, ids)

    parsed = []
    for raw in raw_items:
        if not raw:
            # Id present in the zset but missing from the hash.
            continue
        try:
            parsed.append(json.loads(raw))
        except (ValueError, TypeError):
            # Skip entries that are not valid JSON.
            continue
    if search:
        needle = search.lower()
        parsed = [d for d in parsed if needle in (d.get("id") or "").lower()]
    return {"items": parsed, "count": total}
async def get_dataset_commits_async(dataset_id: str, limit: int = 20):
    """Return the first ``limit`` commits of a dataset repo as plain dicts (async).

    The blocking ``HfApi.list_repo_commits`` call runs in a worker thread.

    Args:
        dataset_id: Hub repo id of the dataset.
        limit: Maximum number of commits to return (the full list is fetched
            and then sliced).

    Returns:
        A list of dicts with keys ``id``, ``title``, ``message``, ``author``
        (``{"name": ..., "email": ""}``) and ``date``. ``date`` is the
        ``isoformat()`` of the commit's ``created_at`` when available,
        ``str(created_at)`` otherwise, or ``""`` when it is missing.

    Raises:
        Exception: Whatever the Hub call raises; it is logged and re-raised.
            A commit that fails to parse is logged and skipped instead.
    """
    from functools import partial
    from huggingface_hub import HfApi
    import logging
    log = logging.getLogger(__name__)
    api = HfApi()
    log.info(f"[get_dataset_commits_async] Fetching commits for dataset_id={dataset_id}")
    try:
        # huggingface_hub is sync, so run in threadpool
        from anyio import to_thread
        # run_sync only forwards positional arguments to the target, so the
        # keyword arguments must be bound with partial first.
        list_commits = partial(api.list_repo_commits, repo_id=dataset_id, repo_type="dataset")
        commits = await to_thread.run_sync(list_commits)
        log.info(f"[get_dataset_commits_async] Received {len(commits)} commits for {dataset_id}")
    except Exception as e:
        log.error(f"[get_dataset_commits_async] Error fetching commits for {dataset_id}: {e}", exc_info=True)
        raise
    result = []
    for c in commits[:limit]:
        try:
            commit_id = getattr(c, "commit_id", "")
            title = getattr(c, "title", "")
            message = getattr(c, "message", title)
            authors = getattr(c, "authors", [])
            # Only the first author is reported, and only when authors is a list.
            author_name = authors[0] if authors and isinstance(authors, list) else ""
            created_at = getattr(c, "created_at", None)
            if not created_at:
                date = ""
            elif hasattr(created_at, "isoformat"):
                date = created_at.isoformat()
            else:
                date = str(created_at)
            result.append({
                "id": commit_id or "",
                "title": title or message or "",
                "message": message or title or "",
                "author": {"name": author_name, "email": ""},
                "date": date,
            })
        except Exception as e:
            log.error(f"[get_dataset_commits_async] Error parsing commit: {e} | Commit: {getattr(c, '__dict__', str(c))}", exc_info=True)
    log.info(f"[get_dataset_commits_async] Returning {len(result)} parsed commits for {dataset_id}")
    return result
async def get_dataset_files_async(dataset_id: str) -> List[str]:
    """Return the file paths in the dataset repo ``dataset_id`` (async).

    The blocking ``HfApi.list_repo_files`` call runs in a worker thread and
    its result is returned unchanged.
    """
    from functools import partial
    from huggingface_hub import HfApi
    from anyio import to_thread
    api = HfApi()
    # huggingface_hub is sync, so run in threadpool. run_sync only forwards
    # positional arguments, so the keyword arguments are bound with partial.
    list_files = partial(api.list_repo_files, repo_id=dataset_id, repo_type="dataset")
    return await to_thread.run_sync(list_files)
async def get_file_url_async(dataset_id: str, filename: str, revision: Optional[str] = None) -> str:
    """Return the Hub URL of ``filename`` inside the dataset repo ``dataset_id`` (async).

    Args:
        dataset_id: Hub repo id of the dataset.
        filename: Path of the file within the repo.
        revision: Optional revision passed through to ``hf_hub_url``.
    """
    from functools import partial
    from huggingface_hub import hf_hub_url
    from anyio import to_thread
    # huggingface_hub is sync, so run in threadpool. run_sync only forwards
    # positional arguments, so the keyword arguments are bound with partial.
    build_url = partial(
        hf_hub_url,
        repo_id=dataset_id,
        filename=filename,
        repo_type="dataset",
        revision=revision,
    )
    return await to_thread.run_sync(build_url)
| # Fetch and cache all datasets | |
class EnhancedJSONEncoder(json.JSONEncoder):
    """JSON encoder that also serializes ``datetime`` objects as ISO strings."""

    def default(self, obj):
        """Return ``obj.isoformat()`` for datetimes; defer to the base class otherwise."""
        if not isinstance(obj, datetime):
            return super().default(obj)
        return obj.isoformat()
async def fetch_size(session, dataset_id, token=None):
    """Look up a dataset's original-file size on the datasets server.

    Args:
        session: Async HTTP client exposing an awaitable ``get``.
        dataset_id: Hub repo id of the dataset.
        token: Optional bearer token sent in the ``Authorization`` header.

    Returns:
        ``(dataset_id, size)`` where ``size`` is the response's
        ``size.dataset.num_bytes_original_files`` field, or ``None`` when the
        status is not 200 or the request fails (failures are logged as
        warnings, never raised).
    """
    endpoint = f"https://datasets-server.huggingface.co/size?dataset={dataset_id}"
    if token:
        auth_headers = {"Authorization": f"Bearer {token}"}
    else:
        auth_headers = {}
    try:
        response = await session.get(endpoint, headers=auth_headers, timeout=30)
        if response.status_code != 200:
            return dataset_id, None
        payload = response.json()
        dataset_section = payload.get("size", {}).get("dataset", {})
        return dataset_id, dataset_section.get("num_bytes_original_files")
    except Exception as e:
        log.warning(f"Could not fetch size for {dataset_id}: {e}")
    return dataset_id, None
async def fetch_all_sizes(dataset_ids, token=None, batch_size=50):
    """Fetch sizes for many datasets, ``batch_size`` concurrent requests at a time.

    All requests share one ``httpx.AsyncClient``; each batch is awaited in
    full before the next one starts.

    Returns:
        A dict mapping each dataset id to the size reported by ``fetch_size``.
    """
    size_by_id = {}
    async with httpx.AsyncClient() as client:
        for start in range(0, len(dataset_ids), batch_size):
            chunk = dataset_ids[start:start + batch_size]
            pairs = await asyncio.gather(*(fetch_size(client, ds_id, token) for ds_id in chunk))
            # Each pair is (dataset_id, size).
            size_by_id.update(pairs)
    return size_by_id
def fetch_and_cache_all_datasets(token: str):
    """Fetch every dataset from the Hub and cache the list in Redis.

    Each dataset's ``__dict__`` is extended with ``size_bytes`` (from
    ``fetch_all_sizes``) and ``impact_level`` (from
    ``determine_impact_level_by_criteria``). The resulting list is dumped to
    JSON with ``EnhancedJSONEncoder``, gzip-compressed, and stored under
    ``REDIS_KEY``.

    Args:
        token: Hugging Face API token used for the Hub and size requests.

    Returns:
        The number of datasets cached.
    """
    hub = HfApi(token=token)
    log.info("Fetching all datasets from Hugging Face Hub...")
    infos = list(hub.list_datasets())
    ids = [info.id for info in infos]
    # Fetch all sizes in batches
    size_by_id = asyncio.run(fetch_all_sizes(ids, token=token, batch_size=50))

    records = []
    for info in infos:
        # The object's own attribute dict is used (and extended) directly.
        record = info.__dict__
        size_bytes = size_by_id.get(info.id)
        level, _method = determine_impact_level_by_criteria(
            size_bytes, record.get("downloads"), record.get("likes")
        )
        record["size_bytes"] = size_bytes
        record["impact_level"] = level
        records.append(record)

    payload = json.dumps(records, cls=EnhancedJSONEncoder).encode("utf-8")
    r = redis.Redis(host="redis", port=6379, decode_responses=False)
    r.set(REDIS_KEY, gzip.compress(payload))
    log.info(f"Cached {len(records)} datasets in Redis under {REDIS_KEY}")
    return len(records)
| # Native pagination from cache | |
def get_datasets_page_from_cache(limit: int, offset: int):
    """Return one page of the gzip-compressed dataset list cached in Redis.

    Args:
        limit: Page size; must be a positive integer.
        offset: Zero-based index of the first item of the page.

    Returns:
        A ``(body, status)`` tuple:

        * ``({"error": ...}, 404)`` when nothing is cached under ``REDIS_KEY``;
        * ``({"error": ...}, 400)`` when ``limit`` is not positive;
        * ``({"error": ..., "total": total}, 400)`` when ``offset`` is outside
          ``[0, total)``;
        * otherwise ``(page_info, 200)`` with the keys ``total``,
          ``current_page``, ``total_pages``, ``next_page``, ``prev_page``
          and ``items``.
    """
    # The context manager closes the client (and its connections) on exit.
    with redis.Redis(host="redis", port=6379, decode_responses=False) as r:
        compressed = r.get(REDIS_KEY)
    if not compressed:
        return {"error": "Cache not found. Please refresh datasets."}, 404
    # limit == 0 would divide by zero below, and a negative limit would give
    # meaningless page numbers, so reject both before decompressing.
    if limit <= 0:
        return {"error": "Limit must be a positive integer."}, 400
    all_datasets = json.loads(gzip.decompress(compressed).decode("utf-8"))
    total = len(all_datasets)
    if offset < 0 or offset >= total:
        return {"error": "Offset out of range.", "total": total}, 400
    page = all_datasets[offset:offset + limit]
    total_pages = (total + limit - 1) // limit  # ceiling division
    current_page = (offset // limit) + 1
    next_page = current_page + 1 if offset + limit < total else None
    prev_page = current_page - 1 if current_page > 1 else None
    return {
        "total": total,
        "current_page": current_page,
        "total_pages": total_pages,
        "next_page": next_page,
        "prev_page": prev_page,
        "items": page
    }, 200