""" Integration test for workflow persistence with actual workflow execution """ import os import shutil import logging from workflows import WorkflowTask, WorkflowDefinition, WorkflowExecutor, WorkflowStore logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Mock tool for testing class MockTool: """Simple mock tool for testing workflow execution""" def __init__(self, name): self.name = name def forward(self, **kwargs): """Execute tool with given arguments""" if "value" in kwargs: return {"result": kwargs["value"]} elif "expression" in kwargs: try: result = eval(kwargs["expression"]) return {"result": result} except Exception as e: return {"error": str(e)} return {"result": "mock_result"} def test_persistence_integration(): """Test that workflows are automatically persisted after successful execution""" test_cache = "./test_integration_cache" # Clean up any existing test cache if os.path.exists(test_cache): shutil.rmtree(test_cache) print("=" * 60) print("Testing Workflow Persistence Integration") print("=" * 60) # Step 1: Create tools registry print("\n[1] Setting up tools...") mock_tool = MockTool("mock_tool") tools_registry = { "mock_tool": mock_tool } print("[PASS] Tools registry created") # Step 2: Create workflow executor with custom cache path print("\n[2] Creating workflow executor...") executor = WorkflowExecutor( tools_registry=tools_registry, store_path=test_cache ) print("[PASS] Executor created with persistence enabled") print(f" - Cache path: {test_cache}") # Step 3: Create a simple workflow print("\n[3] Creating workflow...") workflow = WorkflowDefinition( name="math_workflow", description="Simple math computation workflow", tasks=[ WorkflowTask( id="add_task", tool="mock_tool", args={"expression": "10 + 5"}, depends_on=[] ), WorkflowTask( id="multiply_task", tool="mock_tool", args={"expression": "3 * 7"}, depends_on=[] ), WorkflowTask( id="combine_task", tool="mock_tool", args={"expression": "21 + 15"}, # Results from previous tasks depends_on=["add_task", "multiply_task"] ) ], final_task="combine_task", max_parallel=2 ) print(f"[PASS] Workflow created: {workflow.name}") print(f" - Tasks: {len(workflow.tasks)}") print(f" - Dependencies: add_task, multiply_task -> combine_task") # Step 4: Execute workflow print("\n[4] Executing workflow...") result = executor.execute(workflow) if not result["success"]: print(f"[FAIL] Workflow execution failed: {result.get('error')}") return False print("[PASS] Workflow executed successfully") print(f" - Execution time: {result['execution_time']:.2f}s") print(f" - Final result: {result['result']}") print(f" - Trace entries: {len(result['trace'])}") # Step 5: Verify workflow was persisted print("\n[5] Verifying persistence...") store = WorkflowStore(store_path=test_cache) workflow_ids = store.list_workflows() if len(workflow_ids) != 1: print(f"[FAIL] Expected 1 persisted workflow, found {len(workflow_ids)}") return False print(f"[PASS] Found {len(workflow_ids)} persisted workflow") print(f" - Workflow ID: {workflow_ids[0]}") # Step 6: Load and verify persisted workflow print("\n[6] Loading persisted workflow...") workflow_id = workflow_ids[0] loaded_data = store.load_workflow(workflow_id) if loaded_data is None: print("[FAIL] Failed to load persisted workflow") return False print("[PASS] Workflow loaded successfully") print(f" - Workflow ID: {loaded_data['workflow_id']}") print(f" - Workflow name: {loaded_data['workflow']['name']}") # Step 7: Verify loaded data matches execution print("\n[7] Validating persisted data...") # Check workflow definition assert loaded_data["workflow"]["name"] == workflow.name, "Workflow name mismatch" assert len(loaded_data["workflow"]["tasks"]) == len(workflow.tasks), "Task count mismatch" assert loaded_data["workflow"]["final_task"] == workflow.final_task, "Final task mismatch" # Check execution result assert loaded_data["result"]["success"] == result["success"], "Success status mismatch" assert loaded_data["result"]["result"] == result["result"], "Final result mismatch" assert "execution_time" in loaded_data["result"], "Missing execution time" assert "trace" in loaded_data["result"], "Missing trace" print("[PASS] All data validated successfully") print(" - Workflow definition: OK") print(" - Execution result: OK") print(" - Trace: OK") # Step 8: Execute another workflow to test multiple workflows print("\n[8] Testing multiple workflow persistence...") workflow2 = WorkflowDefinition( name="simple_calc", description="Simple calculation", tasks=[ WorkflowTask( id="calc", tool="mock_tool", args={"expression": "100 * 2"}, depends_on=[] ) ], final_task="calc" ) result2 = executor.execute(workflow2) assert result2["success"], "Second workflow execution failed" workflow_ids = store.list_workflows() assert len(workflow_ids) == 2, f"Expected 2 workflows, found {len(workflow_ids)}" print(f"[PASS] Multiple workflows persisted correctly") print(f" - Total workflows: {len(workflow_ids)}") # Cleanup print("\n[Cleanup] Removing test cache...") shutil.rmtree(test_cache) print("[PASS] Test cache removed") print("\n" + "=" * 60) print("[SUCCESS] INTEGRATION TEST PASSED") print("=" * 60) print("\nVerified:") print(" - Workflows auto-persist after successful execution") print(" - Persisted data matches execution results") print(" - Multiple workflows can be stored") print(" - Workflows can be loaded and validated") return True if __name__ == "__main__": try: success = test_persistence_integration() exit(0 if success else 1) except Exception as e: logger.error(f"Test failed with exception: {e}", exc_info=True) exit(1)