from typing import Dict, List
import requests
from bs4 import BeautifulSoup
from transformers import pipeline
from langchain_community.embeddings import HuggingFaceEmbeddings
import time
import re
import logging
import random
from urllib.parse import urlparse, parse_qs, quote_plus

logger = logging.getLogger(__name__)

class SearchResult:
    def __init__(self, title: str, link: str, snippet: str):
        self.title = title
        self.link = link
        self.snippet = snippet

class ModelManager:
    """Manages different AI models for specific tasks"""

    def __init__(self):
        self.device = "cpu"
        self.models = {}
        self.load_models()

    def load_models(self):
        # Use smaller models for CPU deployment. Note: facebook/bart-base is
        # not fine-tuned for summarization, so a distilled CNN/DailyMail
        # summarization checkpoint is used here instead.
        self.models['summarizer'] = pipeline(
            "summarization",
            model="sshleifer/distilbart-cnn-12-6",
            device=self.device
        )
        self.models['embeddings'] = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={"device": self.device}
        )

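# Note: the 'embeddings' model is loaded but never used elsewhere in this
# module. A hypothetical sketch of how it could rank fetched pages against
# the query, using the standard LangChain Embeddings interface:
#
#   emb = ModelManager().models['embeddings']
#   query_vec = emb.embed_query("quantum computing")
#   doc_vecs = emb.embed_documents(page_texts)  # page_texts: List[str]
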
class ContentProcessor:
    """Processes and analyzes different types of content"""

    def __init__(self):
        self.model_manager = ModelManager()

    def clean_text(self, text: str) -> str:
        """Clean and normalize text content"""
        # Remove extra whitespace and normalize
        lines = [line.strip() for line in text.splitlines()]
        text = ' '.join(line for line in lines if line)
        # Remove common navigation elements. Match whole words,
        # case-insensitively, so that e.g. removing "search" does not
        # mangle "research".
        nav_patterns = [
            "skip to content",
            "search",
            "menu",
            "navigation",
            "subscribe",
            "sign in",
            "log in"
        ]
        for pattern in nav_patterns:
            text = re.sub(r'\b' + re.escape(pattern) + r'\b', '', text,
                          flags=re.IGNORECASE)
        # Collapse any redundant spaces left behind
        return ' '.join(text.split())
    def extract_key_points(self, text: str, max_points: int = 5) -> List[str]:
        """Extract key points from text using the summarizer"""
        try:
            # Split text into chunks of ~1000 characters
            chunks = [text[i:i + 1000] for i in range(0, len(text), 1000)]
            all_points = []
            for chunk in chunks[:3]:  # Process first 3 chunks only
                summary = self.model_manager.models['summarizer'](
                    chunk,
                    max_length=100,
                    min_length=30,
                    do_sample=False
                )[0]['summary_text']
                # Split into sentences and add as points
                sentences = [s.strip() for s in summary.split('.') if s.strip()]
                all_points.extend(sentences)
            # Return unique points, limited to max_points
            unique_points = list(dict.fromkeys(all_points))
            return unique_points[:max_points]
        except Exception as e:
            logger.error(f"Error extracting key points: {str(e)}")
            return []
    def process_content(self, content: str) -> Dict:
        """Process content and generate insights"""
        try:
            # Clean the text
            cleaned_text = self.clean_text(content)
            # Extract key points
            key_points = self.extract_key_points(cleaned_text)
            # Generate a concise summary
            summary = self.model_manager.models['summarizer'](
                cleaned_text[:1024],
                max_length=150,
                min_length=50,
                do_sample=False
            )[0]['summary_text']
            # Extract potential topics/keywords. This keyword list is
            # specific to the quantum-computing queries this demo targets;
            # swap it out for other domains.
            topics = []
            common_topics = [
                "quantum computing", "quantum processors", "quantum bits",
                "quantum algorithms", "quantum supremacy", "quantum advantage",
                "error correction", "quantum hardware", "quantum software",
                "quantum research", "quantum applications"
            ]
            for topic in common_topics:
                if topic.lower() in cleaned_text.lower():
                    topics.append(topic)
            return {
                'summary': summary,
                'key_points': key_points,
                'topics': topics[:5],  # Limit to top 5 topics
                'content': cleaned_text
            }
        except Exception as e:
            return {
                'summary': f"Error processing content: {str(e)}",
                'key_points': [],
                'topics': [],
                'content': content
            }

class WebSearchEngine:
    """Main search engine class"""

    def __init__(self):
        self.processor = ContentProcessor()
        self.session = requests.Session()
        self.request_delay = 2.0
        self.last_request_time = 0
        self.max_retries = 3
        # Browser-like headers to reduce the chance of being blocked
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }
    def safe_get(self, url: str, max_retries: int = 3) -> requests.Response:
        """Make a GET request with rate limiting, retries, and error handling"""
        for i in range(max_retries):
            try:
                # Throttle: enforce a minimum delay (plus jitter) between requests
                current_time = time.time()
                time_since_last = current_time - self.last_request_time
                if time_since_last < self.request_delay:
                    time.sleep(self.request_delay - time_since_last + random.uniform(0.5, 1.5))
                response = self.session.get(url, headers=self.headers, timeout=10)
                self.last_request_time = time.time()
                if response.status_code == 200:
                    return response
                elif response.status_code == 429:  # Rate limited: back off and retry
                    time.sleep((i + 1) * 5)
                    continue
                else:
                    response.raise_for_status()
            except Exception as e:
                if i == max_retries - 1:
                    raise
                logger.warning(f"Request to {url} failed ({e}); retrying")
                time.sleep((i + 1) * 2)
        raise Exception(f"Failed to fetch {url} after {max_retries} attempts")
    def is_valid_url(self, url: str) -> bool:
        """Check if URL is valid for crawling"""
        try:
            parsed = urlparse(url)
            return bool(parsed.netloc and parsed.scheme)
        except ValueError:
            return False
    def get_metadata(self, soup: BeautifulSoup) -> Dict:
        """Extract title and description metadata from a page"""
        title = soup.title.string.strip() if soup.title and soup.title.string else "No title"
        description = ""
        desc_tag = soup.find("meta", attrs={"name": "description"})
        if desc_tag:
            description = desc_tag.get("content", "")
        return {
            'title': title,
            'description': description
        }
    def process_url(self, url: str) -> Dict:
        """Fetch a single URL and run it through the content processor"""
        if not self.is_valid_url(url):
            return {'error': f"Invalid URL: {url}"}
        try:
            response = self.safe_get(url)
            soup = BeautifulSoup(response.text, 'lxml')
            # Drop script/style tags, then flatten the visible text
            for tag in soup(["script", "style"]):
                tag.decompose()
            text = soup.get_text()
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
            content = ' '.join(chunk for chunk in chunks if chunk)
            # Get metadata
            metadata = self.get_metadata(soup)
            # Process content
            processed = self.processor.process_content(content)
            return {
                'url': url,
                'title': metadata['title'],
                'description': metadata['description'],
                'summary': processed['summary'],
                'key_points': processed['key_points'],
                'topics': processed['topics'],
                'content': processed['content']
            }
        except Exception as e:
            return {'error': f"Error processing {url}: {str(e)}"}
    def search_duckduckgo(self, query: str, max_results: int = 5) -> List[Dict]:
        """Search DuckDuckGo's HTML endpoint and parse the results"""
        search_results = []
        try:
            # Encode query for URL
            encoded_query = quote_plus(query)
            search_url = f'https://html.duckduckgo.com/html/?q={encoded_query}'
            # Get search results page
            response = self.safe_get(search_url)
            soup = BeautifulSoup(response.text, 'lxml')
            # Find all result elements
            results = soup.find_all('div', {'class': 'result'})
            for result in results[:max_results]:
                try:
                    # Extract link
                    link_elem = result.find('a', {'class': 'result__a'})
                    if not link_elem:
                        continue
                    link = link_elem.get('href', '')
                    # DuckDuckGo wraps result URLs in a redirect
                    # (//duckduckgo.com/l/?uddg=<target>); unwrap it so we
                    # crawl the real destination.
                    if link.startswith('//'):
                        link = 'https:' + link
                    parsed = urlparse(link)
                    params = parse_qs(parsed.query)
                    if parsed.path.startswith('/l/') and 'uddg' in params:
                        link = params['uddg'][0]
                    if not link or not self.is_valid_url(link):
                        continue
                    # Extract title
                    title = link_elem.get_text(strip=True)
                    # Extract snippet
                    snippet_elem = result.find('a', {'class': 'result__snippet'})
                    snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""
                    search_results.append({
                        'link': link,
                        'title': title,
                        'snippet': snippet
                    })
                    # Add delay between processing results
                    time.sleep(random.uniform(0.2, 0.5))
                except Exception as e:
                    logger.warning(f"Error processing search result: {str(e)}")
                    continue
            return search_results
        except Exception as e:
            logger.error(f"Error during DuckDuckGo search: {str(e)}")
            return []
    def search(self, query: str, max_results: int = 5) -> Dict:
        """Perform search and process results"""
        try:
            # Search using DuckDuckGo HTML
            search_results = self.search_duckduckgo(query, max_results)
            if not search_results:
                return {'error': 'No results found'}
            results = []
            all_key_points = []
            all_topics = set()
            for result in search_results:
                if 'link' in result:
                    processed = self.process_url(result['link'])
                    if 'error' not in processed:
                        results.append(processed)
                        # Collect key points and topics
                        all_key_points.extend(processed.get('key_points', []))
                        all_topics.update(processed.get('topics', []))
                    time.sleep(random.uniform(0.5, 1.0))
            if not results:
                return {'error': 'Failed to process any search results'}
            # Combine all summaries
            all_summaries = " ".join(r['summary'] for r in results if 'summary' in r)
            # Generate a meta-summary of all content
            meta_summary = self.processor.model_manager.models['summarizer'](
                all_summaries[:1024],
                max_length=200,
                min_length=100,
                do_sample=False
            )[0]['summary_text']
            # Get unique key points, preserving order
            unique_key_points = list(dict.fromkeys(all_key_points))
            # Fall back to the raw query if no known topics were matched
            topic_str = ', '.join(list(all_topics)[:2]) or query
            return {
                'results': results,
                'insights': {
                    'summary': meta_summary,
                    'key_points': unique_key_points[:7],  # Top 7 key points
                    'topics': list(all_topics)[:5]  # Top 5 topics
                },
                'follow_up_questions': [
                    f"What are the recent breakthroughs in {topic_str}?",
                    "How do these developments impact the future of quantum computing?",
                    "What are the practical applications of these quantum computing advances?"
                ]
            }
        except Exception as e:
            return {'error': f"Search failed: {str(e)}"}

# Main search function
def search(query: str, max_results: int = 5) -> Dict:
    """Module-level entry point. Note that this builds a fresh
    WebSearchEngine (and reloads both models) on every call; reuse a
    single engine instance for repeated queries."""
    engine = WebSearchEngine()
    return engine.search(query, max_results)
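
# --- Example usage ---
# A minimal sketch, not part of the original module: running it makes live
# network requests to DuckDuckGo and downloads the summarization/embedding
# models on first use, so expect it to be slow on CPU. The query string is
# illustrative.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    output = search("latest quantum computing breakthroughs", max_results=3)
    if 'error' in output:
        print(f"Search failed: {output['error']}")
    else:
        print("Summary:", output['insights']['summary'])
        for point in output['insights']['key_points']:
            print(" -", point)
        print("Topics:", ", ".join(output['insights']['topics']))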