"""Command-line scaffold for DuckDuckGo Search (DDGS) text search.

Runs a text query through ``ddgs``, optionally converts every result page to
Markdown with ``markitdown`` (when that package is installed), and prints the
results as JSON, JSON Lines, or plain text.

Defaults for most options can be supplied through ``DDGS_*`` environment
variables, which may themselves come from a ``.env`` file.
"""

import argparse
import inspect
import json
import os
import sys
from pathlib import Path
from urllib.parse import urlsplit

import requests
from ddgs import DDGS
from dotenv import load_dotenv

# Closed sets of values accepted by the corresponding CLI options. They are
# shared between argparse ``choices`` and the validation of env defaults.
_SAFESEARCH_CHOICES = ("on", "moderate", "off")
_TIMELIMIT_CHOICES = ("d", "w", "m", "y")
_FORMAT_CHOICES = ("json", "jsonl", "text")

# Spellings of DDGS_VERIFY that switch SSL verification off.
_FALSE_STRINGS = frozenset({"false", "0", "no", "off"})

# ddgs documents "tb" as shorthand for the local Tor Browser SOCKS proxy;
# requests has no such alias, so it is expanded before use.
# NOTE(review): confirm the address against the installed ddgs version.
_TOR_BROWSER_PROXY = "socks5h://127.0.0.1:9150"


def _load_env() -> None:
    """Load a ``.env`` file from the working directory or next to this file.

    The working-directory file wins; the file beside this script is only
    consulted when the former does not exist. An empty ``DDGS_PROXY`` is
    removed from the environment afterwards so that it reads as "unset".
    """
    cwd_env = Path.cwd() / ".env"
    if cwd_env.exists():
        load_dotenv(dotenv_path=cwd_env)
    else:
        repo_env = Path(__file__).resolve().parent / ".env"
        if repo_env.exists():
            load_dotenv(dotenv_path=repo_env)
    # presumably protects downstream readers of DDGS_PROXY from an empty
    # proxy URL -- verify against the ddgs client
    if os.getenv("DDGS_PROXY", "") == "":
        os.environ.pop("DDGS_PROXY", None)


def _env_int(name: str, fallback: int | None) -> int | None:
    """Return env var *name* as an int, or *fallback* if unset or invalid."""
    value = os.getenv(name)
    if value is None or value == "":
        return fallback
    try:
        return int(value)
    except ValueError:
        return fallback


def _env_str(name: str, fallback: str | None) -> str | None:
    """Return env var *name*, or *fallback* if it is unset or empty."""
    value = os.getenv(name)
    if value is None or value == "":
        return fallback
    return value


def _env_choice(
    name: str,
    choices: tuple[str, ...],
    fallback: str | None,
) -> str | None:
    """Return env var *name* if it is one of *choices*, else *fallback*.

    argparse validates ``choices`` only for values given on the command line,
    never for defaults, so env-provided defaults must be checked here. The
    value is stripped and lower-cased before comparison; an unset, empty or
    unrecognised value yields *fallback*, mirroring ``_env_int``.
    """
    value = os.getenv(name)
    if value is None:
        return fallback
    normalized = value.strip().lower()
    return normalized if normalized in choices else fallback


def _env_flag(name: str, fallback: bool) -> bool:
    """Return env var *name* interpreted as a boolean switch.

    Unset or blank values yield *fallback*. ``false``, ``0``, ``no`` and
    ``off`` (case-insensitive, surrounding whitespace ignored) mean ``False``;
    any other value means ``True``.
    """
    value = os.getenv(name)
    if value is None or value.strip() == "":
        return fallback
    return value.strip().lower() not in _FALSE_STRINGS


def _build_parser() -> argparse.ArgumentParser:
    """Build the argument parser, seeding defaults from ``DDGS_*`` env vars."""
    parser = argparse.ArgumentParser(
        prog="ddgs-search",
        description="CLI scaffold for DuckDuckGo Search (DDGS) text search",
    )
    parser.add_argument(
        "query",
        nargs="+",
        help="Search query text",
    )
    parser.add_argument(
        "--region",
        default=_env_str("DDGS_REGION", "us-en"),
        help="Region code (default: DDGS_REGION or us-en)",
    )
    parser.add_argument(
        "--safesearch",
        choices=_SAFESEARCH_CHOICES,
        default=_env_choice("DDGS_SAFESEARCH", _SAFESEARCH_CHOICES, "moderate"),
        help="Safe search level (default: DDGS_SAFESEARCH or moderate)",
    )
    parser.add_argument(
        "--timelimit",
        choices=_TIMELIMIT_CHOICES,
        default=_env_choice("DDGS_TIMELIMIT", _TIMELIMIT_CHOICES, None),
        help="Time limit: d/w/m/y (default: DDGS_TIMELIMIT)",
    )
    parser.add_argument(
        "--max-results",
        type=int,
        default=_env_int("DDGS_MAX_RESULTS", 10),
        help="Maximum number of results (default: DDGS_MAX_RESULTS or 10)",
    )
    parser.add_argument(
        "--backend",
        default=_env_str("DDGS_BACKEND", "auto"),
        help="Backend to use (default: DDGS_BACKEND or auto)",
    )
    parser.add_argument(
        "--proxy",
        default=_env_str("DDGS_PROXY", None),
        help="Proxy URL (default: DDGS_PROXY)",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=_env_int("DDGS_TIMEOUT", 30),
        help="Request timeout seconds (default: DDGS_TIMEOUT or 30)",
    )
    # --verify / --no-verify share one destination; the env var only decides
    # the value used when neither flag is given.
    parser.add_argument(
        "--verify",
        dest="verify",
        action="store_true",
        default=_env_flag("DDGS_VERIFY", True),
        help="Enable SSL verification (default: DDGS_VERIFY or true)",
    )
    parser.add_argument(
        "--no-verify",
        dest="verify",
        action="store_false",
        help="Disable SSL verification (not recommended)",
    )
    parser.add_argument(
        "--format",
        choices=_FORMAT_CHOICES,
        default=_env_choice("DDGS_OUTPUT", _FORMAT_CHOICES, "json"),
        help="Output format: json, jsonl, text (default: DDGS_OUTPUT or json)",
    )
    return parser


