Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| from openai import OpenAI | |
| import requests | |
| import json | |
| import httpx | |
| import os | |
| import logging | |
| from fake_useragent import UserAgent | |
| from typing import Optional, List, Dict, Tuple | |
| from itertools import cycle | |
| from datetime import datetime | |
| from bs4 import BeautifulSoup | |
| from googlesearch import search | |
| from newsapi import NewsApiClient | |
| import markdown | |
| import re | |
| import time | |
| import random | |
| from tenacity import retry, wait_exponential, stop_after_attempt | |
# Set up logging: configure the root handler once at import time so every
# module-level logger (including this one) inherits the same format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class RaindropSearchBot:
    """Multi-source search assistant combining Raindrop.io bookmarks,
    web search results and NewsAPI articles, summarized via OpenAI.

    Requires the 'openaikey', 'raindroptoken' and 'newsapikey'
    environment variables to be set before construction.
    """
    def __init__(self):
        """Read API keys from the environment and build the API clients.

        Raises:
            EnvironmentError: if any of the three required API keys is missing.
        """
        self.openai_api_key = os.getenv('openaikey')
        self.raindrop_api_token = os.getenv('raindroptoken')
        self.newsapi_key = os.getenv('newsapikey')
        if not all([self.openai_api_key, self.raindrop_api_token, self.newsapi_key]):
            raise EnvironmentError(
                "Missing required environment variables. Please ensure all API keys are set."
            )
        # Updated OpenAI client initialization: explicit httpx client so we
        # control the timeout and redirect behavior.
        self.client = OpenAI(
            api_key=self.openai_api_key,
            http_client=httpx.Client(
                timeout=60.0,
                follow_redirects=True
            )
        )
        self.newsapi = NewsApiClient(api_key=self.newsapi_key)
        # Bounds (seconds) for the randomized anti-rate-limit delays
        self.min_delay = 5  # Increased minimum delay
        self.max_delay = 15  # Increased maximum delay
        self.ua = UserAgent()  # random browser User-Agent generator
        self.setup_proxies()  # initialise the proxy rotation pool
