|
|
import asyncio
import base64  # NOTE(review): currently unused in this module; kept for compatibility
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs, quote_plus, urlparse

from bs4 import BeautifulSoup
# FIX(review): AsyncSession is exported from curl_cffi.requests; the
# `curl_cffi.aio` module holds only the low-level async curl bindings, so
# `from curl_cffi.aio import AsyncSession` raises ImportError at startup.
from curl_cffi.requests import AsyncSession
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel, Field

from webscout.litagent import LitAgent
|
|
|
|
|
|
|
|
# FastAPI application instance. The metadata below feeds the auto-generated
# OpenAPI schema and the Swagger UI at /docs.
app = FastAPI(
    title="Snapzion Enhanced Search API",
    description="An advanced FastAPI wrapper for Bing Search, featuring AI-powered summarization and metadata enrichment.",
    version="2.0.1",
)
|
|
|
|
|
|
|
|
|
|
|
class BaseSearchResult(BaseModel):
    """A single organic web-search hit scraped from a Bing results page."""

    url: str  # target URL of the result (href of the <h2><a> tag)
    title: str  # result headline text
    description: str  # snippet text; empty string when Bing shows none
|
|
|
|
|
class EnhancedBingSearchResult(BaseSearchResult):
    """Model for the enhanced search results with summary and metadata."""

    # Enrichment fields default to None so a failed enhancement attempt still
    # produces a valid response object (see BingSearch._enhance_result).
    summary: Optional[str] = Field(None, description="AI-generated summary of the page content.")
    source: Optional[str] = Field(None, description="The domain name of the result URL.")
    favicon: Optional[str] = Field(None, description="URL of the website's favicon.")
|
|
|
|
|
class BingImageResult(BaseModel):
    """A single Bing image-search hit, parsed from the JSON payload carried
    in the `m` attribute of an `a.iusc` tile."""

    title: str  # image title ("t" key of the tile metadata)
    image: str  # full-resolution image URL ("murl")
    thumbnail: str  # thumbnail URL ("turl")
    url: str  # page the image was found on ("purl")
    source: str  # source site URL ("surl")
|
|
|
|
|
class BingNewsResult(BaseModel):
    """A single Bing news-search hit scraped from a `div.news-card` element."""

    title: str
    url: str
    description: str
    source: str = ""  # publisher name; empty when the card lacks a Publisher label
|
|
|
|
|
|
|
|
|
|
|
class BingSearch:
    """
    Bing search implementation rewritten for asynchronous performance and enhanced data retrieval.
    """

    # Lazily created, process-wide summarization agent; see get_lit_agent().
    _lit_agent_instance: Optional[LitAgent] = None
    # Shared pool used to run the synchronous summarizer off the event loop
    # (see _summarize_content); sized for up to 10 concurrent summaries.
    _executor = ThreadPoolExecutor(max_workers=10)
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
timeout: int = 10, |
|
|
proxies: Optional[Dict[str, str]] = None, |
|
|
verify: bool = True, |
|
|
lang: str = "en-US", |
|
|
sleep_interval: float = 0.0, |
|
|
impersonate: str = "chrome110" |
|
|
): |
|
|
self.timeout = timeout |
|
|
self.proxies = proxies if proxies else {} |
|
|
self.verify = verify |
|
|
self.lang = lang |
|
|
self.sleep_interval = sleep_interval |
|
|
self._base_url = "https://www.bing.com" |
|
|
self.session = AsyncSession( |
|
|
proxies=self.proxies, |
|
|
verify=self.verify, |
|
|
timeout=self.timeout, |
|
|
impersonate=impersonate |
|
|
) |
|
|
self.session.headers.update({ |
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" |
|
|
}) |
|
|
|
|
|
@classmethod |
|
|
def get_lit_agent(cls) -> LitAgent: |
|
|
"""Initializes LitAgent lazily.""" |
|
|
if cls._lit_agent_instance is None: |
|
|
cls._lit_agent_instance = LitAgent() |
|
|
return cls._lit_agent_instance |
|
|
|
|
|
async def _summarize_content(self, html_content: str) -> str: |
|
|
"""Runs the synchronous summarize method in a thread pool.""" |
|
|
loop = asyncio.get_running_loop() |
|
|
agent = self.get_lit_agent() |
|
|
try: |
|
|
summary = await loop.run_in_executor( |
|
|
self._executor, agent.summarize, html_content |
|
|
) |
|
|
return summary |
|
|
except Exception as e: |
|
|
print(f"Error during summarization: {e}") |
|
|
return "Could not generate summary." |
|
|
|
|
|
|
|
|
    async def _enhance_result(self, result: BaseSearchResult) -> EnhancedBingSearchResult:
        """Fetches page content, generates summary, and extracts metadata."""
        # Start from a copy of the base result; enrichment fields stay None
        # if anything below fails (best-effort semantics).
        enhanced_result = EnhancedBingSearchResult(**result.model_dump())
        try:
            parsed_url = urlparse(result.url)
            enhanced_result.source = parsed_url.netloc

            # Fetch the actual result page (not the Bing SERP).
            resp = await self.session.get(result.url, timeout=self.timeout)
            resp.raise_for_status()
            html = resp.text

            summary = await self._summarize_content(html)
            enhanced_result.summary = summary

            soup = BeautifulSoup(html, "html.parser")
            # Match any <link rel="...icon..."> variant (icon, shortcut icon,
            # apple-touch-icon, ...). NOTE(review): bs4 may pass `rel` as a
            # list for multi-valued attributes — confirm the lambda handles
            # the pages encountered in practice.
            favicon_tag = soup.find("link", rel=lambda r: r and "icon" in r.lower())
            if favicon_tag and favicon_tag.get("href"):
                favicon_url = favicon_tag["href"]
                if not favicon_url.startswith(('http://', 'https://', '//')):
                    # Relative href: resolve against the page's origin
                    # (path-relative hrefs are resolved against the root,
                    # not the page path).
                    favicon_url = f"{parsed_url.scheme}://{parsed_url.netloc}{'/' if not favicon_url.startswith('/') else ''}{favicon_url}"
                elif favicon_url.startswith('//'):
                    # Protocol-relative href: prepend the page's scheme.
                    favicon_url = f"{parsed_url.scheme}:{favicon_url}"
                enhanced_result.favicon = favicon_url
        except Exception as e:
            # Best-effort: keep the un-enriched result rather than failing
            # the whole search response.
            print(f"Failed to enhance URL {result.url}: {e}")
        return enhanced_result
