import asyncio
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional
from urllib.parse import quote_plus, urljoin, urlparse

from bs4 import BeautifulSoup
# AsyncSession is provided by curl_cffi.requests (requires a reasonably recent curl_cffi release).
from curl_cffi.requests import AsyncSession
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel, Field
from webscout.litagent import LitAgent

# --- FastAPI App Definition ---
app = FastAPI(
    title="Snapzion Enhanced Search API",
    description="An advanced FastAPI wrapper for Bing Search, featuring AI-powered summarization and metadata enrichment.",
    version="2.0.1", # Version bump
)

# --- Pydantic Models for Clearer Responses ---

class BaseSearchResult(BaseModel):
    url: str
    title: str
    description: str

class EnhancedBingSearchResult(BaseSearchResult):
    """Model for the enhanced search results with summary and metadata."""
    summary: Optional[str] = Field(None, description="AI-generated summary of the page content.")
    source: Optional[str] = Field(None, description="The domain name of the result URL.")
    favicon: Optional[str] = Field(None, description="URL of the website's favicon.")

class BingImageResult(BaseModel):
    title: str
    image: str
    thumbnail: str
    url: str
    source: str

class BingNewsResult(BaseModel):
    title: str
    url: str
    description: str
    source: str = ""

# --- Enhanced BingSearch Library ---

class BingSearch:
    """
    Bing search implementation rewritten for asynchronous performance and enhanced data retrieval.
    """
    _lit_agent_instance: Optional[LitAgent] = None
    _executor = ThreadPoolExecutor(max_workers=10)

    def __init__(
        self,
        timeout: int = 10,
        proxies: Optional[Dict[str, str]] = None,
        verify: bool = True,
        lang: str = "en-US",
        sleep_interval: float = 0.0,
        impersonate: str = "chrome110"
    ):
        self.timeout = timeout
        self.proxies = proxies if proxies else {}
        self.verify = verify
        self.lang = lang
        self.sleep_interval = sleep_interval
        self._base_url = "https://www.bing.com"
        self.session = AsyncSession(
            proxies=self.proxies,
            verify=self.verify,
            timeout=self.timeout,
            impersonate=impersonate
        )
        # Note: this static Chrome/120 UA overrides the UA implied by impersonate="chrome110";
        # ideally the two should match so the headers and TLS fingerprint stay consistent.
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        })

    @classmethod
    def get_lit_agent(cls) -> LitAgent:
        """Initializes LitAgent lazily."""
        if cls._lit_agent_instance is None:
            cls._lit_agent_instance = LitAgent()
        return cls._lit_agent_instance

    async def _summarize_content(self, html_content: str) -> str:
        """Runs the synchronous summarize method in a thread pool.

        Note: this assumes the installed webscout LitAgent exposes a ``summarize()``
        helper; if it does not, the except clause below returns a fallback string.
        """
        loop = asyncio.get_running_loop()
        agent = self.get_lit_agent()
        try:
            summary = await loop.run_in_executor(
                self._executor, agent.summarize, html_content
            )
            return summary
        except Exception as e:
            print(f"Error during summarization: {e}")
            return "Could not generate summary."


    async def _enhance_result(self, result: BaseSearchResult) -> EnhancedBingSearchResult:
        """Fetches page content, generates summary, and extracts metadata."""
        enhanced_result = EnhancedBingSearchResult(**result.model_dump())
        try:
            parsed_url = urlparse(result.url)
            enhanced_result.source = parsed_url.netloc

            resp = await self.session.get(result.url, timeout=self.timeout)
            resp.raise_for_status()
            html = resp.text
            
            summary = await self._summarize_content(html)
            enhanced_result.summary = summary
            
            soup = BeautifulSoup(html, "html.parser")
            favicon_tag = soup.find("link", rel=lambda r: r and "icon" in r.lower())
            if favicon_tag and favicon_tag.get("href"):
                # urljoin resolves absolute, protocol-relative, root-relative and
                # page-relative hrefs against the result URL in one step.
                enhanced_result.favicon = urljoin(result.url, favicon_tag["href"])
        except Exception as e:
            print(f"Failed to enhance URL {result.url}: {e}")
        return enhanced_result

    def _selectors(self, element):
        selectors = {
            'links': 'ol#b_results > li.b_algo',
            'next': 'a.sb_pagN'
        }
        return selectors.get(element, '')

    def _first_page(self, query):
        url = f'{self._base_url}/search?q={quote_plus(query)}&search=&form=QBLH'
        return {'url': url, 'data': None}

    def _next_page(self, soup):
        selector = self._selectors('next')
        next_page_tag = soup.select_one(selector)
        if next_page_tag and next_page_tag.get('href'):
            return {'url': self._base_url + next_page_tag['href'], 'data': None}
        return {'url': None, 'data': None}

    def _get_url(self, tag):
        # A more direct approach that is often sufficient
        return tag.get('href', '')
        
    async def text(
        self, keywords: str, max_results: int = 10, enhanced: bool = False, **kwargs
    ) -> List[BaseSearchResult | EnhancedBingSearchResult]:
        if not keywords:
            raise ValueError("Search keywords cannot be empty")

        fetched_results = []
        fetched_links = set()

        async def fetch_page(url):
            try:
                resp = await self.session.get(url)
                resp.raise_for_status()
                return resp.text
            except Exception as e:
                raise Exception(f"Bing search failed: {str(e)}") from e

        current_url = self._first_page(keywords)['url']
        
        while current_url and len(fetched_results) < max_results:
            html = await fetch_page(current_url)
            soup = BeautifulSoup(html, "html.parser")
            
            result_blocks = soup.select(self._selectors('links'))
            
            for result in result_blocks:
                title_tag = result.find('h2')
                if not title_tag: continue
                
                link_tag = title_tag.find('a')
                if not link_tag or not link_tag.has_attr('href'): continue
                
                url_val = self._get_url(link_tag)
                title = title_tag.get_text(strip=True)

                desc_container = result.find('div', class_='b_caption')
                description = ""
                if desc_container:
                    p_tag = desc_container.find('p')
                    if p_tag:
                        description = p_tag.get_text(strip=True)

                if url_val and title:
                    if url_val in fetched_links: continue
                    fetched_results.append(BaseSearchResult(url=url_val, title=title, description=description))
                    fetched_links.add(url_val)
                    if len(fetched_results) >= max_results: break
            
            if len(fetched_results) >= max_results: break

            next_page_info = self._next_page(soup)
            current_url = next_page_info['url']
            if current_url:
                await asyncio.sleep(self.sleep_interval)

        results_to_return = fetched_results[:max_results]
        
        if enhanced and results_to_return:
            enhancement_tasks = [self._enhance_result(res) for res in results_to_return]
            return await asyncio.gather(*enhancement_tasks)
            
        return results_to_return

    async def suggestions(self, query: str, **kwargs) -> List[str]:
        if not query: raise ValueError("Query cannot be empty")
        region = kwargs.get('region', 'en-US')
        url = f"https://api.bing.com/osjson.aspx?query={query}&mkt={region}"
        resp = await self.session.get(url)
        resp.raise_for_status()
        data = resp.json()
        return data[1] if isinstance(data, list) and len(data) > 1 else []

    async def images(self, keywords: str, max_results: int = 10, **kwargs) -> List[BingImageResult]:
        if not keywords: raise ValueError("Keywords cannot be empty")
        url = f"{self._base_url}/images/search?q={keywords}&count={max_results}"
        resp = await self.session.get(url)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        results = []
        for item in soup.select("a.iusc"):
            try:
                m_data = item.get("m")
                if not m_data: continue
                meta = json.loads(m_data)
                if meta.get("murl"):
                    results.append(BingImageResult(title=meta.get("t", ""), image=meta.get("murl"), thumbnail=meta.get("turl", ""), url=meta.get("purl", ""), source=meta.get("surl", "")))
                    if len(results) >= max_results: break
            except Exception: continue
        return results

    async def news(self, keywords: str, max_results: int = 10, **kwargs) -> List[BingNewsResult]:
        if not keywords: raise ValueError("Keywords cannot be empty")
        url = f"{self._base_url}/news/search?q={keywords}"
        resp = await self.session.get(url)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        results = []
        for item in soup.select("div.news-card"):
             a_tag = item.find("a", class_="title")
             if not (a_tag and a_tag.has_attr('href')): continue
             desc_tag = item.find("div", class_="snippet")
             source_tag = item.find(attrs={"aria-label": "Publisher"})
             results.append(BingNewsResult(title=a_tag.get_text(strip=True), url=a_tag['href'], description=desc_tag.get_text(strip=True) if desc_tag else "", source=source_tag.get_text(strip=True) if source_tag else ""))
             if len(results) >= max_results: break
        return results

bing = BingSearch()
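
# A minimal sketch of using BingSearch directly, outside the FastAPI endpoints.
# The snippet below is illustrative only; it relies on the methods defined above
# and should be run from a synchronous context (e.g. a script), not at import time:
#
#     results = asyncio.run(bing.text("open source search", max_results=3, enhanced=False))
#     for r in results:
#         print(r.title, "-", r.url)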

# --- FastAPI Endpoints ---

@app.get("/search", response_model=List[EnhancedBingSearchResult | BaseSearchResult], summary="Perform a standard or enhanced text search")
async def text_search(
    query: str = Query(..., description="The search keywords."),
    max_results: int = Query(10, description="Maximum number of results to return."),
    enhanced: bool = Query(False, description="Enable AI summarization and metadata fetching (slower but more detailed).")
):
    """
    Perform a text search on Bing.
    - Set `enhanced=true` to get AI-powered summaries and additional metadata for each result.
    """
    try:
        results = await bing.text(
            keywords=query,
            max_results=max_results,
            enhanced=enhanced
        )
        return results
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/suggestions", response_model=List[str], summary="Fetch search suggestions")
async def get_suggestions(
    query: str = Query(..., description="The search query for which to fetch suggestions."),
):
    try:
        return await bing.suggestions(query=query)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/images", response_model=List[BingImageResult], summary="Search for images")
async def image_search(
    query: str = Query(..., description="The search keywords for images."),
    max_results: int = Query(10, description="Maximum number of image results to return."),
):
    try:
        return await bing.images(keywords=query, max_results=max_results)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/news", response_model=List[BingNewsResult], summary="Search for news articles")
async def news_search(
    query: str = Query(..., description="The search keywords for news."),
    max_results: int = Query(10, description="Maximum number of news results to return."),
):
    try:
        return await bing.news(keywords=query, max_results=max_results)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
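
# --- Example requests (sketch) ---
# With the server running (e.g. `uvicorn main:app --port 8000`), the endpoints above
# can be exercised as follows; query values and the port are illustrative, not prescriptive:
#
#   curl "http://localhost:8000/search?query=fastapi&max_results=5"
#   curl "http://localhost:8000/search?query=fastapi&enhanced=true"
#   curl "http://localhost:8000/suggestions?query=python"
#   curl "http://localhost:8000/images?query=sunset&max_results=3"
#   curl "http://localhost:8000/news?query=technology"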