Spaces:
Build error
Build error
| """ | |
| 使用示例和最佳实践 | |
| 展示系统的基本使用流程和高级功能 | |
| """ | |
| import asyncio | |
| import logging | |
| from datetime import datetime | |
| from typing import List, Dict, Any | |
| from core_system import AutoRepairSystem, SpaceStatus, ErrorType | |
| from huggingface_client import HuggingFaceAPIClient | |
| from error_analyzer import IntelligentErrorAnalyzer | |
| # ============================================================================ | |
| # 基本使用示例 | |
| # ============================================================================ | |
async def basic_usage_example():
    """Run the repair system against a fixed list of placeholder Spaces.

    Builds an ``AutoRepairSystem`` from ``config.json`` and awaits
    ``system.start(...)`` with three example Space ids. If the run is
    interrupted, the monitor is stopped before the coroutine exits.

    Raises:
        asyncio.CancelledError: Re-raised after cleanup so the event loop
            can finish cancelling this task.
    """
    # 1. Initialise the system.
    system = AutoRepairSystem("config.json")

    # 2. Spaces to monitor (placeholder ids; replace with real ones).
    space_ids = [
        "your-username/space-1",
        "your-username/space-2",
        "your-username/space-3",
    ]
    print(f"开始监控 {len(space_ids)} 个 Space...")

    try:
        # 3. Start the system.
        await system.start(space_ids)
    except (KeyboardInterrupt, asyncio.CancelledError) as exc:
        # Under asyncio.run() on Python 3.11+, Ctrl-C is delivered to the main
        # task as CancelledError; catching KeyboardInterrupt alone would skip
        # the cleanup below.
        print("\n停止监控...")
        system.monitor.stop()
        if isinstance(exc, asyncio.CancelledError):
            # Cancellation must propagate so the runner can shut down cleanly.
            raise
