Spaces:
Sleeping
Sleeping
import logging
import os
import platform
import random
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional

import requests
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from utils.web_scraper import get_website_text_content
class ResearchEngine:
    """Enhanced research engine with improved scraping and error handling"""

    # Upper bound on concurrent scraper threads; also sizes the HTTP
    # connection pool so each worker can hold a pooled connection.
    _MAX_SCRAPERS: int = 5

    def __init__(self):
        """Load Google credentials from the environment, build the Custom
        Search client, and prepare a retrying HTTP session for scraping.

        Raises:
            EnvironmentError: when GOOGLE_API_KEY or GOOGLE_CX is missing/empty.
            Exception: re-raised (after logging) if the Custom Search client
                cannot be built.
        """
        # Secrets pasted into hosting dashboards sometimes carry a literal
        # "\n" or a real trailing newline — scrub both forms.
        def _clean_env(name: str) -> str:
            raw = os.getenv(name, "")
            return raw.strip().replace("\\n", "").replace("\n", "")

        self.google_api_key = _clean_env("GOOGLE_API_KEY")
        self.google_cx = _clean_env("GOOGLE_CX")
        if not self.google_api_key or not self.google_cx:
            raise EnvironmentError("GOOGLE_API_KEY and GOOGLE_CX must be set (no trailing newlines)")

        try:
            self.google_service = build("customsearch", "v1", developerKey=self.google_api_key)
            logging.info("Google Search service initialized successfully")
        except Exception as e:
            logging.error("Error initializing Google Custom Search: %s", e)
            raise

        # Shared session: retries transient server errors and rate limits,
        # honoring Retry-After when the server sends one.
        self.session = requests.Session()
        retry_policy = Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[413, 429, 500, 502, 503, 504],
            respect_retry_after_header=True
        )
        http_adapter = HTTPAdapter(
            pool_connections=self._MAX_SCRAPERS,
            pool_maxsize=self._MAX_SCRAPERS,
            max_retries=retry_policy,
            pool_block=True
        )
        self.session.mount("http://", http_adapter)
        self.session.mount("https://", http_adapter)

        # Rotated per-request to look less like a single automated client.
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Safari/605.1.15",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:126.0) Gecko/20100101 Firefox/126.0"
        ]
