search-web-MCP-server / src /searcher /open_google_search.py
Xenobd's picture
Update src/searcher/open_google_search.py
3ae9631 verified
from typing import List, Optional
# Import with try-except for backward compatibility
try:
from ddgs import DDGS # New package name
except ImportError:
from duckduckgo_search import DDGS # Old package name for compatibility
from src.core.interface.searcher_interface import SearchInterface
from src.models.search_models import SearchItemResult, SearchResult
class DuckDuckGoSearch(SearchInterface):
def __init__(self):
pass
def search_custom_sites(
self, query: str, sites: Optional[list] = None
) -> SearchResult:
"""
Performs a DuckDuckGo search restricted to a dynamic list of specific sites.
Uses the 'site:' operator supported by DuckDuckGo.
Args:
query (str): The user's search query (e.g., "generative AI").
sites (list): A list of websites to search within (e.g., ['wired.com', 'theverge.com']).
Returns:
SearchResult: The search results.
"""
try:
# 1. Construct the dynamic query string
site_restriction = (
" OR ".join([f"site:{site}" for site in sites]) if sites else ""
)
full_query = f"{query} {site_restriction}".strip()
# 2. Execute the search - UPDATED: using 'query' parameter instead of 'keywords'
results = DDGS().text(query=full_query, max_results=2)
# 3. Map results to your application's data model
items = [
SearchItemResult(
url=item.get("href"),
title=item.get("title"),
description=item.get("body")
)
for item in results
]
return SearchResult(items=items)
except Exception as e:
raise Exception(f"An error occurred while searching with DuckDuckGo: {str(e)}")
def search_custom_domains(
self, query: str, domains: Optional[List[str]] = None
) -> SearchResult:
"""
Performs a DuckDuckGo search restricted to custom top-level domains (e.g., .edu, .gov).
Args:
query (str): The user's search query (e.g., "generative AI").
domains (List[str]): A list of domains to search within (e.g., ['.edu', '.gov']).
Returns:
SearchResult: The search results.
"""
try:
# 1. Construct query
domain_restriction = (
" OR ".join([f"site:{domain}" for domain in domains]) if domains else ""
)
full_query = f"{query} {domain_restriction}".strip()
# 2. Execute the search - UPDATED: using 'query' parameter instead of 'keywords'
results = DDGS().text(query=full_query, max_results=3)
# 3. Map results and return
items = [
SearchItemResult(
url=item.get("href"),
title=item.get("title"),
description=item.get("body")
)
for item in results
]
return SearchResult(items=items[:3000])
except Exception as e:
raise Exception(f"An error occurred while searching with DuckDuckGo: {str(e)}")