pikamomo's picture
Initial deployment
a60c0af
"""Search dispatch helpers for the DuckDuckGo, Tavily, Perplexity, and SearXNG backends."""
from __future__ import annotations
import logging
from typing import Any, Optional, Tuple
from config import Configuration
from utils import (
deduplicate_and_format_sources,
format_sources,
get_config_value,
)
logger = logging.getLogger(__name__)
MAX_TOKENS_PER_SOURCE = 2000
def _search_duckduckgo(query: str, max_results: int = 5) -> dict[str, Any]:
    """Run a DuckDuckGo text search and return a normalized payload.

    The ``ddgs`` package is imported inside the ``try`` block, so an import
    failure is reported through ``notices`` like any other search error.

    Args:
        query: Search string passed to ``DDGS.text``.
        max_results: Forwarded as ``max_results`` to ``DDGS.text``.

    Returns:
        A dict with ``results`` (``title``/``url``/``content`` dicts),
        ``backend`` (``"duckduckgo"``), ``answer`` (always ``None``) and
        ``notices`` (empty on success, one failure message otherwise).
    """
    try:
        from ddgs import DDGS

        with DDGS() as session:
            hits = list(session.text(query, max_results=max_results))

        # URL comes from "href" (else "link"); snippet from "body" (else
        # "snippet").
        normalized = [
            {
                "title": hit.get("title", ""),
                "url": hit.get("href", hit.get("link", "")),
                "content": hit.get("body", hit.get("snippet", "")),
            }
            for hit in hits
        ]
        success_payload: dict[str, Any] = {
            "results": normalized,
            "backend": "duckduckgo",
            "answer": None,
            "notices": [],
        }
        return success_payload
    except Exception as e:
        # Best-effort backend: log the traceback and hand back an empty
        # payload carrying the error text instead of propagating.
        logger.exception("DuckDuckGo search failed: %s", e)
        failure_payload: dict[str, Any] = {
            "results": [],
            "backend": "duckduckgo",
            "answer": None,
            "notices": [f"Search failed: {str(e)}"],
        }
        return failure_payload
def _search_tavily(query: str, max_results: int = 5) -> dict[str, Any]:
    """Run a Tavily search and return a normalized payload.

    The API key is read from the ``TAVILY_API_KEY`` environment variable.
    A missing or empty key is reported as a notice without importing the
    Tavily client and without logging an error, so an unconfigured
    environment produces a quiet, accurate diagnostic.

    Args:
        query: Search string passed to ``TavilyClient.search``.
        max_results: Forwarded as ``max_results`` to ``TavilyClient.search``.

    Returns:
        A dict with ``results`` (``title``/``url``/``content``/``raw_content``
        dicts), ``backend`` (``"tavily"``), ``answer`` (the response's
        ``answer`` field, or ``None``) and ``notices`` (empty on success,
        one message on a missing key or a failed request).
    """
    import os

    api_key = os.getenv("TAVILY_API_KEY")
    if not api_key:
        # Configuration problem, not a runtime failure: check it before the
        # third-party import so it is never masked by an ImportError.
        return {
            "results": [],
            "backend": "tavily",
            "answer": None,
            "notices": ["Missing TAVILY_API_KEY environment variable"],
        }

    try:
        from tavily import TavilyClient

        client = TavilyClient(api_key=api_key)
        response = client.search(query, max_results=max_results)
        # ``or []`` also covers a response whose "results" is present but null.
        formatted_results = [
            {
                "title": r.get("title", ""),
                "url": r.get("url", ""),
                "content": r.get("content", ""),
                "raw_content": r.get("raw_content"),
            }
            for r in response.get("results") or []
        ]
        return {
            "results": formatted_results,
            "backend": "tavily",
            "answer": response.get("answer"),
            "notices": [],
        }
    except Exception as e:
        # Best-effort backend: log the traceback and return an empty payload
        # carrying the error text instead of propagating.
        logger.exception("Tavily search failed: %s", e)
        return {
            "results": [],
            "backend": "tavily",
            "answer": None,
            "notices": [f"Search failed: {str(e)}"],
        }
