Spaces:
Build error
Build error
| """ | |
| 完整系统集成示例 | |
| 展示如何使用自动修复和重部署循环机制的所有组件 | |
| """ | |
| import asyncio | |
| import logging | |
| import json | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from typing import Dict, Any | |
| # 导入所有组件 | |
| from data_models import SpaceInfo, ErrorInfo, RepairStrategy, SpaceStatus, ErrorType, RepairAction | |
| from auto_repair_executor import AutoRepairExecutor | |
| from repair_loop_engine import RepairLoopEngine | |
| from rollback_manager import RollbackManager | |
| from safety_validator import SafetyValidator | |
| from integration_orchestrator import RepairOrchestrator | |
| from huggingface_client import HuggingFaceAPIClient | |
| class CompleteRepairSystem: | |
| """完整的自动修复系统""" | |
| def __init__(self, hf_token: str, repo_path: str = "."): | |
| self.logger = logging.getLogger(__name__) | |
| # 初始化 API 客户端 | |
| self.hf_client = HuggingFaceAPIClient(token=hf_token) | |
| # 初始化核心组件 | |
| self.repair_executor = AutoRepairExecutor(self.hf_client, repo_path) | |
| self.loop_engine = RepairLoopEngine() | |
| self.rollback_manager = RollbackManager() | |
| self.safety_validator = SafetyValidator() | |
| # 初始化编排器 | |
| self.orchestrator = RepairOrchestrator(self.hf_client, repo_path) | |
| # 配置组件依赖关系 | |
| self.orchestrator.set_components( | |
| self.repair_executor, | |
| self.loop_engine, | |
| self.rollback_manager | |
| ) | |
| # 系统统计 | |
| self.system_stats = { | |
| 'start_time': None, | |
| 'total_errors_detected': 0, | |
| 'total_repairs_attempted': 0, | |
| 'total_repairs_successful': 0, | |
| 'total_rollbacks_triggered': 0 | |
| } | |
| async def start_system(self): | |
| """启动系统""" | |
| self.logger.info("启动自动修复系统") | |
| self.system_stats['start_time'] = datetime.now() | |
| # 启动编排器 | |
| await self.orchestrator.start_monitoring() | |
| # 启动循环引擎 | |
| await self.loop_engine.start() | |
| # 设置错误处理回调 | |
| self.loop_engine.set_error_handler(self._on_error_detected) | |
| self.logger.info("系统启动完成") | |
| async def stop_system(self): | |
| """停止系统""" | |
| self.logger.info("停止自动修复系统") | |
| # 停止循环引擎 | |
| await self.loop_engine.stop() | |
| # 停止编排器 | |
| await self.orchestrator.stop_monitoring() | |
| self.logger.info("系统停止完成") | |
| async def _on_error_detected(self, space_info: SpaceInfo, error_info: ErrorInfo): | |
| """错误检测回调""" | |
| self.system_stats['total_errors_detected'] += 1 | |
| self.logger.warning(f"检测到错误: {space_info.space_id} - {error_info.message}") | |
| # 分析错误并生成修复策略 | |
| strategy = await self._generate_repair_strategy(error_info) | |
| if strategy: | |
| # 触发修复 | |
| workflow_id = await self.orchestrator.trigger_repair( | |
| space_info, error_info, strategy | |
| ) | |
| if workflow_id: | |
| self.system_stats['total_repairs_attempted'] += 1 | |
| self.logger.info(f"修复工作流已启动: {workflow_id}") | |
| else: | |
| self.logger.error("启动修复工作流失败") | |
| else: | |
| self.logger.warning("无法生成修复策略") | |
| async def _generate_repair_strategy(self, error_info: ErrorInfo) -> RepairStrategy: | |
| """生成修复策略""" | |
| error_type = error_info.error_type | |
| if error_type == ErrorType.DEPENDENCY_INSTALL: | |
| return RepairStrategy( | |
| action=RepairAction.UPDATE_DEPENDENCIES, | |
| description="修复依赖安装失败", | |
| modifications={ | |
| "type": "dependency_update", | |
| "strategy": "source_change" | |
| }, | |
| risk_level="medium", | |
| success_rate=0.7, | |
| estimated_time=300 | |
| ) | |
| elif error_type == ErrorType.DOCKER_BUILD_ERROR: | |
| return RepairStrategy( | |
| action=RepairAction.MODIFY_DOCKERFILE, | |
| description="修复 Docker 构建错误", | |
| modifications={ | |
| "type": "syntax_fix", | |
| "fix_type": "dockerfile_from", | |
| "new_line": "FROM python:3.9-slim" | |
| }, | |
| risk_level="high", | |
| success_rate=0.8, | |
| estimated_time=600 | |
| ) | |
| elif error_type == ErrorType.PORT_BINDING_ERROR: | |
| return RepairStrategy( | |
| action=RepairAction.CHANGE_PORT, | |
| description="修复端口绑定错误", | |
| modifications={ | |
| "type": "port_change", | |
| "old_port": "7860", | |
| "new_port": "7861" | |
| }, | |
| risk_level="low", | |
| success_rate=0.9, | |
| estimated_time=120 | |
| ) | |
| elif error_type == ErrorType.PERMISSION_ERROR: | |
| return RepairStrategy( | |
| action=RepairAction.SET_PERMISSIONS, | |
| description="修复权限错误", | |
| modifications={ | |
| "type": "environment_fix", | |
| "environment_variables": { | |
| "CHMOD_CMD": "chmod +x /app/*" | |
| } | |
| }, | |
| risk_level="medium", | |
| success_rate=0.6, | |
| estimated_time=180 | |
| ) | |
| else: | |
| # 默认策略 | |
| return RepairStrategy( | |
| action=RepairAction.MODIFY_DOCKERFILE, | |
| description="通用修复策略", | |
| modifications={ | |
| "type": "syntax_fix", | |
| "fix_type": "general" | |
| }, | |
| risk_level="medium", | |
| success_rate=0.5, | |
| estimated_time=300 | |
| ) | |
| async def add_space_to_monitor(self, space_id: str) -> bool: | |
| """添加 Space 到监控列表""" | |
| try: | |
| # 获取 Space 信息 | |
| space_info = await self.hf_client.get_space_info(space_id) | |
| if space_info: | |
| await self.loop_engine.add_space(space_info) | |
| self.logger.info(f"已添加到监控: {space_id}") | |
| return True | |
| else: | |
| self.logger.error(f"无法获取 Space 信息: {space_id}") | |
| return False | |
| except Exception as e: | |
| self.logger.error(f"添加监控失败 {space_id}: {e}") | |
| return False | |
| async def remove_space_from_monitor(self, space_id: str) -> bool: | |
| """从监控列表移除 Space""" | |
| try: | |
| await self.loop_engine.remove_space(space_id) | |
| self.logger.info(f"已从监控移除: {space_id}") | |
| return True | |
| except Exception as e: | |
| self.logger.error(f"移除监控失败 {space_id}: {e}") | |
| return False | |
| def get_system_status(self) -> Dict[str, Any]: | |
| """获取系统状态""" | |
| uptime = None | |
| if self.system_stats['start_time']: | |
| uptime = (datetime.now() - self.system_stats['start_time']).total_seconds() | |
| return { | |
| 'uptime_seconds': uptime, | |
| 'is_running': self.orchestrator.is_running, | |
| 'system_stats': self.system_stats.copy(), | |
| 'orchestrator_stats': self.orchestrator.get_orchestrator_stats(), | |
| 'loop_engine_stats': self.loop_engine.get_stats(), | |
| 'active_workflows': self.orchestrator.get_active_workflows(), | |
| 'monitored_spaces': len(self.loop_engine.monitored_spaces) | |
| } | |
| async def get_repair_history(self, space_id: Optional[str] = None) -> Dict[str, Any]: | |
| """获取修复历史""" | |
| # 从编排器获取事件历史 | |
| events = self.orchestrator.get_events(space_id=space_id) | |
| # 从修复执行器获取修复统计 | |
| repair_stats = self.repair_executor.get_repair_stats() | |
| # 从回滚管理器获取回滚历史 | |
| rollback_history = self.rollback_manager.get_rollback_history() | |
| return { | |
| 'events': events, | |
| 'repair_stats': repair_stats, | |
| 'rollback_history': rollback_history, | |
| 'timestamp': datetime.now().isoformat() | |
| } | |
| async def generate_comprehensive_report(self) -> Dict[str, Any]: | |
| """生成综合报告""" | |
| system_status = self.get_system_status() | |
| repair_history = await self.get_repair_history() | |
| # 计算成功率 | |
| total_repairs = self.system_stats['total_repairs_attempted'] | |
| success_rate = 0.0 | |
| if total_repairs > 0: | |
| success_rate = self.system_stats['total_repairs_successful'] / total_repairs | |
| return { | |
| 'report_time': datetime.now().isoformat(), | |
| 'system_status': system_status, | |
| 'repair_history': repair_history, | |
| 'metrics': { | |
| 'error_detection_rate': self.system_stats['total_errors_detected'], | |
| 'repair_success_rate': success_rate, | |
| 'rollback_rate': self.system_stats['total_rollbacks_triggered'] / max(1, total_repairs), | |
| 'average_repair_time': None # 需要从历史数据计算 | |
| }, | |
| 'recommendations': self._generate_recommendations() | |
| } | |
| def _generate_recommendations(self) -> List[str]: | |
| """生成系统优化建议""" | |
| recommendations = [] | |
| # 基于统计数据的建议 | |
| if self.system_stats['total_rollbacks_triggered'] > self.system_stats['total_repairs_successful']: | |
| recommendations.append("回滚率较高,建议改进修复策略的质量") | |
| if self.system_stats['total_errors_detected'] == 0: | |
| recommendations.append("考虑扩大监控范围以检测更多潜在问题") | |
| active_workflows = self.orchestrator.get_active_workflows() | |
| if len(active_workflows) > 5: | |
| recommendations.append("活跃工作流较多,考虑优化并发控制") | |
| return recommendations | |
| async def main(): | |
| """主函数示例""" | |
| # 配置日志 | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| # 获取 HuggingFace Token(实际使用时应该从环境变量或配置文件读取) | |
| hf_token = "your_huggingface_token_here" | |
| if hf_token == "your_huggingface_token_here": | |
| print("请设置有效的 HuggingFace Token") | |
| return | |
| # 创建并启动系统 | |
| system = CompleteRepairSystem(hf_token) | |
| await system.start_system() | |
| try: | |
| # 添加要监控的 Spaces | |
| test_spaces = [ | |
| "username/test-space-1", | |
| "username/test-space-2" | |
| ] | |
| for space_id in test_spaces: | |
| await system.add_space_to_monitor(space_id) | |
| # 模拟运行一段时间 | |
| print("系统运行中,按 Ctrl+C 停止...") | |
| await asyncio.sleep(300) # 运行 5 分钟 | |
| # 生成报告 | |
| report = await system.generate_comprehensive_report() | |
| print("\n=== 系统报告 ===") | |
| print(json.dumps(report, indent=2, ensure_ascii=False)) | |
| except KeyboardInterrupt: | |
| print("\n收到停止信号") | |
| finally: | |
| # 停止系统 | |
| await system.stop_system() | |
| print("系统已停止") | |
| if __name__ == "__main__": | |
| asyncio.run(main()) |