| def get_next_proxy(self) -> dict: | |
| """Get next proxy from the rotation""" | |
| try: | |
| proxy = next(self.proxy_cycle) | |
| return { | |
| 'http': proxy, | |
| 'https': proxy | |
| } | |
| except StopIteration: | |
| logger.warning("No proxies available, returning empty proxy dict") | |
| return {} | |
| def get_alternative_search_results(self, query: str) -> List[Dict]: | |
| """Implement alternative search engine if Google fails""" | |
| try: | |
| from duckduckgo_search import DDGS | |
| self.random_delay() | |
| with DDGS() as ddgs: | |
| results = list(ddgs.text(query, max_results=5)) | |
| return [{ | |
| 'title': result.get('title', ''), | |
| 'link': result.get('link', ''), | |
| 'snippet': result.get('body', '') | |
| } for result in results] | |
| except Exception as e: | |
| logger.error(f"Alternative search failed: {e}") | |
| return [] | |
| def search_with_fallback(self, query: str) -> List[Dict]: | |
| """Search with fallback to alternative search engines""" | |
| try: | |
| return self.get_google_results(query) | |
| except Exception as e: | |
| logger.warning(f"Google search failed: {e}") | |
| try: | |
| # Implement alternative search engine here | |
| # For example: DuckDuckGo, Bing, etc. | |
| return self.get_alternative_search_results(query) | |
| except Exception as e: | |
| logger.error(f"All search attempts failed: {e}") | |
| return [] | |
| def setup_proxies(self): | |
| """Setup proxy rotation""" | |
| # Free proxy list - replace with your paid proxy service for better reliability | |
| self.proxies = [ | |
| 'http://proxy1.example.com:8080', | |
| 'http://proxy2.example.com:8080', | |
| # Add more proxies here | |
| ] | |
| self.proxy_cycle = cycle(self.proxies) | |
| def random_delay(self): | |
| """Enhanced random delay with jitter""" | |
| base_delay = random.uniform(self.min_delay, self.max_delay) | |
| jitter = random.uniform(-1, 1) # Add/subtract up to 1 second | |
| delay = max(0, base_delay + jitter) | |
| time.sleep(delay) | |
| def get_google_results(self, query: str, num_results: int = 5) -> List[Dict]: | |
| """Get Google search results with improved handling""" | |
| try: | |
| search_results = [] | |
| session = self.create_session() | |
| # Break the search into smaller chunks | |
| chunk_size = 3 | |
| for i in range(0, num_results, chunk_size): | |
| # Add substantial random delay between chunks | |
| self.random_delay() | |
| try: | |
| chunk_results = list(search( | |
| query, | |
| num_results=min(chunk_size, num_results - i), | |
| advanced=True, | |
| lang="en", | |
| sleep_interval=random.uniform(5, 10), # Random delay between requests | |
| timeout=30 | |
| )) | |
| for result in chunk_results: | |
| search_results.append({ | |
| 'title': result.title, | |
| 'link': result.url, | |
| 'snippet': result.description | |
| }) | |
| # Add random delay between chunks | |
| time.sleep(random.uniform(8, 15)) | |
| except Exception as e: | |
| logger.warning(f"Error in search chunk {i}: {e}") | |
| continue | |
| return search_results | |
| except Exception as e: | |
| logger.error(f"Google search error: {e}") | |
| raise | |
| def get_news_results(self, query: str, num_results: int = 5) -> List[Dict]: | |
| """Get news articles using NewsAPI with retry and delay.""" | |
| try: | |
| # Add random delay before making the request | |
| self.random_delay() | |
| news_results = self.newsapi.get_everything( | |
| q=query, | |
| language='en', | |
| sort_by='relevancy', | |
| page_size=num_results | |
| ) | |
| return news_results.get('articles', []) | |
| except Exception as e: | |
| logger.error(f"News API error: {e}") | |
| return [] | |
| def extract_content_from_url(self, url: str) -> Optional[str]: | |
| """Extract main content from a URL using BeautifulSoup with retry and delay.""" | |
| try: | |
| # Add random delay before making the request | |
| self.random_delay() | |
| headers = { | |
| 'User-Agent': self.get_random_user_agent(), | |
| 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', | |
| 'Accept-Language': 'en-US,en;q=0.5', | |
| 'Accept-Encoding': 'gzip, deflate, br', | |
| 'DNT': '1', | |
| 'Connection': 'keep-alive', | |
| 'Upgrade-Insecure-Requests': '1' | |
| } | |
| response = requests.get(url, headers=headers, timeout=10) | |
| response.raise_for_status() | |
| soup = BeautifulSoup(response.text, 'html.parser') | |
| # Remove unwanted elements | |
| for element in soup(['script', 'style', 'nav', 'header', 'footer', 'iframe']): | |
| element.decompose() | |
| # Get title | |
| title = soup.title.string if soup.title else '' | |
| # Get main content | |
| # First try common content containers | |
| content_containers = soup.select('article, main, .content, .post-content, .entry-content') | |
| if content_containers: | |
| content = content_containers[0].get_text(separator='\n', strip=True) | |
| else: | |
| # Fallback to all paragraphs | |
| paragraphs = soup.find_all('p') | |
| content = '\n'.join(p.get_text(strip=True) for p in paragraphs) | |
| # Combine and clean | |
| full_content = f"{title}\n\n{content}" | |
| # Clean up the text | |
| full_content = re.sub(r'\n\s*\n', '\n\n', full_content) # Remove extra newlines | |
| full_content = re.sub(r'\s+', ' ', full_content) # Normalize whitespace | |
| return full_content if full_content.strip() else None | |
| except Exception as e: | |
| logger.error(f"Error extracting content from {url}: {e}") | |
| return None | |
| def get_random_user_agent(self) -> str: | |
| """Get random user agent using fake-useragent""" | |
| return self.ua.random | |
| def create_session(self) -> requests.Session: | |
| """Create a session with random user agent and proxy""" | |
| session = requests.Session() | |
| session.headers.update({ | |
| 'User-Agent': self.get_random_user_agent(), | |
| 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', | |
| 'Accept-Language': 'en-US,en;q=0.5', | |
| 'Accept-Encoding': 'gzip, deflate, br', | |
| 'DNT': '1', | |
| 'Connection': 'keep-alive', | |
| 'Upgrade-Insecure-Requests': '1', | |
| 'Sec-Fetch-Dest': 'document', | |
| 'Sec-Fetch-Mode': 'navigate', | |
| 'Sec-Fetch-Site': 'none', | |
| 'Sec-Fetch-User': '?1', | |
| 'Cache-Control': 'max-age=0' | |
| }) | |
| session.proxies = self.get_next_proxy() | |
| return session | |
| def get_content_and_summary(self, request: str, item: Dict, source_type: str) -> Dict: | |
| """Get content and generate summary for a single item.""" | |
| try: | |
| # Get URL based on source type | |
| url = item.get('link') or item.get('url') | |
| if not url: | |
| logger.warning(f"No URL found in item from {source_type}") | |
| return item | |
| # For Raindrop items, use existing excerpt if available | |
| if source_type == 'raindrop' and item.get('excerpt'): | |
| content = item['excerpt'] | |
| else: | |
| content = self.extract_content_from_url(url) | |
| if not content: | |
| logger.warning(f"No content extracted from {url}") | |
| item['detailed_summary'] = "Content extraction failed." | |
| return item | |
| # Generate summary focused on the query topic | |
| try: | |
| prompt = f""" | |
| Analyze this content and provide a detailed summary focusing on key points related to the user request: | |
| {request} | |
| Content: {content[:4000]} # Limit content length for token constraints | |
| Requirements: | |
| 1. Focus on the most important facts and findings related to the topic | |
| 2. Include specific data points and quotes if relevant | |
| 3. Organize the information logically | |
| 4. Keep the summary to 2-3 paragraphs | |
| 5. Highlight any unique insights from this source | |
| 6. No need to add a conclusion | |
| """ | |
| response = self.client.chat.completions.create( | |
| model="gpt-4o-mini", | |
| messages=[{"role": "user", "content": prompt}], | |
| temperature=0.3, | |
| max_tokens=300 | |
| ) | |
| item['detailed_summary'] = response.choices[0].message.content | |
| item['processed_content'] = content[:1000] # Store truncated content for later use | |
| except Exception as e: | |
| logger.error(f"Error generating summary: {e}") | |
| item['detailed_summary'] = "Summary generation failed." | |
| return item | |
| except Exception as e: | |
| logger.error(f"Error processing item from {source_type}: {e}") | |
| return item | |
| def search_raindrop(self, search_query: str) -> List[Dict]: | |
| """Search Raindrop.io with enhanced error handling and logging.""" | |
| logger.info(f"Searching Raindrop with query: {search_query}") | |
| headers = { | |
| "Authorization": f"Bearer {self.raindrop_api_token}" | |
| } | |
| # Test API connection first | |
| try: | |
| test_response = requests.get( | |
| "https://api.raindrop.io/rest/v1/user", | |
| headers=headers | |
| ) | |
| if test_response.status_code != 200: | |
| logger.error(f"API test failed: {test_response.status_code}") | |
| return [] | |
| except Exception as e: | |
| logger.error(f"API connection error: {e}") | |
| return [] | |
| # Perform search | |
| try: | |
| params = { | |
| "search": search_query, | |
| "perpage": 50, | |
| "sort": "-created", | |
| "page": 0 | |
| } | |
| response = requests.get( | |
| "https://api.raindrop.io/rest/v1/raindrops/0", | |
| headers=headers, | |
| params=params | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| items = data.get("items", []) | |
| logger.info(f"Found {len(items)} results") | |
| return items | |
| else: | |
| logger.error(f"Search failed: {response.status_code}") | |
| return [] | |
| except Exception as e: | |
| logger.error(f"Search error: {e}") | |
| return [] | |
| def process_all_results(self, request, raindrop_results: List[Dict], | |
| google_results: List[Dict], | |
| news_results: List[Dict]) -> Tuple[List[Dict], List[Dict], List[Dict]]: | |
| """Process and enrich all results with content and summaries.""" | |
| processed_raindrop = [] | |
| for item in raindrop_results: | |
| processed_item = self.get_content_and_summary(request, item, 'raindrop') | |
| if processed_item.get('detailed_summary'): | |
| processed_raindrop.append(processed_item) | |
| # Add delay between processing items | |
| self.random_delay() | |
| processed_google = [] | |
| for item in google_results: | |
| processed_item = self.get_content_and_summary(request, item, 'google') | |
| if processed_item.get('detailed_summary'): | |
| processed_google.append(processed_item) | |
| # Add delay between processing items | |
| self.random_delay() | |
| processed_news = [] | |
| for item in news_results: | |
| processed_item = self.get_content_and_summary(request, item, 'news') | |
| if processed_item.get('detailed_summary'): | |
| processed_news.append(processed_item) | |
| # Add delay between processing items | |
| self.random_delay() | |
| return processed_raindrop, processed_google, processed_news | |
| def generate_essay_response(self, results: Tuple[List[Dict], List[Dict], List[Dict]], | |
| user_query: str) -> str: | |
| """Generate a structured essay-style response with references.""" | |
| raindrop_results, google_results, news_results = results | |
| # Collect all content for analysis | |
| all_content = "" | |
| reference_map = {} | |
| ref_counter = 1 | |
| def get_url(item): | |
| """Helper function to get URL from item regardless of field name""" | |
| if 'link' in item: | |
| return item['link'] | |
| elif 'url' in item: | |
| return item['url'] | |
| return None | |
| # Process Raindrop results | |
| for item in raindrop_results: | |
| url = get_url(item) | |
| if url and item.get('detailed_summary'): | |
| all_content += f"\n{item['detailed_summary']}\n" | |
| reference_map[url] = ref_counter | |
| ref_counter += 1 | |
| # Process Google results | |
| for item in google_results: | |
| url = get_url(item) | |
| if url and item.get('detailed_summary'): | |
| all_content += f"\n{item['detailed_summary']}\n" | |
| reference_map[url] = ref_counter | |
| ref_counter += 1 | |
| # Process News results | |
| for item in news_results: | |
| url = get_url(item) | |
| if url and item.get('detailed_summary'): | |
| all_content += f"\n{item['detailed_summary']}\n" | |
| reference_map[url] = ref_counter | |
| ref_counter += 1 | |
| try: | |
| prompt = f""" | |
| Create a comprehensive essay-style analysis about: {user_query} | |
| Use this content as your reference source material: | |
| {all_content} | |
| Requirements: | |
| 1. Structure the response in clear sections with markdown headers | |
| 2. Include an introduction and conclusion | |
| 3. Use reference numbers [n] to cite sources | |
| 4. Make connections between different sources | |
| 5. Highlight key findings and trends | |
| 6. Address any contradictions or gaps | |
| 7. Use markdown formatting for better readability | |
| Format the response as a proper academic essay with sections and sources. | |
| """ | |
| response = self.client.chat.completions.create( | |
| model="gpt-4o-mini", | |
| messages=[{"role": "user", "content": prompt}], | |
| temperature=0.5, | |
| max_tokens=1500 | |
| ) | |
| essay = response.choices[0].message.content | |
| # Replace reference placeholders with actual reference numbers | |
| for url, ref_num in reference_map.items(): | |
| essay = essay.replace(f'[URL:{url}]', f'[{ref_num}]') | |
| return essay | |
| except Exception as e: | |
| logger.error(f"Error generating essay: {e}") | |
| return "Error generating analysis." | |
| def format_results(self, results: Tuple[List[Dict], List[Dict], List[Dict]], | |
| essay: str) -> str: | |
| """Format the essay and results with detailed summaries.""" | |
| raindrop_results, google_results, news_results = results | |
| output = f"{essay}\n\n" | |
| output += "---\n\n" | |
| output += "# References and Detailed Summaries\n\n" | |
| ref_counter = 1 | |
| # Format Raindrop results | |
| if raindrop_results: | |
| output += "## ๐ Bookmarked Sources\n\n" | |
| for item in raindrop_results: | |
| output += f"### [{ref_counter}] {item.get('title', 'No Title')}\n" | |
| output += f"**Link**: {item.get('link')}\n" | |
| if item.get('tags'): | |
| output += f"**Tags**: {', '.join(item['tags'])}\n" | |
| if item.get('created'): | |
| output += f"**Created**: {item['created'][:10]}\n" | |
| output += "\n**Summary**:\n" | |
| output += f"{item.get('detailed_summary', 'No summary available.')}\n\n" | |
| ref_counter += 1 | |
| # Format Google results | |
| if google_results: | |
| output += "## ๐ Web Sources\n\n" | |
| for item in google_results: | |
| output += f"### [{ref_counter}] {item.get('title', 'No Title')}\n" | |
| output += f"**Link**: {item.get('link')}\n" | |
| output += "\n**Summary**:\n" | |
| output += f"{item.get('detailed_summary', 'No summary available.')}\n\n" | |
| ref_counter += 1 | |
| # Format News results | |
| if news_results: | |
| output += "## ๐ฐ Recent News\n\n" | |
| for item in news_results: | |
| output += f"### [{ref_counter}] {item.get('title', 'No Title')}\n" | |
| output += f"**Link**: {item.get('url')}\n" | |
| if item.get('source', {}).get('name'): | |
| output += f"**Source**: {item['source']['name']}\n" | |
| if item.get('publishedAt'): | |
| output += f"**Published**: {item['publishedAt'][:10]}\n" | |
| output += "\n**Summary**:\n" | |
| output += f"{item.get('detailed_summary', 'No summary available.')}\n\n" | |
| ref_counter += 1 | |
| return output | |
| def process_request(self, user_request: str) -> str: | |
| """Process user request with improved error handling and query generation.""" | |
| try: | |
| # Generate optimized search query | |
| search_query = self.generate_search_queries(user_request) | |
| logger.info(f"Processing request: {search_query}") | |
| # Get search results with fallback | |
| google_results = self.search_with_fallback(search_query) | |
| # Add delay before news API call | |
| self.random_delay() | |
| # Get news results | |
| news_results = self.get_news_results(search_query) | |
| # Process all results - Fix: Pass the user_request as first argument | |
| processed_results = self.process_all_results( | |
| request=user_request, | |
| raindrop_results=[], # Empty list for raindrop results | |
| google_results=google_results, | |
| news_results=news_results | |
| ) | |
| # Generate response | |
| essay = self.generate_essay_response(processed_results, user_request) | |
| # Format and return results | |
| return self.format_results(processed_results, essay) | |
| except Exception as e: | |
| logger.error(f"Error processing request: {e}") | |
| return f""" | |
| An error occurred while processing your request: {str(e)} | |
| Please try again with a different search query or contact support if the problem persists. | |
| """ | |
| def generate_search_queries(self, user_request: str) -> str: | |
| """ | |
| Generate optimized search queries from user request. | |
| Args: | |
| user_request (str): The original user query | |
| Returns: | |
| str: Optimized search query | |
| """ | |
| try: | |
| # Clean and preprocess the user request | |
| cleaned_request = self.preprocess_query(user_request) | |
| # Generate search query using GPT | |
| prompt = f""" | |
| Convert this search request into an optimized search query using proper search operators. | |
| Request: {cleaned_request} | |
| Guidelines: | |
| - Focus on key concepts and synonyms | |
| - Use combination of keywords that would appear in titles or descriptions | |
| - Return only the search terms, no explanation | |
| - Include alternative phrasings | |
| - Keep it concise (max 6-8 key terms/phrases) | |
| - use the formatting authorised in raindrop search: | |
| o use " for exact search (ex: "artificial intelligence") | |
| o use - to exclude some terms (ex: -math) // Do not exclude terms that are potentially relevant | |
| o use match:OR for alternatives (ex: apple match:OR banana ) | |
| o use match:AND for inclusion of both cases systematically (ex: apple match:AND banana ) | |
| o use parenthesis for combinations ( ex: sugar match:AND (banana match:OR apple) ) | |
| Example elaborate request: ("artificial intelligence" match:OR AI) -"machine learning" | |
| Use your judgement, think step by steps. | |
| Return only the search query terms. | |
| """ | |
| response = self.client.chat.completions.create( | |
| model="gpt-4o-mini", | |
| messages=[{"role": "user", "content": prompt}], | |
| temperature=0.3, | |
| max_tokens=100 | |
| ) | |
| optimized_query = response.choices[0].message.content.strip() | |
| logger.info(f"Generated search query: {optimized_query}") | |
| return optimized_query | |
| except Exception as e: | |
| logger.error(f"Error generating search queries: {e}") | |
| # Fallback to using the original request if query generation fails | |
| return user_request | |
| def preprocess_query(self, query: str) -> str: | |
| """ | |
| Preprocess the user query to remove unnecessary elements and standardize format. | |
| Args: | |
| query (str): Original query string | |
| Returns: | |
| str: Cleaned query string | |
| """ | |
| try: | |
| # Convert to lowercase | |
| query = query.lower() | |
| # Remove extra whitespace | |
| query = ' '.join(query.split()) | |
| # Remove special characters except basic punctuation | |
| query = re.sub(r'[^a-z0-9\s\'".,?!-]', '', query) | |
| # Remove multiple punctuation marks | |
| query = re.sub(r'([.,?!])\1+', r'\1', query) | |
| # Ensure proper spacing around quotes | |
| query = re.sub(r'(?<=[^\s])"', ' "', query) | |
| query = re.sub(r'"(?=[^\s])', '" ', query) | |
| return query | |
| except Exception as e: | |
| logger.error(f"Error preprocessing query: {e}") | |
| return query | |
# Initialize bot at import time (raises EnvironmentError if API keys are missing)
bot = RaindropSearchBot()
# Create Gradio interface
def chatbot_interface(user_input: str) -> str:
    """Gradio callback: run the full search pipeline for *user_input*."""
    return bot.process_request(user_input)
def convert_to_markdown(output_text: str) -> gr.Markdown:
    """Wrap plain text in a rendered, visible Gradio Markdown component.

    Returns an error-message component if construction fails.
    """
    try:
        return gr.Markdown(
            value=output_text,
            render=True,
            visible=True
        )
    except Exception as e:
        logger.error(f"Error converting to markdown: {e}")
        return gr.Markdown(
            value="Error converting content to markdown format. Please try again.",
            visible=True
        )
# Define and launch the interface
# NOTE(review): the "๐" characters in the literals below look like
# mojibake-corrupted emoji — confirm against the original file before release.
with gr.Blocks(title="Enhanced Search Assistant", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # ๐ Enhanced Search Assistant
    Enter your search request in natural language, and I'll find and analyze information from multiple sources:
    - Your bookmarked content
    - Web search results
    - Recent news articles
    """)
    with gr.Row():
        input_text = gr.Textbox(
            label="What would you like to search for?",
            placeholder="Enter your search query here...",
            lines=2
        )
    with gr.Row():
        searchbutton = gr.Button("๐ Search", variant="primary")
    with gr.Column():
        # Collapsible raw-text view the user can edit before re-rendering
        with gr.Accordion("Editable version", open=False):
            with gr.Column():
                output_text = gr.Textbox(
                    label="Analysis and Results - editable",
                    lines=20,
                    interactive=True
                )
                refreshbutton = gr.Button("Refresh", variant="primary")
        # Rendered markdown view of the results
        output_textMarkdown = gr.Markdown(
            label="Analysis and Results",
            height=600,
            max_height=800
        )
    # Search: run the pipeline into the editable textbox, then render it
    searchbutton.click(
        fn=chatbot_interface,
        inputs=input_text,
        outputs=output_text
    ).then(
        fn=convert_to_markdown,
        inputs=output_text,
        outputs=output_textMarkdown)
    # Refresh: re-render the (possibly user-edited) textbox as markdown
    refreshbutton.click(
        fn=convert_to_markdown,
        inputs=output_text,
        outputs=output_textMarkdown)
# Launch the interface only when run as a script (not on import)
if __name__ == "__main__":
    demo.launch(share=True)  # share=True exposes a public Gradio link