| | from datetime import datetime
|
| | from typing import Any
|
| | from unittest.mock import MagicMock
|
| |
|
| | import pytest
|
| |
|
| | from openhands.controller.state.state import State
|
| | from openhands.core.config.condenser_config import (
|
| | AmortizedForgettingCondenserConfig,
|
| | LLMAttentionCondenserConfig,
|
| | LLMSummarizingCondenserConfig,
|
| | NoOpCondenserConfig,
|
| | ObservationMaskingCondenserConfig,
|
| | RecentEventsCondenserConfig,
|
| | )
|
| | from openhands.core.config.llm_config import LLMConfig
|
| | from openhands.events.event import Event, EventSource
|
| | from openhands.events.observation.observation import Observation
|
| | from openhands.llm import LLM
|
| | from openhands.memory.condenser import (
|
| | AmortizedForgettingCondenser,
|
| | Condenser,
|
| | ImportantEventSelection,
|
| | LLMAttentionCondenser,
|
| | LLMSummarizingCondenser,
|
| | NoOpCondenser,
|
| | ObservationMaskingCondenser,
|
| | RecentEventsCondenser,
|
| | )
|
| |
|
| |
|
def create_test_event(
    message: str, timestamp: datetime | None = None, id: int | None = None
) -> Event:
    """Create a simple user-sourced test event.

    Args:
        message: Text stored on the event's ``_message`` attribute.
        timestamp: Timestamp to assign; the current time is used when omitted.
        id: Optional event id. When omitted, ``_id`` is left unset.

    Returns:
        The populated ``Event`` with its source set to ``EventSource.USER``.
    """
    event = Event()
    event._message = message
    event.timestamp = timestamp if timestamp is not None else datetime.now()
    # Compare against None rather than relying on truthiness so that id=0 is
    # honoured; several tests in this module number their events from 0.
    if id is not None:
        event._id = id
    event._source = EventSource.USER
    return event
|
| |
|
| |
|
@pytest.fixture
def mock_llm() -> LLM:
    """Provide a mocked LLM whose completion response content can be swapped by tests.

    The returned mock exposes ``set_mock_response_content(content)``, which
    replaces the ``content`` of the single message returned by ``completion``.
    """
    llm_config = MagicMock(
        spec=LLMConfig, model='gpt-4o', api_key='test_key', custom_llm_provider=None
    )
    llm = MagicMock(spec=LLM, config=llm_config, metrics=MagicMock())

    # Wire up response -> choices[0] -> message -> content. The content stays
    # None until a test sets it through the helper below.
    message = MagicMock()
    message.content = None

    choice = MagicMock()
    choice.message = message

    response = MagicMock()
    response.choices = [choice]

    llm.completion.return_value = response

    def set_mock_response_content(content: Any):
        """Set the mock response for the LLM."""
        message.content = content

    # Attach the helper so tests can reach it through the fixture value.
    llm.set_mock_response_content = set_mock_response_content

    return llm
|
| |
|
| |
|
@pytest.fixture
def mock_state() -> State:
    """Provide a State mock with an empty ``history`` list and an empty ``extra_data`` dict."""
    state = MagicMock(spec=State)
    state.extra_data = {}
    state.history = []
    return state
|
| |
|
| |
|
def test_noop_condenser_from_config():
    """Building from a NoOpCondenserConfig yields a NoOpCondenser."""
    built = Condenser.from_config(NoOpCondenserConfig())

    assert isinstance(built, NoOpCondenser)
|
| |
|
| |
|
def test_noop_condenser():
    """A NoOpCondenser returns the history it was given, unchanged."""
    history = [
        create_test_event(text) for text in ('Event 1', 'Event 2', 'Event 3')
    ]

    state = MagicMock()
    state.history = history

    condensed = NoOpCondenser().condensed_history(state)

    assert condensed == history
