"""Real-time web search orchestration with TF-IDF passage reranking."""

import os
import re
import time
import math
import random
import asyncio
import urllib.parse
from collections import Counter

# httpx and bs4 are treated like ddgs below: when one is missing, the providers
# that depend on it report "no results" and the cascade moves on, so the
# ddgs-based primary path and the pure reranking logic remain usable.
try:
    import httpx
except ImportError:
    httpx = None
    print("NEURAL RESEARCH: httpx not available; HTTP-backed providers are disabled. "
          "Install with: pip install httpx")

try:
    from bs4 import BeautifulSoup
except ImportError:
    BeautifulSoup = None
    print("NEURAL RESEARCH: bs4 not available; HTML scrapers are disabled. "
          "Install with: pip install beautifulsoup4")

# Use modern ddgs package (v9+) — the old duckduckgo_search is deprecated and returns 0 results
try:
    from ddgs import DDGS
except ImportError:
    DDGS = None

_WORD_RE = re.compile(r'\w+')


class ResearchAgent:
    """Production-Grade Real-Time AI Search Orchestrator for AURA AI.

    Primary: DuckDuckGo (via ddgs v9+), DDG HTML scraper, SearXNG, Google scraper.
    Implements: Async retrieval, retries, chunking, and TF-IDF semantic reranking.

    Every provider method returns a list of ``{'title', 'body', 'href'}`` dicts
    and returns ``[]`` (never raises) on failure, which is what lets the
    ``search_live*`` cascades fall through from one provider to the next.
    """

    # Chunking window, in whitespace-separated words.
    CHUNK_WORDS = 60
    CHUNK_OVERLAP = 15
    MIN_CHUNK_WORDS = 10  # shorter windows are dropped as tiny fragments
    # A chunk is a duplicate when more than this share of its unique tokens
    # already appears in one previously selected passage.
    DUPLICATE_RATIO = 0.6

    SEARXNG_INSTANCES = (
        "https://search.sapti.me",
        "https://searx.tiekoetter.com",
        "https://search.bus-hit.me",
        "https://searx.be",
    )

    def __init__(self):
        # Read API Keys from environment
        self.tavily_key = os.getenv("TAVILY_API_KEY")
        self.exa_key = os.getenv("EXA_API_KEY")
        self.serper_key = os.getenv("SERPER_API_KEY")
        self.brave_key = os.getenv("BRAVE_API_KEY")

        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        ]

    # ==========================================
    # 1. SYNCHRONOUS RETRIEVAL (Legacy Support)
    # ==========================================
    def search_live(self, query: str, max_results: int = 5) -> str:
        """Run the blocking search cascade and return reranked context.

        Providers are tried in order (ddgs library, DDG static HTML, Google
        scrape) until one yields results.

        Args:
            query: The user's search query.
            max_results: Maximum number of results requested per provider.

        Returns:
            The text produced by ``chunk_and_rerank``, or a string starting
            with ``NO_DATA:`` when every provider came back empty.
        """
        print(f"NEURAL RESEARCH GATEWAY (ONLY DDG): Orchestrating search query: '{query}'")
        clean_query = query.replace('"', '').strip()

        # 1. DuckDuckGo Primary API
        results = self._search_ddg_sync(clean_query, max_results)

        # 2. DuckDuckGo Static HTML Scraper Fallback
        if not results:
            results = self._search_ddg_html_sync(clean_query, max_results)

        # 3. Google Scraper Fallback
        if not results:
            results = self._search_google_scrape_sync(clean_query, max_results)

        if not results:
            return "NO_DATA: Live search gateways are currently congested. Relying on neural knowledge base."

        # Apply Advanced Context Chunking & TF-IDF Reranking
        return self.chunk_and_rerank(query, results, max_results=max_results)

    # ==========================================
    # 2. ASYNCHRONOUS RETRIEVAL (Modern Pipeline)
    # ==========================================
    async def search_live_async(self, query: str, max_results: int = 5) -> str:
        """Run the non-blocking search cascade and return reranked context.

        Order: Tavily (only when ``TAVILY_API_KEY`` was set), ddgs library,
        DDG static HTML, public SearXNG instances, Google scrape.

        Args:
            query: The user's search query.
            max_results: Maximum number of results requested per provider.

        Returns:
            The text produced by ``chunk_and_rerank``, or a string starting
            with ``NO_DATA:`` when every provider came back empty.
        """
        print(f"NEURAL RESEARCH GATEWAY (ASYNC): Scanning network for: '{query}'")
        clean_query = query.replace('"', '').strip()
        results = []

        # 1. Tavily (highest quality, if key available)
        if self.tavily_key:
            results = await self._search_tavily_async(clean_query, max_results)

        # 2. DuckDuckGo API
        if not results:
            results = await self._search_ddg_async(clean_query, max_results)

        # 3. DuckDuckGo HTML Scraper Fallback
        if not results:
            results = await self._search_ddg_html_async(clean_query, max_results)

        # 4. SearXNG Public Instance Fallback
        if not results:
            results = await self._search_searxng_async(clean_query, max_results)

        # 5. Google Scrape Fallback
        if not results:
            results = await self._search_google_scrape_async(clean_query, max_results)

        if not results:
            return "NO_DATA: All search gateways exhausted. Relying on neural knowledge base."

        # Rerank and Compress
        return self.chunk_and_rerank(query, results, max_results=max_results)

    # ==========================================
    # 3. CHUNKING & TF-IDF RERANKING
    # ==========================================
    def chunk_and_rerank(self, query: str, results: list, max_results: int = 5,
                         max_passages: int = 8) -> str:
        """Chunk result bodies, rank the chunks against ``query``, and format them.

        Bodies are split into overlapping word windows, each window is scored
        with a length-normalised TF and a smoothed IDF over the query's tokens,
        and the best windows are emitted after dropping near-duplicates.

        Args:
            query: The original user query used for relevance scoring.
            results: Provider results; each is a dict that may carry
                ``title``, ``body`` and ``href``.
            max_results: Cap used when the query has no tokens (chunks keep
                provider order) and in the raw-snippet fallback.
            max_passages: Number of top-ranked chunks considered for output;
                bounds the size of the returned context.

        Returns:
            ``Source/URL/Passage`` blocks joined by newlines. When no body is
            long enough to chunk, ``Source/URL/Snippet`` blocks built from the
            raw results instead. Empty string for empty ``results``.
        """
        chunks = self._chunk_results(results)
        if not chunks:
            # Fallback if chunking produced nothing
            return self._format_raw_results(results, max_results)

        query_tokens = set(self._tokenize(query))
        if not query_tokens:
            # Nothing to score against: keep provider (search priority) order.
            chunks = chunks[:max_results]
        else:
            self._score_chunks(chunks, query_tokens)
            # list.sort is stable, so equally scored chunks keep provider order.
            chunks.sort(key=lambda chunk: chunk['score'], reverse=True)

        return self._select_passages(chunks[:max_passages])

    @staticmethod
    def _tokenize(text: str) -> list:
        """Lower-case ``text`` and split it into ``\\w+`` tokens."""
        return _WORD_RE.findall(text.lower())

    def _chunk_results(self, results: list) -> list:
        """Split each non-empty body into overlapping word windows."""
        step = self.CHUNK_WORDS - self.CHUNK_OVERLAP
        chunks = []
        for result in results:
            body = result.get('body')
            if not body:
                continue
            url = result.get('href') or '#'
            title = result.get('title') or 'Web Source'
            words = body.split()
            for start in range(0, len(words), step):
                window = words[start:start + self.CHUNK_WORDS]
                if len(window) < self.MIN_CHUNK_WORDS:
                    continue  # Skip tiny fragments
                chunks.append({'text': " ".join(window), 'url': url, 'title': title})
        return chunks

    @staticmethod
    def _format_raw_results(results: list, max_results: int) -> str:
        """Format the first ``max_results`` raw results, one block per distinct URL."""
        formatted = []
        seen = set()
        for result in results[:max_results]:
            href = result.get('href')
            if href:
                if href in seen:
                    continue
                seen.add(href)
            formatted.append(
                f"Source: {result.get('title') or 'Web Source'}\n"
                f"URL: {href or '#'}\n"
                f"Snippet: {result.get('body') or ''}\n"
            )
        return "\n".join(formatted)

    def _score_chunks(self, chunks: list, query_tokens: set) -> None:
        """Store a TF-IDF relevance score under ``'score'`` on every chunk."""
        token_counts = [Counter(self._tokenize(chunk['text'])) for chunk in chunks]
        total = len(chunks)

        idf = {}
        for token in query_tokens:
            df = sum(1 for counts in token_counts if token in counts)
            # Smooth IDF calculation (the +1.0 keeps the log argument above 1,
            # so the weight stays positive even for ubiquitous terms).
            idf[token] = math.log((total - df + 0.5) / (df + 0.5) + 1.0)

        for chunk, counts in zip(chunks, token_counts):
            doc_len = sum(counts.values())
            if not doc_len:
                chunk['score'] = 0.0
                continue
            chunk['score'] = sum(counts[token] / doc_len * idf[token] for token in query_tokens)

    def _select_passages(self, candidates: list) -> str:
        """Format ``candidates`` in order, skipping near-duplicate passages."""
        passages = []
        kept_token_sets = []
        for chunk in candidates:
            tokens = set(self._tokenize(chunk['text']))
            threshold = len(tokens) * self.DUPLICATE_RATIO
            # Compare against the text of earlier passages only; title/URL
            # words must not count as overlapping content.
            if any(len(tokens & kept) > threshold for kept in kept_token_sets):
                continue
            kept_token_sets.append(tokens)
            passages.append(
                f"Source: {chunk['title']}\nURL: {chunk['url']}\nPassage: {chunk['text']}\n"
            )
        return "\n".join(passages)

    # ==========================================
    # 4. SYNCHRONOUS PROVIDER INTEGRATIONS
    # ==========================================
    @staticmethod
    def _normalize_hits(items, default_title: str, body_key: str, href_key: str = 'url') -> list:
        """Map provider JSON items onto the common ``title/body/href`` shape.

        Missing or null fields fall back to ``default_title``, ``''`` and ``'#'``.
        """
        return [
            {
                'title': item.get('title') or default_title,
                'body': item.get(body_key) or '',
                'href': item.get(href_key) or '#',
            }
            for item in items or []
        ]

    @staticmethod
    def _exa_body(item: dict) -> str:
        """Return an Exa item's ``text``, else its first highlight, else ``''``."""
        text = item.get('text')
        if text:
            return text
        highlights = item.get('highlights') or []
        return highlights[0] if highlights else ''

    def _normalize_exa(self, items) -> list:
        """Map Exa JSON items onto the common ``title/body/href`` shape."""
        return [
            {
                'title': item.get('title') or 'Exa Source',
                'body': self._exa_body(item),
                'href': item.get('url') or '#',
            }
            for item in items or []
        ]

    @staticmethod
    def _resolve_ddg_href(href: str) -> str:
        """Turn a link from the DuckDuckGo HTML page into an absolute target URL.

        Protocol-relative and site-relative links are made absolute, and a
        DuckDuckGo redirect carrying a ``uddg`` query parameter is replaced by
        the (decoded) destination it points to.
        """
        if href.startswith('//'):
            href = "https:" + href
        elif href.startswith('/'):
            href = "https://duckduckgo.com" + href
        parsed = urllib.parse.urlparse(href)
        if parsed.netloc.endswith('duckduckgo.com'):
            target = urllib.parse.parse_qs(parsed.query).get('uddg')
            if target:
                return target[0]
        return href

    @staticmethod
    def _resolve_google_href(href: str) -> str:
        """Unwrap a Google ``/url?q=...`` redirect and absolutise relative links."""
        if href.startswith('/url?'):
            target = urllib.parse.parse_qs(urllib.parse.urlparse(href).query).get('q')
            if target:
                return target[0]
        if href.startswith('/'):
            return "https://www.google.com" + href
        return href

    def _search_tavily_sync(self, query: str, max_results: int) -> list:
        """Query the Tavily search API; returns ``[]`` on any failure."""
        try:
            print("NEURAL RESEARCH [TAVILY-SYNC]: Initiating...")
            r = httpx.post(
                "https://api.tavily.com/search",
                json={"api_key": self.tavily_key, "query": query,
                      "search_depth": "basic", "max_results": max_results},
                timeout=8.0,
            )
            if r.status_code == 200:
                return self._normalize_hits(r.json().get('results'), 'Tavily Source', 'content')
        except Exception as e:
            print(f"Tavily Sync Fail: {e}")
        return []

    def _search_exa_sync(self, query: str, max_results: int) -> list:
        """Query the Exa search API; returns ``[]`` on any failure."""
        try:
            print("NEURAL RESEARCH [EXA-SYNC]: Initiating...")
            headers = {"x-api-key": self.exa_key, "content-type": "application/json"}
            r = httpx.post(
                "https://api.exa.ai/search",
                json={"query": query, "numResults": max_results, "useAutoprompt": True},
                headers=headers,
                timeout=8.0,
            )
            if r.status_code == 200:
                return self._normalize_exa(r.json().get('results'))
        except Exception as e:
            print(f"Exa Sync Fail: {e}")
        return []

    def _search_serper_sync(self, query: str, max_results: int) -> list:
        """Query the Serper (Google) API; returns ``[]`` on any failure."""
        try:
            print("NEURAL RESEARCH [SERPER-SYNC]: Initiating...")
            headers = {"X-API-KEY": self.serper_key, "Content-Type": "application/json"}
            r = httpx.post(
                "https://google.serper.dev/search",
                json={"q": query, "num": max_results},
                headers=headers,
                timeout=8.0,
            )
            if r.status_code == 200:
                return self._normalize_hits(r.json().get('organic'), 'Serper Source', 'snippet', 'link')
        except Exception as e:
            print(f"Serper Sync Fail: {e}")
        return []

    def _search_brave_sync(self, query: str, max_results: int) -> list:
        """Query the Brave web search API; returns ``[]`` on any failure."""
        try:
            print("NEURAL RESEARCH [BRAVE-SYNC]: Initiating...")
            headers = {"Accept": "application/json", "X-Subscription-Token": self.brave_key}
            r = httpx.get(
                "https://api.search.brave.com/res/v1/web/search",
                params={"q": query, "count": max_results},
                headers=headers,
                timeout=8.0,
            )
            if r.status_code == 200:
                web = r.json().get('web') or {}
                return self._normalize_hits(web.get('results'), 'Brave Source', 'description')
        except Exception as e:
            print(f"Brave Sync Fail: {e}")
        return []

    def _search_ddg_sync(self, query: str, max_results: int) -> list:
        """Search through the ``ddgs`` library, retrying once on failure.

        Returns ``[]`` when the library is missing or both attempts fail or
        come back empty.
        """
        if DDGS is None:
            print("NEURAL RESEARCH [DDG-SYNC]: DDGS library not available. Install with: pip install ddgs")
            return []

        attempts = 2
        for attempt in range(attempts):
            try:
                print(f"NEURAL RESEARCH [DDG-SYNC]: Attempt {attempt + 1}...")
                # Fresh DDGS instance per attempt.
                raw_results = DDGS().text(query, max_results=max_results)
                # list() also accepts iterators, in case the library yields lazily.
                results = self._normalize_hits(list(raw_results or []), 'DDG Source', 'body', 'href')
                if results:
                    print(f"DDG-SYNC Success: {len(results)} results")
                    return results
            except Exception as e:
                print(f"DDG Sync Attempt {attempt + 1} Fail: {e}")
            # Back off only when another attempt will follow.
            if attempt < attempts - 1:
                time.sleep(0.5)
        return []

    def _search_ddg_html_sync(self, query: str, max_results: int) -> list:
        """Scrape DuckDuckGo's static HTML results page; returns ``[]`` on failure."""
        if httpx is None or BeautifulSoup is None:
            return []
        try:
            print("NEURAL RESEARCH [DDG-HTML-SCRAPE]: Initiating...")
            headers = {
                "User-Agent": random.choice(self.user_agents),
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "en-US,en;q=0.5",
            }
            r = httpx.get(
                "https://html.duckduckgo.com/html/",
                params={"q": query},
                headers=headers,
                timeout=6.0,
                follow_redirects=True,
            )
            if r.status_code == 200:
                results = self._parse_ddg_html(r.text, max_results)
                print(f"DDG-HTML-SCRAPE Success: Found {len(results)} results.")
                return results
        except Exception as e:
            print(f"DDG HTML Scrape Fail: {e}")
        return []

    def _parse_ddg_html(self, html: str, max_results: int) -> list:
        """Extract up to ``max_results`` results from DuckDuckGo HTML markup."""
        soup = BeautifulSoup(html, 'html.parser')
        results = []
        for res_div in soup.find_all('div', class_='result'):
            if len(results) >= max_results:
                break
            # result__a is the title link; result__url only shows the display
            # URL, so it is used purely as a fallback.
            link = res_div.find('a', class_='result__a') or res_div.find('a', class_='result__url')
            href = link.get('href') if link else None
            if not href:
                continue  # one malformed entry must not discard the whole page
            snippet_a = res_div.find('a', class_='result__snippet')
            results.append({
                'title': link.text.strip(),
                'body': snippet_a.text.strip() if snippet_a else "",
                'href': self._resolve_ddg_href(href),
            })
        return results

    def _search_google_scrape_sync(self, query: str, max_results: int) -> list:
        """Scrape Google's results page; returns ``[]`` on failure."""
        if httpx is None or BeautifulSoup is None:
            return []
        try:
            print("NEURAL RESEARCH [SCRAPE-SYNC]: Initiating...")
            headers = {"User-Agent": random.choice(self.user_agents)}
            r = httpx.get(
                "https://www.google.com/search",
                params={"q": query},
                headers=headers,
                timeout=5.0,
                follow_redirects=True,
            )
            if r.status_code == 200:
                soup = BeautifulSoup(r.text, 'html.parser')
                results = []
                for g in soup.find_all('div', class_='g')[:max_results]:
                    anchor = g.find('a', href=True)
                    if anchor is None:
                        continue
                    heading = g.find('h3')
                    snippet = g.find('div', class_='VwiC3b')
                    results.append({
                        'title': heading.text if heading else 'Google Source',
                        'body': snippet.text if snippet else '',
                        'href': self._resolve_google_href(anchor['href']),
                    })
                return results
        except Exception as e:
            print(f"Google Scrape Sync Fail: {e}")
        return []

    # ==========================================
    # 5. ASYNCHRONOUS PROVIDER INTEGRATIONS
    # ==========================================
    async def _search_tavily_async(self, query: str, max_results: int) -> list:
        """Async variant of ``_search_tavily_sync``; returns ``[]`` on any failure."""
        try:
            async with httpx.AsyncClient() as client:
                r = await client.post(
                    "https://api.tavily.com/search",
                    json={"api_key": self.tavily_key, "query": query,
                          "search_depth": "basic", "max_results": max_results},
                    timeout=8.0,
                )
                if r.status_code == 200:
                    return self._normalize_hits(r.json().get('results'), 'Tavily Source', 'content')
        except Exception as e:
            print(f"Tavily Async Fail: {e}")
        return []

    async def _search_exa_async(self, query: str, max_results: int) -> list:
        """Async variant of ``_search_exa_sync``; returns ``[]`` on any failure."""
        try:
            headers = {"x-api-key": self.exa_key, "content-type": "application/json"}
            async with httpx.AsyncClient() as client:
                r = await client.post(
                    "https://api.exa.ai/search",
                    json={"query": query, "numResults": max_results, "useAutoprompt": True},
                    headers=headers,
                    timeout=8.0,
                )
                if r.status_code == 200:
                    return self._normalize_exa(r.json().get('results'))
        except Exception as e:
            print(f"Exa Async Fail: {e}")
        return []

    async def _search_serper_async(self, query: str, max_results: int) -> list:
        """Async variant of ``_search_serper_sync``; returns ``[]`` on any failure."""
        try:
            headers = {"X-API-KEY": self.serper_key, "Content-Type": "application/json"}
            async with httpx.AsyncClient() as client:
                r = await client.post(
                    "https://google.serper.dev/search",
                    json={"q": query, "num": max_results},
                    headers=headers,
                    timeout=8.0,
                )
                if r.status_code == 200:
                    return self._normalize_hits(r.json().get('organic'), 'Serper Source', 'snippet', 'link')
        except Exception as e:
            print(f"Serper Async Fail: {e}")
        return []

    async def _search_brave_async(self, query: str, max_results: int) -> list:
        """Async variant of ``_search_brave_sync``; returns ``[]`` on any failure."""
        try:
            headers = {"Accept": "application/json", "X-Subscription-Token": self.brave_key}
            async with httpx.AsyncClient() as client:
                r = await client.get(
                    "https://api.search.brave.com/res/v1/web/search",
                    params={"q": query, "count": max_results},
                    headers=headers,
                    timeout=8.0,
                )
                if r.status_code == 200:
                    web = r.json().get('web') or {}
                    return self._normalize_hits(web.get('results'), 'Brave Source', 'description')
        except Exception as e:
            print(f"Brave Async Fail: {e}")
        return []

    async def _search_ddg_async(self, query: str, max_results: int) -> list:
        """Run ``_search_ddg_sync`` in a worker thread.

        The sync method makes blocking calls (the ddgs lookup and
        ``time.sleep``), so it is offloaded with ``asyncio.to_thread`` to keep
        the event loop free.
        """
        try:
            return await asyncio.to_thread(self._search_ddg_sync, query, max_results)
        except Exception as e:
            print(f"DDG Async Offload Fail: {e}")
            return []

    async def _search_ddg_html_async(self, query: str, max_results: int) -> list:
        """Run ``_search_ddg_html_sync`` in a worker thread."""
        try:
            return await asyncio.to_thread(self._search_ddg_html_sync, query, max_results)
        except Exception as e:
            print(f"DDG HTML Async Offload Fail: {e}")
            return []

    async def _search_google_scrape_async(self, query: str, max_results: int) -> list:
        """Run ``_search_google_scrape_sync`` in a worker thread."""
        try:
            return await asyncio.to_thread(self._search_google_scrape_sync, query, max_results)
        except Exception as e:
            print(f"Google Scrape Async Offload Fail: {e}")
            return []

    async def _search_searxng_async(self, query: str, max_results: int) -> list:
        """Search via public SearXNG instances, trying each in turn.

        Returns the first non-empty result list, or ``[]`` when every instance
        fails or returns nothing.
        """
        for base_url in self.SEARXNG_INSTANCES:
            try:
                print(f"NEURAL RESEARCH [SEARXNG]: Trying {base_url}...")
                async with httpx.AsyncClient() as client:
                    r = await client.get(
                        f"{base_url}/search",
                        params={"q": query, "format": "json", "categories": "general", "language": "en"},
                        headers={"User-Agent": random.choice(self.user_agents)},
                        timeout=6.0,
                        follow_redirects=True,
                    )
                    if r.status_code == 200:
                        hits = (r.json().get('results') or [])[:max_results]
                        results = self._normalize_hits(hits, 'SearXNG Source', 'content')
                        if results:
                            print(f"SEARXNG Success from {base_url}: {len(results)} results")
                            return results
            except Exception as e:
                # A failing instance only moves the loop on to the next one.
                print(f"SearXNG Fail ({base_url}): {e}")
        return []

    def get_cricket_scores(self) -> str:
        """Fast helper for live cricket/IPL updates."""
        return self.search_live("IPL match live score points table", max_results=3)