|
|
from fastapi import FastAPI, HTTPException, Query |
|
|
from typing import List, Optional |
|
|
from pydantic import BaseModel |
|
|
from time import sleep |
|
|
from curl_cffi.requests import Session |
|
|
from urllib.parse import urlencode, unquote, urlparse, parse_qs |
|
|
import base64 |
|
|
from typing import Dict, Any |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
from webscout.litagent import LitAgent |
|
|
from bs4 import BeautifulSoup |
|
|
import json |
|
|
|
|
|
# FastAPI application object; all endpoint routes below are registered on it.
app = FastAPI(
    title="Snapzion Search API",
    description="A FastAPI wrapper for the Search library with advanced features.",
    version="1.0.0",
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BingSearchResult(BaseModel):
    """A single organic web result scraped from Bing."""

    url: str  # result URL (Bing redirect wrappers decoded by BingSearch._get_url)
    title: str  # heading text of the result
    description: str  # caption/snippet text; may be empty
    metadata: Dict[str, Any] = {}  # extra data; currently never populated by BingSearch.text
|
|
|
|
|
class BingImageResult(BaseModel):
    """A single image result parsed from a Bing image-search metadata blob."""

    title: str  # image title ("t" in Bing's JSON metadata)
    image: str  # full-size image URL ("murl")
    thumbnail: str  # thumbnail URL ("turl")
    url: str  # page hosting the image ("purl")
    source: str  # source site URL ("surl")
|
|
|
|
|
class BingNewsResult(BaseModel):
    """A single news article scraped from Bing news search."""

    title: str  # article headline
    url: str  # article link
    description: str  # snippet text; may be empty
    source: str = ""  # publisher name; empty when not found in the markup
|
|
|
|
|
class BingSearch:
    """Bing search scraper with configurable session parameters.

    Provides web (``text``), image (``images``) and news (``news``) search
    plus query ``suggestions`` by scraping/querying Bing's public endpoints
    through a single reusable curl_cffi session.
    """

    def __init__(
        self,
        timeout: int = 10,
        proxies: Optional[Dict[str, str]] = None,
        verify: bool = True,
        lang: str = "en-US",
        sleep_interval: float = 0.0,
        impersonate: str = "chrome110"
    ):
        """Create a configured curl_cffi session used for all requests.

        Args:
            timeout: Per-request timeout in seconds.
            proxies: Optional mapping of scheme -> proxy URL.
            verify: Whether to verify TLS certificates.
            lang: Language/market code; used as ``setlang`` for image search
                and as the default ``mkt`` for suggestions.
            sleep_interval: Delay in seconds between result pages in ``text``.
            impersonate: curl_cffi browser fingerprint to impersonate.
        """
        self.timeout = timeout
        self.proxies = proxies if proxies else {}
        self.verify = verify
        self.lang = lang
        self.sleep_interval = sleep_interval
        self._base_url = "https://www.bing.com"
        self.session = Session(
            proxies=self.proxies,
            verify=self.verify,
            timeout=self.timeout,
            impersonate=impersonate
        )
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
        })

    @staticmethod
    def _safe_level(safesearch: str) -> str:
        """Map the public safesearch value ('on'/'moderate'/'off', any case)
        to Bing's ``safeSearch`` parameter value, defaulting to 'Moderate'."""
        return {"on": "Strict", "moderate": "Moderate", "off": "Off"}.get(
            safesearch.lower(), "Moderate"
        )

    def _selectors(self, element: str) -> str:
        """Return the CSS selector used for the given results-page element."""
        selectors = {
            'links': 'ol#b_results > li',
            'next': 'a.sb_pagN'
        }
        return selectors[element]

    def _first_page(self, query: str) -> Dict[str, Any]:
        """Build the URL of the first results page for *query*.

        The query is URL-encoded so spaces and reserved characters survive
        (the previous f-string interpolation produced broken URLs for them).
        """
        url = f"{self._base_url}/search?{urlencode({'q': query, 'search': '', 'form': 'QBLH'})}"
        return {'url': url, 'data': None}

    def _next_page(self, soup) -> Dict[str, Any]:
        """Extract the absolute URL of the next results page, or None."""
        next_page_tag = soup.select_one(self._selectors('next'))
        url = None
        if next_page_tag and next_page_tag.get('href'):
            url = self._base_url + next_page_tag['href']
        return {'url': url, 'data': None}

    def _get_url(self, tag) -> str:
        """Resolve a result link, decoding Bing's base64 redirect wrapper.

        Bing wraps outbound links as ``/ck/a?...&u=XX<base64url>`` where the
        real target follows a 2-character prefix. Falls back to the raw href
        whenever parsing or decoding fails.
        """
        url = tag.get('href', '')
        resolved = url
        try:
            query_params = parse_qs(urlparse(url).query)
            if "u" in query_params:
                # Strip the 2-char prefix; over-pad with '===' since the
                # payload arrives without padding (excess padding is ignored).
                encoded_url = query_params["u"][0][2:]
                try:
                    decoded_bytes = base64.urlsafe_b64decode(encoded_url + '===')
                except base64.binascii.Error as e:
                    print(f"Error decoding Base64 string: {e}")
                    return url
                resolved = decoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Error resolving Bing redirect URL: {e}")
        return resolved

    def text(
        self,
        keywords: str,
        region: str = None,
        safesearch: str = "moderate",
        max_results: int = 10,
        unique: bool = True
    ) -> List["BingSearchResult"]:
        """Scrape Bing web results, following pagination up to *max_results*.

        Args:
            keywords: Search terms; must be non-empty.
            region: Optional market code (e.g. 'en-US'), sent as ``mkt``
                (previously accepted but silently ignored).
            safesearch: Accepted for API symmetry; Bing applies web-search
                safe search via cookies, so it is not enforced here.
                # NOTE(review): wire via the SRCHHPGUSR cookie if needed.
            max_results: Maximum number of results to return.
            unique: Drop results whose URL was already seen.

        Raises:
            ValueError: If *keywords* is empty.
            Exception: If a results page cannot be fetched.
        """
        if not keywords:
            raise ValueError("Search keywords cannot be empty")

        fetched_results = []
        fetched_links = set()

        def fetch_page(url):
            # Fetch one results page, surfacing HTTP errors uniformly.
            try:
                resp = self.session.get(url)
                resp.raise_for_status()
                return resp.text
            except Exception as e:
                raise Exception(f"Bing search failed: {str(e)}")

        current_url = self._first_page(keywords)['url']
        if region:
            current_url = f"{current_url}&{urlencode({'mkt': region})}"

        while current_url and len(fetched_results) < max_results:
            soup = BeautifulSoup(fetch_page(current_url), "html.parser")

            for result in soup.select(self._selectors('links')):
                title_tag = result.find('h2')
                if not title_tag:
                    continue
                link_tag = title_tag.find('a')
                if not link_tag or not link_tag.has_attr('href'):
                    continue

                url_val = self._get_url(link_tag)
                title = title_tag.get_text(strip=True)

                # Prefer the caption paragraph; fall back to the caption
                # container's text, then to any <p> within the result.
                description = ''
                desc_container = result.find('div', class_='b_caption')
                if desc_container:
                    desc_p = desc_container.find('p')
                    description = (desc_p or desc_container).get_text(strip=True)
                if not description:
                    p_tag = result.find('p')
                    if p_tag:
                        description = p_tag.get_text(strip=True)

                if url_val and title:
                    if unique and url_val in fetched_links:
                        continue
                    fetched_results.append(
                        BingSearchResult(url=url_val, title=title, description=description)
                    )
                    fetched_links.add(url_val)
                    if len(fetched_results) >= max_results:
                        break

            if len(fetched_results) >= max_results:
                break

            current_url = self._next_page(soup)['url']
            if current_url:
                # Polite delay between result pages (0.0 by default).
                sleep(self.sleep_interval)

        return fetched_results[:max_results]

    def suggestions(self, query: str, region: str = None) -> List[str]:
        """Return autocomplete suggestions via Bing's OSJSON endpoint.

        Raises:
            ValueError: If *query* is empty.
            Exception: On network/HTTP failure.
        """
        if not query:
            raise ValueError("Search query cannot be empty")
        params = {
            "query": query,
            # Fall back to the session language instead of a hardcoded market.
            "mkt": region if region else self.lang
        }
        url = f"https://api.bing.com/osjson.aspx?{urlencode(params)}"
        try:
            resp = self.session.get(url)
            resp.raise_for_status()
            data = resp.json()
            # Payload shape: [query, [suggestion, ...], ...]
            if isinstance(data, list) and len(data) > 1 and isinstance(data[1], list):
                return data[1]
            return []
        except Exception as e:
            if hasattr(e, 'response') and e.response is not None:
                raise Exception(f"Bing suggestions failed with status {e.response.status_code}: {str(e)}")
            raise Exception(f"Bing suggestions failed: {str(e)}")

    def images(
        self,
        keywords: str,
        region: str = None,
        safesearch: str = "moderate",
        max_results: int = 10
    ) -> List["BingImageResult"]:
        """Scrape Bing image results.

        Args:
            keywords: Search terms; must be non-empty.
            region: Optional market code, sent as ``mkt``.
            safesearch: 'on', 'moderate' or 'off' (defaults to Moderate).
            max_results: Maximum number of image results.

        Raises:
            ValueError: If *keywords* is empty.
            Exception: On network/HTTP failure.
        """
        if not keywords:
            raise ValueError("Search keywords cannot be empty")
        params = {
            "q": keywords,
            "count": max_results,
            "setlang": self.lang,
            "safeSearch": self._safe_level(safesearch),
        }
        if region:
            params["mkt"] = region
        url = f"{self._base_url}/images/search?{urlencode(params)}"
        try:
            resp = self.session.get(url)
            resp.raise_for_status()
            html = resp.text
        except Exception as e:
            if hasattr(e, 'response') and e.response is not None:
                raise Exception(f"Bing image search failed with status {e.response.status_code}: {str(e)}")
            raise Exception(f"Bing image search failed: {str(e)}")
        soup = BeautifulSoup(html, "html.parser")
        results = []
        # Each image anchor carries a JSON metadata blob in its "m" attribute.
        for item in soup.select("a.iusc"):
            try:
                m = item.get("m")
                meta = json.loads(m) if m else {}
                image_url = meta.get("murl", "")
                if not image_url:
                    continue
                results.append(BingImageResult(
                    title=meta.get("t", ""),
                    image=image_url,
                    thumbnail=meta.get("turl", ""),
                    url=meta.get("purl", ""),
                    source=meta.get("surl", "")
                ))
                if len(results) >= max_results:
                    break
            except Exception:
                # Skip cards whose metadata is missing or malformed.
                continue
        return results[:max_results]

    def news(
        self,
        keywords: str,
        region: str = None,
        safesearch: str = "moderate",
        max_results: int = 10,
    ) -> List['BingNewsResult']:
        """Scrape Bing news results, with a fallback for layout variants.

        Args:
            keywords: Search terms; must be non-empty.
            region: Optional market code, sent as ``mkt``.
            safesearch: 'on', 'moderate' or 'off' (defaults to Moderate).
            max_results: Maximum number of news results.

        Raises:
            ValueError: If *keywords* is empty.
            Exception: On network/HTTP failure.
        """
        if not keywords:
            raise ValueError("Search keywords cannot be empty")
        params = {
            "q": keywords,
            "form": "QBNH",
            "safeSearch": self._safe_level(safesearch),
        }
        if region:
            params["mkt"] = region
        url = f"{self._base_url}/news/search?{urlencode(params)}"
        try:
            resp = self.session.get(url)
            resp.raise_for_status()
        except Exception as e:
            if hasattr(e, 'response') and e.response is not None:
                raise Exception(f"Bing news search failed with status {e.response.status_code}: {str(e)}")
            raise Exception(f"Bing news search failed: {str(e)}")
        soup = BeautifulSoup(resp.text, "html.parser")
        results = []
        # Known card layouts used by the news results page.
        for item in soup.select("div.news-card, div.card, div.newsitem, div.card-content, div.t_s_main"):
            a_tag = item.find("a")
            title = a_tag.get_text(strip=True) if a_tag else ''
            url_val = a_tag['href'] if a_tag and a_tag.has_attr('href') else ''
            desc_tag = (item.find("div", class_="snippet")
                        or item.find("div", class_="news-card-snippet")
                        or item.find("div", class_="snippetText"))
            description = desc_tag.get_text(strip=True) if desc_tag else ''
            source_tag = item.find("div", class_="source")
            source = source_tag.get_text(strip=True) if source_tag else ''
            if url_val and title:
                results.append(BingNewsResult(title=title, url=url_val, description=description, source=source))
                if len(results) >= max_results:
                    break
        if not results:
            # Fallback: plain title anchors when no card layout matched.
            for item in soup.select("a.title"):
                title = item.get_text(strip=True)
                url_val = item['href'] if item.has_attr('href') else ''
                if url_val and title:
                    results.append(BingNewsResult(title=title, url=url_val, description='', source=''))
                    if len(results) >= max_results:
                        break
        return results[:max_results]