|
| |
|
| |
|
def test_observation_masking_condenser_from_config():
    """Building from an ObservationMaskingCondenserConfig carries over the attention window."""
    window = 5
    built = Condenser.from_config(
        ObservationMaskingCondenserConfig(attention_window=window)
    )

    assert isinstance(built, ObservationMaskingCondenser)
    assert built.attention_window == window
|
| |
|
| |
|
def test_observation_masking_condenser_respects_attention_window(mock_state):
    """Observations older than the attention window are masked; newer events are untouched."""
    attention_window = 3
    condenser = ObservationMaskingCondenser(attention_window=attention_window)

    events = [
        create_test_event('Event 1'),
        Observation('Observation 1'),
        create_test_event('Event 3'),
        create_test_event('Event 4'),
        Observation('Observation 2'),
    ]
    mock_state.history = events

    result = condenser.condensed_history(mock_state)

    # Masking must not add or drop events.
    assert len(result) == len(events)

    # Index of the first event that falls inside the attention window.
    window_start = len(events) - attention_window

    for position, (original, condensed) in enumerate(zip(events, result)):
        if position >= window_start:
            # Inside the attention window: the event passes through unchanged.
            assert original == condensed
        elif isinstance(original, Observation):
            # Outside the window: observations should have been masked.
            assert '<MASKED>' in str(condensed)
|
| |
|
| |
|
def test_recent_events_condenser_from_config():
    """Test that RecentEventsCondenser objects can be made from config.

    Both ``max_events`` and ``keep_first`` given to the config must show up
    unchanged on the condenser that ``Condenser.from_config`` builds.
    """
    max_events = 5
    # keep_first is used as a count of leading events elsewhere in this module
    # (e.g. keep_first=1 / keep_first=2), so use an int here rather than a bool
    # that only happens to compare equal to 1.
    keep_first = 1
    config = RecentEventsCondenserConfig(keep_first=keep_first, max_events=max_events)
    condenser = Condenser.from_config(config)

    assert isinstance(condenser, RecentEventsCondenser)
    assert condenser.max_events == max_events
    assert condenser.keep_first == keep_first
|
| |
|
| |
|
def test_recent_events_condenser():
    """RecentEventsCondenser keeps the newest events, optionally pinning the first ones."""
    events = [
        create_test_event(text)
        for text in ('Event 1', 'Event 2', 'Event 3', 'Event 4', 'Event 5')
    ]

    state = MagicMock()
    state.history = events

    # A window as large as the history leaves it untouched.
    untouched = RecentEventsCondenser(max_events=len(events)).condensed_history(state)
    assert untouched == events

    # Each scenario: (constructor kwargs, messages expected to survive, in order).
    scenarios = [
        # Only the two most recent events remain.
        ({'max_events': 2}, ['Event 4', 'Event 5']),
        # The first event is pinned, the rest of the budget goes to the tail.
        ({'keep_first': 1, 'max_events': 2}, ['Event 1', 'Event 5']),
        # Two pinned events plus the single most recent one.
        ({'keep_first': 2, 'max_events': 3}, ['Event 1', 'Event 2', 'Event 5']),
    ]

    for kwargs, expected_messages in scenarios:
        result = RecentEventsCondenser(**kwargs).condensed_history(state)

        assert len(result) == kwargs['max_events']
        assert [kept._message for kept in result] == expected_messages
|
| |
|
| |
|
def test_llm_condenser_from_config():
    """Building from an LLMSummarizingCondenserConfig yields a condenser wired to that LLM config."""
    llm_config = LLMConfig(model='gpt-4o', api_key='test_key')
    built = Condenser.from_config(LLMSummarizingCondenserConfig(llm_config=llm_config))

    assert isinstance(built, LLMSummarizingCondenser)

    built_llm_config = built.llm.config
    assert built_llm_config.model == 'gpt-4o'
    assert built_llm_config.api_key.get_secret_value() == 'test_key'
