rkihacker committed on
Commit
ec1cc34
·
verified ·
1 Parent(s): 2bdf25f

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +94 -164
main.py CHANGED
@@ -2,8 +2,6 @@ import time
2
  import json
3
  import base64
4
  from typing import List, Optional, Dict, Any
5
- from urllib.parse import urlencode, urlparse, parse_qs
6
- from concurrent.futures import ThreadPoolExecutor
7
 
8
  import uvicorn
9
  from fastapi import FastAPI, HTTPException, Query, Request, Response
@@ -15,32 +13,45 @@ from bs4 import BeautifulSoup
15
 
16
  # --- Pydantic Models for API Responses ---
17
 
 
 
 
 
 
 
 
 
 
18
  class BingSearchResult(BaseModel):
19
- url: str = Field(..., description="The URL of the search result.")
 
20
  title: str = Field(..., description="The title of the search result.")
21
- description: str = Field(..., description="A brief description of the search result.")
22
- metadata: Dict[str, Any] = Field({}, description="Additional metadata for the result.")
 
 
23
 
24
  class BingImageResult(BaseModel):
25
- title: str = Field(..., description="The title of the image.")
 
26
  image_url: str = Field(..., description="The direct URL to the full-size image.")
27
  thumbnail_url: str = Field(..., description="The URL to the thumbnail of the image.")
28
  page_url: str = Field(..., description="The URL of the page where the image was found.")
29
  source: str = Field(..., description="The source or domain of the image.")
30
 
31
  class BingNewsResult(BaseModel):
32
- title: str = Field(..., description="The title of the news article.")
33
- url: str = Field(..., description="The URL to the news article.")
 
34
  description: str = Field(..., description="A snippet from the news article.")
35
- source: str = Field("", description="The source of the news article.")
36
 
37
 
38
  # --- Custom Middleware for Response Headers ---
39
 
40
  class CustomHeaderMiddleware(BaseHTTPMiddleware):
41
- async def dispatch(
42
- self, request: Request, call_next: RequestResponseEndpoint
43
- ) -> Response:
44
  start_time = time.time()
45
  response = await call_next(request)
46
  process_time = time.time() - start_time
@@ -52,29 +63,26 @@ class CustomHeaderMiddleware(BaseHTTPMiddleware):
52
  # --- Bing Search Service ---
53
 
54
  class BingSearch:
55
- """Asynchronous Bing search implementation with configurable parameters and advanced features."""
56
 
57
  def __init__(
58
  self,
59
  timeout: int = 10,
60
  proxies: Optional[Dict[str, str]] = None,
61
- verify: bool = True,
62
  lang: str = "en-US",
63
  impersonate: str = "chrome110"
64
  ):
65
  self.timeout = timeout
66
  self.proxies = proxies if proxies else {}
67
- self.verify = verify
68
  self.lang = lang
69
  self._base_url = "https://www.bing.com"
70
  self.session = AsyncSession(
71
  proxies=self.proxies,
72
- verify=self.verify,
73
  timeout=self.timeout,
74
  impersonate=impersonate
75
  )
76
  self.session.headers.update({
77
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
78
  })
79
 
80
  async def _fetch_html(self, url: str) -> str:
@@ -83,18 +91,23 @@ class BingSearch:
83
  resp.raise_for_status()
84
  return resp.text
85
  except Exception as e:
86
- raise HTTPException(status_code=500, detail=f"Failed to fetch Bing search results: {e}")
87
 
88
- def _get_url(self, tag):
89
- url = tag.get('href', '')
 
90
  try:
 
91
  parsed_url = urlparse(url)
92
  query_params = parse_qs(parsed_url.query)
93
  if "u" in query_params:
94
- encoded_url = query_params["u"][0][2:]
 
 
95
  decoded_bytes = base64.urlsafe_b64decode(encoded_url + '===')
96
- return decoded_bytes.decode('utf-8')
97
  except Exception:
 
98
  return url
99
  return url
100
 
@@ -102,91 +115,77 @@ class BingSearch:
102
  self,
103
  keywords: str,
104
  region: Optional[str] = None,
105
- safesearch: str = "moderate",
106
  max_results: int = 10,
107
  ) -> List[BingSearchResult]:
108
  if not keywords:
109
  raise ValueError("Search keywords cannot be empty.")
110
 
111
  fetched_results = []
112
- fetched_links = set()
113
-
114
- url = f'{self._base_url}/search?q={keywords}&form=QBLH'
115
  if region:
116
  url += f"&setmkt={region}"
