Spaces:
Sleeping
Sleeping
| import asyncio | |
| import os | |
| from unittest.mock import patch | |
| import pytest | |
| from swarm_models import OpenAIChat | |
| from swarms.structs.agent import Agent | |
| from swarms.structs.sequential_workflow import ( | |
| SequentialWorkflow, | |
| Task, | |
| ) | |
| # Mock the OpenAI API key using environment variables | |
| os.environ["OPENAI_API_KEY"] = "mocked_api_key" | |
| # Mock OpenAIChat class for testing | |
| class MockOpenAIChat: | |
| def __init__(self, *args, **kwargs): | |
| pass | |
| def run(self, *args, **kwargs): | |
| return "Mocked result" | |
| # Mock Agent class for testing | |
| class MockAgent: | |
| def __init__(self, *args, **kwargs): | |
| pass | |
| def run(self, *args, **kwargs): | |
| return "Mocked result" | |
| # Mock SequentialWorkflow class for testing | |
| class MockSequentialWorkflow: | |
| def __init__(self, *args, **kwargs): | |
| pass | |
| def add(self, *args, **kwargs): | |
| pass | |
| def run(self): | |
| pass | |
| # Test Task class | |
| def test_task_initialization(): | |
| description = "Sample Task" | |
| agent = MockOpenAIChat() | |
| task = Task(description=description, agent=agent) | |
| assert task.description == description | |
| assert task.agent == agent | |
| def test_task_execute(): | |
| description = "Sample Task" | |
| agent = MockOpenAIChat() | |
| task = Task(description=description, agent=agent) | |
| task.run() | |
| assert task.result == "Mocked result" | |
| # Test SequentialWorkflow class | |
| def test_sequential_workflow_initialization(): | |
| workflow = SequentialWorkflow() | |
| assert isinstance(workflow, SequentialWorkflow) | |
| assert len(workflow.tasks) == 0 | |
| assert workflow.max_loops == 1 | |
| assert workflow.autosave is False | |
| assert ( | |
| workflow.saved_state_filepath | |
| == "sequential_workflow_state.json" | |
| ) | |
| assert workflow.restore_state_filepath is None | |
| assert workflow.dashboard is False | |
| def test_sequential_workflow_add_task(): | |
| workflow = SequentialWorkflow() | |
| task_description = "Sample Task" | |
| task_flow = MockOpenAIChat() | |
| workflow.add(task_description, task_flow) | |
| assert len(workflow.tasks) == 1 | |
| assert workflow.tasks[0].description == task_description | |
| assert workflow.tasks[0].agent == task_flow | |
| def test_sequential_workflow_reset_workflow(): | |
| workflow = SequentialWorkflow() | |
| task_description = "Sample Task" | |
| task_flow = MockOpenAIChat() | |
| workflow.add(task_description, task_flow) | |
| workflow.reset_workflow() | |
| assert workflow.tasks[0].result is None | |
| def test_sequential_workflow_get_task_results(): | |
| workflow = SequentialWorkflow() | |
| task_description = "Sample Task" | |
| task_flow = MockOpenAIChat() | |
| workflow.add(task_description, task_flow) | |
| workflow.run() | |
| results = workflow.get_task_results() | |
| assert len(results) == 1 | |
| assert task_description in results | |
| assert results[task_description] == "Mocked result" | |
| def test_sequential_workflow_remove_task(): | |
| workflow = SequentialWorkflow() | |
| task1_description = "Task 1" | |
| task2_description = "Task 2" | |
| task1_flow = MockOpenAIChat() | |
| task2_flow = MockOpenAIChat() | |
| workflow.add(task1_description, task1_flow) | |
| workflow.add(task2_description, task2_flow) | |
| workflow.remove_task(task1_description) | |
| assert len(workflow.tasks) == 1 | |
| assert workflow.tasks[0].description == task2_description | |
| def test_sequential_workflow_update_task(): | |
| workflow = SequentialWorkflow() | |
| task_description = "Sample Task" | |
| task_flow = MockOpenAIChat() | |
| workflow.add(task_description, task_flow) | |
| workflow.update_task(task_description, max_tokens=1000) | |
| assert workflow.tasks[0].kwargs["max_tokens"] == 1000 | |
| def test_sequential_workflow_save_workflow_state(): | |
| workflow = SequentialWorkflow() | |
| task_description = "Sample Task" | |
| task_flow = MockOpenAIChat() | |
| workflow.add(task_description, task_flow) | |
| workflow.save_workflow_state("test_state.json") | |
| assert os.path.exists("test_state.json") | |
| os.remove("test_state.json") | |
| def test_sequential_workflow_load_workflow_state(): | |
| workflow = SequentialWorkflow() | |
| task_description = "Sample Task" | |
| task_flow = MockOpenAIChat() | |
| workflow.add(task_description, task_flow) | |
| workflow.save_workflow_state("test_state.json") | |
| workflow.load_workflow_state("test_state.json") | |
| assert len(workflow.tasks) == 1 | |
| assert workflow.tasks[0].description == task_description | |
| os.remove("test_state.json") | |
| def test_sequential_workflow_run(): | |
| workflow = SequentialWorkflow() | |
| task_description = "Sample Task" | |
| task_flow = MockOpenAIChat() | |
| workflow.add(task_description, task_flow) | |
| workflow.run() | |
| assert workflow.tasks[0].result == "Mocked result" | |
| def test_sequential_workflow_workflow_bootup(capfd): | |
| workflow = SequentialWorkflow() | |
| workflow.workflow_bootup() | |
| out, _ = capfd.readouterr() | |
| assert "Sequential Workflow Initializing..." in out | |
| def test_sequential_workflow_workflow_dashboard(capfd): | |
| workflow = SequentialWorkflow() | |
| workflow.workflow_dashboard() | |
| out, _ = capfd.readouterr() | |
| assert "Sequential Workflow Dashboard" in out | |
| # Mock Agent class for async testing | |
| class MockAsyncAgent: | |
| def __init__(self, *args, **kwargs): | |
| pass | |
| async def arun(self, *args, **kwargs): | |
| return "Mocked result" | |
| # Test async execution in SequentialWorkflow | |
| async def test_sequential_workflow_arun(): | |
| workflow = SequentialWorkflow() | |
| task_description = "Sample Task" | |
| task_flow = MockAsyncAgent() | |
| workflow.add(task_description, task_flow) | |
| await workflow.arun() | |
| assert workflow.tasks[0].result == "Mocked result" | |
| def test_real_world_usage_with_openai_key(): | |
| # Initialize the language model | |
| llm = OpenAIChat() | |
| assert isinstance(llm, OpenAIChat) | |
| def test_real_world_usage_with_flow_and_openai_key(): | |
| # Initialize a agent with the language model | |
| agent = Agent(llm=OpenAIChat()) | |
| assert isinstance(agent, Agent) | |
| def test_real_world_usage_with_sequential_workflow(): | |
| # Initialize a sequential workflow | |
| workflow = SequentialWorkflow() | |
| assert isinstance(workflow, SequentialWorkflow) | |
| def test_real_world_usage_add_tasks(): | |
| # Create a sequential workflow and add tasks | |
| workflow = SequentialWorkflow() | |
| task1_description = "Task 1" | |
| task2_description = "Task 2" | |
| task1_flow = OpenAIChat() | |
| task2_flow = OpenAIChat() | |
| workflow.add(task1_description, task1_flow) | |
| workflow.add(task2_description, task2_flow) | |
| assert len(workflow.tasks) == 2 | |
| assert workflow.tasks[0].description == task1_description | |
| assert workflow.tasks[1].description == task2_description | |
| def test_real_world_usage_run_workflow(): | |
| # Create a sequential workflow, add a task, and run the workflow | |
| workflow = SequentialWorkflow() | |
| task_description = "Sample Task" | |
| task_flow = OpenAIChat() | |
| workflow.add(task_description, task_flow) | |
| workflow.run() | |
| assert workflow.tasks[0].result is not None | |
| def test_real_world_usage_dashboard_display(): | |
| # Create a sequential workflow, add tasks, and display the dashboard | |
| workflow = SequentialWorkflow() | |
| task1_description = "Task 1" | |
| task2_description = "Task 2" | |
| task1_flow = OpenAIChat() | |
| task2_flow = OpenAIChat() | |
| workflow.add(task1_description, task1_flow) | |
| workflow.add(task2_description, task2_flow) | |
| with patch("builtins.print") as mock_print: | |
| workflow.workflow_dashboard() | |
| mock_print.assert_called() | |
| def test_real_world_usage_async_execution(): | |
| # Create a sequential workflow, add an async task, and run the workflow asynchronously | |
| workflow = SequentialWorkflow() | |
| task_description = "Sample Task" | |
| async_task_flow = OpenAIChat() | |
| async def async_run_workflow(): | |
| await workflow.arun() | |
| workflow.add(task_description, async_task_flow) | |
| asyncio.run(async_run_workflow()) | |
| assert workflow.tasks[0].result is not None | |
| def test_real_world_usage_multiple_loops(): | |
| # Create a sequential workflow with multiple loops, add a task, and run the workflow | |
| workflow = SequentialWorkflow(max_loops=3) | |
| task_description = "Sample Task" | |
| task_flow = OpenAIChat() | |
| workflow.add(task_description, task_flow) | |
| workflow.run() | |
| assert workflow.tasks[0].result is not None | |
| def test_real_world_usage_autosave_state(): | |
| # Create a sequential workflow with autosave, add a task, run the workflow, and check if state is saved | |
| workflow = SequentialWorkflow(autosave=True) | |
| task_description = "Sample Task" | |
| task_flow = OpenAIChat() | |
| workflow.add(task_description, task_flow) | |
| workflow.run() | |
| assert workflow.tasks[0].result is not None | |
| assert os.path.exists("sequential_workflow_state.json") | |
| os.remove("sequential_workflow_state.json") | |
| def test_real_world_usage_load_state(): | |
| # Create a sequential workflow, add a task, save state, load state, and run the workflow | |
| workflow = SequentialWorkflow() | |
| task_description = "Sample Task" | |
| task_flow = OpenAIChat() | |
| workflow.add(task_description, task_flow) | |
| workflow.run() | |
| workflow.save_workflow_state("test_state.json") | |
| workflow.load_workflow_state("test_state.json") | |
| workflow.run() | |
| assert workflow.tasks[0].result is not None | |
| os.remove("test_state.json") | |
| def test_real_world_usage_update_task_args(): | |
| # Create a sequential workflow, add a task, and update task arguments | |
| workflow = SequentialWorkflow() | |
| task_description = "Sample Task" | |
| task_flow = OpenAIChat() | |
| workflow.add(task_description, task_flow) | |
| workflow.update_task(task_description, max_tokens=1000) | |
| assert workflow.tasks[0].kwargs["max_tokens"] == 1000 | |
| def test_real_world_usage_remove_task(): | |
| # Create a sequential workflow, add tasks, remove a task, and run the workflow | |
| workflow = SequentialWorkflow() | |
| task1_description = "Task 1" | |
| task2_description = "Task 2" | |
| task1_flow = OpenAIChat() | |
| task2_flow = OpenAIChat() | |
| workflow.add(task1_description, task1_flow) | |
| workflow.add(task2_description, task2_flow) | |
| workflow.remove_task(task1_description) | |
| workflow.run() | |
| assert len(workflow.tasks) == 1 | |
| assert workflow.tasks[0].description == task2_description | |
| def test_real_world_usage_with_environment_variables(): | |
| # Ensure that the OpenAI API key is set using environment variables | |
| assert "OPENAI_API_KEY" in os.environ | |
| assert os.environ["OPENAI_API_KEY"] == "mocked_api_key" | |
| del os.environ["OPENAI_API_KEY"] # Clean up after the test | |
| def test_real_world_usage_no_openai_key(): | |
| # Ensure that an exception is raised when the OpenAI API key is not set | |
| with pytest.raises(ValueError): | |
| OpenAIChat() # API key not provided, should raise an exception | |