|
| |
|
| |
|
def test_llm_condenser(mock_llm, mock_state):
    """An LLMSummarizingCondenser replaces the history with one LLM-written summary."""
    mock_state.history = [
        create_test_event('Event 1'),
        create_test_event('Event 2'),
    ]

    # Metrics reported by the LLM; the test expects them in the condenser metadata.
    mock_llm.metrics = MagicMock()
    mock_llm.metrics.get.return_value = {'test_metric': 1.0}

    mock_llm.set_mock_response_content('Summary of events')

    summarizer = LLMSummarizingCondenser(llm=mock_llm)
    condensed = summarizer.condensed_history(mock_state)

    # The whole history collapses into a single summary event.
    assert len(condensed) == 1
    assert condensed[0].content == 'Summary of events'

    # Exactly one completion call, carrying a single prompt that mentions both events.
    mock_llm.completion.assert_called_once()
    completion_kwargs = mock_llm.completion.call_args.kwargs
    assert 'messages' in completion_kwargs
    prompt_messages = completion_kwargs['messages']
    assert len(prompt_messages) == 1
    prompt_text = prompt_messages[0]['content']
    assert 'Event 1' in prompt_text
    assert 'Event 2' in prompt_text

    # The condensation is recorded, with metrics, in the state's extra data.
    assert 'condenser_meta' in mock_state.extra_data
    recorded_meta = mock_state.extra_data['condenser_meta']
    assert len(recorded_meta) == 1
    assert recorded_meta[0]['metrics'] == {'test_metric': 1.0}
|
| |
|
| |
|
def test_llm_condenser_error():
    """Test that LLM errors are propagated during condensation.

    The mocked LLM raises from ``completion``; the condenser is expected to let
    that exception escape with its message intact.
    """
    events = [create_test_event('Event 1', datetime(2024, 1, 1, 10, 0))]

    mock_state = MagicMock()
    mock_state.history = events

    mock_llm = MagicMock()
    mock_llm.completion.side_effect = Exception('LLM error')

    condenser = LLMSummarizingCondenser(llm=mock_llm)

    # pytest.raises fails the test on its own when nothing is raised, so the
    # "not raised" failure can no longer be caught by the very handler that is
    # supposed to inspect the LLM error.
    with pytest.raises(Exception) as exc_info:
        condenser.condensed_history(mock_state)

    assert str(exc_info.value) == 'LLM error'
|
| |
|
| |
|
def test_amortized_forgetting_condenser_from_config():
    """Building from an AmortizedForgettingCondenserConfig carries over max_size and keep_first."""
    expected = {'max_size': 50, 'keep_first': 10}
    built = Condenser.from_config(
        AmortizedForgettingCondenserConfig(
            max_size=expected['max_size'], keep_first=expected['keep_first']
        )
    )

    assert isinstance(built, AmortizedForgettingCondenser)
    assert built.max_size == expected['max_size']
    assert built.keep_first == expected['keep_first']
|
| |
|
| |
|
def test_amortized_forgetting_condenser_invalid_config():
    """AmortizedForgettingCondenser rejects invalid size parameters with a ValueError."""
    # Each entry is a set of constructor arguments that must be refused.
    rejected_arguments = (
        {'max_size': 4, 'keep_first': 2},
        {'max_size': 0},
        {'keep_first': -1},
    )

    for arguments in rejected_arguments:
        with pytest.raises(ValueError):
            AmortizedForgettingCondenser(**arguments)
|
| |
|
| |
|
def test_amortized_forgetting_condenser_grows_to_max_size():
    """Until max_size is reached, every appended event stays in the condensed history."""
    max_size = 15
    forgetter = AmortizedForgettingCondenser(max_size=max_size)

    state = MagicMock()
    state.history = []
    state.extra_data = {}

    for i in range(max_size):
        state.history.append(create_test_event(f'Event {i}'))

        condensed = forgetter.condensed_history(state)

        # Nothing has been forgotten yet: one result per event added so far.
        assert len(condensed) == i + 1
