File size: 21,825 Bytes
c8cd75e
 
 
 
94aa23a
c8cd75e
 
94aa23a
 
11e0a6b
 
4e5c781
 
 
11e0a6b
b97a17b
 
c8cd75e
 
11e0a6b
 
c8cd75e
b97a17b
 
c8cd75e
da080f5
c8cd75e
 
 
 
94aa23a
c8cd75e
 
 
94aa23a
e0596d4
c8cd75e
 
 
 
 
 
 
 
da080f5
c8cd75e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e0596d4
c8cd75e
 
 
da080f5
c8cd75e
 
 
 
4e5c781
 
c8cd75e
4e5c781
c8cd75e
4e5c781
 
 
 
 
 
 
c8cd75e
4e5c781
da080f5
c8cd75e
4e5c781
 
 
 
c8cd75e
4e5c781
94aa23a
c8cd75e
 
94aa23a
4e5c781
94aa23a
c8cd75e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
94aa23a
c8cd75e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
94aa23a
c8cd75e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4e5c781
11e0a6b
4e5c781
11e0a6b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c8cd75e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51da85f
4e5c781
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c8cd75e
e0596d4
da080f5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
import os
import re
import time
import math
import random
import asyncio
import httpx
import urllib.parse
from bs4 import BeautifulSoup

# Use modern ddgs package (v9+) — the old duckduckgo_search is deprecated and returns 0 results
try:
    from ddgs import DDGS
except ImportError:
    DDGS = None

