Brave / main.py
Greff3's picture
Update main.py
5a6ba72 verified
raw
history blame
12.6 kB
import asyncio
import base64
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs, urlparse
from bs4 import BeautifulSoup
# This import will work correctly after running `pip install --upgrade curl_cffi`
from curl_cffi.aio import AsyncSession
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel, Field
from webscout.litagent import LitAgent
# --- FastAPI App Definition ---
app = FastAPI(
title="Snapzion Enhanced Search API",
description="An advanced FastAPI wrapper for Bing Search, featuring AI-powered summarization and metadata enrichment.",
version="2.0.1", # Version bump
)
# --- Pydantic Models for Clearer Responses ---
class BaseSearchResult(BaseModel):
url: str
title: str
description: str
class EnhancedBingSearchResult(BaseSearchResult):
"""Model for the enhanced search results with summary and metadata."""
summary: Optional[str] = Field(None, description="AI-generated summary of the page content.")
source: Optional[str] = Field(None, description="The domain name of the result URL.")
favicon: Optional[str] = Field(None, description="URL of the website's favicon.")
class BingImageResult(BaseModel):
title: str
image: str
thumbnail: str
url: str
source: str
class BingNewsResult(BaseModel):
title: str
url: str
description: str
source: str = ""
# --- Enhanced BingSearch Library ---
class BingSearch:
"""
Bing search implementation rewritten for asynchronous performance and enhanced data retrieval.
"""
_lit_agent_instance: Optional[LitAgent] = None
_executor = ThreadPoolExecutor(max_workers=10)
def __init__(
self,
timeout: int = 10,
proxies: Optional[Dict[str, str]] = None,
verify: bool = True,
lang: str = "en-US",
sleep_interval: float = 0.0,
impersonate: str = "chrome110"
):
self.timeout = timeout
self.proxies = proxies if proxies else {}
self.verify = verify
self.lang = lang
self.sleep_interval = sleep_interval
self._base_url = "https://www.bing.com"
self.session = AsyncSession(
proxies=self.proxies,
verify=self.verify,
timeout=self.timeout,
impersonate=impersonate
)
self.session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
})
@classmethod
def get_lit_agent(cls) -> LitAgent:
"""Initializes LitAgent lazily."""
if cls._lit_agent_instance is None:
cls._lit_agent_instance = LitAgent()
return cls._lit_agent_instance
async def _summarize_content(self, html_content: str) -> str:
"""Runs the synchronous summarize method in a thread pool."""
loop = asyncio.get_running_loop()
agent = self.get_lit_agent()
try:
summary = await loop.run_in_executor(
self._executor, agent.summarize, html_content
)
return summary
except Exception as e:
print(f"Error during summarization: {e}")
return "Could not generate summary."
async def _enhance_result(self, result: BaseSearchResult) -> EnhancedBingSearchResult:
"""Fetches page content, generates summary, and extracts metadata."""
enhanced_result = EnhancedBingSearchResult(**result.model_dump())
try:
parsed_url = urlparse(result.url)
enhanced_result.source = parsed_url.netloc
resp = await self.session.get(result.url, timeout=self.timeout)
resp.raise_for_status()
html = resp.text
summary = await self._summarize_content(html)
enhanced_result.summary = summary
soup = BeautifulSoup(html, "html.parser")
favicon_tag = soup.find("link", rel=lambda r: r and "icon" in r.lower())
if favicon_tag and favicon_tag.get("href"):
favicon_url = favicon_tag["href"]
if not favicon_url.startswith(('http://', 'https://', '//')):
favicon_url = f"{parsed_url.scheme}://{parsed_url.netloc}{'/' if not favicon_url.startswith('/') else ''}{favicon_url}"
elif favicon_url.startswith('//'):
favicon_url = f"{parsed_url.scheme}:{favicon_url}"
enhanced_result.favicon = favicon_url
except Exception as e:
print(f"Failed to enhance URL {result.url}: {e}")
return enhanced_result
def _selectors(self, element):
selectors = {
'links': 'ol#b_results > li.b_algo',
'next': 'a.sb_pagN'
}
return selectors.get(element, '')
def _first_page(self, query):
url = f'{self._base_url}/search?q={query}&search=&form=QBLH'
return {'url': url, 'data': None}
def _next_page(self, soup):
selector = self._selectors('next')
next_page_tag = soup.select_one(selector)
if next_page_tag and next_page_tag.get('href'):
return {'url': self._base_url + next_page_tag['href'], 'data': None}
return {'url': None, 'data': None}
def _get_url(self, tag):
# A more direct approach that is often sufficient
return tag.get('href', '')
async def text(
self, keywords: str, max_results: int = 10, enhanced: bool = False, **kwargs
) -> List[BaseSearchResult | EnhancedBingSearchResult]:
if not keywords:
raise ValueError("Search keywords cannot be empty")
fetched_results = []
fetched_links = set()
async def fetch_page(url):
try:
resp = await self.session.get(url)
resp.raise_for_status()
return resp.text
except Exception as e:
raise Exception(f"Bing search failed: {str(e)}")
current_url = self._first_page(keywords)['url']
while current_url and len(fetched_results) < max_results:
html = await fetch_page(current_url)
soup = BeautifulSoup(html, "html.parser")
result_blocks = soup.select(self._selectors('links'))
for result in result_blocks:
title_tag = result.find('h2')
if not title_tag: continue
link_tag = title_tag.find('a')
if not link_tag or not link_tag.has_attr('href'): continue
url_val = self._get_url(link_tag)
title = title_tag.get_text(strip=True)
desc_container = result.find('div', class_='b_caption')
description = ""
if desc_container:
p_tag = desc_container.find('p')
if p_tag:
description = p_tag.get_text(strip=True)
if url_val and title:
if url_val in fetched_links: continue
fetched_results.append(BaseSearchResult(url=url_val, title=title, description=description))
fetched_links.add(url_val)
if len(fetched_results) >= max_results: break
if len(fetched_results) >= max_results: break
next_page_info = self._next_page(soup)
current_url = next_page_info['url']
if current_url:
await asyncio.sleep(self.sleep_interval)
results_to_return = fetched_results[:max_results]
if enhanced and results_to_return:
enhancement_tasks = [self._enhance_result(res) for res in results_to_return]
return await asyncio.gather(*enhancement_tasks)
return results_to_return
async def suggestions(self, query: str, **kwargs) -> List[str]:
if not query: raise ValueError("Query cannot be empty")
region = kwargs.get('region', 'en-US')
url = f"https://api.bing.com/osjson.aspx?query={query}&mkt={region}"
resp = await self.session.get(url)
resp.raise_for_status()
data = resp.json()
return data[1] if isinstance(data, list) and len(data) > 1 else []
async def images(self, keywords: str, max_results: int = 10, **kwargs) -> List[BingImageResult]:
if not keywords: raise ValueError("Keywords cannot be empty")
url = f"{self._base_url}/images/search?q={keywords}&count={max_results}"
resp = await self.session.get(url)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
results = []
for item in soup.select("a.iusc"):
try:
m_data = item.get("m")
if not m_data: continue
meta = json.loads(m_data)
if meta.get("murl"):
results.append(BingImageResult(title=meta.get("t", ""), image=meta.get("murl"), thumbnail=meta.get("turl", ""), url=meta.get("purl", ""), source=meta.get("surl", "")))
if len(results) >= max_results: break
except Exception: continue
return results
async def news(self, keywords: str, max_results: int = 10, **kwargs) -> List[BingNewsResult]:
if not keywords: raise ValueError("Keywords cannot be empty")
url = f"{self._base_url}/news/search?q={keywords}"
resp = await self.session.get(url)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
results = []
for item in soup.select("div.news-card"):
a_tag = item.find("a", class_="title")
if not (a_tag and a_tag.has_attr('href')): continue
desc_tag = item.find("div", class_="snippet")
source_tag = item.find(attrs={"aria-label": "Publisher"})
results.append(BingNewsResult(title=a_tag.get_text(strip=True), url=a_tag['href'], description=desc_tag.get_text(strip=True) if desc_tag else "", source=source_tag.get_text(strip=True) if source_tag else ""))
if len(results) >= max_results: break
return results
bing = BingSearch()
# --- FastAPI Endpoints ---
@app.get("/search", response_model=List[EnhancedBingSearchResult | BaseSearchResult], summary="Perform a standard or enhanced text search")
async def text_search(
query: str = Query(..., description="The search keywords."),
max_results: int = Query(10, description="Maximum number of results to return."),
enhanced: bool = Query(False, description="Enable AI summarization and metadata fetching (slower but more detailed).")
):
"""
Perform a text search on Bing.
- Set `enhanced=true` to get AI-powered summaries and additional metadata for each result.
"""
try:
results = await bing.text(
keywords=query,
max_results=max_results,
enhanced=enhanced
)
return results
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/suggestions", response_model=List[str], summary="Fetch search suggestions")
async def get_suggestions(
query: str = Query(..., description="The search query for which to fetch suggestions."),
):
try:
return await bing.suggestions(query=query)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/images", response_model=List[BingImageResult], summary="Search for images")
async def image_search(
query: str = Query(..., description="The search keywords for images."),
max_results: int = Query(10, description="Maximum number of image results to return."),
):
try:
return await bing.images(keywords=query, max_results=max_results)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/news", response_model=List[BingNewsResult], summary="Search for news articles")
async def news_search(
query: str = Query(..., description="The search keywords for news."),
max_results: int = Query(10, description="Maximum number of news results to return."),
):
try:
return await bing.news(keywords=query, max_results=max_results)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)