# hfproxydemo / huggingface_client_v2.py
# OpenCode Deployer
# 监控系统开发 (monitoring system development): 2026-02-01 15:40:53
# 14f6b4f
import aiohttp
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass
import json
import time
from enum import Enum
from config import get_config, APIConfig
from data_models import (
SpaceInfo, SpaceStatusInfo, SpaceStatus, SpaceRuntime,
BuildLog, BuildLogEntry, WebhookEvent, EventType, LogLevel
)
class HuggingFaceClient:
    """Asynchronous client for the HuggingFace Hub REST API.

    A single ``aiohttp.ClientSession`` is created lazily on first use and
    reused until :meth:`close` is called (or it is found closed). Every request
    passes through a fixed-window rate limiter allowing
    ``config.rate_limit_per_minute`` requests per 60 seconds, and HTTP 429
    responses are retried at most ``max_rate_limit_retries`` times.

    Error-handling contract per method:
      * ``get_space_info`` / ``get_space_runtime`` log and re-raise.
      * ``get_space_status`` / ``get_space_logs`` log and return a placeholder
        (an UNKNOWN status, or a log with a single ERROR entry).
      * action, listing and token-validation methods log and return
        ``False`` / ``[]``.
    """

    # Upper bound on how often one request is re-sent after HTTP 429.
    max_rate_limit_retries: int = 5
    # Seconds to wait on HTTP 429 when Retry-After is missing or not an integer.
    default_retry_after: int = 60

    def __init__(self, token: Optional[str] = None, config: Optional[APIConfig] = None):
        """Create a client.

        Args:
            token: API token; falls back to ``config.token`` when falsy.
            config: API configuration; falls back to ``get_config().api``.
        """
        self.config = config or get_config().api
        self.token = token or self.config.token
        self.base_url = self.config.base_url
        self.headers = {
            "Authorization": f"Bearer {self.token}",
            "User-Agent": self.config.user_agent,
        }
        self.logger = logging.getLogger(__name__)
        self.session: Optional[aiohttp.ClientSession] = None
        # Start of the current rate-limit window, in time.monotonic() seconds.
        # -inf guarantees that the very first request opens a fresh window.
        self._last_request_time = float("-inf")
        # Number of requests already sent in the current window.
        self._request_count = 0

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared session, creating it if it is missing or already closed."""
        if self.session is None or self.session.closed:
            timeout = aiohttp.ClientTimeout(total=self.config.timeout)
            self.session = aiohttp.ClientSession(
                headers=self.headers,
                timeout=timeout,
            )
        return self.session

    async def close(self) -> None:
        """Close the shared session, if any; a later request opens a new one."""
        if self.session is not None:
            await self.session.close()
            self.session = None

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> Any:
        """Send a request to ``{base_url}/{endpoint}`` and return the decoded JSON body.

        Args:
            method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
            endpoint: Path relative to ``base_url``; a leading ``/`` is ignored.
            **kwargs: Forwarded unchanged to ``aiohttp.ClientSession.request``.

        Returns:
            The parsed JSON (a dict or a list, depending on the endpoint) for
            any 2xx response, or ``{}`` for 204 No Content.

        Raises:
            aiohttp.ClientError: On transport-level failures.
            Exception: On non-2xx responses, including HTTP 429 once
                ``max_rate_limit_retries`` retries have been used up.
        """
        session = await self._get_session()
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        max_retries = max(0, self.max_rate_limit_retries)
        try:
            for attempt in range(max_retries + 1):
                await self._rate_limit()
                async with session.request(method, url, **kwargs) as response:
                    status = response.status
                    if 200 <= status < 300:
                        return {} if status == 204 else await response.json()
                    if status != 429 or attempt >= max_retries:
                        await self._raise_for_status(response)
                    retry_after = self._parse_retry_after(response.headers.get('Retry-After'))
                    self.logger.warning(f"请求频率限制,等待 {retry_after} 秒")
                # Sleep only after the response context has exited, so the
                # pooled connection is released while we wait.
                await asyncio.sleep(retry_after)
        except aiohttp.ClientError as e:
            self.logger.error(f"网络请求失败: {e}")
            raise
        except Exception as e:
            self.logger.error(f"请求异常: {e}")
            raise
        # The final loop iteration always returns or raises; this is a safety net.
        raise RuntimeError(f"request to {url} ended without a response")

    async def _raise_for_status(self, response: aiohttp.ClientResponse) -> None:
        """Raise an ``Exception`` describing a non-2xx *response*; never returns normally."""
        status = response.status
        if status == 401:
            raise Exception("认证失败,请检查 HF_TOKEN")
        if status == 403:
            raise Exception("权限不足,无法访问此资源")
        if status == 404:
            raise Exception("资源不存在")
        error_text = await response.text()
        raise Exception(f"HTTP {status}: {error_text}")

    def _parse_retry_after(self, value: Optional[str]) -> int:
        """Return the wait in seconds encoded by a ``Retry-After`` header value.

        Only the integer delta-seconds form is understood. A missing, negative
        or non-integer value (such as the HTTP-date form) yields
        ``default_retry_after`` instead of raising.
        """
        if value is None:
            return self.default_retry_after
        try:
            seconds = int(value.strip())
        except (AttributeError, ValueError):
            return self.default_retry_after
        return seconds if seconds >= 0 else self.default_retry_after

    async def _rate_limit(self) -> None:
        """Throttle to ``config.rate_limit_per_minute`` requests per 60-second window.

        The window opens at the first request after the previous window
        expired, and exactly ``limit`` requests are admitted per window.
        ``time.monotonic`` is used so wall-clock adjustments cannot distort
        the window. No lock is taken, so concurrent coroutines may slightly
        exceed the limit.
        """
        # A non-positive limit would otherwise block every request; treat it as 1.
        limit = max(1, self.config.rate_limit_per_minute)
        now = time.monotonic()
        if now - self._last_request_time >= 60:
            # The previous window has expired: open a new one starting now.
            self._last_request_time = now
            self._request_count = 0
        if self._request_count >= limit:
            wait_time = 60 - (now - self._last_request_time)
            if wait_time > 0:
                self.logger.debug(f"达到速率限制,等待 {wait_time:.1f} 秒")
                await asyncio.sleep(wait_time)
            self._last_request_time = time.monotonic()
            self._request_count = 0
        # Count the request that is about to be sent.
        self._request_count += 1

    @staticmethod
    def _parse_timestamp(value: Any) -> Optional[datetime]:
        """Parse an ISO-8601 timestamp (a trailing ``Z`` is accepted).

        Returns ``None`` for empty or unparsable values, so that one malformed
        field cannot abort a whole response.
        """
        if not value:
            return None
        try:
            return datetime.fromisoformat(str(value).replace('Z', '+00:00'))
        except ValueError:
            return None

    async def get_space_info(self, space_id: str) -> SpaceInfo:
        """Fetch repository metadata for a Space.

        Raises:
            Whatever :meth:`_make_request` raises; the error is logged first.
        """
        try:
            data = await self._make_request("GET", f"spaces/{space_id}")
            return SpaceInfo(
                space_id=data.get('id', space_id),
                name=data.get('id', space_id),
                repository_url=data.get('url', ''),
                description=data.get('description'),
                author=data.get('author', ''),
                tags=data.get('tags', []),
                sdk=data.get('sdk'),
                python_version=data.get('pythonVersion'),
                created_at=self._parse_timestamp(data.get('createdAt')),
                last_modified=self._parse_timestamp(data.get('lastModified')),
            )
        except Exception as e:
            self.logger.error(f"获取 Space {space_id} 信息失败: {e}")
            raise

    async def get_space_runtime(self, space_id: str) -> SpaceRuntime:
        """Fetch the runtime (stage, state, hardware, ...) of a Space.

        Raises:
            Whatever :meth:`_make_request` raises; the error is logged first.
        """
        try:
            data = await self._make_request("GET", f"spaces/{space_id}/runtime")
            return SpaceRuntime(
                stage=data.get('stage', 'UNKNOWN'),
                state=data.get('state', 'UNKNOWN'),
                hardware=data.get('hardware', {}),
                replicas=data.get('replicas'),
                requested_hardware=data.get('requestedHardware', {}),
                acs_type=data.get('acsType'),
                storage=data.get('storage'),
                sha=data.get('sha'),
            )
        except Exception as e:
            self.logger.error(f"获取 Space {space_id} 运行时信息失败: {e}")
            raise

    @staticmethod
    def _resolve_status(stage: str, state: str) -> SpaceStatus:
        """Map runtime ``stage``/``state`` strings (case-insensitive) onto a SpaceStatus.

        A RUNNING stage whose state is not RUNNING, and any unrecognised
        stage, map to ``SpaceStatus.ERROR``.
        """
        stage = stage.upper()
        state = state.upper()
        if stage == 'RUNNING':
            return SpaceStatus.RUNNING if state == 'RUNNING' else SpaceStatus.ERROR
        return {
            'BUILDING': SpaceStatus.BUILDING,
            'STOPPED': SpaceStatus.STOPPED,
            'PAUSED': SpaceStatus.PAUSED,
            'SLEEPING': SpaceStatus.SLEEPING,
        }.get(stage, SpaceStatus.ERROR)

    async def get_space_status(self, space_id: str) -> SpaceStatusInfo:
        """Return a status snapshot combining Space metadata and runtime.

        Never raises: on any failure the error is logged and a snapshot with
        ``SpaceStatus.UNKNOWN`` is returned.
        """
        try:
            data = await self._make_request("GET", f"spaces/{space_id}")
            runtime_data = await self.get_space_runtime(space_id)
            return SpaceStatusInfo(
                space_id=space_id,
                status=self._resolve_status(runtime_data.stage, runtime_data.state),
                runtime=runtime_data,
                timestamp=datetime.now(),
                url=data.get('url'),
                emoji=data.get('emoji'),
                color=data.get('color'),
                likes=data.get('likes'),
                tags=data.get('tags', []),
            )
        except Exception as e:
            self.logger.error(f"获取 Space {space_id} 状态失败: {e}")
            return SpaceStatusInfo(
                space_id=space_id,
                status=SpaceStatus.UNKNOWN,
                runtime=SpaceRuntime(stage='UNKNOWN', state='UNKNOWN'),
                timestamp=datetime.now(),
            )

    @staticmethod
    def _classify_log_level(message: str) -> LogLevel:
        """Guess a level by keyword, case-insensitively.

        'error' or 'failed' gives ERROR, 'warning' gives WARNING, and anything
        else gives INFO.
        """
        lowered = message.lower()
        if 'error' in lowered or 'failed' in lowered:
            return LogLevel.ERROR
        if 'warning' in lowered:
            return LogLevel.WARNING
        return LogLevel.INFO

    async def get_space_logs(self, space_id: str, lines: int = 100) -> BuildLog:
        """Fetch recent log lines for a Space.

        Args:
            space_id: Space identifier.
            lines: Sent as the ``lines`` query parameter.

        Returns:
            A BuildLog built from a list response. Dict entries use their
            ``message`` key (falling back to the entry's ``str()``) and their
            ``source`` key; string entries are used verbatim. Entry timestamps
            are the local fetch time, not the original log time. On any
            failure a BuildLog with a single ERROR entry describing the
            failure is returned instead of raising.
        """
        try:
            data = await self._make_request("GET", f"spaces/{space_id}/logs", params={"lines": lines})
            entries: List[BuildLogEntry] = []
            if isinstance(data, list):
                for i, entry in enumerate(data):
                    if isinstance(entry, dict):
                        raw_message = entry.get('message')
                        # A missing/null message falls back to the whole entry;
                        # non-string messages are stringified before classification.
                        message = str(entry) if raw_message is None else str(raw_message)
                        entries.append(BuildLogEntry(
                            timestamp=datetime.now(),
                            level=self._classify_log_level(message),
                            message=message,
                            source=entry.get('source'),
                            line_number=i,
                        ))
                    elif isinstance(entry, str):
                        entries.append(BuildLogEntry(
                            timestamp=datetime.now(),
                            level=self._classify_log_level(entry),
                            message=entry,
                            line_number=i,
                        ))
            now = datetime.now()
            return BuildLog(
                space_id=space_id,
                entries=entries,
                start_time=entries[0].timestamp if entries else now,
                end_time=entries[-1].timestamp if entries else now,
                total_lines=len(entries),
            )
        except Exception as e:
            self.logger.error(f"获取 Space {space_id} 日志失败: {e}")
            return BuildLog(
                space_id=space_id,
                entries=[BuildLogEntry(
                    timestamp=datetime.now(),
                    level=LogLevel.ERROR,
                    message=f"获取日志失败: {str(e)}",
                )],
                total_lines=1,
            )

    async def _space_action(self, space_id: str, action: str, label: str) -> bool:
        """POST ``spaces/{space_id}/{action}`` and report success as a bool.

        *label* is the verb used in the log messages. Errors are logged and
        turned into ``False``.
        """
        try:
            await self._make_request("POST", f"spaces/{space_id}/{action}")
        except Exception as e:
            self.logger.error(f"{label} Space {space_id} 失败: {e}")
            return False
        self.logger.info(f"成功{label} Space {space_id}")
        return True

    async def restart_space(self, space_id: str) -> bool:
        """Request a restart of the Space; True on success, False on any error."""
        return await self._space_action(space_id, "restart", "重启")

    async def pause_space(self, space_id: str) -> bool:
        """Request that the Space be paused; True on success, False on any error."""
        return await self._space_action(space_id, "pause", "暂停")

    async def resume_space(self, space_id: str) -> bool:
        """Request that the Space be resumed; True on success, False on any error."""
        return await self._space_action(space_id, "resume", "恢复")

    async def get_space_discussions(self, space_id: str) -> List[Dict[str, Any]]:
        """Return the discussions of a Space, or ``[]`` on error or unexpected shape.

        Accepts either a bare JSON list or an object carrying the list under
        ``"discussions"``. NOTE(review): the Hub appears to return the
        wrapped, paginated form; confirm against the API.
        """
        try:
            data = await self._make_request("GET", f"spaces/{space_id}/discussions")
        except Exception as e:
            self.logger.error(f"获取 Space {space_id} 讨论失败: {e}")
            return []
        if isinstance(data, dict):
            data = data.get('discussions')
        return data if isinstance(data, list) else []

    def _parse_space_list(self, data: Any) -> List[SpaceInfo]:
        """Convert a ``spaces`` listing response into SpaceInfo objects.

        Non-list responses give ``[]``; entries that are not dicts are skipped.
        """
        if not isinstance(data, list):
            return []
        return [
            SpaceInfo(
                space_id=item.get('id', ''),
                name=item.get('id', ''),
                repository_url=item.get('url', ''),
                description=item.get('description'),
                author=item.get('author', ''),
                tags=item.get('tags', []),
                sdk=item.get('sdk'),
                python_version=item.get('pythonVersion'),
                last_modified=self._parse_timestamp(item.get('lastModified')),
            )
            for item in data
            if isinstance(item, dict)
        ]

    async def search_spaces(self, query: str, limit: int = 20) -> List[SpaceInfo]:
        """Search Spaces by free text; returns ``[]`` on any error."""
        try:
            data = await self._make_request("GET", "spaces", params={
                "search": query,
                "limit": limit,
            })
            return self._parse_space_list(data)
        except Exception as e:
            self.logger.error(f"搜索 Spaces 失败: {e}")
            return []

    async def get_user_spaces(self, author: Optional[str] = None) -> List[SpaceInfo]:
        """List Spaces, filtered by *author* when given; returns ``[]`` on any error."""
        try:
            params = {"author": author} if author else {}
            data = await self._make_request("GET", "spaces", params=params)
            return self._parse_space_list(data)
        except Exception as e:
            self.logger.error(f"获取用户 Spaces 失败: {e}")
            return []

    async def validate_token(self) -> bool:
        """Return True if a ``whoami-v2`` request succeeds with the configured token."""
        try:
            await self._make_request("GET", "whoami-v2")
            return True
        except Exception as e:
            self.logger.error(f"Token 验证失败: {e}")
            return False
class WebhookHandler:
    """Verify and dispatch HuggingFace webhook payloads.

    Payloads are routed by their ``event`` field to the ``_handle_*``
    coroutines registered in ``event_handlers``. When ``secret`` is set, the
    ``X-Hub-Signature-256`` header must equal ``sha256=<hex HMAC-SHA256>`` of
    the request body keyed with the secret.
    """

    def __init__(self, client: HuggingFaceClient, secret: Optional[str] = None):
        """Create a handler.

        Args:
            client: Client used by handlers that need extra data (e.g. logs).
            secret: Shared secret for signature verification; falsy disables it.
        """
        self.client = client
        self.secret = secret
        self.logger = logging.getLogger(__name__)
        # Maps payload['event'] strings to the coroutine that handles them.
        self.event_handlers = {
            'space.status_updated': self._handle_status_update,
            'space.build_error': self._handle_build_error,
            'space.started': self._handle_space_started,
            'space.stopped': self._handle_space_stopped,
            'space.paused': self._handle_space_paused,
            'space.resumed': self._handle_space_resumed,
        }

    @staticmethod
    def _extract_space_id(payload: Any, default: Optional[str] = None) -> Optional[str]:
        """Return ``payload['space']['id']``, or *default* if it is absent.

        *default* is also returned when ``payload`` or ``payload['space']``
        is not a dict.
        """
        space = payload.get('space') if isinstance(payload, dict) else None
        if not isinstance(space, dict):
            return default
        return space.get('id', default)

    async def handle_webhook(self, payload: Dict[str, Any], headers: Dict[str, str],
                             raw_body: Optional[Union[bytes, str]] = None) -> WebhookEvent:
        """Verify, record and dispatch one webhook delivery.

        Args:
            payload: Decoded JSON body.
            headers: Request headers; the signature header is looked up
                case-insensitively.
            raw_body: Exact request body as received. When given, the signature
                is checked against these bytes, which is what the sender
                actually signed. Otherwise it is checked against a
                re-serialisation of *payload*, which only matches if the sender
                serialised the JSON identically.

        Returns:
            A WebhookEvent. ``processed`` is True when a registered handler ran.
            Failures, including signature failures, are logged and reported
            through ``error_message`` rather than raised.
        """
        try:
            if self.secret:
                self._verify_signature(payload, headers, raw_body)
            event_type = payload.get('event', 'unknown')
            space_id = self._extract_space_id(payload, 'unknown')
            known_values = [e.value for e in EventType]
            webhook_event = WebhookEvent(
                event_type=EventType(event_type) if event_type in known_values else EventType.WEBHOOK_RECEIVED,
                space_id=space_id,
                timestamp=datetime.now(),
                payload=payload,
            )
            handler = self.event_handlers.get(event_type)
            if handler is not None:
                await handler(payload)
                webhook_event.processed = True
            else:
                self.logger.warning(f"未知事件类型: {event_type}")
            return webhook_event
        except Exception as e:
            self.logger.error(f"处理 Webhook 失败: {e}")
            return WebhookEvent(
                event_type=EventType.WEBHOOK_RECEIVED,
                # Use the safe extractor so a malformed payload cannot raise again here.
                space_id=self._extract_space_id(payload, 'unknown'),
                timestamp=datetime.now(),
                payload=payload,
                error_message=str(e),
            )

    def _verify_signature(self, payload: Dict[str, Any], headers: Dict[str, str],
                          raw_body: Optional[Union[bytes, str]] = None) -> None:
        """Check the ``X-Hub-Signature-256`` header against an HMAC-SHA256 of the body.

        Raises:
            ValueError: If the header is missing or the signature does not match.
        """
        import hmac
        import hashlib

        signature = headers.get('X-Hub-Signature-256')
        if signature is None:
            # Frameworks often normalise header names (e.g. to lower case).
            signature = next(
                (v for k, v in headers.items() if str(k).lower() == 'x-hub-signature-256'),
                None,
            )
        if not signature:
            raise ValueError("缺少签名头部")

        if raw_body is None:
            body = json.dumps(payload, sort_keys=True).encode()
        elif isinstance(raw_body, str):
            body = raw_body.encode()
        else:
            body = raw_body
        digest = hmac.new(self.secret.encode(), body, hashlib.sha256).hexdigest()
        expected_signature = f"sha256={digest}"
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        if not hmac.compare_digest(signature.encode(), expected_signature.encode()):
            raise ValueError("签名验证失败")

    async def _handle_status_update(self, payload: Dict[str, Any]) -> None:
        """Log the runtime block reported by a status-update event."""
        space_data = payload.get('space')
        runtime_data = space_data.get('runtime', {}) if isinstance(space_data, dict) else {}
        self.logger.info(f"Space {self._extract_space_id(payload)} 状态更新: {runtime_data}")

    async def _handle_build_error(self, payload: Dict[str, Any]) -> None:
        """Log a build failure and up to the last five ERROR lines of its logs."""
        space_id = self._extract_space_id(payload)
        self.logger.error(f"Space {space_id} 构建失败")
        if not space_id:
            # Without an id there is nothing to fetch logs for.
            return
        try:
            logs = await self.client.get_space_logs(space_id, lines=50)
            error_entries = [entry for entry in logs.entries if entry.level == LogLevel.ERROR]
            if error_entries:
                self.logger.error(f"发现 {len(error_entries)} 条错误日志")
                for entry in error_entries[-5:]:
                    self.logger.error(f" {entry.message}")
        except Exception as e:
            self.logger.error(f"获取错误日志失败: {e}")

    async def _handle_space_started(self, payload: Dict[str, Any]) -> None:
        """Log that the Space started."""
        self.logger.info(f"Space {self._extract_space_id(payload)} 启动成功")

    async def _handle_space_stopped(self, payload: Dict[str, Any]) -> None:
        """Log that the Space stopped."""
        self.logger.info(f"Space {self._extract_space_id(payload)} 已停止")

    async def _handle_space_paused(self, payload: Dict[str, Any]) -> None:
        """Log that the Space was paused."""
        self.logger.info(f"Space {self._extract_space_id(payload)} 已暂停")

    async def _handle_space_resumed(self, payload: Dict[str, Any]) -> None:
        """Log that the Space was resumed."""
        self.logger.info(f"Space {self._extract_space_id(payload)} 已恢复")
class RetryClient:
    """Client wrapper that retries calls with exponential backoff.

    The delay before retry ``k`` (0-based) is
    ``min(base_delay * 2**k, max_delay)``, and at most ``max_retries``
    retries are made (negative values are treated as 0).
    """

    def __init__(self, client: HuggingFaceClient, max_retries: int = 3,
                 base_delay: float = 1.0, max_delay: float = 60.0):
        """Wrap *client*; backoff parameters are in seconds."""
        self.client = client
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.logger = logging.getLogger(__name__)

    async def _call_with_retry(self, label: str, operation: Any, is_failure: Any = None) -> Any:
        """Await ``operation()`` up to ``max_retries + 1`` times.

        Args:
            label: Noun inserted into the log messages.
            operation: Zero-argument callable returning an awaitable.
            is_failure: Optional predicate. A result for which it returns True
                is treated like a failed attempt and retried.

        Returns:
            The first acceptable result. If every attempt produced a failure
            result, the last such result is returned.

        Raises:
            The exception raised by the final attempt, if it raised.
        """
        retries = max(0, self.max_retries)
        for attempt in range(retries + 1):
            try:
                result = await operation()
            except Exception as e:
                if attempt >= retries:
                    self.logger.error(f"获取{label}最终失败: {e}")
                    raise
                reason: Any = e
            else:
                if is_failure is None or not is_failure(result):
                    return result
                reason = "返回结果表示失败"
                if attempt >= retries:
                    self.logger.error(f"获取{label}最终失败: {reason}")
                    return result
            delay = min(self.base_delay * (2 ** attempt), self.max_delay)
            self.logger.warning(f"获取{label}失败,{delay} 秒后重试 ({attempt + 1}/{retries}): {reason}")
            await asyncio.sleep(delay)
        # Unreachable: the last iteration always returns or raises.
        raise RuntimeError("retry loop ended without a result")

    async def get_space_status(self, space_id: str) -> SpaceStatusInfo:
        """Fetch a Space's status, retrying failures.

        ``HuggingFaceClient.get_space_status`` reports errors by returning a
        snapshot with ``SpaceStatus.UNKNOWN`` instead of raising. Such a
        result is therefore retried as well, and the last one is returned if
        every attempt fails.
        """
        return await self._call_with_retry(
            "状态",
            lambda: self.client.get_space_status(space_id),
            is_failure=lambda info: info.status == SpaceStatus.UNKNOWN,
        )

    async def get_space_logs(self, space_id: str, lines: int = 100) -> BuildLog:
        """Fetch a Space's logs, retrying when the wrapped call raises.

        NOTE(review): ``HuggingFaceClient.get_space_logs`` returns a
        placeholder log instead of raising on failure, so with that client
        retries only happen for unexpected exceptions.
        """
        return await self._call_with_retry(
            "日志",
            lambda: self.client.get_space_logs(space_id, lines),
        )