# hfproxydemo / usage_examples.py
# OpenCode Deployer
# update
# 4ca5973
"""
使用示例和最佳实践
展示系统的基本使用流程和高级功能
"""
import asyncio
import logging
from datetime import datetime
from typing import List, Dict, Any
from core_system import AutoRepairSystem, SpaceStatus, ErrorType
from huggingface_client import HuggingFaceAPIClient
from error_analyzer import IntelligentErrorAnalyzer
# ============================================================================
# 基本使用示例
# ============================================================================
async def basic_usage_example():
    """Run the minimal flow: build the repair system and monitor three Spaces.

    The Space ids below are placeholders. On ``KeyboardInterrupt`` a notice is
    printed and ``system.monitor.stop()`` is called.
    """
    # Placeholder ids of the Spaces to watch.
    monitored_spaces = [
        "your-username/space-1",
        "your-username/space-2",
        "your-username/space-3"
    ]

    # The system is constructed with the path of its configuration file.
    repair_system = AutoRepairSystem("config.json")

    space_count = len(monitored_spaces)
    print(f"开始监控 {space_count} 个 Space...")

    try:
        # Hand the Space list to the system.
        await repair_system.start(monitored_spaces)
    except KeyboardInterrupt:
        print("\n停止监控...")
        repair_system.monitor.stop()
# ============================================================================
# 高级使用示例
# ============================================================================
class AdvancedUsageExample:
    """Advanced example: drive the API client and the log analyzer by hand.

    Each Space is polled for its status. When the status is
    ``SpaceStatus.ERROR`` its logs are fetched and analyzed, and every
    detected error is routed to a handler chosen by its ``confidence``.
    """

    # Errors scoring strictly above this value take the high-confidence path.
    HIGH_CONFIDENCE_THRESHOLD = 0.8

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    async def custom_monitoring_workflow(self, space_ids: List[str]) -> None:
        """Check every Space in *space_ids* and dispatch any detected errors.

        Args:
            space_ids: Identifiers of the Spaces to inspect, in order.
        """
        # Build the components used by this workflow.
        hf_client = HuggingFaceAPIClient("your_token_here")
        error_analyzer = IntelligentErrorAnalyzer()

        for space_id in space_ids:
            # 1. Check the current status.
            status = await hf_client.get_space_status(space_id)
            print(f"Space {space_id}: {status.value}")

            # 2. Only Spaces in the error state need log analysis.
            if status != SpaceStatus.ERROR:
                continue
            logs = await hf_client.get_space_logs(space_id, lines=100)
            errors = await error_analyzer.analyze_logs(logs)

            # 3. Route each error by the analyzer's confidence.
            for error in errors:
                if error.confidence > self.HIGH_CONFIDENCE_THRESHOLD:
                    await self._handle_high_confidence_error(space_id, error)
                else:
                    await self._handle_low_confidence_error(space_id, error)

    async def _handle_high_confidence_error(self, space_id: str, error) -> None:
        """Dispatch a high-confidence error to the fixer for its error type.

        Args:
            space_id: Identifier of the affected Space.
            error: Analyzer result; ``error_type`` is read from it.
        """
        print(f"高置信度错误 {space_id}: {error.error_type.value}")
        if error.error_type == ErrorType.DEPENDENCY_INSTALL:
            await self._fix_dependency_error(space_id, error)
        elif error.error_type == ErrorType.DOCKERFILE_SYNTAX:
            await self._fix_dockerfile_error(space_id, error)
        # ... handle further error types here

    async def _handle_low_confidence_error(self, space_id: str, error) -> None:
        """Report a low-confidence error without attempting any repair.

        The workflow calls this for every error at or below the threshold, so
        it must exist; it only prints the finding.

        Args:
            space_id: Identifier of the affected Space.
            error: Analyzer result; ``error_type`` and ``confidence`` are read.
        """
        print(
            f"低置信度错误 {space_id}: {error.error_type.value} "
            f"(置信度: {error.confidence})"
        )

    async def _fix_dependency_error(self, space_id: str, error) -> None:
        """Placeholder for repairing a dependency-installation error."""
        print(f"修复 {space_id} 的依赖错误...")
        # Implement the concrete repair logic here:
        # 1. Determine the dependency type (Python/Node.js)
        # 2. Try switching the package source
        # 3. Adjust the version number
        # 4. Reinstall the dependencies

    async def _fix_dockerfile_error(self, space_id: str, error) -> None:
        """Placeholder for repairing a Dockerfile error."""
        print(f"修复 {space_id} 的 Dockerfile 错误...")
        # Implement the concrete repair logic here:
        # 1. Locate the offending line
        # 2. Correct the syntax
        # 3. Optimize the command structure
