# hfproxydemo / complete_system_example.py
# OpenCode Deployer
# 监控系统开发: 2026-02-01 15:40:53
# 14f6b4f
"""
完整系统集成示例
展示如何使用自动修复和重部署循环机制的所有组件
"""
import asyncio
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional

# 导入所有组件
from data_models import SpaceInfo, ErrorInfo, RepairStrategy, SpaceStatus, ErrorType, RepairAction
from auto_repair_executor import AutoRepairExecutor
from repair_loop_engine import RepairLoopEngine
from rollback_manager import RollbackManager
from safety_validator import SafetyValidator
from integration_orchestrator import RepairOrchestrator
from huggingface_client import HuggingFaceAPIClient
class CompleteRepairSystem:
    """Facade that wires all auto-repair components into one runnable system.

    It builds a HuggingFace API client, an ``AutoRepairExecutor``, a
    ``RepairLoopEngine``, a ``RollbackManager``, a ``SafetyValidator`` and a
    ``RepairOrchestrator``. The executor, loop engine and rollback manager are
    handed to the orchestrator, and a small dict of counters is kept for
    status reports.
    """

    def __init__(self, hf_token: str, repo_path: str = "."):
        """Create and wire all components.

        Args:
            hf_token: HuggingFace API token passed to ``HuggingFaceAPIClient``.
            repo_path: Repository path given to the repair executor and the
                orchestrator (defaults to the current directory).
        """
        self.logger = logging.getLogger(__name__)

        # API client shared by the repair executor and the orchestrator.
        self.hf_client = HuggingFaceAPIClient(token=hf_token)

        # Core components.
        self.repair_executor = AutoRepairExecutor(self.hf_client, repo_path)
        self.loop_engine = RepairLoopEngine()
        self.rollback_manager = RollbackManager()
        # NOTE(review): the validator is created but not used anywhere in this
        # class -- confirm whether it should be wired into the orchestrator.
        self.safety_validator = SafetyValidator()

        # Orchestrator plus its component dependencies.
        self.orchestrator = RepairOrchestrator(self.hf_client, repo_path)
        self.orchestrator.set_components(
            self.repair_executor,
            self.loop_engine,
            self.rollback_manager,
        )

        # System-wide counters.
        # NOTE(review): nothing in this class increments
        # 'total_repairs_successful' or 'total_rollbacks_triggered', so the
        # derived success/rollback metrics stay at 0 unless they are updated
        # from elsewhere (e.g. an orchestrator completion hook).
        self.system_stats = {
            'start_time': None,
            'total_errors_detected': 0,
            'total_repairs_attempted': 0,
            'total_repairs_successful': 0,
            'total_rollbacks_triggered': 0
        }

    async def start_system(self):
        """Start the orchestrator and then the repair loop engine.

        The error handler is registered *before* the loop engine starts, so
        errors detected right after start-up are not dropped. If the loop
        engine fails to start, the already-started orchestrator is stopped
        again and the original exception is re-raised.
        """
        self.logger.info("启动自动修复系统")
        self.system_stats['start_time'] = datetime.now()

        await self.orchestrator.start_monitoring()

        # Registering after start() would leave a window in which detected
        # errors have no handler.
        self.loop_engine.set_error_handler(self._on_error_detected)
        try:
            await self.loop_engine.start()
        except Exception:
            # Don't leave the system half-started.
            await self.orchestrator.stop_monitoring()
            raise

        self.logger.info("系统启动完成")

    async def stop_system(self):
        """Stop the loop engine first, then the orchestrator."""
        self.logger.info("停止自动修复系统")
        await self.loop_engine.stop()
        await self.orchestrator.stop_monitoring()
        self.logger.info("系统停止完成")

    async def _on_error_detected(self, space_info: SpaceInfo, error_info: ErrorInfo):
        """Loop-engine callback: count the error and trigger a repair workflow.

        Args:
            space_info: The Space on which the error was detected.
            error_info: The detected error; its ``error_type`` selects the
                repair strategy.
        """
        self.system_stats['total_errors_detected'] += 1
        self.logger.warning(f"检测到错误: {space_info.space_id} - {error_info.message}")

        strategy = await self._generate_repair_strategy(error_info)
        if not strategy:
            self.logger.warning("无法生成修复策略")
            return

        workflow_id = await self.orchestrator.trigger_repair(
            space_info, error_info, strategy
        )
        if workflow_id:
            self.system_stats['total_repairs_attempted'] += 1
            self.logger.info(f"修复工作流已启动: {workflow_id}")
        else:
            self.logger.error("启动修复工作流失败")

    async def _generate_repair_strategy(self, error_info: ErrorInfo) -> RepairStrategy:
        """Map ``error_info.error_type`` to a hard-coded repair strategy.

        Unknown error types fall back to a generic ``MODIFY_DOCKERFILE``
        strategy, so this coroutine never returns ``None``.

        Args:
            error_info: The detected error.

        Returns:
            The ``RepairStrategy`` to hand to the orchestrator.
        """
        error_type = error_info.error_type

        if error_type == ErrorType.DEPENDENCY_INSTALL:
            return RepairStrategy(
                action=RepairAction.UPDATE_DEPENDENCIES,
                description="修复依赖安装失败",
                modifications={
                    "type": "dependency_update",
                    "strategy": "source_change"
                },
                risk_level="medium",
                success_rate=0.7,
                estimated_time=300
            )
        elif error_type == ErrorType.DOCKER_BUILD_ERROR:
            return RepairStrategy(
                action=RepairAction.MODIFY_DOCKERFILE,
                description="修复 Docker 构建错误",
                modifications={
                    "type": "syntax_fix",
                    "fix_type": "dockerfile_from",
                    # NOTE(review): hard-coded base image -- confirm it
                    # matches the Spaces being repaired.
                    "new_line": "FROM python:3.9-slim"
                },
                risk_level="high",
                success_rate=0.8,
                estimated_time=600
            )
        elif error_type == ErrorType.PORT_BINDING_ERROR:
            return RepairStrategy(
                action=RepairAction.CHANGE_PORT,
                description="修复端口绑定错误",
                modifications={
                    "type": "port_change",
                    "old_port": "7860",
                    "new_port": "7861"
                },
                risk_level="low",
                success_rate=0.9,
                estimated_time=120
            )
        elif error_type == ErrorType.PERMISSION_ERROR:
            return RepairStrategy(
                action=RepairAction.SET_PERMISSIONS,
                description="修复权限错误",
                modifications={
                    "type": "environment_fix",
                    "environment_variables": {
                        "CHMOD_CMD": "chmod +x /app/*"
                    }
                },
                risk_level="medium",
                success_rate=0.6,
                estimated_time=180
            )
        else:
            # Generic fallback for every other error type.
            return RepairStrategy(
                action=RepairAction.MODIFY_DOCKERFILE,
                description="通用修复策略",
                modifications={
                    "type": "syntax_fix",
                    "fix_type": "general"
                },
                risk_level="medium",
                success_rate=0.5,
                estimated_time=300
            )

    async def add_space_to_monitor(self, space_id: str) -> bool:
        """Fetch a Space's info and register it with the loop engine.

        Args:
            space_id: The Space identifier (e.g. ``"user/space"``).

        Returns:
            ``True`` if the Space was added. ``False`` if its info could not be
            fetched or any step raised; the error is logged with a traceback.
        """
        try:
            space_info = await self.hf_client.get_space_info(space_id)
            if not space_info:
                self.logger.error(f"无法获取 Space 信息: {space_id}")
                return False
            await self.loop_engine.add_space(space_info)
        except Exception as e:
            # Boundary for a best-effort operation: log with traceback, report failure.
            self.logger.exception(f"添加监控失败 {space_id}: {e}")
            return False
        self.logger.info(f"已添加到监控: {space_id}")
        return True

    async def remove_space_from_monitor(self, space_id: str) -> bool:
        """Unregister a Space from the loop engine.

        Args:
            space_id: The Space identifier.

        Returns:
            ``True`` on success. ``False`` if the loop engine raised; the error
            is logged with a traceback.
        """
        try:
            await self.loop_engine.remove_space(space_id)
        except Exception as e:
            self.logger.exception(f"移除监控失败 {space_id}: {e}")
            return False
        self.logger.info(f"已从监控移除: {space_id}")
        return True

    def get_system_status(self) -> Dict[str, Any]:
        """Return a snapshot of uptime, counters and component statistics.

        ``uptime_seconds`` is ``None`` until ``start_system`` has been called.
        ``system_stats`` is a shallow copy and still contains the
        ``start_time`` ``datetime`` object.
        """
        start_time = self.system_stats['start_time']
        uptime = (datetime.now() - start_time).total_seconds() if start_time else None
        return {
            'uptime_seconds': uptime,
            'is_running': self.orchestrator.is_running,
            'system_stats': self.system_stats.copy(),
            'orchestrator_stats': self.orchestrator.get_orchestrator_stats(),
            'loop_engine_stats': self.loop_engine.get_stats(),
            'active_workflows': self.orchestrator.get_active_workflows(),
            'monitored_spaces': len(self.loop_engine.monitored_spaces)
        }

    async def get_repair_history(self, space_id: Optional[str] = None) -> Dict[str, Any]:
        """Collect orchestrator events, repair stats and rollback history.

        Declared ``async`` for API symmetry; it performs no awaits itself.

        Args:
            space_id: If given, passed to ``orchestrator.get_events`` to filter
                events; ``None`` requests events for all Spaces.
        """
        events = self.orchestrator.get_events(space_id=space_id)
        repair_stats = self.repair_executor.get_repair_stats()
        rollback_history = self.rollback_manager.get_rollback_history()
        return {
            'events': events,
            'repair_stats': repair_stats,
            'rollback_history': rollback_history,
            'timestamp': datetime.now().isoformat()
        }

    async def generate_comprehensive_report(self) -> Dict[str, Any]:
        """Build a report combining status, history, metrics and recommendations.

        The report embeds ``get_system_status()``, which contains a
        ``datetime``, so ``json.dumps`` needs a ``default=`` hook to serialize it.
        """
        system_status = self.get_system_status()
        repair_history = await self.get_repair_history()

        total_repairs = self.system_stats['total_repairs_attempted']
        success_rate = 0.0
        if total_repairs > 0:
            success_rate = self.system_stats['total_repairs_successful'] / total_repairs

        return {
            'report_time': datetime.now().isoformat(),
            'system_status': system_status,
            'repair_history': repair_history,
            'metrics': {
                # NOTE(review): despite the key name this is a raw count, not a rate.
                'error_detection_rate': self.system_stats['total_errors_detected'],
                'repair_success_rate': success_rate,
                # max(1, ...) avoids division by zero before any repair was attempted.
                'rollback_rate': self.system_stats['total_rollbacks_triggered'] / max(1, total_repairs),
                'average_repair_time': None  # 需要从历史数据计算 (to be derived from history)
            },
            'recommendations': self._generate_recommendations()
        }

    def _generate_recommendations(self) -> List[str]:
        """Return human-readable tuning hints derived from the counters."""
        recommendations = []

        if self.system_stats['total_rollbacks_triggered'] > self.system_stats['total_repairs_successful']:
            recommendations.append("回滚率较高,建议改进修复策略的质量")

        if self.system_stats['total_errors_detected'] == 0:
            recommendations.append("考虑扩大监控范围以检测更多潜在问题")

        active_workflows = self.orchestrator.get_active_workflows()
        if len(active_workflows) > 5:
            recommendations.append("活跃工作流较多,考虑优化并发控制")

        return recommendations
