Spaces:
Build error
Build error
| """ | |
| Unit tests for workflow orchestration engine | |
| """ | |
| import pytest | |
| import asyncio | |
| import json | |
| from unittest.mock import Mock, AsyncMock, patch | |
| from datetime import datetime | |
| from uuid import uuid4 | |
| from src.infrastructure.workflow.workflow_engine import ( | |
| WorkflowEngine, | |
| WorkflowBuilder, | |
| WorkflowDefinition, | |
| WorkflowExecution, | |
| WorkflowStep, | |
| WorkflowStatus, | |
| WorkflowType, | |
| WorkflowState, | |
| register_workflow, | |
| execute_workflow, | |
| get_execution_status, | |
| create_workflow_builder, | |
| workflow_engine | |
| ) | |
| class TestWorkflowStep: | |
| """Test workflow step functionality""" | |
| def test_workflow_step_creation(self): | |
| """Test creating a workflow step""" | |
| step = WorkflowStep( | |
| step_id="step_1", | |
| name="Test Step", | |
| description="A test step", | |
| agent_id="agent_1", | |
| input_mapping={"input": "input.data"}, | |
| output_mapping={"output": "output.result"}, | |
| timeout=30.0, | |
| retry_count=3, | |
| retry_delay=1.0, | |
| dependencies=["step_0"], | |
| condition="state.step_results['step_0']['status'] == 'success'", | |
| parallel=False | |
| ) | |
| assert step.step_id == "step_1" | |
| assert step.name == "Test Step" | |
| assert step.description == "A test step" | |
| assert step.agent_id == "agent_1" | |
| assert step.input_mapping == {"input": "input.data"} | |
| assert step.output_mapping == {"output": "output.result"} | |
| assert step.timeout == 30.0 | |
| assert step.retry_count == 3 | |
| assert step.retry_delay == 1.0 | |
| assert step.dependencies == ["step_0"] | |
| assert step.condition == "state.step_results['step_0']['status'] == 'success'" | |
| assert step.parallel is False | |
| class TestWorkflowDefinition: | |
| """Test workflow definition functionality""" | |
| def test_workflow_definition_creation(self): | |
| """Test creating a workflow definition""" | |
| steps = [ | |
| WorkflowStep( | |
| step_id="step_1", | |
| name="Step 1", | |
| description="First step", | |
| agent_id="agent_1" | |
| ), | |
| WorkflowStep( | |
| step_id="step_2", | |
| name="Step 2", | |
| description="Second step", | |
| agent_id="agent_2" | |
| ) | |
| ] | |
| workflow = WorkflowDefinition( | |
| workflow_id="workflow_1", | |
| name="Test Workflow", | |
| description="A test workflow", | |
| workflow_type=WorkflowType.SEQUENTIAL, | |
| steps=steps, | |
| input_schema={"data": "string"}, | |
| output_schema={"result": "string"}, | |
| timeout=60.0, | |
| max_retries=3, | |
| metadata={"version": "1.0"} | |
| ) | |
| assert workflow.workflow_id == "workflow_1" | |
| assert workflow.name == "Test Workflow" | |
| assert workflow.description == "A test workflow" | |
| assert workflow.workflow_type == WorkflowType.SEQUENTIAL | |
| assert len(workflow.steps) == 2 | |
| assert workflow.input_schema == {"data": "string"} | |
| assert workflow.output_schema == {"result": "string"} | |
| assert workflow.timeout == 60.0 | |
| assert workflow.max_retries == 3 | |
| assert workflow.metadata == {"version": "1.0"} | |
| class TestWorkflowExecution: | |
| """Test workflow execution functionality""" | |
| def test_workflow_execution_creation(self): | |
| """Test creating a workflow execution""" | |
| execution = WorkflowExecution( | |
| execution_id="exec_1", | |
| workflow_id="workflow_1", | |
| status=WorkflowStatus.RUNNING, | |
| input_data={"data": "test"}, | |
| output_data={"result": "success"}, | |
| step_results={"step_1": {"status": "success"}}, | |
| error_message=None, | |
| start_time=datetime.now(), | |
| end_time=datetime.now(), | |
| metadata={"duration": 10.0} | |
| ) | |
| assert execution.execution_id == "exec_1" | |
| assert execution.workflow_id == "workflow_1" | |
| assert execution.status == WorkflowStatus.RUNNING | |
| assert execution.input_data == {"data": "test"} | |
| assert execution.output_data == {"result": "success"} | |
| assert execution.step_results == {"step_1": {"status": "success"}} | |
| assert execution.error_message is None | |
| assert execution.metadata == {"duration": 10.0} | |
| class TestWorkflowState: | |
| """Test workflow state functionality""" | |
| def test_workflow_state_creation(self): | |
| """Test creating a workflow state""" | |
| state = WorkflowState( | |
| execution_id="exec_1", | |
| workflow_id="workflow_1", | |
| current_step="step_1", | |
| step_results={"step_1": {"status": "success"}}, | |
| input_data={"data": "test"}, | |
| output_data={"result": "success"}, | |
| error=None, | |
| metadata={"progress": 50}, | |
| step_history=["step_1"], | |
| retry_count={"step_1": 0} | |
| ) | |
| assert state.execution_id == "exec_1" | |
| assert state.workflow_id == "workflow_1" | |
| assert state.current_step == "step_1" | |
| assert state.step_results == {"step_1": {"status": "success"}} | |
| assert state.input_data == {"data": "test"} | |
| assert state.output_data == {"result": "success"} | |
| assert state.error is None | |
| assert state.metadata == {"progress": 50} | |
| assert state.step_history == ["step_1"] | |
| assert state.retry_count == {"step_1": 0} | |
| class TestWorkflowBuilder: | |
| """Test workflow builder functionality""" | |
| def test_workflow_builder_creation(self): | |
| """Test creating a workflow builder""" | |
| builder = WorkflowBuilder("Test Workflow", "A test workflow") | |
| assert builder.name == "Test Workflow" | |
| assert builder.description == "A test workflow" | |
| assert builder.workflow_type == WorkflowType.SEQUENTIAL | |
| assert len(builder.steps) == 0 | |
| def test_set_type(self): | |
| """Test setting workflow type""" | |
| builder = WorkflowBuilder("Test Workflow") | |
| builder.set_type(WorkflowType.PARALLEL) | |
| assert builder.workflow_type == WorkflowType.PARALLEL | |
| def test_add_step(self): | |
| """Test adding a step""" | |
| builder = WorkflowBuilder("Test Workflow") | |
| step = WorkflowStep( | |
| step_id="step_1", | |
| name="Test Step", | |
| description="A test step", | |
| agent_id="agent_1" | |
| ) | |
| builder.add_step(step) | |
| assert len(builder.steps) == 1 | |
| assert builder.steps[0] == step | |
| def test_add_agent_step(self): | |
| """Test adding an agent step""" | |
| builder = WorkflowBuilder("Test Workflow") | |
| builder.add_agent_step( | |
| name="Agent Step", | |
| agent_id="agent_1", | |
| description="An agent step", | |
| input_mapping={"input": "input.data"}, | |
| output_mapping={"output": "output.result"} | |
| ) | |
| assert len(builder.steps) == 1 | |
| step = builder.steps[0] | |
| assert step.name == "Agent Step" | |
| assert step.agent_id == "agent_1" | |
| assert step.description == "An agent step" | |
| assert step.input_mapping == {"input": "input.data"} | |
| assert step.output_mapping == {"output": "output.result"} | |
| def test_add_tool_step(self): | |
| """Test adding a tool step""" | |
| builder = WorkflowBuilder("Test Workflow") | |
| builder.add_tool_step( | |
| name="Tool Step", | |
| tool_name="test_tool", | |
| description="A tool step", | |
| input_mapping={"input": "input.data"}, | |
| output_mapping={"output": "output.result"} | |
| ) | |
| assert len(builder.steps) == 1 | |
| step = builder.steps[0] | |
| assert step.name == "Tool Step" | |
| assert step.tool_name == "test_tool" | |
| assert step.description == "A tool step" | |
| def test_set_schemas(self): | |
| """Test setting input and output schemas""" | |
| builder = WorkflowBuilder("Test Workflow") | |
| input_schema = {"data": "string"} | |
| output_schema = {"result": "string"} | |
| builder.set_input_schema(input_schema) | |
| builder.set_output_schema(output_schema) | |
| assert builder.input_schema == input_schema | |
| assert builder.output_schema == output_schema | |
| def test_set_timeout_and_retries(self): | |
| """Test setting timeout and retries""" | |
| builder = WorkflowBuilder("Test Workflow") | |
| builder.set_timeout(60.0) | |
| builder.set_max_retries(5) | |
| assert builder.timeout == 60.0 | |
| assert builder.max_retries == 5 | |
| def test_add_metadata(self): | |
| """Test adding metadata""" | |
| builder = WorkflowBuilder("Test Workflow") | |
| builder.add_metadata("version", "1.0") | |
| builder.add_metadata("author", "test") | |
| assert builder.metadata["version"] == "1.0" | |
| assert builder.metadata["author"] == "test" | |
| def test_build(self): | |
| """Test building a workflow definition""" | |
| builder = WorkflowBuilder("Test Workflow", "A test workflow") | |
| builder.set_type(WorkflowType.SEQUENTIAL) | |
| builder.add_agent_step("Step 1", "agent_1") | |
| builder.add_tool_step("Step 2", "tool_1") | |
| builder.set_input_schema({"data": "string"}) | |
| builder.set_output_schema({"result": "string"}) | |
| builder.set_timeout(60.0) | |
| builder.set_max_retries(3) | |
| builder.add_metadata("version", "1.0") | |
| workflow = builder.build() | |
| assert isinstance(workflow, WorkflowDefinition) | |
| assert workflow.name == "Test Workflow" | |
| assert workflow.description == "A test workflow" | |
| assert workflow.workflow_type == WorkflowType.SEQUENTIAL | |
| assert len(workflow.steps) == 2 | |
| assert workflow.input_schema == {"data": "string"} | |
| assert workflow.output_schema == {"result": "string"} | |
| assert workflow.timeout == 60.0 | |
| assert workflow.max_retries == 3 | |
| assert workflow.metadata["version"] == "1.0" | |
| class TestWorkflowEngine: | |
| """Test workflow engine functionality""" | |
| def engine(self): | |
| """Create a workflow engine for testing""" | |
| return WorkflowEngine() | |
| def mock_agent(self): | |
| """Create a mock agent""" | |
| agent = AsyncMock() | |
| agent.ainvoke.return_value.content = "Agent response" | |
| agent.ainvoke.return_value.additional_kwargs = {} | |
| return agent | |
| def mock_tool(self): | |
| """Create a mock tool""" | |
| tool = AsyncMock() | |
| tool.ainvoke.return_value = "Tool result" | |
| return tool | |
| async def test_register_workflow(self, engine): | |
| """Test registering a workflow""" | |
| workflow = WorkflowDefinition( | |
| workflow_id="workflow_1", | |
| name="Test Workflow", | |
| description="A test workflow", | |
| workflow_type=WorkflowType.SEQUENTIAL, | |
| steps=[], | |
| input_schema={}, | |
| output_schema={} | |
| ) | |
| result = await engine.register_workflow(workflow) | |
| assert result is True | |
| assert "workflow_1" in engine.workflows | |
| assert engine.workflows["workflow_1"] == workflow | |
| async def test_unregister_workflow(self, engine): | |
| """Test unregistering a workflow""" | |
| workflow = WorkflowDefinition( | |
| workflow_id="workflow_1", | |
| name="Test Workflow", | |
| description="A test workflow", | |
| workflow_type=WorkflowType.SEQUENTIAL, | |
| steps=[], | |
| input_schema={}, | |
| output_schema={} | |
| ) | |
| await engine.register_workflow(workflow) | |
| result = await engine.unregister_workflow("workflow_1") | |
| assert result is True | |
| assert "workflow_1" not in engine.workflows | |
| async def test_unregister_nonexistent_workflow(self, engine): | |
| """Test unregistering a non-existent workflow""" | |
| result = await engine.unregister_workflow("nonexistent") | |
| assert result is False | |
| async def test_register_agent(self, engine): | |
| """Test registering an agent""" | |
| agent = Mock() | |
| await engine.register_agent("agent_1", agent) | |
| assert "agent_1" in engine.agents | |
| assert engine.agents["agent_1"] == agent | |
| async def test_register_tool(self, engine): | |
| """Test registering a tool""" | |
| tool = Mock() | |
| await engine.register_tool("tool_1", tool) | |
| assert "tool_1" in engine.tools | |
| assert engine.tools["tool_1"] == tool | |
| async def test_execute_workflow_success(self, engine, mock_agent): | |
| """Test successful workflow execution""" | |
| # Register agent | |
| await engine.register_agent("agent_1", mock_agent) | |
| # Create workflow | |
| workflow = WorkflowDefinition( | |
| workflow_id="workflow_1", | |
| name="Test Workflow", | |
| description="A test workflow", | |
| workflow_type=WorkflowType.SEQUENTIAL, | |
| steps=[ | |
| WorkflowStep( | |
| step_id="step_1", | |
| name="Test Step", | |
| description="A test step", | |
| agent_id="agent_1", | |
| input_mapping={"message": "input.data"}, | |
| output_mapping={"response": "output.result"} | |
| ) | |
| ], | |
| input_schema={"data": "string"}, | |
| output_schema={"result": "string"} | |
| ) | |
| await engine.register_workflow(workflow) | |
| # Execute workflow | |
| execution = await engine.execute_workflow( | |
| "workflow_1", | |
| {"data": "test input"} | |
| ) | |
| assert execution.workflow_id == "workflow_1" | |
| assert execution.status == WorkflowStatus.COMPLETED | |
| assert execution.input_data == {"data": "test input"} | |
| assert execution.output_data == {"result": "Agent response"} | |
| assert "step_1" in execution.step_results | |
| assert execution.step_results["step_1"]["status"] == "success" | |
| async def test_execute_workflow_with_tool(self, engine, mock_tool): | |
| """Test workflow execution with tool""" | |
| # Register tool | |
| await engine.register_tool("tool_1", mock_tool) | |
| # Create workflow | |
| workflow = WorkflowDefinition( | |
| workflow_id="workflow_1", | |
| name="Test Workflow", | |
| description="A test workflow", | |
| workflow_type=WorkflowType.SEQUENTIAL, | |
| steps=[ | |
| WorkflowStep( | |
| step_id="step_1", | |
| name="Tool Step", | |
| description="A tool step", | |
| tool_name="tool_1", | |
| input_mapping={"input": "input.data"}, | |
| output_mapping={"result": "output.result"} | |
| ) | |
| ], | |
| input_schema={"data": "string"}, | |
| output_schema={"result": "string"} | |
| ) | |
| await engine.register_workflow(workflow) | |
| # Execute workflow | |
| execution = await engine.execute_workflow( | |
| "workflow_1", | |
| {"data": "test input"} | |
| ) | |
| assert execution.status == WorkflowStatus.COMPLETED | |
| assert execution.output_data == {"result": "Tool result"} | |
| async def test_execute_nonexistent_workflow(self, engine): | |
| """Test executing a non-existent workflow""" | |
| with pytest.raises(ValueError, match="Workflow workflow_1 not found"): | |
| await engine.execute_workflow("workflow_1", {}) | |
| async def test_execute_workflow_with_missing_agent(self, engine): | |
| """Test workflow execution with missing agent""" | |
| workflow = WorkflowDefinition( | |
| workflow_id="workflow_1", | |
| name="Test Workflow", | |
| description="A test workflow", | |
| workflow_type=WorkflowType.SEQUENTIAL, | |
| steps=[ | |
| WorkflowStep( | |
| step_id="step_1", | |
| name="Test Step", | |
| description="A test step", | |
| agent_id="missing_agent" | |
| ) | |
| ], | |
| input_schema={}, | |
| output_schema={} | |
| ) | |
| await engine.register_workflow(workflow) | |
| execution = await engine.execute_workflow("workflow_1", {}) | |
| assert execution.status == WorkflowStatus.FAILED | |
| assert "Agent missing_agent not found" in execution.error_message | |
| async def test_execute_workflow_with_missing_tool(self, engine): | |
| """Test workflow execution with missing tool""" | |
| workflow = WorkflowDefinition( | |
| workflow_id="workflow_1", | |
| name="Test Workflow", | |
| description="A test workflow", | |
| workflow_type=WorkflowType.SEQUENTIAL, | |
| steps=[ | |
| WorkflowStep( | |
| step_id="step_1", | |
| name="Tool Step", | |
| description="A tool step", | |
| tool_name="missing_tool" | |
| ) | |
| ], | |
| input_schema={}, | |
| output_schema={} | |
| ) | |
| await engine.register_workflow(workflow) | |
| execution = await engine.execute_workflow("workflow_1", {}) | |
| assert execution.status == WorkflowStatus.FAILED | |
| assert "Tool missing_tool not found" in execution.error_message | |
| async def test_execute_workflow_with_agent_error(self, engine): | |
| """Test workflow execution with agent error""" | |
| # Create agent that raises an error | |
| error_agent = AsyncMock() | |
| error_agent.ainvoke.side_effect = ValueError("Agent error") | |
| await engine.register_agent("error_agent", error_agent) | |
| workflow = WorkflowDefinition( | |
| workflow_id="workflow_1", | |
| name="Test Workflow", | |
| description="A test workflow", | |
| workflow_type=WorkflowType.SEQUENTIAL, | |
| steps=[ | |
| WorkflowStep( | |
| step_id="step_1", | |
| name="Error Step", | |
| description="A step that will error", | |
| agent_id="error_agent", | |
| retry_count=1 | |
| ) | |
| ], | |
| input_schema={}, | |
| output_schema={} | |
| ) | |
| await engine.register_workflow(workflow) | |
| execution = await engine.execute_workflow("workflow_1", {}) | |
| assert execution.status == WorkflowStatus.FAILED | |
| assert "Agent error" in execution.error_message | |
| async def test_get_execution_status(self, engine): | |
| """Test getting execution status""" | |
| execution = WorkflowExecution( | |
| execution_id="exec_1", | |
| workflow_id="workflow_1", | |
| status=WorkflowStatus.COMPLETED, | |
| input_data={}, | |
| output_data={} | |
| ) | |
| engine.executions["exec_1"] = execution | |
| result = await engine.get_execution_status("exec_1") | |
| assert result == execution | |
| async def test_get_nonexistent_execution_status(self, engine): | |
| """Test getting non-existent execution status""" | |
| result = await engine.get_execution_status("nonexistent") | |
| assert result is None | |
| async def test_cancel_execution(self, engine): | |
| """Test canceling an execution""" | |
| execution = WorkflowExecution( | |
| execution_id="exec_1", | |
| workflow_id="workflow_1", | |
| status=WorkflowStatus.RUNNING, | |
| input_data={}, | |
| output_data={} | |
| ) | |
| engine.executions["exec_1"] = execution | |
| result = await engine.cancel_execution("exec_1") | |
| assert result is True | |
| assert execution.status == WorkflowStatus.CANCELLED | |
| async def test_cancel_completed_execution(self, engine): | |
| """Test canceling a completed execution""" | |
| execution = WorkflowExecution( | |
| execution_id="exec_1", | |
| workflow_id="workflow_1", | |
| status=WorkflowStatus.COMPLETED, | |
| input_data={}, | |
| output_data={} | |
| ) | |
| engine.executions["exec_1"] = execution | |
| result = await engine.cancel_execution("exec_1") | |
| assert result is False | |
| assert execution.status == WorkflowStatus.COMPLETED | |
| async def test_get_workflow_definitions(self, engine): | |
| """Test getting workflow definitions""" | |
| workflow1 = WorkflowDefinition( | |
| workflow_id="workflow_1", | |
| name="Workflow 1", | |
| description="First workflow", | |
| workflow_type=WorkflowType.SEQUENTIAL, | |
| steps=[], | |
| input_schema={}, | |
| output_schema={} | |
| ) | |
| workflow2 = WorkflowDefinition( | |
| workflow_id="workflow_2", | |
| name="Workflow 2", | |
| description="Second workflow", | |
| workflow_type=WorkflowType.PARALLEL, | |
| steps=[], | |
| input_schema={}, | |
| output_schema={} | |
| ) | |
| await engine.register_workflow(workflow1) | |
| await engine.register_workflow(workflow2) | |
| definitions = await engine.get_workflow_definitions() | |
| assert len(definitions) == 2 | |
| workflow_ids = [w.workflow_id for w in definitions] | |
| assert "workflow_1" in workflow_ids | |
| assert "workflow_2" in workflow_ids | |
| async def test_get_execution_history(self, engine): | |
| """Test getting execution history""" | |
| execution1 = WorkflowExecution( | |
| execution_id="exec_1", | |
| workflow_id="workflow_1", | |
| status=WorkflowStatus.COMPLETED, | |
| input_data={}, | |
| output_data={} | |
| ) | |
| execution2 = WorkflowExecution( | |
| execution_id="exec_2", | |
| workflow_id="workflow_1", | |
| status=WorkflowStatus.FAILED, | |
| input_data={}, | |
| output_data={} | |
| ) | |
| execution3 = WorkflowExecution( | |
| execution_id="exec_3", | |
| workflow_id="workflow_2", | |
| status=WorkflowStatus.COMPLETED, | |
| input_data={}, | |
| output_data={} | |
| ) | |
| engine.executions["exec_1"] = execution1 | |
| engine.executions["exec_2"] = execution2 | |
| engine.executions["exec_3"] = execution3 | |
| # Get all executions | |
| all_executions = await engine.get_execution_history() | |
| assert len(all_executions) == 3 | |
| # Get executions for specific workflow | |
| workflow1_executions = await engine.get_execution_history("workflow_1") | |
| assert len(workflow1_executions) == 2 | |
| assert all(e.workflow_id == "workflow_1" for e in workflow1_executions) | |
| class TestUtilityFunctions: | |
| """Test utility functions""" | |
| async def test_register_workflow_global(self): | |
| """Test registering workflow with global engine""" | |
| workflow = WorkflowDefinition( | |
| workflow_id="global_workflow", | |
| name="Global Workflow", | |
| description="A global workflow", | |
| workflow_type=WorkflowType.SEQUENTIAL, | |
| steps=[], | |
| input_schema={}, | |
| output_schema={} | |
| ) | |
| result = await register_workflow(workflow) | |
| assert result is True | |
| assert "global_workflow" in workflow_engine.workflows | |
| async def test_execute_workflow_global(self): | |
| """Test executing workflow with global engine""" | |
| # Register a simple workflow | |
| workflow = WorkflowDefinition( | |
| workflow_id="test_workflow", | |
| name="Test Workflow", | |
| description="A test workflow", | |
| workflow_type=WorkflowType.SEQUENTIAL, | |
| steps=[], | |
| input_schema={}, | |
| output_schema={} | |
| ) | |
| await register_workflow(workflow) | |
| execution = await execute_workflow("test_workflow", {"data": "test"}) | |
| assert execution.workflow_id == "test_workflow" | |
| assert execution.status == WorkflowStatus.COMPLETED | |
| assert execution.input_data == {"data": "test"} | |
| async def test_get_execution_status_global(self): | |
| """Test getting execution status with global engine""" | |
| execution = WorkflowExecution( | |
| execution_id="global_exec", | |
| workflow_id="workflow_1", | |
| status=WorkflowStatus.COMPLETED, | |
| input_data={}, | |
| output_data={} | |
| ) | |
| workflow_engine.executions["global_exec"] = execution | |
| result = await get_execution_status("global_exec") | |
| assert result == execution | |
| def test_create_workflow_builder(self): | |
| """Test creating workflow builder""" | |
| builder = create_workflow_builder("Test Builder", "A test builder") | |
| assert isinstance(builder, WorkflowBuilder) | |
| assert builder.name == "Test Builder" | |
| assert builder.description == "A test builder" | |
| class TestWorkflowTypes: | |
| """Test different workflow types""" | |
| async def test_sequential_workflow(self): | |
| """Test sequential workflow execution""" | |
| engine = WorkflowEngine() | |
| # Create mock agents | |
| agent1 = AsyncMock() | |
| agent1.ainvoke.return_value.content = "Agent 1 response" | |
| agent1.ainvoke.return_value.additional_kwargs = {} | |
| agent2 = AsyncMock() | |
| agent2.ainvoke.return_value.content = "Agent 2 response" | |
| agent2.ainvoke.return_value.additional_kwargs = {} | |
| await engine.register_agent("agent_1", agent1) | |
| await engine.register_agent("agent_2", agent2) | |
| # Create sequential workflow | |
| workflow = WorkflowDefinition( | |
| workflow_id="sequential_workflow", | |
| name="Sequential Workflow", | |
| description="A sequential workflow", | |
| workflow_type=WorkflowType.SEQUENTIAL, | |
| steps=[ | |
| WorkflowStep( | |
| step_id="step_1", | |
| name="Step 1", | |
| description="First step", | |
| agent_id="agent_1" | |
| ), | |
| WorkflowStep( | |
| step_id="step_2", | |
| name="Step 2", | |
| description="Second step", | |
| agent_id="agent_2" | |
| ) | |
| ], | |
| input_schema={}, | |
| output_schema={} | |
| ) | |
| await engine.register_workflow(workflow) | |
| execution = await engine.execute_workflow("sequential_workflow", {}) | |
| assert execution.status == WorkflowStatus.COMPLETED | |
| assert "step_1" in execution.step_results | |
| assert "step_2" in execution.step_results | |
| assert execution.step_results["step_1"]["status"] == "success" | |
| assert execution.step_results["step_2"]["status"] == "success" | |
| async def test_parallel_workflow(self): | |
| """Test parallel workflow execution""" | |
| engine = WorkflowEngine() | |
| # Create mock agents | |
| agent1 = AsyncMock() | |
| agent1.ainvoke.return_value.content = "Agent 1 response" | |
| agent1.ainvoke.return_value.additional_kwargs = {} | |
| agent2 = AsyncMock() | |
| agent2.ainvoke.return_value.content = "Agent 2 response" | |
| agent2.ainvoke.return_value.additional_kwargs = {} | |
| await engine.register_agent("agent_1", agent1) | |
| await engine.register_agent("agent_2", agent2) | |
| # Create parallel workflow | |
| workflow = WorkflowDefinition( | |
| workflow_id="parallel_workflow", | |
| name="Parallel Workflow", | |
| description="A parallel workflow", | |
| workflow_type=WorkflowType.PARALLEL, | |
| steps=[ | |
| WorkflowStep( | |
| step_id="step_1", | |
| name="Step 1", | |
| description="First step", | |
| agent_id="agent_1" | |
| ), | |
| WorkflowStep( | |
| step_id="step_2", | |
| name="Step 2", | |
| description="Second step", | |
| agent_id="agent_2" | |
| ) | |
| ], | |
| input_schema={}, | |
| output_schema={} | |
| ) | |
| await engine.register_workflow(workflow) | |
| execution = await engine.execute_workflow("parallel_workflow", {}) | |
| assert execution.status == WorkflowStatus.COMPLETED | |
| assert "step_1" in execution.step_results | |
| assert "step_2" in execution.step_results | |
| class TestErrorHandling: | |
| """Test error handling""" | |
| async def test_workflow_timeout(self): | |
| """Test workflow timeout""" | |
| engine = WorkflowEngine() | |
| # Create slow agent | |
| slow_agent = AsyncMock() | |
| slow_agent.ainvoke.side_effect = lambda x: asyncio.sleep(2) | |
| await engine.register_agent("slow_agent", slow_agent) | |
| # Create workflow with short timeout | |
| workflow = WorkflowDefinition( | |
| workflow_id="timeout_workflow", | |
| name="Timeout Workflow", | |
| description="A workflow that will timeout", | |
| workflow_type=WorkflowType.SEQUENTIAL, | |
| steps=[ | |
| WorkflowStep( | |
| step_id="step_1", | |
| name="Slow Step", | |
| description="A slow step", | |
| agent_id="slow_agent" | |
| ) | |
| ], | |
| input_schema={}, | |
| output_schema={}, | |
| timeout=1.0 # 1 second timeout | |
| ) | |
| await engine.register_workflow(workflow) | |
| execution = await engine.execute_workflow("timeout_workflow", {}) | |
| assert execution.status == WorkflowStatus.FAILED | |
| assert "timeout" in execution.error_message.lower() | |
| async def test_step_retry_logic(self): | |
| """Test step retry logic""" | |
| engine = WorkflowEngine() | |
| # Create agent that fails twice then succeeds | |
| failing_agent = AsyncMock() | |
| call_count = 0 | |
| async def failing_func(*args, **kwargs): | |
| nonlocal call_count | |
| call_count += 1 | |
| if call_count < 3: | |
| raise ValueError(f"Agent error {call_count}") | |
| return type('Response', (), { | |
| 'content': 'Success after retries', | |
| 'additional_kwargs': {} | |
| })() | |
| failing_agent.ainvoke.side_effect = failing_func | |
| await engine.register_agent("failing_agent", failing_agent) | |
| # Create workflow with retries | |
| workflow = WorkflowDefinition( | |
| workflow_id="retry_workflow", | |
| name="Retry Workflow", | |
| description="A workflow with retries", | |
| workflow_type=WorkflowType.SEQUENTIAL, | |
| steps=[ | |
| WorkflowStep( | |
| step_id="step_1", | |
| name="Retry Step", | |
| description="A step that will retry", | |
| agent_id="failing_agent", | |
| retry_count=3, | |
| retry_delay=0.1 | |
| ) | |
| ], | |
| input_schema={}, | |
| output_schema={} | |
| ) | |
| await engine.register_workflow(workflow) | |
| execution = await engine.execute_workflow("retry_workflow", {}) | |
| assert execution.status == WorkflowStatus.COMPLETED | |
| assert execution.step_results["step_1"]["status"] == "success" | |
| assert call_count == 3 # Should have been called 3 times | |
| class TestIntegration: | |
| """Integration tests""" | |
| async def test_complex_workflow_integration(self): | |
| """Test complex workflow integration""" | |
| engine = WorkflowEngine() | |
| # Create multiple agents | |
| agent1 = AsyncMock() | |
| agent1.ainvoke.return_value.content = "Agent 1 result" | |
| agent1.ainvoke.return_value.additional_kwargs = {} | |
| agent2 = AsyncMock() | |
| agent2.ainvoke.return_value.content = "Agent 2 result" | |
| agent2.ainvoke.return_value.additional_kwargs = {} | |
| tool1 = AsyncMock() | |
| tool1.ainvoke.return_value = "Tool 1 result" | |
| await engine.register_agent("agent_1", agent1) | |
| await engine.register_agent("agent_2", agent2) | |
| await engine.register_tool("tool_1", tool1) | |
| # Create complex workflow | |
| workflow = WorkflowDefinition( | |
| workflow_id="complex_workflow", | |
| name="Complex Workflow", | |
| description="A complex workflow with multiple steps", | |
| workflow_type=WorkflowType.SEQUENTIAL, | |
| steps=[ | |
| WorkflowStep( | |
| step_id="step_1", | |
| name="Agent Step", | |
| description="First agent step", | |
| agent_id="agent_1", | |
| input_mapping={"message": "input.data"}, | |
| output_mapping={"agent_result": "output.agent_result"} | |
| ), | |
| WorkflowStep( | |
| step_id="step_2", | |
| name="Tool Step", | |
| description="Tool processing step", | |
| tool_name="tool_1", | |
| input_mapping={"input": "output.agent_result"}, | |
| output_mapping={"tool_result": "output.tool_result"} | |
| ), | |
| WorkflowStep( | |
| step_id="step_3", | |
| name="Final Agent Step", | |
| description="Final agent step", | |
| agent_id="agent_2", | |
| input_mapping={"message": "output.tool_result"}, | |
| output_mapping={"final_result": "output.final_result"} | |
| ) | |
| ], | |
| input_schema={"data": "string"}, | |
| output_schema={"agent_result": "string", "tool_result": "string", "final_result": "string"} | |
| ) | |
| await engine.register_workflow(workflow) | |
| # Execute workflow | |
| execution = await engine.execute_workflow( | |
| "complex_workflow", | |
| {"data": "initial input"} | |
| ) | |
| assert execution.status == WorkflowStatus.COMPLETED | |
| assert execution.output_data["agent_result"] == "Agent 1 result" | |
| assert execution.output_data["tool_result"] == "Tool 1 result" | |
| assert execution.output_data["final_result"] == "Agent 2 result" | |
| # Verify all steps were executed | |
| assert len(execution.step_results) == 3 | |
| for step_id in ["step_1", "step_2", "step_3"]: | |
| assert step_id in execution.step_results | |
| assert execution.step_results[step_id]["status"] == "success" |