# ============================================================================
# 批量处理示例
# ============================================================================
class BatchProcessingExample:
    """Batch example: monitor several Spaces concurrently from config dicts."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    async def batch_monitor_spaces(self, space_configs: List[Dict[str, Any]]) -> None:
        """Monitor every configured Space concurrently.

        One task failing does not cancel the others. Failures that escape a
        task (for example a config without ``'space_id'``) are logged rather
        than silently dropped.

        Args:
            space_configs: One dict per Space. ``'space_id'`` is required;
                ``'interval'`` and ``'max_retries'`` are optional.
        """
        tasks = [self._monitor_single_space(config) for config in space_configs]
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)

        # gather() returns exceptions as values in input order, so pairing
        # them with the configs identifies which entry failed.
        for config, outcome in zip(space_configs, outcomes):
            if isinstance(outcome, BaseException):
                self.logger.error(
                    "批量监控任务失败 (config=%r): %r", config, outcome
                )

    async def _monitor_single_space(self, config: Dict[str, Any]) -> None:
        """Poll one Space until it leaves the error state or retries run out.

        Args:
            config: Monitoring settings. ``'interval'`` defaults to 60 and is
                passed to ``asyncio.sleep``; ``'max_retries'`` defaults to 3.

        Raises:
            KeyError: If *config* has no ``'space_id'`` entry.
        """
        space_id = config['space_id']
        monitoring_interval = config.get('interval', 60)
        max_retries = config.get('max_retries', 3)

        retry_count = 0
        while retry_count < max_retries:
            try:
                status = await self._check_space_status(space_id)
                if status != SpaceStatus.ERROR:
                    break
                retry_count += 1
                # Skip the wait once the final attempt has been used.
                if retry_count < max_retries:
                    await asyncio.sleep(monitoring_interval)
            except Exception as e:
                # A failed check ends monitoring for this Space.
                self.logger.error(f"监控 {space_id} 失败: {e}")
                break

    async def _check_space_status(self, space_id: str) -> SpaceStatus:
        """Return the status of *space_id* (placeholder: currently returns None)."""
        # Implement the status check here.
        pass
# ============================================================================
# 自定义错误分析示例
# ============================================================================
class CustomErrorAnalyzer:
    """Log analyzer that combines regex rules with an optional ML stage.

    Each rule is a dict with ``name``, ``regex`` (compiled pattern), ``type``,
    ``severity`` and ``fix`` keys.
    """

    # Score assigned per severity label; any other label gets _DEFAULT_SCORE.
    _SEVERITY_SCORES = {'high': 0.9, 'medium': 0.7}
    _DEFAULT_SCORE = 0.5

    def __init__(self):
        self.custom_patterns = self._load_custom_patterns()

    async def analyze_with_custom_rules(self, logs: str) -> List[Dict]:
        """Analyze *logs* and return the findings ordered by descending score.

        Args:
            logs: Raw log text to scan.

        Returns:
            One dict per matching rule (keys ``type``, ``matches``,
            ``severity``, ``suggested_fix``, ``score``), followed in the
            ordering by whatever the ML stage contributes.
        """
        results = []

        # 1. Apply the custom regex rules.
        for pattern in self.custom_patterns:
            matches = pattern['regex'].findall(logs)
            if matches:
                results.append({
                    'type': pattern['type'],
                    'matches': matches,
                    'severity': pattern['severity'],
                    'suggested_fix': pattern['fix']
                })

        # 2. Add the ML stage's findings (currently none).
        results.extend(await self._ml_analysis(logs))

        # 3. Score and rank the combined findings.
        return self._score_results(results)

    def _load_custom_patterns(self) -> List[Dict]:
        """Build the list of custom error rules."""
        # Imported here because the module does not import ``re`` at file level.
        import re

        return [
            {
                'name': 'Custom GPU Error',
                'regex': re.compile(r'GPU.*out of memory|CUDA.*error'),
                'type': 'gpu_error',
                'severity': 'high',
                'fix': '减少批处理大小或使用更小的模型'
            },
            {
                'name': 'Custom Timeout Pattern',
                # The lazy ``.*?`` lets the group capture the whole number; a
                # greedy ``.*`` would leave it only the last digit.
                'regex': re.compile(r'operation.*timeout.*after.*?(\d+)ms'),
                'type': 'custom_timeout',
                'severity': 'medium',
                'fix': '增加超时设置或优化性能'
            }
        ]

    async def _ml_analysis(self, logs: str) -> List[Dict]:
        """Placeholder for an ML-based classifier; returns no findings."""
        # A pre-trained error classification model could be integrated here.
        return []

    def _score_results(self, results: List[Dict]) -> List[Dict]:
        """Attach a ``score`` to each result and sort by it, highest first.

        The dicts in *results* are updated in place. A missing or unknown
        severity gets the default score.
        """
        for result in results:
            result['score'] = self._SEVERITY_SCORES.get(
                result.get('severity'), self._DEFAULT_SCORE
            )
        return sorted(results, key=lambda item: item['score'], reverse=True)
# ============================================================================
# Webhook 集成示例
# ============================================================================
class WebhookIntegrationExample:
    """Webhook example: receive Space events over HTTP and react to them."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    async def setup_webhook_server(self) -> None:
        """Serve ``POST /webhook/huggingface`` on 0.0.0.0:8000.

        The payload's ``event`` field selects the handler. Unknown events and
        non-object payloads are acknowledged without further action.
        """
        # Imported lazily so the rest of the module works without them.
        from fastapi import FastAPI, Request
        import uvicorn

        app = FastAPI()

        @app.post("/webhook/huggingface")
        async def handle_hf_webhook(request: Request):
            payload = await request.json()

            # Valid JSON need not be an object; only dicts carry an event.
            event_type = payload.get('event') if isinstance(payload, dict) else None
            if event_type == 'space.status_updated':
                await self._handle_status_update(payload)
            elif event_type == 'space.build_error':
                await self._handle_build_error(payload)
            elif event_type == 'space.started':
                await self._handle_space_started(payload)
            return {"status": "ok"}

        # Start the server.
        config = uvicorn.Config(app, host="0.0.0.0", port=8000)
        server = uvicorn.Server(config)
        await server.serve()

    @staticmethod
    def _space_section(payload: Dict) -> Dict:
        """Return the payload's ``space`` mapping, or ``{}`` if absent or null."""
        return payload.get('space') or {}

    async def _handle_status_update(self, payload: Dict) -> None:
        """Log a status change and start a repair when the stage is ``ERROR``."""
        space = self._space_section(payload)
        space_id = space.get('id')
        new_status = (space.get('runtime') or {}).get('stage')
        self.logger.info(f"Space {space_id} 状态更新: {new_status}")

        # Trigger the follow-up handling.
        if new_status == 'ERROR':
            await self._trigger_repair_workflow(space_id)

    async def _handle_build_error(self, payload: Dict) -> None:
        """Log a build failure and start the repair workflow for that Space."""
        space_id = self._space_section(payload).get('id')
        self.logger.error(f"Space {space_id} 构建失败")
        await self._trigger_repair_workflow(space_id)

    async def _handle_space_started(self, payload: Dict) -> None:
        """Log that a Space has started; no further action is taken."""
        space_id = self._space_section(payload).get('id')
        self.logger.info(f"Space {space_id} 已启动")

    async def _trigger_repair_workflow(self, space_id: str) -> None:
        """Placeholder for starting the repair workflow for *space_id*."""
        # Implement the repair workflow here.
        pass
