"""
Complete system integration example.

Demonstrates how to use every component of the automatic repair and
redeploy loop together: the HuggingFace API client, the repair executor,
the loop engine, the rollback manager, the safety validator and the
orchestrator.
"""

import asyncio
import json
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional

# Project components
from data_models import SpaceInfo, ErrorInfo, RepairStrategy, SpaceStatus, ErrorType, RepairAction
from auto_repair_executor import AutoRepairExecutor
from repair_loop_engine import RepairLoopEngine
from rollback_manager import RollbackManager
from safety_validator import SafetyValidator
from integration_orchestrator import RepairOrchestrator
from huggingface_client import HuggingFaceAPIClient


class CompleteRepairSystem:
    """Facade that constructs and wires together all auto-repair components.

    Args:
        hf_token: HuggingFace API token, passed to ``HuggingFaceAPIClient``.
        repo_path: Repository path handed to both the repair executor and the
            orchestrator. Defaults to the current directory.
    """

    def __init__(self, hf_token: str, repo_path: str = "."):
        self.logger = logging.getLogger(__name__)

        # API client shared by the executor and the orchestrator
        self.hf_client = HuggingFaceAPIClient(token=hf_token)

        # Core components
        self.repair_executor = AutoRepairExecutor(self.hf_client, repo_path)
        self.loop_engine = RepairLoopEngine()
        self.rollback_manager = RollbackManager()
        # NOTE(review): the validator is created but not passed to
        # set_components() below or used anywhere in this file — confirm
        # whether the orchestrator is expected to receive it.
        self.safety_validator = SafetyValidator()

        # Orchestrator that runs repair workflows
        self.orchestrator = RepairOrchestrator(self.hf_client, repo_path)

        # Wire component dependencies into the orchestrator
        self.orchestrator.set_components(
            self.repair_executor,
            self.loop_engine,
            self.rollback_manager,
        )

        # System-wide counters.
        # NOTE(review): 'total_repairs_successful' and
        # 'total_rollbacks_triggered' are never incremented in this file, so
        # the success/rollback metrics in reports stay at 0 unless something
        # else updates them — TODO hook them to orchestrator/rollback events.
        self.system_stats = {
            'start_time': None,  # datetime set by start_system()
            'total_errors_detected': 0,
            'total_repairs_attempted': 0,
            'total_repairs_successful': 0,
            'total_rollbacks_triggered': 0,
        }

    async def start_system(self):
        """Start orchestrator monitoring and the loop engine, then register
        ``_on_error_detected`` as the loop engine's error handler."""
        self.logger.info("启动自动修复系统")
        self.system_stats['start_time'] = datetime.now()

        # Start the orchestrator
        await self.orchestrator.start_monitoring()

        # Start the loop engine
        await self.loop_engine.start()

        # Route detected errors into the repair pipeline
        self.loop_engine.set_error_handler(self._on_error_detected)

        self.logger.info("系统启动完成")

    async def stop_system(self):
        """Stop the loop engine, then the orchestrator.

        The orchestrator is stopped in a ``finally`` clause so that a failure
        while stopping the loop engine does not leave orchestrator monitoring
        running. Any such exception still propagates after the cleanup.
        """
        self.logger.info("停止自动修复系统")

        try:
            await self.loop_engine.stop()
        finally:
            await self.orchestrator.stop_monitoring()

        self.logger.info("系统停止完成")

    async def _on_error_detected(self, space_info: SpaceInfo, error_info: ErrorInfo):
        """Error-detection callback registered with the loop engine.

        Counts the error, builds a repair strategy for it and asks the
        orchestrator to start a repair workflow. ``total_repairs_attempted``
        is incremented only when the orchestrator returns a workflow id.

        Args:
            space_info: The Space in which the error was detected.
            error_info: Details of the detected error.
        """
        self.system_stats['total_errors_detected'] += 1
        self.logger.warning(f"检测到错误: {space_info.space_id} - {error_info.message}")

        # Analyse the error and derive a repair strategy
        strategy = await self._generate_repair_strategy(error_info)

        # NOTE: _generate_repair_strategy always returns a strategy (it has a
        # default branch), so the else-branch is currently unreachable.
        if strategy:
            workflow_id = await self.orchestrator.trigger_repair(
                space_info, error_info, strategy
            )

            if workflow_id:
                self.system_stats['total_repairs_attempted'] += 1
                self.logger.info(f"修复工作流已启动: {workflow_id}")
            else:
                self.logger.error("启动修复工作流失败")
        else:
            self.logger.warning("无法生成修复策略")

    async def _generate_repair_strategy(self, error_info: ErrorInfo) -> RepairStrategy:
        """Map an error to a fixed, hard-coded repair strategy.

        Mapping (by ``error_info.error_type``):
            DEPENDENCY_INSTALL  -> UPDATE_DEPENDENCIES (medium risk)
            DOCKER_BUILD_ERROR  -> MODIFY_DOCKERFILE, rewrite FROM line (high risk)
            PORT_BINDING_ERROR  -> CHANGE_PORT 7860 -> 7861 (low risk)
            PERMISSION_ERROR    -> SET_PERMISSIONS via env var (medium risk)
            anything else       -> generic MODIFY_DOCKERFILE fix (medium risk)

        Returns:
            A ``RepairStrategy``; never ``None``.
        """
        error_type = error_info.error_type

        if error_type == ErrorType.DEPENDENCY_INSTALL:
            return RepairStrategy(
                action=RepairAction.UPDATE_DEPENDENCIES,
                description="修复依赖安装失败",
                modifications={
                    "type": "dependency_update",
                    "strategy": "source_change"
                },
                risk_level="medium",
                success_rate=0.7,
                estimated_time=300
            )

        elif error_type == ErrorType.DOCKER_BUILD_ERROR:
            return RepairStrategy(
                action=RepairAction.MODIFY_DOCKERFILE,
                description="修复 Docker 构建错误",
                modifications={
                    "type": "syntax_fix",
                    "fix_type": "dockerfile_from",
                    "new_line": "FROM python:3.9-slim"
                },
                risk_level="high",
                success_rate=0.8,
                estimated_time=600
            )

        elif error_type == ErrorType.PORT_BINDING_ERROR:
            return RepairStrategy(
                action=RepairAction.CHANGE_PORT,
                description="修复端口绑定错误",
                modifications={
                    "type": "port_change",
                    "old_port": "7860",
                    "new_port": "7861"
                },
                risk_level="low",
                success_rate=0.9,
                estimated_time=120
            )

        elif error_type == ErrorType.PERMISSION_ERROR:
            return RepairStrategy(
                action=RepairAction.SET_PERMISSIONS,
                description="修复权限错误",
                modifications={
                    "type": "environment_fix",
                    "environment_variables": {
                        "CHMOD_CMD": "chmod +x /app/*"
                    }
                },
                risk_level="medium",
                success_rate=0.6,
                estimated_time=180
            )

        else:
            # Fallback strategy for unrecognised error types
            return RepairStrategy(
                action=RepairAction.MODIFY_DOCKERFILE,
                description="通用修复策略",
                modifications={
                    "type": "syntax_fix",
                    "fix_type": "general"
                },
                risk_level="medium",
                success_rate=0.5,
                estimated_time=300
            )

    async def add_space_to_monitor(self, space_id: str) -> bool:
        """Fetch a Space's info and add it to the loop engine's monitor list.

        Returns:
            True on success; False if the Space info could not be fetched or
            any exception was raised (the exception is logged with traceback).
        """
        try:
            space_info = await self.hf_client.get_space_info(space_id)

            if space_info:
                await self.loop_engine.add_space(space_info)
                self.logger.info(f"已添加到监控: {space_id}")
                return True
            else:
                self.logger.error(f"无法获取 Space 信息: {space_id}")
                return False

        except Exception as e:
            # Best-effort API: report failure via the return value, but keep
            # the traceback in the log for diagnosis.
            self.logger.exception(f"添加监控失败 {space_id}: {e}")
            return False

    async def remove_space_from_monitor(self, space_id: str) -> bool:
        """Remove a Space from the loop engine's monitor list.

        Returns:
            True on success; False if any exception was raised (the exception
            is logged with traceback).
        """
        try:
            await self.loop_engine.remove_space(space_id)
            self.logger.info(f"已从监控移除: {space_id}")
            return True
        except Exception as e:
            self.logger.exception(f"移除监控失败 {space_id}: {e}")
            return False

    def get_system_status(self) -> Dict[str, Any]:
        """Return a snapshot of the system state.

        ``uptime_seconds`` is None until ``start_system()`` has run. The
        ``system_stats`` entry is a shallow copy, and its ``start_time`` value
        is a ``datetime`` (or None), which is not JSON-serializable as-is.
        """
        uptime = None
        if self.system_stats['start_time']:
            uptime = (datetime.now() - self.system_stats['start_time']).total_seconds()

        return {
            'uptime_seconds': uptime,
            'is_running': self.orchestrator.is_running,
            'system_stats': self.system_stats.copy(),
            'orchestrator_stats': self.orchestrator.get_orchestrator_stats(),
            'loop_engine_stats': self.loop_engine.get_stats(),
            'active_workflows': self.orchestrator.get_active_workflows(),
            'monitored_spaces': len(self.loop_engine.monitored_spaces)
        }

    async def get_repair_history(self, space_id: Optional[str] = None) -> Dict[str, Any]:
        """Collect repair history from the orchestrator, executor and rollback manager.

        Args:
            space_id: If given, passed to the orchestrator to filter events to
                one Space. Repair stats and rollback history are not filtered.
        """
        # Event history from the orchestrator
        events = self.orchestrator.get_events(space_id=space_id)

        # Aggregate repair statistics from the executor
        repair_stats = self.repair_executor.get_repair_stats()

        # Rollback history from the rollback manager
        rollback_history = self.rollback_manager.get_rollback_history()

        return {
            'events': events,
            'repair_stats': repair_stats,
            'rollback_history': rollback_history,
            'timestamp': datetime.now().isoformat()
        }

    async def generate_comprehensive_report(self) -> Dict[str, Any]:
        """Build a report combining status, history, metrics and recommendations."""
        system_status = self.get_system_status()
        repair_history = await self.get_repair_history()

        # Success rate over attempted repairs (0.0 when nothing was attempted)
        total_repairs = self.system_stats['total_repairs_attempted']
        success_rate = 0.0
        if total_repairs > 0:
            success_rate = self.system_stats['total_repairs_successful'] / total_repairs

        return {
            'report_time': datetime.now().isoformat(),
            'system_status': system_status,
            'repair_history': repair_history,
            'metrics': {
                # NOTE: despite the key name, this is the raw count of errors
                'error_detection_rate': self.system_stats['total_errors_detected'],
                'repair_success_rate': success_rate,
                # max(1, ...) avoids division by zero
                'rollback_rate': self.system_stats['total_rollbacks_triggered'] / max(1, total_repairs),
                'average_repair_time': None  # TODO: compute from historical data
            },
            'recommendations': self._generate_recommendations()
        }

    def _generate_recommendations(self) -> List[str]:
        """Derive human-readable tuning suggestions from the current counters."""
        recommendations = []

        if self.system_stats['total_rollbacks_triggered'] > self.system_stats['total_repairs_successful']:
            recommendations.append("回滚率较高,建议改进修复策略的质量")

        if self.system_stats['total_errors_detected'] == 0:
            recommendations.append("考虑扩大监控范围以检测更多潜在问题")

        active_workflows = self.orchestrator.get_active_workflows()
        if len(active_workflows) > 5:
            recommendations.append("活跃工作流较多,考虑优化并发控制")

        return recommendations


