Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """DuckDuckGo HTML web search tool. | |
| This mirrors Claw Code's Rust WebSearch behavior: fetch DuckDuckGo's HTML | |
| endpoint, extract result links, optionally filter domains, and return a | |
| JSON payload the model can cite. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import html | |
| import json | |
| import os | |
| import time | |
| from dataclasses import dataclass | |
| from html.parser import HTMLParser | |
| from typing import Any | |
| from urllib.parse import parse_qsl, parse_qs, urlencode, urlparse, urlunparse | |
| import requests | |
# Environment variable that, when set, replaces the default search endpoint.
WEB_SEARCH_BASE_URL_ENV = "CLAWD_WEB_SEARCH_BASE_URL"
# Endpoint queried when the environment variable above is not set.
DEFAULT_SEARCH_URL = "https://html.duckduckgo.com/html/"

# Value sent in the User-Agent header of every search request.
USER_AGENT = "clawd-rust-tools/0.1"
# Timeout, in seconds, passed to the HTTP request.
REQUEST_TIMEOUT_SECONDS = 20

# Upper bound on the number of hits returned after filtering and de-duplication.
MAX_RESULTS = 8
@dataclass(frozen=True)
class SearchHit:
    """A single search result: a display title and the URL it points to.

    The ``@dataclass`` decorator generates the keyword constructor used by
    ``SearchHit(title=..., url=...)``; bare class-level annotations alone do
    not create an ``__init__`` that accepts these fields. ``frozen=True``
    keeps instances immutable and hashable.
    """

    title: str  # human-readable link text
    url: str  # target URL of the result

    def as_json(self) -> dict[str, str]:
        """Return the hit as a JSON-serializable ``{"title", "url"}`` mapping."""
        return {"title": self.title, "url": self.url}
| class _AnchorParser(HTMLParser): | |
| def __init__(self, *, require_result_class: bool) -> None: | |
| super().__init__(convert_charrefs=True) | |
| self.require_result_class = require_result_class | |
| self.hits: list[tuple[str, str]] = [] | |
| self._active_href: str | None = None | |
| self._active_text: list[str] = [] | |
| def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: | |
| if tag.lower() != "a": | |
| return | |
| attr_map = {key.lower(): value or "" for key, value in attrs} | |
| href = attr_map.get("href") | |
| if not href: | |
| return | |
| if self.require_result_class and "result__a" not in attr_map.get("class", ""): | |
| return | |
| self._active_href = href | |
| self._active_text = [] | |
| def handle_data(self, data: str) -> None: | |
| if self._active_href is not None: | |
| self._active_text.append(data) | |
| def handle_entityref(self, name: str) -> None: | |
| if self._active_href is not None: | |
| self._active_text.append(f"&{name};") | |
| def handle_charref(self, name: str) -> None: | |
| if self._active_href is not None: | |
| self._active_text.append(f"&#{name};") | |
| def handle_endtag(self, tag: str) -> None: | |
| if tag.lower() != "a" or self._active_href is None: | |
| return | |
| title = collapse_whitespace(html.unescape("".join(self._active_text))).strip() | |
| self.hits.append((self._active_href, title)) | |
| self._active_href = None | |
| self._active_text = [] | |
def build_search_url(query: str) -> str:
    """Return the search endpoint URL with ``q=<query>`` appended.

    The endpoint comes from the environment variable named by
    ``WEB_SEARCH_BASE_URL_ENV`` and falls back to ``DEFAULT_SEARCH_URL``.
    Query parameters already present on the endpoint are kept, including
    blank ones.

    Raises:
        ValueError: if the endpoint is not an http(s) URL with a host.
    """
    configured = os.environ.get(WEB_SEARCH_BASE_URL_ENV, DEFAULT_SEARCH_URL)
    parts = urlparse(configured)
    has_web_scheme = parts.scheme in ("http", "https")
    if not (has_web_scheme and parts.netloc):
        raise ValueError(f"invalid search base URL: {configured}")
    params = [*parse_qsl(parts.query, keep_blank_values=True), ("q", query)]
    return urlunparse(parts._replace(query=urlencode(params)))
def collapse_whitespace(value: str) -> str:
    """Replace every run of whitespace with one space and trim both ends."""
    words = value.split()
    return " ".join(words)
def decode_duckduckgo_redirect(url: str) -> str | None:
    """Resolve an anchor ``href`` from a DuckDuckGo result page to its target.

    ``url`` is expected to be an attribute value as delivered by
    ``HTMLParser``, i.e. with character references already decoded. It is
    therefore not HTML-unescaped again here: a second pass would corrupt
    query strings such as ``?a=1&times=2`` (``&times`` would become ``×``).

    Returns:
        * ``url`` unchanged when it is already an absolute http(s) URL;
        * the percent-decoded ``uddg`` parameter when the link is a ``/l/``
          redirect that carries one;
        * otherwise the link made absolute (``https:`` for protocol-relative
          links, ``https://duckduckgo.com`` for root-relative ones);
        * ``None`` for anything else (relative paths, other schemes, empty).
    """
    if url.startswith(("http://", "https://")):
        return url
    if url.startswith("//"):
        joined = f"https:{url}"
    elif url.startswith("/"):
        joined = f"https://duckduckgo.com{url}"
    else:
        return None
    parsed = urlparse(joined)
    if parsed.path in {"/l", "/l/"}:
        # parse_qs percent-decodes the value; no further decoding is needed.
        targets = parse_qs(parsed.query).get("uddg")
        if targets:
            return targets[0]
    return joined
def _extract_links(search_html: str, *, require_result_class: bool) -> list[SearchHit]:
    """Parse ``search_html`` and return its anchors as ``SearchHit`` objects.

    Anchors with an empty title are skipped, as are anchors whose href does
    not resolve (via ``decode_duckduckgo_redirect``) to an http(s) URL.
    ``require_result_class`` is forwarded to the anchor parser.
    """
    anchors = _AnchorParser(require_result_class=require_result_class)
    anchors.feed(search_html)

    results: list[SearchHit] = []
    for href, label in anchors.hits:
        if not label:
            continue
        target = decode_duckduckgo_redirect(href)
        if not target:
            continue
        if target.startswith(("http://", "https://")):
            results.append(SearchHit(title=label, url=target))
    return results
def extract_search_hits(search_html: str) -> list[SearchHit]:
    """Return hits from anchors whose ``class`` attribute contains ``result__a``."""
    result_anchor_hits = _extract_links(search_html, require_result_class=True)
    return result_anchor_hits
def extract_search_hits_from_generic_links(search_html: str) -> list[SearchHit]:
    """Return hits from every anchor in the page, regardless of its class."""
    any_anchor_hits = _extract_links(search_html, require_result_class=False)
    return any_anchor_hits
def normalize_domain_filter(domain: str) -> str:
    """Reduce a domain-filter entry to a bare lowercase hostname.

    Accepts a plain domain (``example.com``, ``.example.com``), a full URL
    (``https://example.com/docs``), or a scheme-less host with a path or
    port (``example.com/docs``, ``example.com:8080``). Scheme-less entries
    are parsed as a network location so that path, port and user-info are
    dropped instead of ending up in a string that can never equal a hostname.

    Entries that cannot be parsed fall back to the trimmed input with leading
    dots and trailing slashes removed. The result may be empty.
    """
    trimmed = domain.strip()
    # Without "://" urlparse would not recognise a host, so present the entry
    # as a protocol-relative reference ("//host/path").
    target = trimmed if "://" in trimmed else f"//{trimmed.lstrip('/')}"
    try:
        host = urlparse(target).hostname
    except ValueError:
        # Malformed input (e.g. an unbalanced "[" in the host part).
        host = None
    candidate = host or trimmed
    return candidate.strip().lstrip(".").rstrip("/").lower()
def host_matches_list(url: str, domains: list[str]) -> bool:
    """Return whether ``url``'s host equals or is a subdomain of any entry.

    Each entry of ``domains`` is normalized with ``normalize_domain_filter``;
    entries that normalize to an empty string are ignored. A URL without a
    host, or one that ``urlparse`` rejects as malformed, matches nothing. It
    does not raise, so one bad scraped link cannot abort filtering of the
    whole result list.
    """
    try:
        host = urlparse(url).hostname
    except ValueError:
        # e.g. "http://[bad" -> "Invalid IPv6 URL"
        return False
    if not host:
        return False
    host = host.lower()
    normalized_domains = (normalize_domain_filter(domain) for domain in domains)
    return any(
        # "endswith('.' + name)" keeps "notexample.com" from matching "example.com".
        bool(name) and (host == name or host.endswith(f".{name}"))
        for name in normalized_domains
    )
def dedupe_hits(hits: list[SearchHit]) -> list[SearchHit]:
    """Drop hits whose ``url`` was already seen, keeping first occurrences in order."""
    first_by_url: dict[str, SearchHit] = {}
    for candidate in hits:
        # setdefault only stores the first hit for a URL; dicts preserve
        # insertion order, so the original ordering is retained.
        first_by_url.setdefault(candidate.url, candidate)
    return list(first_by_url.values())
def execute_web_search(
    query: str,
    allowed_domains: list[str] | None = None,
    blocked_domains: list[str] | None = None,
    tool_use_id: str = "web_search_1",
) -> dict[str, Any]:
    """Run a web search and return a JSON-serializable result payload.

    Fetches the search page for ``query``, extracts result links (falling
    back to every anchor on the page when no result-class anchors are found),
    applies the optional domain filters, de-duplicates by URL and keeps at
    most ``MAX_RESULTS`` hits.

    Args:
        query: Search terms.
        allowed_domains: If non-empty, keep only hits whose host matches one
            of these entries. ``None`` or an empty list means no restriction;
            an empty allowlist would otherwise discard every result.
        blocked_domains: If non-empty, drop hits whose host matches.
        tool_use_id: Identifier echoed back in the payload.

    Returns:
        ``{"query", "results": [summary, {"tool_use_id", "content"}],
        "durationSeconds"}`` where ``content`` is a list of
        ``{"title", "url"}`` mappings.

    Raises:
        ValueError: if the configured search base URL is invalid.
        requests.RequestException: on network failure or an HTTP error
            status. Error pages are reported rather than parsed as if they
            were an empty result page.
    """
    started = time.monotonic()
    search_url = build_search_url(query)
    response = requests.get(
        search_url,
        headers={"User-Agent": USER_AGENT},
        timeout=REQUEST_TIMEOUT_SECONDS,
        allow_redirects=True,
    )
    # Surface 4xx/5xx responses instead of reporting "no results".
    response.raise_for_status()

    page = response.text
    hits = extract_search_hits(page)
    if not hits and urlparse(response.url or search_url).hostname:
        # No result-class anchors found: fall back to every link on the page.
        hits = extract_search_hits_from_generic_links(page)

    if allowed_domains:
        hits = [hit for hit in hits if host_matches_list(hit.url, allowed_domains)]
    if blocked_domains:
        hits = [hit for hit in hits if not host_matches_list(hit.url, blocked_domains)]
    hits = dedupe_hits(hits)[:MAX_RESULTS]

    if hits:
        rendered_hits = "\n".join(f"- [{hit.title}]({hit.url})" for hit in hits)
        summary = (
            f"Search results for {query!r}. Include a Sources section in the final answer.\n"
            f"{rendered_hits}"
        )
    else:
        summary = f"No web search results matched the query {query!r}."

    return {
        "query": query,
        "results": [
            summary,
            {
                "tool_use_id": tool_use_id,
                "content": [hit.as_json() for hit in hits],
            },
        ],
        "durationSeconds": time.monotonic() - started,
    }
# Tool declaration for the web search tool: its name, description, and a
# JSON-Schema-style description of the accepted arguments.
WEB_SEARCH_TOOL_SPEC = {
    "name": "web_search",
    "description": "Search the web for current information and return cited results.",
    "parameters": {
        "type": "object",
        "properties": {
            # The only required argument.
            "query": {"type": "string", "minLength": 2},
            # Optional host filters; both take a list of strings.
            "allowed_domains": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Optional allowlist of domains or URLs. Subdomains match.",
            },
            "blocked_domains": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Optional blocklist of domains or URLs. Subdomains match.",
            },
        },
        "required": ["query"],
        "additionalProperties": False,
    },
}
| def _optional_string_list(arguments: dict[str, Any], key: str) -> list[str] | None: | |
| value = arguments.get(key) | |
| if value is None: | |
| return None | |
| if not isinstance(value, list) or not all(isinstance(item, str) for item in value): | |
| raise ValueError(f"{key} must be an array of strings") | |
| return value | |
async def web_search_handler(
    arguments: dict[str, Any],
    session: Any = None,
    tool_call_id: str | None = None,
    **_kw: Any,
) -> tuple[str, bool]:
    """Validate tool arguments, run the search in a worker thread, and report.

    Returns ``(text, ok)``. On success ``text`` is the search payload as
    indented JSON and ``ok`` is ``True``. On invalid arguments or any
    exception raised while searching, ``text`` is an error message and ``ok``
    is ``False``. ``session`` and extra keyword arguments are accepted but
    not used.
    """
    raw_query = arguments.get("query", "")
    if not isinstance(raw_query, str):
        message = "Error: web_search requires a query string with at least 2 characters."
        return message, False

    query = raw_query.strip()
    if len(query) < 2:
        return "Error: web_search requires a query with at least 2 characters.", False

    try:
        # Argument validation happens inside the try block so that a malformed
        # domain list is reported as an error string like any other failure.
        allowed = _optional_string_list(arguments, "allowed_domains")
        blocked = _optional_string_list(arguments, "blocked_domains")
        payload = await asyncio.to_thread(
            execute_web_search,
            query=query,
            allowed_domains=allowed,
            blocked_domains=blocked,
            tool_use_id=tool_call_id or "web_search_1",
        )
    except Exception as exc:
        return f"Error executing web search: {exc}", False
    else:
        return json.dumps(payload, indent=2), True