| """ |
| Tests for environments/agent_loop.py — HermesAgentLoop. |
| |
| Tests the multi-turn agent engine using mocked servers, without needing |
| real API keys or running servers. |
| """ |
|
|
| import asyncio |
| import json |
| import sys |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional |
| from unittest.mock import MagicMock |
|
|
| import pytest |
|
|
| |
# Put the directory two levels above this file at the front of sys.path so
# that the `environments` package imported below can be resolved.
_PARENT_OF_TEST_DIR = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_PARENT_OF_TEST_DIR))

try:
    from environments.agent_loop import (
        AgentResult,
        HermesAgentLoop,
        ToolError,
        _extract_reasoning_from_message,
        resize_tool_pool,
    )
except ImportError:
    # Any ImportError raised while importing environments.agent_loop skips the
    # whole module instead of failing collection. The reason string names
    # atroposlib -- presumably the dependency most likely to be missing
    # (NOTE(review): confirm against environments/agent_loop.py).
    pytest.skip("atroposlib not installed", allow_module_level=True)
|
|
|
|
| |
|
|
|
|
@dataclass
class MockFunction:
    """Stand-in for the ``function`` payload carried by a mock tool call."""

    # Name of the tool the model is asking to invoke.
    name: str
    # Call arguments as a string; make_tool_response() stores JSON text here.
    arguments: str
|
|
|
|
@dataclass
class MockToolCall:
    """Stand-in for one tool call attached to a mock assistant message."""

    # Identifier of this call (make_tool_response() defaults it to "call_001").
    id: str
    # Which tool is called and with what arguments.
    function: MockFunction
    # Call kind; every mock in this file uses the default.
    type: str = "function"
|
|
|
|
@dataclass
class MockMessage:
    """Stand-in for the message object inside a mock completion choice."""

    # Text of the message; may be None.
    content: Optional[str]
    role: str = "assistant"
    # Tool calls requested by this message, or None for a text-only reply.
    tool_calls: Optional[List[MockToolCall]] = None
    # The three fields below are the ones the TestExtractReasoning tests feed
    # to _extract_reasoning_from_message().
    reasoning_content: Optional[str] = None
    reasoning: Optional[str] = None
    reasoning_details: Optional[list] = None
|
|
|
|
@dataclass
class MockChoice:
    """Stand-in for a single entry of a mock completion's ``choices`` list."""

    # The assistant message carried by this choice.
    message: MockMessage
    # "stop" by default; make_tool_response() sets "tool_calls" instead.
    finish_reason: str = "stop"
    index: int = 0
|
|
|
|
@dataclass
class MockChatCompletion:
    """Stand-in for the object returned by a server's ``chat_completion()``."""

    # Candidate choices; the tests also pass an empty list to model an empty
    # response.
    choices: List[MockChoice]
    id: str = "chatcmpl-mock"
    model: str = "mock-model"
|
|
|
|
class MockServer:
    """
    Mock server that returns pre-configured responses in sequence.

    Mimics the ``chat_completion()`` interface. Once the scripted responses
    are exhausted, every further call returns a plain ``"Done."`` text
    completion.

    Attributes:
        responses: Scripted completions, handed out in order.
        call_count: Total number of ``chat_completion()`` calls made so far,
            including calls answered with the fallback completion.
        call_history: Keyword arguments of every call, in call order.
    """

    def __init__(self, responses: List[MockChatCompletion]):
        self.responses = responses
        self.call_count = 0
        self.call_history: List[Dict[str, Any]] = []

    async def chat_completion(self, **kwargs) -> MockChatCompletion:
        """Record the call and return the next scripted (or fallback) response."""
        self.call_history.append(kwargs)

        # Count every call, not only the scripted ones, so that call_count
        # always agrees with len(call_history).
        position = self.call_count
        self.call_count += 1

        if position < len(self.responses):
            return self.responses[position]

        # Script exhausted: answer with a generic text-only completion.
        return MockChatCompletion(
            choices=[MockChoice(message=MockMessage(content="Done."))]
        )
|
|
|
|
def make_text_response(content: str) -> MockChatCompletion:
    """Build a completion whose only choice is a text message without tool calls."""
    message = MockMessage(content=content)
    only_choice = MockChoice(message=message)
    return MockChatCompletion(choices=[only_choice])
