Spaces:
Running
Running
| import argparse | |
| import os | |
| import re | |
| import sys | |
| import time | |
| from typing import Optional, Union | |
| import requests | |
| from agent_base.tools.tooling import ToolBase | |
| from agent_base.utils import PROJECT_ROOT, env_flag, load_dotenv | |
# Fallback time budget for one WebFetch call, in seconds (five minutes).
# Used when WEBFETCH_TIMEOUT_SECONDS is absent from the environment.
DEFAULT_WEBFETCH_TIMEOUT_SECONDS = 5 * 60.0

# Fallback cap on the number of characters WebFetch returns (16 * 1024).
# Used when WEBFETCH_MAX_CHARS is absent from the environment.
DEFAULT_WEBFETCH_MAX_CHARS = 16 * 1024
def webfetch_timeout_seconds() -> float:
    """Return the time budget, in seconds, for a single WebFetch call.

    The value is read from the ``WEBFETCH_TIMEOUT_SECONDS`` environment
    variable; when the variable is unset, ``DEFAULT_WEBFETCH_TIMEOUT_SECONDS``
    is used instead.

    Returns:
        The configured timeout as a float strictly greater than zero.

    Raises:
        ValueError: If the configured value is not a number, is NaN, or is
            not greater than zero.
    """
    raw = os.getenv("WEBFETCH_TIMEOUT_SECONDS")
    if raw is None:
        raw = str(DEFAULT_WEBFETCH_TIMEOUT_SECONDS)
    try:
        timeout = float(raw)
    except ValueError:
        # Name the offending variable; float()'s own message does not.
        raise ValueError(
            f"WEBFETCH_TIMEOUT_SECONDS must be a number, got {raw!r}."
        ) from None
    # "not > 0" instead of "<= 0": NaN compares false against everything, so
    # the original "<= 0" test let NaN through and produced a deadline that
    # could never be reached.
    if not timeout > 0:
        raise ValueError("WEBFETCH_TIMEOUT_SECONDS must be > 0.")
    return timeout
def webfetch_default_max_chars() -> int:
    """Return the configured character limit for WebFetch output.

    Reads ``WEBFETCH_MAX_CHARS`` from the environment, defaulting to
    ``DEFAULT_WEBFETCH_MAX_CHARS`` when it is not set.

    Raises:
        ValueError: If the value cannot be parsed as an integer or is not
            greater than zero.
    """
    configured = os.getenv("WEBFETCH_MAX_CHARS", str(DEFAULT_WEBFETCH_MAX_CHARS))
    limit = int(configured)
    if limit > 0:
        return limit
    raise ValueError("WEBFETCH_MAX_CHARS must be > 0.")
def search_debug_enabled() -> bool:
    """Return what ``env_flag`` reports for the ``DEBUG_SEARCH`` flag.

    WebSearch consults this to decide whether to print request exceptions.
    """
    flag_name = "DEBUG_SEARCH"
    return env_flag(flag_name)
def scholar_debug_enabled() -> bool:
    """Return what ``env_flag`` reports for the ``DEBUG_SCHOLAR`` flag.

    ScholarSearch consults this to decide whether to print request exceptions.
    """
    flag_name = "DEBUG_SCHOLAR"
    return env_flag(flag_name)
def visit_debug_enabled() -> bool:
    """Return what ``env_flag`` reports for the ``DEBUG_VISIT`` flag.

    WebFetch consults this to decide whether to print response diagnostics.
    """
    flag_name = "DEBUG_VISIT"
    return env_flag(flag_name)
def _request_error_text(exc: requests.RequestException) -> str:
    """Describe a failed request, including the response body when present.

    Args:
        exc: The exception raised by ``requests``; its optional ``response``
            attribute is inspected for a body.

    Returns:
        ``str(exc)`` alone when there is no response or the stripped body is
        empty; otherwise ``str(exc)`` followed by the body, which is cut to
        1000 characters and marked as truncated when longer.
    """
    message = str(exc)
    response = getattr(exc, "response", None)
    # Compare against None explicitly rather than relying on truthiness.
    if response is None:
        return message

    body = response.text.strip()
    if not body:
        return message

    limit = 1000
    if len(body) > limit:
        body = body[:limit] + "...(truncated)"
    return f"{message}; response_body={body}"