async def main():
    """Example entry point: start the system, monitor two Spaces, print a report.

    The HuggingFace token is read from the ``HF_TOKEN`` environment variable.
    When it is unset or empty, the function prints a hint and returns ``None``
    without touching the API.
    """
    import os  # only this example entry point needs the environment

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    placeholder = "your_huggingface_token_here"
    # An empty HF_TOKEN is treated the same as an unset one.
    hf_token = os.environ.get("HF_TOKEN") or placeholder
    if hf_token == placeholder:
        print("请设置有效的 HuggingFace Token")
        return

    system = CompleteRepairSystem(hf_token)
    await system.start_system()

    try:
        test_spaces = [
            "username/test-space-1",
            "username/test-space-2"
        ]
        for space_id in test_spaces:
            await system.add_space_to_monitor(space_id)

        print("系统运行中,按 Ctrl+C 停止...")
        await asyncio.sleep(300)  # run for 5 minutes

        report = await system.generate_comprehensive_report()
        print("\n=== 系统报告 ===")
        # default=str: the report contains datetime values (system_stats
        # 'start_time') and component objects json cannot encode natively.
        print(json.dumps(report, indent=2, ensure_ascii=False, default=str))
    except KeyboardInterrupt:
        print("\n收到停止信号")
    except asyncio.CancelledError:
        # Under asyncio.run(), Ctrl+C normally reaches this coroutine as a
        # cancellation of the main task rather than as KeyboardInterrupt.
        print("\n收到停止信号")
        raise
    finally:
        await system.stop_system()
        print("系统已停止")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() lets KeyboardInterrupt propagate after main() has been
        # cancelled and has run its cleanup; exit quietly instead of printing
        # a traceback.
        pass