| # ============================================================================ | |
| # 高级使用示例 | |
| # ============================================================================ | |
class AdvancedUsageExample:
    """Advanced usage example that wires the client and analyzer together by hand."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    async def custom_monitoring_workflow(self, space_ids: List[str]) -> None:
        """Check each Space and route any detected errors by confidence.

        For every id in ``space_ids`` the status is fetched and printed. When
        the status is ``SpaceStatus.ERROR`` the last 100 log lines are
        analysed, and each resulting error is passed to the high-confidence
        handler (confidence > 0.8) or the low-confidence handler otherwise.

        Args:
            space_ids: Identifiers of the Spaces to inspect.
        """
        # Set up the components used by the workflow.
        hf_client = HuggingFaceAPIClient("your_token_here")
        error_analyzer = IntelligentErrorAnalyzer()

        for space_id in space_ids:
            # 1. Check the status.
            status = await hf_client.get_space_status(space_id)
            print(f"Space {space_id}: {status.value}")

            # 2. Only Spaces in the error state need log analysis.
            if status != SpaceStatus.ERROR:
                continue
            logs = await hf_client.get_space_logs(space_id, lines=100)
            errors = await error_analyzer.analyze_logs(logs)

            # 3. Classify and handle each error.
            for error in errors:
                if error.confidence > 0.8:
                    await self._handle_high_confidence_error(space_id, error)
                else:
                    await self._handle_low_confidence_error(space_id, error)

    async def _handle_high_confidence_error(self, space_id: str, error) -> None:
        """Dispatch a high-confidence error to the fixer for its error type.

        Args:
            space_id: Identifier of the affected Space.
            error: Analyzer result; ``error_type`` is read from it.
        """
        print(f"高置信度错误 {space_id}: {error.error_type.value}")
        if error.error_type == ErrorType.DEPENDENCY_INSTALL:
            await self._fix_dependency_error(space_id, error)
        elif error.error_type == ErrorType.DOCKERFILE_SYNTAX:
            await self._fix_dockerfile_error(space_id, error)
        # ... handling for other error types goes here.

    async def _handle_low_confidence_error(self, space_id: str, error) -> None:
        """Report a low-confidence error without attempting an automatic fix.

        Args:
            space_id: Identifier of the affected Space.
            error: Analyzer result; ``error_type`` and ``confidence`` are read.
        """
        print(
            f"低置信度错误 {space_id}: {error.error_type.value} "
            f"(置信度: {error.confidence})"
        )

    async def _fix_dependency_error(self, space_id: str, error) -> None:
        """Placeholder for the dependency-error repair; currently only prints."""
        print(f"修复 {space_id} 的依赖错误...")
        # Repair logic to implement:
        # 1. Determine the dependency type (Python/Node.js).
        # 2. Try a different package source.
        # 3. Adjust version numbers.
        # 4. Reinstall the dependencies.

    async def _fix_dockerfile_error(self, space_id: str, error) -> None:
        """Placeholder for the Dockerfile-error repair; currently only prints."""
        print(f"修复 {space_id} 的 Dockerfile 错误...")
        # Repair logic to implement:
        # 1. Locate the failing line.
        # 2. Correct the syntax.
        # 3. Optimise the command structure.
| # ============================================================================ | |
| # 批量处理示例 | |
| # ============================================================================ | |
class BatchProcessingExample:
    """Batch processing example: monitor several Spaces concurrently."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    async def batch_monitor_spaces(self, space_configs: List[Dict[str, Any]]) -> None:
        """Monitor every configured Space concurrently.

        One monitoring coroutine is started per config and all of them are
        awaited together. A failure in one task does not abort the others;
        each failure is logged once all tasks have finished.

        Args:
            space_configs: One dict per Space. ``space_id`` is required;
                ``interval`` and ``max_retries`` are optional.
        """
        configs = list(space_configs)
        results = await asyncio.gather(
            *(self._monitor_single_space(config) for config in configs),
            return_exceptions=True,
        )
        # return_exceptions=True hands failures back as values; report them
        # here so they are not dropped silently.
        for config, result in zip(configs, results):
            if isinstance(result, BaseException):
                self.logger.error(f"批量监控任务失败 ({config!r}): {result!r}")

    async def _monitor_single_space(self, config: Dict[str, Any]) -> None:
        """Poll one Space until it leaves the error state or retries run out.

        Args:
            config: Monitoring settings. Reads ``space_id`` (required),
                ``interval`` (seconds between polls, default 60) and
                ``max_retries`` (default 3).

        Raises:
            KeyError: If ``config`` has no ``space_id`` entry.
        """
        space_id = config['space_id']
        monitoring_interval = config.get('interval', 60)
        max_retries = config.get('max_retries', 3)

        retry_count = 0
        while retry_count < max_retries:
            try:
                status = await self._check_space_status(space_id)
                if status != SpaceStatus.ERROR:
                    break
                retry_count += 1
                # No point sleeping after the final attempt.
                if retry_count < max_retries:
                    await asyncio.sleep(monitoring_interval)
            except Exception as e:
                self.logger.error(f"监控 {space_id} 失败: {e}")
                break

    async def _check_space_status(self, space_id: str) -> "SpaceStatus":
        """Placeholder for the status check.

        NOTE(review): not implemented yet, so this currently returns ``None``
        and the caller treats that as "not in error".
        """
        # Implement the status check here.
        pass