| def _clean_webpage_text(text: str) -> str: | |
| text = str(text or "").replace("\r\n", "\n").replace("\r", "\n").replace("\x00", "") | |
| text = re.sub(r"[ \t]+\n", "\n", text) | |
| text = re.sub(r"\n{4,}", "\n\n\n", text) | |
| return text.strip() | |
class WebSearch(ToolBase):
    """Tool that runs one Google web search through the Serper API.

    The API key is read from the ``SERPER_KEY`` environment variable. All
    failures are reported as bracket-prefixed strings rather than exceptions
    so the calling agent always receives text.
    """

    name = "WebSearch"
    description = "Perform one Google web search and return the top results. Call WebSearch multiple times for multiple queries."
    parameters = {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "The search query.",
            },
        },
        "required": ["query"],
    }

    def __init__(self, cfg: Optional[dict] = None):
        super().__init__(cfg)

    def google_search_with_serp(self, query: str):
        """Send ``query`` to Serper's search endpoint and format the results.

        Args:
            query: The search text. Queries containing characters in the
                U+4E00..U+9FFF range are sent with China / zh-cn settings;
                all others use United States / en.

        Returns:
            A numbered Markdown-style listing of the organic results, or a
            message string when the key is missing, every request attempt
            fails, the response is not valid JSON, or nothing was found.
        """

        def contains_chinese_basic(text: str) -> bool:
            return any("\u4E00" <= char <= "\u9FFF" for char in text)

        serper_key = os.getenv("SERPER_KEY", "").strip()
        if not serper_key:
            return "[WebSearch] SERPER_KEY is not set."

        if contains_chinese_basic(query):
            payload = {
                "q": query,
                "location": "China",
                "gl": "cn",
                "hl": "zh-cn",
            }
        else:
            payload = {
                "q": query,
                "location": "United States",
                "gl": "us",
                "hl": "en",
            }
        headers = {
            "X-API-KEY": serper_key,
            "Content-Type": "application/json",
        }

        max_attempts = 5
        last_error = ""
        res = None
        for _ in range(max_attempts):
            try:
                res = requests.post(
                    "https://google.serper.dev/search",
                    json=payload,
                    headers=headers,
                    timeout=20,
                )
                res.raise_for_status()
                break
            except requests.RequestException as exc:
                last_error = _request_error_text(exc)
                if search_debug_enabled():
                    print(exc)
        else:
            # Reached only when no attempt hit ``break``, i.e. all failed.
            return f"[WebSearch] Request failed for '{query}': {last_error or 'unknown error'}"

        try:
            results = res.json()
        except ValueError as exc:
            return f"[WebSearch] Invalid JSON response for '{query}': {exc}"

        # A JSON array or scalar has no ``.get``; treat it as "no results".
        organic_results = results.get("organic") if isinstance(results, dict) else None
        if not isinstance(organic_results, list) or not organic_results:
            return f"No results found for '{query}'. Try with a more general query."

        web_snippets = []
        for idx, page in enumerate(organic_results, start=1):
            if not isinstance(page, dict):
                continue
            title = str(page.get("title", "Untitled result"))
            link = str(page.get("link", ""))
            date_published = f"\nDate published: {page['date']}" if "date" in page else ""
            source = f"\nSource: {page['source']}" if "source" in page else ""
            snippet = f"\n{page['snippet']}" if "snippet" in page else ""
            redacted_version = f"{idx}. [{title}]({link}){date_published}{source}\n{snippet}"
            redacted_version = redacted_version.replace("Your browser can't play this video.", "")
            web_snippets.append(redacted_version)

        if not web_snippets:
            return f"No results found for '{query}'. Try with a more general query."
        return (
            f"A Google search for '{query}' found {len(web_snippets)} results:\n\n## Web Results\n"
            + "\n\n".join(web_snippets)
        )

    def call(self, params: Union[str, dict], **kwargs) -> str:
        """Validate the tool arguments and run the search.

        Args:
            params: Raw tool arguments, decoded by the inherited
                ``parse_json_args`` helper (defined outside this file).
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            The formatted search results, or a ``[WebSearch]``-prefixed
            error string when the arguments are unusable.
        """
        try:
            params = self.parse_json_args(params)
        except ValueError as exc:
            return f"[WebSearch] {exc}"
        # Use .get so a missing "query" key (or a non-dict result) produces
        # the same error string as an empty query instead of an uncaught
        # KeyError/TypeError.
        query = params.get("query") if isinstance(params, dict) else None
        if not isinstance(query, str) or not query.strip():
            return "[WebSearch] 'query' must be a non-empty string."
        return self.google_search_with_serp(query.strip())
