"""Tests for the prompt, attachment, tool-execution and SSE helpers in ``api_utils.utils``.

The helpers under test are imported from the project; file-system and tool
execution collaborators are replaced with mocks by the fixtures below.
"""

from typing import Any, Dict, List, cast
from unittest.mock import AsyncMock, patch

import pytest

from api_utils.utils import (
    _extract_json_from_text,
    _get_latest_user_text,
    collect_and_validate_attachments,
    generate_sse_stop_chunk_with_usage,
    maybe_execute_tools,
    prepare_combined_prompt,
)
from models import FunctionCall, Message, MessageContentItem, ToolCall


@pytest.fixture
def mock_tools_registry():
    """Patch runtime tool registration and tool execution.

    Yields:
        A ``(mock_reg, mock_exec)`` tuple; ``mock_exec`` is an ``AsyncMock``.
    """
    with (
        patch("api_utils.utils_ext.tools_execution.register_runtime_tools") as mock_reg,
        patch(
            "api_utils.utils_ext.tools_execution.execute_tool_call",
            new_callable=AsyncMock,
        ) as mock_exec,
    ):
        yield mock_reg, mock_exec


@pytest.fixture
def mock_logger():
    """Patch ``logging.getLogger`` and yield the mock logger it returns."""
    with patch("logging.getLogger") as mock:
        yield mock.return_value


@pytest.fixture
def mock_file_utils():
    """Fixture providing mocked file utility functions for cross-platform testing.

    Yields:
        A ``(mock_extract, mock_save, mock_exists)`` tuple. The ``prompts``
        module-level patches forward to ``mock_extract`` / ``mock_save`` so a
        test only has to configure one mock per helper.
    """
    with (
        patch("api_utils.utils_ext.files.extract_data_url_to_local") as mock_extract,
        patch("api_utils.utils_ext.files.save_blob_to_local") as mock_save,
        # Patch exists globally to avoid conflicts between multiple module-level patches
        patch("os.path.exists") as mock_exists,
        patch(
            "api_utils.utils_ext.prompts.extract_data_url_to_local"
        ) as mock_extract_prompts,
        patch("api_utils.utils_ext.prompts.save_blob_to_local") as mock_save_prompts,
    ):
        # Configure all mocks to behave consistently
        mock_exists.return_value = True

        # Link prompts mocks to files mocks
        mock_extract_prompts.side_effect = lambda *args, **kwargs: mock_extract(
            *args, **kwargs
        )
        mock_save_prompts.side_effect = lambda *args, **kwargs: mock_save(
            *args, **kwargs
        )

        yield mock_extract, mock_save, mock_exists


def test_prepare_combined_prompt_basic(mock_logger):
    """Test basic text message formatting."""
    messages = [
        Message(role="user", content="Hello"),
        Message(role="assistant", content="Hi there"),
    ]

    prompt, files = prepare_combined_prompt(messages, "req1")

    assert "User:\nHello" in prompt
    assert "Assistant:\nHi there" in prompt
    assert len(files) == 0


def test_prepare_combined_prompt_system(mock_logger):
    """Test system prompt handling."""
    messages = [
        Message(role="system", content="Be helpful"),
        Message(role="user", content="Hi"),
    ]

    prompt, _ = prepare_combined_prompt(messages, "req1")

    assert "System Instructions:\nBe helpful" in prompt
    assert "User:\nHi" in prompt
    # System message should not be repeated in conversation history if processed
    assert prompt.count("Be helpful") == 1


def test_prepare_combined_prompt_tools(mock_logger):
    """Test tool definitions injection."""
    tools = [
        {
            "type": "function",
            "function": {
                "name": "get_weather",
                "description": "Get weather",
                "parameters": {"type": "object"},
            },
        }
    ]
    messages = [Message(role="user", content="Weather?")]

    prompt, _ = prepare_combined_prompt(
        messages, "req1", tools=tools, tool_choice="auto"
    )

    assert "Available Tools Catalog:" in prompt
    assert "- Function: get_weather" in prompt
    assert "Parameter Schema:" in prompt


def test_prepare_combined_prompt_multimodal_image(mock_file_utils, mock_logger):
    """Test image url processing."""
    mock_extract, _, _ = mock_file_utils
    mock_extract.return_value = "/tmp/image.png"

    # Use dicts for Pydantic parsing
    content_item = {
        "type": "image_url",
        "image_url": {"url": "data:image/png;base64,...", "detail": "high"},
    }
    messages = [
        Message(role="user", content=cast(List[MessageContentItem], [content_item]))
    ]

    prompt, files = prepare_combined_prompt(messages, "req1")

    assert "/tmp/image.png" in files
    assert "[Image Details: detail=high]" in prompt


def test_prepare_combined_prompt_multimodal_dict(mock_file_utils, mock_logger):
    """Test dictionary content processing."""
    mock_extract, _, _ = mock_file_utils
    mock_extract.return_value = "/tmp/image.png"

    content = [
        {"type": "text", "text": "Look at this"},
        {"type": "image_url", "image_url": {"url": "data:image...", "detail": "low"}},
    ]
    messages = [Message(role="user", content=content)]

    prompt, files = prepare_combined_prompt(messages, "req1")

    assert "Look at this" in prompt
    assert "/tmp/image.png" in files
    assert "[Image Details: detail=low]" in prompt


def test_prepare_combined_prompt_audio(mock_file_utils, mock_logger):
    """Test audio input processing."""
    _, mock_save, _ = mock_file_utils
    mock_save.return_value = "/tmp/audio.mp3"

    content_item = {
        "type": "input_audio",
        "input_audio": {
            "data": "SGVsbG8=",  # Valid base64 "Hello"
            "mime_type": "audio/mp3",
        },
    }
    messages = [
        Message(role="user", content=cast(List[MessageContentItem], [content_item]))
    ]

    _, files = prepare_combined_prompt(messages, "req1")

    assert "/tmp/audio.mp3" in files


