import asyncio
import base64
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs, urlparse

from bs4 import BeautifulSoup
# FIX: curl_cffi exposes AsyncSession from its high-level requests-like API
# (curl_cffi.requests); curl_cffi.aio only contains the low-level AsyncCurl
# wrapper, so `from curl_cffi.aio import AsyncSession` raised ImportError.
from curl_cffi.requests import AsyncSession
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel, Field

from webscout.litagent import LitAgent

# --- FastAPI App Definition ---
app = FastAPI(
    title="Snapzion Enhanced Search API",
    description="An advanced FastAPI wrapper for Bing Search, featuring AI-powered summarization and metadata enrichment.",
    version="2.0.1",  # Version bump
)


# --- Pydantic Models for Clearer Responses ---
class BaseSearchResult(BaseModel):
    """Minimal text-search hit: URL, title, and snippet description."""

    url: str
    title: str
    description: str


class EnhancedBingSearchResult(BaseSearchResult):
    """Model for the enhanced search results with summary and metadata."""

    summary: Optional[str] = Field(None, description="AI-generated summary of the page content.")
    source: Optional[str] = Field(None, description="The domain name of the result URL.")
    favicon: Optional[str] = Field(None, description="URL of the website's favicon.")


class BingImageResult(BaseModel):
    """A single Bing image-search hit (full image, thumbnail, and page URLs)."""

    title: str
    image: str
    thumbnail: str
    url: str
    source: str


class BingNewsResult(BaseModel):
    """A single Bing news-search hit."""

    title: str
    url: str
    description: str
    source: str = ""


# --- Enhanced BingSearch Library ---
class BingSearch:
    """
    Bing search implementation rewritten for asynchronous performance and
    enhanced data retrieval.
    """

    # Lazily-created singleton LitAgent shared by all instances (see get_lit_agent).
    _lit_agent_instance: Optional[LitAgent] = None
    # Shared pool for running the synchronous summarizer off the event loop.
    _executor = ThreadPoolExecutor(max_workers=10)

    def __init__(
        self,
        timeout: int = 10,
        proxies: Optional[Dict[str, str]] = None,
        verify: bool = True,
        lang: str = "en-US",
        sleep_interval: float = 0.0,
        impersonate: str = "chrome110",
    ):
        """
        Args:
            timeout: Per-request timeout in seconds.
            proxies: Optional proxy mapping forwarded to the HTTP session.
            verify: Whether to verify TLS certificates.
            lang: Preferred language/market tag (stored; not used in requests here).
            sleep_interval: Delay between result pages when paginating.
            impersonate: Browser fingerprint curl_cffi should impersonate.
        """
        self.timeout = timeout
        self.proxies = proxies if proxies else {}
        self.verify = verify
        self.lang = lang
        self.sleep_interval = sleep_interval
        self._base_url = "https://www.bing.com"
        self.session = AsyncSession(
            proxies=self.proxies,
            verify=self.verify,
            timeout=self.timeout,
            impersonate=impersonate,
        )
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        })

    @classmethod
    def get_lit_agent(cls) -> LitAgent:
        """Initializes LitAgent lazily."""
        if cls._lit_agent_instance is None:
            cls._lit_agent_instance = LitAgent()
        return cls._lit_agent_instance

    async def _summarize_content(self, html_content: str) -> str:
        """Runs the synchronous summarize method in a thread pool.

        Returns a fallback message (never raises) so enhancement stays best-effort.
        """
        loop = asyncio.get_running_loop()
        agent = self.get_lit_agent()
        try:
            summary = await loop.run_in_executor(
                self._executor, agent.summarize, html_content
            )
            return summary
        except Exception as e:
            print(f"Error during summarization: {e}")
            return "Could not generate summary."
