rkihacker committed on
Commit
db4af16
verified
1 Parent(s): ec1cc34

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +133 -58
main.py CHANGED
@@ -2,6 +2,7 @@ import time
2
  import json
3
  import base64
4
  from typing import List, Optional, Dict, Any
 
5
 
6
  import uvicorn
7
  from fastapi import FastAPI, HTTPException, Query, Request, Response
@@ -14,7 +15,10 @@ from bs4 import BeautifulSoup
14
  # --- Pydantic Models for API Responses ---
15
 
16
  class SearchResultMetadata(BaseModel):
17
- """Defines the structure for metadata associated with a search result."""
 
 
 
18
  sitelinks: Optional[List[Dict[str, str]]] = Field(
19
  None, description="A list of sitelinks (title and URL) found under the main result."
20
  )
@@ -23,16 +27,16 @@ class SearchResultMetadata(BaseModel):
23
  )
24
 
25
  class BingSearchResult(BaseModel):
26
- """Represents a single text search result."""
27
  url: str = Field(..., description="The direct URL of the search result.")
28
  title: str = Field(..., description="The title of the search result.")
29
  description: str = Field(..., description="A brief description or snippet of the search result.")
30
  metadata: SearchResultMetadata = Field(
31
- default_factory=SearchResultMetadata, description="Additional metadata scraped for the result."
32
  )
33
 
34
  class BingImageResult(BaseModel):
35
- """Represents a single image search result."""
36
  title: str = Field(..., description="The title or description of the image.")
37
  image_url: str = Field(..., description="The direct URL to the full-size image.")
38
  thumbnail_url: str = Field(..., description="The URL to the thumbnail of the image.")
@@ -40,7 +44,7 @@ class BingImageResult(BaseModel):
40
  source: str = Field(..., description="The source or domain of the image.")
41
 
42
  class BingNewsResult(BaseModel):
43
- """Represents a single news article search result."""
44
  title: str = Field(..., description="The headline of the news article.")
45
  url: str = Field(..., description="The URL to the full news article.")
46
  description: str = Field(..., description="A snippet from the news article.")
@@ -50,7 +54,10 @@ class BingNewsResult(BaseModel):
50
  # --- Custom Middleware for Response Headers ---
51
 
52
  class CustomHeaderMiddleware(BaseHTTPMiddleware):
53
- """Middleware to add custom headers to every API response."""
 
 
 
54
  async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
55
  start_time = time.time()
56
  response = await call_next(request)
@@ -63,17 +70,21 @@ class CustomHeaderMiddleware(BaseHTTPMiddleware):
63
  # --- Bing Search Service ---
64
 
65
  class BingSearch:
66
- """Asynchronous Bing search implementation with advanced web scraping capabilities."""
 
 
 
 
67
 
68
  def __init__(
69
  self,
70
- timeout: int = 10,
71
  proxies: Optional[Dict[str, str]] = None,
72
  lang: str = "en-US",
73
  impersonate: str = "chrome110"
74
  ):
75
  self.timeout = timeout
76
- self.proxies = proxies if proxies else {}
77
  self.lang = lang
78
  self._base_url = "https://www.bing.com"
79
  self.session = AsyncSession(
@@ -81,52 +92,54 @@ class BingSearch:
81
  timeout=self.timeout,
82
  impersonate=impersonate
83
  )
 
84
  self.session.headers.update({
85
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36"
 
86
  })
87
 
88
  async def _fetch_html(self, url: str) -> str:
 
89
  try:
90
  resp = await self.session.get(url)
91
  resp.raise_for_status()
92
  return resp.text
93
  except Exception as e:
94
- raise HTTPException(status_code=502, detail=f"Failed to fetch Bing search results: {e}")
 
95
 
96
  def _parse_url(self, url: Optional[str]) -> str:
 
97
  if not url:
98
  return ""
99
  try:
100
- # Bing often uses a redirect URL; this attempts to extract the real URL.
101
  parsed_url = urlparse(url)
102
  query_params = parse_qs(parsed_url.query)
103
  if "u" in query_params:
104
- # The real URL is often Base64 encoded in the 'u' parameter.
105
- encoded_url = query_params["u"][0].replace("h=", "").split("&")[0]
106
- # Pad the string for correct Base64 decoding.
107
  decoded_bytes = base64.urlsafe_b64decode(encoded_url + '===')
108
  return decoded_bytes.decode('utf-8', errors='ignore')