def _json_default(obj: Any) -> Any:
    """``json.dumps`` fallback that serializes ``datetime`` as ISO-8601.

    Raises:
        TypeError: for any other non-serializable object, mirroring the
            standard ``json`` behavior so unexpected types are not silently
            stringified.
    """
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


async def main():
    """Example entry point: start the system, monitor a few Spaces, print a report."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Read the token from the HF_TOKEN environment variable; the placeholder
    # fallback keeps the "token not configured" guard below working.
    hf_token = os.environ.get("HF_TOKEN", "your_huggingface_token_here")

    if hf_token == "your_huggingface_token_here":
        print("请设置有效的 HuggingFace Token")
        return

    system = CompleteRepairSystem(hf_token)
    await system.start_system()

    try:
        test_spaces = [
            "username/test-space-1",
            "username/test-space-2"
        ]

        for space_id in test_spaces:
            await system.add_space_to_monitor(space_id)

        print("系统运行中,按 Ctrl+C 停止...")
        await asyncio.sleep(300)  # run for 5 minutes

        report = await system.generate_comprehensive_report()
        print("\n=== 系统报告 ===")
        # system_stats.start_time is a datetime; _json_default serializes it.
        # NOTE(review): other components' stats/events may hold further
        # non-JSON types — extend _json_default if they do.
        print(json.dumps(report, indent=2, ensure_ascii=False, default=_json_default))

    except (KeyboardInterrupt, asyncio.CancelledError):
        # Since Python 3.11, asyncio.run() turns Ctrl+C into cancellation of
        # the main task, which surfaces here as CancelledError.
        print("\n收到停止信号")
    finally:
        await system.stop_system()
        print("系统已停止")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C can still escape asyncio.run() (e.g. on Python < 3.11);
        # shutdown already ran in main()'s finally, so exit quietly.
        pass