import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

logger = logging.getLogger(__name__)


class HuggingFaceAPI:
    """Small client for the Hugging Face Hub HTTP API (Spaces endpoints).

    All requests go through one shared ``requests.Session``; when a token is
    supplied it is sent as a ``Bearer`` authorization header on every call.
    """

    BASE_URL = "https://huggingface.co/api"
    # Per-request timeout in seconds, used unless the caller passes its own.
    DEFAULT_TIMEOUT = 15
    # Upper bound on worker threads used by the parallel fetch helpers.
    MAX_WORKERS = 10

    def __init__(self, token=None):
        """Create the client.

        Args:
            token: Optional API token. When truthy, it is attached to the
                session as ``Authorization: Bearer <token>``.
        """
        self.token = token
        self.session = requests.Session()
        if token:
            self.session.headers["Authorization"] = f"Bearer {token}"

    # ------------------------------------------------------------------
    # Low-level HTTP helpers
    # ------------------------------------------------------------------

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((requests.RequestException, requests.Timeout)),
    )
    def _request(self, method, endpoint, **kwargs):
        """Send a request to ``BASE_URL/endpoint`` and return the decoded JSON.

        The call is attempted up to three times with exponential backoff
        whenever a ``requests.RequestException`` is raised. That includes the
        ``HTTPError`` produced by ``raise_for_status()``, so HTTP error
        statuses are retried as well.

        NOTE: ``reraise`` is not set on the decorator, so once the attempts
        are exhausted tenacity surfaces the failure as ``tenacity.RetryError``
        rather than the underlying requests exception.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            endpoint: Path relative to ``BASE_URL`` (no leading slash).
            **kwargs: Forwarded to ``Session.request``. ``timeout`` defaults
                to ``DEFAULT_TIMEOUT`` when not supplied.

        Returns:
            The response body parsed as JSON.
        """
        kwargs.setdefault("timeout", self.DEFAULT_TIMEOUT)
        url = f"{self.BASE_URL}/{endpoint}"
        resp = self.session.request(method, url, **kwargs)
        resp.raise_for_status()
        return resp.json()

    def _get(self, endpoint, **kwargs):
        """Issue a GET request through :meth:`_request`."""
        return self._request("GET", endpoint, **kwargs)

    def _post(self, endpoint, **kwargs):
        """Issue a POST request through :meth:`_request`."""
        return self._request("POST", endpoint, **kwargs)

    # ------------------------------------------------------------------
    # Single-resource endpoints
    # ------------------------------------------------------------------

    def list_spaces(self, author):
        """Return the JSON listing of spaces filtered by ``author``.

        The author is passed as a query parameter so that requests
        URL-encodes it; interpolating it into the path by hand would break
        on characters such as ``&``, ``#`` or spaces.
        """
        return self._get("spaces", params={"author": author})

    def get_space(self, space_id):
        """Return the JSON record for a single space."""
        return self._get(f"spaces/{space_id}")

    def get_space_discussions(self, space_id):
        """Return the ``"discussions"`` entry of a space's discussions payload.

        Falls back to an empty list when the key is absent.
        """
        data = self._get(f"spaces/{space_id}/discussions")
        return data.get("discussions", [])

    def restart_space(self, space_id):
        """POST to the space's restart endpoint and return the decoded JSON.

        Goes through :meth:`_request`, so it raises on failure and is subject
        to the retry policy; see :meth:`restart_space_safe` for a variant
        that reports errors in its return value instead.
        """
        return self._post(f"spaces/{space_id}/restart")

    # ------------------------------------------------------------------
    # Bulk helpers
    # ------------------------------------------------------------------

    def fetch_spaces_with_details(self, author, progress_callback=None):
        """Fetch all spaces for a user with full details (parallel).

        The listing is fetched first, then each space's detail record is
        requested on a thread pool. A space whose detail request fails (or
        that has no ``"id"``) is represented by its listing entry instead.

        Args:
            author: Account whose spaces are listed.
            progress_callback: Optional callable invoked from the calling
                thread as ``progress_callback("spaces", completed, total)``
                after each detail request finishes.

        Returns:
            A list of space records sorted by ``"likes"`` in descending
            order; a missing or ``None`` like count sorts as ``0``.
        """
        space_list = self.list_spaces(author)
        space_ids = [space.get("id") for space in space_list if space.get("id")]
        total = len(space_ids)

        details_map = {}
        # NOTE(review): the worker threads share self.session; confirm that
        # concurrent use of one requests.Session is acceptable here.
        with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:
            future_to_id = {
                executor.submit(self._safe_get_space, sid): sid
                for sid in space_ids
            }
            for completed, future in enumerate(as_completed(future_to_id), start=1):
                details_map[future_to_id[future]] = future.result()
                if progress_callback:
                    progress_callback("spaces", completed, total)

        # Keep the listing order as the base and prefer the detailed record
        # whenever one was retrieved.
        spaces = [
            details_map.get(space.get("id", "")) or space for space in space_list
        ]
        # ``or 0`` also covers an explicit ``"likes": None``, which would
        # otherwise make the comparison raise TypeError.
        spaces.sort(key=lambda item: item.get("likes") or 0, reverse=True)
        return spaces

    def _safe_get_space(self, space_id):
        """Return the space record, or ``None`` if the lookup fails.

        Best-effort by design: any exception is logged at DEBUG level and
        swallowed so one failing space does not abort a bulk fetch.
        """
        try:
            return self.get_space(space_id)
        except Exception:
            logger.debug("Could not fetch space %r", space_id, exc_info=True)
            return None

    def fetch_all_discussions(self, spaces, progress_callback=None):
        """Fetch discussions for all spaces (parallel).

        Args:
            spaces: Iterable-with-length of space records; each is read via
                ``.get("id", "")``.
            progress_callback: Optional callable invoked from the calling
                thread as ``progress_callback("discussions", completed,
                total)`` after each request finishes.

        Returns:
            A dict mapping space id to its list of discussions (an empty
            list when the request failed or returned nothing).
        """
        total = len(spaces)
        all_discussions = {}

        with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:
            future_to_id = {}
            for space in spaces:
                sid = space.get("id", "")
                future_to_id[executor.submit(self._safe_get_discussions, sid)] = sid

            for completed, future in enumerate(as_completed(future_to_id), start=1):
                all_discussions[future_to_id[future]] = future.result() or []
                if progress_callback:
                    progress_callback("discussions", completed, total)

        return all_discussions

    def _safe_get_discussions(self, space_id):
        """Return the space's discussions, or ``[]`` if the lookup fails.

        Best-effort by design: any exception is logged at DEBUG level and
        swallowed.
        """
        try:
            return self.get_space_discussions(space_id)
        except Exception:
            logger.debug(
                "Could not fetch discussions for space %r", space_id, exc_info=True
            )
            return []

    def restart_space_safe(self, space_id):
        """Restart a space, returning detailed result.

        Unlike :meth:`restart_space`, this posts directly on the session
        (no retry, no ``raise_for_status``) and never raises.

        Returns:
            ``{"id": space_id, "success": True}`` on HTTP 200, otherwise
            ``{"id": space_id, "success": False, "error": <message>}``.
        """
        try:
            url = f"{self.BASE_URL}/spaces/{space_id}/restart"
            resp = self.session.post(url, timeout=self.DEFAULT_TIMEOUT)
            if resp.status_code == 200:
                return {"id": space_id, "success": True}

            # Try to get the error message from the response body; fall back
            # to the status code plus a truncated body when it is not a JSON
            # object.
            try:
                error_data = resp.json()
                error_msg = error_data.get("error", resp.text[:200])
            except Exception:
                error_msg = f"HTTP {resp.status_code}: {resp.text[:200]}"
            return {"id": space_id, "success": False, "error": error_msg}
        except Exception as e:
            return {"id": space_id, "success": False, "error": str(e)}

    def wake_spaces(self, space_ids, progress_callback=None):
        """Wake multiple spaces, returning results.

        Spaces are restarted one at a time, in the order given.

        Args:
            space_ids: Sequence of space ids to restart.
            progress_callback: Optional callable invoked as
                ``progress_callback("wake", done, total)`` after each
                restart attempt.

        Returns:
            A list with one :meth:`restart_space_safe` result dict per id.
        """
        total = len(space_ids)
        results = []
        for done, space_id in enumerate(space_ids, start=1):
            results.append(self.restart_space_safe(space_id))
            if progress_callback:
                progress_callback("wake", done, total)
        return results