117
 
118
- while url and len(fetched_results) < max_results:
119
- html = await self._fetch_html(url)
120
- soup = BeautifulSoup(html, "html.parser")
121
-
122
- for result in soup.select('ol#b_results > li.b_algo'):
123
- title_tag = result.find('h2')
124
- if not title_tag:
125
- continue
126
-
127
- link_tag = title_tag.find('a')
128
- if not link_tag or not link_tag.has_attr('href'):
129
- continue
130
-
131
- url_val = self._get_url(link_tag)
132
- title = title_tag.get_text(strip=True)
133
-
134
- desc_container = result.find('div', class_='b_caption')
135
- description = desc_container.get_text(strip=True) if desc_container else ''
136
-
137
- if url_val and title and url_val not in fetched_links:
138
- fetched_results.append(BingSearchResult(url=url_val, title=title, description=description))
139
- fetched_links.add(url_val)
140
- if len(fetched_results) >= max_results:
141
- break
142
-
143
  if len(fetched_results) >= max_results:
144
  break
 
 
 
145
 
146
- next_page_tag = soup.select_one('a.sb_pagN')
147
- url = self._base_url + next_page_tag['href'] if next_page_tag and next_page_tag.get('href') else None
148
 
149
- return fetched_results[:max_results]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
150
 
151
- async def suggestions(self, query: str, region: Optional[str] = None) -> List[str]:
152
- if not query:
153
- raise ValueError("Search query cannot be empty.")
154
- params = {"query": query, "mkt": region if region else "en-US"}
155
- url = f"https://api.bing.com/osjson.aspx?{urlencode(params)}"
156
- try:
157
- resp = await self.session.get(url)
158
- resp.raise_for_status()
159
- data = resp.json()
160
- return data[1] if isinstance(data, list) and len(data) > 1 else []
161
- except Exception as e:
162
- raise HTTPException(status_code=500, detail=f"Failed to fetch suggestions: {e}")
163
 
164
  async def images(
165
- self,
166
- keywords: str,
167
- region: Optional[str] = None,
168
- safesearch: str = "moderate",
169
- max_results: int = 10
170
  ) -> List[BingImageResult]:
171
  if not keywords:
172
  raise ValueError("Search keywords cannot be empty.")
173
 
174
- safe_map = {"on": "Strict", "moderate": "Moderate", "off": "Off"}
175
- params = {
176
- "q": keywords, "count": max_results, "setlang": self.lang,
177
- "safeSearch": safe_map.get(safesearch.lower(), "Moderate"),
178
- }
179
- if region:
180
- params["mkt"] = region
181
-
182
- url = f"{self._base_url}/images/search?{urlencode(params)}"
183
  html = await self._fetch_html(url)
184
  soup = BeautifulSoup(html, "html.parser")
185
  results = []
186
 
187
  for item in soup.select("a.iusc"):
 
 
188
  try:
189
- meta = json.loads(item.get("m", '{}'))
 
 
 
190
  if meta.get("murl"):
191
  results.append(
192
  BingImageResult(
@@ -194,62 +193,20 @@ class BingSearch:
194
  image_url=meta.get("murl", ""),
195
  thumbnail_url=meta.get("turl", ""),
196
  page_url=meta.get("purl", ""),
197
- source=meta.get("surl", "")
198
  )
199
  )
200
- if len(results) >= max_results:
201
- break
202
  except (json.JSONDecodeError, KeyError):
203
  continue
204
- return results[:max_results]
205
-
206
- async def news(
207
- self,
208
- keywords: str,
209
- region: Optional[str] = None,
210
- safesearch: str = "moderate",
211
- max_results: int = 10,
212
- ) -> List[BingNewsResult]:
213
- if not keywords:
214
- raise ValueError("Search keywords cannot be empty.")
215
-
216
- safe_map = {"on": "Strict", "moderate": "Moderate", "off": "Off"}
217
- params = {
218
- "q": keywords, "form": "QBNH",
219
- "safeSearch": safe_map.get(safesearch.lower(), "Moderate"),
220
- }
221
- if region:
222
- params["mkt"] = region
223
-
224
- url = f"{self._base_url}/news/search?{urlencode(params)}"
225
- html = await self._fetch_html(url)
226
- soup = BeautifulSoup(html, "html.parser")
227
- results = []
228
-
229
- for item in soup.select("div.news-card"):
230
- a_tag = item.find("a", class_="title")
231
- if not a_tag:
232
- continue
233
-
234
- results.append(
235
- BingNewsResult(
236
- title=a_tag.get_text(strip=True),
237
- url=a_tag.get('href', ''),
238
- description=item.find("div", class_="snippet").get_text(strip=True) if item.find("div", class_="snippet") else "",
239
- source=item.find("div", class_="source").get_text(strip=True).split('·')[0].strip() if item.find("div", class_="source") else "",
240
- )
241
- )
242
- if len(results) >= max_results:
243
- break
244
- return results[:max_results]
245
 
