# hfproxydemo / quick_test.py
# OpenCode Deployer
# 监控系统开发 (monitoring system development): 2026-02-01 15:40:53
# 14f6b4f
import asyncio
import logging
from datetime import datetime
from pathlib import Path
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
from data_models import SpaceInfo, ErrorInfo, RepairStrategy, SpaceStatus, ErrorType, RepairAction
from auto_repair_executor import AutoRepairExecutor
from repair_loop_engine import RepairLoopEngine, LoopConfig
from rollback_manager import RollbackManager
from safety_validator import SafetyValidator
from integration_orchestrator import RepairOrchestrator
class MockHFClient:
    """In-memory stand-in for the Hugging Face client used by the demo.

    Every coroutine returns a fixed, canned value and performs no I/O, so the
    demo can run without network access.
    """

    async def get_space_info(self, space_id: str) -> dict:
        """Return a canned info record that echoes ``space_id`` with an error status."""
        return {"id": space_id, "status": "error"}

    async def get_space_runtime(self, space_id: str) -> dict:
        """Return a canned runtime record; ``space_id`` is accepted but ignored."""
        return {"stage": "BUILDING", "state": "ERROR"}

    async def trigger_rebuild(self, space_id: str) -> bool:
        """Pretend a rebuild was requested and always report success."""
        return True
async def quick_demo():
    """Run a short end-to-end demo of the repair workflow against a mock client.

    Builds the executor, rollback manager, loop engine and orchestrator around
    a ``MockHFClient``, triggers one repair for a sample space, polls the
    workflow status a bounded number of times, and prints the orchestrator
    statistics.

    Once monitoring has been started it is always stopped on the way out,
    including when triggering, polling or collecting stats raises.
    """
    poll_attempts = 5
    poll_interval_seconds = 2
    terminal_states = ("completed", "failed")

    # Wire the components together around the mock client.
    hf_client = MockHFClient()
    repair_executor = AutoRepairExecutor(hf_client, repo_path=".")
    rollback_manager = RollbackManager("test_backups")
    loop_config = LoopConfig(max_iterations=2, timeout_minutes=5)
    loop_engine = RepairLoopEngine(repair_executor, loop_config)
    orchestrator = RepairOrchestrator(hf_client)
    orchestrator.set_components(repair_executor, loop_engine, rollback_manager)

    # Sample inputs describing a failing space, the detected error and the
    # strategy to apply.
    space_info = SpaceInfo(
        space_id="test/demo-space",
        name="demo-space",
        repository_url="https://huggingface.co/spaces/test/demo-space",
        current_status=SpaceStatus.ERROR,
        last_updated=datetime.now(),
        dockerfile_path="Dockerfile",
    )
    error_info = ErrorInfo(
        error_type=ErrorType.DEPENDENCY_INSTALL,
        message="pip install failed",
        confidence=0.9,
    )
    repair_strategy = RepairStrategy(
        action=RepairAction.UPDATE_DEPENDENCIES,
        description="Update dependencies",
        modifications={"type": "dependency_update", "strategy": "version_bump"},
        risk_level="medium",
        success_rate=0.8,
    )

    await orchestrator.start_monitoring()
    try:
        loop_engine.add_space(space_info)
        workflow_id = await orchestrator.trigger_repair(
            space_info, error_info, repair_strategy
        )
        print(f"Workflow started: {workflow_id}")

        # Poll a bounded number of times; stop early on a terminal state.
        for _ in range(poll_attempts):
            await asyncio.sleep(poll_interval_seconds)
            status = orchestrator.get_workflow_status(workflow_id)
            if status:
                print(f"Status: {status['state']}")
                if status['state'] in terminal_states:
                    break

        stats = orchestrator.get_orchestrator_stats()
        print(f"Stats: {stats}")
    finally:
        # Stop monitoring even if the demo failed part-way through.
        await orchestrator.stop_monitoring()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(quick_demo())