Spaces:
Sleeping
Sleeping
| """ | |
| Integration test for workflow persistence with actual workflow execution | |
| """ | |
| import os | |
| import shutil | |
| import logging | |
| from workflows import WorkflowTask, WorkflowDefinition, WorkflowExecutor, WorkflowStore | |
# Configure the root logger so INFO-level (and higher) records are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger; the __main__ guard uses it to report unexpected failures.
logger = logging.getLogger(__name__)
| # Mock tool for testing | |
class MockTool:
    """Stand-in tool that lets the workflow executor run without real tools.

    The only state is ``name``; all behavior lives in :meth:`forward`.
    """

    def __init__(self, name):
        self.name = name

    def forward(self, **kwargs):
        """Produce a result dict based on which keyword arguments are present.

        Precedence:
            1. ``value``      -> echoed back as ``{"result": value}``.
            2. ``expression`` -> evaluated; ``{"result": <value>}`` on success
               or ``{"error": <message>}`` if evaluation raises.
            3. anything else  -> ``{"result": "mock_result"}``.
        """
        # An explicit value always wins, even when an expression is also given.
        if "value" in kwargs:
            return {"result": kwargs["value"]}

        # Neither recognised key supplied: fall back to the canned response.
        if "expression" not in kwargs:
            return {"result": "mock_result"}

        # NOTE(review): eval() runs arbitrary Python. Acceptable for this mock
        # because the test supplies hard-coded arithmetic strings; never feed
        # it untrusted workflow arguments.
        try:
            outcome = eval(kwargs["expression"])
        except Exception as exc:
            # Report the failure as data instead of raising.
            return {"error": str(exc)}
        return {"result": outcome}