def test_prepare_combined_prompt_tool_calls(mock_logger):
    """Test tool call visualization."""
    tool_call = ToolCall(
        id="call1",
        type="function",
        function=FunctionCall(name="get_weather", arguments='{"city": "Paris"}'),
    )
    messages = [Message(role="assistant", content=None, tool_calls=[tool_call])]

    prompt, _ = prepare_combined_prompt(messages, "req1")

    assert "Request function call: get_weather" in prompt
    assert '"city": "Paris"' in prompt


def test_prepare_combined_prompt_tool_results(mock_logger):
    """Test tool result inclusion."""
    messages = [Message(role="tool", content="Sunny", tool_call_id="call1")]

    prompt, _ = prepare_combined_prompt(messages, "req1")

    assert "Tool result (tool_call_id=call1):" in prompt
    assert "Sunny" in prompt


def test_extract_json_from_text():
    """Test JSON extraction from text."""
    text = 'Here is some json: {"key": "value"} end.'
    result = _extract_json_from_text(text)
    assert result == '{"key": "value"}'

    text_no_json = "Just text"
    assert _extract_json_from_text(text_no_json) is None

    text_invalid = "Bad json { key: value }"
    assert _extract_json_from_text(text_invalid) is None


def test_get_latest_user_text():
    """Test extracting latest user text."""
    messages = [
        Message(role="user", content="Hello"),
        Message(role="assistant", content="Hi"),
        Message(role="user", content="World"),
    ]
    assert _get_latest_user_text(messages) == "World"

    # Test with list content
    content = [{"type": "text", "text": "Part1"}, {"type": "text", "text": "Part2"}]
    messages = [Message(role="user", content=cast(List[MessageContentItem], content))]
    assert _get_latest_user_text(messages) == "Part1\nPart2"


@pytest.mark.asyncio
async def test_maybe_execute_tools(mock_tools_registry, mock_logger):
    """Test explicit tool execution logic."""
    _, mock_exec = mock_tools_registry
    mock_exec.return_value = "success"

    tools = [{"type": "function", "function": {"name": "test_func"}}]
    tool_choice = {"type": "function", "function": {"name": "test_func"}}
    messages = [Message(role="user", content='Call test_func with {"arg": 1}')]

    results = await maybe_execute_tools(messages, tools, tool_choice)

    assert results is not None
    assert len(results) == 1
    assert results[0]["name"] == "test_func"
    assert results[0]["result"] == "success"

    # Verify arguments extraction (simple JSON in text)
    # The current implementation of maybe_execute_tools tries to find JSON in user text
    mock_exec.assert_called_once()
    # Positional args of the call: index 0 is the tool name, index 1 the
    # extracted argument JSON (matches the two-argument assert_called_with
    # used in test_maybe_execute_tools_fallback_args).
    call_args = mock_exec.call_args[0]
    assert call_args[0] == "test_func"
    assert call_args[1] == '{"arg": 1}'  # It passes the extracted JSON string


def test_collect_and_validate_attachments(mock_file_utils, mock_logger, tmp_path):
    """Test attachment collection and validation."""
    mock_extract, _, mock_exists = mock_file_utils
    mock_exists.return_value = True

    valid_initial = str(tmp_path / "initial.png")
    valid_existing = str(tmp_path / "existing.png")
    valid_msg_att = str(tmp_path / "msg_att.png")

    mock_extract.return_value = "/tmp/data.png"

    # Mock request object
    class MockMessage:
        def __init__(self, role, content, attachments=None):
            self.role = role
            self.content = content
            self.attachments = attachments or []

    class MockRequest:
        attachments = [valid_existing, {"url": "data:image/png..."}]
        messages = [
            MockMessage(role="user", content="text", attachments=[valid_msg_att])
        ]

    req = MockRequest()
    initial_list = [valid_initial]

    result = collect_and_validate_attachments(req, "req1", initial_list)

    assert valid_initial in result
    assert valid_existing in result
    assert "/tmp/data.png" in result
    assert valid_msg_att in result


def test_generate_sse_stop_chunk_with_usage():
    """Test SSE stop chunk generation with usage."""
    with patch("api_utils.utils.generate_sse_stop_chunk") as mock_gen:
        mock_gen.return_value = "SSE_CHUNK"
        res = generate_sse_stop_chunk_with_usage("req1", "model", {"tokens": 10})
        assert res == "SSE_CHUNK"
        mock_gen.assert_called_with("req1", "model", "stop", {"tokens": 10})


def test_prepare_combined_prompt_complex_dict(mock_file_utils, mock_logger):
    """Test complex dictionary with attachments."""
    mock_extract, _, _ = mock_file_utils
    mock_extract.return_value = "/tmp/file.pdf"

    content = {
        "text": "Analyze this",
        "attachments": [{"url": "data:application/pdf..."}],
    }
    # Use model_construct to bypass validation for test purposes if needed,
    # or ensure content structure matches what Pydantic expects if possible.
    # Given the utils.py code handles dict content, we bypass Pydantic check here.
    messages = [Message.model_construct(role="user", content=content)]

    prompt, files = prepare_combined_prompt(messages, "req1")

    assert "Analyze this" in prompt
    assert "/tmp/file.pdf" in files


def test_prepare_combined_prompt_local_files(mock_file_utils, mock_logger):
    """Test local file paths."""
    _, _, mock_exists = mock_file_utils
    mock_exists.return_value = True

    content = [{"type": "image_url", "image_url": {"url": "file:///c:/test.png"}}]
    messages = [Message(role="user", content=cast(List[MessageContentItem], content))]

    _, files = prepare_combined_prompt(messages, "req1")

    # URL decoding might happen, on windows /c:/test.png -> /c:/test.png or c:\test.png
    # The code uses unquote(parsed.path)
    # file:///c:/test.png -> path is /c:/test.png
    assert len(files) == 1
    assert files[0].endswith("test.png")