|
|
|
|
|
def _selectors(self, element): |
|
|
selectors = { |
|
|
'links': 'ol#b_results > li.b_algo', |
|
|
'next': 'a.sb_pagN' |
|
|
} |
|
|
return selectors.get(element, '') |
|
|
|
|
|
def _first_page(self, query): |
|
|
url = f'{self._base_url}/search?q={query}&search=&form=QBLH' |
|
|
return {'url': url, 'data': None} |
|
|
|
|
|
def _next_page(self, soup): |
|
|
selector = self._selectors('next') |
|
|
next_page_tag = soup.select_one(selector) |
|
|
if next_page_tag and next_page_tag.get('href'): |
|
|
return {'url': self._base_url + next_page_tag['href'], 'data': None} |
|
|
return {'url': None, 'data': None} |
|
|
|
|
|
def _get_url(self, tag): |
|
|
|
|
|
return tag.get('href', '') |
|
|
|
|
|
    async def text(
        self, keywords: str, max_results: int = 10, enhanced: bool = False, **kwargs
    ) -> List[BaseSearchResult | EnhancedBingSearchResult]:
        """Run a Bing web search, following pagination until *max_results*
        results are collected or no further page exists.

        Args:
            keywords: Search terms; must be non-empty.
            max_results: Upper bound on the number of returned results.
            enhanced: When True, every result is additionally fetched and
                enriched (summary/source/favicon) via _enhance_result.
            **kwargs: Accepted for interface compatibility; currently unused.

        Returns:
            A list of BaseSearchResult, or EnhancedBingSearchResult when
            enhanced=True.

        Raises:
            ValueError: If keywords is empty.
            Exception: If a results page cannot be fetched.
        """
        if not keywords:
            raise ValueError("Search keywords cannot be empty")

        fetched_results = []
        fetched_links = set()  # deduplicates result URLs across pages

        async def fetch_page(url):
            # Wrap any transport/HTTP error in a generic Exception with
            # context; the caller surfaces it to the API layer.
            try:
                resp = await self.session.get(url)
                resp.raise_for_status()
                return resp.text
            except Exception as e:
                raise Exception(f"Bing search failed: {str(e)}")

        current_url = self._first_page(keywords)['url']

        # Paginate until enough results are collected or no "next" link exists.
        while current_url and len(fetched_results) < max_results:
            html = await fetch_page(current_url)
            soup = BeautifulSoup(html, "html.parser")

            result_blocks = soup.select(self._selectors('links'))

            for result in result_blocks:
                title_tag = result.find('h2')
                if not title_tag: continue

                link_tag = title_tag.find('a')
                if not link_tag or not link_tag.has_attr('href'): continue

                url_val = self._get_url(link_tag)
                title = title_tag.get_text(strip=True)

                # Snippet lives in <div class="b_caption"><p>…</p></div>;
                # description stays "" when either piece is missing.
                desc_container = result.find('div', class_='b_caption')
                description = ""
                if desc_container:
                    p_tag = desc_container.find('p')
                    if p_tag:
                        description = p_tag.get_text(strip=True)

                if url_val and title:
                    if url_val in fetched_links: continue
                    fetched_results.append(BaseSearchResult(url=url_val, title=title, description=description))
                    fetched_links.add(url_val)
                    if len(fetched_results) >= max_results: break

            if len(fetched_results) >= max_results: break

            next_page_info = self._next_page(soup)
            current_url = next_page_info['url']
            if current_url:
                # Politeness delay between page fetches (0.0 by default).
                await asyncio.sleep(self.sleep_interval)

        results_to_return = fetched_results[:max_results]

        if enhanced and results_to_return:
            # Enrich all results concurrently; gather preserves input order.
            enhancement_tasks = [self._enhance_result(res) for res in results_to_return]
            return await asyncio.gather(*enhancement_tasks)

        return results_to_return