# ============================================================================
# 测试和调试示例
# ============================================================================
class TestingExample:
    """Testing and debugging example for the analyzer and the repair engine."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    async def test_error_analysis(self) -> None:
        """Feed a canned pip failure log to the analyzer and print the findings."""
        # Simulated build log.
        sample_logs = (
            "\n"
            "ERROR: Could not find a version that satisfies the requirement torch==2.0.0\n"
            "ERROR: No matching distribution found for torch==2.0.0\n"
            "Build failed\n"
        )

        detected = await IntelligentErrorAnalyzer().analyze_logs(sample_logs)

        print(f"检测到 {len(detected)} 个错误:")
        for item in detected:
            print(f"- {item.error_type.value}: {item.message}")
            print(f" 置信度: {item.confidence}")

    async def test_repair_strategies(self) -> None:
        """Ask the repair engine for a strategy per sample error and print it."""
        # Imported lazily; only this example needs these names.
        from core_system import SmartRepairEngine, ErrorInfo, SpaceInfo

        engine = SmartRepairEngine()

        # One sample error per type being exercised.
        dependency_failure = ErrorInfo(
            error_type=ErrorType.DEPENDENCY_INSTALL,
            message="pip install failed",
            log_snippet="ERROR: Could not find torch",
            confidence=0.9
        )
        dockerfile_failure = ErrorInfo(
            error_type=ErrorType.DOCKERFILE_SYNTAX,
            message="Dockerfile syntax error",
            log_snippet="failed to solve: syntax error",
            confidence=0.85
        )
        sample_errors = [dependency_failure, dockerfile_failure]

        # The Space the strategies are generated for.
        target_space = SpaceInfo(
            space_id="test/space",
            name="Test Space",
            repository_url="",
            current_status=SpaceStatus.ERROR,
            last_updated=datetime.now()
        )

        for sample in sample_errors:
            plan = await engine.generate_strategy(sample, target_space)
            if not plan:
                # Nothing to report when the engine gives no strategy.
                continue
            print(f"修复策略: {plan.action.value}")
            print(f"描述: {plan.description}")
            print(f"成功率: {plan.success_rate}")
            print(f"风险等级: {plan.risk_level}")
            print()
# ============================================================================
# 性能监控示例
# ============================================================================
class PerformanceMonitoringExample:
    """Performance example: sample metrics periodically and flag anomalies.

    The ``_get_*`` probes are placeholders that return zero so the example
    runs; replace them with real measurements.
    """

    def __init__(self):
        self.metrics = {}
        # _handle_anomalies() logs through this logger.
        self.logger = logging.getLogger(__name__)

    async def monitor_system_performance(self) -> None:
        """Collect, store and check metrics once a minute, indefinitely."""
        while True:
            # Collect the current metrics.
            current_metrics = await self._collect_metrics()

            # Store them.
            self._store_metrics(current_metrics)

            # Check for anomalies.
            anomalies = self._detect_anomalies(current_metrics)
            if anomalies:
                await self._handle_anomalies(anomalies)

            await asyncio.sleep(60)  # check once per minute

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Return one metrics sample, timestamped with ``datetime.now()``."""
        return {
            'timestamp': datetime.now(),
            'cpu_usage': self._get_cpu_usage(),
            'memory_usage': self._get_memory_usage(),
            'active_repairs': self._get_active_repairs(),
            'queue_size': self._get_queue_size(),
            'error_rate': self._get_error_rate()
        }

    def _get_cpu_usage(self) -> float:
        """Return CPU usage in percent (placeholder: always 0.0)."""
        return 0.0

    def _get_memory_usage(self) -> float:
        """Return memory usage in percent (placeholder: always 0.0)."""
        return 0.0

    def _get_active_repairs(self) -> int:
        """Return the number of repairs in progress (placeholder: always 0)."""
        return 0

    def _get_queue_size(self) -> int:
        """Return the pending queue length (placeholder: always 0)."""
        return 0

    def _get_error_rate(self) -> float:
        """Return the error rate (placeholder: always 0.0)."""
        return 0.0

    def _store_metrics(self, metrics: Dict[str, Any]) -> None:
        """Placeholder for persisting one metrics sample."""
        # Store in a database or a time-series database.
        pass

    def _detect_anomalies(self, metrics: Dict[str, Any]) -> List[str]:
        """Return a message for each metric that exceeds its threshold.

        Thresholds are strict: CPU above 80, memory above 90, error rate
        above 0.1.
        """
        anomalies = []
        if metrics['cpu_usage'] > 80:
            anomalies.append(f"CPU 使用率过高: {metrics['cpu_usage']}%")
        if metrics['memory_usage'] > 90:
            anomalies.append(f"内存使用率过高: {metrics['memory_usage']}%")
        if metrics['error_rate'] > 0.1:
            anomalies.append(f"错误率过高: {metrics['error_rate']}")
        return anomalies

    async def _handle_anomalies(self, anomalies: List[str]) -> None:
        """Log each anomaly message at warning level."""
        for anomaly in anomalies:
            self.logger.warning(f"性能异常: {anomaly}")
            # Send an alert or adjust automatically here.
