"""DuckDuckGo HTML web search tool. This mirrors Claw Code's Rust WebSearch behavior: fetch DuckDuckGo's HTML endpoint, extract result links, optionally filter domains, and return a JSON payload the model can cite. """ from __future__ import annotations import asyncio import html import json import os import time from dataclasses import dataclass from html.parser import HTMLParser from typing import Any from urllib.parse import parse_qsl, parse_qs, urlencode, urlparse, urlunparse import requests DEFAULT_SEARCH_URL = "https://html.duckduckgo.com/html/" WEB_SEARCH_BASE_URL_ENV = "CLAWD_WEB_SEARCH_BASE_URL" USER_AGENT = "clawd-rust-tools/0.1" REQUEST_TIMEOUT_SECONDS = 20 MAX_RESULTS = 8 @dataclass(frozen=True) class SearchHit: title: str url: str def as_json(self) -> dict[str, str]: return {"title": self.title, "url": self.url} class _AnchorParser(HTMLParser): def __init__(self, *, require_result_class: bool) -> None: super().__init__(convert_charrefs=True) self.require_result_class = require_result_class self.hits: list[tuple[str, str]] = [] self._active_href: str | None = None self._active_text: list[str] = [] def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: if tag.lower() != "a": return attr_map = {key.lower(): value or "" for key, value in attrs} href = attr_map.get("href") if not href: return if self.require_result_class and "result__a" not in attr_map.get("class", ""): return self._active_href = href self._active_text = [] def handle_data(self, data: str) -> None: if self._active_href is not None: self._active_text.append(data) def handle_entityref(self, name: str) -> None: if self._active_href is not None: self._active_text.append(f"&{name};") def handle_charref(self, name: str) -> None: if self._active_href is not None: self._active_text.append(f"&#{name};") def handle_endtag(self, tag: str) -> None: if tag.lower() != "a" or self._active_href is None: return title = collapse_whitespace(html.unescape("".join(self._active_text))).strip() self.hits.append((self._active_href, title)) self._active_href = None self._active_text = [] def build_search_url(query: str) -> str: base = os.environ.get(WEB_SEARCH_BASE_URL_ENV, DEFAULT_SEARCH_URL) parsed = urlparse(base) if parsed.scheme not in {"http", "https"} or not parsed.netloc: raise ValueError(f"invalid search base URL: {base}") query_pairs = parse_qsl(parsed.query, keep_blank_values=True) query_pairs.append(("q", query)) return urlunparse(parsed._replace(query=urlencode(query_pairs))) def collapse_whitespace(value: str) -> str: return " ".join(value.split()) def decode_duckduckgo_redirect(url: str) -> str | None: if url.startswith("http://") or url.startswith("https://"): return html.unescape(url) if url.startswith("//"): joined = f"https:{url}" elif url.startswith("/"): joined = f"https://duckduckgo.com{url}" else: return None parsed = urlparse(joined) if parsed.path in {"/l", "/l/"}: uddg = parse_qs(parsed.query).get("uddg", []) if uddg: return html.unescape(uddg[0]) return joined def _extract_links(search_html: str, *, require_result_class: bool) -> list[SearchHit]: parser = _AnchorParser(require_result_class=require_result_class) parser.feed(search_html) hits: list[SearchHit] = [] for raw_url, title in parser.hits: if not title: continue decoded_url = decode_duckduckgo_redirect(raw_url) if decoded_url and ( decoded_url.startswith("http://") or decoded_url.startswith("https://") ): hits.append(SearchHit(title=title, url=decoded_url)) return hits def extract_search_hits(search_html: str) -> list[SearchHit]: return _extract_links(search_html, require_result_class=True) def extract_search_hits_from_generic_links(search_html: str) -> list[SearchHit]: return _extract_links(search_html, require_result_class=False) def normalize_domain_filter(domain: str) -> str: trimmed = domain.strip() parsed = urlparse(trimmed) candidate = parsed.hostname if parsed.scheme and parsed.hostname else trimmed return candidate.strip().lstrip(".").rstrip("/").lower() def host_matches_list(url: str, domains: list[str]) -> bool: host = urlparse(url).hostname if not host: return False normalized_host = host.lower() for domain in domains: normalized = normalize_domain_filter(domain) if normalized and ( normalized_host == normalized or normalized_host.endswith(f".{normalized}") ): return True return False def dedupe_hits(hits: list[SearchHit]) -> list[SearchHit]: seen: set[str] = set() deduped: list[SearchHit] = [] for hit in hits: if hit.url in seen: continue seen.add(hit.url) deduped.append(hit) return deduped def execute_web_search( query: str, allowed_domains: list[str] | None = None, blocked_domains: list[str] | None = None, tool_use_id: str = "web_search_1", ) -> dict[str, Any]: started = time.monotonic() search_url = build_search_url(query) response = requests.get( search_url, headers={"User-Agent": USER_AGENT}, timeout=REQUEST_TIMEOUT_SECONDS, allow_redirects=True, ) hits = extract_search_hits(response.text) if not hits and urlparse(response.url or search_url).hostname: hits = extract_search_hits_from_generic_links(response.text) if allowed_domains is not None: hits = [hit for hit in hits if host_matches_list(hit.url, allowed_domains)] if blocked_domains is not None: hits = [hit for hit in hits if not host_matches_list(hit.url, blocked_domains)] hits = dedupe_hits(hits)[:MAX_RESULTS] rendered_hits = "\n".join(f"- [{hit.title}]({hit.url})" for hit in hits) if hits: summary = ( f"Search results for {query!r}. Include a Sources section in the final answer.\n" f"{rendered_hits}" ) else: summary = f"No web search results matched the query {query!r}." return { "query": query, "results": [ summary, { "tool_use_id": tool_use_id, "content": [hit.as_json() for hit in hits], }, ], "durationSeconds": time.monotonic() - started, } WEB_SEARCH_TOOL_SPEC = { "name": "web_search", "description": "Search the web for current information and return cited results.", "parameters": { "type": "object", "properties": { "query": {"type": "string", "minLength": 2}, "allowed_domains": { "type": "array", "items": {"type": "string"}, "description": "Optional allowlist of domains or URLs. Subdomains match.", }, "blocked_domains": { "type": "array", "items": {"type": "string"}, "description": "Optional blocklist of domains or URLs. Subdomains match.", }, }, "required": ["query"], "additionalProperties": False, }, } def _optional_string_list(arguments: dict[str, Any], key: str) -> list[str] | None: value = arguments.get(key) if value is None: return None if not isinstance(value, list) or not all(isinstance(item, str) for item in value): raise ValueError(f"{key} must be an array of strings") return value async def web_search_handler( arguments: dict[str, Any], session: Any = None, tool_call_id: str | None = None, **_kw: Any, ) -> tuple[str, bool]: query_value = arguments.get("query", "") if not isinstance(query_value, str): return ( "Error: web_search requires a query string with at least 2 characters.", False, ) query = query_value.strip() if len(query) < 2: return "Error: web_search requires a query with at least 2 characters.", False try: output = await asyncio.to_thread( execute_web_search, query=query, allowed_domains=_optional_string_list(arguments, "allowed_domains"), blocked_domains=_optional_string_list(arguments, "blocked_domains"), tool_use_id=tool_call_id or "web_search_1", ) except Exception as exc: return f"Error executing web search: {exc}", False return json.dumps(output, indent=2), True