109
- except Exception:
110
- # If parsing fails, return the original URL.
111
  return url
112
  return url
113
 
114
  async def text(
115
- self,
116
- keywords: str,
117
- region: Optional[str] = None,
118
- max_results: int = 10,
119
  ) -> List[BingSearchResult]:
 
120
  if not keywords:
121
  raise ValueError("Search keywords cannot be empty.")
122
 
123
- fetched_results = []
124
- url = f'{self._base_url}/search?q={urlencode({"q": keywords})}&form=QBLH'
125
  if region:
126
  url += f"&setmkt={region}"
127
 
128
  html = await self._fetch_html(url)
129
  soup = BeautifulSoup(html, "html.parser")
 
130
 
131
  for result in soup.select('li.b_algo'):
132
  if len(fetched_results) >= max_results:
@@ -134,7 +147,6 @@ class BingSearch:
134
 
135
  title_tag = result.find('h2')
136
  link_tag = title_tag.find('a') if title_tag else None
137
-
138
  if not link_tag or not link_tag.has_attr('href'):
139
  continue
140
 
@@ -142,38 +154,40 @@ class BingSearch:
142
  title = link_tag.get_text(strip=True)
143
  description = result.find('p').get_text(strip=True) if result.find('p') else ""
144
 
145
- # --- Metadata Extraction ---
146
- sitelinks = []
147
- sitelinks_container = result.select_one('ul.b_vlist')
148
- if sitelinks_container:
149
- for link_item in sitelinks_container.select('li a'):
150
- sitelinks.append({
151
- "title": link_item.get_text(strip=True),
152
- "url": self._parse_url(link_item.get('href'))
153
- })
154
-
155
- displayed_url_tag = result.select_one('cite')
156
- displayed_url = displayed_url_tag.get_text(strip=True) if displayed_url_tag else None
157
-
158
- metadata = SearchResultMetadata(
159
- sitelinks=sitelinks if sitelinks else None,
160
- displayed_url=displayed_url
161
- )
162
 
163
  if url_val and title:
164
  fetched_results.append(
165
  BingSearchResult(url=url_val, title=title, description=description, metadata=metadata)
166
  )
167
-
168
  return fetched_results
169
 
170
- async def images(
171
- self, keywords: str, max_results: int = 10
172
- ) -> List[BingImageResult]:
 
 
 
 
 
 
 
 
 
 
 
 
 
173
  if not keywords:
174
  raise ValueError("Search keywords cannot be empty.")
175
 
176
- url = f"{self._base_url}/images/search?{urlencode({'q': keywords})}"
 
177
  html = await self._fetch_html(url)
178
  soup = BeautifulSoup(html, "html.parser")
179
  results = []
@@ -182,15 +196,12 @@ class BingSearch:
182
  if len(results) >= max_results:
183
  break
184
  try:
