# NOTE(review): stray UI paste artifact ("Spaces: Paused / Paused") neutralized as a comment.
import json
import os
import sys
from unittest.mock import AsyncMock, MagicMock, patch

import httpx
import pytest

# Make the repository root importable so the litellm package resolves
# when this test file is run from its nested tests/ directory.
sys.path.insert(0, os.path.abspath("../../../../.."))

from litellm.llms.sagemaker.common_utils import AWSEventStreamDecoder
from litellm.llms.sagemaker.completion.transformation import SagemakerConfig
@pytest.mark.asyncio
async def test_aiter_bytes_unicode_decode_error():
    """
    Test that AWSEventStreamDecoder.aiter_bytes() does not raise when it
    encounters invalid UTF-8 bytes (UnicodeDecodeError), and that stream
    processing continues despite the error.

    Relevant issue: https://github.com/BerriAI/litellm/issues/9165
    """
    decoder = AWSEventStreamDecoder(model="test-model")

    # Mock event whose body is not valid UTF-8 — this is what triggers the
    # UnicodeDecodeError inside the decoder.
    mock_event = MagicMock()
    mock_event.to_response_dict.return_value = {
        "status_code": 200,
        "headers": {},
        "body": b"\xff\xfe",  # invalid UTF-8 bytes
    }

    # Mock EventStreamBuffer that yields only the bad event.
    mock_buffer = MagicMock()
    mock_buffer.__iter__.return_value = [mock_event]

    with patch("botocore.eventstream.EventStreamBuffer", return_value=mock_buffer):
        # Async byte source feeding the decoder; content is irrelevant
        # because the patched buffer supplies the events.
        async def mock_iterator():
            yield b""

        chunks = []
        async for chunk in decoder.aiter_bytes(mock_iterator()):
            if chunk is not None:
                chunks.append(chunk)

    # Processing continued despite the error; only invalid data was sent,
    # so no chunks should have been produced.
    assert len(chunks) == 0
@pytest.mark.asyncio
async def test_aiter_bytes_valid_chunk_followed_by_unicode_error():
    """
    Test that valid chunks are processed correctly even when followed by a
    Unicode decode error — the error must not corrupt or prevent processing
    of valid data that came before it.

    Relevant issue: https://github.com/BerriAI/litellm/issues/9165
    """
    decoder = AWSEventStreamDecoder(model="test-model")

    # Two mock events: a valid JSON token payload first, then a body with
    # invalid UTF-8 bytes that triggers the decode error.
    mock_valid_event = MagicMock()
    mock_valid_event.to_response_dict.return_value = {
        "status_code": 200,
        "headers": {},
        "body": json.dumps({"token": {"text": "hello"}}).encode(),
    }
    mock_invalid_event = MagicMock()
    mock_invalid_event.to_response_dict.return_value = {
        "status_code": 200,
        "headers": {},
        "body": b"\xff\xfe",  # invalid UTF-8 bytes
    }

    # Mock EventStreamBuffer that yields the valid event first, then the
    # invalid one.
    mock_buffer = MagicMock()
    mock_buffer.__iter__.return_value = [mock_valid_event, mock_invalid_event]

    with patch("botocore.eventstream.EventStreamBuffer", return_value=mock_buffer):
        async def mock_iterator():
            yield b"test_bytes"

        chunks = []
        async for chunk in decoder.aiter_bytes(mock_iterator()):
            if chunk is not None:
                chunks.append(chunk)

    # We got exactly the valid chunk despite the subsequent error.
    assert len(chunks) == 1
    assert chunks[0]["text"] == "hello"
class TestSagemakerTransform:
    """Tests for SagemakerConfig.map_openai_params token-limit mapping."""

    def setup_method(self):
        self.config = SagemakerConfig()
        self.model = "test"
        self.logging_obj = MagicMock()

    def test_map_mistral_params(self):
        """Test that max_completion_tokens takes precedence over max_tokens."""
        test_params = {
            "temperature": 0.7,
            "max_tokens": 200,
            "max_completion_tokens": 256,
        }
        result = self.config.map_openai_params(
            non_default_params=test_params,
            optional_params={},
            model=self.model,
            drop_params=False,
        )
        # max_completion_tokens should map to max_new_tokens, overriding
        # the max_tokens value.
        assert result == {"temperature": 0.7, "max_new_tokens": 256}

    def test_mistral_max_tokens_backward_compat(self):
        """Test that max_tokens still maps when max_completion_tokens is absent."""
        test_params = {
            "temperature": 0.7,
            "max_tokens": 200,
        }
        result = self.config.map_openai_params(
            non_default_params=test_params,
            optional_params={},
            model=self.model,
            drop_params=False,
        )
        # Backward compatibility: max_tokens maps to max_new_tokens when
        # max_completion_tokens is not provided.
        assert result == {"temperature": 0.7, "max_new_tokens": 200}