| import os |
| import random |
| import json |
| import threading |
| import queue |
| import asyncio |
| from typing import Any |
| from urllib.parse import urljoin |
|
|
| import httpx |
|
|
| import time |
| import uuid |
|
|
| class BaseToolkit(): |
| """A tool for assisting test tasks.""" |
|
|
| NAME = "tool_base" |
| DESCRIPTION = f"The tool backbone of all real tools." |
| TIMEOUT = 60 |
| TOOLS_SERVER_BASE_ENDPOINT = [] |
| ENTRY_POINT = "" |
| TOOL_PARAMS = {} |
| TOOL_PARAMS_REQUIRED = [] |
| USE_CACHE = False |
|
|
| def __init__( |
| self, |
| name: str = "", |
| description: str = "", |
| params: dict = {}, |
| required_params: list[str] = [], |
| server_url: str | list[str] = [], |
| entry_point: str = "", |
| timeout: float | None = None, |
| request_id: str = "", |
| use_cache: bool | None = None, |
| is_tongyi_format: bool | None = None, |
| **kwargs, |
| ): |
| |
| |
| cls = type(self) |
| self.name = name or getattr(cls, "NAME", "") |
| self.description = description or getattr(cls, "DESCRIPTION", "") |
| self.params = params or getattr(cls, "TOOL_PARAMS", {}) |
| self.required_params = required_params or getattr(cls, "TOOL_PARAMS_REQUIRED", []) |
| self.server_url = server_url or getattr(cls, "TOOLS_SERVER_BASE_ENDPOINT", "") |
| self.entry_point = entry_point or getattr(cls, "ENTRY_POINT", "") or getattr(cls, "NAME", "") |
| |
| if timeout is not None: |
| self.timeout = timeout |
| else: |
| self.timeout = getattr(cls, "TIMEOUT", 600) |
|
|
| if use_cache is not None: |
| self.use_cache = use_cache |
| else: |
| self.use_cache = getattr(cls, "USE_CACHE", False) |
|
|
| if is_tongyi_format is not None: |
| self.is_tongyi_format = is_tongyi_format |
| else: |
| self.is_tongyi_format = getattr(cls, "USE_TONGYI_FORMAT", None) |
|
|
| self.set_request_id(request_id=request_id) |
|
|
| self._init_client() |
|
|
| def _init_client(self): |
| """ |
| Initialize the HTTP client for making requests. |
| """ |
| |
| self.client = httpx.Client() |
|
|
| @property |
| def json(self): |
| return { |
| "type": "function", |
| "function": { |
| "name": self.name, |
| "description": self.description, |
| "parameters": { |
| "type": "object", |
| "additionalProperties": False, |
| "properties": self.params, |
| "required": self.required_params, |
| }, |
| }, |
| } |
|
|
| def _post(self, pload: dict[str, Any]) -> Any: |
| """ |
| Post request to the tool server and return the response. |
| """ |
|
|
| |
| server_url = random.choice(self.server_url) if isinstance(self.server_url, list) else self.server_url |
| tool_endpoint = urljoin(server_url, self.entry_point) |
|
|
| |
| try: |
| resp = self.client.post(tool_endpoint, json=pload, timeout=self.timeout) |
| if not resp.is_success: |
| return f"{resp.status_code} {resp.text}" |
| data = resp.json() |
| return data.get("result", "") |
| |
| except Exception as e: |
| raise e |
|
|
| |
| |
| def forward(self, **kwargs): |
| """ |
| Execute this tool. |
| |
| Args: |
| keyword arguments: Arguments to be submitted to this tool. |
| |
| Returns: |
| ToolOutput: An object containing either the tool's results or an error message. |
| """ |
| |
| |
| try: |
| payload = {} |
|
|
| |
| |
| timestamp = time.strftime("%Y%m%d%", time.localtime()) |
| if self.request_id: |
| payload["request_id"] = f"{self.request_id}_{self.name}_{timestamp}" |
| else: |
| |
| payload["request_id"] = self.name |
|
|
| payload["use_cache"] = self.use_cache |
| |
| if self.is_tongyi_format is not None: |
| kwargs['is_tongyi_format'] = self.is_tongyi_format |
| payload['params'] = kwargs |
|
|
| conversation_id = kwargs.pop('conversation_id', None) |
| if conversation_id is not None: |
| payload['conversation_id'] = conversation_id |
|
|
| |
| raw = self._post(payload) |
| return raw |
|
|
| except Exception as e: |
| raise e |
| |
| |
|
|
| def set_request_id(self, request_id: str): |
| """ |
| Set the request ID for this tool. |
| """ |
| self.request_id = request_id |
|
|
| def set_use_cache(self, use_cache: bool): |
| """ |
| Set whether to use cache for this tool. |
| """ |
| self.use_cache = use_cache |
|
|
| def set_timeout(self, timeout: float): |
| """ |
| Set the timeout for this tool. |
| """ |
| self.timeout = timeout |
|
|
| |
| def __del__(self): |
| |
| |
| |
| |
| |
| pass |