| """Tests for cli/ module.""" |
|
|
| import asyncio |
| import json |
| import os |
| from unittest.mock import AsyncMock, MagicMock, patch |
|
|
| import pytest |
|
|
| from messaging.event_parser import parse_cli_event |
|
|
| |
|
|
|
|
| class TestCLIParser: |
| """Test CLI event parsing.""" |
|
|
| def test_parse_text_content(self): |
| """Test parsing text content from assistant message.""" |
| event = { |
| "type": "assistant", |
| "message": {"content": [{"type": "text", "text": "Hello, world!"}]}, |
| } |
| result = parse_cli_event(event) |
| assert len(result) == 1 |
| assert result[0]["type"] == "text_chunk" |
| assert result[0]["text"] == "Hello, world!" |
|
|
| def test_parse_thinking_content(self): |
| """Test parsing thinking content.""" |
| event = { |
| "type": "assistant", |
| "message": { |
| "content": [{"type": "thinking", "thinking": "Let me think..."}] |
| }, |
| } |
| result = parse_cli_event(event) |
| assert len(result) == 1 |
| assert result[0]["type"] == "thinking_chunk" |
| assert ( |
| result[0]["text"] == "Let me think...\n" |
| or result[0]["text"] == "Let me think..." |
| ) |
|
|
| def test_parse_multiple_content(self): |
| """Test parsing mixed content (thinking + tools).""" |
| event = { |
| "type": "assistant", |
| "message": { |
| "content": [ |
| {"type": "thinking", "thinking": "Thinking..."}, |
| {"type": "tool_use", "name": "ls", "input": {}}, |
| ] |
| }, |
| } |
| result = parse_cli_event(event) |
| assert len(result) == 2 |
| assert result[0]["type"] == "thinking_chunk" |
| assert result[0]["text"] == "Thinking..." |
| assert result[1]["type"] == "tool_use" |
|
|
| def test_parse_tool_use(self): |
| """Test parsing tool use content.""" |
| event = { |
| "type": "assistant", |
| "message": { |
| "content": [ |
| { |
| "type": "tool_use", |
| "name": "read_file", |
| "input": {"path": "/test"}, |
| } |
| ] |
| }, |
| } |
| result = parse_cli_event(event) |
| assert len(result) == 1 |
| assert result[0]["type"] == "tool_use" |
| assert result[0]["name"] == "read_file" |
| assert result[0]["input"] == {"path": "/test"} |
|
|
| def test_parse_text_delta(self): |
| """Test parsing streaming text delta.""" |
| event = { |
| "type": "content_block_delta", |
| "index": 0, |
| "delta": {"type": "text_delta", "text": "streaming text"}, |
| } |
| result = parse_cli_event(event) |
| assert len(result) == 1 |
| assert result[0]["type"] == "text_delta" |
| assert result[0]["text"] == "streaming text" |
|
|
| def test_parse_thinking_delta(self): |
| """Test parsing streaming thinking delta.""" |
| event = { |
| "type": "content_block_delta", |
| "index": 1, |
| "delta": {"type": "thinking_delta", "thinking": "thinking..."}, |
| } |
| result = parse_cli_event(event) |
| assert len(result) == 1 |
| assert result[0]["type"] == "thinking_delta" |
| assert result[0]["text"] == "thinking..." |
|
|
| def test_parse_error(self): |
| """Test parsing error event.""" |
| event = {"type": "error", "error": {"message": "Something went wrong"}} |
| result = parse_cli_event(event) |
| assert result[0]["type"] == "error" |
| assert result[0]["message"] == "Something went wrong" |
|
|
| def test_parse_exit_success(self): |
| """Test parsing exit event with success.""" |
| event = {"type": "exit", "code": 0} |
| result = parse_cli_event(event) |
| assert result[0]["type"] == "complete" |
| assert result[0]["status"] == "success" |
|
|
| def test_parse_exit_failure(self): |
| """Test parsing exit event with failure returns error then complete.""" |
| event = {"type": "exit", "code": 1} |
| result = parse_cli_event(event) |
| |
| assert len(result) == 2 |
| assert result[0]["type"] == "error" |
| assert ( |
| "exit" in result[0]["message"].lower() |
| or "code" in result[0]["message"].lower() |
| ) |
| assert result[1]["type"] == "complete" |
| assert result[1]["status"] == "failed" |
|
|
| def test_parse_invalid_event(self): |
| """Test parsing returns empty list for unrecognized event.""" |
| result = parse_cli_event({"type": "unknown"}) |
| assert result == [] |
|
|
| def test_parse_non_dict(self): |
| """Test parsing returns empty list for non-dict input.""" |
| result = parse_cli_event("not a dict") |
| assert result == [] |
|
|
|
|
| |
|
|
|
|
| class TestCLISession: |
| """Test CLISession.""" |
|
|
| def test_session_init(self): |
| """Test CLISession initialization.""" |
| from cli.session import CLISession |
|
|
| session = CLISession( |
| workspace_path="/tmp/test", |
| api_url="http://localhost:8082/v1", |
| allowed_dirs=["/home/user/projects"], |
| ) |
| assert session.workspace == os.path.normpath(os.path.abspath("/tmp/test")) |
| assert session.api_url == "http://localhost:8082/v1" |
| assert not session.is_busy |
|
|
| def test_session_extract_session_id(self): |
| """Test session ID extraction from various event formats.""" |
| from cli.session import CLISession |
|
|
| session = CLISession("/tmp", "http://localhost:8082/v1") |
|
|
| |
| assert session._extract_session_id({"session_id": "abc123"}) == "abc123" |
| assert session._extract_session_id({"sessionId": "abc123"}) == "abc123" |
|
|
| |
| assert ( |
| session._extract_session_id({"init": {"session_id": "nested123"}}) |
| == "nested123" |
| ) |
|
|
| |
| assert ( |
| session._extract_session_id({"result": {"session_id": "res123"}}) |
| == "res123" |
| ) |
|
|
| |
| assert ( |
| session._extract_session_id({"conversation": {"id": "conv123"}}) |
| == "conv123" |
| ) |
|
|
| |
| assert session._extract_session_id({"type": "message"}) is None |
| assert session._extract_session_id("not a dict") is None |
|
|
| @pytest.mark.asyncio |
| async def test_start_task_basic_flow(self): |
| """Test start_task running a basic command flow.""" |
| from cli.session import CLISession |
|
|
| session = CLISession("/tmp", "http://localhost:8082/v1") |
|
|
| |
| mock_process = AsyncMock() |
| mock_process.stdout.read.side_effect = [ |
| b'{"type": "message", "content": "Hello"}\n', |
| b'{"session_id": "sess_1"}\n', |
| b"", |
| ] |
| mock_process.stderr.read.return_value = b"" |
| mock_process.wait.return_value = 0 |
| mock_process.returncode = 0 |
|
|
| with patch( |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock |
| ) as mock_exec: |
| mock_exec.return_value = mock_process |
|
|
| events = [e async for e in session.start_task("Hello")] |
|
|
| |
| |
| args = mock_exec.call_args[0] |
| assert args[0] == "claude" |
| assert "-p" in args |
| assert "Hello" in args |
|
|
| |
| assert ( |
| len(events) == 4 |
| ) |
| assert events[0] == {"type": "message", "content": "Hello"} |
| assert events[1] == {"type": "session_info", "session_id": "sess_1"} |
| |
| assert events[2] == {"session_id": "sess_1"} |
| assert events[3] == {"type": "exit", "code": 0, "stderr": None} |
|
|
| assert session.current_session_id == "sess_1" |
|
|
| @pytest.mark.asyncio |
| async def test_start_task_with_session_resume(self): |
| """Test resuming an existing session.""" |
| from cli.session import CLISession |
|
|
| session = CLISession("/tmp", "http://localhost:8082/v1") |
|
|
| mock_process = AsyncMock() |
| mock_process.stdout.read.side_effect = [ |
| b"", |
| ] |
| mock_process.stderr.read.return_value = b"" |
| mock_process.wait.return_value = 0 |
|
|
| with patch( |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock |
| ) as mock_exec: |
| mock_exec.return_value = mock_process |
|
|
| async for _ in session.start_task("Hello", session_id="sess_abc"): |
| pass |
|
|
| args = mock_exec.call_args[0] |
| assert "--resume" in args |
| assert "sess_abc" in args |
| assert "--fork-session" not in args |
|
|
| @pytest.mark.asyncio |
| async def test_start_task_with_session_resume_and_fork(self): |
| """Test resuming an existing session and forking.""" |
| from cli.session import CLISession |
|
|
| session = CLISession("/tmp", "http://localhost:8082/v1") |
|
|
| mock_process = AsyncMock() |
| mock_process.stdout.read.side_effect = [b""] |
| mock_process.stderr.read.return_value = b"" |
| mock_process.wait.return_value = 0 |
|
|
| with patch( |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock |
| ) as mock_exec: |
| mock_exec.return_value = mock_process |
|
|
| async for _ in session.start_task( |
| "Hello", session_id="sess_abc", fork_session=True |
| ): |
| pass |
|
|
| args = mock_exec.call_args[0] |
| assert "--resume" in args |
| assert "sess_abc" in args |
| assert "--fork-session" in args |
|
|
| @pytest.mark.asyncio |
| async def test_start_task_process_failure_with_stderr(self): |
| """Test process exit with error code and stderr output.""" |
| from cli.session import CLISession |
|
|
| session = CLISession("/tmp", "http://localhost:8082/v1") |
|
|
| mock_process = AsyncMock() |
| mock_process.stdout.read.side_effect = [b""] |
| mock_process.stderr.read.return_value = b"Fatal error" |
| mock_process.wait.return_value = 1 |
|
|
| with patch( |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock |
| ) as mock_exec: |
| mock_exec.return_value = mock_process |
|
|
| events = [e async for e in session.start_task("Hello")] |
|
|
| |
| assert len(events) == 2 |
| assert events[0]["type"] == "error" |
| assert events[0]["error"]["message"] == "Fatal error" |
|
|
| assert events[1]["type"] == "exit" |
| assert events[1]["code"] == 1 |
| assert events[1]["stderr"] == "Fatal error" |
|
|
| @pytest.mark.asyncio |
| async def test_stop_session(self): |
| """Test stopping the session process.""" |
| from cli.session import CLISession |
|
|
| session = CLISession("/tmp", "http://localhost:8082/v1") |
|
|
| mock_process = MagicMock() |
| mock_process.returncode = None |
| |
| mock_process.wait = AsyncMock(return_value=0) |
|
|
| session.process = mock_process |
|
|
| stopped = await session.stop() |
|
|
| assert stopped is True |
| mock_process.terminate.assert_called_once() |
| mock_process.wait.assert_called() |
|
|
| @pytest.mark.asyncio |
| async def test_stop_session_timeout_force_kill(self): |
| """Test force kill if terminate times out.""" |
| from cli.session import CLISession |
|
|
| session = CLISession("/tmp", "http://localhost:8082/v1") |
|
|
| mock_process = MagicMock() |
| mock_process.returncode = None |
|
|
| |
| async def wait_side_effect(): |
| if not mock_process.kill.called: |
| await asyncio.sleep(6) |
| return 0 |
|
|
| |
| mock_process.wait = AsyncMock(side_effect=[asyncio.TimeoutError, 0]) |
|
|
| session.process = mock_process |
|
|
| stopped = await session.stop() |
|
|
| assert stopped is True |
| mock_process.terminate.assert_called() |
| mock_process.kill.assert_called() |
|
|
| @pytest.mark.asyncio |
| async def test_start_task_split_buffer(self): |
| """Test handling of JSON split across chunks.""" |
| from cli.session import CLISession |
|
|
| session = CLISession("/tmp", "http://localhost:8082/v1") |
|
|
| mock_process = AsyncMock() |
| |
| mock_process.stdout.read.side_effect = [ |
| b'{"type": "mess', |
| b'age", "content": "Split"}\n', |
| b"", |
| ] |
| mock_process.stderr.read.return_value = b"" |
| mock_process.wait.return_value = 0 |
|
|
| with patch( |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock |
| ) as mock_exec: |
| mock_exec.return_value = mock_process |
|
|
| events = [ |
| e async for e in session.start_task("test") if e["type"] == "message" |
| ] |
|
|
| assert len(events) == 1 |
| assert events[0]["content"] == "Split" |
|
|
| @pytest.mark.asyncio |
| async def test_start_task_remnant_buffer(self): |
| """Test handling of buffer remnant at EOF (no newline at end).""" |
| from cli.session import CLISession |
|
|
| session = CLISession("/tmp", "http://localhost:8082/v1") |
|
|
| mock_process = AsyncMock() |
| mock_process.stdout.read.side_effect = [ |
| b'{"type": "message", "content": "Remnant"}', |
| b"", |
| ] |
| mock_process.stderr.read.return_value = b"" |
| mock_process.wait.return_value = 0 |
|
|
| with patch( |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock |
| ) as mock_exec: |
| mock_exec.return_value = mock_process |
|
|
| events = [ |
| e async for e in session.start_task("test") if e["type"] == "message" |
| ] |
|
|
| assert len(events) == 1 |
| assert events[0]["content"] == "Remnant" |
|
|
| @pytest.mark.asyncio |
| async def test_start_task_non_v1_url(self): |
| """Test start_task with a non-v1 URL.""" |
| from cli.session import CLISession |
|
|
| |
| session = CLISession("/tmp", "http://localhost:8082") |
|
|
| mock_process = AsyncMock() |
| mock_process.stdout.read.side_effect = [b""] |
| mock_process.stderr.read.return_value = b"" |
| mock_process.wait.return_value = 0 |
|
|
| with patch( |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock |
| ) as mock_exec: |
| mock_exec.return_value = mock_process |
| async for _ in session.start_task("test"): |
| pass |
|
|
| |
| kwargs = mock_exec.call_args[1] |
| env = kwargs["env"] |
| assert env["ANTHROPIC_BASE_URL"] == "http://localhost:8082" |
|
|
| @pytest.mark.asyncio |
| async def test_start_task_allowed_dirs(self): |
| """Test start_task includes allowed dirs in command.""" |
| from cli.session import CLISession |
|
|
| session = CLISession( |
| "/tmp", "http://localhost:8082/v1", allowed_dirs=["/dir1", "/dir2"] |
| ) |
|
|
| mock_process = AsyncMock() |
| mock_process.stdout.read.side_effect = [b""] |
| mock_process.stderr.read.return_value = b"" |
| mock_process.wait.return_value = 0 |
|
|
| with patch( |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock |
| ) as mock_exec: |
| mock_exec.return_value = mock_process |
| async for _ in session.start_task("test"): |
| pass |
|
|
| cmd = mock_exec.call_args[0] |
| assert "--add-dir" in cmd |
| assert os.path.normpath("/dir1") in cmd |
| assert os.path.normpath("/dir2") in cmd |
|
|
| @pytest.mark.asyncio |
| async def test_start_task_plans_directory(self): |
| """Test start_task includes --settings plansDirectory when plans_directory set.""" |
| from cli.session import CLISession |
|
|
| session = CLISession( |
| "/tmp", |
| "http://localhost:8082/v1", |
| plans_directory="./agent_workspace/plans", |
| ) |
|
|
| mock_process = AsyncMock() |
| mock_process.stdout.read.side_effect = [b""] |
| mock_process.stderr.read.return_value = b"" |
| mock_process.wait.return_value = 0 |
|
|
| with patch( |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock |
| ) as mock_exec: |
| mock_exec.return_value = mock_process |
| async for _ in session.start_task("test"): |
| pass |
|
|
| cmd = mock_exec.call_args[0] |
| assert "--settings" in cmd |
| settings_idx = cmd.index("--settings") |
| assert settings_idx + 1 < len(cmd) |
| settings = json.loads(cmd[settings_idx + 1]) |
| assert settings["plansDirectory"] == "./agent_workspace/plans" |
|
|
| @pytest.mark.asyncio |
| async def test_start_task_json_error(self): |
| """Test handling of non-JSON output from CLI.""" |
| from cli.session import CLISession |
|
|
| session = CLISession("/tmp", "http://localhost:8082/v1") |
|
|
| mock_process = AsyncMock() |
| mock_process.stdout.read.side_effect = [b"Not valid json\n", b""] |
| mock_process.stderr.read.return_value = b"" |
| mock_process.wait.return_value = 0 |
|
|
| with patch( |
| "asyncio.create_subprocess_exec", new_callable=AsyncMock |
| ) as mock_exec: |
| mock_exec.return_value = mock_process |
|
|
| events = [e async for e in session.start_task("test") if e["type"] == "raw"] |
|
|
| assert len(events) == 1 |
| assert events[0]["content"] == "Not valid json" |
|
|
| @pytest.mark.asyncio |
| async def test_stop_exception(self): |
| """Test exception handling during stop.""" |
| from cli.session import CLISession |
|
|
| session = CLISession("/tmp", "http://localhost:8082/v1") |
|
|
| mock_process = MagicMock() |
| mock_process.returncode = None |
| |
| mock_process.terminate.side_effect = RuntimeError("Permission denied") |
|
|
| session.process = mock_process |
|
|
| stopped = await session.stop() |
| assert stopped is False |
|
|
|
|
| class TestCLISessionManager: |
| """Test CLISessionManager.""" |
|
|
| @pytest.mark.asyncio |
| async def test_manager_create_session(self): |
| """Test creating a new session.""" |
| from cli.manager import CLISessionManager |
|
|
| manager = CLISessionManager( |
| workspace_path="/tmp/test", |
| api_url="http://localhost:8082/v1", |
| ) |
|
|
| session, sid, is_new = await manager.get_or_create_session() |
| assert session is not None |
| assert sid.startswith("pending_") |
| assert is_new is True |
|
|
| @pytest.mark.asyncio |
| async def test_manager_reuse_session(self): |
| """Test reusing an existing session.""" |
| from cli.manager import CLISessionManager |
|
|
| manager = CLISessionManager( |
| workspace_path="/tmp/test", |
| api_url="http://localhost:8082/v1", |
| ) |
|
|
| |
| s1, sid1, _is_new1 = await manager.get_or_create_session() |
|
|
| |
| s2, _sid2, is_new2 = await manager.get_or_create_session(session_id=sid1) |
|
|
| assert s1 is s2 |
| assert is_new2 is False |
|
|
| @pytest.mark.asyncio |
| async def test_manager_stats(self): |
| """Test manager stats.""" |
| from cli.manager import CLISessionManager |
|
|
| manager = CLISessionManager( |
| workspace_path="/tmp/test", |
| api_url="http://localhost:8082/v1", |
| ) |
|
|
| stats = manager.get_stats() |
| assert stats["active_sessions"] == 0 |
| assert stats["pending_sessions"] == 0 |
|
|