246
 
247
  # --- FastAPI Application Setup ---
248
 
249
  app = FastAPI(
250
  title="Bing Search API",
251
- description="A FastAPI wrapper for the BingSearch library with advanced features, powered by NiansuhAI.",
252
- version="2.0.0",
253
  )
254
 
255
  app.add_middleware(CustomHeaderMiddleware)
@@ -258,57 +215,30 @@ bing_search_service = BingSearch()
258
 
259
  # --- API Endpoints ---
260
 
261
- @app.get("/search", response_model=List[BingSearchResult], summary="Perform a text search")
262
  async def text_search(
263
  query: str = Query(..., description="The search keywords."),
264
- region: Optional[str] = Query(None, description="The region for the search (e.g., 'us-US')."),
265
- safesearch: str = Query("moderate", description="Safe search level ('on', 'moderate', 'off')."),
266
- max_results: int = Query(10, description="Maximum number of results to return."),
267
- ):
268
- try:
269
- return await bing_search_service.text(
270
- keywords=query, region=region, safesearch=safesearch, max_results=max_results
271
- )
272
- except ValueError as e:
273
- raise HTTPException(status_code=400, detail=str(e))
274
-
275
- @app.get("/suggestions", response_model=List[str], summary="Get search suggestions")
276
- async def get_suggestions(
277
- query: str = Query(..., description="The search query for which to fetch suggestions."),
278
- region: Optional[str] = Query(None, description="The region for the suggestions (e.g., 'en-US')."),
279
  ):
280
  try:
281
- return await bing_search_service.suggestions(query=query, region=region)
282
  except ValueError as e:
283
  raise HTTPException(status_code=400, detail=str(e))
 
 
284
 
285
  @app.get("/images", response_model=List[BingImageResult], summary="Perform an image search")
286
  async def image_search(
287
  query: str = Query(..., description="The search keywords for images."),
288
- region: Optional[str] = Query(None, description="The region for the image search (e.g., 'us-US')."),
289
- safesearch: str = Query("moderate", description="Safe search level ('on', 'moderate', 'off')."),
290
- max_results: int = Query(10, description="Maximum number of image results to return."),
291
  ):
292
  try:
293
- return await bing_search_service.images(
294
- keywords=query, region=region, safesearch=safesearch, max_results=max_results
295
- )
296
- except ValueError as e:
297
- raise HTTPException(status_code=400, detail=str(e))
298
-
299
- @app.get("/news", response_model=List[BingNewsResult], summary="Perform a news search")
300
- async def news_search(
301
- query: str = Query(..., description="The search keywords for news."),
302
- region: Optional[str] = Query(None, description="The region for the news search (e.g., 'us-US')."),
303
- safesearch: str = Query("moderate", description="Safe search level ('on', 'moderate', 'off')."),
304
- max_results: int = Query(10, description="Maximum number of news results to return."),
305
- ):
306
- try:
307
- return await bing_search_service.news(
308
- keywords=query, region=region, safesearch=safesearch, max_results=max_results
309
- )
310
  except ValueError as e:
311
  raise HTTPException(status_code=400, detail=str(e))
 
 
312
 
313
 
314
  if __name__ == "__main__":
 
import json
import base64
from typing import List, Optional, Dict, Any
from urllib.parse import urlencode, urlparse, parse_qs

import uvicorn
from fastapi import FastAPI, HTTPException, Query, Request, Response
 
13
 
14
  # --- Pydantic Models for API Responses ---
15
 
16
class SearchResultMetadata(BaseModel):
    """Extra page context captured for one organic search result."""

    # Secondary title/URL pairs Bing renders underneath the main result link.
    sitelinks: Optional[List[Dict[str, str]]] = Field(
        default=None,
        description="A list of sitelinks (title and URL) found under the main result.",
    )
    # Breadcrumb-style URL shown on the page; may differ from the real href.
    displayed_url: Optional[str] = Field(
        default=None,
        description="The user-friendly display URL or breadcrumb shown on the search page.",
    )
