| """Regression tests for CLI fresh-session commands.""" |
|
|
| from __future__ import annotations |
|
|
| import importlib |
| import os |
| import sys |
| from datetime import timedelta |
| from unittest.mock import MagicMock, patch |
|
|
| from hermes_state import SessionDB |
| from tools.todo_tool import TodoStore |
|
|
|
|
| class _FakeCompressor: |
| """Minimal stand-in for ContextCompressor.""" |
|
|
| def __init__(self): |
| self.last_prompt_tokens = 500 |
| self.last_completion_tokens = 200 |
| self.last_total_tokens = 700 |
| self.compression_count = 3 |
| self._context_probed = True |
|
|
|
|
| class _FakeAgent: |
| def __init__(self, session_id: str, session_start): |
| self.session_id = session_id |
| self.session_start = session_start |
| self.model = "anthropic/claude-opus-4.6" |
| self._last_flushed_db_idx = 7 |
| self._todo_store = TodoStore() |
| self._todo_store.write( |
| [{"id": "t1", "content": "unfinished task", "status": "in_progress"}] |
| ) |
| self.flush_memories = MagicMock() |
| self._invalidate_system_prompt = MagicMock() |
|
|
| |
| self.session_total_tokens = 1000 |
| self.session_input_tokens = 600 |
| self.session_output_tokens = 400 |
| self.session_prompt_tokens = 550 |
| self.session_completion_tokens = 350 |
| self.session_cache_read_tokens = 100 |
| self.session_cache_write_tokens = 50 |
| self.session_reasoning_tokens = 80 |
| self.session_api_calls = 5 |
| self.session_estimated_cost_usd = 0.42 |
| self.session_cost_status = "estimated" |
| self.session_cost_source = "openrouter" |
| self.context_compressor = _FakeCompressor() |
|
|
| def reset_session_state(self): |
| """Mirror the real AIAgent.reset_session_state().""" |
| self.session_total_tokens = 0 |
| self.session_input_tokens = 0 |
| self.session_output_tokens = 0 |
| self.session_prompt_tokens = 0 |
| self.session_completion_tokens = 0 |
| self.session_cache_read_tokens = 0 |
| self.session_cache_write_tokens = 0 |
| self.session_reasoning_tokens = 0 |
| self.session_api_calls = 0 |
| self.session_estimated_cost_usd = 0.0 |
| self.session_cost_status = "unknown" |
| self.session_cost_source = "none" |
| if hasattr(self, "context_compressor") and self.context_compressor: |
| self.context_compressor.last_prompt_tokens = 0 |
| self.context_compressor.last_completion_tokens = 0 |
| self.context_compressor.last_total_tokens = 0 |
| self.context_compressor.compression_count = 0 |
| self.context_compressor._context_probed = False |
|
|
|
|
| def _make_cli(env_overrides=None, config_overrides=None, **kwargs): |
| """Create a HermesCLI instance with minimal mocking.""" |
| _clean_config = { |
| "model": { |
| "default": "anthropic/claude-opus-4.6", |
| "base_url": "https://openrouter.ai/api/v1", |
| "provider": "auto", |
| }, |
| "display": {"compact": False, "tool_progress": "all"}, |
| "agent": {}, |
| "terminal": {"env_type": "local"}, |
| } |
| if config_overrides: |
| _clean_config.update(config_overrides) |
| clean_env = {"LLM_MODEL": "", "HERMES_MAX_ITERATIONS": ""} |
| if env_overrides: |
| clean_env.update(env_overrides) |
| prompt_toolkit_stubs = { |
| "prompt_toolkit": MagicMock(), |
| "prompt_toolkit.history": MagicMock(), |
| "prompt_toolkit.styles": MagicMock(), |
| "prompt_toolkit.patch_stdout": MagicMock(), |
| "prompt_toolkit.application": MagicMock(), |
| "prompt_toolkit.layout": MagicMock(), |
| "prompt_toolkit.layout.processors": MagicMock(), |
| "prompt_toolkit.filters": MagicMock(), |
| "prompt_toolkit.layout.dimension": MagicMock(), |
| "prompt_toolkit.layout.menus": MagicMock(), |
| "prompt_toolkit.widgets": MagicMock(), |
| "prompt_toolkit.key_binding": MagicMock(), |
| "prompt_toolkit.completion": MagicMock(), |
| "prompt_toolkit.formatted_text": MagicMock(), |
| "prompt_toolkit.auto_suggest": MagicMock(), |
| } |
| with patch.dict(sys.modules, prompt_toolkit_stubs), patch.dict( |
| "os.environ", clean_env, clear=False |
| ): |
| import cli as _cli_mod |
|
|
| _cli_mod = importlib.reload(_cli_mod) |
| with patch.object(_cli_mod, "get_tool_definitions", return_value=[]), patch.dict( |
| _cli_mod.__dict__, {"CLI_CONFIG": _clean_config} |
| ): |
| return _cli_mod.HermesCLI(**kwargs) |
|
|
|
|
| def _prepare_cli_with_active_session(tmp_path): |
| cli = _make_cli() |
| cli._session_db = SessionDB(db_path=tmp_path / "state.db") |
| cli._session_db.create_session(session_id=cli.session_id, source="cli", model=cli.model) |
|
|
| cli.agent = _FakeAgent(cli.session_id, cli.session_start) |
| cli.conversation_history = [{"role": "user", "content": "hello"}] |
|
|
| old_session_start = cli.session_start - timedelta(seconds=1) |
| cli.session_start = old_session_start |
| cli.agent.session_start = old_session_start |
| return cli |
|
|
|
|
| def test_new_command_creates_real_fresh_session_and_resets_agent_state(tmp_path): |
| cli = _prepare_cli_with_active_session(tmp_path) |
| old_session_id = cli.session_id |
| old_session_start = cli.session_start |
|
|
| cli.process_command("/new") |
|
|
| assert cli.session_id != old_session_id |
|
|
| old_session = cli._session_db.get_session(old_session_id) |
| assert old_session is not None |
| assert old_session["end_reason"] == "new_session" |
|
|
| new_session = cli._session_db.get_session(cli.session_id) |
| assert new_session is not None |
|
|
| cli._session_db.append_message(cli.session_id, role="user", content="next turn") |
|
|
| assert cli.agent.session_id == cli.session_id |
| assert cli.agent._last_flushed_db_idx == 0 |
| assert cli.agent._todo_store.read() == [] |
| assert cli.session_start > old_session_start |
| assert cli.agent.session_start == cli.session_start |
| cli.agent.flush_memories.assert_called_once_with([{"role": "user", "content": "hello"}]) |
| cli.agent._invalidate_system_prompt.assert_called_once() |
|
|
|
|
| def test_reset_command_is_alias_for_new_session(tmp_path): |
| cli = _prepare_cli_with_active_session(tmp_path) |
| old_session_id = cli.session_id |
|
|
| cli.process_command("/reset") |
|
|
| assert cli.session_id != old_session_id |
| assert cli._session_db.get_session(old_session_id)["end_reason"] == "new_session" |
| assert cli._session_db.get_session(cli.session_id) is not None |
|
|
|
|
| def test_clear_command_starts_new_session_before_redrawing(tmp_path): |
| cli = _prepare_cli_with_active_session(tmp_path) |
| cli.console = MagicMock() |
| cli.show_banner = MagicMock() |
|
|
| old_session_id = cli.session_id |
| cli.process_command("/clear") |
|
|
| assert cli.session_id != old_session_id |
| assert cli._session_db.get_session(old_session_id)["end_reason"] == "new_session" |
| assert cli._session_db.get_session(cli.session_id) is not None |
| cli.console.clear.assert_called_once() |
| cli.show_banner.assert_called_once() |
| assert cli.conversation_history == [] |
|
|
|
|
| def test_new_session_resets_token_counters(tmp_path): |
| """Regression test for #2099: /new must zero all token counters.""" |
| cli = _prepare_cli_with_active_session(tmp_path) |
|
|
| |
| agent = cli.agent |
| assert agent.session_total_tokens > 0 |
| assert agent.session_api_calls > 0 |
| assert agent.context_compressor.compression_count > 0 |
|
|
| cli.process_command("/new") |
|
|
| |
| assert agent.session_total_tokens == 0 |
| assert agent.session_input_tokens == 0 |
| assert agent.session_output_tokens == 0 |
| assert agent.session_prompt_tokens == 0 |
| assert agent.session_completion_tokens == 0 |
| assert agent.session_cache_read_tokens == 0 |
| assert agent.session_cache_write_tokens == 0 |
| assert agent.session_reasoning_tokens == 0 |
| assert agent.session_api_calls == 0 |
| assert agent.session_estimated_cost_usd == 0.0 |
| assert agent.session_cost_status == "unknown" |
| assert agent.session_cost_source == "none" |
|
|
| |
| comp = agent.context_compressor |
| assert comp.last_prompt_tokens == 0 |
| assert comp.last_completion_tokens == 0 |
| assert comp.last_total_tokens == 0 |
| assert comp.compression_count == 0 |
| assert comp._context_probed is False |
|
|