185
- meta_json = item.get("m")
186
- if not meta_json:
187
- continue
188
- meta = json.loads(meta_json)
189
- if meta.get("murl"):
190
  results.append(
191
  BingImageResult(
192
  title=meta.get("t", ""),
193
- image_url=meta.get("murl", ""),
194
  thumbnail_url=meta.get("turl", ""),
195
  page_url=meta.get("purl", ""),
196
  source=urlparse(meta.get("purl", "")).netloc
@@ -200,13 +211,45 @@ class BingSearch:
200
  continue
201
  return results
202
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
203
 
204
  # --- FastAPI Application Setup ---
205
 
206
  app = FastAPI(
207
  title="Bing Search API",
208
  description="An advanced, asynchronous FastAPI wrapper to scrape Bing search results, powered by NiansuhAI.",
209
- version="3.0.0",
210
  )
211
 
212
  app.add_middleware(CustomHeaderMiddleware)
@@ -219,27 +262,59 @@ bing_search_service = BingSearch()
219
  async def text_search(
220
  query: str = Query(..., description="The search keywords."),
221
  region: Optional[str] = Query(None, description="The market/region for the search (e.g., 'en-US')."),
222
- max_results: int = Query(10, ge=1, le=50, description="Maximum number of results to return."),
223
  ):
 
 
 
 
224
  try:
225
  return await bing_search_service.text(keywords=query, region=region, max_results=max_results)
226
  except ValueError as e:
227
  raise HTTPException(status_code=400, detail=str(e))
228
  except Exception as e:
229
- raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
230
 
231
  @app.get("/images", response_model=List[BingImageResult], summary="Perform an image search")
232
  async def image_search(
233
  query: str = Query(..., description="The search keywords for images."),
234
- max_results: int = Query(10, ge=1, le=50, description="Maximum number of image results to return."),
235
  ):
 
236
  try:
237
  return await bing_search_service.images(keywords=query, max_results=max_results)
238
  except ValueError as e:
239
  raise HTTPException(status_code=400, detail=str(e))
240
- except Exception as e:
241
- raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
242
 
243
 
244
  if __name__ == "__main__":
 
245
  uvicorn.run(app, host="0.0.0.0", port=8000)
 
2
  import json
3
  import base64
4
  from typing import List, Optional, Dict, Any
5
+ from urllib.parse import urlencode, urlparse, parse_qs
6
 
7
  import uvicorn
8
  from fastapi import FastAPI, HTTPException, Query, Request, Response
 
15
  # --- Pydantic Models for API Responses ---
16
 
17
  class SearchResultMetadata(BaseModel):
18
+ """
19
+ Defines the structure for rich metadata associated with a search result,
20
+ such as sitelinks and the display URL.
21
+ """
22
  sitelinks: Optional[List[Dict[str, str]]] = Field(
23
  None, description="A list of sitelinks (title and URL) found under the main result."
24
  )
 
27
  )
28
 
29
  class BingSearchResult(BaseModel):
30
+ """Represents a single text search result from Bing."""
31
  url: str = Field(..., description="The direct URL of the search result.")
32
  title: str = Field(..., description="The title of the search result.")
33
  description: str = Field(..., description="A brief description or snippet of the search result.")
34
  metadata: SearchResultMetadata = Field(
35
+ default_factory=SearchResultMetadata, description="Additional rich metadata scraped for the result."
36
  )
37
 
38
  class BingImageResult(BaseModel):
39
+ """Represents a single image search result from Bing."""
40
  title: str = Field(..., description="The title or description of the image.")
41
  image_url: str = Field(..., description="The direct URL to the full-size image.")
42
  thumbnail_url: str = Field(..., description="The URL to the thumbnail of the image.")
 
44
  source: str = Field(..., description="The source or domain of the image.")
45
 
46
  class BingNewsResult(BaseModel):
47
+ """Represents a single news article search result from Bing."""
48
  title: str = Field(..., description="The headline of the news article.")
49
  url: str = Field(..., description="The URL to the full news article.")
50
  description: str = Field(..., description="A snippet from the news article.")
 
54
  # --- Custom Middleware for Response Headers ---
55
 
56
  class CustomHeaderMiddleware(BaseHTTPMiddleware):
57
+ """
58
+ This middleware adds custom headers to every API response, including
59
+ the processing time and a 'Powered-By' header.
60
+ """
61
  async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
62
  start_time = time.time()
63
  response = await call_next(request)
 
70
  # --- Bing Search Service ---
71
 
72
  class BingSearch:
73
+ """
74
+ An asynchronous service class for scraping search results from Bing.
75
+ It handles text, image, news, and suggestion searches using curl_cffi
76
+ for efficient, non-blocking HTTP requests.
77
+ """
78
 
79
  def __init__(
80
  self,
81
+ timeout: int = 15,
82
  proxies: Optional[Dict[str, str]] = None,
83
  lang: str = "en-US",
84
  impersonate: str = "chrome110"
85
  ):
86
  self.timeout = timeout
87
+ self.proxies = proxies or {}
88
  self.lang = lang
89
  self._base_url = "https://www.bing.com"
90
  self.session = AsyncSession(
 
92
  timeout=self.timeout,
93
  impersonate=impersonate
94
  )
95
+ # Use a realistic User-Agent to mimic a real browser
96
  self.session.headers.update({
97
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
98
+ "Accept-Language": "en-US,en;q=0.9",
99
  })
100
 
101
  async def _fetch_html(self, url: str) -> str:
102
+ """Asynchronously fetches HTML content from a given URL."""
103
  try:
104
  resp = await self.session.get(url)
105
  resp.raise_for_status()
106
  return resp.text
107
  except Exception as e:
108
+ # Raise an HTTPException that FastAPI can handle gracefully
109
+ raise HTTPException(status_code=502, detail=f"Failed to fetch Bing content: {e}")
110
 
111
  def _parse_url(self, url: Optional[str]) -> str:
112
+ """Decodes Bing's redirect URLs to find the actual destination URL."""
113
  if not url:
114
  return ""
115
  try:
 
116
  parsed_url = urlparse(url)
117
  query_params = parse_qs(parsed_url.query)
118
  if "u" in query_params:
119
+ encoded_url = query_params["u"][0].split("&")[0]
120
+ # Decode the Base64-encoded URL
 
121
  decoded_bytes = base64.urlsafe_b64decode(encoded_url + '===')
122
  return decoded_bytes.decode('utf-8', errors='ignore')
123
+ except (KeyError, IndexError, Exception):
124
+ # Fallback to the original URL if parsing fails
125
  return url
126
  return url
127
 
128
  async def text(
129
+ self, keywords: str, region: Optional[str], max_results: int
 
 
 
130
  ) -> List[BingSearchResult]:
131
+ """Performs a text search and scrapes the results page."""
132
  if not keywords:
133
  raise ValueError("Search keywords cannot be empty.")
134
 
135
+ params = {"q": keywords, "form": "QBLH"}
136
+ url = f'{self._base_url}/search?{urlencode(params)}'
137
  if region:
138
  url += f"&setmkt={region}"
139
 
140
  html = await self._fetch_html(url)
141
  soup = BeautifulSoup(html, "html.parser")
142
+ fetched_results = []
143
 
144
  for result in soup.select('li.b_algo'):
145
  if len(fetched_results) >= max_results:
 
147
 
148
  title_tag = result.find('h2')
149
  link_tag = title_tag.find('a') if title_tag else None
 
150
  if not link_tag or not link_tag.has_attr('href'):
151
  continue
152
 
 
154
  title = link_tag.get_text(strip=True)
155
  description = result.find('p').get_text(strip=True) if result.find('p') else ""
156
 
157
+ sitelinks = [
158
+ {"title": a.get_text(strip=True), "url": self._parse_url(a.get('href'))}
159
+ for a in result.select('ul.b_vlist li a')
160
+ ]
161
+ displayed_url = result.cite.get_text(strip=True) if result.cite else None
162
+ metadata = SearchResultMetadata(sitelinks=sitelinks or None, displayed_url=displayed_url)
 
 
 
 
 
 
 
 
 
 
 
163
 
164
  if url_val and title:
165
  fetched_results.append(
166
  BingSearchResult(url=url_val, title=title, description=description, metadata=metadata)
167
  )
 
168
  return fetched_results
169
 
170
+ async def suggestions(self, query: str, region: Optional[str]) -> List[str]:
171
+ """Fetches auto-complete suggestions for a given query."""
172
+ if not query:
173
+ raise ValueError("Search query cannot be empty.")
174
+ params = {"query": query, "mkt": region or "en-US"}
175
+ url = f"https://api.bing.com/osjson.aspx?{urlencode(params)}"
176
+ try:
177
+ resp = await self.session.get(url)
178
+ resp.raise_for_status()
179
+ data = resp.json()
180
+ return data[1] if isinstance(data, list) and len(data) > 1 else []
181
+ except Exception as e:
182
+ raise HTTPException(status_code=502, detail=f"Failed to fetch suggestions: {e}")
183
+
184
+ async def images(self, keywords: str, max_results: int) -> List[BingImageResult]:
185
+ """Performs an image search and scrapes the results."""
186
  if not keywords:
187
  raise ValueError("Search keywords cannot be empty.")
188
 
189
+ params = {"q": keywords, "count": max_results}
190
+ url = f"{self._base_url}/images/search?{urlencode(params)}"
191
  html = await self._fetch_html(url)
192
  soup = BeautifulSoup(html, "html.parser")
193
  results = []
 
196
  if len(results) >= max_results:
197
  break
198
  try:
199
+ meta = json.loads(item["m"])
200
+ if "murl" in meta:
 
 
 
201
  results.append(
202
  BingImageResult(
203
  title=meta.get("t", ""),
204
+ image_url=meta["murl"],
205
  thumbnail_url=meta.get("turl", ""),
206
  page_url=meta.get("purl", ""),
207
  source=urlparse(meta.get("purl", "")).netloc
 
211
  continue
212
  return results
213
 
214
+ async def news(self, keywords: str, region: Optional[str], max_results: int) -> List[BingNewsResult]:
215
+ """Performs a news search and scrapes the results."""
216
+ if not keywords:
217
+ raise ValueError("Search keywords cannot be empty.")
218
+
219
+ params = {"q": keywords, "form": "QBNH"}
220
+ if region:
221
+ params["mkt"] = region
222
+
223
+ url = f"{self._base_url}/news/search?{urlencode(params)}"
224
+ html = await self._fetch_html(url)
225
+ soup = BeautifulSoup(html, "html.parser")
226
+ results = []
227
+
228
+ for item in soup.select("div.news-card"):
229
+ if len(results) >= max_results:
230
+ break
231
+ a_tag = item.find("a", class_="title")
232
+ snippet_tag = item.find("div", class_="snippet")
233
+ source_tag = item.find("div", class_="source")
234
+
235
+ if a_tag and a_tag.has_attr('href'):
236
+ results.append(
237
+ BingNewsResult(
238
+ title=a_tag.get_text(strip=True),
239
+ url=a_tag['href'],
240
+ description=snippet_tag.get_text(strip=True) if snippet_tag else "",
241
+ source=source_tag.get_text(strip=True).split('·')[0].strip() if source_tag else "",
242
+ )
243
+ )
244
+ return results
245
+
246
 
247
  # --- FastAPI Application Setup ---
248
 
249
  app = FastAPI(
250
  title="Bing Search API",
251
  description="An advanced, asynchronous FastAPI wrapper to scrape Bing search results, powered by NiansuhAI.",
252
+ version="3.1.0",
253
  )
254
 
255
  app.add_middleware(CustomHeaderMiddleware)
 
262
  async def text_search(
263
  query: str = Query(..., description="The search keywords."),
264
  region: Optional[str] = Query(None, description="The market/region for the search (e.g., 'en-US')."),
265
+ max_results: int = Query(10, ge=1, le=30, description="Maximum number of results to return."),
266
  ):
267
+ """
268
+ Performs a text search on Bing and returns a list of results,
269
+ each enriched with metadata like sitelinks.
270
+ """
271
  try:
272
  return await bing_search_service.text(keywords=query, region=region, max_results=max_results)
273
  except ValueError as e:
274
  raise HTTPException(status_code=400, detail=str(e))
275
  except Exception as e:
276
+ # Catch-all for any other unexpected errors
277
+ raise HTTPException(status_code=500, detail=f"An unexpected internal error occurred: {e}")
278
+
279
+ @app.get("/suggestions", response_model=List[str], summary="Get real-time search suggestions")
280
+ async def get_suggestions(
281
+ query: str = Query(..., description="The partial search query for which to fetch suggestions."),
282
+ region: Optional[str] = Query(None, description="The region for the suggestions (e.g., 'en-US')."),
283
+ ):
284
+ """Fetches real-time search suggestions from Bing's autocomplete service."""
285
+ try:
286
+ return await bing_search_service.suggestions(query=query, region=region)
287
+ except ValueError as e:
288
+ raise HTTPException(status_code=400, detail=str(e))
289
 
290
  @app.get("/images", response_model=List[BingImageResult], summary="Perform an image search")
291
  async def image_search(
292
  query: str = Query(..., description="The search keywords for images."),
293
+ max_results: int = Query(20, ge=1, le=100, description="Maximum number of image results to return."),
294
  ):
295
+ """Performs an image search on Bing and returns a list of image results."""
296
  try:
297
  return await bing_search_service.images(keywords=query, max_results=max_results)
298
  except ValueError as e:
299
  raise HTTPException(status_code=400, detail=str(e))
300
+
301
+ @app.get("/news", response_model=List[BingNewsResult], summary="Perform a news search")
302
+ async def news_search(
303
+ query: str = Query(..., description="The search keywords for news articles."),
304
+ region: Optional[str] = Query(None, description="The region for the news search (e.g., 'en-US')."),
305
+ max_results: int = Query(15, ge=1, le=50, description="Maximum number of news results to return."),
306
+ ):
307
+ """Performs a news search on Bing and returns a list of recent articles."""
308
+ try:
309
+ return await bing_search_service.news(keywords=query, region=region, max_results=max_results)
310
+ except ValueError as e:
311
+ raise HTTPException(status_code=400, detail=str(e))
312
+
313
+ @app.get("/", include_in_schema=False)
314
+ async def root():
315
+ return {"message": "Bing Search API is running. Visit /docs for documentation."}
316
 
317
 
318
  if __name__ == "__main__":
319
+ # Standard entry point to run the FastAPI application using Uvicorn
320
  uvicorn.run(app, host="0.0.0.0", port=8000)