def test_prepare_combined_prompt_system_message_ordering(mock_logger):
    """Test that system messages are processed and placed correctly."""
    messages = [
        Message(role="user", content="User 1"),
        Message(role="system", content="System 1"),
        Message(role="assistant", content="Assistant 1"),
        Message(role="system", content="System 2"),
    ]

    prompt, _ = prepare_combined_prompt(messages, "req1")

    # Based on utils.py:
    # 1. Finds first system message ("System 1") and uses it.
    # 2. Skips subsequent system messages ("System 2").
    assert "User:\nUser 1" in prompt
    assert "System Instructions:\nSystem 1" in prompt
    assert "Assistant:\nAssistant 1" in prompt
    # System 2 should be skipped
    assert "System 2" not in prompt

    # Check ordering
    # System prompt is prepended to the first message usually or handled separately
    # In utils.py, it is added to combined_parts BEFORE iterating other messages.
    # So "System 1" should appear before "User 1"
    idx_s1 = prompt.find("System Instructions:\nSystem 1")
    idx_u1 = prompt.find("User:\nUser 1")
    idx_a1 = prompt.find("Assistant:\nAssistant 1")
    assert idx_s1 < idx_u1
    assert idx_u1 < idx_a1


def test_prepare_combined_prompt_complex_attachments_url_schemes(
    mock_file_utils, mock_logger
):
    """Test various URL schemes in attachments."""
    mock_extract, _, mock_exists = mock_file_utils
    mock_exists.return_value = True
    mock_extract.side_effect = lambda url, **kwargs: f"/tmp/{url.split('/')[-1]}"

    content = {
        "text": "Check these",
        "attachments": [
            {"url": "http://example.com/http.pdf"},
            {"url": "https://example.com/https.jpg"},
            {"url": "file:///c:/local.txt"},
        ],
    }
    messages = [Message.model_construct(role="user", content=content)]

    _, files = prepare_combined_prompt(messages, "req1")

    # utils.py ignores http/https for attachments, only takes data:, file:, or absolute paths
    assert len(files) == 1
    # file:///c:/local.txt -> unquoted path.
    local_file_found = any("local.txt" in f for f in files)
    assert local_file_found


def test_prepare_combined_prompt_input_audio_data_url(mock_file_utils, mock_logger):
    """Test input audio with data URL."""
    mock_extract, _, _ = mock_file_utils
    # utils.py uses extract_data_url_to_local for data: URLs
    mock_extract.return_value = "/tmp/audio_data.mp3"

    content_item = {
        "type": "input_audio",
        "input_audio": {"data": "data:audio/mp3;base64,SGVsbG8=", "format": "mp3"},
    }
    messages = [
        Message(role="user", content=cast(List[MessageContentItem], [content_item]))
    ]

    _, files = prepare_combined_prompt(messages, "req1")

    assert "/tmp/audio_data.mp3" in files


def test_prepare_combined_prompt_input_video_processing(mock_file_utils, mock_logger):
    """Test input video processing with various formats."""
    mock_extract, mock_save, mock_exists = mock_file_utils
    mock_exists.return_value = True

    # 1. Video URL (data:)
    mock_extract.return_value = "/tmp/video1.mp4"
    item1 = {
        "type": "input_video",
        "input_video": {"url": "data:video/mp4;base64,AAAA"},
    }

    # 2. Video raw base64 data
    mock_save.return_value = "/tmp/video2.mp4"
    item2 = {
        "type": "input_video",
        "input_video": {
            "data": "BBBB",  # Raw base64
            "mime_type": "video/mp4",
            "format": "mp4",
        },
    }

    # 3. Local file URL
    item3 = {"type": "input_video", "input_video": {"url": "file:///c:/video3.mp4"}}

    messages = [
        Message(
            role="user", content=cast(List[MessageContentItem], [item1, item2, item3])
        )
    ]

    _, files = prepare_combined_prompt(messages, "req1")

    assert "/tmp/video1.mp4" in files
    assert "/tmp/video2.mp4" in files
    # file:///c:/video3.mp4 -> /c:/video3.mp4 (unquoted)
    # Windows path handling in test environment might vary, checking endswith
    assert any(f.endswith("video3.mp4") for f in files)

    # Check that extract and save were called
    mock_extract.assert_called()
    mock_save.assert_called()


def test_prepare_combined_prompt_complex_nested_dict(
    mock_file_utils, mock_logger, tmp_path
):
    """Test nested dictionary content with specific attachment keys.

    Uses platform-appropriate paths via tmp_path fixture for cross-platform
    compatibility.
    """
    mock_extract, _, mock_exists = mock_file_utils
    mock_exists.return_value = True

    # Create actual temp files for cross-platform path handling
    img_file = tmp_path / "img1.png"
    doc_file = tmp_path / "doc1.pdf"
    img_file.touch()
    doc_file.touch()

    content = {
        "text": "Look at these files",
        "images": [{"url": img_file.as_uri()}],  # Platform-appropriate file URL
        "files": [{"path": str(doc_file)}],  # Platform-appropriate absolute path
        "media": [{"url": "data:video..."}],  # data url
    }
    mock_extract.return_value = str(tmp_path / "media.mp4")

    messages = [Message.model_construct(role="user", content=content)]

    prompt, files = prepare_combined_prompt(messages, "req1")

    assert "Look at these files" in prompt
    assert str(tmp_path / "media.mp4") in files
    assert any(f.endswith("img1.png") for f in files)
    assert any(f.endswith("doc1.pdf") for f in files)


@pytest.mark.asyncio
async def test_maybe_execute_tools_choice_logic(mock_tools_registry, mock_logger):
    """Test tool_choice logic (none, auto, specific)."""
    _, mock_exec = mock_tools_registry
    tools = [{"type": "function", "function": {"name": "func1"}}]
    messages = [Message(role="user", content="call func1")]

    # 1. Choice = 'none' -> returns None
    res = await maybe_execute_tools(messages, tools, "none")
    assert res is None

    # 2. Choice = 'auto' with 1 tool -> executes
    messages_with_json = [Message(role="user", content='call func1 {"arg": 1}')]
    mock_exec.return_value = "res1"
    res = await maybe_execute_tools(messages_with_json, tools, "auto")
    assert res is not None
    assert res[0]["name"] == "func1"

    # 3. Choice = specific name
    res = await maybe_execute_tools(messages_with_json, tools, "func1")
    assert res is not None
    assert res[0]["name"] == "func1"

    # 4. Choice = dict
    res = await maybe_execute_tools(
        messages_with_json, tools, {"type": "function", "function": {"name": "func1"}}
    )
    assert res is not None
    assert res[0]["name"] == "func1"