def _print_text(results: list[dict]) -> None:
    """Write *results* to stdout as numbered ``key: value`` paragraphs."""
    for idx, item in enumerate(results, start=1):
        sys.stdout.write(f"[{idx}]\n")
        for key in sorted(item):
            sys.stdout.write(f"{key}: {item[key]}\n")
        sys.stdout.write("\n")


def _print_json(results: list[dict], jsonl: bool) -> None:
    """Write *results* to stdout as JSON.

    Args:
        results: Result dictionaries to serialise.
        jsonl: When true, emit one compact JSON object per line; otherwise
            emit a single indented JSON array.
    """
    if jsonl:
        for item in results:
            sys.stdout.write(json.dumps(item, ensure_ascii=True) + "\n")
        return
    sys.stdout.write(json.dumps(results, ensure_ascii=True, indent=2) + "\n")


def _resolve_verify(verify_flag: bool) -> bool | str:
    """Translate the verify flag into a value for the HTTP clients.

    Returns ``False`` when verification is disabled. Otherwise returns the CA
    bundle path from ``SSL_CERT_FILE`` or ``REQUESTS_CA_BUNDLE`` (in that
    order) when one is set, and ``True`` when neither is.
    """
    if not verify_flag:
        return False
    ca_bundle = os.getenv("SSL_CERT_FILE") or os.getenv("REQUESTS_CA_BUNDLE")
    return ca_bundle or True


def _is_pdf_url(url: str) -> bool:
    """Return whether the path component of *url* ends in ``.pdf``."""
    try:
        path = urlsplit(url).path.lower()
    except ValueError:
        # Unparseable URL: fall back to inspecting the raw string.
        path = url.lower()
    return path.endswith(".pdf")


class _TimeoutSession(requests.Session):
    """``requests.Session`` that applies a default timeout to its requests."""

    def __init__(self, timeout: float | None = None) -> None:
        super().__init__()
        self._default_timeout = timeout

    def request(self, method, url, *args, **kwargs):
        """Send a request, filling in the default timeout if none is given."""
        if kwargs.get("timeout") is None:
            kwargs["timeout"] = self._default_timeout
        return super().request(method, url, *args, **kwargs)


def _get_markdown_converter(
    proxy: str | None = None,
    verify: bool | str = True,
    timeout: float | None = None,
):
    """Create a MarkItDown converter backed by a browser-like HTTP session.

    Args:
        proxy: Proxy URL applied to both HTTP and HTTPS page fetches. The
            ``"tb"`` alias is expanded to the local Tor Browser proxy.
        verify: ``False`` to skip SSL verification, ``True`` for the default
            trust store, or a path to a CA bundle.
        timeout: Default timeout in seconds for page fetches; ``None`` means
            no timeout.

    Returns:
        A ``MarkItDown`` instance, or ``None`` when ``markitdown`` is not
        installed.
    """
    try:
        from markitdown import MarkItDown
    except ImportError:
        return None

    session = _TimeoutSession(timeout)
    session.headers.update(
        {
            "User-Agent": (
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/122.0.0.0 Safari/537.36"
            ),
            "Accept": (
                "text/html,application/xhtml+xml,application/xml;q=0.9,"
                "application/json;q=0.8,*/*;q=0.7"
            ),
            "Accept-Language": "en-US,en;q=0.9",
            "DNT": "1",
            "Upgrade-Insecure-Requests": "1",
        }
    )
    # Page fetches must honour the same network options as the search itself;
    # otherwise --proxy would be bypassed and --no-verify ignored here.
    session.verify = verify
    if proxy:
        proxy_url = _TOR_BROWSER_PROXY if proxy == "tb" else proxy
        session.proxies.update({"http": proxy_url, "https": proxy_url})
    return MarkItDown(requests_session=session)