|
| |
|
| |
|
def test_amortized_forgetting_condenser_forgets_when_larger_than_max_size():
    """Once the context outgrows max_size, older events are dropped."""
    max_size = 2
    forgetter = AmortizedForgettingCondenser(max_size=max_size)

    state = MagicMock()
    state.history = []
    state.extra_data = {}

    for i in range(max_size * 10):
        newest = create_test_event(f'Event {i}')
        state.history.append(newest)

        condensed = forgetter.condensed_history(state)

        # The event that was just appended is always the last result.
        assert condensed[-1] == newest

        # The expected size alternates 1, 2, 1, 2, ... as events are appended.
        assert len(condensed) == (i % 2) + 1
|
| |
|
| |
|
def test_amortized_forgetting_condenser_keeps_first_events():
    """Test that the AmortizedForgettingCondenser keeps the right number of initial events when forgetting.

    With ``keep_first=1`` the first event must survive every condensation
    while later events are appended one at a time.
    """
    max_size = 4
    keep_first = 1
    condenser = AmortizedForgettingCondenser(max_size=max_size, keep_first=keep_first)

    first_event = create_test_event('Event 0')

    mock_state = MagicMock()
    mock_state.extra_data = {}
    mock_state.history = [first_event]

    for i in range(max_size * 10):
        event = create_test_event(f'Event {i+1}', datetime(2024, 1, 1, 10, i + 1))
        mock_state.history.append(event)
        results = condenser.condensed_history(mock_state)

        # The newest event must always be the last result.
        assert results[-1] == event

        # The very first event is pinned at the front.
        assert results[0] == first_event

        # Expected sizes cycle 2, 3, 4, 2, 3, 4, ... as events are appended.
        assert len(results) == (i % 3) + 2
|
| |
|
| |
|
def test_llm_attention_condenser_from_config():
    """Building from an LLMAttentionCondenserConfig carries over the LLM config and size limits."""
    llm_config = LLMConfig(model='gpt-4o', api_key='test_key')
    attention_config = LLMAttentionCondenserConfig(
        max_size=50,
        keep_first=10,
        llm_config=llm_config,
    )

    built = Condenser.from_config(attention_config)

    assert isinstance(built, LLMAttentionCondenser)

    built_llm_config = built.llm.config
    assert built_llm_config.model == 'gpt-4o'
    assert built_llm_config.api_key.get_secret_value() == 'test_key'

    assert built.max_size == 50
    assert built.keep_first == 10
|
| |
|
| |
|
def test_llm_attention_condenser_invalid_config():
    """LLMAttentionCondenser.from_config raises ValueError for a model without response-schema support."""
    unsupported_llm = LLMConfig(model='claude-2', api_key='test_key')
    config = LLMAttentionCondenserConfig(
        max_size=50,
        keep_first=10,
        llm_config=unsupported_llm,
    )

    with pytest.raises(ValueError):
        LLMAttentionCondenser.from_config(config)
|
| |
|
| |
|
def test_llm_attention_condenser_keeps_first_events(mock_llm, mock_state):
    """With keep_first=1, the first event stays at the front of every condensed history."""
    max_size = 4
    attention = LLMAttentionCondenser(max_size=max_size, keep_first=1, llm=mock_llm)

    first_event = create_test_event('Event 0', id=0)
    mock_state.history.append(first_event)

    for i in range(max_size * 10):
        mock_state.history.append(create_test_event(f'Event {i+1}', id=i + 1))

        # The mocked LLM answers with the ids of everything seen so far.
        selection = ImportantEventSelection(
            ids=[past.id for past in mock_state.history]
        )
        mock_llm.set_mock_response_content(selection.model_dump_json())

        condensed = attention.condensed_history(mock_state)

        # The pinned first event must lead the results.
        assert condensed[0] == first_event
