| """ |
| Unit-tests for OpenAIServingChat — rewritten to use only the std-lib 'unittest'. |
| Run with either: |
| python tests/test_serving_chat_unit.py -v |
| or |
| python -m unittest discover -s tests -p "test_*unit.py" -v |
| """ |
|
|
| import json |
| import unittest |
| import uuid |
| from http import HTTPStatus |
| from typing import Optional |
| from unittest.mock import Mock, patch |
|
|
| from fastapi import Request |
|
|
| from sglang.srt.entrypoints.openai.protocol import ( |
| ChatCompletionRequest, |
| MessageProcessingResult, |
| ) |
| from sglang.srt.entrypoints.openai.serving_chat import OpenAIServingChat |
| from sglang.srt.managers.io_struct import GenerateReqInput |
| from sglang.srt.utils import get_or_create_event_loop |
| from sglang.test.ci.ci_register import register_amd_ci, register_cuda_ci |
|
|
# Register this test module with two CI suites at import time: one via the
# CUDA registrar and one via the AMD registrar, each passing est_time=10.
# NOTE(review): presumably est_time is an estimated runtime that the CI
# scheduler uses to balance suites — confirm the unit in ci_register.
| register_cuda_ci(est_time=10, suite="stage-b-test-small-1-gpu") |
| register_amd_ci(est_time=10, suite="stage-b-test-small-1-gpu-amd") |
|
|
|
|
class _MockTokenizerManager:
    """Bare-bones tokenizer-manager double used to build OpenAIServingChat in tests."""

    def __init__(self):
        # Model/server configuration exposed as plain Mock attributes.
        hf_config = Mock(architectures=["LlamaForCausalLM"])
        self.model_config = Mock(is_multimodal=False, hf_config=hf_config)
        self.server_args = Mock(
            enable_cache_report=False,
            tool_call_parser="hermes",
            reasoning_parser=None,
        )

        self.chat_template_name: Optional[str] = "llama-3"

        # Tokenizer with canned encode/decode results and no chat template.
        tokenizer = Mock(chat_template=None, bos_token_id=1)
        tokenizer.encode.return_value = [1, 2, 3, 4, 5]
        tokenizer.decode.return_value = "Test response"
        self.tokenizer = tokenizer

        # One async-generator object is created here and handed out by every
        # generate_request() call, so it can be consumed only once unless a
        # test replaces ``generate_request.return_value``.
        self.generate_request = Mock(return_value=self._single_chunk_stream())
        self.create_abort_task = Mock()

    @staticmethod
    async def _single_chunk_stream():
        """Yield a single finished generation chunk with fixed token counts."""
        meta_info = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "prompt_tokens": 10,
            "completion_tokens": 5,
            "cached_tokens": 0,
            "finish_reason": {"type": "stop", "matched": None},
            "output_token_logprobs": [(0.1, 1, "Test"), (0.2, 2, "response")],
            "output_top_logprobs": None,
        }
        yield {"text": "Test response", "meta_info": meta_info, "index": 0}
|
|
|
|
class _MockTemplateManager:
    """Template-manager double carrying only three template attributes."""

    # Declared here for readers and type checkers; values are set per instance.
    chat_template_name: Optional[str]
    jinja_template_content_format: Optional[str]
    completion_template_name: Optional[str]

    def __init__(self):
        # A named chat template is selected; no Jinja content format and no
        # completion template are configured.
        self.chat_template_name = "llama-3"
        self.jinja_template_content_format = self.completion_template_name = None