| def search_multiple_sources(self, query: str, context: str) -> Dict[str, Any]: | |
| now = time.time() | |
| results = { | |
| 'google_results': [], | |
| 'news_results': [], | |
| 'scraped_content': [], | |
| 'sources': [], | |
| 'metadata': {} | |
| } | |
| try: | |
| data = self._search_google(query, context) | |
| results['google_results'] = data['items'] | |
| results['sources'].extend(data['sources']) | |
| logging.info("Google search gave %d results", len(data['items'])) | |
| except Exception as e: | |
| logging.error("Google Search API error: %s", e) | |
| raise RuntimeError("Google Search API failure") | |
| news = self._search_news_api(query) | |
| if news: | |
| results['news_results'] = news.get('articles', []) | |
| results['sources'].extend([a.get('url') for a in news['articles'] if a.get('url')]) | |
| logging.info("Added %d news sources", len(results['news_results'])) | |
| unique_urls = list(dict.fromkeys(results['sources'])) | |
| urls = unique_urls[:self._MAX_SCRAPERS] | |
| results['scraped_content'] = list(self._parallel_scrape(urls)) | |
| results['metadata'] = { | |
| 'search_timestamp': now, | |
| 'total_sources': len(results['sources']), | |
| 'scraped_count': len(results['scraped_content']) | |
| } | |
| return results | |
| def _search_google(self, query: str, context: str) -> Dict[str, Any]: | |
| professional_query = f"{query} {context}" | |
| resp = self.google_service.cse().list(q=professional_query, cx=self.google_cx, num=10).execute() | |
| items = [] | |
| sources = [] | |
| for x in resp.get("items", []): | |
| title = x.get("title", "") | |
| snippet = x.get("snippet", "") | |
| link = x.get("link") | |
| display = x.get("displayLink", "") | |
| if link: | |
| items.append({'title': title, 'snippet': snippet, 'link': link, 'displayLink': display}) | |
| sources.append(link) | |
| return {'items': items, 'sources': sources} | |
| def _search_news_api(self, query: str) -> Optional[Dict[str, Any]]: | |
| api_key = os.getenv("NEWS_API_KEY") | |
| if not api_key: | |
| logging.warning("No NEWS_API_KEY – skipping News search") | |
| return None | |
| try: | |
| url = "https://newsapi.org/v2/everything" | |
| params = { | |
| 'q': query, | |
| 'apiKey': api_key, | |
| 'sortBy': 'relevancy', | |
| 'pageSize': 20, | |
| 'language': 'en', | |
| 'from': time.strftime('%Y-%m-%d', time.gmtime(time.time() - 30 * 24 * 3600)) | |
| } | |
| r = self.session.get(url, params=params, timeout=(3, 10)) | |
| r.raise_for_status() | |
| return r.json() | |
| except Exception as e: | |
| logging.error("News API error: %s", e) | |
| return None | |
| def _parallel_scrape(self, urls: List[str]) -> List[Dict[str, Any]]: | |
| out = [] | |
| future_to_url = {} | |
| def crawl(u: str): | |
| headers = { | |
| "User-Agent": random.choice(self.user_agents), | |
| "Accept-Language": "en-US,en;q=0.9", | |
| "Accept": "text/html,application/xhtml+xml;q=0.9,image/webp,*/*;q=0.8", | |
| "Connection": "keep-alive" | |
| } | |
| try: | |
| resp = self.session.get(u, headers=headers, timeout=(3, 10)) | |
| if resp.status_code == 403: | |
| logging.warning("Scraping blocked (403) for %s", u) | |
| return None | |
| resp.raise_for_status() | |
| if resp.encoding.lower() in ("iso-8859-1", "latin-1"): | |
| resp.encoding = resp.apparent_encoding | |
| text = resp.text or "" | |
| if len(text) < 100: | |
| return None | |
| return {'url': u, 'content': text[:2000], 'timestamp': time.time()} | |
| except requests.exceptions.ReadTimeout: | |
| logging.warning("Timeout scraping %s", u) | |
| except UnicodeEncodeError: | |
| return {'url': u, 'content': resp.content.decode('utf-8', errors='replace')[:2000], 'timestamp': time.time()} | |
| except Exception as e: | |
| logging.warning("Scraping failure for %s: %s", u, e) | |
| return None | |
| with ThreadPoolExecutor(max_workers=self._MAX_SCRAPERS) as ex: | |
| for u in urls: | |
| future_to_url[ex.submit(crawl, u)] = u | |
| for f in as_completed(future_to_url): | |
| res = f.result() | |
| if res: | |
| out.append(res) | |
| time.sleep(random.uniform(0.5, 1.0)) | |
| return out | |
| def extract_key_data_points(self, research_results: Dict[str, Any]) -> List[Dict[str, Any]]: | |
| data_points = [] | |
| for itm in research_results.get('google_results', []): | |
| val = self._extract_numbers_and_stats(itm.get('snippet', "")) | |
| if val: | |
| data_points.append({ | |
| 'value': val, | |
| 'source': itm.get('displayLink', ""), | |
| 'context': itm.get('snippet', ""), | |
| 'type': 'statistic' | |
| }) | |
| for cnt in research_results.get('scraped_content', []): | |
| val = self._extract_numbers_and_stats(cnt.get('content', "")) | |
| if val: | |
| data_points.append({ | |
| 'value': val, | |
| 'source': cnt.get('url', ""), | |
| 'context': cnt.get('content', "")[:200], | |
| 'type': 'detailed_analysis' | |
| }) | |
| return data_points[:10] | |
| def _extract_numbers_and_stats(self, text: str) -> Optional[str]: | |
| import re | |
| patlist = [ | |
| r'\$[\d,]+(?:\.\d+)?(?:\s*(?:billion|million|trillion))?', | |
| r'\d+(?:\.\d+)?%', | |
| r'\d{1,3}(?:,\d{3})*(?:\.\d+)?(?:\s*(?:billion|million|thousand))?' | |
| ] | |
| for pat in patlist: | |
| m = re.search(pat, text, re.IGNORECASE) | |
| if m: | |
| return m.group() | |
| return None |