class ScholarSearch(ToolBase):
    """Tool that runs one Google Scholar search through the Serper API.

    The API key is read from the ``SERPER_KEY`` environment variable. All
    failures are reported as bracket-prefixed strings rather than exceptions
    so the calling agent always receives text.
    """

    name = "ScholarSearch"
    description = "Run one academic search through Google Scholar and return relevant publication results. Call ScholarSearch multiple times for multiple queries."
    parameters = {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "The search query for Google Scholar.",
            },
        },
        "required": ["query"],
    }

    def __init__(self, cfg: Optional[dict] = None):
        super().__init__(cfg)

    def google_scholar_with_serp(self, query: str):
        """Send ``query`` to Serper's scholar endpoint and format the results.

        Args:
            query: The search text, sent unchanged as the ``q`` field.

        Returns:
            A numbered Markdown-style listing of the organic results, or a
            message string when the key is missing, every request attempt
            fails, the response is not valid JSON, or nothing was found.
        """
        serper_key = os.getenv("SERPER_KEY", "").strip()
        if not serper_key:
            return "[ScholarSearch] SERPER_KEY is not set."

        payload = {"q": query}
        headers = {
            "X-API-KEY": serper_key,
            "Content-Type": "application/json",
        }

        max_attempts = 5
        last_error = ""
        res = None
        for _ in range(max_attempts):
            try:
                res = requests.post(
                    "https://google.serper.dev/scholar",
                    json=payload,
                    headers=headers,
                    timeout=20,
                )
                res.raise_for_status()
                break
            except requests.RequestException as exc:
                last_error = _request_error_text(exc)
                if scholar_debug_enabled():
                    print(exc)
        else:
            # Reached only when no attempt hit ``break``, i.e. all failed.
            return f"[ScholarSearch] Request failed for '{query}': {last_error or 'unknown error'}"

        try:
            results = res.json()
        except ValueError as exc:
            return f"[ScholarSearch] Invalid JSON response for '{query}': {exc}"

        # A JSON array or scalar has no ``.get``; treat it as "no results".
        organic_results = results.get("organic") if isinstance(results, dict) else None
        if not isinstance(organic_results, list) or not organic_results:
            return f"No results found for '{query}'. Try with a more general query."

        web_snippets = []
        for idx, page in enumerate(organic_results, start=1):
            if not isinstance(page, dict):
                continue
            title = str(page.get("title", "Untitled result"))
            date_published = f"\nDate published: {page['year']}" if "year" in page else ""
            publication_info = f"\npublicationInfo: {page['publicationInfo']}" if "publicationInfo" in page else ""
            snippet = f"\n{page['snippet']}" if "snippet" in page else ""
            # Only a PDF URL is surfaced as the link; other entries are
            # labelled as having no available link.
            link_info = "no available link"
            if "pdfUrl" in page:
                link_info = "pdfUrl: " + str(page["pdfUrl"])
            cited_by = f"\ncitedBy: {page['citedBy']}" if "citedBy" in page else ""
            redacted_version = f"{idx}. [{title}]({link_info}){publication_info}{date_published}{cited_by}\n{snippet}"
            redacted_version = redacted_version.replace("Your browser can't play this video.", "")
            web_snippets.append(redacted_version)

        if not web_snippets:
            return f"No results found for '{query}'. Try with a more general query."
        return (
            f"A Google scholar for '{query}' found {len(web_snippets)} results:\n\n## Scholar Results\n"
            + "\n\n".join(web_snippets)
        )

    def call(self, params: Union[str, dict], **kwargs) -> str:
        """Validate the tool arguments and run the scholar search.

        Args:
            params: Raw tool arguments, decoded by the inherited
                ``parse_json_args`` helper (defined outside this file).
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            The formatted results, or a ``[ScholarSearch]``-prefixed error
            string when the arguments are unusable.
        """
        try:
            params = self.parse_json_args(params)
        except ValueError as exc:
            return f"[ScholarSearch] {exc}"
        # Use .get so a missing "query" key (or a non-dict result) produces
        # the same error string as an empty query instead of an uncaught
        # KeyError/TypeError.
        query = params.get("query") if isinstance(params, dict) else None
        if not isinstance(query, str) or not query.strip():
            return "[ScholarSearch] 'query' must be a non-empty string."
        return self.google_scholar_with_serp(query.strip())