# ============================================================================
# 主程序示例
# ============================================================================
async def main():
    """Show the example menu, read a choice from stdin and run that example.

    ``KeyboardInterrupt`` and any other exception raised by the chosen
    example are reported on stdout instead of propagating.
    """
    print("HuggingFace Spaces 自动修复系统示例")
    print("=" * 50)

    # Menu key -> (label, zero-argument callable returning an awaitable).
    menu = {
        "1": ("基本使用", basic_usage_example),
        "2": ("高级使用", lambda: AdvancedUsageExample().custom_monitoring_workflow(
            ["user/space1", "user/space2"]
        )),
        "3": ("测试错误分析", lambda: TestingExample().test_error_analysis()),
        "4": ("性能监控", lambda: PerformanceMonitoringExample().monitor_system_performance()),
        "5": ("Webhook 服务器", lambda: WebhookIntegrationExample().setup_webhook_server())
    }

    print("请选择要运行的示例:")
    for number, entry in menu.items():
        label = entry[0]
        print(f"{number}. {label}")

    selection = input("请输入选择 (1-5): ").strip()

    # Reject unknown selections first.
    if selection not in menu:
        print("无效的选择")
        return

    label, runner = menu[selection]
    print(f"\n运行: {label}")
    try:
        await runner()
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    except Exception as e:
        print(f"运行出错: {e}")
if __name__ == "__main__":
    # Configure logging before any example runs.
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(format=log_format, level=logging.INFO)

    # Run the interactive menu.
    asyncio.run(main())