|
|
import time |
|
|
import json |
|
|
import base64 |
|
|
from typing import List, Optional, Dict, Any |
|
|
from urllib.parse import urlencode, urlparse, parse_qs |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
|
|
|
import uvicorn |
|
|
from fastapi import FastAPI, HTTPException, Query, Request, Response |
|
|
from pydantic import BaseModel, Field |
|
|
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint |
|
|
from curl_cffi.requests import AsyncSession |
|
|
from bs4 import BeautifulSoup |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BingSearchResult(BaseModel):
    """A single organic web search result scraped from Bing's results page."""

    url: str = Field(..., description="The URL of the search result.")
    title: str = Field(..., description="The title of the search result.")
    description: str = Field(..., description="A brief description of the search result.")
    # Use default_factory instead of a literal {} default: a factory is the
    # pydantic-documented way to declare mutable defaults, guaranteeing each
    # instance gets its own fresh dict.
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata for the result.")
|
|
|
|
|
class BingImageResult(BaseModel):
    """A single image search result parsed from Bing's /images/search page."""

    title: str = Field(..., description="The title of the image.")
    image_url: str = Field(..., description="The direct URL to the full-size image.")
    thumbnail_url: str = Field(..., description="The URL to the thumbnail of the image.")
    page_url: str = Field(..., description="The URL of the page where the image was found.")
    source: str = Field(..., description="The source or domain of the image.")
