Spaces:
Sleeping
Sleeping
File size: 6,110 Bytes
"""
Search Engine
-------------
Handles web search and content extraction for Deep Research.
Uses 'duckduckgo-search' for privacy-friendly, reverse-engineered search.
Uses 'requests' + 'beautifulsoup4' for scraping.
"""
import logging
import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from concurrent.futures import ThreadPoolExecutor
# Module-level logger; every message emitted by this file goes through the
# "kai_api.search" logger name.
logger = logging.getLogger("kai_api.search")
class SearchEngine:
    """Web search plus page-content extraction.

    Searches go through the ``DDGS`` client; if every library backend fails,
    the DuckDuckGo HTML endpoint is scraped directly. Result pages are
    downloaded with ``requests`` and reduced to plain text with
    ``BeautifulSoup``.
    """

    def __init__(self):
        """Create the DDGS client and the header set used for HTTP requests."""
        self.ddgs = DDGS()
        # Browser-like headers, sent with every request this class makes
        # through ``requests``.
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.9",
            "Referer": "https://www.google.com/",
        }

    def simple_search(self, query: str, max_results: int = 10) -> list[dict]:
        """Run a text search, trying each DDGS backend in turn.

        Backends are tried in the order ``api``, ``html``, ``lite``; the first
        one that returns a non-empty result list wins. If all of them fail or
        come back empty, the manual HTML scraper is used instead.

        Args:
            query: Search terms.
            max_results: Upper bound on the number of results requested.

        Returns:
            A list of ``{'title': str, 'href': str, 'body': str}`` dicts
            (possibly empty).
        """
        for backend in ("api", "html", "lite"):
            logger.info("Searching '%s' using backend='%s'...", query, backend)
            try:
                results = list(
                    self.ddgs.text(query, max_results=max_results, backend=backend)
                )
            except Exception as e:
                # Best effort: a failing backend must not prevent the next
                # one from being tried.
                logger.warning("Search backend '%s' failed: %s", backend, e)
                continue
            if results:
                logger.info("Found %d results via '%s'", len(results), backend)
                return results

        logger.error(
            "All 'ddgs' library backends failed. Attempting manual scraper fallback..."
        )
        return self._manual_search_ddg_html(query, max_results)

    @staticmethod
    def _unwrap_ddg_redirect(href: str) -> str:
        """Return the real target of a DuckDuckGo redirect link.

        Protocol-relative links (``//host/...``) get an ``https:`` scheme so
        they can be fetched. Links of the form
        ``duckduckgo.com/l/?uddg=<encoded url>`` are replaced by the decoded
        ``uddg`` value. Any other link is returned unchanged.
        """
        from urllib.parse import parse_qs, urlparse

        if href.startswith("//"):
            href = "https:" + href
        parsed = urlparse(href)
        host = parsed.netloc.lower()
        is_ddg_host = host == "duckduckgo.com" or host.endswith(".duckduckgo.com")
        if is_ddg_host and parsed.path.startswith("/l/"):
            # parse_qs percent-decodes the value for us.
            target = parse_qs(parsed.query).get("uddg")
            if target:
                return target[0]
        return href

    def _manual_search_ddg_html(self, query: str, max_results: int) -> list[dict]:
        """Fallback search: scrape html.duckduckgo.com directly.

        Args:
            query: Search terms.
            max_results: Maximum number of results to return.

        Returns:
            A list of ``{'title', 'href', 'body'}`` dicts; an empty list if
            the request or the parsing fails for any reason.
        """
        try:
            resp = requests.post(
                "https://html.duckduckgo.com/html/",
                data={"q": query},
                headers=self.headers,
                timeout=10,
            )
            resp.raise_for_status()
            soup = BeautifulSoup(resp.content, "html.parser")

            results = []
            # Each organic hit is expected inside a ".web-result" container.
            for result in soup.select(".web-result"):
                if len(results) >= max_results:
                    break
                title_tag = result.select_one(".result__a")
                if not title_tag:
                    continue
                title = title_tag.get_text(strip=True)
                href = title_tag.get("href")
                if not (href and title):
                    continue
                snippet_tag = result.select_one(".result__snippet")
                results.append({
                    "title": title,
                    # The page may hand out redirect links rather than the
                    # target URL; unwrap them so the link is fetchable.
                    "href": self._unwrap_ddg_redirect(href),
                    "body": snippet_tag.get_text(strip=True) if snippet_tag else "",
                })

            logger.info("Manual fallback found %d results.", len(results))
            return results
        except Exception as e:
            logger.error("Manual scraper failed: %s", e)
            return []

    def fetch_page_content(self, url: str) -> str:
        """Download a page and return its visible text.

        Script/style/navigation-like elements are removed, whitespace is
        collapsed to single spaces, and the text is cut to 5000 characters.

        Args:
            url: Page to download.

        Returns:
            The cleaned text, or ``""`` if the download or parsing fails.
        """
        from bs4 import FeatureNotFound

        try:
            # 5-second timeout is aggressive but necessary for responsiveness.
            resp = requests.get(url, headers=self.headers, timeout=5)
            resp.raise_for_status()

            # Prefer lxml for speed, but do not require it: without this
            # fallback a missing lxml install makes every fetch return "".
            try:
                soup = BeautifulSoup(resp.content, "lxml")
            except FeatureNotFound:
                soup = BeautifulSoup(resp.content, "html.parser")

            # Drop elements that carry no article text.
            for tag in soup(["script", "style", "nav", "footer", "header", "form", "iframe", "svg"]):
                tag.decompose()

            # split()/join collapses every whitespace run to one space.
            clean_text = " ".join(soup.get_text(separator=" ").split())
            return clean_text[:5000]
        except Exception as e:
            # requests' HTTP errors carry a response object; anything else
            # (or a None response) reports "N/A".
            status = getattr(getattr(e, "response", None), "status_code", "N/A")
            logger.warning("Failed to fetch %s: %s (Status: %s)", url, e, status)
            return ""

    def deep_research_gather(self, query: str, breadth: int = 5) -> str:
        """Search for *query* and fetch the text of the top results.

        Pages are downloaded in parallel. A result whose page cannot be
        fetched, or yields 100 characters or fewer, is represented by its
        search snippet instead.

        Args:
            query: Search terms.
            breadth: Number of search results to request and fetch.

        Returns:
            One ``=== SOURCE ... ===`` section per result, in search-result
            order, joined by newlines; ``""`` if the search found nothing.
        """
        logger.info("Deep Research Gathering for: %s", query)

        # 1. Search
        results = self.simple_search(query, max_results=breadth)
        if not results:
            return ""

        # 2. Parallel fetch
        context = []
        with ThreadPoolExecutor(max_workers=5) as executor:
            # Submit everything first so downloads overlap; results without a
            # link get no future and fall straight through to the snippet.
            pending = [
                (
                    r,
                    executor.submit(self.fetch_page_content, r["href"])
                    if r.get("href")
                    else None,
                )
                for r in results
            ]
            for r, future in pending:
                title = r.get("title", "")
                href = r.get("href", "")
                content = ""
                if future is not None:
                    try:
                        content = future.result()
                    except Exception as e:
                        # Fall back to the snippet instead of aborting the
                        # whole gather.
                        logger.warning("Fetch task for %s crashed: %s", href, e)
                if content and len(content) > 100:
                    context.append(f"=== SOURCE: {title} ({href}) ===\n{content}\n")
                else:
                    context.append(
                        f"=== SOURCE (Snippet Only): {title} ({href}) ===\n{r.get('body', '')}\n"
                    )

        return "\n".join(context)
|