import json
import re

import requests
import aiohttp

from ..core.types.chat import ChatMessage
from ..utils.tool_prompt import TOOL_PROMPT_SYSTEM
from ..core.types.endpoint_api import EndpointAPI
from ..core.exceptions import QwenAPIError, RateLimitError

# Regex capturing the JSON payload inside each <tool_call>...</tool_call> block
# emitted by the model (Qwen function-calling format).
_TOOL_CALL_PATTERN = r"<tool_call>\s*({.*?})\s*</tool_call>"


def _build_tool_messages(messages, tools):
    """Return *messages* with the tool definitions injected into the system prompt.

    Tools are serialized as one JSON object per line (no array brackets) and
    substituted into TOOL_PROMPT_SYSTEM. If the first message is already a
    system message, its content is preserved and the tool prompt is appended;
    otherwise a fresh system message is prepended.
    """
    tools_str = "\n".join(json.dumps(tool, ensure_ascii=False) for tool in tools)
    system_content = TOOL_PROMPT_SYSTEM.replace("{list_tools}", tools_str)

    if messages and messages[0].role == "system":
        # Append tools info to the existing system message.
        system_content = messages[0].content + "\n\n" + system_content
        return [ChatMessage(role="system", content=system_content)] + messages[1:]
    # Create a new system message and keep all original messages.
    return [ChatMessage(role="system", content=system_content)] + messages


def _delta_from_sse_line(line):
    """Extract the delta content from one SSE line, or '' if it carries none.

    Lines that are not `data: ` events, the `[DONE]` sentinel, and malformed
    JSON chunks all yield the empty string (malformed chunks are skipped
    silently, matching best-effort streaming accumulation).
    """
    if not line.startswith("data: ") or line.endswith("[DONE]"):
        return ""
    data_part = line[6:]  # Remove 'data: ' prefix
    if not data_part or data_part == "[DONE]":
        return ""
    try:
        chunk_data = json.loads(data_part)
    except json.JSONDecodeError:
        return ""
    delta = (
        chunk_data.get("choices", [{}])[0]
        .get("delta", {})
        .get("content", "")
    )
    return delta or ""


def _extract_tool_calls(content, client):
    """Parse <tool_call> blocks out of *content* into ToolCall objects.

    Returns None when the content carries no tool-call markers, otherwise a
    (possibly empty) list. Individual blocks with invalid JSON are logged and
    skipped rather than failing the whole response.
    """
    if "<tool_call>" not in content or "</tool_call>" not in content:
        return None

    from ..core.types.response.function_tool import ToolCall, Function

    tool_calls = []
    for match in re.findall(_TOOL_CALL_PATTERN, content, re.DOTALL):
        try:
            tool_data = json.loads(match)
        except json.JSONDecodeError:
            client.logger.warning(f"Failed to parse tool call: {match}")
            continue
        function = Function(
            name=tool_data.get("name", ""),
            arguments=tool_data.get("arguments", {}),
        )
        tool_calls.append(ToolCall(function=function))
    return tool_calls


def _build_chat_response(content, client):
    """Wrap assistant *content* (and any embedded tool calls) in a ChatResponse."""
    from ..core.types.chat import ChatResponse, Choice, Message

    tool_calls = _extract_tool_calls(content, client)
    if tool_calls:
        # Clear content when tool calls are present (mirrors OpenAI behavior).
        content = ""
    message = Message(role="assistant", content=content, tool_calls=tool_calls)
    choice = Choice(message=message, extra=None)
    return ChatResponse(choices=choice)


def _error_response():
    """Build the fallback ChatResponse returned when response parsing fails."""
    from ..core.types.chat import ChatResponse, Choice, Message

    message = Message(
        role="assistant", content="Error parsing tool response", tool_calls=None
    )
    choice = Choice(message=message, extra=None)
    return ChatResponse(choices=choice)