|
|
|
|
|
class BingNewsResult(BaseModel):
    """A single news article result parsed from Bing's /news/search page."""

    title: str = Field(..., description="The title of the news article.")
    url: str = Field(..., description="The URL to the news article.")
    description: str = Field(..., description="A snippet from the news article.")
    # Defaults to "" because the source div is not present on every card.
    source: str = Field("", description="The source of the news article.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CustomHeaderMiddleware(BaseHTTPMiddleware):
    """Starlette middleware that stamps timing and branding headers on every response."""

    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        """Time the downstream handler and attach X-Response-Time / X-Powered-By.

        Uses time.perf_counter() rather than time.time(): perf_counter is
        monotonic, so the measured duration cannot go negative or be skewed
        by system clock adjustments (NTP steps, DST, manual changes).
        """
        start_time = time.perf_counter()
        response = await call_next(request)
        process_time = time.perf_counter() - start_time
        response.headers["X-Response-Time"] = f"{process_time:.4f}s"
        response.headers["X-Powered-By"] = "NiansuhAI"
        return response
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BingSearch:
    """Asynchronous Bing search client scraping the public HTML endpoints.

    Provides web (``text``), image (``images``) and news (``news``) search by
    parsing Bing's result pages with BeautifulSoup, plus typed autocomplete
    via the osjson API. All requests go through a curl_cffi ``AsyncSession``
    that impersonates a real browser fingerprint.
    """

    def __init__(
        self,
        timeout: int = 10,
        proxies: Optional[Dict[str, str]] = None,
        verify: bool = True,
        lang: str = "en-US",
        impersonate: str = "chrome110"
    ):
        """Initialise the HTTP session.

        Args:
            timeout: Per-request timeout in seconds.
            proxies: Optional scheme -> proxy URL mapping.
            verify: Verify TLS certificates when True.
            lang: Language code sent as ``setlang`` for image searches.
            impersonate: curl_cffi browser fingerprint profile name.
        """
        self.timeout = timeout
        self.proxies = proxies if proxies else {}
        self.verify = verify
        self.lang = lang
        self._base_url = "https://www.bing.com"
        self.session = AsyncSession(
            proxies=self.proxies,
            verify=self.verify,
            timeout=self.timeout,
            impersonate=impersonate
        )
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
        })

    async def _fetch_html(self, url: str) -> str:
        """Fetch *url* and return the response body text.

        Raises:
            HTTPException: 500, wrapping any transport or HTTP-status error.
        """
        try:
            resp = await self.session.get(url)
            resp.raise_for_status()
            return resp.text
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to fetch Bing search results: {e}")

    def _get_url(self, tag) -> str:
        """Resolve a result anchor's real destination URL.

        Bing wraps outbound links in a redirect whose ``u`` query parameter
        carries the target as a 2-char marker plus urlsafe base64. Decode
        that; on any failure fall back to the raw ``href``.
        """
        url = tag.get('href', '')
        try:
            parsed_url = urlparse(url)
            query_params = parse_qs(parsed_url.query)
            if "u" in query_params:
                # Drop the 2-character prefix, then append '===' so the
                # decoder tolerates missing base64 padding.
                encoded_url = query_params["u"][0][2:]
                decoded_bytes = base64.urlsafe_b64decode(encoded_url + '===')
                return decoded_bytes.decode('utf-8')
        except Exception:
            # Best-effort decode only — never drop a result over a bad redirect.
            return url
        return url

    async def text(
        self,
        keywords: str,
        region: Optional[str] = None,
        safesearch: str = "moderate",
        max_results: int = 10,
    ) -> List[BingSearchResult]:
        """Run a web search and return up to *max_results* organic results.

        Follows "next page" links until enough results are collected or
        pagination ends. Duplicate URLs are skipped.

        Raises:
            ValueError: If *keywords* is empty.
            HTTPException: 500 (via _fetch_html) on transport failure.
        """
        if not keywords:
            raise ValueError("Search keywords cannot be empty.")

        fetched_results: List[BingSearchResult] = []
        fetched_links = set()

        # Bug fix: the query must be URL-encoded. The previous raw f-string
        # interpolation (q={keywords}) broke keywords containing spaces,
        # '&', '#', '+' or non-ASCII characters; urlencode matches how the
        # other endpoints already build their URLs.
        params = {"q": keywords, "form": "QBLH"}
        if region:
            params["setmkt"] = region
        url = f"{self._base_url}/search?{urlencode(params)}"
        # NOTE(review): `safesearch` is accepted for signature symmetry with
        # images()/news() but is not applied here; Bing's web results page
        # does not take a safe-search query parameter — TODO confirm and
        # implement (likely cookie-based) if strictness is required.

        while url and len(fetched_results) < max_results:
            html = await self._fetch_html(url)
            soup = BeautifulSoup(html, "html.parser")

            for result in soup.select('ol#b_results > li.b_algo'):
                title_tag = result.find('h2')
                if not title_tag:
                    continue

                link_tag = title_tag.find('a')
                if not link_tag or not link_tag.has_attr('href'):
                    continue

                url_val = self._get_url(link_tag)
                title = title_tag.get_text(strip=True)

                desc_container = result.find('div', class_='b_caption')
                description = desc_container.get_text(strip=True) if desc_container else ''

                if url_val and title and url_val not in fetched_links:
                    fetched_results.append(BingSearchResult(url=url_val, title=title, description=description))
                    fetched_links.add(url_val)
                    if len(fetched_results) >= max_results:
                        break

            if len(fetched_results) >= max_results:
                break

            # Follow the next-page anchor; its href is site-relative.
            next_page_tag = soup.select_one('a.sb_pagN')
            url = self._base_url + next_page_tag['href'] if next_page_tag and next_page_tag.get('href') else None

        return fetched_results[:max_results]

    async def suggestions(self, query: str, region: Optional[str] = None) -> List[str]:
        """Return autocomplete suggestions for *query* from the osjson API.

        Raises:
            ValueError: If *query* is empty.
            HTTPException: 500 on any transport or parsing failure.
        """
        if not query:
            raise ValueError("Search query cannot be empty.")
        params = {"query": query, "mkt": region if region else "en-US"}
        url = f"https://api.bing.com/osjson.aspx?{urlencode(params)}"
        try:
            resp = await self.session.get(url)
            resp.raise_for_status()
            data = resp.json()
            # osjson responds with [query, [suggestion, ...], ...]; take the
            # suggestion list, or [] if the shape is unexpected.
            return data[1] if isinstance(data, list) and len(data) > 1 else []
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to fetch suggestions: {e}")

    async def images(
        self,
        keywords: str,
        region: Optional[str] = None,
        safesearch: str = "moderate",
        max_results: int = 10
    ) -> List[BingImageResult]:
        """Run an image search and return up to *max_results* results.

        Parses the JSON metadata blob Bing embeds in each result anchor's
        ``m`` attribute.

        Raises:
            ValueError: If *keywords* is empty.
            HTTPException: 500 (via _fetch_html) on transport failure.
        """
        if not keywords:
            raise ValueError("Search keywords cannot be empty.")

        safe_map = {"on": "Strict", "moderate": "Moderate", "off": "Off"}
        params = {
            "q": keywords, "count": max_results, "setlang": self.lang,
            "safeSearch": safe_map.get(safesearch.lower(), "Moderate"),
        }
        if region:
            params["mkt"] = region

        url = f"{self._base_url}/images/search?{urlencode(params)}"
        html = await self._fetch_html(url)
        soup = BeautifulSoup(html, "html.parser")
        results = []

        for item in soup.select("a.iusc"):
            try:
                meta = json.loads(item.get("m", '{}'))
                # Only keep entries that expose a direct image URL.
                if meta.get("murl"):
                    results.append(
                        BingImageResult(
                            title=meta.get("t", ""),
                            image_url=meta.get("murl", ""),
                            thumbnail_url=meta.get("turl", ""),
                            page_url=meta.get("purl", ""),
                            source=meta.get("surl", "")
                        )
                    )
                    if len(results) >= max_results:
                        break
            except (json.JSONDecodeError, KeyError):
                # Skip cards whose metadata blob is malformed.
                continue
        return results[:max_results]

    async def news(
        self,
        keywords: str,
        region: Optional[str] = None,
        safesearch: str = "moderate",
        max_results: int = 10,
    ) -> List[BingNewsResult]:
        """Run a news search and return up to *max_results* articles.

        Raises:
            ValueError: If *keywords* is empty.
            HTTPException: 500 (via _fetch_html) on transport failure.
        """
        if not keywords:
            raise ValueError("Search keywords cannot be empty.")

        safe_map = {"on": "Strict", "moderate": "Moderate", "off": "Off"}
        params = {
            "q": keywords, "form": "QBNH",
            "safeSearch": safe_map.get(safesearch.lower(), "Moderate"),
        }
        if region:
            params["mkt"] = region

        url = f"{self._base_url}/news/search?{urlencode(params)}"
        html = await self._fetch_html(url)
        soup = BeautifulSoup(html, "html.parser")
        results = []

        for item in soup.select("div.news-card"):
            a_tag = item.find("a", class_="title")
            if not a_tag:
                continue

            # The source div may carry a '·'-separated suffix; keep only
            # the leading segment as the publisher name.
            results.append(
                BingNewsResult(
                    title=a_tag.get_text(strip=True),
                    url=a_tag.get('href', ''),
                    description=item.find("div", class_="snippet").get_text(strip=True) if item.find("div", class_="snippet") else "",
                    source=item.find("div", class_="source").get_text(strip=True).split('·')[0].strip() if item.find("div", class_="source") else "",
                )
            )
            if len(results) >= max_results:
                break
        return results[:max_results]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# FastAPI application exposing the BingSearch client over HTTP.