class WebFetch(ToolBase):
    """Tool that fetches a web page through the Jina reader (r.jina.ai).

    The page text is cleaned, optionally restricted to a 1-based line range,
    capped at ``max_chars`` characters, and returned under a small metadata
    header. The API key is read from the ``JINA_KEY`` environment variable.
    Failures are reported as ``[WebFetch]``-prefixed strings.
    """

    name = "WebFetch"
    description = "Fetch webpage content and return cleaned, range-bounded page text for the agent to inspect."
    parameters = {
        "type": "object",
        "properties": {
            "url": {
                "type": "string",
                "description": "The URL of the webpage to visit. Call WebFetch multiple times for multiple URLs.",
            },
            "start_line": {
                "type": "integer",
                "description": "Optional 1-based start line for partial reading. Default is 1.",
            },
            "end_line": {
                "type": "integer",
                "description": "Optional 1-based end line for partial reading. If omitted, read to the end.",
            },
            "max_chars": {
                "type": "integer",
                "description": "Maximum number of characters to return. Default is 16384 and the value must not exceed WEBFETCH_MAX_CHARS.",
            },
        },
        "required": ["url"],
    }

    def __init__(self, cfg: Optional[dict] = None):
        super().__init__(cfg)

    # The three helpers below take no ``self`` yet are invoked as
    # ``self.<helper>(...)``; without @staticmethod the instance would be
    # passed as an extra positional argument and every call would raise
    # TypeError.
    @staticmethod
    def _remaining_budget_seconds(runtime_deadline: Optional[float]) -> Optional[float]:
        """Return seconds left until ``runtime_deadline``, or None if unset.

        The result is negative once the deadline has passed.
        """
        if runtime_deadline is None:
            return None
        return runtime_deadline - time.time()

    @staticmethod
    def _webfetch_deadline(runtime_deadline: Optional[float]) -> float:
        """Return the absolute deadline (epoch seconds) for this fetch.

        This is the earlier of the caller-supplied ``runtime_deadline`` and
        "now + ``webfetch_timeout_seconds()``".

        Raises:
            ValueError: Propagated from ``webfetch_timeout_seconds`` or from
                converting ``runtime_deadline`` to float.
        """
        tool_deadline = time.time() + webfetch_timeout_seconds()
        if runtime_deadline is None:
            return tool_deadline
        return min(float(runtime_deadline), tool_deadline)

    @staticmethod
    def _is_failed_fetch(content: Optional[str]) -> bool:
        """Return True when ``content`` is empty or a known failure marker."""
        if not content:
            return True
        if content == "[WebFetch] Empty content.":
            return True
        return content.startswith(("[WebFetch] Failed to read page:", "[document_parser]"))

    @staticmethod
    def _format_page_content(
        *,
        url: str,
        content: str,
        start_line: int,
        end_line: Optional[int],
        max_chars: int,
    ) -> str:
        """Clean ``content``, apply the line range and size cap, add metadata.

        Args:
            url: The URL the content came from (echoed in the header).
            content: Raw page text.
            start_line: 1-based first line to include.
            end_line: 1-based last line to include, or None for "to the end".
            max_chars: Maximum number of content characters to return.

        Returns:
            A ``key: value`` metadata header followed by ``content:`` and the
            selected text.
        """
        clean_content = _clean_webpage_text(content)
        lines = clean_content.splitlines()
        total_lines = len(lines)
        selected_content = "\n".join(lines[start_line - 1:end_line])
        truncated = len(selected_content) > max_chars
        returned_content = selected_content[:max_chars] if truncated else selected_content
        # Clamp so the header never reports an end line past the page end.
        effective_end_line = total_lines if end_line is None else min(end_line, total_lines)
        meta = [
            f"url: {url}",
            "source_type: web",
            f"start_line: {start_line}",
            f"end_line: {effective_end_line}",
            f"total_lines: {total_lines}",
            f"total_chars: {len(clean_content)}",
            f"max_chars: {max_chars}",
            f"returned_chars: {len(returned_content)}",
            f"truncated: {str(truncated).lower()}",
        ]
        if truncated:
            meta.append("note: content was truncated by max_chars; use a narrower line range, or raise max_chars only up to WEBFETCH_MAX_CHARS if this call used a smaller value.")
        if start_line > total_lines:
            # Explain an otherwise silent empty result.
            meta.append("note: start_line is beyond total_lines; no content was returned.")
        return "\n".join(meta) + "\ncontent:\n" + returned_content

    def call(self, params: Union[str, dict], **kwargs) -> str:
        """Validate the tool arguments, fetch the page, and return its text.

        Args:
            params: Raw tool arguments, decoded by the inherited
                ``parse_json_args`` helper (defined outside this file).
            **kwargs: May carry ``runtime_deadline`` (absolute epoch seconds)
                to bound the total time spent fetching.

        Returns:
            The formatted page content, or a ``[WebFetch]``-prefixed error
            string.
        """
        try:
            params = self.parse_json_args(params)
        except ValueError as exc:
            return f"[WebFetch] {exc}"
        # Use .get so a missing "url" key (or non-dict params) yields an
        # error string instead of an uncaught KeyError/TypeError.
        url = params.get("url") if isinstance(params, dict) else None
        if not isinstance(url, str) or not url.strip():
            return "[WebFetch] 'url' must be a non-empty string."
        url = url.strip()

        # Resolve the environment-configured limit separately so that a bad
        # WEBFETCH_MAX_CHARS value is not blamed on the caller's arguments.
        try:
            max_chars_limit = webfetch_default_max_chars()
        except ValueError as exc:
            return f"[WebFetch] Invalid WEBFETCH_MAX_CHARS configuration: {exc}"

        try:
            start_line_raw = params.get("start_line")
            start_line = int(start_line_raw) if start_line_raw is not None else 1
            end_line_raw = params.get("end_line")
            end_line = int(end_line_raw) if end_line_raw is not None else None
            max_chars_raw = params.get("max_chars")
            max_chars = int(max_chars_raw) if max_chars_raw is not None else max_chars_limit
        except (TypeError, ValueError):
            return "[WebFetch] start_line, end_line, and max_chars must be integers when provided."

        if start_line < 1:
            return "[WebFetch] start_line must be >= 1."
        if end_line is not None and end_line < start_line:
            return "[WebFetch] end_line must be >= start_line."
        if max_chars <= 0:
            return "[WebFetch] max_chars must be > 0."
        if max_chars > max_chars_limit:
            return f"[WebFetch] max_chars must be <= WEBFETCH_MAX_CHARS ({max_chars_limit}). Use a narrower line range to read more of the page."

        try:
            runtime_deadline = self._webfetch_deadline(kwargs.get("runtime_deadline"))
        except (TypeError, ValueError) as exc:
            return f"[WebFetch] {exc}"

        response = self.readpage_jina(
            url,
            start_line=start_line,
            end_line=end_line,
            max_chars=max_chars,
            runtime_deadline=runtime_deadline,
        )
        if visit_debug_enabled():
            print(f"WebFetch Length {len(response)}")
        return response.strip()

    def jina_readpage(self, url: str, runtime_deadline: Optional[float] = None) -> str:
        """Fetch ``url`` through r.jina.ai, trying up to three times.

        Args:
            url: The page to read; appended to the reader base URL.
            runtime_deadline: Optional absolute deadline (epoch seconds).

        Returns:
            The response text on HTTP 200; otherwise a ``[WebFetch]`` error
            string (missing key, deadline reached, or the last failure).
        """
        max_retries = 3
        timeout = 50
        jina_api_key = os.getenv("JINA_KEY", "").strip()
        if not jina_api_key:
            return "[WebFetch] JINA_KEY is not set."

        headers = {
            "Authorization": f"Bearer {jina_api_key}",
        }
        last_error = "unknown page-fetch error"
        for attempt in range(max_retries):
            remaining = self._remaining_budget_seconds(runtime_deadline)
            if remaining is not None and remaining <= 0:
                return "[WebFetch] Failed to read page: agent runtime limit reached."
            # Never wait longer on one request than the budget that is left.
            request_timeout = timeout if remaining is None else min(timeout, max(remaining, 0.001))
            try:
                response = requests.get(
                    f"https://r.jina.ai/{url}",
                    headers=headers,
                    timeout=request_timeout,
                )
            except requests.RequestException as exc:
                last_error = str(exc)
            else:
                if response.status_code == 200:
                    return response.text
                if visit_debug_enabled():
                    print(response.text)
                last_error = f"HTTP {response.status_code}: {response.text[:200]}"

            remaining = self._remaining_budget_seconds(runtime_deadline)
            if remaining is not None and remaining <= 0:
                return "[WebFetch] Failed to read page: agent runtime limit reached."
            if attempt == max_retries - 1:
                # Last attempt: report immediately instead of sleeping first.
                break
            time.sleep(min(0.5, remaining) if remaining is not None else 0.5)
        return f"[WebFetch] Failed to read page: {last_error}"

    def html_readpage_jina(self, url: str, runtime_deadline: Optional[float] = None) -> str:
        """Call ``jina_readpage`` up to eight times until content is usable.

        Returns:
            The first usable page text, the missing-key message, the
            deadline message, or an "exhausted retries" failure string.
        """
        max_attempts = 8
        for _ in range(max_attempts):
            remaining = self._remaining_budget_seconds(runtime_deadline)
            if remaining is not None and remaining <= 0:
                return "[WebFetch] Failed to read page: agent runtime limit reached."
            content = self.jina_readpage(url, runtime_deadline=runtime_deadline)
            if content == "[WebFetch] JINA_KEY is not set.":
                # Configuration error: retrying cannot help.
                return content
            if not self._is_failed_fetch(content):
                return content
        return "[WebFetch] Failed to read page: exhausted retries"

    def readpage_jina(
        self,
        url: str,
        *,
        start_line: int = 1,
        end_line: Optional[int] = None,
        max_chars: int = DEFAULT_WEBFETCH_MAX_CHARS,
        runtime_deadline: Optional[float] = None,
    ) -> str:
        """Fetch ``url`` and return the formatted, range-bounded page text.

        Returns:
            The formatted page, the missing-key message, or a generic
            "could not be accessed" failure string.
        """
        content = self.html_readpage_jina(url, runtime_deadline=runtime_deadline)
        if content == "[WebFetch] JINA_KEY is not set.":
            # Surface the real cause; previously this error string was
            # formatted and returned as if it were the page's content.
            return content
        if self._is_failed_fetch(content):
            return "[WebFetch] Failed to read page: the provided webpage content could not be accessed. Please check the URL or file format."
        return self._format_page_content(
            url=url,
            content=content,
            start_line=start_line,
            end_line=end_line,
            max_chars=max_chars,
        )