|
|
|
|
class ServingChatTestCase(unittest.TestCase):
    """Mock-driven unit tests for ``OpenAIServingChat``.

    ``setUp`` builds a fresh serving object on top of the mock managers for
    every test.  The ``_``-prefixed helpers hold fixture and driver code that
    several tests share, so each test body shows only what is specific to it.
    """

    # Prefix of the server-sent-event lines decoded by the tests.
    _SSE_PREFIX = "data: "
    # Module whose names (generate_chat_conv, FunctionCallParser) get patched.
    _SERVING_CHAT = "sglang.srt.entrypoints.openai.serving_chat"

    # ------------------------------------------------------------------
    # Common fixtures
    # ------------------------------------------------------------------
    def setUp(self):
        """Create the mock managers, the serving object and reusable requests."""
        self.tm = _MockTokenizerManager()
        self.template_manager = _MockTemplateManager()
        self.chat = OpenAIServingChat(self.tm, self.template_manager)

        # Non-streaming and streaming variants of the same minimal request.
        self.basic_req = self._simple_request(stream=False)
        self.stream_req = self._simple_request(stream=True)

        self.fastapi_request = Mock(spec=Request)
        self.fastapi_request.headers = {}

    # ------------------------------------------------------------------
    # Shared helpers (not collected as tests)
    # ------------------------------------------------------------------
    @staticmethod
    def _simple_request(*, stream):
        """Return a one-message chat request with the given ``stream`` flag."""
        return ChatCompletionRequest(
            model="x",
            messages=[{"role": "user", "content": "Hi?"}],
            temperature=0.7,
            max_tokens=100,
            stream=stream,
        )

    @staticmethod
    def _add_tool_request():
        """Return a request that offers a single OpenAI-shaped ``add`` tool."""
        return ChatCompletionRequest(
            model="x",
            messages=[{"role": "user", "content": "What is 2+2?"}],
            tools=[
                {
                    "type": "function",
                    "function": {
                        "name": "add",
                        "description": "Add two numbers.",
                        "parameters": {
                            "type": "object",
                            "properties": {
                                "a": {"type": "integer"},
                                "b": {"type": "integer"},
                            },
                            "required": ["a", "b"],
                        },
                    },
                }
            ],
        )

    @staticmethod
    def _weather_tools():
        """Return a fresh tool list holding only the ``get_weather`` function."""
        return [{"type": "function", "function": {"name": "get_weather"}}]

    @staticmethod
    def _weather_history_messages(follow_up):
        """Return a conversation that already contains one ``get_weather`` call.

        Args:
            follow_up: Content of the final user message.
        """
        return [
            {"role": "user", "content": "What's the weather today in paris?"},
            {
                "role": "assistant",
                "content": "Let me do some search first.",
                "tool_calls": [
                    {
                        "id": "functions.get_weather:0",
                        "type": "function",
                        "function": {
                            "name": "get_weather",
                            "arguments": '{"city": "Paris"}',
                        },
                    }
                ],
            },
            {
                "role": "tool",
                "content": "It's rainy in paris now.",
                "tool_call_id": "functions.get_weather:0",
            },
            {"role": "assistant", "content": "It's rainy now."},
            {"role": "user", "content": follow_up},
        ]

    @staticmethod
    def _tool_call_item(name, parameters, tool_index):
        """Return a Mock shaped like one parsed tool call."""
        item = Mock()
        # ``name`` must be assigned after construction: Mock(name=...) names
        # the mock itself instead of creating a ``name`` attribute.
        item.name = name
        item.parameters = parameters
        item.tool_index = tool_index
        return item

    @staticmethod
    def _conversation_mock(stop_str):
        """Return a conversation Mock with a fixed prompt and no media data."""
        conv = Mock()
        conv.get_prompt.return_value = "Test prompt"
        conv.image_data = None
        conv.audio_data = None
        conv.modalities = []
        conv.stop_str = stop_str
        return conv

    @classmethod
    def _decode_sse_payload(cls, line):
        """Strip the ``data: `` prefix from ``line`` and parse the rest as JSON."""
        return json.loads(line[len(cls._SSE_PREFIX) :])

    def _patch_serving_chat(self, attribute):
        """Return a ``patch`` for ``attribute`` of the serving_chat module."""
        return patch(f"{self._SERVING_CHAT}.{attribute}")

    def _use_jinja_template(self):
        """Switch the template manager from a named template to Jinja mode."""
        self.template_manager.chat_template_name = None
        self.template_manager.jinja_template_content_format = "string"

    def _check_unstreamed_tool_args(self, prev_tool_call_arr, streamed_args_for_tool):
        """Call ``_check_for_unstreamed_tool_args`` with a mocked parser state.

        Args:
            prev_tool_call_arr: Value exposed as ``detector.prev_tool_call_arr``.
            streamed_args_for_tool: Value exposed as
                ``detector.streamed_args_for_tool``.

        Returns:
            Whatever the method under test returns.
        """
        detector = Mock()
        detector.prev_tool_call_arr = prev_tool_call_arr
        detector.streamed_args_for_tool = streamed_args_for_tool
        parser = Mock()
        parser.detector = detector

        request = ChatCompletionRequest(
            model="test",
            messages=[{"role": "user", "content": "What's the weather?"}],
            tools=self._weather_tools(),
        )
        return self.chat._check_for_unstreamed_tool_args(
            parser=parser,
            content={"meta_info": {"id": "chatcmpl-test123"}},
            request=request,
            index=0,
        )

    def _process_kimi_tool_calls(self, parser_cls, call_items, **extra_kwargs):
        """Run ``_process_tool_calls`` against a patched parser class.

        Args:
            parser_cls: The patched ``FunctionCallParser`` mock.
            call_items: Tool-call mocks that ``parse_non_stream`` should report.
            **extra_kwargs: Extra keyword arguments forwarded unchanged.

        Returns:
            The first element of the returned 3-tuple (the tool calls).
        """
        parser = parser_cls.return_value
        parser.has_tool_call.return_value = True
        parser.parse_non_stream.return_value = ("", call_items)

        tool_calls, _, _ = self.chat._process_tool_calls(
            text="<|tool_calls_section_begin|>...",
            tools=self._weather_tools(),
            finish_reason={"type": "stop", "matched": None},
            **extra_kwargs,
        )
        return tool_calls

    def _first_tool_call_stream_line(self, request):
        """Return the first line emitted by ``_process_tool_call_stream``.

        The parser class is patched so that the first parsed chunk reports one
        ``get_weather`` call with empty parameters at tool index 0.  Returns
        ``None`` when the stream emits nothing.
        """
        first_call = self._tool_call_item("get_weather", "", 0)

        async def first_emitted_line():
            stream = self.chat._process_tool_call_stream(
                index=0,
                delta="irrelevant",
                parser_dict={},
                content={"meta_info": {"id": "chatcmpl-test"}},
                request=request,
                has_tool_calls={},
            )
            # Only the first emitted line matters; stop right after it.
            async for emitted in stream:
                return emitted
            return None

        with self._patch_serving_chat("FunctionCallParser") as parser_cls:
            parser_cls.return_value.parse_stream_chunk.side_effect = [
                ("", [first_call]),
                ("", []),
            ]
            loop = get_or_create_event_loop()
            return loop.run_until_complete(first_emitted_line())

    # ------------------------------------------------------------------
    # Request conversion and templating
    # ------------------------------------------------------------------
    def test_convert_to_internal_request_single(self):
        """A single non-streaming request converts to a ``GenerateReqInput``."""
        with self._patch_serving_chat("generate_chat_conv") as conv_mock, patch.object(
            self.chat, "_process_messages"
        ) as proc_mock:
            conv_mock.return_value = self._conversation_mock(stop_str=["</s>"])
            proc_mock.return_value = MessageProcessingResult(
                "Test prompt",
                [1, 2, 3],
                None,
                None,
                [],
                ["</s>"],
                None,
            )

            adapted, processed = self.chat._convert_to_internal_request(self.basic_req)
            self.assertIsInstance(adapted, GenerateReqInput)
            self.assertFalse(adapted.stream)
            self.assertEqual(processed, self.basic_req)

    def test_jinja_uses_openai_tool_schema_first(self):
        """Ensure Jinja chat templates receive OpenAI-shaped tools by default."""
        self._use_jinja_template()
        req = self._add_tool_request()

        self.chat._process_messages(req, is_multimodal=False)

        expected_tools = [tool.model_dump() for tool in req.tools]
        kwargs = self.tm.tokenizer.apply_chat_template.call_args.kwargs
        self.assertEqual(kwargs["tools"], expected_tools)

    def test_jinja_tool_schema_fallback_to_flat_function(self):
        """Fall back to function-only schema when the template rejects the OpenAI wrapper."""
        self._use_jinja_template()
        req = self._add_tool_request()

        # First render attempt fails, the retry succeeds.
        self.tm.tokenizer.apply_chat_template.side_effect = [
            RuntimeError("template expects flat tools format"),
            [1, 2, 3],
        ]

        self.chat._process_messages(req, is_multimodal=False)

        calls = self.tm.tokenizer.apply_chat_template.call_args_list
        first_tools = calls[0].kwargs["tools"]
        second_tools = calls[1].kwargs["tools"]
        self.assertEqual(first_tools, [tool.model_dump() for tool in req.tools])
        self.assertEqual(
            second_tools, [tool.function.model_dump() for tool in req.tools]
        )

    def test_stop_str_isolation_between_requests(self):
        """Test that stop strings from one request don't affect subsequent requests.

        This tests the fix for the bug where conv.stop_str was being mutated globally,
        causing stop strings from one request to persist in subsequent requests.
        """
        initial_stop_str = ["\n"]

        with self._patch_serving_chat("generate_chat_conv") as conv_mock:
            # The same conversation object serves both requests, so any
            # in-place mutation of its stop_str would leak into the second.
            conv_ins = self._conversation_mock(stop_str=initial_stop_str.copy())
            conv_mock.return_value = conv_ins

            # First request adds a custom stop string.
            req1 = ChatCompletionRequest(
                model="x",
                messages=[{"role": "user", "content": "First request"}],
                stop=["CUSTOM_STOP"],
            )
            result1 = self.chat._apply_conversation_template(req1, is_multimodal=False)

            expected_stop1 = initial_stop_str + ["CUSTOM_STOP"]
            self.assertEqual(result1.stop, expected_stop1)
            # The template's own stop list must be left untouched.
            self.assertEqual(conv_ins.stop_str, initial_stop_str)

            # Second request supplies no stop string of its own.
            req2 = ChatCompletionRequest(
                model="x",
                messages=[{"role": "user", "content": "Second request"}],
            )
            result2 = self.chat._apply_conversation_template(req2, is_multimodal=False)

            self.assertEqual(result2.stop, initial_stop_str)
            self.assertNotIn("CUSTOM_STOP", result2.stop)
            self.assertEqual(conv_ins.stop_str, initial_stop_str)

    # ------------------------------------------------------------------
    # Unstreamed tool-call arguments
    # ------------------------------------------------------------------
    def test_unstreamed_tool_args_completion(self):
        """Test that remaining tool call arguments are sent when generation finishes."""
        # Only the first argument was streamed; "unit" is still outstanding.
        result = self._check_unstreamed_tool_args(
            prev_tool_call_arr=[
                {
                    "name": "get_weather",
                    "arguments": {"location": "San Francisco", "unit": "celsius"},
                }
            ],
            streamed_args_for_tool=['{"location": "San Francisco"'],
        )

        self.assertIsNotNone(result, "Should return chunk with remaining arguments")
        self.assertTrue(result.startswith(self._SSE_PREFIX))

        chunk = self._decode_sse_payload(result)
        tool_calls = chunk["choices"][0]["delta"]["tool_calls"]
        self.assertEqual(len(tool_calls), 1)
        arguments = tool_calls[0]["function"]["arguments"]
        self.assertIn(', "unit": "celsius"}', arguments)

        self.assertIn(
            '"finish_reason":null',
            result,
            "Should not include finish_reason in completion chunk",
        )

    def test_unstreamed_tool_args_no_completion_needed(self):
        """Test that no completion chunk is sent when all arguments were already streamed."""
        result = self._check_unstreamed_tool_args(
            prev_tool_call_arr=[
                {"name": "get_weather", "arguments": {"location": "San Francisco"}}
            ],
            streamed_args_for_tool=['{"location": "San Francisco"}'],
        )

        self.assertIsNone(result, "Should return None when no completion is needed")

    def test_unstreamed_tool_args_no_parser_data(self):
        """Test that no completion chunk is sent when parser has no tool call data."""
        result = self._check_unstreamed_tool_args(
            prev_tool_call_arr=[],
            streamed_args_for_tool=[],
        )

        self.assertIsNone(
            result, "Should return None when parser has no tool call data"
        )

    # ------------------------------------------------------------------
    # kimi_k2 tool-call id formatting
    # ------------------------------------------------------------------
    def test_kimi_k2_non_streaming_tool_call_id_format(self):
        """Ensure non-streaming tool_call.id matches functions.{name}:{index} for kimi_k2 parser."""
        self.chat.tool_call_parser = "kimi_k2"

        with self._patch_serving_chat("FunctionCallParser") as parser_cls:
            tool_calls = self._process_kimi_tool_calls(
                parser_cls,
                [self._tool_call_item("get_weather", '{"city":"Paris"}', 0)],
            )

            self.assertIsNotNone(tool_calls)
            self.assertEqual(len(tool_calls), 1)
            self.assertEqual(tool_calls[0].id, "functions.get_weather:0")
            self.assertEqual(tool_calls[0].function.name, "get_weather")

    def test_kimi_k2_streaming_tool_call_id_format(self):
        """Ensure the first streamed tool_call.id matches functions.{name}:{index} for kimi_k2 parser."""
        self.chat.tool_call_parser = "kimi_k2"

        req = ChatCompletionRequest(
            model="x",
            messages=[{"role": "user", "content": "Hi?"}],
            tools=self._weather_tools(),
            stream=True,
        )

        line = self._first_tool_call_stream_line(req)
        self.assertIsNotNone(line)
        self.assertTrue(line.startswith(self._SSE_PREFIX))

        payload = self._decode_sse_payload(line)
        tool_calls = payload["choices"][0]["delta"]["tool_calls"]
        self.assertEqual(tool_calls[0]["id"], "functions.get_weather:0")

    def test_kimi_k2_non_streaming_tool_call_id_with_history(self):
        """Ensure non-streaming tool_call.id increases with the tool-call history for kimi_k2 parser."""
        self.chat.tool_call_parser = "kimi_k2"

        req = ChatCompletionRequest(
            model="x",
            messages=self._weather_history_messages("What about LA and Tokyo?"),
            tools=self._weather_tools(),
            stream=False,
        )

        with self._patch_serving_chat("FunctionCallParser") as parser_cls:
            # The parser numbers the new calls from 0 (tool_index 0 and 1);
            # the expected ids below are offset by the one call in history.
            call_items = [
                self._tool_call_item("get_weather", '{"city":"Los Angeles"}', 0),
                self._tool_call_item("get_weather", '{"city":"Tokyo"}', 1),
            ]

            history_tool_calls_cnt = self.chat._get_history_tool_calls_cnt(req)
            tool_calls = self._process_kimi_tool_calls(
                parser_cls,
                call_items,
                history_tool_calls_cnt=history_tool_calls_cnt,
            )

            self.assertEqual(history_tool_calls_cnt, 1)
            self.assertIsNotNone(tool_calls)
            self.assertEqual(len(tool_calls), 2)
            self.assertEqual(tool_calls[0].id, "functions.get_weather:1")
            self.assertEqual(tool_calls[0].function.name, "get_weather")
            self.assertEqual(tool_calls[1].id, "functions.get_weather:2")
            self.assertEqual(tool_calls[1].function.name, "get_weather")

    def test_kimi_k2_streaming_tool_call_id_with_history(self):
        """Ensure the first streamed tool_call.id increases with the tool-call history for kimi_k2 parser."""
        self.chat.tool_call_parser = "kimi_k2"

        req = ChatCompletionRequest(
            model="x",
            messages=self._weather_history_messages("What about LA?"),
            tools=self._weather_tools(),
            stream=True,
        )

        line = self._first_tool_call_stream_line(req)
        self.assertIsNotNone(line)
        self.assertTrue(line.startswith(self._SSE_PREFIX))

        payload = self._decode_sse_payload(line)
        tool_calls = payload["choices"][0]["delta"]["tool_calls"]
        # The parser reports tool index 0; one earlier call shifts the id to 1.
        self.assertEqual(tool_calls[0]["id"], "functions.get_weather:1")

    # ------------------------------------------------------------------
    # DeepSeek V3.2 encoding detection
    # ------------------------------------------------------------------
    def test_dpsk_v32_encoding_path(self):
        """Test DeepSeek V3.2 encoding path detection and application."""
        from sglang.srt.managers.template_manager import TemplateManager
        from sglang.srt.server_args import PortArgs, ServerArgs

        server_args = ServerArgs(model_path="deepseek-ai/DeepSeek-V3.2")
        port_args = PortArgs.init_new(server_args)

        with patch(
            "sglang.srt.managers.tokenizer_manager.TokenizerManager"
        ) as MockTokenizerManager:
            tokenizer_manager = MockTokenizerManager(server_args, port_args)
            tokenizer_manager.server_args = server_args
            tokenizer_manager.model_config = Mock()
            tokenizer_manager.model_config.get_default_sampling_params.return_value = (
                None
            )

            mock_hf_config = Mock()
            mock_hf_config.architectures = ["DeepseekV32ForCausalLM"]
            tokenizer_manager.model_config.hf_config = mock_hf_config

            tokenizer_manager.tokenizer = Mock()

            # Case 1: DeepSeek V3.2 architecture and no chat template.
            tokenizer_manager.tokenizer.chat_template = None
            serving_chat = OpenAIServingChat(tokenizer_manager, TemplateManager())
            self.assertTrue(serving_chat.use_dpsk_v32_encoding)

            # Case 2: same architecture, but the tokenizer has a chat template.
            tokenizer_manager.tokenizer.chat_template = "some template"
            serving_chat = OpenAIServingChat(tokenizer_manager, TemplateManager())
            self.assertFalse(serving_chat.use_dpsk_v32_encoding)

            # Case 3: no chat template, but a different architecture.
            tokenizer_manager.tokenizer.chat_template = None
            mock_hf_config.architectures = ["LlamaForCausalLM"]
            serving_chat = OpenAIServingChat(tokenizer_manager, TemplateManager())
            self.assertFalse(serving_chat.use_dpsk_v32_encoding)

    # ------------------------------------------------------------------
    # Streaming abort handling
    # ------------------------------------------------------------------
    def test_streaming_abort_yields_error(self):
        """Test that an abort finish reason during streaming correctly yields an error and stops."""
        err_msg = "Aborted by scheduler"
        err_code = HTTPStatus.INTERNAL_SERVER_ERROR

        async def _mock_generate_abort():
            yield {
                "text": "Partial ",
                "meta_info": {
                    "id": "chatcmpl-test",
                    "prompt_tokens": 10,
                    "completion_tokens": 2,
                    "cached_tokens": 0,
                    "finish_reason": {
                        "type": "abort",
                        "status_code": err_code,
                        "message": err_msg,
                    },
                    "output_token_logprobs": None,
                    "output_top_logprobs": None,
                },
                "index": 0,
            }

        # Replace the default stream handed out by the mock tokenizer manager.
        self.tm.generate_request.return_value = _mock_generate_abort()

        req = self._simple_request(stream=True)

        with self._patch_serving_chat("generate_chat_conv") as conv_mock:
            conv_ins = Mock()
            conv_ins.get_prompt.return_value = "Test prompt"
            conv_mock.return_value = conv_ins

            adapted_request, _ = self.chat._convert_to_internal_request(
                req, self.fastapi_request
            )

            async def run_stream():
                chunks = []
                # Best-effort collection: a failure mid-stream is only
                # printed, and the assertions below then judge whatever
                # chunks were gathered before it.
                try:
                    async for chunk in self.chat._generate_chat_stream(
                        adapted_request, req, self.fastapi_request
                    ):
                        chunks.append(chunk)
                except Exception as e:
                    print(f"Error during stream iteration: {e}")
                return chunks

            loop = get_or_create_event_loop()
            chunks = loop.run_until_complete(run_stream())

            # Decode the first chunk that mentions an error, if there is one.
            error_chunk_data = next(
                (self._decode_sse_payload(c) for c in chunks if "error" in c),
                None,
            )
            self.assertIsNotNone(error_chunk_data, "Error chunk not found in stream")
            self.assertEqual(error_chunk_data["error"]["message"], err_msg)
            self.assertEqual(error_chunk_data["error"]["code"], err_code.value)

            # The stream must end with the terminator ...
            self.assertEqual(chunks[-1], "data: [DONE]\n\n")

            # ... and hold nothing besides the error chunk and the terminator.
            self.assertEqual(len(chunks), 2)
            self.assertIn("error", chunks[0])
|
|
|
|
# Script entry point: when this file is executed directly (not imported),
# run the tests of this module through unittest with verbosity level 2.
| if __name__ == "__main__": |
| unittest.main(verbosity=2) |
|
|