app = FastAPI(
    title="Bing Search API",
    description="A FastAPI wrapper for the BingSearch library with advanced features, powered by NiansuhAI.",
    version="2.0.0",
)

# Adds X-Response-Time / X-Powered-By headers to every response.
app.add_middleware(CustomHeaderMiddleware)

# Module-level singleton; its AsyncSession is shared by all endpoints.
bing_search_service = BingSearch()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/search", response_model=List[BingSearchResult], summary="Perform a text search")
async def text_search(
    # Consistency fix: region example changed from the invalid 'us-US' to
    # 'en-US' (language-REGION form), matching the /suggestions endpoint.
    query: str = Query(..., description="The search keywords."),
    region: Optional[str] = Query(None, description="The region for the search (e.g., 'en-US')."),
    safesearch: str = Query("moderate", description="Safe search level ('on', 'moderate', 'off')."),
    max_results: int = Query(10, description="Maximum number of results to return."),
):
    """Proxy a web search to the shared BingSearch client.

    An empty query raises ValueError inside the service and is mapped to
    HTTP 400; transport failures surface as the service's own HTTP 500.
    """
    try:
        return await bing_search_service.text(
            keywords=query, region=region, safesearch=safesearch, max_results=max_results
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
|
|
|
|
|
@app.get("/suggestions", response_model=List[str], summary="Get search suggestions")
async def get_suggestions(
    query: str = Query(..., description="The search query for which to fetch suggestions."),
    region: Optional[str] = Query(None, description="The region for the suggestions (e.g., 'en-US')."),
):
    """Return Bing autocomplete suggestions for *query*.

    An empty query is rejected with HTTP 400; upstream failures propagate
    as the service's own HTTPException (500).
    """
    try:
        result = await bing_search_service.suggestions(query=query, region=region)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    return result
|
|
|
|
|
@app.get("/images", response_model=List[BingImageResult], summary="Perform an image search")
async def image_search(
    # Consistency fix: region example changed from the invalid 'us-US' to
    # 'en-US' (language-REGION form), matching the /suggestions endpoint.
    query: str = Query(..., description="The search keywords for images."),
    region: Optional[str] = Query(None, description="The region for the image search (e.g., 'en-US')."),
    safesearch: str = Query("moderate", description="Safe search level ('on', 'moderate', 'off')."),
    max_results: int = Query(10, description="Maximum number of image results to return."),
):
    """Proxy an image search to the shared BingSearch client.

    An empty query raises ValueError inside the service and is mapped to
    HTTP 400; transport failures surface as the service's own HTTP 500.
    """
    try:
        return await bing_search_service.images(
            keywords=query, region=region, safesearch=safesearch, max_results=max_results
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
|
|
|
|
|
@app.get("/news", response_model=List[BingNewsResult], summary="Perform a news search")
async def news_search(
    # Consistency fix: region example changed from the invalid 'us-US' to
    # 'en-US' (language-REGION form), matching the /suggestions endpoint.
    query: str = Query(..., description="The search keywords for news."),
    region: Optional[str] = Query(None, description="The region for the news search (e.g., 'en-US')."),
    safesearch: str = Query("moderate", description="Safe search level ('on', 'moderate', 'off')."),
    max_results: int = Query(10, description="Maximum number of news results to return."),
):
    """Proxy a news search to the shared BingSearch client.

    An empty query raises ValueError inside the service and is mapped to
    HTTP 400; transport failures surface as the service's own HTTP 500.
    """
    try:
        return await bing_search_service.news(
            keywords=query, region=region, safesearch=safesearch, max_results=max_results
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Development entry point: binds to all interfaces on port 8000 so the
    # API is reachable from outside a container.
    uvicorn.run(app, host="0.0.0.0", port=8000)