Spaces:
Paused
Paused
| """Tests for user-defined quick commands that bypass the agent loop.""" | |
| import subprocess | |
| from unittest.mock import MagicMock, patch, AsyncMock | |
| from rich.text import Text | |
| import pytest | |
| # ── CLI tests ────────────────────────────────────────────────────────────── | |
| class TestCLIQuickCommands: | |
| """Test quick command dispatch in HermesCLI.process_command.""" | |
| def _printed_plain(call_arg): | |
| if isinstance(call_arg, Text): | |
| return call_arg.plain | |
| return str(call_arg) | |
| def _make_cli(self, quick_commands): | |
| from cli import HermesCLI | |
| cli = HermesCLI.__new__(HermesCLI) | |
| cli.config = {"quick_commands": quick_commands} | |
| cli.console = MagicMock() | |
| cli.agent = None | |
| cli.conversation_history = [] | |
| return cli | |
| def test_exec_command_runs_and_prints_output(self): | |
| cli = self._make_cli({"dn": {"type": "exec", "command": "echo daily-note"}}) | |
| result = cli.process_command("/dn") | |
| assert result is True | |
| cli.console.print.assert_called_once() | |
| printed = self._printed_plain(cli.console.print.call_args[0][0]) | |
| assert printed == "daily-note" | |
| def test_exec_command_stderr_shown_on_no_stdout(self): | |
| cli = self._make_cli({"err": {"type": "exec", "command": "echo error >&2"}}) | |
| result = cli.process_command("/err") | |
| assert result is True | |
| # stderr fallback — should print something | |
| cli.console.print.assert_called_once() | |
| def test_exec_command_no_output_shows_fallback(self): | |
| cli = self._make_cli({"empty": {"type": "exec", "command": "true"}}) | |
| cli.process_command("/empty") | |
| cli.console.print.assert_called_once() | |
| args = cli.console.print.call_args[0][0] | |
| assert "no output" in args.lower() | |
| def test_alias_command_routes_to_target(self): | |
| """Alias quick commands rewrite to the target command.""" | |
| cli = self._make_cli({"shortcut": {"type": "alias", "target": "/help"}}) | |
| with patch.object(cli, "process_command", wraps=cli.process_command) as spy: | |
| cli.process_command("/shortcut") | |
| # Should recursively call process_command with /help | |
| spy.assert_any_call("/help") | |
| def test_alias_command_passes_args(self): | |
| """Alias quick commands forward user arguments to the target.""" | |
| cli = self._make_cli({"sc": {"type": "alias", "target": "/context"}}) | |
| with patch.object(cli, "process_command", wraps=cli.process_command) as spy: | |
| cli.process_command("/sc some args") | |
| spy.assert_any_call("/context some args") | |
| def test_alias_no_target_shows_error(self): | |
| cli = self._make_cli({"broken": {"type": "alias", "target": ""}}) | |
| cli.process_command("/broken") | |
| cli.console.print.assert_called_once() | |
| args = cli.console.print.call_args[0][0] | |
| assert "no target defined" in args.lower() | |
| def test_unsupported_type_shows_error(self): | |
| cli = self._make_cli({"bad": {"type": "prompt", "command": "echo hi"}}) | |
| cli.process_command("/bad") | |
| cli.console.print.assert_called_once() | |
| args = cli.console.print.call_args[0][0] | |
| assert "unsupported type" in args.lower() | |
| def test_missing_command_field_shows_error(self): | |
| cli = self._make_cli({"oops": {"type": "exec"}}) | |
| cli.process_command("/oops") | |
| cli.console.print.assert_called_once() | |
| args = cli.console.print.call_args[0][0] | |
| assert "no command defined" in args.lower() | |
| def test_quick_command_takes_priority_over_skill_commands(self): | |
| """Quick commands must be checked before skill slash commands.""" | |
| cli = self._make_cli({"mygif": {"type": "exec", "command": "echo overridden"}}) | |
| with patch("cli._skill_commands", {"/mygif": {"name": "gif-search"}}): | |
| cli.process_command("/mygif") | |
| cli.console.print.assert_called_once() | |
| printed = self._printed_plain(cli.console.print.call_args[0][0]) | |
| assert printed == "overridden" | |
| def test_unknown_command_still_shows_error(self): | |
| cli = self._make_cli({}) | |
| with patch("cli._cprint") as mock_cprint: | |
| cli.process_command("/nonexistent") | |
| mock_cprint.assert_called() | |
| printed = " ".join(str(c) for c in mock_cprint.call_args_list) | |
| assert "unknown command" in printed.lower() | |
| def test_timeout_shows_error(self): | |
| cli = self._make_cli({"slow": {"type": "exec", "command": "sleep 100"}}) | |
| with patch("subprocess.run", side_effect=subprocess.TimeoutExpired("sleep", 30)): | |
| cli.process_command("/slow") | |
| cli.console.print.assert_called_once() | |
| args = cli.console.print.call_args[0][0] | |
| assert "timed out" in args.lower() | |
| # ── Gateway tests ────────────────────────────────────────────────────────── | |
| class TestGatewayQuickCommands: | |
| """Test quick command dispatch in GatewayRunner._handle_message.""" | |
| def _make_event(self, command, args=""): | |
| event = MagicMock() | |
| event.get_command.return_value = command | |
| event.get_command_args.return_value = args | |
| event.text = f"/{command} {args}".strip() | |
| event.source = MagicMock() | |
| event.source.user_id = "test_user" | |
| event.source.user_name = "Test User" | |
| event.source.platform.value = "telegram" | |
| event.source.chat_type = "dm" | |
| event.source.chat_id = "123" | |
| return event | |
| async def test_exec_command_returns_output(self): | |
| from gateway.run import GatewayRunner | |
| runner = GatewayRunner.__new__(GatewayRunner) | |
| runner.config = {"quick_commands": {"limits": {"type": "exec", "command": "echo ok"}}} | |
| runner._running_agents = {} | |
| runner._pending_messages = {} | |
| runner._is_user_authorized = MagicMock(return_value=True) | |
| event = self._make_event("limits") | |
| result = await runner._handle_message(event) | |
| assert result == "ok" | |
| async def test_unsupported_type_returns_error(self): | |
| from gateway.run import GatewayRunner | |
| runner = GatewayRunner.__new__(GatewayRunner) | |
| runner.config = {"quick_commands": {"bad": {"type": "prompt", "command": "echo hi"}}} | |
| runner._running_agents = {} | |
| runner._pending_messages = {} | |
| runner._is_user_authorized = MagicMock(return_value=True) | |
| event = self._make_event("bad") | |
| result = await runner._handle_message(event) | |
| assert result is not None | |
| assert "unsupported type" in result.lower() | |
| async def test_timeout_returns_error(self): | |
| from gateway.run import GatewayRunner | |
| import asyncio | |
| runner = GatewayRunner.__new__(GatewayRunner) | |
| runner.config = {"quick_commands": {"slow": {"type": "exec", "command": "sleep 100"}}} | |
| runner._running_agents = {} | |
| runner._pending_messages = {} | |
| runner._is_user_authorized = MagicMock(return_value=True) | |
| event = self._make_event("slow") | |
| with patch("asyncio.wait_for", side_effect=asyncio.TimeoutError): | |
| result = await runner._handle_message(event) | |
| assert result is not None | |
| assert "timed out" in result.lower() | |
| async def test_gateway_config_object_supports_quick_commands(self): | |
| from gateway.config import GatewayConfig | |
| from gateway.run import GatewayRunner | |
| runner = GatewayRunner.__new__(GatewayRunner) | |
| runner.config = GatewayConfig( | |
| quick_commands={"limits": {"type": "exec", "command": "echo ok"}} | |
| ) | |
| runner._running_agents = {} | |
| runner._pending_messages = {} | |
| runner._is_user_authorized = MagicMock(return_value=True) | |
| event = self._make_event("limits") | |
| result = await runner._handle_message(event) | |
| assert result == "ok" | |