class ResearchAgent:
    """Real-time web search orchestrator for AURA AI.

    Queries a cascade of search providers, stopping at the first one that
    returns any results. It then splits the returned snippets into
    overlapping word windows and ranks them against the query with a
    pure-Python TF-IDF score. Near-duplicate windows are dropped and a
    compact text context is returned.

    Sync cascade (``search_live``): DuckDuckGo via ``ddgs``, DuckDuckGo HTML
    scraper, Google scraper.
    Async cascade (``search_live_async``): Tavily (only when
    ``TAVILY_API_KEY`` is set), DuckDuckGo via ``ddgs``, DuckDuckGo HTML
    scraper, public SearXNG instances, Google scraper.

    Exa, Serper and Brave integrations are implemented below but are not part
    of either cascade.
    """

    # Sliding-window chunking parameters, measured in whitespace-separated words.
    CHUNK_WORDS = 60
    CHUNK_OVERLAP = 15
    # Windows after the first one that are shorter than this are discarded.
    MIN_TAIL_WORDS = 10
    # A window counts as a duplicate when more than this fraction of its
    # distinct tokens already appears in one previously selected window.
    DUPLICATE_OVERLAP = 0.6

    # Public SearXNG instances, tried in order by _search_searxng_async.
    SEARXNG_INSTANCES = (
        "https://search.sapti.me",
        "https://searx.tiekoetter.com",
        "https://search.bus-hit.me",
        "https://searx.be",
    )

    def __init__(self):
        # Optional API keys; each keyed provider returns [] when its key is unset.
        self.tavily_key = os.getenv("TAVILY_API_KEY")
        self.exa_key = os.getenv("EXA_API_KEY")
        self.serper_key = os.getenv("SERPER_API_KEY")
        self.brave_key = os.getenv("BRAVE_API_KEY")

        # Rotated per request by the scrapers / SearXNG client.
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
        ]

    # ==========================================
    # 1. SYNCHRONOUS RETRIEVAL (Legacy Support)
    # ==========================================
    def search_live(self, query: str, max_results: int = 5) -> str:
        """Run a blocking search and return a reranked text context.

        Tries the DuckDuckGo ``ddgs`` API, then the DuckDuckGo HTML scraper,
        then the Google scraper, stopping at the first provider that returns
        results.

        Args:
            query: Free-text query. Double quotes are stripped before it is
                sent to providers; the original text is used for ranking.
            max_results: Number of results requested from each provider.

        Returns:
            The context built by ``chunk_and_rerank``, or a string starting
            with ``"NO_DATA:"`` when every provider came back empty.
        """
        print(f"NEURAL RESEARCH GATEWAY (ONLY DDG): Orchestrating search query: '{query}'")
        clean_query = query.replace('"', '').strip()

        providers = (
            self._search_ddg_sync,
            self._search_ddg_html_sync,          # static HTML endpoint, separate rate limits
            self._search_google_scrape_sync,
        )
        results = []
        for provider in providers:
            results = provider(clean_query, max_results)
            if results:
                break

        if not results:
            return "NO_DATA: Live search gateways are currently congested. Relying on neural knowledge base."

        return self.chunk_and_rerank(query, results, max_results=max_results)

    # ==========================================
    # 2. ASYNCHRONOUS RETRIEVAL (Modern Pipeline)
    # ==========================================
    async def search_live_async(self, query: str, max_results: int = 5) -> str:
        """Non-blocking counterpart of ``search_live`` with a longer cascade.

        Order: Tavily (only if ``TAVILY_API_KEY`` is set), DuckDuckGo API,
        DuckDuckGo HTML scraper, SearXNG public instances, Google scraper.
        Providers run one after another; the first non-empty result list wins.

        Returns:
            The context built by ``chunk_and_rerank``, or a string starting
            with ``"NO_DATA:"`` when every provider came back empty.
        """
        print(f"NEURAL RESEARCH GATEWAY (ASYNC): Scanning network for: '{query}'")
        clean_query = query.replace('"', '').strip()

        providers = []
        if self.tavily_key:
            providers.append(self._search_tavily_async)
        providers.extend((
            self._search_ddg_async,
            self._search_ddg_html_async,
            self._search_searxng_async,
            self._search_google_scrape_async,
        ))

        results = []
        for provider in providers:
            results = await provider(clean_query, max_results)
            if results:
                break

        if not results:
            return "NO_DATA: All search gateways exhausted. Relying on neural knowledge base."

        return self.chunk_and_rerank(query, results, max_results=max_results)

    # ==========================================
    # 3. CHUNKING & TF-IDF RERANKING
    # ==========================================
    def chunk_and_rerank(self, query: str, results: list, max_results: int = 5,
                         max_passages: int = 8) -> str:
        """Turn raw search results into a compact, relevance-ordered context.

        Steps:
          1. Split every result body into overlapping word windows
             (``CHUNK_WORDS`` words, ``CHUNK_OVERLAP`` shared between windows).
          2. Score each window against the query with TF-IDF. TF is the term
             count divided by the window length; IDF is the smoothed
             BM25-style IDF computed over the windows. Windows are sorted
             highest score first.
          3. Walk the top ``max_passages`` windows and skip any window whose
             distinct tokens overlap one already-kept window by more than
             ``DUPLICATE_OVERLAP``.

        Args:
            query: Text used for relevance scoring.
            results: Dicts with optional ``'title'``, ``'body'`` and ``'href'`` keys.
            max_results: Cap used in the raw fallback (no window could be
                built) and when the query contains no word tokens.
            max_passages: Maximum number of ranked windows considered for output.

        Returns:
            "Source/URL/Passage" blocks joined by blank lines. The raw fallback
            returns "Source/URL/Snippet" blocks instead, and the result is
            ``""`` when ``results`` is empty.
        """
        chunks = self._chunk_results(results)

        if not chunks:
            # No result had a usable body: emit the raw results, one per URL.
            formatted = []
            seen = set()
            for r in results[:max_results]:
                href = r.get('href', '#')
                if href in seen:
                    continue
                seen.add(href)
                formatted.append(
                    f"Source: {r.get('title', 'Web Source')}\nURL: {href}\nSnippet: {r.get('body', '')}\n"
                )
            return "\n".join(formatted)

        query_tokens = set(self._tokenize(query))
        if query_tokens:
            ranked = self._rank_chunks(query_tokens, chunks)
        else:
            # Nothing to score against: keep provider order.
            ranked = chunks[:max_results]

        passages = []
        kept_token_sets = []
        for chunk in ranked[:max_passages]:
            # Compare chunk text against chunk text only, so the formatted
            # Source/URL header does not inflate the overlap.
            tokens = set(self._tokenize(chunk['text']))
            threshold = len(tokens) * self.DUPLICATE_OVERLAP
            if any(len(tokens & kept) > threshold for kept in kept_token_sets):
                continue
            kept_token_sets.append(tokens)
            passages.append(f"Source: {chunk['title']}\nURL: {chunk['url']}\nPassage: {chunk['text']}\n")

        return "\n".join(passages)

    @staticmethod
    def _tokenize(text: str) -> list:
        """Lower-case *text* and return its word tokens (runs of word characters)."""
        return re.findall(r'\w+', text.lower())

    def _chunk_results(self, results: list) -> list:
        """Split each result body into overlapping word windows.

        Returns ``{'text', 'url', 'title'}`` dicts in result order. Every
        non-blank body yields at least its first window, even when it is
        short. Later windows shorter than ``MIN_TAIL_WORDS`` are dropped.
        Sliding stops once a window reaches the end of the body, so no window
        is a pure suffix of the one before it.
        """
        step = self.CHUNK_WORDS - self.CHUNK_OVERLAP
        chunks = []
        for r in results:
            words = (r.get('body') or '').split()
            if not words:
                continue
            url = r.get('href', '#')
            title = r.get('title', 'Web Source')
            for start in range(0, len(words), step):
                window = words[start:start + self.CHUNK_WORDS]
                if start > 0 and len(window) < self.MIN_TAIL_WORDS:
                    break
                chunks.append({'text': " ".join(window), 'url': url, 'title': title})
                if start + self.CHUNK_WORDS >= len(words):
                    break  # this window already covers the end of the body
        return chunks

    def _rank_chunks(self, query_tokens: set, chunks: list) -> list:
        """Return *chunks* sorted by TF-IDF score for *query_tokens*, highest first (stable)."""
        doc_tokens_list = [self._tokenize(c['text']) for c in chunks]
        doc_token_sets = [set(tokens) for tokens in doc_tokens_list]
        n_docs = len(chunks)

        idf = {}
        for token in query_tokens:
            df = sum(1 for token_set in doc_token_sets if token in token_set)
            # Smoothed IDF; always positive, even for terms present in every window.
            idf[token] = math.log((n_docs - df + 0.5) / (df + 0.5) + 1.0)

        scores = []
        for tokens in doc_tokens_list:
            score = 0.0
            if tokens:
                doc_len = len(tokens)
                for token in query_tokens:
                    score += tokens.count(token) / doc_len * idf[token]
            scores.append(score)

        order = sorted(range(n_docs), key=lambda i: scores[i], reverse=True)
        return [chunks[i] for i in order]

    # ==========================================
    # RESPONSE PARSING / URL HELPERS
    # ==========================================
    @staticmethod
    def _parse_tavily(data: dict) -> list:
        """Map a Tavily JSON response to ``{'title', 'body', 'href'}`` dicts."""
        return [
            {'title': x.get('title', 'Tavily Source'), 'body': x.get('content', ''), 'href': x.get('url', '#')}
            for x in data.get('results', [])
        ]

    @staticmethod
    def _parse_exa(data: dict) -> list:
        """Map an Exa JSON response; body is ``text``, else the first highlight, else ``''``."""
        parsed = []
        for x in data.get('results', []):
            # An empty 'highlights' list must not raise and discard the whole batch.
            highlights = x.get('highlights') or ['']
            parsed.append({
                'title': x.get('title', 'Exa Source'),
                'body': x.get('text') or highlights[0] or '',
                'href': x.get('url', '#'),
            })
        return parsed

    @staticmethod
    def _parse_serper(data: dict) -> list:
        """Map a Serper JSON response (``organic`` results)."""
        return [
            {'title': x.get('title', 'Serper Source'), 'body': x.get('snippet', ''), 'href': x.get('link', '#')}
            for x in data.get('organic', [])
        ]

    @staticmethod
    def _parse_brave(data: dict) -> list:
        """Map a Brave JSON response (``web.results``)."""
        return [
            {'title': x.get('title', 'Brave Source'), 'body': x.get('description', ''), 'href': x.get('url', '#')}
            for x in data.get('web', {}).get('results', [])
        ]

    @staticmethod
    def _resolve_ddg_href(href: str) -> str:
        """Turn a DuckDuckGo result href into an absolute target URL.

        Protocol-relative (``//...``) and site-relative (``/...``) hrefs are
        made absolute. DuckDuckGo redirect links (``/l/?uddg=`` and
        ``/y.js?...uddg=``) are unwrapped to the URL in ``uddg``.
        """
        if href.startswith('//'):
            absolute = "https:" + href
        elif href.startswith('/'):
            absolute = "https://duckduckgo.com" + href
        else:
            absolute = href
        parsed = urllib.parse.urlparse(absolute)
        if parsed.netloc == 'duckduckgo.com' or parsed.netloc.endswith('.duckduckgo.com'):
            target = urllib.parse.parse_qs(parsed.query).get('uddg')
            if target:
                return target[0]
        return absolute

    @staticmethod
    def _resolve_google_href(href: str) -> str:
        """Unwrap Google ``/url?q=<target>`` redirect hrefs; other hrefs pass through."""
        if href.startswith('/url?'):
            target = urllib.parse.parse_qs(urllib.parse.urlparse(href).query).get('q')
            if target:
                return target[0]
        return href

    # ==========================================
    # 4. SYNCHRONOUS PROVIDER INTEGRATIONS
    # ==========================================
    # Every provider is best-effort: failures are printed and [] is returned
    # so the caller's cascade can move on to the next provider.
    def _search_tavily_sync(self, query: str, max_results: int) -> list:
        if not self.tavily_key:
            return []
        try:
            print("NEURAL RESEARCH [TAVILY-SYNC]: Initiating...")
            r = httpx.post(
                "https://api.tavily.com/search",
                json={"api_key": self.tavily_key, "query": query, "search_depth": "basic", "max_results": max_results},
                timeout=8.0
            )
            if r.status_code == 200:
                return self._parse_tavily(r.json())
        except Exception as e:
            print(f"Tavily Sync Fail: {e}")
        return []

    def _search_exa_sync(self, query: str, max_results: int) -> list:
        if not self.exa_key:
            return []
        try:
            print("NEURAL RESEARCH [EXA-SYNC]: Initiating...")
            headers = {"x-api-key": self.exa_key, "content-type": "application/json"}
            r = httpx.post(
                "https://api.exa.ai/search",
                json={"query": query, "numResults": max_results, "useAutoprompt": True},
                headers=headers,
                timeout=8.0
            )
            if r.status_code == 200:
                return self._parse_exa(r.json())
        except Exception as e:
            print(f"Exa Sync Fail: {e}")
        return []

    def _search_serper_sync(self, query: str, max_results: int) -> list:
        if not self.serper_key:
            return []
        try:
            print("NEURAL RESEARCH [SERPER-SYNC]: Initiating...")
            headers = {"X-API-KEY": self.serper_key, "Content-Type": "application/json"}
            r = httpx.post("https://google.serper.dev/search", json={"q": query, "num": max_results}, headers=headers, timeout=8.0)
            if r.status_code == 200:
                return self._parse_serper(r.json())
        except Exception as e:
            print(f"Serper Sync Fail: {e}")
        return []

    def _search_brave_sync(self, query: str, max_results: int) -> list:
        if not self.brave_key:
            return []
        try:
            print("NEURAL RESEARCH [BRAVE-SYNC]: Initiating...")
            headers = {"Accept": "application/json", "X-Subscription-Token": self.brave_key}
            r = httpx.get(
                f"https://api.search.brave.com/res/v1/web/search?q={urllib.parse.quote(query)}&count={max_results}",
                headers=headers,
                timeout=8.0
            )
            if r.status_code == 200:
                return self._parse_brave(r.json())
        except Exception as e:
            print(f"Brave Sync Fail: {e}")
        return []

    def _search_ddg_sync(self, query: str, max_results: int) -> list:
        """Query DuckDuckGo through the ``ddgs`` package, trying up to twice."""
        if DDGS is None:
            print("NEURAL RESEARCH [DDG-SYNC]: DDGS library not available. Install with: pip install ddgs")
            return []
        attempts = 2
        for attempt in range(attempts):
            try:
                print(f"NEURAL RESEARCH [DDG-SYNC]: Attempt {attempt + 1}...")
                # A fresh DDGS instance per attempt.
                raw_results = DDGS().text(query, max_results=max_results)
                if raw_results and isinstance(raw_results, list):
                    results = [
                        {'title': r.get('title', 'DDG Source'), 'body': r.get('body', ''), 'href': r.get('href', '#')}
                        for r in raw_results
                    ]
                    print(f"DDG-SYNC Success: {len(results)} results")
                    return results
            except Exception as e:
                print(f"DDG Sync Attempt {attempt + 1} Fail: {e}")
                if attempt + 1 < attempts:
                    time.sleep(0.5)  # brief back-off; pointless after the final attempt
        return []

    def _search_ddg_html_sync(self, query: str, max_results: int) -> list:
        """Scrape DuckDuckGo's static HTML results page."""
        try:
            print("NEURAL RESEARCH [DDG-HTML-SCRAPE]: Initiating...")
            headers = {
                "User-Agent": random.choice(self.user_agents),
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "en-US,en;q=0.5"
            }
            url = f"https://html.duckduckgo.com/html/?q={urllib.parse.quote(query)}"
            r = httpx.get(url, headers=headers, timeout=6.0, follow_redirects=True)
            if r.status_code == 200:
                soup = BeautifulSoup(r.text, 'html.parser')
                results = []
                for res_div in soup.find_all('div', class_='result'):
                    if len(results) >= max_results:
                        break
                    # NOTE(review): sponsored entries are assumed to carry "result--ad".
                    if 'result--ad' in (res_div.get('class') or []):
                        continue
                    # "result__a" is the title link; "result__url" only shows the display domain.
                    link = res_div.find('a', class_='result__a') or res_div.find('a', class_='result__url')
                    if link is None or not link.get('href'):
                        continue  # skip malformed entries instead of aborting the whole scrape
                    snippet = res_div.find(class_='result__snippet')
                    results.append({
                        'title': link.text.strip(),
                        'body': snippet.text.strip() if snippet else "",
                        'href': self._resolve_ddg_href(link['href']),
                    })
                print(f"DDG-HTML-SCRAPE Success: Found {len(results)} results.")
                return results
        except Exception as e:
            print(f"DDG HTML Scrape Fail: {e}")
        return []

    def _search_google_scrape_sync(self, query: str, max_results: int) -> list:
        """Scrape Google's HTML results page (``div.g`` result containers)."""
        try:
            print("NEURAL RESEARCH [SCRAPE-SYNC]: Initiating...")
            headers = {"User-Agent": random.choice(self.user_agents)}
            google_url = f"https://www.google.com/search?q={urllib.parse.quote(query)}"
            r = httpx.get(google_url, headers=headers, timeout=5.0)
            if r.status_code == 200:
                soup = BeautifulSoup(r.text, 'html.parser')
                results = []
                for g in soup.find_all('div', class_='g')[:max_results]:
                    anchor = g.find('a', href=True)
                    if anchor is None:
                        continue
                    heading = g.find('h3')
                    snippet = g.find('div', class_='VwiC3b')
                    results.append({
                        'title': heading.text if heading else 'Google Source',
                        'body': snippet.text if snippet else '',
                        'href': self._resolve_google_href(anchor['href']),
                    })
                return results
        except Exception as e:
            print(f"Google Scrape Sync Fail: {e}")
        return []

    # ==========================================
    # 5. ASYNCHRONOUS PROVIDER INTEGRATIONS
    # ==========================================
    async def _search_tavily_async(self, query: str, max_results: int) -> list:
        if not self.tavily_key:
            return []
        try:
            async with httpx.AsyncClient() as client:
                r = await client.post(
                    "https://api.tavily.com/search",
                    json={"api_key": self.tavily_key, "query": query, "search_depth": "basic", "max_results": max_results},
                    timeout=8.0
                )
                if r.status_code == 200:
                    return self._parse_tavily(r.json())
        except Exception as e:
            print(f"Tavily Async Fail: {e}")
        return []

    async def _search_exa_async(self, query: str, max_results: int) -> list:
        if not self.exa_key:
            return []
        try:
            headers = {"x-api-key": self.exa_key, "content-type": "application/json"}
            async with httpx.AsyncClient() as client:
                r = await client.post(
                    "https://api.exa.ai/search",
                    json={"query": query, "numResults": max_results, "useAutoprompt": True},
                    headers=headers,
                    timeout=8.0
                )
                if r.status_code == 200:
                    return self._parse_exa(r.json())
        except Exception as e:
            print(f"Exa Async Fail: {e}")
        return []

    async def _search_serper_async(self, query: str, max_results: int) -> list:
        if not self.serper_key:
            return []
        try:
            headers = {"X-API-KEY": self.serper_key, "Content-Type": "application/json"}
            async with httpx.AsyncClient() as client:
                r = await client.post(
                    "https://google.serper.dev/search",
                    json={"q": query, "num": max_results},
                    headers=headers,
                    timeout=8.0
                )
                if r.status_code == 200:
                    return self._parse_serper(r.json())
        except Exception as e:
            print(f"Serper Async Fail: {e}")
        return []

    async def _search_brave_async(self, query: str, max_results: int) -> list:
        if not self.brave_key:
            return []
        try:
            headers = {"Accept": "application/json", "X-Subscription-Token": self.brave_key}
            async with httpx.AsyncClient() as client:
                r = await client.get(
                    f"https://api.search.brave.com/res/v1/web/search?q={urllib.parse.quote(query)}&count={max_results}",
                    headers=headers,
                    timeout=8.0
                )
                if r.status_code == 200:
                    return self._parse_brave(r.json())
        except Exception as e:
            print(f"Brave Async Fail: {e}")
        return []

    async def _search_ddg_async(self, query: str, max_results: int) -> list:
        # The ddgs client is synchronous, so run it in a worker thread to keep
        # the event loop free.
        try:
            return await asyncio.to_thread(self._search_ddg_sync, query, max_results)
        except Exception as e:
            print(f"DDG Async Offload Fail: {e}")
        return []

    async def _search_ddg_html_async(self, query: str, max_results: int) -> list:
        # Reuse the blocking scraper in a worker thread.
        try:
            return await asyncio.to_thread(self._search_ddg_html_sync, query, max_results)
        except Exception as e:
            print(f"DDG HTML Async Offload Fail: {e}")
        return []

    async def _search_google_scrape_async(self, query: str, max_results: int) -> list:
        # Reuse the blocking scraper in a worker thread.
        try:
            return await asyncio.to_thread(self._search_google_scrape_sync, query, max_results)
        except Exception as e:
            print(f"Google Scrape Async Offload Fail: {e}")
        return []

    async def _search_searxng_async(self, query: str, max_results: int) -> list:
        """Query public SearXNG instances in order; return the first non-empty result list."""
        # One client (one connection pool) shared across all instance attempts.
        async with httpx.AsyncClient(follow_redirects=True) as client:
            for base_url in self.SEARXNG_INSTANCES:
                try:
                    print(f"NEURAL RESEARCH [SEARXNG]: Trying {base_url}...")
                    r = await client.get(
                        f"{base_url}/search",
                        params={"q": query, "format": "json", "categories": "general", "language": "en"},
                        headers={"User-Agent": random.choice(self.user_agents)},
                        timeout=6.0,
                    )
                    if r.status_code != 200:
                        continue
                    results = [
                        {
                            'title': x.get('title', 'SearXNG Source'),
                            'body': x.get('content', ''),
                            'href': x.get('url', '#'),
                        }
                        for x in r.json().get('results', [])[:max_results]
                    ]
                    if results:
                        print(f"SEARXNG Success from {base_url}: {len(results)} results")
                        return results
                except Exception as e:
                    print(f"SearXNG Fail ({base_url}): {e}")
        return []

    def get_cricket_scores(self) -> str:
        """Fast helper for live cricket/IPL updates (blocking ``search_live``, 3 results)."""
        return self.search_live("IPL match live score points table", max_results=3)