def test_collect_and_validate_attachments_error_handling(mock_file_utils, mock_logger):
    """Test error handling in attachment collection."""

    # Force an error during property access or iteration
    class BrokenMessage:
        @property
        def attachments(self):
            raise ValueError("Broken")

    class MockRequest:
        messages = [BrokenMessage()]

    initial_list = []

    # Should not raise exception, just log warning
    result = collect_and_validate_attachments(MockRequest(), "req1", initial_list)
    assert result == []


def test_prepare_combined_prompt_tool_result_list_content(mock_logger):
    """Test tool result with list content."""
    content = [{"type": "text", "text": "Result Part 1"}, "Result Part 2"]
    # Use model_construct to bypass strict Pydantic validation for mixed content types
    messages = [
        Message.model_construct(role="tool", content=content, tool_call_id="call1")
    ]

    prompt, _ = prepare_combined_prompt(messages, "req1")

    assert "Result Part 1" in prompt
    assert "Result Part 2" in prompt


def test_prepare_combined_prompt_malformed_tool_args(mock_logger):
    """Test malformed JSON in tool call arguments."""
    tool_call = ToolCall(
        id="call1",
        type="function",
        function=FunctionCall(name="func1", arguments="{bad_json"),
    )
    messages = [Message(role="assistant", content=None, tool_calls=[tool_call])]

    prompt, _ = prepare_combined_prompt(messages, "req1")

    # Should fall back to raw string
    assert "{bad_json" in prompt


def test_prepare_combined_prompt_unknown_content_type(mock_logger):
    """Test unknown content type warning."""

    class UnknownType:
        def __str__(self):
            return "UnknownObj"

    # Bypass Pydantic validation
    messages = [Message.model_construct(role="user", content=UnknownType())]

    prompt, _ = prepare_combined_prompt(messages, "req1")

    assert "UnknownObj" in prompt


def test_prepare_combined_prompt_invalid_base64(mock_file_utils, mock_logger):
    """Test invalid base64 in audio/video input."""
    _, mock_save, _ = mock_file_utils
    mock_save.return_value = None

    item = {
        "type": "input_audio",
        "input_audio": {"data": "InvalidBase64!!!", "mime_type": "audio/mp3"},
    }
    messages = [Message(role="user", content=cast(List[MessageContentItem], [item]))]

    # Should not crash, just ignore or log error
    _, files = prepare_combined_prompt(messages, "req1")

    assert len(files) == 0


@pytest.mark.asyncio
async def test_maybe_execute_tools_existing_tool_message(
    mock_tools_registry, mock_logger
):
    """Test maybe_execute_tools returns None if tool result already exists."""
    messages = [
        Message(role="user", content="call func"),
        Message(
            role="assistant",
            content=None,
            tool_calls=[
                ToolCall(
                    id="1",
                    type="function",
                    function=FunctionCall(name="f", arguments="{}"),
                )
            ],
        ),
        Message(role="tool", content="result", tool_call_id="1"),
    ]

    res = await maybe_execute_tools(messages, [], "auto")

    assert res is None


@pytest.mark.asyncio
async def test_maybe_execute_tools_no_choice(mock_tools_registry, mock_logger):
    """Test maybe_execute_tools returns None if tool_choice is None."""
    messages = [Message(role="user", content="call func")]

    res = await maybe_execute_tools(messages, [], None)

    assert res is None


@pytest.mark.asyncio
async def test_maybe_execute_tools_execution_error(mock_tools_registry, mock_logger):
    """Test maybe_execute_tools handles execution errors gracefully."""
    _, mock_exec = mock_tools_registry
    mock_exec.side_effect = Exception("Execution failed")

    tools = [{"type": "function", "function": {"name": "func1"}}]
    messages = [Message(role="user", content='call func1 {"arg": 1}')]

    res = await maybe_execute_tools(messages, tools, "func1")

    assert res is None


@pytest.mark.asyncio
async def test_maybe_execute_tools_fallback_args(mock_tools_registry, mock_logger):
    """Test maybe_execute_tools uses empty dict when no JSON found."""
    _, mock_exec = mock_tools_registry
    mock_exec.return_value = "res"

    tools = [{"type": "function", "function": {"name": "func1"}}]
    # No JSON in content
    messages = [Message(role="user", content="call func1")]

    res = await maybe_execute_tools(messages, tools, "func1")

    assert res is not None
    assert res[0]["arguments"] == "{}"
    mock_exec.assert_called_with("func1", "{}")


def test_prepare_combined_prompt_tool_choice_string(mock_logger):
    """Test tool choice injection with specific string name."""
    tools = [{"type": "function", "function": {"name": "my_tool"}}]
    messages = [Message(role="user", content="hi")]

    # tool_choice as string name
    prompt, _ = prepare_combined_prompt(
        messages, "req1", tools=tools, tool_choice="my_tool"
    )

    assert "Recommended function to use: my_tool" in prompt


def test_prepare_combined_prompt_tool_choice_dict(mock_logger):
    """Test tool choice injection with dictionary."""
    tools = [{"type": "function", "function": {"name": "my_tool"}}]
    messages = [Message(role="user", content="hi")]

    # tool_choice as dict
    tool_choice = {"type": "function", "function": {"name": "my_tool"}}
    prompt, _ = prepare_combined_prompt(
        messages, "req1", tools=tools, tool_choice=tool_choice
    )

    assert "Recommended function to use: my_tool" in prompt