|
|
|
|
|
|
|
|
# Single shared scraper instance used by every route handler below.
bing = BingSearch()
|
|
|
|
|
@app.get("/search", response_model=List[BingSearchResult]) |
|
|
async def text_search( |
|
|
query: str = Query(..., description="The search keywords."), |
|
|
region: Optional[str] = Query(None, description="The region for the search (e.g., 'us-US')."), |
|
|
safesearch: str = Query("moderate", description="Safe search level ('on', 'moderate', 'off')."), |
|
|
max_results: int = Query(10, description="Maximum number of results to return."), |
|
|
): |
|
|
""" |
|
|
Perform a text search on Bing. |
|
|
""" |
|
|
try: |
|
|
results = bing.text( |
|
|
keywords=query, |
|
|
region=region, |
|
|
safesearch=safesearch, |
|
|
max_results=max_results, |
|
|
) |
|
|
return results |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
@app.get("/suggestions", response_model=List[str]) |
|
|
async def get_suggestions( |
|
|
query: str = Query(..., description="The search query for which to fetch suggestions."), |
|
|
region: Optional[str] = Query(None, description="The region for the suggestions (e.g., 'en-US')."), |
|
|
): |
|
|
""" |
|
|
Fetches search suggestions for a given query. |
|
|
""" |
|
|
try: |
|
|
suggestions = bing.suggestions(query=query, region=region) |
|
|
return suggestions |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
@app.get("/images", response_model=List[BingImageResult]) |
|
|
async def image_search( |
|
|
query: str = Query(..., description="The search keywords for images."), |
|
|
region: Optional[str] = Query(None, description="The region for the image search (e.g., 'us-US')."), |
|
|
safesearch: str = Query("moderate", description="Safe search level ('on', 'moderate', 'off')."), |
|
|
max_results: int = Query(10, description="Maximum number of image results to return."), |
|
|
): |
|
|
""" |
|
|
Perform an image search on Bing. |
|
|
""" |
|
|
try: |
|
|
results = bing.images( |
|
|
keywords=query, |
|
|
region=region, |
|
|
safesearch=safesearch, |
|
|
max_results=max_results, |
|
|
) |
|
|
return results |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
@app.get("/news", response_model=List[BingNewsResult]) |
|
|
async def news_search( |
|
|
query: str = Query(..., description="The search keywords for news."), |
|
|
region: Optional[str] = Query(None, description="The region for the news search (e.g., 'us-US')."), |
|
|
safesearch: str = Query("moderate", description="Safe search level ('on', 'moderate', 'off')."), |
|
|
max_results: int = Query(10, description="Maximum number of news results to return."), |
|
|
): |
|
|
""" |
|
|
Perform a news search on Bing. |
|
|
""" |
|
|
try: |
|
|
results = bing.news( |
|
|
keywords=query, |
|
|
region=region, |
|
|
safesearch=safesearch, |
|
|
max_results=max_results, |
|
|
) |
|
|
return results |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
import uvicorn |
|
|
uvicorn.run(app, host="0.0.0.0", port=8000) |