| # ============================================================================ | |
| # 自定义错误分析示例 | |
| # ============================================================================ | |
class CustomErrorAnalyzer:
    """Custom error analyzer driven by hand-written regex rules."""

    def __init__(self):
        self.custom_patterns = self._load_custom_patterns()

    async def analyze_with_custom_rules(self, logs: str) -> List[Dict]:
        """Analyse ``logs`` with the custom patterns and return scored findings.

        Args:
            logs: Raw log text to scan.

        Returns:
            One dict per matching pattern (keys ``type``, ``matches``,
            ``severity``, ``suggested_fix``, ``score``) followed by any
            results from ``_ml_analysis``, sorted by ``score`` descending.
        """
        results = []

        # 1. Apply the custom patterns.
        for pattern in self.custom_patterns:
            matches = pattern['regex'].findall(logs)
            if matches:
                results.append({
                    'type': pattern['type'],
                    'matches': matches,
                    'severity': pattern['severity'],
                    'suggested_fix': pattern['fix'],
                })

        # 2. Apply the machine-learning model (if available).
        results.extend(await self._ml_analysis(logs))

        # 3. Score and rank the combined results.
        return self._score_results(results)

    def _load_custom_patterns(self) -> List[Dict]:
        """Build the list of custom error patterns with compiled regexes."""
        # Imported here because the module's import block does not provide
        # `re`; the file already uses function-scope imports elsewhere.
        import re

        return [
            {
                'name': 'Custom GPU Error',
                'regex': re.compile(r'GPU.*out of memory|CUDA.*error'),
                'type': 'gpu_error',
                'severity': 'high',
                'fix': '减少批处理大小或使用更小的模型',
            },
            {
                'name': 'Custom Timeout Pattern',
                # The quantifier before the group is lazy so that the whole
                # number is captured; a greedy `.*` would leave only the
                # final digit for `(\d+)`.
                'regex': re.compile(r'operation.*timeout.*after.*?(\d+)ms'),
                'type': 'custom_timeout',
                'severity': 'medium',
                'fix': '增加超时设置或优化性能',
            },
        ]

    async def _ml_analysis(self, logs: str) -> List[Dict]:
        """Placeholder for model-based analysis; currently returns no results."""
        # A pre-trained error classification model could be integrated here.
        return []

    def _score_results(self, results: List[Dict]) -> List[Dict]:
        """Attach a ``score`` to each result and sort by it, highest first.

        ``high`` severity scores 0.9, ``medium`` 0.7 and anything else 0.5.
        The result dicts are modified in place.
        """
        severity_scores = {'high': 0.9, 'medium': 0.7}
        for result in results:
            result['score'] = severity_scores.get(result['severity'], 0.5)
        return sorted(results, key=lambda x: x['score'], reverse=True)