def test_prepare_combined_prompt_tools_error(mock_logger):
    """Test error handling during tools processing."""

    class BadTool:
        def get(self, k):
            raise ValueError("Bad tool")

    tools = [BadTool()]
    messages = [Message(role="user", content="hi")]

    # Should not crash
    prompt, _ = prepare_combined_prompt(
        messages, "req1", tools=cast(List[Dict[str, Any]], tools)
    )

    assert "User:\nhi" in prompt


def test_prepare_combined_prompt_empty_content(mock_logger):
    """Test message with None content."""
    messages = [Message.model_construct(role="user", content=None)]

    prompt, _ = prepare_combined_prompt(messages, "req1")

    # If it's the only message and content is None/empty, it is skipped.
    assert prompt == ""


def test_prepare_combined_prompt_tool_result_list_exception(mock_logger):
    """Test tool result list processing exception handling."""

    class BadItem:
        def __str__(self):
            raise ValueError("Cannot stringify")

    messages = [
        Message.model_construct(role="tool", content=[BadItem()], tool_call_id="1")
    ]

    prompt, _ = prepare_combined_prompt(messages, "req1")

    # It should fall back to str(msg.content) in the except block
    assert "tool_call_id=1" in prompt


def test_collect_and_validate_attachments_top_level(
    mock_file_utils, mock_logger, tmp_path
):
    """Test top level attachments in request."""
    _, _, mock_exists = mock_file_utils
    mock_exists.return_value = True

    valid_top1 = str(tmp_path / "top1.png")
    valid_top2 = tmp_path / "top2.png"

    class MockRequest:
        attachments = [valid_top1, {"url": valid_top2.as_uri()}]
        messages = []

    req = MockRequest()

    result = collect_and_validate_attachments(req, "req1", [])

    assert valid_top1 in result
    assert any("top2.png" in f for f in result)


def test_collect_and_validate_attachments_initial_filter(
    mock_file_utils, mock_logger, tmp_path
):
    """Test filtering of initial image list."""
    _, _, mock_exists = mock_file_utils

    exists_path = str(tmp_path / "exists.png")
    missing_path = str(tmp_path / "missing.png")

    # mock_exists side effect to filter
    def side_effect(path):
        return str(path) == exists_path

    mock_exists.side_effect = side_effect

    initial = [exists_path, missing_path, "relative.png"]

    # Need request object
    class MockRequest:
        messages = []

    result = collect_and_validate_attachments(MockRequest(), "req1", initial)

    assert exists_path in result
    assert missing_path not in result
    assert "relative.png" not in result


def test_prepare_combined_prompt_tool_fallback_name(mock_logger):
    """Test tool definition fallback when function is not a dict."""
    # Case: function is string, name is in top level
    tools = [{"function": "not_a_dict", "name": "fallback_tool"}]
    messages = [Message(role="user", content="hi")]

    prompt, _ = prepare_combined_prompt(messages, "req1", tools=tools)

    assert "- Function: fallback_tool" in prompt


def test_prepare_combined_prompt_tool_params_unserializable(mock_logger):
    """Test tool params that cannot be serialized."""

    class NoJson:
        def __str__(self):
            return "NoJson"

    # Use model_construct or just dict if tools is list of dicts
    tools = [
        {
            "type": "function",
            "function": {
                "name": "bad_params",
                "parameters": NoJson(),  # json.dumps will fail
            },
        }
    ]
    messages = [Message(role="user", content="hi")]

    prompt, _ = prepare_combined_prompt(messages, "req1", tools=tools)

    # Should contain function name but skip params
    assert "- Function: bad_params" in prompt
    assert "Parameter Schema:" not in prompt


def test_prepare_combined_prompt_empty_system_message(mock_logger):
    """Test empty system message handling."""
    messages = [
        Message(role="system", content=""),  # Empty content
        Message(role="user", content="Hi"),
    ]

    prompt, _ = prepare_combined_prompt(messages, "req1")

    assert "System Instructions:" not in prompt
    assert "User:\nHi" in prompt


def test_prepare_combined_prompt_item_type_exception(mock_logger):
    """Test exception when accessing item.type."""

    class BadItem:
        @property
        def type(self):
            raise ValueError("No type")

        def __str__(self):
            return "BadItemStr"

    messages = [Message.model_construct(role="user", content=[BadItem()])]

    prompt, _ = prepare_combined_prompt(messages, "req1")

    # Should swallow exception and ignore the item
    assert "BadItemStr" not in prompt
    # Since content is effectively empty and it's the only message, it should be skipped entirely
    assert prompt == ""


def test_prepare_combined_prompt_object_file_url(mock_file_utils, mock_logger):
    """Test object-style content with file_url/media_url."""
    _, _, mock_exists = mock_file_utils
    mock_exists.return_value = True

    class UrlObj:
        def __init__(self, url):
            self.url = url

    class ItemWithFileUrl:
        type = "file_url"
        file_url = UrlObj("file:///c:/file.txt")

    class ItemWithMediaUrl:
        type = "media_url"
        media_url = UrlObj("file:///c:/media.mp4")

    messages = [
        Message.model_construct(
            role="user", content=[ItemWithFileUrl(), ItemWithMediaUrl()]
        )
    ]

    _, files = prepare_combined_prompt(messages, "req1")

    # Check if files were added
    assert any("file.txt" in f for f in files)
    assert any("media.mp4" in f for f in files)


def test_prepare_combined_prompt_non_existent_local_file(mock_file_utils, mock_logger):
    """Test non-existent local file URL."""
    _, _, mock_exists = mock_file_utils
    mock_exists.return_value = False  # Not exists

    item = {"type": "file_url", "file_url": {"url": "file:///c:/missing.txt"}}
    messages = [Message(role="user", content=cast(List[MessageContentItem], [item]))]

    _, files = prepare_combined_prompt(messages, "req1")

    # Should not be in files list
    assert len(files) == 0


