| | import argparse |
| | import inspect |
| | import json |
| | import os |
| | import sys |
| | from pathlib import Path |
| | from urllib.parse import urlsplit |
| |
|
| | from dotenv import load_dotenv |
| | from ddgs import DDGS |
| | import requests |
| |
|
| |
|
def _load_env() -> None:
    """Load a .env file, preferring the CWD over this script's directory.

    Afterwards, an empty DDGS_PROXY variable is removed entirely so a blank
    value cannot shadow the --proxy CLI flag.
    """
    candidates = (Path.cwd() / ".env", Path(__file__).resolve().parent / ".env")
    for env_file in candidates:
        if env_file.exists():
            load_dotenv(dotenv_path=env_file)
            break

    # A blank proxy setting is as good as unset; drop it from the environment.
    if os.getenv("DDGS_PROXY", "") == "":
        os.environ.pop("DDGS_PROXY", None)
| |
|
| |
|
| | def _env_int(name: str, fallback: int | None) -> int | None: |
| | value = os.getenv(name) |
| | if value is None or value == "": |
| | return fallback |
| | try: |
| | return int(value) |
| | except ValueError: |
| | return fallback |
| |
|
| |
|
| | def _env_str(name: str, fallback: str | None) -> str | None: |
| | value = os.getenv(name) |
| | if value is None or value == "": |
| | return fallback |
| | return value |
| |
|
| |
|
def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser.

    Every option's default is sourced from a DDGS_* environment variable
    (via _env_str/_env_int) before falling back to a hard-coded value, so
    the CLI can be configured entirely through the environment.
    """
    parser = argparse.ArgumentParser(
        prog="ddgs-search",
        description="CLI scaffold for DuckDuckGo Search (DDGS) text search",
    )
    parser.add_argument("query", nargs="+", help="Search query text")
    parser.add_argument(
        "--region",
        default=_env_str("DDGS_REGION", "us-en"),
        help="Region code (default: DDGS_REGION or us-en)",
    )
    parser.add_argument(
        "--safesearch",
        choices=["on", "moderate", "off"],
        default=_env_str("DDGS_SAFESEARCH", "moderate"),
        help="Safe search level (default: DDGS_SAFESEARCH or moderate)",
    )
    parser.add_argument(
        "--timelimit",
        choices=["d", "w", "m", "y"],
        default=_env_str("DDGS_TIMELIMIT", None),
        help="Time limit: d/w/m/y (default: DDGS_TIMELIMIT)",
    )
    parser.add_argument(
        "--max-results",
        type=int,
        default=_env_int("DDGS_MAX_RESULTS", 10),
        help="Maximum number of results (default: DDGS_MAX_RESULTS or 10)",
    )
    parser.add_argument(
        "--backend",
        default=_env_str("DDGS_BACKEND", "auto"),
        help="Backend to use (default: DDGS_BACKEND or auto)",
    )
    parser.add_argument(
        "--proxy",
        default=_env_str("DDGS_PROXY", None),
        help="Proxy URL (default: DDGS_PROXY)",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=_env_int("DDGS_TIMEOUT", 30),
        help="Request timeout seconds (default: DDGS_TIMEOUT or 30)",
    )
    # --verify/--no-verify share one dest; only a literal "false" in the
    # environment turns verification off by default.
    default_verify = _env_str("DDGS_VERIFY", "true").lower() != "false"
    parser.add_argument(
        "--verify",
        dest="verify",
        action="store_true",
        default=default_verify,
        help="Enable SSL verification (default: DDGS_VERIFY or true)",
    )
    parser.add_argument(
        "--no-verify",
        dest="verify",
        action="store_false",
        help="Disable SSL verification (not recommended)",
    )
    parser.add_argument(
        "--format",
        choices=["json", "jsonl", "text"],
        default=_env_str("DDGS_OUTPUT", "json"),
        help="Output format: json, jsonl, text (default: DDGS_OUTPUT or json)",
    )
    return parser
| |
|
| |
|
| | def _print_text(results: list[dict]) -> None: |
| | for idx, item in enumerate(results, start=1): |
| | sys.stdout.write(f"[{idx}]\n") |
| | for key in sorted(item.keys()): |
| | value = item.get(key, "") |
| | sys.stdout.write(f"{key}: {value}\n") |
| | sys.stdout.write("\n") |
| |
|
| |
|
| | def _print_json(results: list[dict], jsonl: bool) -> None: |
| | if jsonl: |
| | for item in results: |
| | sys.stdout.write(json.dumps(item, ensure_ascii=True) + "\n") |
| | return |
| | sys.stdout.write(json.dumps(results, ensure_ascii=True, indent=2) + "\n") |
| |
|
| |
|
| | def _resolve_verify(verify_flag: bool) -> bool | str: |
| | if not verify_flag: |
| | return False |
| | ca_bundle = os.getenv("SSL_CERT_FILE") or os.getenv("REQUESTS_CA_BUNDLE") |
| | return ca_bundle or True |
| |
|
| |
|
| | def _is_pdf_url(url: str) -> bool: |
| | try: |
| | path = urlsplit(url).path.lower() |
| | except ValueError: |
| | path = url.lower() |
| | return path.endswith(".pdf") |
| |
|
| |
|
def _get_markdown_converter():
    """Build a MarkItDown converter backed by a browser-like requests session.

    Returns None when the optional markitdown package is not installed so
    callers can degrade gracefully.
    """
    try:
        from markitdown import MarkItDown
    except ImportError:
        return None

    # Some sites reject requests' default UA; present ordinary browser headers.
    browser_headers = {
        "User-Agent": (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/122.0.0.0 Safari/537.36"
        ),
        "Accept": (
            "text/html,application/xhtml+xml,application/xml;q=0.9,"
            "application/json;q=0.8,*/*;q=0.7"
        ),
        "Accept-Language": "en-US,en;q=0.9",
        "DNT": "1",
        "Upgrade-Insecure-Requests": "1",
    }
    session = requests.Session()
    session.headers.update(browser_headers)
    return MarkItDown(requests_session=session)
| |
|
| |
|
| | def _convert_url_to_markdown(converter, url: str) -> str: |
| | if hasattr(converter, "convert_uri"): |
| | result = converter.convert_uri(url) |
| | elif hasattr(converter, "convert_url"): |
| | result = converter.convert_url(url) |
| | else: |
| | result = converter.convert(url) |
| |
|
| | text = getattr(result, "markdown", None) |
| | if text is None: |
| | text = getattr(result, "text_content", None) |
| | if text is None: |
| | raise ValueError("MarkItDown result missing markdown content") |
| | return text |
| |
|
| |
|
def _attach_markdown(results: list[dict]) -> None:
    """Mutate each result dict in place, adding a 'markdown' field.

    PDF links and entries without a URL get markdown=None. When markitdown
    is unavailable or conversion fails, markdown is None and
    'markdown_error' records why. Failures never abort the batch.
    """
    converter = _get_markdown_converter()
    for entry in results:
        link = entry.get("href") or entry.get("url")
        if not link or _is_pdf_url(link):
            entry["markdown"] = None
            continue
        if converter is None:
            entry["markdown"] = None
            entry["markdown_error"] = "markitdown_not_installed"
            continue
        try:
            entry["markdown"] = _convert_url_to_markdown(converter, link)
        except Exception as exc:
            # Best effort: keep the search result, note the conversion failure.
            entry["markdown"] = None
            entry["markdown_error"] = str(exc)
| |
|
| |
|
def ddgs_search(
    query: str,
    *,
    region: str,
    safesearch: str,
    timelimit: str | None,
    max_results: int,
    backend: str | None,
    proxy: str | None,
    timeout: int,
    verify: bool,
) -> list[dict]:
    """Run a DDGS text search and attach markdown content to each result.

    Optional search parameters (timelimit, backend) are only forwarded when
    truthy; proxy is passed under whichever keyword this DDGS release
    accepts. Returns the list of result dicts (mutated by _attach_markdown).
    """
    search_kwargs: dict[str, object] = {
        "region": region,
        "safesearch": safesearch,
        "max_results": max_results,
    }
    if timelimit:
        search_kwargs["timelimit"] = timelimit
    if backend:
        search_kwargs["backend"] = backend

    init_kwargs: dict[str, object] = {
        "timeout": timeout,
        "verify": _resolve_verify(verify),
    }
    if proxy:
        # DDGS releases disagree on the proxy keyword name; probe the signature.
        accepted = inspect.signature(DDGS).parameters
        if "proxies" in accepted:
            init_kwargs["proxies"] = proxy
        elif "proxy" in accepted:
            init_kwargs["proxy"] = proxy

    with DDGS(**init_kwargs) as client:
        hits = list(client.text(query, **search_kwargs))

    _attach_markdown(hits)
    return hits
| |
|
| |
|
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: load env, parse args, search, print results.

    Returns 0 on success; 1 with a one-line message on stderr when the
    search fails for any reason.
    """
    _load_env()
    args = _build_parser().parse_args(argv)
    search_terms = " ".join(args.query)

    try:
        results = ddgs_search(
            search_terms,
            region=args.region,
            safesearch=args.safesearch,
            timelimit=args.timelimit,
            max_results=args.max_results,
            backend=args.backend,
            proxy=args.proxy,
            timeout=args.timeout,
            verify=args.verify,
        )
    except Exception as exc:
        # Collapse every failure mode into a single diagnostic + exit code.
        sys.stderr.write(f"Error: {exc}\n")
        return 1

    if args.format == "jsonl":
        _print_json(results, jsonl=True)
    elif args.format == "text":
        _print_text(results)
    else:
        _print_json(results, jsonl=False)

    return 0
| |
|
| |
|
if __name__ == "__main__":
    # sys.exit raises SystemExit with main()'s return code, same as before.
    sys.exit(main())
| |
|