"""Quick demo script: drive one repair workflow through the orchestrator.

Wires a mock client into the repair executor, loop engine, rollback manager
and orchestrator, triggers a single repair workflow for a sample space, and
polls its status for a short while before printing orchestrator statistics.
"""

import asyncio
import logging
from datetime import datetime
from pathlib import Path

# NOTE(review): logging is configured before the project imports below,
# presumably so that anything they log at import time is formatted too --
# keep this ordering unless confirmed otherwise.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

from data_models import SpaceInfo, ErrorInfo, RepairStrategy, SpaceStatus, ErrorType, RepairAction
from auto_repair_executor import AutoRepairExecutor
from repair_loop_engine import RepairLoopEngine, LoopConfig
from rollback_manager import RollbackManager
from safety_validator import SafetyValidator
from integration_orchestrator import RepairOrchestrator


class MockHFClient:
    """Client stand-in that returns fixed, canned responses.

    Every method ignores remote state and performs no I/O; the values are
    hard-coded so the demo can run without a real backend.
    """

    async def get_space_info(self, space_id: str) -> dict:
        """Return a canned info record for *space_id* with status ``"error"``."""
        return {"id": space_id, "status": "error"}

    async def get_space_runtime(self, space_id: str) -> dict:
        """Return a canned runtime record (stage ``BUILDING``, state ``ERROR``)."""
        return {"stage": "BUILDING", "state": "ERROR"}

    async def trigger_rebuild(self, space_id: str) -> bool:
        """Pretend a rebuild was triggered; always returns ``True``."""
        return True


async def quick_demo(*, poll_attempts: int = 5, poll_interval: float = 2) -> None:
    """Run a single repair workflow against the mock client and report on it.

    Args:
        poll_attempts: Maximum number of status polls after the workflow is
            triggered (default 5, as in the original script).
        poll_interval: Seconds to sleep before each status poll (default 2).

    Monitoring started on the orchestrator is always stopped again, even if
    triggering the workflow or polling its status raises.
    """
    # --- Component wiring -------------------------------------------------
    hf_client = MockHFClient()
    repair_executor = AutoRepairExecutor(hf_client, repo_path=".")
    rollback_manager = RollbackManager("test_backups")
    loop_config = LoopConfig(max_iterations=2, timeout_minutes=5)
    loop_engine = RepairLoopEngine(repair_executor, loop_config)

    orchestrator = RepairOrchestrator(hf_client)
    orchestrator.set_components(repair_executor, loop_engine, rollback_manager)

    # --- Sample inputs for the workflow -----------------------------------
    space_info = SpaceInfo(
        space_id="test/demo-space",
        name="demo-space",
        repository_url="https://huggingface.co/spaces/test/demo-space",
        current_status=SpaceStatus.ERROR,
        last_updated=datetime.now(),
        dockerfile_path="Dockerfile",
    )
    error_info = ErrorInfo(
        error_type=ErrorType.DEPENDENCY_INSTALL,
        message="pip install failed",
        confidence=0.9,
    )
    repair_strategy = RepairStrategy(
        action=RepairAction.UPDATE_DEPENDENCIES,
        description="Update dependencies",
        modifications={"type": "dependency_update", "strategy": "version_bump"},
        risk_level="medium",
        success_rate=0.8,
    )

    # --- Run ---------------------------------------------------------------
    await orchestrator.start_monitoring()
    try:
        loop_engine.add_space(space_info)
        workflow_id = await orchestrator.trigger_repair(space_info, error_info, repair_strategy)
        print(f"Workflow started: {workflow_id}")

        # Poll until the workflow reports a terminal state or attempts run out.
        for _ in range(poll_attempts):
            await asyncio.sleep(poll_interval)
            status = orchestrator.get_workflow_status(workflow_id)
            if status:
                print(f"Status: {status['state']}")
                if status['state'] in ('completed', 'failed'):
                    break

        stats = orchestrator.get_orchestrator_stats()
        print(f"Stats: {stats}")
    finally:
        # Pair every start_monitoring() with a stop, even on failure above.
        await orchestrator.stop_monitoring()


if __name__ == "__main__":
    asyncio.run(quick_demo())