Spaces:
Sleeping
Sleeping
| import requests | |
| from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| class HuggingFaceAPI: | |
| BASE_URL = "https://huggingface.co/api" | |
| def __init__(self, token=None): | |
| self.token = token | |
| self.session = requests.Session() | |
| if token: | |
| self.session.headers["Authorization"] = f"Bearer {token}" | |
| def _request(self, method, endpoint, **kwargs): | |
| kwargs.setdefault("timeout", 15) | |
| url = f"{self.BASE_URL}/{endpoint}" | |
| resp = self.session.request(method, url, **kwargs) | |
| resp.raise_for_status() | |
| return resp.json() | |
| def _get(self, endpoint, **kwargs): | |
| return self._request("GET", endpoint, **kwargs) | |
| def _post(self, endpoint, **kwargs): | |
| return self._request("POST", endpoint, **kwargs) | |
| def list_spaces(self, author): | |
| return self._get(f"spaces?author={author}") | |
| def get_space(self, space_id): | |
| return self._get(f"spaces/{space_id}") | |
| def get_space_discussions(self, space_id): | |
| data = self._get(f"spaces/{space_id}/discussions") | |
| return data.get("discussions", []) | |
| def restart_space(self, space_id): | |
| return self._post(f"spaces/{space_id}/restart") | |
| def fetch_spaces_with_details(self, author, progress_callback=None): | |
| """Fetch all spaces for a user with full details (parallel).""" | |
| space_list = self.list_spaces(author) | |
| space_ids = [s.get("id", "") for s in space_list if s.get("id")] | |
| total = len(space_ids) | |
| spaces = [] | |
| completed = 0 | |
| with ThreadPoolExecutor(max_workers=10) as executor: | |
| future_to_id = { | |
| executor.submit(self._safe_get_space, sid): sid | |
| for sid in space_ids | |
| } | |
| details_map = {} | |
| for future in as_completed(future_to_id): | |
| sid = future_to_id[future] | |
| details_map[sid] = future.result() | |
| completed += 1 | |
| if progress_callback: | |
| progress_callback("spaces", completed, total) | |
| for space in space_list: | |
| sid = space.get("id", "") | |
| detail = details_map.get(sid) | |
| spaces.append(detail if detail else space) | |
| spaces.sort(key=lambda x: x.get("likes", 0), reverse=True) | |
| return spaces | |
| def _safe_get_space(self, space_id): | |
| try: | |
| return self.get_space(space_id) | |
| except Exception: | |
| return None | |
| def fetch_all_discussions(self, spaces, progress_callback=None): | |
| """Fetch discussions for all spaces (parallel).""" | |
| total = len(spaces) | |
| completed = 0 | |
| all_discussions = {} | |
| with ThreadPoolExecutor(max_workers=10) as executor: | |
| future_to_id = { | |
| executor.submit(self._safe_get_discussions, s.get("id", "")): s.get("id", "") | |
| for s in spaces | |
| } | |
| for future in as_completed(future_to_id): | |
| sid = future_to_id[future] | |
| all_discussions[sid] = future.result() or [] | |
| completed += 1 | |
| if progress_callback: | |
| progress_callback("discussions", completed, total) | |
| return all_discussions | |
| def _safe_get_discussions(self, space_id): | |
| try: | |
| return self.get_space_discussions(space_id) | |
| except Exception: | |
| return [] | |
| def restart_space_safe(self, space_id): | |
| """Restart a space, returning detailed result.""" | |
| try: | |
| url = f"{self.BASE_URL}/spaces/{space_id}/restart" | |
| resp = self.session.post(url, timeout=15) | |
| if resp.status_code == 200: | |
| return {"id": space_id, "success": True} | |
| else: | |
| # Try to get error message from response | |
| try: | |
| error_data = resp.json() | |
| error_msg = error_data.get("error", resp.text[:200]) | |
| except Exception: | |
| error_msg = f"HTTP {resp.status_code}: {resp.text[:200]}" | |
| return {"id": space_id, "success": False, "error": error_msg} | |
| except Exception as e: | |
| return {"id": space_id, "success": False, "error": str(e)} | |
| def wake_spaces(self, space_ids, progress_callback=None): | |
| """Wake multiple spaces, returning results.""" | |
| total = len(space_ids) | |
| results = [] | |
| for i, space_id in enumerate(space_ids): | |
| result = self.restart_space_safe(space_id) | |
| results.append(result) | |
| if progress_callback: | |
| progress_callback("wake", i + 1, total) | |
| return results | |