| | import asyncio
|
| | from contextlib import contextmanager
|
| | from typing import Type
|
| | from unittest.mock import AsyncMock, MagicMock, patch
|
| |
|
| | import pytest
|
| |
|
| | from openhands.core.config import load_app_config
|
| | from openhands.core.exceptions import UserCancelledError
|
| | from openhands.llm.async_llm import AsyncLLM
|
| | from openhands.llm.llm import LLM
|
| | from openhands.llm.streaming_llm import StreamingLLM
|
| |
|
# Shared application config for the whole module; each test derives its
# LLM settings from it via config.get_llm_config().
config = load_app_config()
|
| |
|
| |
|
@pytest.fixture
def test_llm():
    """Fixture: a plain ``LLM`` built through the shared helper (HTTP mocked)."""
    llm = _get_llm(LLM)
    return llm
|
| |
|
| |
|
def _get_llm(type_: Type[LLM]):
    """Instantiate ``type_`` (an LLM subclass) while outbound HTTP is patched out."""
    with _patch_http():
        llm_config = config.get_llm_config()
        return type_(config=llm_config)
|
| |
|
| |
|
@pytest.fixture
def mock_response():
    """Fixture: streaming-style chunks, one delta dict per text fragment."""
    fragments = [
        'This is a',
        ' test',
        ' message.',
        ' It is',
        ' a bit',
        ' longer',
        ' than',
        ' the',
        ' previous',
        ' one,',
        ' but',
        ' hopefully',
        ' still',
        ' short',
        ' enough.',
    ]
    return [{'choices': [{'delta': {'content': text}}]} for text in fragments]
|
| |
|
| |
|
@contextmanager
def _patch_http():
    """Patch ``requests.get`` inside ``openhands.llm.llm`` so no real HTTP happens."""
    fake_payload = {
        'data': [
            {'model_name': 'some_model'},
            {'model_name': 'another_model'},
        ]
    }
    with patch('openhands.llm.llm.requests.get', MagicMock()) as mock_http:
        # NOTE(review): this sets ``.json`` on the mocked *function* object, not
        # on its return value (``mock_http.return_value.json``). Confirm the code
        # under test really reads it this way; otherwise the payload is unused.
        mock_http.json.return_value = fake_payload
        yield
|
| |
|
| |
|
@pytest.mark.asyncio
async def test_acompletion_non_streaming():
    """``async_completion`` (stream=False) surfaces the mocked message content."""
    with patch.object(AsyncLLM, '_call_acompletion') as mock_call_acompletion:
        # Locals renamed so they no longer shadow the module's fixtures.
        fake_completion = {
            'choices': [{'message': {'content': 'This is a test message.'}}]
        }
        mock_call_acompletion.return_value = fake_completion
        llm = _get_llm(AsyncLLM)
        response = await llm.async_completion(
            messages=[{'role': 'user', 'content': 'Hello!'}],
            stream=False,
            drop_params=True,
        )

        assert response['choices'][0]['message']['content'] != ''
|
| |
|
| |
|
@pytest.mark.asyncio
async def test_acompletion_streaming(mock_response):
    """Every streamed delta must be one of the mocked chunk contents."""
    # Hoist the expected contents once; mock_response is never mutated below.
    expected_contents = [
        item['choices'][0]['delta']['content'] for item in mock_response
    ]
    with patch.object(StreamingLLM, '_call_acompletion') as mock_call_acompletion:
        mock_call_acompletion.return_value.__aiter__.return_value = iter(mock_response)
        llm = _get_llm(StreamingLLM)
        async for chunk in llm.async_streaming_completion(
            messages=[{'role': 'user', 'content': 'Hello!'}], stream=True
        ):
            content = chunk['choices'][0]['delta']['content']
            print(f'Chunk: {content}')
            assert content in expected_contents
|
| |
|
| |
|
@pytest.mark.asyncio
async def test_completion(test_llm):
    """A patched ``LLM.completion`` returns the canned message verbatim."""
    canned = {'choices': [{'message': {'content': 'This is a test message.'}}]}
    with patch.object(LLM, 'completion') as mock_completion:
        mock_completion.return_value = canned
        response = test_llm.completion(messages=[{'role': 'user', 'content': 'Hello!'}])
        content = response['choices'][0]['message']['content']
        assert content == 'This is a test message.'