24
+
25
class BingSearchResult(BaseModel):
    """One organic (text) result scraped from a Bing results page."""

    url: str = Field(default=..., description="The direct URL of the search result.")
    title: str = Field(default=..., description="The title of the search result.")
    description: str = Field(
        default=..., description="A brief description or snippet of the search result."
    )
    # Defaults to an empty SearchResultMetadata when nothing extra was scraped.
    metadata: SearchResultMetadata = Field(
        default_factory=SearchResultMetadata,
        description="Additional metadata scraped for the result.",
    )
33
 
34
class BingImageResult(BaseModel):
    """One image hit scraped from Bing image search."""

    title: str = Field(default=..., description="The title or description of the image.")
    # Direct link to the original asset, not the Bing-hosted thumbnail.
    image_url: str = Field(default=..., description="The direct URL to the full-size image.")
    thumbnail_url: str = Field(default=..., description="The URL to the thumbnail of the image.")
    page_url: str = Field(default=..., description="The URL of the page where the image was found.")
    source: str = Field(default=..., description="The source or domain of the image.")
41
 
42
class BingNewsResult(BaseModel):
    """One article scraped from Bing news search."""

    title: str = Field(default=..., description="The headline of the news article.")
    url: str = Field(default=..., description="The URL to the full news article.")
    description: str = Field(default=..., description="A snippet from the news article.")
    # Empty string when the publisher could not be determined from the markup.
    source: str = Field(default="", description="The publisher or source of the news article.")
48
 
49
 
50
  # --- Custom Middleware for Response Headers ---
51
 
52
  class CustomHeaderMiddleware(BaseHTTPMiddleware):
53
+ """Middleware to add custom headers to every API response."""
54
+ async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
 
55
  start_time = time.time()
56
  response = await call_next(request)
57
  process_time = time.time() - start_time
 
63
  # --- Bing Search Service ---
64
 
65
  class BingSearch:
66
+ """Asynchronous Bing search implementation with advanced web scraping capabilities."""
67
 
68
  def __init__(
69
  self,
70
  timeout: int = 10,
71
  proxies: Optional[Dict[str, str]] = None,
 
72
  lang: str = "en-US",
73
  impersonate: str = "chrome110"
74
  ):
75
  self.timeout = timeout
76
  self.proxies = proxies if proxies else {}
 
77
  self.lang = lang
78
  self._base_url = "https://www.bing.com"
79
  self.session = AsyncSession(
80
  proxies=self.proxies,
 
81
  timeout=self.timeout,
82
  impersonate=impersonate
83
  )
84
  self.session.headers.update({
85
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36"
86
  })
87
 
88
  async def _fetch_html(self, url: str) -> str:
 
91
  resp.raise_for_status()
92
  return resp.text
93
  except Exception as e:
94
+ raise HTTPException(status_code=502, detail=f"Failed to fetch Bing search results: {e}")
95
 
96
+ def _parse_url(self, url: Optional[str]) -> str:
97
+ if not url:
98
+ return ""
99
  try:
100
+ # Bing often uses a redirect URL; this attempts to extract the real URL.
101
  parsed_url = urlparse(url)
102
  query_params = parse_qs(parsed_url.query)
103
  if "u" in query_params:
104
+ # The real URL is often Base64 encoded in the 'u' parameter.
105
+ encoded_url = query_params["u"][0].replace("h=", "").split("&")[0]
106
+ # Pad the string for correct Base64 decoding.
107
  decoded_bytes = base64.urlsafe_b64decode(encoded_url + '===')
108
+ return decoded_bytes.decode('utf-8', errors='ignore')
109
  except Exception:
110
+ # If parsing fails, return the original URL.
111
  return url
112
  return url
113
 
 
115
  self,
116
  keywords: str,
117
  region: Optional[str] = None,
 
118
  max_results: int = 10,
119
  ) -> List[BingSearchResult]:
120
  if not keywords:
121
  raise ValueError("Search keywords cannot be empty.")
122
 
123
  fetched_results = []
124
+ url = f'{self._base_url}/search?q={urlencode({"q": keywords})}&form=QBLH'
 
 
125
  if region:
126
  url += f"&setmkt={region}"
127
 
128
+ html = await self._fetch_html(url)
129
+ soup = BeautifulSoup(html, "html.parser")
130
+
131
+ for result in soup.select('li.b_algo'):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
132
  if len(fetched_results) >= max_results:
133
  break
134
+
135
+ title_tag = result.find('h2')
136
+ link_tag = title_tag.find('a') if title_tag else None
137
 
138
+ if not link_tag or not link_tag.has_attr('href'):
139
+ continue
140
 