def main(argv: Optional[list[str]] = None) -> int:
    """Command-line entry point for running one of the web tools.

    Sub-commands: ``search`` and ``scholar`` take one or more query words
    (joined with spaces); ``fetch`` takes a URL plus optional
    ``--start-line``, ``--end-line`` and ``--max-chars``. The project's
    ``.env`` file is loaded after argument parsing, the tool's output is
    printed, and 0 is returned.

    Args:
        argv: Argument list to parse; None lets argparse read ``sys.argv``.
    """
    cli = argparse.ArgumentParser(description="Run web tools directly.")
    commands = cli.add_subparsers(dest="tool", required=True)

    # The two query-based commands share the same single positional.
    for command, help_text in (
        ("search", "Run WebSearch."),
        ("scholar", "Run ScholarSearch."),
    ):
        commands.add_parser(command, help=help_text).add_argument("query", nargs="+")

    fetch_cmd = commands.add_parser("fetch", help="Run WebFetch.")
    fetch_cmd.add_argument("url")
    fetch_cmd.add_argument("--start-line", type=int, default=1)
    fetch_cmd.add_argument("--end-line", type=int)
    fetch_cmd.add_argument("--max-chars", type=int)

    options = cli.parse_args(argv)
    load_dotenv(PROJECT_ROOT / ".env")

    if options.tool in ("search", "scholar"):
        tool_cls = WebSearch if options.tool == "search" else ScholarSearch
        output = tool_cls().call({"query": " ".join(options.query)})
    else:
        fetcher = WebFetch()
        max_chars = options.max_chars
        if max_chars is None:
            max_chars = webfetch_default_max_chars()
        output = fetcher.call(
            {
                "url": options.url,
                "start_line": options.start_line,
                "end_line": options.end_line,
                "max_chars": max_chars,
            }
        )

    print(output)
    return 0
# Script entry point: run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))