def _search_perplexity(
    query: str,
    max_results: int = 5,
    model: str = "sonar",
) -> dict[str, Any]:
    """Ask Perplexity's chat-completions API and return a normalized payload.

    The API key is read from the ``PERPLEXITY_API_KEY`` environment variable.
    A missing or empty key is reported as a notice without importing the
    OpenAI client and without logging an error.

    Args:
        query: Sent as the single user message.
        max_results: Accepted for signature parity with the other backends;
            not used, because the call yields one answer text rather than a
            result list.
        model: Perplexity model id sent with the request.
            NOTE(review): this block previously hard-coded
            ``llama-3.1-sonar-small-128k-online``; Perplexity's published
            model list appears to have retired the ``llama-3.1-sonar-*`` ids
            in favour of ``sonar`` -- confirm against the account's available
            models.

    Returns:
        A dict with ``results`` (a single synthetic "Perplexity Answer" entry
        with an empty URL, or an empty list when there is no answer text),
        ``backend`` (``"perplexity"``), ``answer`` (the answer text or
        ``None``) and ``notices`` (empty on success, one message on a missing
        key or a failed request).
    """
    import os

    api_key = os.getenv("PERPLEXITY_API_KEY")
    if not api_key:
        # Configuration problem, not a runtime failure: check it before the
        # third-party import so it is never masked by an ImportError.
        return {
            "results": [],
            "backend": "perplexity",
            "answer": None,
            "notices": ["Missing PERPLEXITY_API_KEY environment variable"],
        }

    try:
        from openai import OpenAI

        # The OpenAI client is pointed at Perplexity's endpoint via base_url.
        client = OpenAI(api_key=api_key, base_url="https://api.perplexity.ai")
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": query}],
        )
        answer = response.choices[0].message.content if response.choices else None

        # Perplexity returns answer text, not structured results, so the
        # answer is wrapped as one pseudo-result for downstream formatting.
        results = (
            [{"title": "Perplexity Answer", "url": "", "content": answer}]
            if answer
            else []
        )
        return {
            "results": results,
            "backend": "perplexity",
            "answer": answer,
            "notices": [],
        }
    except Exception as e:
        # Best-effort backend: log the traceback and return an empty payload
        # carrying the error text instead of propagating.
        logger.exception("Perplexity search failed: %s", e)
        return {
            "results": [],
            "backend": "perplexity",
            "answer": None,
            "notices": [f"Search failed: {str(e)}"],
        }