def test_collect_and_validate_attachments_detailed(
    mock_file_utils, mock_logger, tmp_path
):
    """Test detailed attachment collection logic including top-level and various message keys.

    Uses tmp_path for platform-compatible paths.
    """
    from pathlib import Path

    mock_extract, _, mock_exists = mock_file_utils

    # Create actual temp files for cross-platform path handling
    valid_initial = tmp_path / "valid_initial.png"
    valid_top_level = tmp_path / "valid_top_level.png"
    valid_file_url = tmp_path / "valid_file_url.txt"
    valid_image = tmp_path / "valid_image.png"
    valid_file = tmp_path / "valid_file.pdf"
    valid_media = tmp_path / "valid_media.mp4"
    missing_initial = tmp_path / "missing_initial.png"
    missing = tmp_path / "missing.png"

    # Create "valid" files, leave "missing" files uncreated
    valid_initial.touch()
    valid_top_level.touch()
    valid_file_url.touch()
    valid_image.touch()
    valid_file.touch()
    valid_media.touch()

    # Mock file existence based on actual file existence
    def side_effect(path):
        # Handle string paths and Path objects
        path_str = str(path)
        # Check if it looks like one of our temp files that we created/didn't create
        if str(tmp_path) in path_str:
            # Probe with stat() rather than Path.exists(): os.path.exists is
            # the very function being patched here, and Path.exists() may
            # delegate to it on some Python versions, which would re-enter
            # this side effect.
            try:
                Path(path_str).stat()
            except OSError:
                return False
            return True
        return True  # Default to True for other files logic might check

    # A callable side_effect takes precedence over the fixture's return_value,
    # so return_value does not need to be reset.
    mock_exists.side_effect = side_effect

    mock_extract.side_effect = lambda url, **kwargs: f"/tmp/{url.split('/')[-1]}"

    # Mock request object with various attachment fields
    class MockRequest:
        attachments = [
            str(valid_top_level),
            str(missing),
            {"url": "data:image/png;base64,data1"},
            {"url": valid_file_url.as_uri()},
            "",  # Empty string
            None,  # None
            {"url": ""},  # Empty URL in dict
        ]
        messages = [
            Message.model_construct(
                role="user",
                content="msg1",
                images=[str(valid_image)],
                files=[{"path": str(valid_file)}],
                media=[valid_media.as_uri()],
            )
        ]

    req = MockRequest()
    initial_list = [str(valid_initial), str(missing_initial)]

    result = collect_and_validate_attachments(req, "req1", initial_list)

    # Check initial list filtering
    assert str(valid_initial) in result
    assert str(missing_initial) not in result

    # Check top-level attachments
    assert str(valid_top_level) in result
    assert str(missing) not in result
    # data: URL -> extracted (mock extract returns /tmp/...)
    assert mock_extract.called

    # Check message-level attachments
    assert str(valid_image) in result
    assert str(valid_file) in result
    # file:// -> unquoted path
    assert any("valid_media.mp4" in f for f in result)


def test_prepare_combined_prompt_tool_edge_cases(mock_logger):
    """Test edge cases for tool definitions and choice."""
    # Malformed tool definition (missing function)
    tools = [
        {"type": "function"},  # Missing function dict
        {"name": "direct_name_tool"},  # Old style direct dict
    ]
    # Tool choice that doesn't match any tool
    tool_choice = "non_existent_tool"
    messages = [Message(role="user", content="Hi")]

    prompt, _ = prepare_combined_prompt(
        messages, "req1", tools=tools, tool_choice=tool_choice
    )

    # Should handle malformed tool gracefully (skip or partial log)
    # "direct_name_tool" might be processed if logic allows (it does: t.get('name'))
    assert "Function: direct_name_tool" in prompt
    # tool_choice "non_existent_tool" might still be suggested if logic just appends it
    assert "Recommended function to use: non_existent_tool" in prompt


def test_prepare_combined_prompt_content_pydantic_objects(mock_file_utils, mock_logger):
    """Test content items as objects (simulating Pydantic models) instead of dicts."""
    _, mock_save, _ = mock_file_utils
    mock_save.return_value = "/tmp/saved.mp3"

    class InputAudio:
        data = "SGVsbG8="  # Valid base64 "Hello"
        mime_type = "audio/mp3"
        format = "mp3"

    class ContentItem:
        type = "input_audio"
        input_audio = InputAudio()

    messages = [Message.model_construct(role="user", content=[ContentItem()])]

    _, files = prepare_combined_prompt(messages, "req1")

    assert "/tmp/saved.mp3" in files


def test_prepare_combined_prompt_empty_url_strings(mock_logger):
    """Test content items with empty URL strings."""
    content = [
        {"type": "image_url", "image_url": {"url": ""}},
        {"type": "image_url", "image_url": {"url": " "}},
        {"type": "text", "text": "Valid Text"},
    ]
    messages = [Message.model_construct(role="user", content=content)]

    prompt, files = prepare_combined_prompt(messages, "req1")

    assert "Valid Text" in prompt
    assert len(files) == 0


def test_prepare_combined_prompt_fallback_str_content(mock_logger):
    """Test fallback when content is not string/list/dict (e.g. None or unexpected type)."""
    # None content handled in loop start usually, but let's try weird type
    messages = [Message.model_construct(role="user", content=12345)]

    prompt, _ = prepare_combined_prompt(messages, "req1")

    # Should convert to string
    assert "12345" in prompt


def test_prepare_combined_prompt_input_image_with_detail(mock_file_utils, mock_logger):
    """Test input_image object with detail field (lines 234-244)."""
    mock_extract, _, _ = mock_file_utils
    mock_extract.return_value = "/tmp/input_image.png"

    class InputImage:
        url = "data:image/png;base64,iVBORw0KGgo="
        detail = "high"

    class ContentItem:
        type = "image_url"

        def __init__(self):
            self.input_image = InputImage()

    messages = [Message.model_construct(role="user", content=[ContentItem()])]

    prompt, files = prepare_combined_prompt(messages, "req1")

    assert "/tmp/input_image.png" in files
    assert "[Image Details: detail=high]" in prompt