|
|
|
|
def make_tool_response(
    tool_name: str,
    arguments: dict,
    content: str = "",
    tool_call_id: str = "call_001",
) -> MockChatCompletion:
    """Build a completion whose only choice requests exactly one tool call.

    Args:
        tool_name: Name placed on the tool call's function.
        arguments: Call arguments; serialized with ``json.dumps``.
        content: Message text accompanying the tool call.
        tool_call_id: Identifier assigned to the tool call.

    Returns:
        A MockChatCompletion with a single choice whose ``finish_reason`` is
        ``"tool_calls"``.
    """
    # The arguments travel as JSON text on the function payload.
    function = MockFunction(name=tool_name, arguments=json.dumps(arguments))
    call = MockToolCall(id=tool_call_id, function=function)
    message = MockMessage(content=content, tool_calls=[call])
    choice = MockChoice(message=message, finish_reason="tool_calls")
    return MockChatCompletion(choices=[choice])
|
|
|
|
| |
|
|
|
|
class TestAgentResult:
    """Checks on a freshly constructed AgentResult."""

    def test_defaults(self):
        """With only ``messages`` supplied, every other field holds its default."""
        fresh = AgentResult(messages=[])

        assert fresh.messages == []
        # Scalar defaults.
        assert fresh.managed_state is None
        assert fresh.turns_used == 0
        assert fresh.finished_naturally is False
        # Collection defaults start out empty.
        assert fresh.reasoning_per_turn == []
        assert fresh.tool_errors == []
|
|
|
|
class TestExtractReasoning:
    """Checks for _extract_reasoning_from_message() on each reasoning field."""

    def test_reasoning_content_field(self):
        """``reasoning_content`` alone is returned as the reasoning."""
        message = MockMessage(content="hello", reasoning_content="I think...")
        extracted = _extract_reasoning_from_message(message)
        assert extracted == "I think..."

    def test_reasoning_field(self):
        """``reasoning`` alone is returned as the reasoning."""
        message = MockMessage(content="hello", reasoning="Let me consider...")
        extracted = _extract_reasoning_from_message(message)
        assert extracted == "Let me consider..."

    def test_reasoning_details(self):
        """A detail object exposing a ``text`` attribute is read."""
        detail_obj = MagicMock()
        detail_obj.text = "Detail reasoning"
        message = MockMessage(content="hello", reasoning_details=[detail_obj])
        extracted = _extract_reasoning_from_message(message)
        assert extracted == "Detail reasoning"

    def test_reasoning_details_dict_format(self):
        """A detail given as a dict with a ``"text"`` key is read."""
        details = [{"text": "Dict reasoning"}]
        message = MockMessage(content="hello", reasoning_details=details)
        extracted = _extract_reasoning_from_message(message)
        assert extracted == "Dict reasoning"

    def test_no_reasoning(self):
        """With no reasoning field set, the result is None."""
        message = MockMessage(content="hello")
        extracted = _extract_reasoning_from_message(message)
        assert extracted is None

    def test_reasoning_content_takes_priority(self):
        """When both are set, ``reasoning_content`` wins over ``reasoning``."""
        message = MockMessage(
            content="hello",
            reasoning_content="First",
            reasoning="Second",
        )
        extracted = _extract_reasoning_from_message(message)
        assert extracted == "First"