|
|
|
|
|
async def suggestions(self, query: str, **kwargs) -> List[str]: |
|
|
if not query: raise ValueError("Query cannot be empty") |
|
|
region = kwargs.get('region', 'en-US') |
|
|
url = f"https://api.bing.com/osjson.aspx?query={query}&mkt={region}" |
|
|
resp = await self.session.get(url) |
|
|
resp.raise_for_status() |
|
|
data = resp.json() |
|
|
return data[1] if isinstance(data, list) and len(data) > 1 else [] |
|
|
|
|
|
async def images(self, keywords: str, max_results: int = 10, **kwargs) -> List[BingImageResult]: |
|
|
if not keywords: raise ValueError("Keywords cannot be empty") |
|
|
url = f"{self._base_url}/images/search?q={keywords}&count={max_results}" |
|
|
resp = await self.session.get(url) |
|
|
resp.raise_for_status() |
|
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
|
results = [] |
|
|
for item in soup.select("a.iusc"): |
|
|
try: |
|
|
m_data = item.get("m") |
|
|
if not m_data: continue |
|
|
meta = json.loads(m_data) |
|
|
if meta.get("murl"): |
|
|
results.append(BingImageResult(title=meta.get("t", ""), image=meta.get("murl"), thumbnail=meta.get("turl", ""), url=meta.get("purl", ""), source=meta.get("surl", ""))) |
|
|
if len(results) >= max_results: break |
|
|
except Exception: continue |
|
|
return results |
|
|
|
|
|
async def news(self, keywords: str, max_results: int = 10, **kwargs) -> List[BingNewsResult]: |
|
|
if not keywords: raise ValueError("Keywords cannot be empty") |
|
|
url = f"{self._base_url}/news/search?q={keywords}" |
|
|
resp = await self.session.get(url) |
|
|
resp.raise_for_status() |
|
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
|
results = [] |
|
|
for item in soup.select("div.news-card"): |
|
|
a_tag = item.find("a", class_="title") |
|
|
if not (a_tag and a_tag.has_attr('href')): continue |
|
|
desc_tag = item.find("div", class_="snippet") |
|
|
source_tag = item.find(attrs={"aria-label": "Publisher"}) |
|
|
results.append(BingNewsResult(title=a_tag.get_text(strip=True), url=a_tag['href'], description=desc_tag.get_text(strip=True) if desc_tag else "", source=source_tag.get_text(strip=True) if source_tag else "")) |
|
|
if len(results) >= max_results: break |
|
|
return results |
|
|
|
|
|
# Module-level client shared by all endpoints below. NOTE(review): the
# AsyncSession inside is created at import time, before any event loop is
# running — confirm curl_cffi allows constructing the session this early.
bing = BingSearch()
|
|
|
|
|
|
|
|
|
|
|
@app.get("/search", response_model=List[EnhancedBingSearchResult | BaseSearchResult], summary="Perform a standard or enhanced text search")
async def text_search(
    query: str = Query(..., description="The search keywords."),
    max_results: int = Query(10, description="Maximum number of results to return."),
    enhanced: bool = Query(False, description="Enable AI summarization and metadata fetching (slower but more detailed).")
):
    """
    Perform a text search on Bing.
    - Set `enhanced=true` to get AI-powered summaries and additional metadata for each result.
    """
    # Delegate to the shared BingSearch client; any failure (including the
    # ValueError for an empty query) surfaces as an HTTP 500 with detail.
    try:
        return await bing.text(
            keywords=query,
            max_results=max_results,
            enhanced=enhanced,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
|
|
|
|
|
@app.get("/suggestions", response_model=List[str], summary="Fetch search suggestions")
async def get_suggestions(
    query: str = Query(..., description="The search query for which to fetch suggestions."),
):
    # Thin wrapper over BingSearch.suggestions; errors become HTTP 500s.
    try:
        suggestions = await bing.suggestions(query=query)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return suggestions
|
|
|
|
|
@app.get("/images", response_model=List[BingImageResult], summary="Search for images")
async def image_search(
    query: str = Query(..., description="The search keywords for images."),
    max_results: int = Query(10, description="Maximum number of image results to return."),
):
    # Thin wrapper over BingSearch.images; errors become HTTP 500s.
    try:
        images = await bing.images(keywords=query, max_results=max_results)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return images
|
|
|
|
|
@app.get("/news", response_model=List[BingNewsResult], summary="Search for news articles")
async def news_search(
    query: str = Query(..., description="The search keywords for news."),
    max_results: int = Query(10, description="Maximum number of news results to return."),
):
    # Thin wrapper over BingSearch.news; errors become HTTP 500s.
    try:
        articles = await bing.news(keywords=query, max_results=max_results)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return articles
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Development entry point: serve on all interfaces with auto-reload.
    # NOTE(review): the "main:app" import string assumes this file is saved
    # as main.py — confirm the actual filename.
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)