import asyncio
import aiohttp
import gradio as gr
import re
import time
from typing import List, Optional, Tuple
from urllib.parse import quote_plus, urlparse
from dataclasses import dataclass
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import TfidfVectorizer
from bs4 import BeautifulSoup
from newspaper import Article
import logging
import warnings

# Suppress noisy warnings from the scraping libraries
warnings.filterwarnings("ignore")
logging.getLogger().setLevel(logging.ERROR)
@dataclass
class SearchResult:
    """Data class for search results"""
    title: str
    url: str
    snippet: str
    content: str = ""
    publication_date: Optional[str] = None
    relevance_score: float = 0.0
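
# Illustrative construction (hypothetical values; the remaining fields take their defaults):
#
#     SearchResult(title="Example story", url="https://example.com/a", snippet="...")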

class QueryEnhancer:
    """Enhance user queries with search operators and entity quoting"""

    def __init__(self):
        # Common named-entity patterns
        self.entity_patterns = [
            r'\b[A-Z][a-z]+ [A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b',  # Proper names
            r'\b[A-Z]{2,}(?:\s+[A-Z][a-z]+)*\b',  # Acronyms + words
            r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\s+(?:Inc|Corp|LLC|Ltd|Co|Company|Trust|Group|Holdings)\b'  # Companies
        ]

    def enhance_query(self, query: str) -> str:
        """Enhance a query by quoting named entities"""
        enhanced = query
        # Find and quote named entities
        for pattern in self.entity_patterns:
            matches = re.findall(pattern, enhanced)
            for match in matches:
                if len(match.split()) > 1:  # Only quote multi-word entities
                    enhanced = enhanced.replace(match, f'"{match}"')
        return enhanced
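
# Illustrative behavior (assumes the patterns above; not executed at import time):
#
#     QueryEnhancer().enhance_query("Tim Cook announces new iPhone")
#     # -> '"Tim Cook" announces new iPhone'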

class SearchEngineInterface:
    """Interface for different search engines"""

    def __init__(self):
        self.session = None
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'Cache-Control': 'max-age=0',
        }

    async def get_session(self):
        """Get or create an aiohttp session with sane connection limits"""
        if self.session is None or self.session.closed:
            connector = aiohttp.TCPConnector(
                limit=20,
                limit_per_host=5,
                ttl_dns_cache=300,
                use_dns_cache=True,
                keepalive_timeout=30,
                enable_cleanup_closed=True
            )
            timeout = aiohttp.ClientTimeout(total=45, connect=15, sock_read=30)
            self.session = aiohttp.ClientSession(
                headers=self.headers,
                connector=connector,
                timeout=timeout,
                trust_env=True
            )
        return self.session
    async def search_google(self, query: str, num_results: int = 10) -> List[SearchResult]:
        """Search Google and parse results"""
        try:
            session = await self.get_session()
            url = f"https://www.google.com/search?q={quote_plus(query)}&num={num_results}"
            async with session.get(url) as response:
                if response.status != 200:
                    return []
                html = await response.text()
                soup = BeautifulSoup(html, 'html.parser')
                results = []
                # Parse Google search results
                for g in soup.find_all('div', class_='g')[:num_results]:
                    try:
                        title_elem = g.find('h3')
                        if not title_elem:
                            continue
                        title = title_elem.get_text()
                        # Get the URL
                        link_elem = g.find('a')
                        if not link_elem or not link_elem.get('href'):
                            continue
                        url = link_elem['href']
                        # Get the snippet
                        snippet_elem = g.find('span', class_=['st', 'aCOpRe'])
                        if not snippet_elem:
                            snippet_elem = g.find('div', class_=['s', 'st'])
                        snippet = snippet_elem.get_text() if snippet_elem else ""
                        if title and url.startswith('http'):
                            results.append(SearchResult(title=title, url=url, snippet=snippet))
                    except Exception:
                        continue
                return results
        except Exception as e:
            print(f"Google search error: {e}")
            return []
    async def search_bing(self, query: str, num_results: int = 10) -> List[SearchResult]:
        """Search Bing and parse results"""
        try:
            session = await self.get_session()
            url = f"https://www.bing.com/search?q={quote_plus(query)}&count={num_results}"
            async with session.get(url) as response:
                if response.status != 200:
                    return []
                html = await response.text()
                soup = BeautifulSoup(html, 'html.parser')
                results = []
                # Parse Bing search results
                for result in soup.find_all('li', class_='b_algo')[:num_results]:
                    try:
                        title_elem = result.find('h2')
                        if not title_elem:
                            continue
                        link_elem = title_elem.find('a')
                        if not link_elem:
                            continue
                        title = link_elem.get_text()
                        url = link_elem.get('href', '')
                        snippet_elem = result.find('p', class_='b_paractl') or result.find('div', class_='b_caption')
                        snippet = snippet_elem.get_text() if snippet_elem else ""
                        if title and url.startswith('http'):
                            results.append(SearchResult(title=title, url=url, snippet=snippet))
                    except Exception:
                        continue
                return results
        except Exception as e:
            print(f"Bing search error: {e}")
            return []
    async def search_yahoo(self, query: str, num_results: int = 10) -> List[SearchResult]:
        """Search Yahoo and parse results"""
        try:
            session = await self.get_session()
            url = f"https://search.yahoo.com/search?p={quote_plus(query)}&n={num_results}"
            async with session.get(url) as response:
                if response.status != 200:
                    return []
                html = await response.text()
                soup = BeautifulSoup(html, 'html.parser')
                results = []
                # Parse Yahoo search results
                for result in soup.find_all('div', class_='dd')[:num_results]:
                    try:
                        title_elem = result.find('h3', class_='title')
                        if not title_elem:
                            continue
                        link_elem = title_elem.find('a')
                        if not link_elem:
                            continue
                        title = link_elem.get_text()
                        url = link_elem.get('href', '')
                        snippet_elem = result.find('div', class_='compText')
                        snippet = snippet_elem.get_text() if snippet_elem else ""
                        if title and url.startswith('http'):
                            results.append(SearchResult(title=title, url=url, snippet=snippet))
                    except Exception:
                        continue
                return results
        except Exception as e:
            print(f"Yahoo search error: {e}")
            return []
    async def close(self):
        """Close the session safely"""
        if self.session and not self.session.closed:
            await self.session.close()
            # Give the underlying connections a moment to close
            await asyncio.sleep(0.1)
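
# Illustrative usage (hypothetical query; network access and current Google result
# markup are assumed, since scraped selectors change frequently):
#
#     async def demo():
#         engine = SearchEngineInterface()
#         results = await engine.search_google('"Tim Cook" announces new iPhone', num_results=5)
#         await engine.close()
#         return results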

class ContentScraper:
    """Scrape and parse article content using newspaper3k with robust error handling"""

    def __init__(self):
        self.session = None
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'cross-site',
            'Sec-Fetch-User': '?1',
            'Cache-Control': 'no-cache',
            'Pragma': 'no-cache'
        }
        # Domains known to block scrapers - handled differently
        self.blocked_domains = {
            'bloomberg.com', 'wsj.com', 'ft.com', 'nytimes.com',
            'washingtonpost.com', 'economist.com', 'reuters.com'
        }

    async def get_session(self):
        """Get or create an aiohttp session with robust configuration"""
        if self.session is None or self.session.closed:
            connector = aiohttp.TCPConnector(
                limit=30,
                limit_per_host=10,
                ttl_dns_cache=300,
                use_dns_cache=True,
                keepalive_timeout=60,
                enable_cleanup_closed=True,
                ssl=False  # Disable SSL verification for problematic sites
            )
            timeout = aiohttp.ClientTimeout(total=60, connect=20, sock_read=40)
            self.session = aiohttp.ClientSession(
                headers=self.headers,
                connector=connector,
                timeout=timeout,
                trust_env=True
            )
        return self.session

    def is_blocked_domain(self, url: str) -> bool:
        """Check whether a domain is known to block scrapers"""
        try:
            domain = urlparse(url).netloc.lower()
            return any(blocked in domain for blocked in self.blocked_domains)
        except Exception:
            return False
    async def scrape_article_fallback(self, url: str) -> Tuple[str, Optional[str]]:
        """Enhanced fallback scraping method using a direct HTTP request"""
        try:
            session = await self.get_session()
            # Small delay to avoid rate limiting
            await asyncio.sleep(0.2)
            async with session.get(url, allow_redirects=True) as response:
                if response.status != 200:
                    return "", None
                html = await response.text()
            soup = BeautifulSoup(html, 'html.parser')
            # Remove unwanted elements
            for unwanted in soup(["script", "style", "nav", "header", "footer", "aside", "iframe", "noscript"]):
                unwanted.decompose()
            content = ""
            # Strategy 1: look for common article content containers
            content_selectors = [
                # Generic selectors
                'article', '[role="main"]', 'main', '.main-content', '.content',
                # News-specific selectors
                '.story-body', '.article-body', '.entry-content', '.post-content',
                '.article-content', '.story-content', '.news-content',
                # Site-specific selectors
                '[data-module="ArticleBody"]', '.RichTextStoryBody', '.InlineVideo',
                '.zone-content', '.field-name-body', '.story-text',
                # CNN
                '.zn-body__paragraph', '.zn-body-text',
                # Fox News
                '.article-body', '.article-text',
                # NBC
                '.articleText', '.inline-story-content',
                # AP News
                '.Article', '.RichTextStoryBody',
                # BBC
                '[data-component="text-block"]', '.ssrcss-1q0x1qg-Paragraph',
                # Generic fallbacks
                '.text', '.body', '[class*="content"]', '[class*="article"]', '[class*="story"]'
            ]
            for selector in content_selectors:
                try:
                    elements = soup.select(selector)
                    if elements:
                        texts = []
                        for elem in elements:
                            text = elem.get_text(separator=' ', strip=True)
                            if len(text) > 50:  # Only meaningful content
                                texts.append(text)
                        if texts:
                            content = ' '.join(texts)
                            if len(content) > 200:  # Good content found
                                break
                except Exception:
                    continue
            # Strategy 2: if no structured content, collect all paragraphs
            if not content or len(content) < 100:
                paragraphs = soup.find_all('p')
                p_texts = []
                for p in paragraphs:
                    text = p.get_text(strip=True)
                    # Filter out short paragraphs, which are likely navigation/ads
                    if len(text) > 30 and not any(skip in text.lower() for skip in
                                                  ['cookie', 'advertisement', 'subscribe', 'newsletter',
                                                   'follow us', 'social media', 'share this']):
                        p_texts.append(text)
                if p_texts:
                    content = ' '.join(p_texts)
            # Strategy 3: extract from divs that look like article text
            if not content or len(content) < 100:
                divs = soup.find_all('div')
                div_texts = []
                for div in divs:
                    text = div.get_text(separator=' ', strip=True)
                    if 100 < len(text) < 1000:  # Reasonable paragraph length
                        # Heuristic: reporting verbs suggest article content
                        if any(word in text.lower() for word in ['said', 'according', 'reported', 'stated', 'announced']):
                            div_texts.append(text)
                if div_texts:
                    content = ' '.join(div_texts[:3])  # Take the first 3 relevant divs
            # Try to extract the publication date
            pub_date = None
            date_selectors = [
                'time[datetime]', '[datetime]',
                '.published-date', '.post-date', '.article-date',
                '.timestamp', '.date', '.publish-date',
                '[data-testid="timestamp"]', '.byline-timestamp',
                '.story-date', '.news-date'
            ]
            for selector in date_selectors:
                try:
                    date_elem = soup.select_one(selector)
                    if date_elem:
                        pub_date = (date_elem.get('datetime') or
                                    date_elem.get('content') or
                                    date_elem.get_text(strip=True))
                        if pub_date:
                            break
                except Exception:
                    continue
            # Don't truncate content here - the LLM handles the full text
            if content:
                # Collapse excessive whitespace
                content = ' '.join(content.split())
            return content, pub_date
        except Exception as e:
            print(f"Enhanced fallback scraping failed for {url}: {str(e)[:100]}...")
            return "", None
    async def scrape_article(self, url: str) -> Tuple[str, Optional[str]]:
        """Scrape article content with multiple fallback strategies"""
        # Method 1: try newspaper3k first (note: its download/parse calls are blocking)
        try:
            article = Article(url)
            article.download()
            article.parse()
            if article.text and len(article.text.strip()) > 100:
                content = article.text.strip()  # Don't truncate content
                pub_date = article.publish_date.isoformat() if article.publish_date else None
                return content, pub_date
        except Exception as e:
            print(f"Newspaper3k failed for {url}: {str(e)[:100]}...")
        # Method 2: fall back to direct HTTP scraping
        try:
            content, pub_date = await self.scrape_article_fallback(url)
            if content and len(content.strip()) > 50:
                return content, pub_date
        except Exception as e:
            print(f"Fallback scraping failed for {url}: {str(e)[:100]}...")
        # Method 3: last resort - grab at least the title and meta description
        try:
            session = await self.get_session()
            async with session.get(url, allow_redirects=True) as response:
                if response.status == 200:
                    html = await response.text()
                    soup = BeautifulSoup(html, 'html.parser')
                    title = soup.find('title')
                    title_text = title.get_text().strip() if title else ""
                    meta_desc = soup.find('meta', attrs={'name': 'description'})
                    desc_text = meta_desc.get('content', '').strip() if meta_desc else ""
                    if title_text or desc_text:
                        content = f"{title_text}. {desc_text}".strip()
                        return content, None
        except Exception as e:
            print(f"Last resort scraping failed for {url}: {str(e)[:100]}...")
        return "", None
    async def scrape_multiple(self, search_results: List[SearchResult],
                              max_successful: Optional[int] = None) -> List[SearchResult]:
        """Scrape multiple articles concurrently, stopping extra work once enough succeed"""
        if not search_results:
            return search_results
        max_successful = max_successful or len(search_results)
        successful_scraped = 0
        semaphore = asyncio.Semaphore(5)  # Limit concurrent requests

        async def scrape_with_semaphore(result: SearchResult) -> SearchResult:
            nonlocal successful_scraped
            if successful_scraped >= max_successful:
                return result
            async with semaphore:
                try:
                    # Skip if we already have enough successful results
                    if successful_scraped >= max_successful:
                        return result
                    content, pub_date = await self.scrape_article(result.url)
                    if content and len(content.strip()) > 50:
                        result.content = content
                        result.publication_date = pub_date
                        successful_scraped += 1
                        print(f"✅ Successfully scraped: {result.url[:60]}...")
                    else:
                        print(f"⚠️ No content extracted from: {result.url[:60]}...")
                except Exception as e:
                    print(f"❌ Failed to scrape {result.url[:60]}...: {e}")
                return result

        # Queue all candidate URLs; the success counter above stops extra work
        tasks = [scrape_with_semaphore(result) for result in search_results]
        if tasks:
            scraped_results = await asyncio.gather(*tasks, return_exceptions=True)
            # Filter out exceptions and keep successful results
            valid_results = [r for r in scraped_results if not isinstance(r, Exception)]
        else:
            valid_results = search_results
        # Return results with content first, then the rest
        results_with_content = [r for r in valid_results if r.content.strip()]
        results_without_content = [r for r in valid_results if not r.content.strip()]
        print(f"📊 Scraping summary: {len(results_with_content)} successful, {len(results_without_content)} failed")
        return results_with_content + results_without_content

    async def close(self):
        """Close the session"""
        if self.session and not self.session.closed:
            await self.session.close()

class EmbeddingFilter:
    """Filter search results using embedding-based similarity"""

    def __init__(self):
        self.vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')

    def filter_by_relevance(self, query: str, search_results: List[SearchResult],
                            threshold: float = 0.1) -> List[SearchResult]:
        """Filter results by cosine similarity with the query"""
        if not search_results:
            return search_results
        # Combine title, snippet, and the start of the content for each result
        result_texts = []
        for result in search_results:
            combined_text = f"{result.title} {result.snippet} {result.content[:1000]}"
            result_texts.append(combined_text)
        if not result_texts:
            return search_results
        try:
            # Add the query to the corpus for vectorization
            all_texts = [query] + result_texts
            tfidf_matrix = self.vectorizer.fit_transform(all_texts)
            # Cosine similarity between the query and each result
            query_vector = tfidf_matrix[0:1]
            result_vectors = tfidf_matrix[1:]
            similarities = cosine_similarity(query_vector, result_vectors)[0]
            # Attach relevance scores and filter by threshold
            filtered_results = []
            for i, result in enumerate(search_results):
                result.relevance_score = float(similarities[i])
                if similarities[i] >= threshold:
                    filtered_results.append(result)
            # Sort by relevance score, best first
            filtered_results.sort(key=lambda x: x.relevance_score, reverse=True)
            return filtered_results
        except Exception as e:
            print(f"Embedding filter error: {e}")
            return search_results
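
# Illustrative usage (hypothetical results; TF-IDF scores depend on the whole corpus,
# so the threshold of 0.1 here is only an example):
#
#     results = [SearchResult(title="Apple earnings beat estimates",
#                             url="https://example.com/a",
#                             snippet="Apple reported quarterly earnings above forecasts...")]
#     ranked = EmbeddingFilter().filter_by_relevance("Apple quarterly earnings", results, threshold=0.1)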

class LLMSummarizer:
    """Summarizer without content-validation filtering - sends all scraped content to the LLM"""

    def __init__(self, groq_api_key: str = "", openrouter_api_key: str = ""):
        self.groq_api_key = groq_api_key
        self.openrouter_api_key = openrouter_api_key
        self.groq_model = "meta-llama/llama-4-maverick-17b-128e-instruct"
        self.openrouter_model = "deepseek/deepseek-r1:free"

    def create_system_prompt(self) -> str:
        """Create the system prompt for summarization"""
        return """You are an expert research assistant. Your task is to analyze search results and provide a comprehensive, accurate summary that directly answers the user's query.

CRITICAL INSTRUCTIONS:
1. Analyze ALL provided content carefully and thoroughly
2. Extract and synthesize any information relevant to answering the user's question
3. Include specific facts, dates, numbers, and quotes when present
4. If information is contradictory between sources, mention this
5. Cite sources by mentioning the publication or website name
6. Be thorough and detailed in your analysis
7. If some content seems tangentially related, still include relevant portions
8. Focus on directly answering the user's query with the most relevant information first

Format your response as a comprehensive summary, not bullet points. Provide a thorough analysis of all the content provided."""

    def prepare_content_for_llm(self, query: str, search_results: List[SearchResult]) -> str:
        """Prepare content for the LLM without validation filtering - include ALL scraped content"""
        # No content validation - include every result that has any content
        valid_results = [result for result in search_results if result.content.strip()]
        if not valid_results:
            return f"""Query: "{query}"

No content was successfully scraped from the search results. This might be due to anti-bot protections or network issues."""
        content_parts = [f'User Query: "{query}"\n']
        content_parts.append(f"Number of sources with content: {len(valid_results)}\n")
        for i, result in enumerate(valid_results, 1):
            content_parts.append(f"=== SOURCE {i} ===")
            content_parts.append(f"Title: {result.title}")
            content_parts.append(f"URL: {result.url}")
            if result.publication_date:
                content_parts.append(f"Date: {result.publication_date}")
            if result.relevance_score > 0:
                content_parts.append(f"Relevance Score: {result.relevance_score:.3f}")
            # Include the snippet if it differs from the start of the content
            if result.snippet and not result.content.startswith(result.snippet[:50]):
                content_parts.append(f"Snippet: {result.snippet}")
            # Include the FULL content without truncation - the LLM handles the large context
            content_parts.append(f"Content: {result.content.strip()}")
            content_parts.append("")  # Blank line between sources
        return "\n".join(content_parts)
    def create_user_prompt(self, query: str, prepared_content: str) -> str:
        """Build the user prompt shared by both LLM backends"""
        return f"""Please analyze the following search results and provide a comprehensive summary that directly answers the user's query.

{prepared_content}

Instructions:
- Focus on information relevant to the query: "{query}"
- Analyze ALL provided content thoroughly
- Be specific and factual, include dates/numbers when available
- Mention source publications when referencing information
- If results contain limited relevant information, state this clearly but still extract what you can
- Provide a comprehensive analysis of all available content"""

    async def summarize_with_groq(self, query: str, search_results: List[SearchResult],
                                  temperature: float = 0.3, max_tokens: int = 8000) -> str:
        """Summarize with the Groq API (OpenAI-compatible chat completions endpoint)"""
        if not self.groq_api_key:
            return "Groq API key not provided"
        try:
            prepared_content = self.prepare_content_for_llm(query, search_results)
            print(f"DEBUG - Sending {len(prepared_content)} characters to Groq")
            print(f"DEBUG - Results with content: {len([r for r in search_results if r.content])}")
            print(f"DEBUG - Max completion tokens: {max_tokens}")
            headers = {
                "Authorization": f"Bearer {self.groq_api_key}",
                "Content-Type": "application/json"
            }
            payload = {
                "model": self.groq_model,
                "messages": [
                    {"role": "system", "content": self.create_system_prompt()},
                    {"role": "user", "content": self.create_user_prompt(query, prepared_content)}
                ],
                "temperature": temperature,
                "max_tokens": max_tokens
            }
            async with aiohttp.ClientSession() as session:
                async with session.post("https://api.groq.com/openai/v1/chat/completions",
                                        headers=headers, json=payload) as response:
                    if response.status == 200:
                        result = await response.json()
                        summary = result["choices"][0]["message"]["content"]
                        debug_info = f"\n\n[Content Sources: {len([r for r in search_results if r.content])} with content, {len(search_results)} total]"
                        return summary + debug_info
                    error_text = await response.text()
                    return f"Groq API error: {response.status} - {error_text}"
        except Exception as e:
            return f"Error with Groq summarization: {str(e)}"

    async def summarize_with_openrouter(self, query: str, search_results: List[SearchResult],
                                        temperature: float = 0.3, max_tokens: int = 4000) -> str:
        """Summarize with the OpenRouter API"""
        if not self.openrouter_api_key:
            return "OpenRouter API key not provided"
        try:
            prepared_content = self.prepare_content_for_llm(query, search_results)
            print(f"DEBUG - Sending {len(prepared_content)} characters to OpenRouter")
            headers = {
                "Authorization": f"Bearer {self.openrouter_api_key}",
                "Content-Type": "application/json",
                "HTTP-Referer": "https://huggingface.co/spaces",
                "X-Title": "AI Search Engine"
            }
            payload = {
                "model": self.openrouter_model,
                "messages": [
                    {"role": "system", "content": self.create_system_prompt()},
                    {"role": "user", "content": self.create_user_prompt(query, prepared_content)}
                ],
                "temperature": temperature,
                "max_tokens": max_tokens
            }
            async with aiohttp.ClientSession() as session:
                async with session.post("https://openrouter.ai/api/v1/chat/completions",
                                        headers=headers, json=payload) as response:
                    if response.status == 200:
                        result = await response.json()
                        summary = result["choices"][0]["message"]["content"]
                        debug_info = f"\n\n[Content Sources: {len([r for r in search_results if r.content])} with content, {len(search_results)} total]"
                        return summary + debug_info
                    error_text = await response.text()
                    return f"OpenRouter API error: {response.status} - {error_text}"
        except Exception as e:
            return f"Error with OpenRouter summarization: {str(e)}"

class AISearchEngine:
    """Main AI-powered search engine class"""

    def __init__(self, groq_api_key: str = "", openrouter_api_key: str = ""):
        self.query_enhancer = QueryEnhancer()
        self.search_interface = SearchEngineInterface()
        self.content_scraper = ContentScraper()
        self.embedding_filter = EmbeddingFilter()
        self.llm_summarizer = LLMSummarizer(groq_api_key, openrouter_api_key)

    async def search_and_summarize(self,
                                   query: str,
                                   search_engines: List[str],
                                   model: str,
                                   use_embeddings: bool,
                                   temperature: float,
                                   max_results: int,
                                   max_tokens: int) -> Tuple[str, str]:
        """Main search and summarization pipeline with robust error handling"""
        start_time = time.time()
        status_updates = []
        try:
            # Step 1: Query enhancement
            status_updates.append("🔍 Enhancing search query...")
            enhanced_query = self.query_enhancer.enhance_query(query)
            status_updates.append(f"Enhanced query: {enhanced_query}")
            # Step 2: Parallel search across engines
            status_updates.append("🌐 Searching across multiple engines...")
            search_tasks = []
            if "Google" in search_engines:
                search_tasks.append(self.search_interface.search_google(enhanced_query, max_results))
            if "Bing" in search_engines:
                search_tasks.append(self.search_interface.search_bing(enhanced_query, max_results))
            if "Yahoo" in search_engines:
                search_tasks.append(self.search_interface.search_yahoo(enhanced_query, max_results))
            if not search_tasks:
                return "No search engines selected", "\n".join(status_updates)
            search_results_lists = await asyncio.gather(*search_tasks, return_exceptions=True)
            # Combine and deduplicate results, skipping failed engines
            all_results = []
            seen_urls = set()
            for results_list in search_results_lists:
                if not isinstance(results_list, Exception) and results_list:
                    for result in results_list:
                        if result.url not in seen_urls and result.url.startswith('http'):
                            all_results.append(result)
                            seen_urls.add(result.url)
            status_updates.append(f"Found {len(all_results)} unique results")
            if not all_results:
                return ("No search results found. This might be due to rate limiting or network issues. "
                        "Please try again."), "\n".join(status_updates)
            # Step 3: Content scraping with retry and fallback
            status_updates.append("📰 Scraping article content...")
            # Try extra URLs so we still reach the target after failures
            target_successful = min(max_results, len(all_results))
            scraped_results = await self.content_scraper.scrape_multiple(
                all_results[:max_results * 2],
                max_successful=target_successful
            )
            # Include ALL results with any content (no filtering)
            results_with_content = [r for r in scraped_results if r.content.strip()]
            status_updates.append(f"Successfully scraped {len(results_with_content)} articles with content")
            # Debug: show what content we actually got
            for i, result in enumerate(results_with_content[:3]):
                print(f"Result {i+1}: {result.title}")
                print(f"Content length: {len(result.content)}")
                print(f"Content preview: {result.content[:200]}...")
                print("---")
            # If we don't have enough content, fall back to the search snippets
            if len(results_with_content) < 3:
                status_updates.append("Using search snippets as fallback content...")
                for result in scraped_results:
                    if not result.content.strip() and result.snippet.strip():
                        result.content = result.snippet
                        results_with_content.append(result)
                        if len(results_with_content) >= 5:  # Reasonable minimum
                            break
            if not results_with_content:
                return ("No article content could be extracted. This might be due to anti-bot protections. "
                        "Please try a different query or try again later."), "\n".join(status_updates)
            # Step 4: Optional embedding-based filtering
            if use_embeddings and results_with_content:
                status_updates.append("🧠 Filtering results using embeddings...")
                try:
                    filtered_results = self.embedding_filter.filter_by_relevance(query, results_with_content)
                    if filtered_results:
                        results_with_content = filtered_results
                        status_updates.append(f"Filtered to {len(filtered_results)} most relevant results")
                    else:
                        status_updates.append("Embedding filter returned no results, using all scraped content")
                except Exception as e:
                    status_updates.append(f"Embedding filtering failed, using all results: {str(e)}")
            if not results_with_content:
                return "No relevant results found after filtering", "\n".join(status_updates)
            # Step 5: LLM summarization - sends ALL content without validation filtering
            status_updates.append(f"🤖 Generating summary using {model} (processing all scraped content)...")
            try:
                if model.startswith("Groq"):
                    summary = await self.llm_summarizer.summarize_with_groq(
                        query, results_with_content, temperature, max_tokens
                    )
                else:  # OpenRouter
                    summary = await self.llm_summarizer.summarize_with_openrouter(
                        query, results_with_content, temperature, max_tokens
                    )
                # Check whether summarization failed
                if summary.startswith("Error") or summary.startswith("Groq API error") or summary.startswith("OpenRouter API error"):
                    # Fall back to a basic summary built from the scraped content
                    basic_summary = self.create_basic_summary(query, results_with_content)
                    summary = f"AI summarization failed, but here's what I found:\n\n{basic_summary}\n\n---\n⚠️ Original error: {summary}"
            except Exception as e:
                # Fall back to a basic summary
                basic_summary = self.create_basic_summary(query, results_with_content)
                summary = f"AI summarization encountered an error, but here's what I found:\n\n{basic_summary}\n\n---\n⚠️ Error: {str(e)}"
            # Add metadata
            processing_time = time.time() - start_time
            metadata = "\n\n---\n**Search Metadata:**\n"
            metadata += f"- Processing time: {processing_time:.2f} seconds\n"
            metadata += f"- Results found: {len(all_results)}\n"
            metadata += f"- Articles scraped: {len(results_with_content)}\n"
            metadata += f"- Search engines: {', '.join(search_engines)}\n"
            metadata += f"- Model: {model}\n"
            metadata += f"- Embeddings used: {use_embeddings}\n"
            metadata += "- Content filtering: DISABLED (all content sent to LLM)\n"
            final_summary = summary + metadata
            status_updates.append(f"✅ Summary generated in {processing_time:.2f}s")
            return final_summary, "\n".join(status_updates)
        except Exception as e:
            error_msg = f"Error in search pipeline: {str(e)}"
            status_updates.append(f"❌ {error_msg}")
            return error_msg, "\n".join(status_updates)
        finally:
            # Sessions are intentionally left open so they can be reused across searches
            pass
    def create_basic_summary(self, query: str, results: List[SearchResult]) -> str:
        """Create a basic summary when AI summarization fails"""
        summary_parts = [f"Based on search results for: **{query}**\n"]
        for i, result in enumerate(results[:5], 1):
            content_preview = result.content[:300] + "..." if len(result.content) > 300 else result.content
            summary_parts.append(f"**{i}. {result.title}**")
            summary_parts.append(f"Source: {result.url}")
            if result.publication_date:
                summary_parts.append(f"Date: {result.publication_date}")
            summary_parts.append(f"Content: {content_preview}")
            summary_parts.append("")
        return "\n".join(summary_parts)

# Global search engine instance
search_engine = None

async def initialize_search_engine(groq_key: str, openrouter_key: str):
    """Initialize the search engine with API keys"""
    global search_engine
    search_engine = AISearchEngine(groq_key, openrouter_key)
    return search_engine

async def perform_search(query: str,
                         search_engines: List[str],
                         model: str,
                         use_embeddings: bool,
                         temperature: float,
                         max_results: int,
                         max_tokens: int,
                         groq_key: str,
                         openrouter_key: str):
    """Perform a search with the given parameters"""
    global search_engine
    if search_engine is None:
        search_engine = await initialize_search_engine(groq_key, openrouter_key)
    return await search_engine.search_and_summarize(
        query, search_engines, model, use_embeddings,
        temperature, max_results, max_tokens
    )
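
# Illustrative invocation (hypothetical query and key; requires network access):
#
#     summary, status = asyncio.run(perform_search(
#         "Tim Cook announces new iPhone",
#         ["Google", "Bing"], "Groq (Llama-4)",
#         use_embeddings=True, temperature=0.3, max_results=10, max_tokens=8000,
#         groq_key="gsk_...", openrouter_key=""))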

async def chat_inference(message, history, groq_key, openrouter_key, model_choice, search_engines,
                         use_embeddings, temperature, max_results, max_tokens):
    """Main chat inference function for the ChatInterface with additional inputs"""
    try:
        if not message.strip():
            yield "Please enter a search query."
            return
        if not groq_key and not openrouter_key:
            yield "❌ Please provide at least one API key (Groq or OpenRouter) to use the AI summarization features."
            return
        if not search_engines:
            yield "❌ Please select at least one search engine."
            return
        # Initialize the search engine
        global search_engine
        if search_engine is None:
            search_engine = await initialize_search_engine(groq_key, openrouter_key)
        else:
            # Update API keys in case they changed
            search_engine.llm_summarizer.groq_api_key = groq_key
            search_engine.llm_summarizer.openrouter_api_key = openrouter_key
        # Stream coarse status updates while the pipeline runs
        yield "🔍 Enhancing query and searching across multiple engines..."
        await asyncio.sleep(0.1)
        yield "🌐 Fetching results from search engines..."
        await asyncio.sleep(0.1)
        yield "📰 Scraping article content..."
        await asyncio.sleep(0.1)
        if use_embeddings:
            yield "🧠 Filtering results using embeddings..."
            await asyncio.sleep(0.1)
        yield "🤖 Generating AI-powered summary (processing all scraped content)..."
        await asyncio.sleep(0.1)
        # Perform the actual search and summarization
        summary, status = await search_engine.search_and_summarize(
            message,
            search_engines,
            model_choice,
            use_embeddings,
            temperature,
            max_results,
            max_tokens
        )
        # Yield the final result
        yield summary
    except Exception as e:
        yield f"❌ Search failed: {str(e)}\n\nPlease check your API keys and try again."

def create_gradio_interface():
    """Create the modern Gradio ChatInterface"""
    # Additional inputs shown in the configuration accordion
    additional_inputs = [
        gr.Textbox(
            label="🔑 Groq API Key",
            type="password",
            placeholder="Enter your Groq API key (get from: https://console.groq.com/)",
            info="Required for the Groq Llama-4 model"
        ),
        gr.Textbox(
            label="🔑 OpenRouter API Key",
            type="password",
            placeholder="Enter your OpenRouter API key (get from: https://openrouter.ai/)",
            info="Required for the OpenRouter DeepSeek-R1 model"
        ),
        gr.Dropdown(
            choices=["Groq (Llama-4)", "OpenRouter (DeepSeek-R1)"],
            value="Groq (Llama-4)",
            label="🤖 AI Model",
            info="Choose the AI model for summarization"
        ),
        gr.CheckboxGroup(
            choices=["Google", "Bing", "Yahoo"],
            value=["Google", "Bing"],
            label="🌐 Search Engines",
            info="Select which search engines to use (multiple recommended)"
        ),
        gr.Checkbox(
            value=True,
            label="🧠 Use Embedding-based Filtering",
            info="Filter results by relevance using TF-IDF similarity (recommended)"
        ),
        gr.Slider(
            minimum=0.0,
            maximum=1.0,
            value=0.3,
            step=0.1,
            label="🌡️ Temperature",
            info="Higher = more creative, lower = more focused (0.1-0.3 recommended for factual queries)"
        ),
        gr.Slider(
            minimum=5,
            maximum=20,
            value=10,
            step=1,
            label="📊 Max Results per Engine",
            info="Number of search results to fetch from each engine"
        ),
        gr.Slider(
            minimum=1000,
            maximum=8000,
            value=8000,
            step=500,
            label="📝 Max Completion Tokens",
            info="Maximum length of the AI-generated summary (Groq: up to 8000, OpenRouter: up to 4000)"
        )
    ]
    # Create the main ChatInterface
    chat_interface = gr.ChatInterface(
        fn=chat_inference,
        additional_inputs=additional_inputs,
        additional_inputs_accordion=gr.Accordion("⚙️ Configuration & Advanced Parameters", open=True),
        title="🔍 AI-Powered Search Engine - No Content Filtering",
        description="""
**Search across Google, Bing, and Yahoo, then get AI-powered summaries!**

✨ **Features:** Multi-engine search • Query enhancement • Parallel scraping • AI summarization • Embedding filtering

🆕 **Updated:** All scraped content is now sent to the LLM without filtering • Increased Groq token limits (up to 8K)

🚀 **Quick Start:** 1) Add your API key below 2) Select search engines 3) Ask any question!
        """,
        cache_examples=False,
        submit_btn="🔍 Search & Summarize",
        stop_btn="⏹️ Stop",
        chatbot=gr.Chatbot(
            show_copy_button=True,
            layout="bubble",
            height=600,
            placeholder="🔍 Ready to search! All scraped content will be sent to the LLM for comprehensive analysis.",
            show_share_button=True
        ),
        theme=gr.themes.Soft(),
        analytics_enabled=False,
        type="messages"  # Use the modern message format
    )
    return chat_interface

if __name__ == "__main__":
    demo = create_gradio_interface()
    demo.launch(share=True)