141
+ url_val = self._parse_url(link_tag.get('href'))
142
+ title = link_tag.get_text(strip=True)
143
+ description = result.find('p').get_text(strip=True) if result.find('p') else ""
144
+
145
+ # --- Metadata Extraction ---
146
+ sitelinks = []
147
+ sitelinks_container = result.select_one('ul.b_vlist')
148
+ if sitelinks_container:
149
+ for link_item in sitelinks_container.select('li a'):
150
+ sitelinks.append({
151
+ "title": link_item.get_text(strip=True),
152
+ "url": self._parse_url(link_item.get('href'))
153
+ })
154
+
155
+ displayed_url_tag = result.select_one('cite')
156
+ displayed_url = displayed_url_tag.get_text(strip=True) if displayed_url_tag else None
157
 
158
+ metadata = SearchResultMetadata(
159
+ sitelinks=sitelinks if sitelinks else None,
160
+ displayed_url=displayed_url
161
+ )
162
+
163
+ if url_val and title:
164
+ fetched_results.append(
165
+ BingSearchResult(url=url_val, title=title, description=description, metadata=metadata)
166
+ )
167
+
168
+ return fetched_results
 
169
 
170
  async def images(
171
+ self, keywords: str, max_results: int = 10
 
 
 
 
172
  ) -> List[BingImageResult]:
173
  if not keywords:
174
  raise ValueError("Search keywords cannot be empty.")
175
 
176
+ url = f"{self._base_url}/images/search?{urlencode({'q': keywords})}"
 
 
 
 
 
 
 
 
177
  html = await self._fetch_html(url)
178
  soup = BeautifulSoup(html, "html.parser")
179
  results = []
180
 
181
  for item in soup.select("a.iusc"):
182
+ if len(results) >= max_results:
183
+ break
184
  try:
185
+ meta_json = item.get("m")
186
+ if not meta_json:
187
+ continue
188
+ meta = json.loads(meta_json)
189
  if meta.get("murl"):
190
  results.append(
191
  BingImageResult(
 
193
  image_url=meta.get("murl", ""),
194
  thumbnail_url=meta.get("turl", ""),
195
  page_url=meta.get("purl", ""),
196
+ source=urlparse(meta.get("purl", "")).netloc
197
  )
198
  )
 
 
199
  except (json.JSONDecodeError, KeyError):
200
  continue
201
+ return results
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
202
 
203
 
204
  # --- FastAPI Application Setup ---
205
 
206
  app = FastAPI(
207
  title="Bing Search API",
208
+ description="An advanced, asynchronous FastAPI wrapper to scrape Bing search results, powered by NiansuhAI.",
209
+ version="3.0.0",
210
  )
211
 
212
  app.add_middleware(CustomHeaderMiddleware)
 
215
 
216
  # --- API Endpoints ---
217
 
218
@app.get("/search", response_model=List[BingSearchResult], summary="Perform a text search with rich metadata")
async def text_search(
    query: str = Query(..., description="The search keywords."),
    region: Optional[str] = Query(None, description="The market/region for the search (e.g., 'en-US')."),
    max_results: int = Query(10, ge=1, le=50, description="Maximum number of results to return."),
):
    """Scrape Bing text results and return them with sitelink/display metadata.

    Raises:
        HTTPException 400: invalid input (empty query).
        HTTPException 502: upstream fetch failure (propagated from the service).
        HTTPException 500: any other unexpected error.
    """
    try:
        return await bing_search_service.text(keywords=query, region=region, max_results=max_results)
    except HTTPException:
        # Preserve the status code chosen by the service layer (e.g. 502 on
        # fetch failure) instead of rewrapping it as a generic 500 below.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {e}")
230
 
231
@app.get("/images", response_model=List[BingImageResult], summary="Perform an image search")
async def image_search(
    query: str = Query(..., description="The search keywords for images."),
    max_results: int = Query(10, ge=1, le=50, description="Maximum number of image results to return."),
):
    """Scrape Bing image results for the given keywords.

    Raises:
        HTTPException 400: invalid input (empty query).
        HTTPException 502: upstream fetch failure (propagated from the service).
        HTTPException 500: any other unexpected error.
    """
    try:
        return await bing_search_service.images(keywords=query, max_results=max_results)
    except HTTPException:
        # Preserve the status code chosen by the service layer (e.g. 502 on
        # fetch failure) instead of rewrapping it as a generic 500 below.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {e}")
242
 
243
 
244
  if __name__ == "__main__":