def _search_searxng(query: str, max_results: int = 5, base_url: str = "http://localhost:8888") -> dict[str, Any]:
    """Query a SearXNG instance's JSON search endpoint.

    Args:
        query: Search string sent as the ``q`` parameter.
        max_results: Upper bound applied client-side by slicing the returned
            result list.
        base_url: Root URL of the SearXNG instance; a trailing slash is
            tolerated.

    Returns:
        A dict with ``results`` (``title``/``url``/``content`` dicts),
        ``backend`` (``"searxng"``), ``answer`` (always ``None``) and
        ``notices`` (empty on success, one failure message otherwise).
    """
    try:
        import requests

        params = {
            "q": query,
            "format": "json",
            "engines": "google,bing,duckduckgo",
        }
        # Strip trailing slashes so "http://host/" does not become
        # "http://host//search".
        endpoint = f"{base_url.rstrip('/')}/search"
        response = requests.get(endpoint, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()

        # ``or []`` also covers a body whose "results" is present but null,
        # which would otherwise fail on slicing.
        hits = (data.get("results") or [])[:max_results]
        formatted_results = [
            {
                "title": r.get("title", ""),
                "url": r.get("url", ""),
                "content": r.get("content", ""),
            }
            for r in hits
        ]
        return {
            "results": formatted_results,
            "backend": "searxng",
            "answer": None,
            "notices": [],
        }
    except Exception as e:
        # Best-effort backend: log the traceback and return an empty payload
        # carrying the error text instead of propagating.
        logger.exception("SearXNG search failed: %s", e)
        return {
            "results": [],
            "backend": "searxng",
            "answer": None,
            "notices": [f"Search failed: {str(e)}"],
        }
def dispatch_search(
    query: str,
    config: Configuration,
    loop_count: int,
) -> Tuple[dict[str, Any] | None, list[str], Optional[str], str]:
    """Execute configured search backend and normalize response payload.

    The backend is chosen from ``config.search_api`` (resolved through
    ``get_config_value``): ``"tavily"``, ``"perplexity"`` and ``"searxng"``
    map to their helpers; ``"advanced"`` tries Tavily and falls back to
    DuckDuckGo when Tavily yields no results; any other value uses
    DuckDuckGo.

    Args:
        query: Search string handed to the backend helper.
        config: Configuration object; only ``search_api`` is read here.
        loop_count: Not used by this function; kept in the signature for
            existing callers.

    Returns:
        ``(payload, notices, answer_text, backend_label)`` where ``payload``
        is the backend's normalized dict, ``notices`` is a copy of its
        notices, ``answer_text`` is its ``answer`` field and
        ``backend_label`` names the backend that produced the payload.

    Raises:
        Exception: Anything escaping a backend helper is logged and
            re-raised unchanged.
    """
    search_api = get_config_value(config.search_api)
    max_results = 5

    try:
        if search_api == "tavily":
            payload = _search_tavily(query, max_results)
        elif search_api == "perplexity":
            payload = _search_perplexity(query, max_results)
        elif search_api == "searxng":
            payload = _search_searxng(query, max_results)
        elif search_api == "advanced":
            # Try Tavily first, fall back to DuckDuckGo
            payload = _search_tavily(query, max_results)
            if not payload.get("results"):
                # Keep the reason Tavily produced nothing (missing key,
                # request failure) instead of discarding it with the payload.
                # Prefix it so it is attributable once the fallback's label
                # takes over.
                primary_backend = str(payload.get("backend") or "tavily")
                primary_notices = [
                    f"{primary_backend}: {note}"
                    for note in payload.get("notices") or []
                ]
                fallback = _search_duckduckgo(query, max_results)
                payload = {
                    **fallback,
                    "notices": primary_notices + list(fallback.get("notices") or []),
                }
        else:
            # Default to DuckDuckGo
            payload = _search_duckduckgo(query, max_results)
    except Exception as exc:
        logger.exception("Search backend %s failed: %s", search_api, exc)
        raise

    notices = list(payload.get("notices") or [])
    backend_label = str(payload.get("backend") or search_api)
    answer_text = payload.get("answer")
    # ``or []`` keeps len() safe if a payload carries "results": None.
    results = payload.get("results") or []

    for notice in notices:
        logger.info("Search notice (%s): %s", backend_label, notice)

    logger.info(
        "Search backend=%s resolved_backend=%s answer=%s results=%s",
        search_api,
        backend_label,
        bool(answer_text),
        len(results),
    )
    return payload, notices, answer_text, backend_label
def prepare_research_context(
    search_result: dict[str, Any] | None,
    answer_text: Optional[str],
    config: Configuration,
) -> tuple[str, str]:
    """Produce the source summary and the context text for downstream agents.

    Args:
        search_result: Normalized search payload, or ``None``.
        answer_text: Optional direct answer; when truthy it is placed ahead
            of the formatted sources under an "AI Direct Answer:" heading.
        config: Configuration object; only ``fetch_full_page`` is read here.

    Returns:
        ``(sources_summary, context)`` -- the output of ``format_sources``
        and the (optionally answer-prefixed) output of
        ``deduplicate_and_format_sources``.
    """
    summary = format_sources(search_result)

    # The formatter is always given a dict: a falsy search result is replaced
    # by an empty result set.
    effective_result = search_result if search_result else {"results": []}
    formatted_sources = deduplicate_and_format_sources(
        effective_result,
        max_tokens_per_source=MAX_TOKENS_PER_SOURCE,
        fetch_full_page=config.fetch_full_page,
    )

    if not answer_text:
        return summary, formatted_sources

    return summary, f"AI Direct Answer:\n{answer_text}\n\n{formatted_sources}"