async def _enhance_result(self, result: BaseSearchResult) -> EnhancedBingSearchResult: """Fetches page content, generates summary, and extracts metadata.""" enhanced_result = EnhancedBingSearchResult(**result.model_dump()) try: parsed_url = urlparse(result.url) enhanced_result.source = parsed_url.netloc resp = await self.session.get(result.url, timeout=self.timeout) resp.raise_for_status() html = resp.text summary = await self._summarize_content(html) enhanced_result.summary = summary soup = BeautifulSoup(html, "html.parser") favicon_tag = soup.find("link", rel=lambda r: r and "icon" in r.lower()) if favicon_tag and favicon_tag.get("href"): favicon_url = favicon_tag["href"] if not favicon_url.startswith(('http://', 'https://', '//')): favicon_url = f"{parsed_url.scheme}://{parsed_url.netloc}{'/' if not favicon_url.startswith('/') else ''}{favicon_url}" elif favicon_url.startswith('//'): favicon_url = f"{parsed_url.scheme}:{favicon_url}" enhanced_result.favicon = favicon_url except Exception as e: print(f"Failed to enhance URL {result.url}: {e}") return enhanced_result def _selectors(self, element): selectors = { 'links': 'ol#b_results > li.b_algo', 'next': 'a.sb_pagN' } return selectors.get(element, '') def _first_page(self, query): url = f'{self._base_url}/search?q={query}&search=&form=QBLH' return {'url': url, 'data': None} def _next_page(self, soup): selector = self._selectors('next') next_page_tag = soup.select_one(selector) if next_page_tag and next_page_tag.get('href'): return {'url': self._base_url + next_page_tag['href'], 'data': None} return {'url': None, 'data': None} def _get_url(self, tag): # A more direct approach that is often sufficient return tag.get('href', '') async def text( self, keywords: str, max_results: int = 10, enhanced: bool = False, **kwargs ) -> List[BaseSearchResult | EnhancedBingSearchResult]: if not keywords: raise ValueError("Search keywords cannot be empty") fetched_results = [] fetched_links = set() async def fetch_page(url): try: 
resp = await self.session.get(url) resp.raise_for_status() return resp.text except Exception as e: raise Exception(f"Bing search failed: {str(e)}") current_url = self._first_page(keywords)['url'] while current_url and len(fetched_results) < max_results: html = await fetch_page(current_url) soup = BeautifulSoup(html, "html.parser") result_blocks = soup.select(self._selectors('links')) for result in result_blocks: title_tag = result.find('h2') if not title_tag: continue link_tag = title_tag.find('a') if not link_tag or not link_tag.has_attr('href'): continue url_val = self._get_url(link_tag) title = title_tag.get_text(strip=True) desc_container = result.find('div', class_='b_caption') description = "" if desc_container: p_tag = desc_container.find('p') if p_tag: description = p_tag.get_text(strip=True) if url_val and title: if url_val in fetched_links: continue fetched_results.append(BaseSearchResult(url=url_val, title=title, description=description)) fetched_links.add(url_val) if len(fetched_results) >= max_results: break if len(fetched_results) >= max_results: break next_page_info = self._next_page(soup) current_url = next_page_info['url'] if current_url: await asyncio.sleep(self.sleep_interval) results_to_return = fetched_results[:max_results] if enhanced and results_to_return: enhancement_tasks = [self._enhance_result(res) for res in results_to_return] return await asyncio.gather(*enhancement_tasks) return results_to_return async def suggestions(self, query: str, **kwargs) -> List[str]: if not query: raise ValueError("Query cannot be empty") region = kwargs.get('region', 'en-US') url = f"https://api.bing.com/osjson.aspx?query={query}&mkt={region}" resp = await self.session.get(url) resp.raise_for_status() data = resp.json() return data[1] if isinstance(data, list) and len(data) > 1 else [] async def images(self, keywords: str, max_results: int = 10, **kwargs) -> List[BingImageResult]: if not keywords: raise ValueError("Keywords cannot be empty") url = 
f"{self._base_url}/images/search?q={keywords}&count={max_results}" resp = await self.session.get(url) resp.raise_for_status() soup = BeautifulSoup(resp.text, "html.parser") results = [] for item in soup.select("a.iusc"): try: m_data = item.get("m") if not m_data: continue meta = json.loads(m_data) if meta.get("murl"): results.append(BingImageResult(title=meta.get("t", ""), image=meta.get("murl"), thumbnail=meta.get("turl", ""), url=meta.get("purl", ""), source=meta.get("surl", ""))) if len(results) >= max_results: break except Exception: continue return results async def news(self, keywords: str, max_results: int = 10, **kwargs) -> List[BingNewsResult]: if not keywords: raise ValueError("Keywords cannot be empty") url = f"{self._base_url}/news/search?q={keywords}" resp = await self.session.get(url) resp.raise_for_status() soup = BeautifulSoup(resp.text, "html.parser") results = [] for item in soup.select("div.news-card"): a_tag = item.find("a", class_="title") if not (a_tag and a_tag.has_attr('href')): continue desc_tag = item.find("div", class_="snippet") source_tag = item.find(attrs={"aria-label": "Publisher"}) results.append(BingNewsResult(title=a_tag.get_text(strip=True), url=a_tag['href'], description=desc_tag.get_text(strip=True) if desc_tag else "", source=source_tag.get_text(strip=True) if source_tag else "")) if len(results) >= max_results: break return results bing = BingSearch() # --- FastAPI Endpoints --- @app.get("/search", response_model=List[EnhancedBingSearchResult | BaseSearchResult], summary="Perform a standard or enhanced text search") async def text_search( query: str = Query(..., description="The search keywords."), max_results: int = Query(10, description="Maximum number of results to return."), enhanced: bool = Query(False, description="Enable AI summarization and metadata fetching (slower but more detailed).") ): """ Perform a text search on Bing. - Set `enhanced=true` to get AI-powered summaries and additional metadata for each result. 
""" try: results = await bing.text( keywords=query, max_results=max_results, enhanced=enhanced ) return results except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/suggestions", response_model=List[str], summary="Fetch search suggestions") async def get_suggestions( query: str = Query(..., description="The search query for which to fetch suggestions."), ): try: return await bing.suggestions(query=query) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/images", response_model=List[BingImageResult], summary="Search for images") async def image_search( query: str = Query(..., description="The search keywords for images."), max_results: int = Query(10, description="Maximum number of image results to return."), ): try: return await bing.images(keywords=query, max_results=max_results) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/news", response_model=List[BingNewsResult], summary="Search for news articles") async def news_search( query: str = Query(..., description="The search keywords for news."), max_results: int = Query(10, description="Maximum number of news results to return."), ): try: return await bing.news(keywords=query, max_results=max_results) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": import uvicorn uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)