Spaces:
Configuration error
Configuration error
| import logging | |
| import os | |
| import asyncio | |
| from langchain_core.tools import StructuredTool | |
| from pydantic import BaseModel, Field | |
| from typing import Optional, List | |
| from duckduckgo_search import DDGS | |
| from serpapi import GoogleSearch | |
# Module-level logger named after this module; used by the search helpers below.
logger = logging.getLogger(__name__)
# Argument schema for the DuckDuckGo search tool.
# Documented with "#" comments rather than a class docstring on purpose:
# pydantic includes a model's docstring in its generated schema, so adding one
# would change what the tool exposes at runtime.
class DuckDuckGoSearchInput(BaseModel):
    # Required: the query string to search for.
    query: str = Field(..., description="Search query")
    # Required: the original query, kept for context.
    original_query: str = Field(..., description="Original query for context")
    # Optional embedder object; defaults to None when not supplied.
    embedder: Optional[object] = Field(default=None, description="SentenceTransformer embedder")
async def duckduckgo_search_func(query: str, original_query: str, embedder: Optional[object] = None) -> List[str]:
    """
    Perform a DuckDuckGo search with retries and fall back to SerpAPI if needed.
    Args:
        query (str): Search query.
        original_query (str): Original query for context.
        embedder (Optional[object]): SentenceTransformer for result filtering.
    Returns:
        List[str]: List of search result snippets.
    """
    # NOTE(review): this function is registered as a tool without an explicit
    # description, so the docstring above presumably doubles as the tool
    # description shown to the model; its wording is intentionally unchanged.
    #
    # Contract: never raises. Any failure is logged and results in an empty
    # list (or unranked results if only the ranking step fails). At most three
    # snippets are returned.

    # Both search clients are synchronous; run them in the default executor so
    # the network round-trips do not block the event loop.
    loop = asyncio.get_running_loop()

    async def try_duckduckgo(query: str, max_retries: int = 3) -> List[str]:
        """Query DuckDuckGo; return snippets, or [] on failure so the caller can fall back."""

        def fetch() -> List[str]:
            with DDGS() as ddgs:
                # Skip entries without a body instead of failing the whole search.
                return [r["body"] for r in ddgs.text(query, max_results=5) if "body" in r]

        for attempt in range(max_retries):
            try:
                logger.info(f"DuckDuckGo search attempt {attempt + 1} for query: {query}")
                return await loop.run_in_executor(None, fetch)
            except Exception as e:
                if "Ratelimit" in str(e) and attempt < max_retries - 1:
                    # Exponential backoff: 1s, 2s, ... (no sleep after the last attempt).
                    wait_time = 2 ** attempt
                    logger.warning(f"DuckDuckGo rate limit hit, retrying in {wait_time}s: {e}")
                    await asyncio.sleep(wait_time)
                else:
                    # Non-retryable error or retries exhausted. Report "no
                    # results" rather than raising: raising here used to skip
                    # the SerpAPI fallback entirely.
                    logger.error(f"DuckDuckGo search failed for query '{query}': {e}")
                    break
        return []

    async def try_serpapi(query: str, max_retries: int = 3) -> List[str]:
        """Query SerpAPI (Google); return snippets, or [] if unconfigured or failing."""
        api_key = os.getenv("SERPAPI_API_KEY")
        if not api_key:
            logger.warning("SERPAPI_API_KEY not set, cannot use SerpAPI fallback")
            return []

        def fetch() -> List[str]:
            params = {
                "q": query,
                "api_key": api_key,
                "num": 5,
            }
            organic = GoogleSearch(params).get_dict().get("organic_results", [])
            return [item["snippet"] for item in organic if "snippet" in item]

        for attempt in range(max_retries):
            try:
                logger.info(f"SerpAPI search attempt {attempt + 1} for query: {query}")
                return await loop.run_in_executor(None, fetch)
            except Exception as e:
                if attempt < max_retries - 1:
                    # Exponential backoff: 1s, 2s, ... (no sleep after the last attempt).
                    wait_time = 2 ** attempt
                    logger.warning(f"SerpAPI search failed, retrying in {wait_time}s: {e}")
                    await asyncio.sleep(wait_time)
                else:
                    logger.error(f"SerpAPI search failed for query '{query}': {e}")
        return []

    try:
        # Try DuckDuckGo with retries
        logger.info(f"Executing DuckDuckGo search for query: {query}")
        results = await try_duckduckgo(query)

        # Fall back to SerpAPI if DuckDuckGo failed or returned nothing
        if not results:
            logger.info(f"DuckDuckGo returned no results, falling back to SerpAPI for query: {query}")
            results = await try_serpapi(query)

        if not results:
            return []

        # Rank results against the original query if an embedder is provided
        if embedder:
            try:
                from sentence_transformers import util

                query_embedding = embedder.encode(original_query, convert_to_tensor=True)
                result_embeddings = embedder.encode(results, convert_to_tensor=True)
                scores = util.cos_sim(query_embedding, result_embeddings)[0]
                # Convert tensor indices to plain ints before indexing the list.
                order = scores.argsort(descending=True).tolist()
                results = [results[i] for i in order]
            except Exception as e:
                # Ranking is an optional refinement; keep the fetched results
                # (in search-engine order) rather than discarding them.
                logger.warning(f"Result ranking failed, returning unranked results: {e}")

        return results[:3]
    except Exception as e:
        # Top-level boundary: callers rely on this function never raising.
        logger.error(f"Search failed for query '{query}': {e}")
        return []
# Module-level LangChain structured tool wrapping the async search routine.
# Arguments are described by DuckDuckGoSearchInput.
# NOTE(review): the same coroutine function is supplied as both `func` and
# `coroutine`; a synchronous call through `func` would presumably return an
# un-awaited coroutine instead of results. Confirm callers only use the async
# path before changing this.
duckduckgo_search_tool = StructuredTool.from_function(
    name="duckduckgo_search_tool",
    args_schema=DuckDuckGoSearchInput,
    coroutine=duckduckgo_search_func,
    func=duckduckgo_search_func,
)