hfproxydemo / complete_system_demo.py
OpenCode Deployer
监控系统开发: 2026-02-01 15:40:53
14f6b4f
"""
完整的自动修复和重部署循环系统示例
演示所有组件的集成使用
"""
import asyncio
import logging
from datetime import datetime
from pathlib import Path
# 设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
from data_models import SpaceInfo, ErrorInfo, RepairStrategy, SpaceStatus, ErrorType, RepairAction
from auto_repair_executor import AutoRepairExecutor
from repair_loop_engine import RepairLoopEngine, LoopConfig
from rollback_manager import RollbackManager
from safety_validator import SafetyValidator
from integration_orchestrator import RepairOrchestrator
class MockHuggingFaceClient:
"""模拟 HuggingFace 客户端"""
async def get_space_info(self, space_id: str):
"""获取 Space 信息"""
return {"id": space_id, "status": "error"}
async def get_space_runtime(self, space_id: str):
"""获取 Space 运行时信息"""
return {"stage": "BUILDING", "state": "ERROR"}
async def trigger_rebuild(self, space_id: str):
"""触发重新构建"""
return True
async def complete_system_demo():
"""完整系统演示"""
print("🚀 启动完整自动修复系统演示")
# 1. 创建模拟客户端
hf_client = MockHuggingFaceClient()
# 2. 创建核心组件
repair_executor = AutoRepairExecutor(hf_client, repo_path=".")
rollback_manager = RollbackManager("demo_backups")
safety_validator = SafetyValidator()
# 3. 创建循环配置
loop_config = LoopConfig(
max_iterations=3,
timeout_minutes=10,
check_interval_seconds=30,
success_wait_seconds=60,
failure_wait_seconds=120,
max_concurrent_repairs=2
)
# 4. 创建循环引擎
loop_engine = RepairLoopEngine(repair_executor, loop_config)
# 5. 创建编排器
orchestrator = RepairOrchestrator(hf_client)
orchestrator.set_components(repair_executor, loop_engine, rollback_manager)
# 6. 创建示例 Space 和错误
space_info = SpaceInfo(
space_id="demo/test-space",
name="demo-test-space",
repository_url="https://huggingface.co/spaces/demo/test-space",
current_status=SpaceStatus.ERROR,
last_updated=datetime.now(),
dockerfile_path="Dockerfile"
)
error_info = ErrorInfo(
error_type=ErrorType.DEPENDENCY_INSTALL,
message="pip install failed: Could not find a version",
log_snippet="ERROR: Could not find a version that satisfies the requirement transformers>=4.0.0",
confidence=0.9,
occurred_at=datetime.now()
)
# 7. 创建修复策略
repair_strategy = RepairStrategy(
action=RepairAction.UPDATE_DEPENDENCIES,
description="更新依赖版本并更换安装源",
modifications={
"type": "dependency_update",
"strategy": "source_change",
"target_files": ["requirements.txt"],
"changes": [
{"action": "add_source", "source": "https://pypi.tuna.tsinghua.edu.cn/simple"},
{"action": "remove_version_pins", "packages": ["transformers"]},
{"action": "update_package", "package": "torch", "version": "latest"}
]
},
risk_level="medium",
success_rate=0.8,
estimated_time=300,
prerequisites=["备份原始文件", "验证网络连接"],
side_effects=["可能影响其他依赖", "需要重新构建环境"],
rollback_possible=True,
manual_review_required=False
)
print("✅ 组件初始化完成")
try:
# 8. 启动编排器监控
await orchestrator.start_monitoring()
print("📊 编排器监控已启动")
# 9. 添加监控 Space
loop_engine.add_space(space_info)
loop_engine.update_space_status(space_info.space_id, SpaceStatus.ERROR, error_info)
print(f"📝 已添加监控 Space: {space_info.space_id}")
# 10. 触发修复流程
workflow_id = await orchestrator.trigger_repair(space_info, error_info, repair_strategy)
print(f"🔧 修复工作流已启动: {workflow_id}")
# 11. 监控修复进度
for i in range(10):
await asyncio.sleep(2)
status = orchestrator.get_workflow_status(workflow_id)
if status:
print(f"⏳ 工作流状态: {status['state']} (运行中: {status['is_running']})")
if status['state'] in ['completed', 'failed']:
break
else:
print("⚠️ 无法获取工作流状态")
# 12. 查看最终结果
final_status = orchestrator.get_workflow_status(workflow_id)
print(f"🏁 最终状态: {final_status}")
# 13. 查看统计信息
stats = orchestrator.get_orchestrator_stats()
print(f"📈 编排器统计: {stats}")
# 14. 生成报告
report = await orchestrator.generate_report()
print(f"📋 系统报告: {report}")
except Exception as e:
print(f"❌ 演示过程中发生错误: {e}")
import traceback
traceback.print_exc()
finally:
# 15. 清理资源
await orchestrator.stop_monitoring()
print("🧹 资源清理完成")
print("✨ 演示结束")
async def component_demo():
"""组件单独演示"""
print("\n🔧 组件功能演示")
# 创建模拟客户端
hf_client = MockHuggingFaceClient()
# 1. 安全验证器演示
print("\n🛡️ 安全验证器演示")
safety_validator = SafetyValidator()
# 模拟代码内容
sample_code = '''
import os
import subprocess
def insecure_function(user_input):
# 危险代码示例
os.system(f"echo {user_input}")
eval(user_input)
return "done"
api_key = "sk-1234567890abcdef"
password = "secret123"
'''
# 创建测试文件
test_file = Path("test_app.py")
test_file.write_text(sample_code)
space_info = SpaceInfo(
space_id="demo/security-test",
name="security-test",
repository_url="https://huggingface.co/spaces/demo/security-test",
current_status=SpaceStatus.ERROR,
last_updated=datetime.now(),
dockerfile_path="Dockerfile"
)
error_info = ErrorInfo(
error_type=ErrorType.DEPENDENCY_INSTALL,
message="Security test",
confidence=0.9
)
strategy = RepairStrategy(
action=RepairAction.UPDATE_DEPENDENCIES,
description="安全测试修复",
modifications={"type": "test"}
)
safety_result = await safety_validator.validate_repair_safety(
space_info, error_info, strategy, [str(test_file)]
)
print(f"安全验证结果: {safety_result.status.value}")
print(f"风险级别: {safety_result.risk_level.value}")
print(f"建议: {safety_result.recommendations}")
# 2. 回滚管理器演示
print("\n🔄 回滚管理器演示")
rollback_manager = RollbackManager("demo_rollbacks")
# 创建备份
backup_id = await rollback_manager.backup_strategy.create_backup(
"demo/backup-test",
str(test_file),
rollback_manager.backup_strategy.BackupType.FILE,
"测试备份"
)
print(f"备份已创建: {backup_id}")
# 修改文件
test_file.write_text("修改后的内容")
print("文件已修改")
# 执行回滚
rollback_success = await rollback_manager.execute_rollback(backup_id)
print(f"回滚结果: {rollback_success}")
# 验证回滚
restored_content = test_file.read_text()
print(f"恢复后的内容: {restored_content[:50]}...")
# 清理
test_file.unlink()
# 3. 修复执行器演示
print("\n🔨 修复执行器演示")
repair_executor = AutoRepairExecutor(hf_client, repo_path=".")
# 创建测试 Dockerfile
dockerfile_content = '''
FROM python:3.8
RUN pip install -r requirements.txt
COPY . /app
WORKDIR /app
CMD ["python", "app.py"]
'''
dockerfile = Path("Dockerfile")
dockerfile.write_text(dockerfile_content)
space_info.dockerfile_path = "Dockerfile"
# 模拟修复执行(由于是演示,可能会失败)
try:
success, commit_sha = await repair_executor.execute_repair(
space_info, error_info, strategy
)
print(f"修复执行结果: {success}, 提交: {commit_sha}")
except Exception as e:
print(f"修复执行(预期)失败: {e}")
# 清理
if dockerfile.exists():
dockerfile.unlink()
print("✅ 组件演示完成")
async def integration_test():
"""集成测试"""
print("\n🧪 集成测试")
# 测试数据
test_cases = [
{
"name": "依赖安装失败",
"error_type": ErrorType.DEPENDENCY_INSTALL,
"action": RepairAction.UPDATE_DEPENDENCIES,
"files": ["requirements.txt"]
},
{
"name": "Dockerfile 语法错误",
"error_type": ErrorType.DOCKERFILE_SYNTAX,
"action": RepairAction.MODIFY_DOCKERFILE,
"files": ["Dockerfile"]
},
{
"name": "端口冲突",
"error_type": ErrorType.PORT_CONFLICT,
"action": RepairAction.CHANGE_PORT,
"files": ["app.py"]
}
]
hf_client = MockHuggingFaceClient()
for i, test_case in enumerate(test_cases, 1):
print(f"\n📋 测试用例 {i}: {test_case['name']}")
# 创建测试数据
space_info = SpaceInfo(
space_id=f"test/space-{i}",
name=f"test-space-{i}",
repository_url=f"https://huggingface.co/spaces/test/space-{i}",
current_status=SpaceStatus.ERROR,
last_updated=datetime.now(),
dockerfile_path="Dockerfile"
)
error_info = ErrorInfo(
error_type=test_case["error_type"],
message=f"测试错误: {test_case['name']}",
log_snippet=f"ERROR: {test_case['name']}",
confidence=0.8
)
strategy = RepairStrategy(
action=test_case["action"],
description=f"测试修复: {test_case['name']}",
modifications={"type": "test", "target_files": test_case["files"]},
risk_level="low",
success_rate=0.7,
estimated_time=120
)
# 创建临时文件
for file_path in test_case["files"]:
Path(file_path).write_text(f"# 测试文件: {file_path}")
try:
# 安全验证
safety_validator = SafetyValidator()
safety_result = await safety_validator.validate_repair_safety(
space_info, error_info, strategy, test_case["files"]
)
print(f" ✅ 安全验证: {safety_result.status.value}")
print(f" 📊 风险级别: {safety_result.risk_level.value}")
# 如果验证通过,模拟修复执行
if safety_result.status.value in ["passed", "warning"]:
repair_executor = AutoRepairExecutor(hf_client)
print(f" 🔧 模拟修复执行: {test_case['name']}")
# 实际修复需要真实的 Git 仓库和 HF API
print(f" ✅ 测试用例 {i} 完成")
else:
print(f" ⚠️ 测试用例 {i} 被安全检查阻止")
except Exception as e:
print(f" ❌ 测试用例 {i} 失败: {e}")
finally:
# 清理临时文件
for file_path in test_case["files"]:
file = Path(file_path)
if file.exists():
file.unlink()
print("\n🎯 集成测试完成")
async def main():
"""主函数"""
print("=" * 60)
print("🤖 HuggingFace Spaces 自动修复系统完整演示")
print("=" * 60)
try:
# 1. 组件功能演示
await component_demo()
# 2. 集成测试
await integration_test()
# 3. 完整系统演示
await complete_system_demo()
except KeyboardInterrupt:
print("\n⏹️ 用户中断演示")
except Exception as e:
print(f"\n❌ 演示过程中发生错误: {e}")
import traceback
traceback.print_exc()
print("\n🎉 演示结束")
if __name__ == "__main__":
asyncio.run(main())