|
| |
|
| |
|
def test_llm_attention_condenser_grows_to_max_size(mock_llm, mock_state):
    """Test that LLMAttentionCondenser correctly maintains an event context up to max size.

    Events are appended one at a time up to ``max_size``; after each append the
    condensed history is expected to contain every event added so far.
    """
    max_size = 15
    condenser = LLMAttentionCondenser(max_size=max_size, llm=mock_llm)

    for i in range(max_size):
        event = create_test_event(f'Event {i}')
        mock_state.history.append(event)
        # Serialise the selection to JSON, as the other LLMAttentionCondenser
        # tests in this module do, so the mocked message content is a string
        # rather than a model object.
        mock_llm.set_mock_response_content(
            ImportantEventSelection(
                ids=[event.id for event in mock_state.history]
            ).model_dump_json()
        )
        results = condenser.condensed_history(mock_state)
        assert len(results) == i + 1
|
| |
|
| |
|
def test_llm_attention_condenser_forgets_when_larger_than_max_size(
    mock_llm, mock_state
):
    """Once the context outgrows max_size, the LLMAttentionCondenser drops events."""
    max_size = 2
    attention = LLMAttentionCondenser(max_size=max_size, llm=mock_llm)

    for i in range(max_size * 10):
        mock_state.history.append(create_test_event(f'Event {i}', id=i))

        # The mocked LLM answers with the ids of everything seen so far.
        selection = ImportantEventSelection(
            ids=[past.id for past in mock_state.history]
        )
        mock_llm.set_mock_response_content(selection.model_dump_json())

        condensed = attention.condensed_history(mock_state)

        # The expected size alternates 1, 2, 1, 2, ... as events are appended.
        assert len(condensed) == (i % 2) + 1
|
| |
|
| |
|
def test_llm_attention_condenser_handles_events_outside_history(mock_llm, mock_state):
    """Ids in the LLM response that match no event in the history are tolerated."""
    max_size = 2
    attention = LLMAttentionCondenser(max_size=max_size, llm=mock_llm)

    # Ids appended to every mocked response in addition to the history's own ids.
    extra_ids = [-1, -2, -3, -4]

    for i in range(max_size * 10):
        mock_state.history.append(create_test_event(f'Event {i}', id=i))

        known_ids = [past.id for past in mock_state.history]
        selection = ImportantEventSelection(ids=known_ids + extra_ids)
        mock_llm.set_mock_response_content(selection.model_dump_json())

        condensed = attention.condensed_history(mock_state)

        # The expected size alternates 1, 2, 1, 2, ... as events are appended.
        assert len(condensed) == (i % 2) + 1
|
| |
|
| |
|
def test_llm_attention_condenser_handles_too_many_events(mock_llm, mock_state):
    """A response listing more event ids than expected is tolerated."""
    max_size = 2
    attention = LLMAttentionCondenser(max_size=max_size, llm=mock_llm)

    for i in range(max_size * 10):
        mock_state.history.append(create_test_event(f'Event {i}', id=i))

        # Every id is listed twice, so the response is longer than the history.
        known_ids = [past.id for past in mock_state.history]
        selection = ImportantEventSelection(ids=known_ids + known_ids)
        mock_llm.set_mock_response_content(selection.model_dump_json())

        condensed = attention.condensed_history(mock_state)

        # The expected size alternates 1, 2, 1, 2, ... as events are appended.
        assert len(condensed) == (i % 2) + 1
|
| |
|
| |
|
def test_llm_attention_condenser_handles_too_few_events(mock_llm, mock_state):
    """A response listing no event ids at all is tolerated."""
    max_size = 2
    attention = LLMAttentionCondenser(max_size=max_size, llm=mock_llm)

    for i in range(max_size * 10):
        mock_state.history.append(create_test_event(f'Event {i}', id=i))

        # The mocked LLM selects nothing.
        empty_selection = ImportantEventSelection(ids=[])
        mock_llm.set_mock_response_content(empty_selection.model_dump_json())

        condensed = attention.condensed_history(mock_state)

        # The expected size alternates 1, 2, 1, 2, ... as events are appended.
        assert len(condensed) == (i % 2) + 1
|
| |
|