def test_persistence_integration():
    """Check end-to-end that executed workflows end up in the on-disk store.

    The test builds a one-tool registry, runs a three-task workflow through a
    ``WorkflowExecutor`` pointed at a scratch cache directory, then opens the
    same directory with a ``WorkflowStore`` and compares what was persisted
    against the in-memory execution result. A second workflow is executed
    afterwards to confirm that the store then lists two entries.

    The scratch directory is removed on every exit path (success, early
    ``return False``, or a failed assertion), so a failing run does not leave
    ``./test_integration_cache`` behind.

    Returns:
        bool: ``True`` when every step passes. ``False`` when the first
        workflow fails to execute, when the store does not list exactly one
        workflow after it, or when that workflow cannot be loaded.

    Raises:
        AssertionError: if the persisted data does not match the execution
            result, the second workflow fails, or the store does not list two
            workflows after the second execution.
    """
    test_cache = "./test_integration_cache"

    # Start from a clean slate so leftovers from an earlier run cannot
    # influence the workflow counts checked below.
    if os.path.exists(test_cache):
        shutil.rmtree(test_cache)

    try:
        print("=" * 60)
        print("Testing Workflow Persistence Integration")
        print("=" * 60)

        # Step 1: Create tools registry
        print("\n[1] Setting up tools...")
        mock_tool = MockTool("mock_tool")
        tools_registry = {
            "mock_tool": mock_tool,
        }
        print("[PASS] Tools registry created")

        # Step 2: Create workflow executor with custom cache path
        print("\n[2] Creating workflow executor...")
        executor = WorkflowExecutor(
            tools_registry=tools_registry,
            store_path=test_cache,
        )
        print("[PASS] Executor created with persistence enabled")
        print(f" - Cache path: {test_cache}")

        # Step 3: Create a simple workflow
        print("\n[3] Creating workflow...")
        workflow = WorkflowDefinition(
            name="math_workflow",
            description="Simple math computation workflow",
            tasks=[
                WorkflowTask(
                    id="add_task",
                    tool="mock_tool",
                    args={"expression": "10 + 5"},
                    depends_on=[],
                ),
                WorkflowTask(
                    id="multiply_task",
                    tool="mock_tool",
                    args={"expression": "3 * 7"},
                    depends_on=[],
                ),
                WorkflowTask(
                    id="combine_task",
                    tool="mock_tool",
                    # Hard-coded results of the two previous tasks (21 and 15);
                    # only the dependency ordering is exercised here.
                    args={"expression": "21 + 15"},
                    depends_on=["add_task", "multiply_task"],
                ),
            ],
            final_task="combine_task",
            max_parallel=2,
        )
        print(f"[PASS] Workflow created: {workflow.name}")
        print(f" - Tasks: {len(workflow.tasks)}")
        print(" - Dependencies: add_task, multiply_task -> combine_task")

        # Step 4: Execute workflow
        print("\n[4] Executing workflow...")
        result = executor.execute(workflow)
        if not result["success"]:
            print(f"[FAIL] Workflow execution failed: {result.get('error')}")
            return False
        print("[PASS] Workflow executed successfully")
        print(f" - Execution time: {result['execution_time']:.2f}s")
        print(f" - Final result: {result['result']}")
        print(f" - Trace entries: {len(result['trace'])}")

        # Step 5: Verify workflow was persisted
        print("\n[5] Verifying persistence...")
        store = WorkflowStore(store_path=test_cache)
        workflow_ids = store.list_workflows()
        if len(workflow_ids) != 1:
            print(f"[FAIL] Expected 1 persisted workflow, found {len(workflow_ids)}")
            return False
        print(f"[PASS] Found {len(workflow_ids)} persisted workflow")
        print(f" - Workflow ID: {workflow_ids[0]}")

        # Step 6: Load and verify persisted workflow
        print("\n[6] Loading persisted workflow...")
        workflow_id = workflow_ids[0]
        loaded_data = store.load_workflow(workflow_id)
        if loaded_data is None:
            print("[FAIL] Failed to load persisted workflow")
            return False
        print("[PASS] Workflow loaded successfully")
        print(f" - Workflow ID: {loaded_data['workflow_id']}")
        print(f" - Workflow name: {loaded_data['workflow']['name']}")

        # Step 7: Verify loaded data matches execution
        print("\n[7] Validating persisted data...")
        # Check workflow definition
        assert loaded_data["workflow"]["name"] == workflow.name, "Workflow name mismatch"
        assert len(loaded_data["workflow"]["tasks"]) == len(workflow.tasks), "Task count mismatch"
        assert loaded_data["workflow"]["final_task"] == workflow.final_task, "Final task mismatch"
        # Check execution result
        assert loaded_data["result"]["success"] == result["success"], "Success status mismatch"
        assert loaded_data["result"]["result"] == result["result"], "Final result mismatch"
        assert "execution_time" in loaded_data["result"], "Missing execution time"
        assert "trace" in loaded_data["result"], "Missing trace"
        print("[PASS] All data validated successfully")
        print(" - Workflow definition: OK")
        print(" - Execution result: OK")
        print(" - Trace: OK")

        # Step 8: Execute another workflow to test multiple workflows
        print("\n[8] Testing multiple workflow persistence...")
        workflow2 = WorkflowDefinition(
            name="simple_calc",
            description="Simple calculation",
            tasks=[
                WorkflowTask(
                    id="calc",
                    tool="mock_tool",
                    args={"expression": "100 * 2"},
                    depends_on=[],
                )
            ],
            final_task="calc",
        )
        result2 = executor.execute(workflow2)
        assert result2["success"], "Second workflow execution failed"
        workflow_ids = store.list_workflows()
        assert len(workflow_ids) == 2, f"Expected 2 workflows, found {len(workflow_ids)}"
        print("[PASS] Multiple workflows persisted correctly")
        print(f" - Total workflows: {len(workflow_ids)}")

        # Cleanup (reported explicitly on the success path)
        print("\n[Cleanup] Removing test cache...")
        shutil.rmtree(test_cache)
        print("[PASS] Test cache removed")

        print("\n" + "=" * 60)
        print("[SUCCESS] INTEGRATION TEST PASSED")
        print("=" * 60)
        print("\nVerified:")
        print(" - Workflows auto-persist after successful execution")
        print(" - Persisted data matches execution results")
        print(" - Multiple workflows can be stored")
        print(" - Workflows can be loaded and validated")
        return True
    finally:
        # Safety net for the failure paths: an early `return False` or a
        # failed assert would otherwise leave the scratch directory on disk.
        # On the success path the directory is already gone, so this is a
        # no-op thanks to ignore_errors=True.
        shutil.rmtree(test_cache, ignore_errors=True)
if __name__ == "__main__":
    # Script entry point: run the integration test and translate its outcome
    # into a process exit status (0 = passed, 1 = failed or crashed).
    #
    # sys.exit is used instead of the bare exit() helper because exit() is
    # injected by the `site` module and is not guaranteed to exist (e.g. under
    # `python -S` or in frozen/embedded interpreters).
    import sys

    try:
        success = test_persistence_integration()
    except Exception as e:
        # Covers failed assertions and any unexpected error raised by the
        # workflow library; logger.exception records the full traceback.
        logger.exception("Test failed with exception: %s", e)
        sys.exit(1)

    sys.exit(0 if success else 1)