Spaces:
Build error
Build error
| import aiohttp | |
| import asyncio | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Any, Union | |
| from dataclasses import dataclass | |
| import json | |
| import time | |
| from enum import Enum | |
| from config import get_config, APIConfig | |
| from data_models import ( | |
| SpaceInfo, SpaceStatusInfo, SpaceStatus, SpaceRuntime, | |
| BuildLog, BuildLogEntry, WebhookEvent, EventType, LogLevel | |
| ) | |
class HuggingFaceClient:
    """Asynchronous client for the HuggingFace Spaces HTTP API.

    Wraps a lazily created ``aiohttp.ClientSession`` and exposes helpers for
    reading Space metadata, runtime state and logs, controlling a Space
    (restart / pause / resume), and listing or searching Spaces. Every request
    goes through a simple client-side per-minute limiter driven by
    ``config.rate_limit_per_minute``.

    ``get_space_info`` and ``get_space_runtime`` log and re-raise on failure.
    The remaining high-level helpers are best-effort: they log the error and
    return a fallback value (``False``, ``[]``, an UNKNOWN status, or an
    error ``BuildLog``).
    """

    # Upper bound on consecutive HTTP 429 retries inside ``_make_request``.
    _MAX_RATE_LIMIT_RETRIES = 5
    # Wait used when a 429 response carries no usable ``Retry-After`` header.
    _DEFAULT_RETRY_AFTER = 60.0

    def __init__(self, token: Optional[str] = None, config: Optional[APIConfig] = None):
        """Create a client.

        Args:
            token: API token; falls back to ``config.token`` when omitted.
            config: API configuration; falls back to ``get_config().api``.
        """
        self.config = config or get_config().api
        self.token = token or self.config.token
        self.base_url = self.config.base_url
        self.headers = {}
        # Only authenticate when a token exists; otherwise the literal
        # header "Bearer None" would be sent and rejected as an invalid token.
        if self.token:
            self.headers["Authorization"] = f"Bearer {self.token}"
        self.headers["User-Agent"] = self.config.user_agent
        self.logger = logging.getLogger(__name__)
        self.session: Optional[aiohttp.ClientSession] = None
        # Start of the current rate-limit window (epoch seconds) and the
        # number of requests issued within it.
        self._last_request_time = 0
        self._request_count = 0

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared HTTP session, (re)creating it if absent or closed."""
        if self.session is None or self.session.closed:
            timeout = aiohttp.ClientTimeout(total=self.config.timeout)
            self.session = aiohttp.ClientSession(headers=self.headers, timeout=timeout)
        return self.session

    async def close(self) -> None:
        """Close the underlying HTTP session, if one was opened."""
        if self.session:
            await self.session.close()
            self.session = None

    @staticmethod
    def _retry_after_seconds(value: Optional[str], default: float = 60.0) -> float:
        """Parse a ``Retry-After`` header given in seconds.

        Falls back to *default* when the header is missing or is not numeric
        (e.g. the HTTP-date form), instead of crashing the request.
        """
        if value is None:
            return default
        try:
            return max(0.0, float(value))
        except (TypeError, ValueError):
            return default

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> Any:
        """Perform an API request and return the decoded JSON body.

        Args:
            method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
            endpoint: Path relative to ``base_url``; a leading ``/`` is ignored.
            **kwargs: Forwarded to ``aiohttp.ClientSession.request``.

        Returns:
            The decoded JSON (a dict or a list), or ``{}`` for an empty body.

        Raises:
            aiohttp.ClientError: On transport-level failures.
            Exception: On non-2xx responses, or when HTTP 429 persists after
                ``_MAX_RATE_LIMIT_RETRIES`` retries.
        """
        session = await self._get_session()
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        for attempt in range(self._MAX_RATE_LIMIT_RETRIES + 1):
            await self._rate_limit()
            try:
                async with session.request(method, url, **kwargs) as response:
                    status = response.status
                    if 200 <= status < 300:
                        # Any 2xx is success. content_type=None tolerates JSON
                        # served with another content type; an empty body
                        # (e.g. 204) decodes to None and is normalized to {}.
                        data = await response.json(content_type=None)
                        return {} if data is None else data
                    if status == 401:
                        raise Exception("认证失败,请检查 HF_TOKEN")
                    if status == 403:
                        raise Exception("权限不足,无法访问此资源")
                    if status == 404:
                        raise Exception("资源不存在")
                    if status != 429:
                        error_text = await response.text()
                        raise Exception(f"HTTP {status}: {error_text}")
                    retry_after = self._retry_after_seconds(
                        response.headers.get('Retry-After'), self._DEFAULT_RETRY_AFTER
                    )
            except aiohttp.ClientError as e:
                self.logger.error(f"网络请求失败: {e}")
                raise
            except Exception as e:
                self.logger.error(f"请求异常: {e}")
                raise
            # Only HTTP 429 reaches this point. The response has already been
            # released by the `async with`, so sleeping does not hold a
            # connection, and the loop bounds the number of retries.
            if attempt == self._MAX_RATE_LIMIT_RETRIES:
                break
            self.logger.warning(f"请求频率限制,等待 {retry_after:g} 秒")
            await asyncio.sleep(retry_after)
        message = f"请求频率限制,重试 {self._MAX_RATE_LIMIT_RETRIES} 次后仍失败"
        self.logger.error(f"请求异常: {message}")
        raise Exception(message)

    async def _rate_limit(self) -> None:
        """Allow at most ``config.rate_limit_per_minute`` requests per 60 s window.

        The request that triggers the limit waits until its window ends, and
        is then counted as the first request of the next window.
        """
        now = time.time()
        elapsed = now - self._last_request_time
        if elapsed >= 60:
            # The previous window has expired: this request opens a new one.
            self._last_request_time = now
            self._request_count = 1
            return
        self._request_count += 1
        # `>` rather than `>=`: exactly `rate_limit_per_minute` requests may
        # pass in a window before anyone has to wait.
        if self._request_count > self.config.rate_limit_per_minute:
            wait_time = 60 - elapsed
            if wait_time > 0:
                self.logger.debug(f"达到速率限制,等待 {wait_time:.1f} 秒")
                await asyncio.sleep(wait_time)
            self._last_request_time = time.time()
            self._request_count = 1

    @staticmethod
    def _parse_timestamp(value: Any) -> Optional[datetime]:
        """Parse an ISO-8601 timestamp from the API, or return ``None``.

        A trailing ``Z`` is rewritten to ``+00:00`` because
        ``datetime.fromisoformat`` only accepts ``Z`` from Python 3.11 on.
        Unparseable values yield ``None`` instead of failing the caller.
        """
        if not value:
            return None
        try:
            return datetime.fromisoformat(value.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            return None

    def _parse_space_info(self, item: Dict[str, Any], default_id: str = '') -> SpaceInfo:
        """Build a ``SpaceInfo`` from one Space JSON object.

        *default_id* is used for both ``space_id`` and ``name`` when the
        object has no ``id`` key.
        """
        space_id = item.get('id', default_id)
        return SpaceInfo(
            space_id=space_id,
            name=space_id,
            repository_url=item.get('url', ''),
            description=item.get('description'),
            author=item.get('author', ''),
            tags=item.get('tags', []),
            sdk=item.get('sdk'),
            python_version=item.get('pythonVersion'),
            created_at=self._parse_timestamp(item.get('createdAt')),
            last_modified=self._parse_timestamp(item.get('lastModified')),
        )

    async def get_space_info(self, space_id: str) -> SpaceInfo:
        """Fetch metadata for one Space.

        Raises:
            Exception: Any request or parsing failure (logged first).
        """
        try:
            data = await self._make_request("GET", f"spaces/{space_id}")
            return self._parse_space_info(data, space_id)
        except Exception as e:
            self.logger.error(f"获取 Space {space_id} 信息失败: {e}")
            raise

    async def get_space_runtime(self, space_id: str) -> SpaceRuntime:
        """Fetch the runtime payload of a Space.

        Missing ``stage`` and ``state`` fields default to ``'UNKNOWN'``.

        Raises:
            Exception: Any request failure (logged first).
        """
        try:
            data = await self._make_request("GET", f"spaces/{space_id}/runtime")
            return SpaceRuntime(
                stage=data.get('stage', 'UNKNOWN'),
                state=data.get('state', 'UNKNOWN'),
                hardware=data.get('hardware', {}),
                replicas=data.get('replicas'),
                requested_hardware=data.get('requestedHardware', {}),
                acs_type=data.get('acsType'),
                storage=data.get('storage'),
                sha=data.get('sha')
            )
        except Exception as e:
            self.logger.error(f"获取 Space {space_id} 运行时信息失败: {e}")
            raise

    @staticmethod
    def _status_from_runtime(stage: Any, state: Any) -> SpaceStatus:
        """Map a runtime ``stage`` / ``state`` pair to a ``SpaceStatus``.

        Any stage not listed below maps to ``SpaceStatus.ERROR``.
        """
        stage = str(stage or 'UNKNOWN').upper()
        state = str(state or 'UNKNOWN').upper()
        if stage == 'RUNNING':
            # NOTE(review): the Hub runtime payload does not appear to carry a
            # `state` field, in which case get_space_runtime fills in
            # 'UNKNOWN'. Treat that as healthy rather than reporting every
            # running Space as an error.
            if state in ('RUNNING', 'UNKNOWN'):
                return SpaceStatus.RUNNING
            return SpaceStatus.ERROR
        return {
            'BUILDING': SpaceStatus.BUILDING,
            'STOPPED': SpaceStatus.STOPPED,
            'PAUSED': SpaceStatus.PAUSED,
            'SLEEPING': SpaceStatus.SLEEPING,
        }.get(stage, SpaceStatus.ERROR)

    async def get_space_status(self, space_id: str) -> SpaceStatusInfo:
        """Return the combined status of a Space.

        Never raises. On any failure the error is logged and a
        ``SpaceStatus.UNKNOWN`` result is returned.
        """
        try:
            data = await self._make_request("GET", f"spaces/{space_id}")
            runtime_data = await self.get_space_runtime(space_id)
            return SpaceStatusInfo(
                space_id=space_id,
                status=self._status_from_runtime(runtime_data.stage, runtime_data.state),
                runtime=runtime_data,
                timestamp=datetime.now(),
                url=data.get('url'),
                emoji=data.get('emoji'),
                color=data.get('color'),
                likes=data.get('likes'),
                tags=data.get('tags', [])
            )
        except Exception as e:
            self.logger.error(f"获取 Space {space_id} 状态失败: {e}")
            return SpaceStatusInfo(
                space_id=space_id,
                status=SpaceStatus.UNKNOWN,
                runtime=SpaceRuntime(stage='UNKNOWN', state='UNKNOWN'),
                timestamp=datetime.now()
            )

    @staticmethod
    def _classify_log_level(message: str) -> LogLevel:
        """Guess a log level from keywords in *message* (case-insensitive)."""
        lowered = message.lower()
        if 'error' in lowered or 'failed' in lowered:
            return LogLevel.ERROR
        if 'warning' in lowered:
            return LogLevel.WARNING
        return LogLevel.INFO

    async def get_space_logs(self, space_id: str, lines: int = 100) -> BuildLog:
        """Fetch up to *lines* log lines of a Space as a ``BuildLog``.

        List items may be dicts (using their ``message`` and ``source`` keys)
        or plain strings. Other item types are skipped. Timestamps are the
        local time at parse time, not the original log times.

        Never raises. On failure, a ``BuildLog`` holding a single ERROR entry
        that describes the failure is returned.
        """
        try:
            data = await self._make_request("GET", f"spaces/{space_id}/logs", params={"lines": lines})
            entries = []
            if isinstance(data, list):
                for i, raw in enumerate(data):
                    if isinstance(raw, dict):
                        message = raw.get('message')
                        if message is None:
                            message = str(raw)
                        extra = {'source': raw.get('source')}
                    elif isinstance(raw, str):
                        message = raw
                        extra = {}
                    else:
                        continue
                    # Non-string messages (numbers, nested objects) would
                    # otherwise crash `.lower()` and discard the whole log.
                    message = str(message)
                    entries.append(BuildLogEntry(
                        timestamp=datetime.now(),
                        level=self._classify_log_level(message),
                        message=message,
                        line_number=i,
                        **extra
                    ))
            return BuildLog(
                space_id=space_id,
                entries=entries,
                start_time=entries[0].timestamp if entries else datetime.now(),
                end_time=entries[-1].timestamp if entries else datetime.now(),
                total_lines=len(entries)
            )
        except Exception as e:
            self.logger.error(f"获取 Space {space_id} 日志失败: {e}")
            return BuildLog(
                space_id=space_id,
                entries=[BuildLogEntry(
                    timestamp=datetime.now(),
                    level=LogLevel.ERROR,
                    message=f"获取日志失败: {str(e)}"
                )],
                total_lines=1
            )

    async def _space_action(self, space_id: str, action: str, label: str) -> bool:
        """POST ``spaces/{space_id}/{action}``; log the result with *label*.

        Returns:
            ``True`` on success, ``False`` if the request failed.
        """
        try:
            await self._make_request("POST", f"spaces/{space_id}/{action}")
            self.logger.info(f"成功{label} Space {space_id}")
            return True
        except Exception as e:
            self.logger.error(f"{label} Space {space_id} 失败: {e}")
            return False

    async def restart_space(self, space_id: str) -> bool:
        """Restart a Space; return ``True`` on success."""
        return await self._space_action(space_id, "restart", "重启")

    async def pause_space(self, space_id: str) -> bool:
        """Pause a Space; return ``True`` on success."""
        return await self._space_action(space_id, "pause", "暂停")

    async def resume_space(self, space_id: str) -> bool:
        """Resume a Space; return ``True`` on success."""
        return await self._space_action(space_id, "resume", "恢复")

    async def get_space_discussions(self, space_id: str) -> List[Dict[str, Any]]:
        """Return the discussions of a Space, or ``[]`` on failure.

        Accepts either a bare JSON list or a paginated object that carries the
        items under ``"discussions"``. NOTE(review): the Hub appears to return
        the paginated form; confirm against the API.
        """
        try:
            data = await self._make_request("GET", f"spaces/{space_id}/discussions")
            if isinstance(data, dict):
                data = data.get('discussions')
            return data if isinstance(data, list) else []
        except Exception as e:
            self.logger.error(f"获取 Space {space_id} 讨论失败: {e}")
            return []

    async def search_spaces(self, query: str, limit: int = 20) -> List[SpaceInfo]:
        """Search Spaces by free text; return ``[]`` on failure."""
        try:
            data = await self._make_request("GET", "spaces", params={
                "search": query,
                "limit": limit
            })
            if not isinstance(data, list):
                return []
            return [self._parse_space_info(item) for item in data if isinstance(item, dict)]
        except Exception as e:
            self.logger.error(f"搜索 Spaces 失败: {e}")
            return []

    async def get_user_spaces(self, author: Optional[str] = None) -> List[SpaceInfo]:
        """List Spaces, optionally filtered by *author*; return ``[]`` on failure."""
        try:
            params = {}
            if author:
                params["author"] = author
            data = await self._make_request("GET", "spaces", params=params)
            if not isinstance(data, list):
                return []
            return [self._parse_space_info(item) for item in data if isinstance(item, dict)]
        except Exception as e:
            self.logger.error(f"获取用户 Spaces 失败: {e}")
            return []

    async def validate_token(self) -> bool:
        """Return ``True`` if a ``whoami-v2`` request with the current token succeeds."""
        try:
            await self._make_request("GET", "whoami-v2")
            return True
        except Exception as e:
            self.logger.error(f"Token 验证失败: {e}")
            return False
class WebhookHandler:
    """Verify and dispatch HuggingFace webhook payloads.

    The event name is read from ``payload['event']`` and the Space id from
    ``payload['space']['id']``. Events listed in ``event_handlers`` are
    routed to the matching ``_handle_*`` coroutine. When a ``secret`` is
    configured, every request must pass :meth:`_verify_signature`.
    """

    def __init__(self, client: HuggingFaceClient, secret: Optional[str] = None):
        """Create a handler.

        Args:
            client: API client, used to fetch logs when a build fails.
            secret: Optional shared webhook secret used for verification.
        """
        self.client = client
        self.secret = secret
        self.logger = logging.getLogger(__name__)
        self.event_handlers = {
            'space.status_updated': self._handle_status_update,
            'space.build_error': self._handle_build_error,
            'space.started': self._handle_space_started,
            'space.stopped': self._handle_space_stopped,
            'space.paused': self._handle_space_paused,
            'space.resumed': self._handle_space_resumed
        }

    @staticmethod
    def _extract_space_id(payload: Any, default: Optional[str] = None) -> Optional[str]:
        """Return ``payload['space']['id']``, or *default* if it cannot be read.

        Tolerates a non-dict payload or ``space`` value, so the error path in
        :meth:`handle_webhook` cannot itself raise.
        """
        if not isinstance(payload, dict):
            return default
        space = payload.get('space')
        if not isinstance(space, dict):
            return default
        return space.get('id', default)

    @staticmethod
    def _to_event_type(raw: Any) -> EventType:
        """Convert a raw event name to ``EventType``, else ``WEBHOOK_RECEIVED``."""
        try:
            return EventType(raw)
        except (TypeError, ValueError):
            return EventType.WEBHOOK_RECEIVED

    async def handle_webhook(self, payload: Dict[str, Any], headers: Dict[str, str],
                             raw_body: Optional[bytes] = None) -> WebhookEvent:
        """Verify a webhook request and dispatch it to its handler.

        Args:
            payload: Decoded JSON body.
            headers: Request headers (matched case-insensitively).
            raw_body: Exact request body bytes. Pass them whenever available,
                since an HMAC signature is computed over the original bytes.

        Returns:
            A ``WebhookEvent``. ``processed`` is set to True when a handler ran.
            Failures (including failed verification) are logged and reported
            through ``error_message`` instead of being raised.
        """
        try:
            if self.secret:
                self._verify_signature(payload, headers, raw_body)
            event_type = payload.get('event', 'unknown')
            webhook_event = WebhookEvent(
                event_type=self._to_event_type(event_type),
                space_id=self._extract_space_id(payload, 'unknown'),
                timestamp=datetime.now(),
                payload=payload
            )
            # Only string names can match a registered handler. This also
            # avoids a TypeError when `event` is an unhashable object.
            handler = self.event_handlers.get(event_type) if isinstance(event_type, str) else None
            if handler is not None:
                await handler(payload)
                webhook_event.processed = True
            else:
                self.logger.warning(f"未知事件类型: {event_type}")
            return webhook_event
        except Exception as e:
            self.logger.error(f"处理 Webhook 失败: {e}")
            return WebhookEvent(
                event_type=EventType.WEBHOOK_RECEIVED,
                space_id=self._extract_space_id(payload, 'unknown'),
                timestamp=datetime.now(),
                payload=payload,
                error_message=str(e)
            )

    def _verify_signature(self, payload: Dict[str, Any], headers: Dict[str, str],
                          raw_body: Optional[bytes] = None) -> None:
        """Authenticate a webhook request against ``self.secret``.

        Two mechanisms are accepted, checked in this order:

        1. ``X-Hub-Signature-256: sha256=<hex HMAC-SHA256>``. The HMAC is
           computed over *raw_body* when given. Otherwise the payload is
           re-serialized with sorted keys, which only matches senders that
           sign that exact canonical form.
        2. ``X-Webhook-Secret: <secret>``, the shared-secret header sent by
           HuggingFace webhooks.

        Raises:
            ValueError: If no verification header is present, or if it does
                not match.
        """
        import hmac
        import hashlib
        # HTTP header names are case-insensitive; frameworks often lowercase them.
        lowered = {str(k).lower(): v for k, v in (headers or {}).items()}
        secret_bytes = self.secret.encode()

        signature = lowered.get('x-hub-signature-256')
        if signature:
            body = raw_body if raw_body is not None else json.dumps(payload, sort_keys=True).encode()
            expected = "sha256=" + hmac.new(secret_bytes, body, hashlib.sha256).hexdigest()
            # Compare bytes: compare_digest rejects non-ASCII str arguments.
            if not hmac.compare_digest(str(signature).encode(), expected.encode()):
                raise ValueError("签名验证失败")
            return

        shared_secret = lowered.get('x-webhook-secret')
        if shared_secret:
            if not hmac.compare_digest(str(shared_secret).encode(), secret_bytes):
                raise ValueError("签名验证失败")
            return

        raise ValueError("缺少签名头部")

    async def _handle_status_update(self, payload: Dict[str, Any]) -> None:
        """Log the runtime info carried by a status-update event."""
        space_data = payload.get('space', {})
        runtime_data = space_data.get('runtime', {}) if isinstance(space_data, dict) else {}
        self.logger.info(f"Space {self._extract_space_id(payload)} 状态更新: {runtime_data}")

    async def _handle_build_error(self, payload: Dict[str, Any]) -> None:
        """Log a build failure and, if possible, its last few error log lines."""
        space_id = self._extract_space_id(payload)
        self.logger.error(f"Space {space_id} 构建失败")
        if not space_id:
            # Without an id there is nothing to fetch (the request would
            # otherwise target "spaces/None/logs").
            return
        try:
            logs = await self.client.get_space_logs(space_id, lines=50)
            error_entries = [entry for entry in logs.entries if entry.level == LogLevel.ERROR]
            if error_entries:
                self.logger.error(f"发现 {len(error_entries)} 条错误日志")
                for entry in error_entries[-5:]:
                    self.logger.error(f"  {entry.message}")
        except Exception as e:
            self.logger.error(f"获取错误日志失败: {e}")

    async def _handle_space_started(self, payload: Dict[str, Any]) -> None:
        """Log that a Space started."""
        self.logger.info(f"Space {self._extract_space_id(payload)} 启动成功")

    async def _handle_space_stopped(self, payload: Dict[str, Any]) -> None:
        """Log that a Space stopped."""
        self.logger.info(f"Space {self._extract_space_id(payload)} 已停止")

    async def _handle_space_paused(self, payload: Dict[str, Any]) -> None:
        """Log that a Space was paused."""
        self.logger.info(f"Space {self._extract_space_id(payload)} 已暂停")

    async def _handle_space_resumed(self, payload: Dict[str, Any]) -> None:
        """Log that a Space was resumed."""
        self.logger.info(f"Space {self._extract_space_id(payload)} 已恢复")
class RetryClient:
    """Retry wrapper around ``HuggingFaceClient`` with exponential backoff.

    Retry ``n`` (0-based) is preceded by a wait of
    ``min(base_delay * 2**n, max_delay)`` seconds. At most ``max_retries``
    retries follow the first attempt.

    NOTE: ``HuggingFaceClient.get_space_status`` and ``get_space_logs`` catch
    their own errors and return fallback values instead of raising. Retrying
    on exceptions alone would therefore never fire for them. Status lookups
    also retry when the result is ``SpaceStatus.UNKNOWN``, which is the
    client's failure fallback. Log fetches still retry only on exceptions,
    because their failure fallback is an ordinary ``BuildLog`` that cannot be
    told apart reliably.
    """

    def __init__(self, client: HuggingFaceClient, max_retries: int = 3,
                 base_delay: float = 1.0, max_delay: float = 60.0):
        """Wrap *client*.

        Args:
            client: Client whose calls are retried.
            max_retries: Retries after the first attempt. Negative values are
                treated as 0.
            base_delay: Initial backoff in seconds.
            max_delay: Upper bound for a single backoff, in seconds.
        """
        self.client = client
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.logger = logging.getLogger(__name__)

    async def _with_retry(self, action: str, func: Any, *args: Any,
                          is_failure: Any = None, failure_reason: str = "结果无效") -> Any:
        """Await ``func(*args)`` with backoff until it succeeds or retries run out.

        Args:
            action: Label used in the log messages (e.g. ``"获取状态"``).
            func: Coroutine function to call.
            *args: Positional arguments for *func*.
            is_failure: Optional predicate. A returned result for which it is
                true is treated as a failed attempt.
            failure_reason: Log text used when *is_failure* rejects a result.

        Returns:
            The first accepted result. If every attempt fails and the last one
            returned a value (rejected by *is_failure*), that value is returned.

        Raises:
            Exception: The exception from the last attempt, if it raised.
        """
        # Always make at least one attempt. A negative max_retries previously
        # skipped the loop and ended in `raise None` (a TypeError).
        total_attempts = max(self.max_retries, 0) + 1
        last_exception = None
        result = None
        for attempt in range(total_attempts):
            try:
                result = await func(*args)
            except Exception as e:
                last_exception = e
                reason = e
            else:
                if is_failure is None or not is_failure(result):
                    return result
                last_exception = None
                reason = failure_reason
            if attempt < total_attempts - 1:
                delay = min(self.base_delay * (2 ** attempt), self.max_delay)
                self.logger.warning(f"{action}失败,{delay} 秒后重试 ({attempt + 1}/{self.max_retries}): {reason}")
                await asyncio.sleep(delay)
            else:
                self.logger.error(f"{action}最终失败: {reason}")
        if last_exception is not None:
            raise last_exception
        return result

    async def get_space_status(self, space_id: str) -> SpaceStatusInfo:
        """Get a Space's status, retrying on errors and on UNKNOWN results."""
        return await self._with_retry(
            "获取状态",
            self.client.get_space_status,
            space_id,
            is_failure=lambda info: info.status == SpaceStatus.UNKNOWN,
            failure_reason="状态未知",
        )

    async def get_space_logs(self, space_id: str, lines: int = 100) -> BuildLog:
        """Get a Space's logs, retrying when the client raises."""
        return await self._with_retry("获取日志", self.client.get_space_logs, space_id, lines)