|
| |
|
| |
|
@pytest.mark.asyncio
@pytest.mark.parametrize('cancel_delay', [0.1, 0.3, 0.5, 0.7, 0.9])
async def test_async_completion_with_user_cancellation(cancel_delay):
    """Cancelling mid-request makes ``async_completion`` raise UserCancelledError.

    The mocked completion polls a cancel event every 0.1 s; a concurrent task
    sets the event after ``cancel_delay`` seconds, so with every parametrized
    delay the request is cancelled before the ~2 s mock finishes on its own.
    """
    # Set by cancel_after_delay(); polled by the mocked completion loop below.
    cancel_event = asyncio.Event()

    async def mock_on_cancel_requested():
        # Report (and log) whether cancellation has been requested yet.
        is_set = cancel_event.is_set()
        print(f'Cancel requested: {is_set}')
        return is_set

    async def mock_acompletion(*args, **kwargs):
        # Simulate a slow LLM call: 20 x 0.1 s slices, checking the cancel
        # flag after each slice so every parametrized delay can interrupt it.
        print('Starting mock_acompletion')
        for i in range(20):
            print(f'mock_acompletion iteration {i}')
            await asyncio.sleep(0.1)
            if await mock_on_cancel_requested():
                print('Cancellation detected in mock_acompletion')
                raise UserCancelledError('LLM request cancelled by user')
        print('Completing mock_acompletion without cancellation')
        return {'choices': [{'message': {'content': 'This is a test message.'}}]}

    with patch.object(
        AsyncLLM, '_call_acompletion', new_callable=AsyncMock
    ) as mock_call_acompletion:
        mock_call_acompletion.side_effect = mock_acompletion
        test_llm = _get_llm(AsyncLLM)

        async def cancel_after_delay():
            # Runs concurrently with the request and flips the cancel event.
            print(f'Starting cancel_after_delay with delay {cancel_delay}')
            await asyncio.sleep(cancel_delay)
            print('Setting cancel event')
            cancel_event.set()

        # gather() propagates the UserCancelledError raised inside the mock.
        with pytest.raises(UserCancelledError):
            await asyncio.gather(
                test_llm.async_completion(
                    messages=[{'role': 'user', 'content': 'Hello!'}],
                    stream=False,
                ),
                cancel_after_delay(),
            )

    # The underlying completion must have been attempted exactly once.
    mock_call_acompletion.assert_called_once()
|
| |
|
| |
|
@pytest.mark.asyncio
@pytest.mark.parametrize('cancel_after_chunks', [1, 3, 5, 7, 9])
async def test_async_streaming_completion_with_user_cancellation(cancel_after_chunks):
    """Streaming raises UserCancelledError after exactly N chunks were delivered.

    The mocked async generator raises on the resume that follows its
    ``cancel_after_chunks``-th yield, so the consumer receives exactly that
    many chunks, in order, before the error surfaces.
    """
    # Flipped by the generator once the target chunk count has been yielded.
    cancel_requested = False

    test_messages = [
        'This is ',
        'a test ',
        'message ',
        'with ',
        'multiple ',
        'chunks ',
        'to ',
        'simulate ',
        'a ',
        'longer ',
        'streaming ',
        'response.',
    ]

    async def mock_acompletion(*args, **kwargs):
        # Yield one delta-chunk per fragment. The raise sits *after* the
        # yield, so the Nth chunk reaches the consumer before cancellation.
        for i, content in enumerate(test_messages):
            yield {'choices': [{'delta': {'content': content}}]}
            if i + 1 == cancel_after_chunks:
                nonlocal cancel_requested
                cancel_requested = True
            if cancel_requested:
                raise UserCancelledError('LLM request cancelled by user')
            await asyncio.sleep(0.05)

    with patch.object(
        AsyncLLM, '_call_acompletion', new_callable=AsyncMock
    ) as mock_call_acompletion:
        # Awaiting the AsyncMock hands back this async generator object.
        mock_call_acompletion.return_value = mock_acompletion()
        test_llm = _get_llm(StreamingLLM)

        received_chunks = []
        with pytest.raises(UserCancelledError):
            async for chunk in test_llm.async_streaming_completion(
                messages=[{'role': 'user', 'content': 'Hello!'}], stream=True
            ):
                received_chunks.append(chunk['choices'][0]['delta']['content'])
                print(f"Chunk: {chunk['choices'][0]['delta']['content']}")

        # Exactly N chunks arrived, in their original order.
        assert len(received_chunks) == cancel_after_chunks
        assert received_chunks == test_messages[:cancel_after_chunks]

    # The underlying streaming call must have been attempted exactly once.
    mock_call_acompletion.assert_called_once()
|
| |
|