def test_prepare_combined_prompt_dict_image_url_with_detail(
    mock_file_utils, mock_logger
):
    """Test dict image_url with detail field (lines 266-268)."""
    mock_extract, _, _ = mock_file_utils
    mock_extract.return_value = "/tmp/image_detail.png"

    # Dict structure with nested image_url dict containing detail
    content = [
        {
            "type": "image_url",
            "image_url": {"url": "data:image/png;base64,ABC123", "detail": "auto"},
        }
    ]
    messages = [Message.model_construct(role="user", content=content)]

    prompt, files = prepare_combined_prompt(messages, "req1")

    assert "/tmp/image_detail.png" in files
    assert "[Image Details: detail=auto]" in prompt


def test_prepare_combined_prompt_audio_absolute_path(
    mock_file_utils, mock_logger, tmp_path
):
    """Test audio with absolute path (lines 427-431).

    Uses tmp_path for platform-compatible paths.
    """
    _, _, mock_exists = mock_file_utils
    mock_exists.return_value = True

    # Create actual temp file for cross-platform path handling
    audio_file = tmp_path / "test.mp3"
    audio_file.touch()

    # Absolute path for audio
    content_item = {
        "type": "input_audio",
        "input_audio": {"url": str(audio_file)},
    }
    messages = [
        Message(role="user", content=cast(List[MessageContentItem], [content_item]))
    ]

    _, files = prepare_combined_prompt(messages, "req1")

    assert str(audio_file) in files


def test_prepare_combined_prompt_video_absolute_path(
    mock_file_utils, mock_logger, tmp_path
):
    """Test video with absolute path (lines 427-431)."""
    _, _, mock_exists = mock_file_utils
    mock_exists.return_value = True

    video_file = str(tmp_path / "video.mp4")

    # Absolute path for video
    content_item = {
        "type": "input_video",
        "input_video": {"url": video_file},
    }
    messages = [
        Message(role="user", content=cast(List[MessageContentItem], [content_item]))
    ]

    _, files = prepare_combined_prompt(messages, "req1")

    assert video_file in files


def test_get_latest_user_text_dict_non_text_type(mock_logger):
    """Test _get_latest_user_text with dict items that are not text type (lines 666-671)."""
    messages = [
        Message.model_construct(
            role="user",
            content=[
                {"type": "image", "url": "image.png"},  # Not text type
                {"type": "audio", "data": "..."},  # Not text type
                {"type": "text", "text": "Valid text"},
            ],
        )
    ]

    result = _get_latest_user_text(messages)

    # Should only extract text items
    assert result == "Valid text"


def test_get_latest_user_text_dict_content_empty(mock_logger):
    """Test _get_latest_user_text with dict content that has no text (lines 680-681)."""
    # Dict content with no text field
    messages = [
        Message.model_construct(role="user", content={"attachments": ["file.pdf"]})
    ]

    result = _get_latest_user_text(messages)

    assert result == ""


def test_get_latest_user_text_no_user_message(mock_logger):
    """Test _get_latest_user_text with no user messages (lines 680-681)."""
    messages = [
        Message(role="assistant", content="Hello"),
        Message(role="system", content="System"),
    ]

    result = _get_latest_user_text(messages)

    assert result == ""


@pytest.mark.asyncio
async def test_maybe_execute_tools_auto_single_tool_name_fallback(
    mock_tools_registry, mock_logger
):
    """Test maybe_execute_tools with auto choice, single tool, name at top level (lines 726-728)."""
    _, mock_exec = mock_tools_registry
    mock_exec.return_value = "success"

    # Tool with name at top level, no function dict
    tools = [{"name": "top_level_func"}]
    tool_choice = "auto"
    messages = [Message(role="user", content='call it {"arg": 1}')]

    results = await maybe_execute_tools(messages, tools, tool_choice)

    assert results is not None
    assert results[0]["name"] == "top_level_func"


@pytest.mark.asyncio
async def test_maybe_execute_tools_required_single_tool(
    mock_tools_registry, mock_logger
):
    """Test maybe_execute_tools with 'required' choice (lines 717-728)."""
    _, mock_exec = mock_tools_registry
    mock_exec.return_value = "result"

    tools = [{"type": "function", "function": {"name": "required_func"}}]
    tool_choice = "required"
    messages = [Message(role="user", content='{"x": 1}')]

    results = await maybe_execute_tools(messages, tools, tool_choice)

    assert results is not None
    assert results[0]["name"] == "required_func"


def test_prepare_combined_prompt_tool_params_json_dumps_error(mock_logger):
    """Test tool params that raise exception during json.dumps (lines 96-97)."""

    class UnserializableParams:
        """Object that cannot be JSON serialized."""

        def __iter__(self):
            raise TypeError("Cannot iterate")

    # Create tool with unserializable parameters
    tools = [
        {
            "type": "function",
            "function": {"name": "bad_tool", "parameters": UnserializableParams()},
        }
    ]
    messages = [Message(role="user", content="hi")]

    # Should not crash, just skip params serialization
    prompt, _ = prepare_combined_prompt(messages, "req1", tools=tools)

    # Tool name should still be present
    assert "- Function: bad_tool" in prompt
    # But params should