| # ============================================================================ | |
| # Webhook 集成示例 | |
| # ============================================================================ | |
class WebhookIntegrationExample:
    """Webhook integration example built on FastAPI and uvicorn."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    async def setup_webhook_server(self) -> None:
        """Create the webhook app and serve it on 0.0.0.0:8000.

        Incoming JSON payloads are dispatched on their ``event`` field to the
        matching handler; unknown events are acknowledged and ignored.
        """
        from fastapi import FastAPI, Request
        import uvicorn

        app = FastAPI()

        # Map each supported event type to its handler.
        handlers = {
            'space.status_updated': self._handle_status_update,
            'space.build_error': self._handle_build_error,
            'space.started': self._handle_space_started,
        }

        # The handler must be registered on the app, otherwise no request
        # ever reaches it. NOTE(review): the "/webhook" path is chosen here;
        # confirm it matches the URL configured on the sending side.
        @app.post("/webhook")
        async def handle_hf_webhook(request: Request):
            payload = await request.json()
            handler = handlers.get(payload.get('event'))
            if handler is not None:
                await handler(payload)
            return {"status": "ok"}

        # Start the server.
        config = uvicorn.Config(app, host="0.0.0.0", port=8000)
        server = uvicorn.Server(config)
        await server.serve()

    async def _handle_status_update(self, payload: Dict) -> None:
        """Log a status change and start a repair when the stage is ``ERROR``.

        Args:
            payload: Event body; reads ``space.id`` and ``space.runtime.stage``.
        """
        space_id = payload.get('space', {}).get('id')
        new_status = payload.get('space', {}).get('runtime', {}).get('stage')
        self.logger.info(f"Space {space_id} 状态更新: {new_status}")

        # Trigger the matching follow-up.
        if new_status == 'ERROR':
            await self._trigger_repair_workflow(space_id)

    async def _handle_build_error(self, payload: Dict) -> None:
        """Log a build failure and start the repair workflow.

        Args:
            payload: Event body. NOTE(review): assumed to carry ``space.id``
                like the status-update event; confirm against real payloads.
        """
        space_id = payload.get('space', {}).get('id')
        self.logger.error(f"Space {space_id} 构建失败")
        await self._trigger_repair_workflow(space_id)

    async def _handle_space_started(self, payload: Dict) -> None:
        """Log that a Space has started.

        Args:
            payload: Event body. NOTE(review): assumed to carry ``space.id``
                like the status-update event; confirm against real payloads.
        """
        space_id = payload.get('space', {}).get('id')
        self.logger.info(f"Space {space_id} 已启动")

    async def _trigger_repair_workflow(self, space_id: str) -> None:
        """Placeholder for the repair workflow; currently does nothing."""
        # Implement the repair workflow here.
        pass
| # ============================================================================ | |
| # 测试和调试示例 | |
| # ============================================================================ | |
class TestingExample:
    """Testing and debugging examples for the analyzer and repair engine."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    async def test_error_analysis(self) -> None:
        """Feed a canned pip failure log to the analyzer and print what it finds."""
        # Simulated log data, assembled line by line.
        sample_logs = "\n".join((
            "",
            "ERROR: Could not find a version that satisfies the requirement torch==2.0.0",
            "ERROR: No matching distribution found for torch==2.0.0",
            "Build failed",
            "",
        ))

        detected = await IntelligentErrorAnalyzer().analyze_logs(sample_logs)

        print(f"检测到 {len(detected)} 个错误:")
        for item in detected:
            print(f"- {item.error_type.value}: {item.message}")
            print(f" 置信度: {item.confidence}")

    async def test_repair_strategies(self) -> None:
        """Ask the repair engine for a strategy per sample error and print it."""
        # Exercise the repair strategy for different error types.
        from core_system import SmartRepairEngine, ErrorInfo, SpaceInfo

        engine = SmartRepairEngine()

        dependency_failure = ErrorInfo(
            error_type=ErrorType.DEPENDENCY_INSTALL,
            message="pip install failed",
            log_snippet="ERROR: Could not find torch",
            confidence=0.9,
        )
        dockerfile_failure = ErrorInfo(
            error_type=ErrorType.DOCKERFILE_SYNTAX,
            message="Dockerfile syntax error",
            log_snippet="failed to solve: syntax error",
            confidence=0.85,
        )

        target_space = SpaceInfo(
            space_id="test/space",
            name="Test Space",
            repository_url="",
            current_status=SpaceStatus.ERROR,
            last_updated=datetime.now(),
        )

        for failure in (dependency_failure, dockerfile_failure):
            plan = await engine.generate_strategy(failure, target_space)
            if not plan:
                # Nothing to report when no strategy is produced.
                continue
            print(f"修复策略: {plan.action.value}")
            print(f"描述: {plan.description}")
            print(f"成功率: {plan.success_rate}")
            print(f"风险等级: {plan.risk_level}")
            print()
