""" 使用示例和最佳实践 展示系统的基本使用流程和高级功能 """ import asyncio import logging from datetime import datetime from typing import List, Dict, Any from core_system import AutoRepairSystem, SpaceStatus, ErrorType from huggingface_client import HuggingFaceAPIClient from error_analyzer import IntelligentErrorAnalyzer # ============================================================================ # 基本使用示例 # ============================================================================ async def basic_usage_example(): """基本使用示例""" # 1. 初始化系统 system = AutoRepairSystem("config.json") # 2. 配置要监控的 Spaces space_ids = [ "your-username/space-1", "your-username/space-2", "your-username/space-3" ] print(f"开始监控 {len(space_ids)} 个 Space...") try: # 3. 启动系统 await system.start(space_ids) except KeyboardInterrupt: print("\n停止监控...") system.monitor.stop() # ============================================================================ # 高级使用示例 # ============================================================================ class AdvancedUsageExample: """高级使用示例类""" def __init__(self): self.logger = logging.getLogger(__name__) async def custom_monitoring_workflow(self, space_ids: List[str]) -> None: """自定义监控工作流""" # 初始化各个组件 hf_client = HuggingFaceAPIClient("your_token_here") error_analyzer = IntelligentErrorAnalyzer() for space_id in space_ids: # 1. 检查状态 status = await hf_client.get_space_status(space_id) print(f"Space {space_id}: {status.value}") # 2. 如果有错误,分析日志 if status == SpaceStatus.ERROR: logs = await hf_client.get_space_logs(space_id, lines=100) errors = await error_analyzer.analyze_logs(logs) # 3. 分类并处理错误 for error in errors: if error.confidence > 0.8: await self._handle_high_confidence_error(space_id, error) else: await self._handle_low_confidence_error(space_id, error) async def _handle_high_confidence_error(self, space_id: str, error) -> None: """处理高置信度错误""" print(f"高置信度错误 {space_id}: {error.error_type.value}") if error.error_type == ErrorType.DEPENDENCY_INSTALL: await self._fix_dependency_error(space_id, error) elif error.error_type == ErrorType.DOCKERFILE_SYNTAX: await self._fix_dockerfile_error(space_id, error) # ... 其他错误类型处理 async def _fix_dependency_error(self, space_id: str, error) -> None: """修复依赖错误""" print(f"修复 {space_id} 的依赖错误...") # 实现具体的修复逻辑 # 1. 分析依赖类型(Python/Node.js) # 2. 尝试更换源地址 # 3. 调整版本号 # 4. 重新安装依赖 async def _fix_dockerfile_error(self, space_id: str, error) -> None: """修复 Dockerfile 错误""" print(f"修复 {space_id} 的 Dockerfile 错误...") # 实现具体的修复逻辑 # 1. 定位错误行 # 2. 语法修正 # 3. 优化命令结构 # ============================================================================ # 批量处理示例 # ============================================================================ class BatchProcessingExample: """批量处理示例""" def __init__(self): self.logger = logging.getLogger(__name__) async def batch_monitor_spaces(self, space_configs: List[Dict[str, Any]]) -> None: """批量监控 Spaces""" tasks = [] for config in space_configs: task = self._monitor_single_space(config) tasks.append(task) await asyncio.gather(*tasks, return_exceptions=True) async def _monitor_single_space(self, config: Dict[str, Any]) -> None: """监控单个 Space""" space_id = config['space_id'] monitoring_interval = config.get('interval', 60) max_retries = config.get('max_retries', 3) retry_count = 0 while retry_count < max_retries: try: # 监控逻辑 status = await self._check_space_status(space_id) if status != SpaceStatus.ERROR: break retry_count += 1 if retry_count < max_retries: await asyncio.sleep(monitoring_interval) except Exception as e: self.logger.error(f"监控 {space_id} 失败: {e}") break async def _check_space_status(self, space_id: str) -> SpaceStatus: """检查 Space 状态""" # 实现状态检查逻辑 pass # ============================================================================ # 自定义错误分析示例 # ============================================================================ class CustomErrorAnalyzer: """自定义错误分析器""" def __init__(self): self.custom_patterns = self._load_custom_patterns() async def analyze_with_custom_rules(self, logs: str) -> List[Dict]: """使用自定义规则分析""" results = [] # 1. 应用自定义模式 for pattern in self.custom_patterns: matches = pattern['regex'].findall(logs) if matches: results.append({ 'type': pattern['type'], 'matches': matches, 'severity': pattern['severity'], 'suggested_fix': pattern['fix'] }) # 2. 应用机器学习模型(如果可用) ml_results = await self._ml_analysis(logs) results.extend(ml_results) # 3. 综合评分 scored_results = self._score_results(results) return scored_results def _load_custom_patterns(self) -> List[Dict]: """加载自定义错误模式""" return [ { 'name': 'Custom GPU Error', 'regex': re.compile(r'GPU.*out of memory|CUDA.*error'), 'type': 'gpu_error', 'severity': 'high', 'fix': '减少批处理大小或使用更小的模型' }, { 'name': 'Custom Timeout Pattern', 'regex': re.compile(r'operation.*timeout.*after.*(\d+)ms'), 'type': 'custom_timeout', 'severity': 'medium', 'fix': '增加超时设置或优化性能' } ] async def _ml_analysis(self, logs: str) -> List[Dict]: """机器学习分析""" # 这里可以集成预训练的错误分类模型 return [] def _score_results(self, results: List[Dict]) -> List[Dict]: """对结果进行评分""" for result in results: if result['severity'] == 'high': result['score'] = 0.9 elif result['severity'] == 'medium': result['score'] = 0.7 else: result['score'] = 0.5 return sorted(results, key=lambda x: x['score'], reverse=True) # ============================================================================ # Webhook 集成示例 # ============================================================================ class WebhookIntegrationExample: """Webhook 集成示例""" def __init__(self): self.logger = logging.getLogger(__name__) async def setup_webhook_server(self) -> None: """设置 Webhook 服务器""" from fastapi import FastAPI, Request import uvicorn app = FastAPI() @app.post("/webhook/huggingface") async def handle_hf_webhook(request: Request): payload = await request.json() # 处理不同的事件类型 event_type = payload.get('event') if event_type == 'space.status_updated': await self._handle_status_update(payload) elif event_type == 'space.build_error': await self._handle_build_error(payload) elif event_type == 'space.started': await self._handle_space_started(payload) return {"status": "ok"} # 启动服务器 config = uvicorn.Config(app, host="0.0.0.0", port=8000) server = uvicorn.Server(config) await server.serve() async def _handle_status_update(self, payload: Dict) -> None: """处理状态更新事件""" space_id = payload.get('space', {}).get('id') new_status = payload.get('space', {}).get('runtime', {}).get('stage') self.logger.info(f"Space {space_id} 状态更新: {new_status}") # 触发相应处理逻辑 if new_status == 'ERROR': await self._trigger_repair_workflow(space_id) async def _trigger_repair_workflow(self, space_id: str) -> None: """触发修复工作流""" # 实现修复工作流 pass # ============================================================================ # 测试和调试示例 # ============================================================================ class TestingExample: """测试和调试示例""" def __init__(self): self.logger = logging.getLogger(__name__) async def test_error_analysis(self) -> None: """测试错误分析功能""" # 模拟日志数据 sample_logs = """ ERROR: Could not find a version that satisfies the requirement torch==2.0.0 ERROR: No matching distribution found for torch==2.0.0 Build failed """ analyzer = IntelligentErrorAnalyzer() errors = await analyzer.analyze_logs(sample_logs) print(f"检测到 {len(errors)} 个错误:") for error in errors: print(f"- {error.error_type.value}: {error.message}") print(f" 置信度: {error.confidence}") async def test_repair_strategies(self) -> None: """测试修复策略""" # 测试不同错误类型的修复策略 from core_system import SmartRepairEngine, ErrorInfo, SpaceInfo repair_engine = SmartRepairEngine() test_errors = [ ErrorInfo( error_type=ErrorType.DEPENDENCY_INSTALL, message="pip install failed", log_snippet="ERROR: Could not find torch", confidence=0.9 ), ErrorInfo( error_type=ErrorType.DOCKERFILE_SYNTAX, message="Dockerfile syntax error", log_snippet="failed to solve: syntax error", confidence=0.85 ) ] space_info = SpaceInfo( space_id="test/space", name="Test Space", repository_url="", current_status=SpaceStatus.ERROR, last_updated=datetime.now() ) for error in test_errors: strategy = await repair_engine.generate_strategy(error, space_info) if strategy: print(f"修复策略: {strategy.action.value}") print(f"描述: {strategy.description}") print(f"成功率: {strategy.success_rate}") print(f"风险等级: {strategy.risk_level}") print() # ============================================================================ # 性能监控示例 # ============================================================================ class PerformanceMonitoringExample: """性能监控示例""" def __init__(self): self.metrics = {} async def monitor_system_performance(self) -> None: """监控系统性能""" while True: # 收集性能指标 current_metrics = await self._collect_metrics() # 存储和比较指标 self._store_metrics(current_metrics) # 检查异常 anomalies = self._detect_anomalies(current_metrics) if anomalies: await self._handle_anomalies(anomalies) await asyncio.sleep(60) # 每分钟检查一次 async def _collect_metrics(self) -> Dict[str, Any]: """收集性能指标""" return { 'timestamp': datetime.now(), 'cpu_usage': self._get_cpu_usage(), 'memory_usage': self._get_memory_usage(), 'active_repairs': self._get_active_repairs(), 'queue_size': self._get_queue_size(), 'error_rate': self._get_error_rate() } def _store_metrics(self, metrics: Dict[str, Any]) -> None: """存储指标""" # 存储到数据库或时间序列数据库 pass def _detect_anomalies(self, metrics: Dict[str, Any]) -> List[str]: """检测异常""" anomalies = [] if metrics['cpu_usage'] > 80: anomalies.append(f"CPU 使用率过高: {metrics['cpu_usage']}%") if metrics['memory_usage'] > 90: anomalies.append(f"内存使用率过高: {metrics['memory_usage']}%") if metrics['error_rate'] > 0.1: anomalies.append(f"错误率过高: {metrics['error_rate']}") return anomalies async def _handle_anomalies(self, anomalies: List[str]) -> None: """处理异常""" for anomaly in anomalies: self.logger.warning(f"性能异常: {anomaly}") # 发送告警或自动调整 # ============================================================================ # 主程序示例 # ============================================================================ async def main(): """主程序示例""" print("HuggingFace Spaces 自动修复系统示例") print("=" * 50) # 选择运行的示例 examples = { "1": ("基本使用", basic_usage_example), "2": ("高级使用", lambda: AdvancedUsageExample().custom_monitoring_workflow( ["user/space1", "user/space2"] )), "3": ("测试错误分析", lambda: TestingExample().test_error_analysis()), "4": ("性能监控", lambda: PerformanceMonitoringExample().monitor_system_performance()), "5": ("Webhook 服务器", lambda: WebhookIntegrationExample().setup_webhook_server()) } print("请选择要运行的示例:") for key, (desc, _) in examples.items(): print(f"{key}. {desc}") choice = input("请输入选择 (1-5): ").strip() if choice in examples: desc, func = examples[choice] print(f"\n运行: {desc}") try: await func() except KeyboardInterrupt: print("\n程序被用户中断") except Exception as e: print(f"运行出错: {e}") else: print("无效的选择") if __name__ == "__main__": # 设置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # 运行主程序 asyncio.run(main())