Spaces:
Sleeping
Sleeping
| """Search dispatch helpers using DuckDuckGo and Tavily.""" | |
| from __future__ import annotations | |
| import logging | |
| from typing import Any, Optional, Tuple | |
| from config import Configuration | |
| from utils import ( | |
| deduplicate_and_format_sources, | |
| format_sources, | |
| get_config_value, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| MAX_TOKENS_PER_SOURCE = 2000 | |
def _search_duckduckgo(query: str, max_results: int = 5) -> dict[str, Any]:
    """Run a DuckDuckGo text search and return a normalized payload.

    The payload always has the keys ``results``, ``backend``, ``answer`` and
    ``notices``; on any failure (import error, API error) the result list is
    empty and the error text is appended to ``notices``.
    """
    try:
        from ddgs import DDGS

        with DDGS() as ddgs:
            raw_hits = list(ddgs.text(query, max_results=max_results))
        # Normalize field names; DDGS has used both href/link and body/snippet.
        normalized = [
            {
                "title": hit.get("title", ""),
                "url": hit.get("href", hit.get("link", "")),
                "content": hit.get("body", hit.get("snippet", "")),
            }
            for hit in raw_hits
        ]
        return {
            "results": normalized,
            "backend": "duckduckgo",
            "answer": None,
            "notices": [],
        }
    except Exception as e:
        logger.exception("DuckDuckGo search failed: %s", e)
        return {
            "results": [],
            "backend": "duckduckgo",
            "answer": None,
            "notices": [f"Search failed: {str(e)}"],
        }
def _search_tavily(query: str, max_results: int = 5) -> dict[str, Any]:
    """Run a Tavily API search and return a normalized payload.

    Requires the ``TAVILY_API_KEY`` environment variable; a missing key or
    any API failure yields an empty result list with an explanatory notice
    rather than raising.
    """
    try:
        import os

        from tavily import TavilyClient

        api_key = os.getenv("TAVILY_API_KEY")
        if not api_key:
            return {
                "results": [],
                "backend": "tavily",
                "answer": None,
                "notices": ["Missing TAVILY_API_KEY environment variable"],
            }
        response = TavilyClient(api_key=api_key).search(query, max_results=max_results)
        hits = [
            {
                "title": item.get("title", ""),
                "url": item.get("url", ""),
                "content": item.get("content", ""),
                # raw_content may legitimately be None when not requested.
                "raw_content": item.get("raw_content"),
            }
            for item in response.get("results", [])
        ]
        return {
            "results": hits,
            "backend": "tavily",
            "answer": response.get("answer"),
            "notices": [],
        }
    except Exception as e:
        logger.exception("Tavily search failed: %s", e)
        return {
            "results": [],
            "backend": "tavily",
            "answer": None,
            "notices": [f"Search failed: {str(e)}"],
        }
def _search_perplexity(query: str, max_results: int = 5) -> dict[str, Any]:
    """Query Perplexity's chat endpoint and wrap the answer as one pseudo-result.

    ``max_results`` is accepted for signature parity with the other backends
    but is not used: Perplexity returns a single synthesized answer rather
    than a list of documents. Requires ``PERPLEXITY_API_KEY``.
    """
    try:
        import os

        from openai import OpenAI

        api_key = os.getenv("PERPLEXITY_API_KEY")
        if not api_key:
            return {
                "results": [],
                "backend": "perplexity",
                "answer": None,
                "notices": ["Missing PERPLEXITY_API_KEY environment variable"],
            }
        client = OpenAI(api_key=api_key, base_url="https://api.perplexity.ai")
        response = client.chat.completions.create(
            model="llama-3.1-sonar-small-128k-online",
            messages=[{"role": "user", "content": query}],
        )
        answer = response.choices[0].message.content if response.choices else None
        # Expose the single answer string as one result entry so downstream
        # formatting can treat every backend uniformly.
        results = (
            [{"title": "Perplexity Answer", "url": "", "content": answer or ""}]
            if answer
            else []
        )
        return {
            "results": results,
            "backend": "perplexity",
            "answer": answer,
            "notices": [],
        }
    except Exception as e:
        logger.exception("Perplexity search failed: %s", e)
        return {
            "results": [],
            "backend": "perplexity",
            "answer": None,
            "notices": [f"Search failed: {str(e)}"],
        }
def _search_searxng(query: str, max_results: int = 5, base_url: str = "http://localhost:8888") -> dict[str, Any]:
    """Query a SearXNG instance's JSON API and return a normalized payload.

    ``base_url`` points at the SearXNG deployment; connection or HTTP errors
    produce an empty result list plus a notice instead of raising.
    """
    try:
        import requests

        response = requests.get(
            f"{base_url}/search",
            params={"q": query, "format": "json", "engines": "google,bing,duckduckgo"},
            timeout=30,
        )
        response.raise_for_status()
        hits = response.json().get("results", [])
        # The request itself carries no result cap, so truncate locally.
        trimmed = [
            {
                "title": entry.get("title", ""),
                "url": entry.get("url", ""),
                "content": entry.get("content", ""),
            }
            for entry in hits[:max_results]
        ]
        return {
            "results": trimmed,
            "backend": "searxng",
            "answer": None,
            "notices": [],
        }
    except Exception as e:
        logger.exception("SearXNG search failed: %s", e)
        return {
            "results": [],
            "backend": "searxng",
            "answer": None,
            "notices": [f"Search failed: {str(e)}"],
        }
def dispatch_search(
    query: str,
    config: Configuration,
    loop_count: int,
) -> tuple[dict[str, Any] | None, list[str], str | None, str]:
    """Execute the configured search backend and normalize its payload.

    Args:
        query: The search query string.
        config: Configuration object whose ``search_api`` field selects the
            backend ("tavily", "perplexity", "searxng", "advanced", or
            anything else for the DuckDuckGo default).
        loop_count: Current research-loop iteration; accepted for caller
            compatibility and not used to alter backend selection here.

    Returns:
        A ``(payload, notices, answer_text, backend_label)`` tuple:
        ``payload`` is the raw backend response dict, ``notices`` lists any
        backend warnings, ``answer_text`` is a direct answer when the
        backend supplies one, and ``backend_label`` names the backend that
        actually produced the results (which may differ from ``search_api``
        after the "advanced" fallback).

    Raises:
        Exception: Re-raised from the backend call after logging. In
            practice the ``_search_*`` helpers catch their own errors and
            report them via ``notices``, so this path is defensive.
    """
    search_api = get_config_value(config.search_api)
    max_results = 5
    try:
        if search_api == "tavily":
            payload = _search_tavily(query, max_results)
        elif search_api == "perplexity":
            payload = _search_perplexity(query, max_results)
        elif search_api == "searxng":
            payload = _search_searxng(query, max_results)
        elif search_api == "advanced":
            # Prefer Tavily's richer results; fall back to DuckDuckGo when
            # Tavily yields nothing (missing key, API error, or no hits).
            payload = _search_tavily(query, max_results)
            if not payload.get("results"):
                payload = _search_duckduckgo(query, max_results)
        else:
            # Default backend, covering "duckduckgo" and unknown values.
            payload = _search_duckduckgo(query, max_results)
    except Exception as exc:
        logger.exception("Search backend %s failed: %s", search_api, exc)
        raise
    notices = list(payload.get("notices") or [])
    backend_label = str(payload.get("backend") or search_api)
    answer_text = payload.get("answer")
    results = payload.get("results", [])
    # Surface backend warnings; an empty list simply skips the loop, so no
    # emptiness guard is needed.
    for notice in notices:
        logger.info("Search notice (%s): %s", backend_label, notice)
    logger.info(
        "Search backend=%s resolved_backend=%s answer=%s results=%s",
        search_api,
        backend_label,
        bool(answer_text),
        len(results),
    )
    return payload, notices, answer_text, backend_label
def prepare_research_context(
    search_result: dict[str, Any] | None,
    answer_text: Optional[str],
    config: Configuration,
) -> tuple[str, str]:
    """Assemble the source summary and formatted research context.

    Returns a ``(sources_summary, context)`` pair. When *answer_text* is
    present, it is prepended to the context under an "AI Direct Answer"
    heading so downstream agents see it first.
    """
    summary = format_sources(search_result)
    # `or` (not `is None`) keeps the original handling of falsy inputs: an
    # empty dict is also replaced by the {"results": []} placeholder.
    formatted = deduplicate_and_format_sources(
        search_result or {"results": []},
        max_tokens_per_source=MAX_TOKENS_PER_SOURCE,
        fetch_full_page=config.fetch_full_page,
    )
    if not answer_text:
        return summary, formatted
    return summary, f"AI Direct Answer:\n{answer_text}\n\n{formatted}"