def using_tools(messages, tools, model, temperature, max_tokens, stream, client):
    """Sync version of tool handling - simplified without selection logic.

    Injects the tool definitions into the system prompt, POSTs the completion
    request with ``requests``, and parses the (streaming or plain JSON)
    response into a ChatResponse, extracting any <tool_call> blocks.

    Raises:
        RateLimitError: on HTTP 429.
        QwenAPIError: on any other non-2xx response.
    """
    msg_tool = _build_tool_messages(messages, tools)

    payload_tools = client._build_payload(
        messages=msg_tool, model=model, temperature=temperature, max_tokens=max_tokens
    )

    response_tool = requests.post(
        url=client.base_url + EndpointAPI.completions,
        headers=client._build_headers(),
        json=payload_tools,
        timeout=client.timeout,
        stream=stream,
    )

    # Check 429 BEFORE the generic error check: a 429 response is not "ok",
    # so testing it afterwards made RateLimitError unreachable.
    if response_tool.status_code == 429:
        client.logger.error("Too many requests")
        raise RateLimitError("Too many requests")

    if not response_tool.ok:
        error_text = response_tool.text
        client.logger.error(f"API Error: {response_tool.status_code} {error_text}")
        raise QwenAPIError(f"API Error: {response_tool.status_code} {error_text}")

    client.logger.info(f"Response status: {response_tool.status_code}")
    client.logger.info(
        f"Response content-type: {response_tool.headers.get('content-type', 'unknown')}"
    )

    # Parse tool response directly; any parsing failure degrades to a
    # best-effort error response rather than raising to the caller.
    try:
        content_type = response_tool.headers.get("content-type", "")
        if "text/event-stream" in content_type:
            # Accumulate delta content from the SSE stream.
            content = ""
            for line in response_tool.iter_lines(decode_unicode=True):
                content += _delta_from_sse_line(line)
        else:
            # Regular JSON response.
            response_data = response_tool.json()
            content = (
                response_data.get("choices", [{}])[0]
                .get("message", {})
                .get("content", "")
            )
        return _build_chat_response(content, client)
    except Exception as e:
        client.logger.error(f"Error parsing tool response: {e}")
        return _error_response()


async def async_using_tools(messages, tools, model, temperature, max_tokens, client):
    """Main function for handling tools - simplified version without selection logic.

    Async counterpart of :func:`using_tools` using ``aiohttp``. The session is
    managed with ``async with`` so it is always closed, including on error.

    Raises:
        RateLimitError: on HTTP 429.
        QwenAPIError: on any other non-2xx response.
    """
    msg_tool = _build_tool_messages(messages, tools)

    payload_tools = client._build_payload(
        messages=msg_tool, model=model, temperature=temperature, max_tokens=max_tokens
    )

    async with aiohttp.ClientSession() as session:
        response_tool = await session.post(
            url=client.base_url + EndpointAPI.completions,
            headers=client._build_headers(),
            json=payload_tools,
            timeout=aiohttp.ClientTimeout(total=client.timeout),
        )

        # Check 429 BEFORE the generic error check: a 429 response is not
        # "ok", so testing it afterwards made RateLimitError unreachable.
        if response_tool.status == 429:
            client.logger.error("Too many requests")
            raise RateLimitError("Too many requests")

        if not response_tool.ok:
            error_text = await response_tool.text()
            client.logger.error(f"API Error: {response_tool.status} {error_text}")
            raise QwenAPIError(f"API Error: {response_tool.status} {error_text}")

        client.logger.info(f"Response status: {response_tool.status}")
        client.logger.info(
            f"Response content-type: {response_tool.headers.get('content-type', 'unknown')}"
        )

        # Parse tool response directly; any parsing failure degrades to a
        # best-effort error response rather than raising to the caller.
        try:
            content_type = response_tool.headers.get("content-type", "")
            if "text/event-stream" in content_type:
                # Accumulate delta content from the SSE stream.
                content = ""
                async for raw_line in response_tool.content:
                    content += _delta_from_sse_line(
                        raw_line.decode("utf-8").strip()
                    )
            else:
                # Regular JSON response.
                response_data = await response_tool.json()
                content = (
                    response_data.get("choices", [{}])[0]
                    .get("message", {})
                    .get("content", "")
                )
            return _build_chat_response(content, client)
        except Exception as e:
            client.logger.error(f"Error parsing tool response: {e}")
            return _error_response()