def _convert_url_to_markdown(converter, url: str) -> str:
    """Convert the page at *url* to Markdown text using *converter*.

    The converter is probed for ``convert_uri``, then ``convert_url``, and
    finally plain ``convert``; the result's ``markdown`` attribute is
    preferred over ``text_content``.

    Raises:
        ValueError: If the conversion result exposes neither attribute.
    """
    if hasattr(converter, "convert_uri"):
        result = converter.convert_uri(url)
    elif hasattr(converter, "convert_url"):
        result = converter.convert_url(url)
    else:
        result = converter.convert(url)

    text = getattr(result, "markdown", None)
    if text is None:
        text = getattr(result, "text_content", None)
    if text is None:
        raise ValueError("MarkItDown result missing markdown content")
    return text


def _attach_markdown(
    results: list[dict],
    *,
    proxy: str | None = None,
    verify: bool | str = True,
    timeout: float | None = None,
) -> None:
    """Add a ``markdown`` entry to every result, in place.

    Results without a URL and PDF links get ``markdown = None``. When the
    converter is unavailable or a conversion fails, ``markdown`` is ``None``
    and ``markdown_error`` carries the reason.

    Args:
        results: Search results; each is expected to hold its URL under
            ``href`` or ``url``.
        proxy: Proxy URL for the page fetches.
        verify: SSL verification setting for the page fetches.
        timeout: Default timeout in seconds for the page fetches.
    """
    converter = _get_markdown_converter(proxy=proxy, verify=verify, timeout=timeout)
    for item in results:
        url = item.get("href") or item.get("url")
        if not url or _is_pdf_url(url):
            item["markdown"] = None
            continue
        if converter is None:
            item["markdown"] = None
            item["markdown_error"] = "markitdown_not_installed"
            continue
        try:
            item["markdown"] = _convert_url_to_markdown(converter, url)
        except Exception as exc:  # noqa: BLE001 - surface conversion errors in response
            item["markdown"] = None
            item["markdown_error"] = str(exc)


def ddgs_search(
    query: str,
    *,
    region: str,
    safesearch: str,
    timelimit: str | None,
    max_results: int,
    backend: str | None,
    proxy: str | None,
    timeout: int,
    verify: bool,
) -> list[dict]:
    """Run a DDGS text search and attach Markdown to every result.

    Args:
        query: Search query text.
        region: Region code passed to the search.
        safesearch: Safe search level passed to the search.
        timelimit: Optional time limit; omitted from the call when falsy.
        max_results: Maximum number of results to request.
        backend: Optional backend name; omitted from the call when falsy.
        proxy: Optional proxy URL used for the search and the page fetches.
        timeout: Timeout in seconds for the search and the page fetches.
        verify: Whether SSL certificates are verified.

    Returns:
        The result dictionaries, each extended with a ``markdown`` entry.

    Raises:
        RuntimeError: If a proxy is requested but the installed ``DDGS``
            class accepts neither a ``proxies`` nor a ``proxy`` argument.
    """
    search_kwargs: dict[str, object] = {
        "region": region,
        "safesearch": safesearch,
        "max_results": max_results,
    }
    if timelimit:
        search_kwargs["timelimit"] = timelimit
    if backend:
        search_kwargs["backend"] = backend

    resolved_verify = _resolve_verify(verify)
    init_kwargs: dict[str, object] = {
        "timeout": timeout,
        "verify": resolved_verify,
    }
    if proxy:
        # The keyword differs between client versions, so probe the signature.
        accepted = inspect.signature(DDGS).parameters
        if "proxies" in accepted:
            init_kwargs["proxies"] = proxy
        elif "proxy" in accepted:
            init_kwargs["proxy"] = proxy
        else:
            # Searching without the requested proxy would silently expose the
            # caller's address, so refuse instead.
            raise RuntimeError(
                "Installed DDGS client does not accept a proxy argument"
            )

    with DDGS(**init_kwargs) as ddgs:
        results = list(ddgs.text(query, **search_kwargs))

    _attach_markdown(
        results,
        proxy=proxy,
        verify=resolved_verify,
        timeout=timeout,
    )
    return results


def main(argv: list[str] | None = None) -> int:
    """Run the CLI and return the process exit code.

    Args:
        argv: Argument list to parse; ``None`` lets argparse use
            ``sys.argv``.

    Returns:
        ``0`` on success, ``1`` when the search raised an exception (the
        message is written to stderr).
    """
    _load_env()
    parser = _build_parser()
    args = parser.parse_args(argv)
    query = " ".join(args.query)

    try:
        results = ddgs_search(
            query,
            region=args.region,
            safesearch=args.safesearch,
            timelimit=args.timelimit,
            max_results=args.max_results,
            backend=args.backend,
            proxy=args.proxy,
            timeout=args.timeout,
            verify=args.verify,
        )
    except Exception as exc:  # noqa: BLE001 - CLI should show all errors
        sys.stderr.write(f"Error: {exc}\n")
        return 1

    if args.format == "text":
        _print_text(results)
    elif args.format == "jsonl":
        _print_json(results, jsonl=True)
    else:
        _print_json(results, jsonl=False)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())