be skipped due to exception def test_collect_and_validate_attachments_empty_url_handling( mock_file_utils, mock_logger, tmp_path ): """Test attachment collection with empty URL strings (lines 802, 831, 834). Uses tmp_path for platform-compatible paths. """ _, _, mock_exists = mock_file_utils mock_exists.return_value = True # Create actual temp files for cross-platform path handling valid_png = tmp_path / "valid.png" valid_image = tmp_path / "valid_image.png" valid_file = tmp_path / "valid_file.pdf" valid_png.touch() valid_image.touch() valid_file.touch() class MockRequest: attachments = [ "", # Empty string " ", # Whitespace only {"url": ""}, # Empty URL in dict {"url": " "}, # Whitespace URL in dict str(valid_png), # Valid path ] messages = [ Message.model_construct( role="user", content="test", images=["", str(valid_image)], files=[{"path": ""}, {"path": str(valid_file)}], ) ] req = MockRequest() result = collect_and_validate_attachments(req, "req1", []) # Only valid paths should be included assert str(valid_png) in result assert str(valid_image) in result assert str(valid_file) in result # Empty strings should be filtered out assert "" not in result assert " " not in result def test_prepare_combined_prompt_dict_input_image_with_detail( mock_file_utils, mock_logger ): """Test dict input_image structure with detail field (lines 271-282).""" mock_extract, _, _ = mock_file_utils mock_extract.return_value = "/tmp/input_img_detail.png" content = [ { "type": "image_url", "input_image": { "url": "data:image/png;base64,XYZ", "detail": "low", }, } ] messages = [Message.model_construct(role="user", content=content)] prompt, files = prepare_combined_prompt(messages, "req1") assert "/tmp/input_img_detail.png" in files assert "[Image Details: detail=low]" in prompt def test_prepare_combined_prompt_dict_content_file_field( mock_file_utils, mock_logger, tmp_path ): """Test dict content with generic 'file' field (lines 313-322). Uses tmp_path for platform-compatible paths. 
""" _, _, mock_exists = mock_file_utils mock_exists.return_value = True # Create actual temp file for cross-platform path handling file_path = tmp_path / "from_file_field.png" file_path.touch() content = [ { "type": "image_url", "file": {"url": str(file_path)}, } ] messages = [Message.model_construct(role="user", content=content)] prompt, files = prepare_combined_prompt(messages, "req1") assert str(file_path) in files def test_prepare_combined_prompt_dict_content_file_path( mock_file_utils, mock_logger, tmp_path ): """Test dict content with file.path field (lines 318-322).""" _, _, mock_exists = mock_file_utils mock_exists.return_value = True pdf_file = str(tmp_path / "file.pdf") content = [ { "type": "file_url", "file": {"path": pdf_file}, } ] messages = [Message.model_construct(role="user", content=content)] prompt, files = prepare_combined_prompt(messages, "req1") assert pdf_file in files def test_prepare_combined_prompt_object_url_attribute( mock_file_utils, mock_logger, tmp_path ): """Test content item with direct url attribute (lines 249-250). Uses tmp_path for platform-compatible paths. 
""" _, _, mock_exists = mock_file_utils mock_exists.return_value = True # Create actual temp file for cross-platform path handling url_path = tmp_path / "direct_url.png" url_path.touch() class UrlItem: type = "image_url" url = str(url_path) messages = [Message.model_construct(role="user", content=[UrlItem()])] prompt, files = prepare_combined_prompt(messages, "req1") assert str(url_path) in files def test_prepare_combined_prompt_dict_attachments_nested_input_image( mock_file_utils, mock_logger ): """Test dict content attachments with nested input_image (lines 496-502).""" mock_extract, _, _ = mock_file_utils mock_extract.return_value = "/tmp/nested_input.png" content = { "text": "Check attachment", "attachments": [ {"input_image": {"url": "data:image/png;base64,NESTED"}}, ], } messages = [Message.model_construct(role="user", content=content)] prompt, files = prepare_combined_prompt(messages, "req1") assert "Check attachment" in prompt assert "/tmp/nested_input.png" in files def test_prepare_combined_prompt_content_item_input_image_string( mock_file_utils, mock_logger, tmp_path ): """Test content item with input_image as string (lines 283-284). Uses tmp_path for platform-compatible paths. 
""" _, _, mock_exists = mock_file_utils mock_exists.return_value = True # Create actual temp file for cross-platform path handling img_path = tmp_path / "string_input_image.png" img_path.touch() # This tests the case where item has input_image as a string directly content = [ { "type": "image_url", "input_image": str(img_path), # String, not dict } ] messages = [Message.model_construct(role="user", content=content)] prompt, files = prepare_combined_prompt(messages, "req1") assert str(img_path) in files def test_prepare_combined_prompt_audio_video_data_base64(mock_file_utils, mock_logger): """Test audio/video with raw base64 data not starting with 'data:' (lines 446-459).""" _, mock_save, _ = mock_file_utils mock_save.return_value = "/tmp/base64_audio.mp3" content_item = { "type": "input_audio", "input_audio": { "data": "SGVsbG8gV29ybGQ=", # Not starting with "data:" "mime_type": "audio/mp3", "format": "mp3", }, } messages = [ Message(role="user", content=cast(List[MessageContentItem], [content_item])) ] prompt, files = prepare_combined_prompt(messages, "req1") assert "/tmp/base64_audio.mp3" in files mock_save.assert_called_once() def test_prepare_combined_prompt_tool_result_no_tool_call_id(mock_logger): """Test tool result without tool_call_id (lines 579-586).""" messages = [Message.model_construct(role="tool", content="Result without ID")] prompt, _ = prepare_combined_prompt(messages, "req1") # Should still include content, but no tool_call_id line assert "Result without ID" in prompt assert "tool_call_id=" not in prompt def test_prepare_combined_prompt_assistant_empty_with_tool_calls(mock_logger): """Test assistant message with no content but has tool_calls (line 614).""" tool_call = ToolCall( id="call1", type="function", function=FunctionCall(name="func", arguments="{}"), ) messages = [ Message(role="assistant", content="", tool_calls=[tool_call]) # Empty content ] prompt, _ = prepare_combined_prompt(messages, "req1") assert "Request function call: func" in 
prompt prompt, _ = prepare_combined_prompt(messages, "req1") # Should include tool call visualization even with empty content assert "Request function call: func" in prompt def test_prepare_combined_prompt_skip_empty_messages_edge_case(mock_logger): """Test edge case for empty message skipping (lines 616-621).""" # Edge case: First message with only role prefix, no content messages = [Message.model_construct(role="assistant", content="")] prompt, _ = prepare_combined_prompt(messages, "req1") # Should be empty since only role prefix and no content assert prompt == ""