import os
import re
import time
import random
import logging
import platform
from typing import Dict, List, Optional, Any
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from utils.web_scraper import get_website_text_content


class ResearchEngine:
    """Enhanced research engine with improved scraping and error handling.

    Combines Google Custom Search, the News API (optional), and parallel
    page scraping into a single research pass, then extracts numeric /
    statistical data points from the gathered text.
    """

    # Upper bound on concurrent scrapers and on the HTTP connection pool.
    _MAX_SCRAPERS: int = 5

    # Pre-compiled patterns for _extract_numbers_and_stats, hoisted so they
    # are compiled once at class creation instead of on every call.
    # Order matters: money first, then percentages, then large plain numbers.
    _STAT_PATTERNS = [
        re.compile(r'\$[\d,]+(?:\.\d+)?(?:\s*(?:billion|million|trillion))?', re.IGNORECASE),
        re.compile(r'\d+(?:\.\d+)?%', re.IGNORECASE),
        re.compile(r'\d{1,3}(?:,\d{3})*(?:\.\d+)?(?:\s*(?:billion|million|thousand))?', re.IGNORECASE),
    ]

    def __init__(self):
        """Read credentials, build the Google CSE client and an HTTP session.

        Raises:
            EnvironmentError: if GOOGLE_API_KEY or GOOGLE_CX is unset/empty.
            Exception: re-raised from googleapiclient if the Custom Search
                service cannot be built.
        """
        # Strip stray (possibly escaped) newlines that sometimes leak into
        # env vars when secrets are provisioned with `echo` or copy/paste.
        self.google_api_key = os.getenv("GOOGLE_API_KEY", "").strip().replace("\\n", "").replace("\n", "")
        self.google_cx = os.getenv("GOOGLE_CX", "").strip().replace("\\n", "").replace("\n", "")
        if not (self.google_api_key and self.google_cx):
            raise EnvironmentError("GOOGLE_API_KEY and GOOGLE_CX must be set (no trailing newlines)")

        try:
            self.google_service = build("customsearch", "v1", developerKey=self.google_api_key)
            logging.info("Google Search service initialized successfully")
        except Exception as e:
            logging.error("Error initializing Google Custom Search: %s", e)
            raise

        # Shared session: bounded connection pool sized to the scraper count,
        # with automatic retries for transient HTTP failures.
        self.session = requests.Session()
        retries = Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[413, 429, 500, 502, 503, 504],
            respect_retry_after_header=True,
        )
        adapter = HTTPAdapter(
            pool_connections=self._MAX_SCRAPERS,
            pool_maxsize=self._MAX_SCRAPERS,
            max_retries=retries,
            pool_block=True,
        )
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        # Rotated per scrape request to reduce trivial bot blocking.
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Safari/605.1.15",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:126.0) Gecko/20100101 Firefox/126.0",
        ]

    def search_multiple_sources(self, query: str, context: str) -> Dict[str, Any]:
        """Run Google + News searches for *query* and scrape the top sources.

        Args:
            query: the user's search query.
            context: extra terms appended to the Google query.

        Returns:
            Dict with 'google_results', 'news_results', 'scraped_content',
            'sources' (all discovered URLs) and 'metadata'.

        Raises:
            RuntimeError: if the Google search fails (News API failures are
                non-fatal and merely logged).
        """
        now = time.time()
        results = {
            'google_results': [],
            'news_results': [],
            'scraped_content': [],
            'sources': [],
            'metadata': {},
        }

        try:
            data = self._search_google(query, context)
            results['google_results'] = data['items']
            results['sources'].extend(data['sources'])
            logging.info("Google search gave %d results", len(data['items']))
        except Exception as e:
            logging.error("Google Search API error: %s", e)
            # Chain the cause so callers can inspect the underlying error.
            raise RuntimeError("Google Search API failure") from e

        news = self._search_news_api(query)
        if news:
            # BUG FIX: original indexed news['articles'] directly, which
            # raised KeyError when the response had no 'articles' key.
            articles = news.get('articles', [])
            results['news_results'] = articles
            results['sources'].extend([a.get('url') for a in articles if a.get('url')])
            logging.info("Added %d news sources", len(results['news_results']))

        # De-duplicate while preserving discovery order, then cap the number
        # of pages we actually fetch.
        unique_urls = list(dict.fromkeys(results['sources']))
        urls = unique_urls[:self._MAX_SCRAPERS]
        results['scraped_content'] = list(self._parallel_scrape(urls))

        results['metadata'] = {
            'search_timestamp': now,
            'total_sources': len(results['sources']),
            'scraped_count': len(results['scraped_content']),
        }
        return results

    def _search_google(self, query: str, context: str) -> Dict[str, Any]:
        """Query Google Custom Search and normalize the top-10 results.

        Returns:
            {'items': [ {title, snippet, link, displayLink}, ... ],
             'sources': [link, ...]}  -- items without a link are dropped.
        """
        professional_query = f"{query} {context}"
        resp = self.google_service.cse().list(q=professional_query, cx=self.google_cx, num=10).execute()
        items = []
        sources = []
        for x in resp.get("items", []):
            link = x.get("link")
            if not link:
                continue
            items.append({
                'title': x.get("title", ""),
                'snippet': x.get("snippet", ""),
                'link': link,
                'displayLink': x.get("displayLink", ""),
            })
            sources.append(link)
        return {'items': items, 'sources': sources}

    def _search_news_api(self, query: str) -> Optional[Dict[str, Any]]:
        """Search newsapi.org for *query* over the last 30 days.

        Returns:
            Parsed JSON response, or None when NEWS_API_KEY is unset or the
            request fails (best-effort: errors are logged, not raised).
        """
        api_key = os.getenv("NEWS_API_KEY")
        if not api_key:
            logging.warning("No NEWS_API_KEY – skipping News search")
            return None
        try:
            url = "https://newsapi.org/v2/everything"
            params = {
                'q': query,
                'apiKey': api_key,
                'sortBy': 'relevancy',
                'pageSize': 20,
                'language': 'en',
                # Last 30 days, formatted as required by the News API.
                'from': time.strftime('%Y-%m-%d', time.gmtime(time.time() - 30 * 24 * 3600)),
            }
            # timeout=(connect, read)
            r = self.session.get(url, params=params, timeout=(3, 10))
            r.raise_for_status()
            return r.json()
        except Exception as e:
            logging.error("News API error: %s", e)
            return None

    def _parallel_scrape(self, urls: List[str]) -> List[Dict[str, Any]]:
        """Fetch *urls* concurrently and return up-to-2000-char text snippets.

        Pages that are blocked (403), time out, fail, or are shorter than
        100 characters are silently skipped (best-effort scraping).

        Returns:
            List of {'url', 'content', 'timestamp'} dicts, in completion order.
        """
        out = []
        future_to_url = {}

        def crawl(u: str) -> Optional[Dict[str, Any]]:
            headers = {
                "User-Agent": random.choice(self.user_agents),
                "Accept-Language": "en-US,en;q=0.9",
                "Accept": "text/html,application/xhtml+xml;q=0.9,image/webp,*/*;q=0.8",
                "Connection": "keep-alive",
            }
            try:
                resp = self.session.get(u, headers=headers, timeout=(3, 10))
                if resp.status_code == 403:
                    logging.warning("Scraping blocked (403) for %s", u)
                    return None
                resp.raise_for_status()
                # BUG FIX: resp.encoding can be None (no charset header),
                # which made .lower() raise AttributeError. Also, requests
                # defaults headerless text to ISO-8859-1; prefer the sniffed
                # encoding in that case.
                if resp.encoding and resp.encoding.lower() in ("iso-8859-1", "latin-1"):
                    resp.encoding = resp.apparent_encoding
                try:
                    text = resp.text or ""
                except UnicodeDecodeError:
                    # BUG FIX: original caught UnicodeEncodeError (which
                    # resp.text never raises) in an outer handler where
                    # `resp` could be unbound. Decode leniently instead.
                    text = resp.content.decode('utf-8', errors='replace')
                if len(text) < 100:
                    return None  # too short to be useful content
                return {'url': u, 'content': text[:2000], 'timestamp': time.time()}
            except requests.exceptions.ReadTimeout:
                logging.warning("Timeout scraping %s", u)
            except Exception as e:
                logging.warning("Scraping failure for %s: %s", u, e)
            return None

        with ThreadPoolExecutor(max_workers=self._MAX_SCRAPERS) as ex:
            for u in urls:
                future_to_url[ex.submit(crawl, u)] = u
            # BUG FIX: the original slept 0.5-1.0 s per collected result here.
            # All requests were already dispatched, so the sleep rate-limited
            # nothing and only delayed the caller; removed.
            for f in as_completed(future_to_url):
                res = f.result()
                if res:
                    out.append(res)
        return out

    def extract_key_data_points(self, research_results: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Pull numeric/statistical values out of search snippets and pages.

        Args:
            research_results: the dict produced by search_multiple_sources.

        Returns:
            At most 10 dicts with 'value', 'source', 'context' and 'type'
            ('statistic' for snippets, 'detailed_analysis' for scraped pages).
        """
        data_points = []
        for itm in research_results.get('google_results', []):
            val = self._extract_numbers_and_stats(itm.get('snippet', ""))
            if val:
                data_points.append({
                    'value': val,
                    'source': itm.get('displayLink', ""),
                    'context': itm.get('snippet', ""),
                    'type': 'statistic',
                })
        for cnt in research_results.get('scraped_content', []):
            val = self._extract_numbers_and_stats(cnt.get('content', ""))
            if val:
                data_points.append({
                    'value': val,
                    'source': cnt.get('url', ""),
                    'context': cnt.get('content', "")[:200],
                    'type': 'detailed_analysis',
                })
        return data_points[:10]

    def _extract_numbers_and_stats(self, text: str) -> Optional[str]:
        """Return the first money / percentage / large-number match in *text*.

        Patterns are tried in priority order (money, percent, plain number);
        returns None when nothing matches. Uses the class-level pre-compiled
        patterns instead of re-importing/re-compiling per call.
        """
        for pat in self._STAT_PATTERNS:
            m = pat.search(text)
            if m:
                return m.group()
        return None