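# spaces-dashboard / hf_api.py
# NOTE: the lines below look like page chrome captured from a web file viewer
# (avatar caption, commit message, commit hash); kept as comments so the
# module remains valid Python.
# mrfakename's picture
# show error
# 42cce6f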
import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from concurrent.futures import ThreadPoolExecutor, as_completed
class HuggingFaceAPI:
    """Thin client for the Spaces endpoints under ``https://huggingface.co/api``.

    All calls go through one shared ``requests.Session``. The ``fetch_*``
    helpers fan requests out over a thread pool and report progress through an
    optional callback invoked as ``callback(stage, completed, total)``.
    """

    BASE_URL = "https://huggingface.co/api"

    def __init__(self, token=None):
        """Create the client.

        Args:
            token: Optional access token. When truthy it is installed on the
                session as an ``Authorization: Bearer <token>`` header.
        """
        self.token = token
        self.session = requests.Session()
        if token:
            self.session.headers["Authorization"] = f"Bearer {token}"

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        # requests.Timeout is a subclass of requests.RequestException, so the
        # base class alone covers everything the original tuple matched.
        # NOTE(review): HTTPError from raise_for_status() is a RequestException
        # too, so 4xx responses are retried as well -- confirm that is intended.
        # NOTE(review): without reraise=True tenacity is expected to surface
        # exhausted retries as tenacity.RetryError -- confirm callers rely on it.
        retry=retry_if_exception_type(requests.RequestException),
    )
    def _request(self, method, endpoint, **kwargs):
        """Send one request to ``BASE_URL/endpoint`` and return the decoded JSON.

        Args:
            method: HTTP verb passed to ``Session.request``.
            endpoint: Path relative to ``BASE_URL`` (no leading slash).
            **kwargs: Forwarded to ``Session.request``; ``timeout`` defaults
                to 15 seconds when the caller does not supply one.

        Returns:
            The parsed JSON body of the response.

        Raises:
            Whatever the retry decorator propagates once its three attempts
            are used up; non-2xx statuses fail via ``raise_for_status()``.
        """
        kwargs.setdefault("timeout", 15)
        url = f"{self.BASE_URL}/{endpoint}"
        resp = self.session.request(method, url, **kwargs)
        resp.raise_for_status()
        return resp.json()

    def _get(self, endpoint, **kwargs):
        """Issue a GET through ``_request``."""
        return self._request("GET", endpoint, **kwargs)

    def _post(self, endpoint, **kwargs):
        """Issue a POST through ``_request``."""
        return self._request("POST", endpoint, **kwargs)

    def list_spaces(self, author):
        """Return the JSON listing of spaces filtered by ``author``."""
        # Let requests build the query string so characters such as '&', '#'
        # or spaces in ``author`` are percent-encoded instead of corrupting
        # (or injecting extra parameters into) the URL.
        return self._get("spaces", params={"author": author})

    def get_space(self, space_id):
        """Return the JSON details for a single space."""
        return self._get(f"spaces/{space_id}")

    def get_space_discussions(self, space_id):
        """Return the ``discussions`` list for a space (``[]`` if the key is absent)."""
        data = self._get(f"spaces/{space_id}/discussions")
        return data.get("discussions", [])

    def restart_space(self, space_id):
        """POST to the space's ``restart`` endpoint; raises on failure."""
        return self._post(f"spaces/{space_id}/restart")

    def fetch_spaces_with_details(self, author, progress_callback=None):
        """Fetch all spaces for a user with full details (parallel).

        Args:
            author: Account whose spaces are listed.
            progress_callback: Optional callable invoked as
                ``progress_callback("spaces", completed, total)`` after each
                detail request finishes.

        Returns:
            One entry per listed space, sorted by ``likes`` descending. An
            entry is the detailed payload when its lookup succeeded, otherwise
            the summary entry from the listing.
        """
        space_list = self.list_spaces(author)
        space_ids = [s["id"] for s in space_list if s.get("id")]
        total = len(space_ids)
        completed = 0
        details_map = {}

        with ThreadPoolExecutor(max_workers=10) as executor:
            future_to_id = {
                executor.submit(self._safe_get_space, sid): sid
                for sid in space_ids
            }
            for future in as_completed(future_to_id):
                # _safe_get_space never raises; a failed lookup yields None.
                details_map[future_to_id[future]] = future.result()
                completed += 1
                if progress_callback:
                    progress_callback("spaces", completed, total)

        # Fall back to the listing entry when details are missing or empty.
        spaces = [details_map.get(s.get("id", "")) or s for s in space_list]
        # ``or 0`` also covers an explicit ``"likes": None``, which would
        # otherwise make the comparison raise TypeError.
        spaces.sort(key=lambda s: s.get("likes") or 0, reverse=True)
        return spaces

    def _safe_get_space(self, space_id):
        """Return ``get_space(space_id)``, or ``None`` if it raises (best effort)."""
        try:
            return self.get_space(space_id)
        except Exception:
            return None

    def fetch_all_discussions(self, spaces, progress_callback=None):
        """Fetch discussions for all spaces (parallel).

        Args:
            spaces: Iterable of space dicts; each one's ``"id"`` is used
                (``""`` when missing).
            progress_callback: Optional callable invoked as
                ``progress_callback("discussions", completed, total)``.

        Returns:
            Dict mapping space id to its list of discussions (``[]`` when the
            lookup failed or returned nothing).
        """
        total = len(spaces)
        completed = 0
        all_discussions = {}

        with ThreadPoolExecutor(max_workers=10) as executor:
            future_to_id = {}
            for space in spaces:
                sid = space.get("id", "")
                future_to_id[executor.submit(self._safe_get_discussions, sid)] = sid

            for future in as_completed(future_to_id):
                all_discussions[future_to_id[future]] = future.result() or []
                completed += 1
                if progress_callback:
                    progress_callback("discussions", completed, total)

        return all_discussions

    def _safe_get_discussions(self, space_id):
        """Return the space's discussions, or ``[]`` if the request raises."""
        try:
            return self.get_space_discussions(space_id)
        except Exception:
            return []

    def restart_space_safe(self, space_id):
        """Restart a space, returning detailed result.

        Unlike ``restart_space`` this posts directly on the session (no retry
        decorator, no ``raise_for_status``) so the error text in the response
        can be reported to the caller.

        Returns:
            ``{"id": space_id, "success": True}`` on HTTP 200, otherwise
            ``{"id": space_id, "success": False, "error": <message>}``.
        """
        try:
            url = f"{self.BASE_URL}/spaces/{space_id}/restart"
            resp = self.session.post(url, timeout=15)
            if resp.status_code == 200:
                return {"id": space_id, "success": True}

            # Prefer the "error" field of a JSON body; fall back to a
            # truncated raw body when it is not JSON or not a mapping.
            try:
                error_msg = resp.json().get("error", resp.text[:200])
            except Exception:
                error_msg = f"HTTP {resp.status_code}: {resp.text[:200]}"
            return {"id": space_id, "success": False, "error": error_msg}
        except Exception as e:
            # Connection failures, timeouts, etc. are reported, not raised.
            return {"id": space_id, "success": False, "error": str(e)}

    def wake_spaces(self, space_ids, progress_callback=None):
        """Wake multiple spaces, returning results.

        Spaces are restarted one after another via ``restart_space_safe``.

        Args:
            space_ids: Sized sequence of space ids to restart.
            progress_callback: Optional callable invoked as
                ``progress_callback("wake", done, total)`` after each restart.

        Returns:
            List of per-space result dicts, in input order.
        """
        total = len(space_ids)
        results = []
        for done, space_id in enumerate(space_ids, start=1):
            results.append(self.restart_space_safe(space_id))
            if progress_callback:
                progress_callback("wake", done, total)
        return results