Spaces:
Running
Running
import asyncio
import math
import os
import random
import re
import time
import urllib.parse
from collections import Counter

import httpx
from bs4 import BeautifulSoup

# Use modern ddgs package (v9+) — the old duckduckgo_search is deprecated and returns 0 results
try:
    from ddgs import DDGS
except ImportError:
    DDGS = None
class ResearchAgent:
    """Production-Grade Real-Time AI Search Orchestrator for AURA AI.

    Primary: DuckDuckGo (via ddgs v9+), DDG HTML scraper, SearXNG, Google scraper.
    Implements: Async retrieval, retries, chunking, and TF-IDF semantic reranking.

    Every provider helper returns a list of ``{'title', 'body', 'href'}`` dicts
    and returns ``[]`` (after printing the error) instead of raising, so the
    cascades in ``search_live`` / ``search_live_async`` simply move on to the
    next provider.

    NOTE: the Exa, Serper and Brave helpers (and the sync Tavily helper) are
    defined but not wired into either cascade; only ``search_live_async`` uses
    Tavily, and only when ``TAVILY_API_KEY`` is set.
    """

    # Sliding-window chunking parameters (in words) used by chunk_and_rerank.
    CHUNK_WORDS = 60
    CHUNK_OVERLAP = 15
    MIN_CHUNK_WORDS = 10  # windows shorter than this are dropped as fragments
    # A chunk counts as a near-duplicate when more than this fraction of its
    # distinct tokens already occur in a single previously selected chunk.
    DUPLICATE_OVERLAP = 0.6

    _TOKEN_RE = re.compile(r'\w+')

    def __init__(self):
        # Read API Keys from environment (None when unset).
        self.tavily_key = os.getenv("TAVILY_API_KEY")
        self.exa_key = os.getenv("EXA_API_KEY")
        self.serper_key = os.getenv("SERPER_API_KEY")
        self.brave_key = os.getenv("BRAVE_API_KEY")
        # Rotated per request by the scraping providers.
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
        ]

    # ==========================================
    # 1. SYNCHRONOUS RETRIEVAL (Legacy Support)
    # ==========================================
    def search_live(self, query: str, max_results: int = 5) -> str:
        """Ultra-resilient synchronous search using DuckDuckGo, static HTML fallback, and scraper fallback to ensure 24/7 service availability.

        Providers are tried in order (DDG API, DDG HTML scrape, Google scrape);
        the first non-empty result list is chunked and reranked.

        Returns:
            The consolidated context string, or a string starting with
            ``"NO_DATA:"`` when every provider came back empty.
        """
        print(f"NEURAL RESEARCH GATEWAY (ONLY DDG): Orchestrating search query: '{query}'")
        clean_query = query.replace('"', '').strip()
        providers = (
            self._search_ddg_sync,            # 1. DuckDuckGo primary API
            self._search_ddg_html_sync,       # 2. DDG static HTML scraper (rate-limit fallback)
            self._search_google_scrape_sync,  # 3. Google scraper fallback
        )
        results = []
        for provider in providers:
            results = provider(clean_query, max_results)
            if results:
                break
        if not results:
            return "NO_DATA: Live search gateways are currently congested. Relying on neural knowledge base."
        # Rerank against the original (unstripped) query.
        return self.chunk_and_rerank(query, results, max_results=max_results)

    # ==========================================
    # 2. ASYNCHRONOUS RETRIEVAL (Modern Pipeline)
    # ==========================================
    async def search_live_async(self, query: str, max_results: int = 5) -> str:
        """Fully non-blocking asynchronous search with multi-provider cascade.

        Order: Tavily (only if ``TAVILY_API_KEY`` is set), DDG API, DDG HTML
        scrape, public SearXNG instances, Google scrape. The first non-empty
        result list is chunked and reranked.

        Returns:
            The consolidated context string, or a string starting with
            ``"NO_DATA:"`` when every provider came back empty.
        """
        print(f"NEURAL RESEARCH GATEWAY (ASYNC): Scanning network for: '{query}'")
        clean_query = query.replace('"', '').strip()
        providers = []
        if self.tavily_key:
            providers.append(self._search_tavily_async)  # highest quality when keyed
        providers += [
            self._search_ddg_async,
            self._search_ddg_html_async,
            self._search_searxng_async,        # public instances; works from cloud servers
            self._search_google_scrape_async,
        ]
        results = []
        for provider in providers:
            results = await provider(clean_query, max_results)
            if results:
                break
        if not results:
            return "NO_DATA: All search gateways exhausted. Relying on neural knowledge base."
        return self.chunk_and_rerank(query, results, max_results=max_results)

    # ==========================================
    # 3. CHUNKING & TF-IDF RERANKING
    # ==========================================
    def chunk_and_rerank(self, query: str, results: list, max_results: int = 5,
                         max_chunks: int = 8) -> str:
        """Chunk search results, rank chunks by TF-IDF against *query*, drop
        near-duplicates, and return a consolidated context string.

        Args:
            query: The user query used for relevance scoring.
            results: Provider dicts with optional ``title``, ``body``, ``href``.
            max_results: Result cap for the raw-snippet fallback and for the
                untokenizable-query path.
            max_chunks: Maximum number of distinct passages to return
                (keeps the prompt bounded).

        Returns:
            Newline-joined ``Source/URL/Passage`` blocks. If no result yields a
            usable chunk, ``Source/URL/Snippet`` blocks built from the raw
            results instead.
        """
        chunks = self._chunk_results(results)
        if not chunks:
            return self._format_raw_results(results, max_results)

        query_tokens = set(self._tokenize(query))
        if query_tokens:
            self._score_chunks(query_tokens, chunks)
        else:
            # Untokenizable query: keep provider order.
            chunks = chunks[:max_results]
        return self._select_passages(chunks, max_chunks)

    @classmethod
    def _tokenize(cls, text: str) -> list:
        """Lower-case word tokens (``\\w+`` runs) of *text*."""
        return cls._TOKEN_RE.findall(text.lower())

    @classmethod
    def _chunk_results(cls, results: list) -> list:
        """Split each result body into overlapping word windows."""
        step = cls.CHUNK_WORDS - cls.CHUNK_OVERLAP
        chunks = []
        for r in results:
            body = r.get('body') or ''
            if not body:
                continue
            url = r.get('href') or '#'
            title = r.get('title') or 'Web Source'
            words = body.split()
            for start in range(0, len(words), step):
                window = words[start:start + cls.CHUNK_WORDS]
                if len(window) >= cls.MIN_CHUNK_WORDS:
                    chunks.append({'text': " ".join(window), 'url': url, 'title': title})
                if start + cls.CHUNK_WORDS >= len(words):
                    # This window reached the end; any later window would lie
                    # entirely inside it.
                    break
        return chunks

    @staticmethod
    def _format_raw_results(results: list, max_results: int) -> str:
        """Fallback formatting of the first *max_results* results, unique by URL."""
        formatted = []
        seen = set()
        for r in results[:max_results]:
            href = r.get('href') or '#'
            if href in seen:
                continue
            seen.add(href)
            title = r.get('title') or 'Web Source'
            body = r.get('body') or ''
            formatted.append(f"Source: {title}\nURL: {href}\nSnippet: {body}\n")
        return "\n".join(formatted)

    @classmethod
    def _score_chunks(cls, query_tokens: set, chunks: list) -> None:
        """Set ``chunk['score']`` (TF x smoothed IDF) and sort chunks best-first in place."""
        doc_counts = [Counter(cls._tokenize(c['text'])) for c in chunks]
        n_docs = len(chunks)
        idf = {}
        for token in query_tokens:
            df = sum(1 for counts in doc_counts if token in counts)
            # BM25-style smoothed IDF; the +1.0 keeps it strictly positive.
            idf[token] = math.log((n_docs - df + 0.5) / (df + 0.5) + 1.0)
        for chunk, counts in zip(chunks, doc_counts):
            doc_len = sum(counts.values())
            if not doc_len:
                chunk['score'] = 0.0
                continue
            chunk['score'] = sum(counts[t] / doc_len * idf[t] for t in query_tokens)
        # Stable sort: ties keep provider order.
        chunks.sort(key=lambda c: c['score'], reverse=True)

    @classmethod
    def _select_passages(cls, chunks: list, max_chunks: int) -> str:
        """Format up to *max_chunks* chunks, skipping near-duplicates of earlier picks."""
        passages = []
        kept_token_sets = []
        for chunk in chunks:
            if len(passages) >= max_chunks:
                break
            tokens = set(cls._tokenize(chunk['text']))
            # Compare distinct-token counts on both sides of the ratio.
            if any(len(tokens & prev) > len(tokens) * cls.DUPLICATE_OVERLAP
                   for prev in kept_token_sets):
                continue
            kept_token_sets.append(tokens)
            passages.append(f"Source: {chunk['title']}\nURL: {chunk['url']}\nPassage: {chunk['text']}\n")
        return "\n".join(passages)

    # ---- response parsing helpers shared by sync/async providers ----
    @staticmethod
    def _exa_body(item: dict) -> str:
        """Exa result text, else its first highlight, else ``''``."""
        text = item.get('text')
        if text:
            return text
        highlights = item.get('highlights') or []
        return highlights[0] if highlights else ''

    @staticmethod
    def _decode_ddg_href(href: str) -> str:
        """Resolve a DuckDuckGo HTML-result link to its destination URL.

        DDG wraps targets in redirect links carrying the real URL in the
        ``uddg`` query parameter; protocol-relative and site-relative links
        are made absolute first.
        """
        if href.startswith('//'):
            href = "https:" + href
        elif href.startswith('/'):
            href = "https://duckduckgo.com" + href
        parsed = urllib.parse.urlparse(href)
        if parsed.netloc.endswith('duckduckgo.com'):
            target = urllib.parse.parse_qs(parsed.query).get('uddg')
            if target:
                return target[0]
        return href

    @staticmethod
    def _decode_google_href(href: str) -> str:
        """Unwrap Google ``/url?q=<target>`` redirect links; other links pass through."""
        if href.startswith('/url?'):
            params = urllib.parse.parse_qs(urllib.parse.urlparse(href).query)
            for key in ('q', 'url'):
                if params.get(key):
                    return params[key][0]
        return href

    # ==========================================
    # 4. SYNCHRONOUS PROVIDER INTEGRATIONS
    # ==========================================
    def _search_tavily_sync(self, query: str, max_results: int) -> list:
        try:
            print("NEURAL RESEARCH [TAVILY-SYNC]: Initiating...")
            r = httpx.post(
                "https://api.tavily.com/search",
                json={"api_key": self.tavily_key, "query": query, "search_depth": "basic", "max_results": max_results},
                timeout=8.0
            )
            if r.status_code == 200:
                return [{'title': x.get('title', 'Tavily Source'), 'body': x.get('content', ''), 'href': x.get('url', '#')} for x in r.json().get('results', [])]
        except Exception as e:
            print(f"Tavily Sync Fail: {e}")
        return []

    def _search_exa_sync(self, query: str, max_results: int) -> list:
        try:
            print("NEURAL RESEARCH [EXA-SYNC]: Initiating...")
            headers = {"x-api-key": self.exa_key, "content-type": "application/json"}
            r = httpx.post("https://api.exa.ai/search", json={"query": query, "numResults": max_results, "useAutoprompt": True}, headers=headers, timeout=8.0)
            if r.status_code == 200:
                return [{'title': x.get('title', 'Exa Source'), 'body': self._exa_body(x), 'href': x.get('url', '#')} for x in r.json().get('results', [])]
        except Exception as e:
            print(f"Exa Sync Fail: {e}")
        return []

    def _search_serper_sync(self, query: str, max_results: int) -> list:
        try:
            print("NEURAL RESEARCH [SERPER-SYNC]: Initiating...")
            headers = {"X-API-KEY": self.serper_key, "Content-Type": "application/json"}
            r = httpx.post("https://google.serper.dev/search", json={"q": query, "num": max_results}, headers=headers, timeout=8.0)
            if r.status_code == 200:
                return [{'title': x.get('title', 'Serper Source'), 'body': x.get('snippet', ''), 'href': x.get('link', '#')} for x in r.json().get('organic', [])]
        except Exception as e:
            print(f"Serper Sync Fail: {e}")
        return []

    def _search_brave_sync(self, query: str, max_results: int) -> list:
        try:
            print("NEURAL RESEARCH [BRAVE-SYNC]: Initiating...")
            headers = {"Accept": "application/json", "X-Subscription-Token": self.brave_key}
            r = httpx.get(f"https://api.search.brave.com/res/v1/web/search?q={urllib.parse.quote(query)}&count={max_results}", headers=headers, timeout=8.0)
            if r.status_code == 200:
                return [{'title': x.get('title', 'Brave Source'), 'body': x.get('description', ''), 'href': x.get('url', '#')} for x in r.json().get('web', {}).get('results', [])]
        except Exception as e:
            print(f"Brave Sync Fail: {e}")
        return []

    def _search_ddg_sync(self, query: str, max_results: int) -> list:
        """Query DuckDuckGo via the ``ddgs`` library, with one retry on error."""
        if DDGS is None:
            print("NEURAL RESEARCH [DDG-SYNC]: DDGS library not available. Install with: pip install ddgs")
            return []
        attempts = 2
        # Each attempt uses a fresh DDGS instance.
        for attempt in range(attempts):
            try:
                print(f"NEURAL RESEARCH [DDG-SYNC]: Attempt {attempt + 1}...")
                raw_results = DDGS().text(query, max_results=max_results)
                if raw_results and isinstance(raw_results, list):
                    results = [
                        {
                            'title': r.get('title', 'DDG Source'),
                            'body': r.get('body', ''),
                            'href': r.get('href', '#'),
                        }
                        for r in raw_results
                    ]
                    if results:
                        print(f"DDG-SYNC Success: {len(results)} results")
                        return results
            except Exception as e:
                print(f"DDG Sync Attempt {attempt + 1} Fail: {e}")
                if attempt + 1 < attempts:
                    time.sleep(0.5)  # brief back-off, only when another attempt follows
        return []

    def _search_ddg_html_sync(self, query: str, max_results: int) -> list:
        """Scrape html.duckduckgo.com (no JS) for up to *max_results* organic results."""
        try:
            print("NEURAL RESEARCH [DDG-HTML-SCRAPE]: Initiating...")
            headers = {
                "User-Agent": random.choice(self.user_agents),
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "en-US,en;q=0.5"
            }
            url = f"https://html.duckduckgo.com/html/?q={urllib.parse.quote(query)}"
            r = httpx.get(url, headers=headers, timeout=6.0, follow_redirects=True)
            if r.status_code == 200:
                soup = BeautifulSoup(r.text, 'html.parser')
                results = []
                for res_div in soup.find_all('div', class_='result'):
                    if len(results) >= max_results:
                        break
                    if 'result--ad' in (res_div.get('class') or []):
                        continue  # sponsored entry
                    # result__a is the title link; result__url is only the
                    # displayed address, kept as a fallback.
                    title_a = res_div.find('a', class_='result__a') or res_div.find('a', class_='result__url')
                    if title_a is None or not title_a.get('href'):
                        continue
                    snippet = res_div.find(class_='result__snippet')
                    results.append({
                        'title': title_a.text.strip(),
                        'body': snippet.text.strip() if snippet else "",
                        'href': self._decode_ddg_href(title_a['href']),
                    })
                print(f"DDG-HTML-SCRAPE Success: Found {len(results)} results.")
                return results
        except Exception as e:
            print(f"DDG HTML Scrape Fail: {e}")
        return []

    def _search_google_scrape_sync(self, query: str, max_results: int) -> list:
        """Best-effort scrape of Google's result page (``div.g`` blocks)."""
        try:
            print("NEURAL RESEARCH [SCRAPE-SYNC]: Initiating...")
            headers = {"User-Agent": random.choice(self.user_agents)}
            google_url = f"https://www.google.com/search?q={urllib.parse.quote(query)}"
            r = httpx.get(google_url, headers=headers, timeout=5.0)
            if r.status_code == 200:
                soup = BeautifulSoup(r.text, 'html.parser')
                results = []
                for g in soup.find_all('div', class_='g')[:max_results]:
                    link = g.find('a', href=True)  # first anchor that actually has an href
                    if link is None:
                        continue
                    heading = g.find('h3')
                    snippet = g.find('div', class_='VwiC3b')
                    results.append({
                        'title': heading.text if heading else 'Google Source',
                        'body': snippet.text if snippet else '',
                        'href': self._decode_google_href(link['href']),
                    })
                return results
        except Exception as e:
            print(f"Google Scrape Sync Fail: {e}")
        return []

    # ==========================================
    # 5. ASYNCHRONOUS PROVIDER INTEGRATIONS
    # ==========================================
    async def _search_tavily_async(self, query: str, max_results: int) -> list:
        try:
            async with httpx.AsyncClient() as client:
                r = await client.post(
                    "https://api.tavily.com/search",
                    json={"api_key": self.tavily_key, "query": query, "search_depth": "basic", "max_results": max_results},
                    timeout=8.0
                )
                if r.status_code == 200:
                    return [{'title': x.get('title', 'Tavily Source'), 'body': x.get('content', ''), 'href': x.get('url', '#')} for x in r.json().get('results', [])]
        except Exception as e:
            print(f"Tavily Async Fail: {e}")
        return []

    async def _search_exa_async(self, query: str, max_results: int) -> list:
        try:
            headers = {"x-api-key": self.exa_key, "content-type": "application/json"}
            async with httpx.AsyncClient() as client:
                r = await client.post(
                    "https://api.exa.ai/search",
                    json={"query": query, "numResults": max_results, "useAutoprompt": True},
                    headers=headers,
                    timeout=8.0
                )
                if r.status_code == 200:
                    return [{'title': x.get('title', 'Exa Source'), 'body': self._exa_body(x), 'href': x.get('url', '#')} for x in r.json().get('results', [])]
        except Exception as e:
            print(f"Exa Async Fail: {e}")
        return []

    async def _search_serper_async(self, query: str, max_results: int) -> list:
        try:
            headers = {"X-API-KEY": self.serper_key, "Content-Type": "application/json"}
            async with httpx.AsyncClient() as client:
                r = await client.post(
                    "https://google.serper.dev/search",
                    json={"q": query, "num": max_results},
                    headers=headers,
                    timeout=8.0
                )
                if r.status_code == 200:
                    return [{'title': x.get('title', 'Serper Source'), 'body': x.get('snippet', ''), 'href': x.get('link', '#')} for x in r.json().get('organic', [])]
        except Exception as e:
            print(f"Serper Async Fail: {e}")
        return []

    async def _search_brave_async(self, query: str, max_results: int) -> list:
        try:
            headers = {"Accept": "application/json", "X-Subscription-Token": self.brave_key}
            async with httpx.AsyncClient() as client:
                r = await client.get(
                    f"https://api.search.brave.com/res/v1/web/search?q={urllib.parse.quote(query)}&count={max_results}",
                    headers=headers,
                    timeout=8.0
                )
                if r.status_code == 200:
                    return [{'title': x.get('title', 'Brave Source'), 'body': x.get('description', ''), 'href': x.get('url', '#')} for x in r.json().get('web', {}).get('results', [])]
        except Exception as e:
            print(f"Brave Async Fail: {e}")
        return []

    async def _search_ddg_async(self, query: str, max_results: int) -> list:
        # The ddgs client is synchronous, so run it in a worker thread via
        # asyncio.to_thread to avoid blocking the event loop.
        try:
            return await asyncio.to_thread(self._search_ddg_sync, query, max_results)
        except Exception as e:
            print(f"DDG Async Offload Fail: {e}")
            return []

    async def _search_ddg_html_async(self, query: str, max_results: int) -> list:
        # Synchronous httpx + BeautifulSoup scrape, offloaded to a worker thread.
        try:
            return await asyncio.to_thread(self._search_ddg_html_sync, query, max_results)
        except Exception as e:
            print(f"DDG HTML Async Offload Fail: {e}")
            return []

    async def _search_google_scrape_async(self, query: str, max_results: int) -> list:
        # Synchronous httpx + BeautifulSoup scrape, offloaded to a worker thread.
        try:
            return await asyncio.to_thread(self._search_google_scrape_sync, query, max_results)
        except Exception as e:
            print(f"Google Scrape Async Offload Fail: {e}")
            return []

    async def _search_searxng_async(self, query: str, max_results: int) -> list:
        """Search via public SearXNG instances - works well from cloud servers.

        Instances are tried in order; the first one returning a non-empty JSON
        result list wins.
        """
        instances = [
            "https://search.sapti.me",
            "https://searx.tiekoetter.com",
            "https://search.bus-hit.me",
            "https://searx.be",
        ]
        for base_url in instances:
            try:
                print(f"NEURAL RESEARCH [SEARXNG]: Trying {base_url}...")
                async with httpx.AsyncClient() as client:
                    r = await client.get(
                        f"{base_url}/search",
                        params={"q": query, "format": "json", "categories": "general", "language": "en"},
                        headers={"User-Agent": random.choice(self.user_agents)},
                        timeout=6.0,
                        follow_redirects=True
                    )
                if r.status_code != 200:
                    continue
                results = [
                    {
                        'title': x.get('title', 'SearXNG Source'),
                        'body': x.get('content', ''),
                        'href': x.get('url', '#'),
                    }
                    for x in r.json().get('results', [])[:max_results]
                ]
                if results:
                    print(f"SEARXNG Success from {base_url}: {len(results)} results")
                    return results
            except Exception as e:
                print(f"SearXNG Fail ({base_url}): {e}")
        return []

    def get_cricket_scores(self) -> str:
        """Fast helper for live cricket/IPL updates (synchronous cascade, 3 results)."""
        return self.search_live("IPL match live score points table", max_results=3)