| # ============================================================================ | |
| # 性能监控示例 | |
| # ============================================================================ | |
class PerformanceMonitoringExample:
    """Performance monitoring example: collect metrics and flag anomalies."""

    def __init__(self):
        self.metrics = {}
        # _handle_anomalies logs through this; it must exist on the instance.
        self.logger = logging.getLogger(__name__)

    async def monitor_system_performance(self) -> None:
        """Collect, store and check metrics once a minute, forever."""
        while True:
            # Collect the current metrics.
            current_metrics = await self._collect_metrics()

            # Store them for later comparison.
            self._store_metrics(current_metrics)

            # Check for anomalies.
            anomalies = self._detect_anomalies(current_metrics)
            if anomalies:
                await self._handle_anomalies(anomalies)

            await asyncio.sleep(60)  # check once per minute

    async def _collect_metrics(self) -> Dict[str, Any]:
        """Return a snapshot of the current metrics with a timestamp."""
        return {
            'timestamp': datetime.now(),
            'cpu_usage': self._get_cpu_usage(),
            'memory_usage': self._get_memory_usage(),
            'active_repairs': self._get_active_repairs(),
            'queue_size': self._get_queue_size(),
            'error_rate': self._get_error_rate(),
        }

    # The helpers below are placeholders so the example runs end to end.
    # They return zero, which never trips a threshold in _detect_anomalies;
    # replace them with real measurements.

    def _get_cpu_usage(self) -> float:
        """Placeholder CPU usage, compared against a threshold of 80."""
        return 0.0

    def _get_memory_usage(self) -> float:
        """Placeholder memory usage, compared against a threshold of 90."""
        return 0.0

    def _get_active_repairs(self) -> int:
        """Placeholder count of repairs currently in progress."""
        return 0

    def _get_queue_size(self) -> int:
        """Placeholder queue size."""
        return 0

    def _get_error_rate(self) -> float:
        """Placeholder error rate, compared against a threshold of 0.1."""
        return 0.0

    def _store_metrics(self, metrics: Dict[str, Any]) -> None:
        """Placeholder for persisting metrics; currently does nothing."""
        # Store to a database or a time-series database.
        pass

    def _detect_anomalies(self, metrics: Dict[str, Any]) -> List[str]:
        """Return a message for each metric that exceeds its threshold.

        Args:
            metrics: Snapshot with ``cpu_usage``, ``memory_usage`` and
                ``error_rate`` entries.

        Returns:
            Messages for CPU above 80, memory above 90 and error rate above
            0.1, in that order; empty when nothing is exceeded.
        """
        anomalies = []
        if metrics['cpu_usage'] > 80:
            anomalies.append(f"CPU 使用率过高: {metrics['cpu_usage']}%")
        if metrics['memory_usage'] > 90:
            anomalies.append(f"内存使用率过高: {metrics['memory_usage']}%")
        if metrics['error_rate'] > 0.1:
            anomalies.append(f"错误率过高: {metrics['error_rate']}")
        return anomalies

    async def _handle_anomalies(self, anomalies: List[str]) -> None:
        """Log a warning for each detected anomaly."""
        for anomaly in anomalies:
            self.logger.warning(f"性能异常: {anomaly}")
        # Send alerts or adjust automatically here.
| # ============================================================================ | |
| # 主程序示例 | |
| # ============================================================================ | |
async def main():
    """Show the example menu, read a choice and run the selected example."""
    print("HuggingFace Spaces 自动修复系统示例")
    print("=" * 50)

    # Menu entries in display order; keys "1".."5" are derived from position.
    entries = [
        ("基本使用", basic_usage_example),
        ("高级使用", lambda: AdvancedUsageExample().custom_monitoring_workflow(
            ["user/space1", "user/space2"]
        )),
        ("测试错误分析", lambda: TestingExample().test_error_analysis()),
        ("性能监控", lambda: PerformanceMonitoringExample().monitor_system_performance()),
        ("Webhook 服务器", lambda: WebhookIntegrationExample().setup_webhook_server()),
    ]
    examples = {str(number): entry for number, entry in enumerate(entries, start=1)}

    print("请选择要运行的示例:")
    for key, entry in examples.items():
        print(f"{key}. {entry[0]}")

    choice = input("请输入选择 (1-5): ").strip()
    selected = examples.get(choice)
    if selected is None:
        print("无效的选择")
        return

    label, runner = selected
    print(f"\n运行: {label}")
    try:
        await runner()
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    except Exception as e:
        print(f"运行出错: {e}")
if __name__ == "__main__":
    # Configure logging before anything else runs.
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(format=log_format, level=logging.INFO)

    # Run the interactive example menu.
    asyncio.run(main())