|
|
|
|
class TestHermesAgentLoop:
    """Drive HermesAgentLoop against scripted mock servers."""

    @staticmethod
    def _build_loop(server, tool_schemas, valid_tool_names, max_turns=10, **options):
        """Construct a HermesAgentLoop with the keyword arguments the tests share."""
        return HermesAgentLoop(
            server=server,
            tool_schemas=tool_schemas,
            valid_tool_names=valid_tool_names,
            max_turns=max_turns,
            **options,
        )

    @pytest.fixture
    def basic_tools(self):
        """Two function-style tool schemas: ``terminal`` and ``read_file``."""
        terminal_schema = {
            "type": "function",
            "function": {
                "name": "terminal",
                "description": "Run a command",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "command": {
                            "type": "string",
                            "description": "Command to run",
                        }
                    },
                    "required": ["command"],
                },
            },
        }
        read_file_schema = {
            "type": "function",
            "function": {
                "name": "read_file",
                "description": "Read a file",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "path": {"type": "string"},
                    },
                    "required": ["path"],
                },
            },
        }
        return [terminal_schema, read_file_schema]

    @pytest.fixture
    def valid_names(self):
        """Tool names the loop is told to accept in most tests."""
        return {"terminal", "read_file", "todo"}

    @pytest.mark.asyncio
    async def test_simple_text_response(self, basic_tools, valid_names):
        """A text-only reply ends the run after a single turn."""
        reply_text = "Hello! How can I help?"
        server = MockServer([make_text_response(reply_text)])
        loop = self._build_loop(server, basic_tools, valid_names)

        result = await loop.run([{"role": "user", "content": "Hi"}])

        assert result.finished_naturally is True
        assert result.turns_used == 1
        assert len(result.messages) >= 2
        final_message = result.messages[-1]
        assert final_message["role"] == "assistant"
        assert final_message["content"] == reply_text

    @pytest.mark.asyncio
    async def test_tool_call_then_text(self, basic_tools, valid_names):
        """One tool call followed by a text reply takes two turns."""
        todo_args = {"todos": [{"id": "1", "content": "test", "status": "pending"}]}
        script = [
            make_tool_response("todo", todo_args),
            make_text_response("I created a todo for you."),
        ]
        loop = self._build_loop(MockServer(script), basic_tools, valid_names)

        result = await loop.run([{"role": "user", "content": "Create a todo"}])

        assert result.finished_naturally is True
        assert result.turns_used == 2
        # Expected transcript shape: user -> assistant (tool call) -> tool -> assistant.
        observed_roles = [entry["role"] for entry in result.messages]
        assert observed_roles == ["user", "assistant", "tool", "assistant"]

    @pytest.mark.asyncio
    async def test_max_turns_reached(self, basic_tools, valid_names):
        """A model that never stops calling tools is cut off at max_turns."""
        # Script more tool calls (10) than the turn budget (3) allows.
        script = []
        for i in range(10):
            todo_args = {
                "todos": [{"id": str(i), "content": f"task {i}", "status": "pending"}]
            }
            script.append(
                make_tool_response("todo", todo_args, tool_call_id=f"call_{i}")
            )
        loop = self._build_loop(
            MockServer(script), basic_tools, valid_names, max_turns=3
        )

        result = await loop.run([{"role": "user", "content": "Keep going"}])

        assert result.finished_naturally is False
        assert result.turns_used == 3

    @pytest.mark.asyncio
    async def test_unknown_tool_name(self, basic_tools, valid_names):
        """Calling a tool outside valid_tool_names is recorded as a tool error."""
        script = [
            make_tool_response("nonexistent_tool", {"arg": "val"}),
            make_text_response("OK, that didn't work."),
        ]
        loop = self._build_loop(MockServer(script), basic_tools, valid_names)

        result = await loop.run(
            [{"role": "user", "content": "Call something weird"}]
        )

        # The first recorded error should name the offending tool.
        assert len(result.tool_errors) >= 1
        first_error = result.tool_errors[0]
        assert first_error.tool_name == "nonexistent_tool"

    @pytest.mark.asyncio
    async def test_empty_response(self, basic_tools, valid_names):
        """A completion with no choices ends the run without finishing naturally."""
        empty_completion = MockChatCompletion(choices=[])
        loop = self._build_loop(
            MockServer([empty_completion]), basic_tools, valid_names
        )

        result = await loop.run([{"role": "user", "content": "Hi"}])

        assert result.finished_naturally is False
        assert result.turns_used == 1

    @pytest.mark.asyncio
    async def test_api_error_handling(self, basic_tools, valid_names):
        """An exception raised by the server does not propagate out of run()."""

        class FailingServer:
            """Server whose chat_completion() always raises."""

            async def chat_completion(self, **kwargs):
                raise ConnectionError("Server unreachable")

        loop = self._build_loop(FailingServer(), basic_tools, valid_names)

        result = await loop.run([{"role": "user", "content": "Hi"}])

        assert result.finished_naturally is False
        assert result.turns_used == 1

    @pytest.mark.asyncio
    async def test_tools_passed_to_server(self, basic_tools, valid_names):
        """The tool schemas reach chat_completion() under the ``tools`` kwarg."""
        server = MockServer([make_text_response("OK")])
        loop = self._build_loop(server, basic_tools, valid_names)

        await loop.run([{"role": "user", "content": "Hi"}])

        assert len(server.call_history) == 1
        sent_kwargs = server.call_history[0]
        assert "tools" in sent_kwargs
        assert sent_kwargs["tools"] == basic_tools

    @pytest.mark.asyncio
    async def test_extra_body_forwarded(self, basic_tools, valid_names):
        """``extra_body`` given to the loop is forwarded to the server call."""
        extra = {"provider": {"ignore": ["DeepInfra"]}}
        server = MockServer([make_text_response("OK")])
        loop = self._build_loop(server, basic_tools, valid_names, extra_body=extra)

        await loop.run([{"role": "user", "content": "Hi"}])

        first_call = server.call_history[0]
        assert first_call.get("extra_body") == extra

    @pytest.mark.asyncio
    async def test_managed_state_returned(self, basic_tools, valid_names):
        """A server exposing get_state() yields a non-None managed_state."""
        server = MockServer([make_text_response("OK")])
        # Attach get_state() to this instance only.
        server.get_state = lambda: {"nodes": [{"test": True}]}
        loop = self._build_loop(server, basic_tools, valid_names)

        result = await loop.run([{"role": "user", "content": "Hi"}])

        assert result.managed_state is not None
        assert "nodes" in result.managed_state

    @pytest.mark.asyncio
    async def test_no_managed_state_without_get_state(self, basic_tools, valid_names):
        """A server without get_state() leaves managed_state as None."""
        server = MockServer([make_text_response("OK")])
        loop = self._build_loop(server, basic_tools, valid_names)

        result = await loop.run([{"role": "user", "content": "Hi"}])

        assert result.managed_state is None

    @pytest.mark.asyncio
    async def test_memory_tool_blocked(self, basic_tools):
        """A ``memory`` tool call produces an error tool message."""
        allowed = {"terminal", "read_file", "todo", "memory"}
        memory_args = {"action": "add", "target": "user", "content": "test"}
        script = [
            make_tool_response("memory", memory_args),
            make_text_response("Done"),
        ]
        loop = self._build_loop(MockServer(script), basic_tools, allowed)

        result = await loop.run([{"role": "user", "content": "Remember this"}])

        # The tool message content is JSON carrying a "not available" error.
        tool_messages = [
            entry for entry in result.messages if entry["role"] == "tool"
        ]
        assert len(tool_messages) >= 1
        payload = json.loads(tool_messages[0]["content"])
        assert "error" in payload
        assert "not available" in payload["error"].lower()

    @pytest.mark.asyncio
    async def test_session_search_blocked(self, basic_tools):
        """A ``session_search`` tool call produces an error tool message."""
        allowed = {"terminal", "read_file", "todo", "session_search"}
        script = [
            make_tool_response("session_search", {"query": "test"}),
            make_text_response("Done"),
        ]
        loop = self._build_loop(MockServer(script), basic_tools, allowed)

        result = await loop.run([{"role": "user", "content": "Search sessions"}])

        tool_messages = [
            entry for entry in result.messages if entry["role"] == "tool"
        ]
        assert len(tool_messages) >= 1
        payload = json.loads(tool_messages[0]["content"])
        assert "error" in payload

    @pytest.mark.asyncio
    async def test_reasoning_content_preserved(self, basic_tools, valid_names):
        """Reasoning on the reply is captured in reasoning_per_turn."""
        thinking = "Let me think about this step by step..."
        reply = MockMessage(
            content="The answer is 42.",
            reasoning_content=thinking,
        )
        completion = MockChatCompletion(choices=[MockChoice(message=reply)])
        loop = self._build_loop(MockServer([completion]), basic_tools, valid_names)

        result = await loop.run(
            [{"role": "user", "content": "What is the meaning of life?"}]
        )

        assert len(result.reasoning_per_turn) == 1
        assert result.reasoning_per_turn[0] == thinking
|
|
|
|
class TestResizeToolPool:
    """Checks for resize_tool_pool() and the module-level tool executor."""

    def test_resize_works(self):
        """Resizing to 16 and then to 128 workers must not raise."""
        for worker_count in (16, 128):
            resize_tool_pool(worker_count)

    def test_resize_shuts_down_previous_executor(self, monkeypatch):
        """Swapping in a new executor shuts the old one down without waiting."""
        import environments.agent_loop as agent_loop_module

        previous_pool = MagicMock()
        replacement_pool = MagicMock()
        # Factory standing in for ThreadPoolExecutor; it hands back the
        # replacement mock whatever arguments it is called with.
        executor_factory = MagicMock(return_value=replacement_pool)

        monkeypatch.setattr(agent_loop_module, "_tool_executor", previous_pool)
        # Patched on the concurrent.futures module object as reached through
        # agent_loop_module; monkeypatch undoes both patches after the test.
        monkeypatch.setattr(
            agent_loop_module.concurrent.futures,
            "ThreadPoolExecutor",
            executor_factory,
        )

        resize_tool_pool(16)

        previous_pool.shutdown.assert_called_once_with(wait=False)
        assert agent_loop_module._tool_executor is replacement_pool
|
|