# NewsLetter — services/research_engine.py
# Last change: "Update services/research_engine.py" (commit ac2933e, verified)
import logging
import os
import platform
import random
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional

import requests
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from utils.web_scraper import get_website_text_content
class ResearchEngine:
    """Enhanced research engine with improved scraping and error handling.

    Aggregates Google Custom Search results (required) and NewsAPI articles
    (optional) for a query, scrapes the top unique source URLs in parallel,
    and extracts simple numeric/statistic data points from the gathered text.
    """

    # Upper bound on concurrent scraper threads and pooled HTTP connections.
    _MAX_SCRAPERS: int = 5

    def __init__(self):
        """Build the Google Custom Search client and a pooled HTTP session.

        Raises:
            EnvironmentError: if GOOGLE_API_KEY or GOOGLE_CX is unset/blank.
            Exception: re-raised when the Google API client cannot be built.
        """
        # Keys pasted from dashboards sometimes carry a literal "\n" escape
        # or a real trailing newline; scrub both forms.
        self.google_api_key = os.getenv("GOOGLE_API_KEY", "").strip().replace("\\n", "").replace("\n", "")
        self.google_cx = os.getenv("GOOGLE_CX", "").strip().replace("\\n", "").replace("\n", "")
        if not (self.google_api_key and self.google_cx):
            raise EnvironmentError("GOOGLE_API_KEY and GOOGLE_CX must be set (no trailing newlines)")
        try:
            self.google_service = build("customsearch", "v1", developerKey=self.google_api_key)
            logging.info("Google Search service initialized successfully")
        except Exception as e:
            logging.error("Error initializing Google Custom Search: %s", e)
            raise
        # One shared session with bounded connection pools and automatic
        # retries on transient HTTP errors (honours Retry-After headers).
        self.session = requests.Session()
        retries = Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[413, 429, 500, 502, 503, 504],
            respect_retry_after_header=True,
        )
        adapter = HTTPAdapter(
            pool_connections=self._MAX_SCRAPERS,
            pool_maxsize=self._MAX_SCRAPERS,
            max_retries=retries,
            pool_block=True,
        )
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        # Rotated per scrape request to reduce trivial bot blocking.
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Safari/605.1.15",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:126.0) Gecko/20100101 Firefox/126.0"
        ]

    def search_multiple_sources(self, query: str, context: str) -> Dict[str, Any]:
        """Search Google (required) and NewsAPI (best-effort), then scrape.

        Args:
            query: user search query.
            context: extra terms appended to the Google query.

        Returns:
            Dict with 'google_results', 'news_results', 'scraped_content',
            'sources' (all discovered URLs, Google first), and 'metadata'.

        Raises:
            RuntimeError: when the Google search fails; a NewsAPI failure is
                tolerated (news is optional).
        """
        now = time.time()
        results = {
            'google_results': [],
            'news_results': [],
            'scraped_content': [],
            'sources': [],
            'metadata': {}
        }
        try:
            data = self._search_google(query, context)
            results['google_results'] = data['items']
            results['sources'].extend(data['sources'])
            logging.info("Google search gave %d results", len(data['items']))
        except Exception as e:
            logging.error("Google Search API error: %s", e)
            # Chain the cause so the original API error stays diagnosable.
            raise RuntimeError("Google Search API failure") from e
        news = self._search_news_api(query)
        if news:
            # BUG FIX: the original indexed news['articles'] directly while
            # collecting URLs, raising KeyError when the key was absent even
            # though the line above guarded with .get(). Reuse the guarded list.
            articles = news.get('articles', [])
            results['news_results'] = articles
            results['sources'].extend([a.get('url') for a in articles if a.get('url')])
            logging.info("Added %d news sources", len(results['news_results']))
        # dict.fromkeys dedupes while preserving first-seen order,
        # so Google sources keep priority over news sources.
        unique_urls = list(dict.fromkeys(results['sources']))
        urls = unique_urls[:self._MAX_SCRAPERS]
        results['scraped_content'] = list(self._parallel_scrape(urls))
        results['metadata'] = {
            'search_timestamp': now,
            'total_sources': len(results['sources']),
            'scraped_count': len(results['scraped_content'])
        }
        return results

    def _search_google(self, query: str, context: str) -> Dict[str, Any]:
        """Run one Google Custom Search page (10 hits) for "<query> <context>".

        Returns:
            Dict with 'items' (title/snippet/link/displayLink dicts) and
            'sources' (the links alone). Results without a link are skipped.
        """
        professional_query = f"{query} {context}"
        resp = self.google_service.cse().list(q=professional_query, cx=self.google_cx, num=10).execute()
        items = []
        sources = []
        for x in resp.get("items", []):
            link = x.get("link")
            if not link:
                continue
            items.append({
                'title': x.get("title", ""),
                'snippet': x.get("snippet", ""),
                'link': link,
                'displayLink': x.get("displayLink", "")
            })
            sources.append(link)
        return {'items': items, 'sources': sources}

    def _search_news_api(self, query: str) -> Optional[Dict[str, Any]]:
        """Query NewsAPI /v2/everything for articles from the past 30 days.

        Returns:
            Parsed JSON payload, or None when NEWS_API_KEY is unset or the
            request fails — news is an optional, best-effort source.
        """
        api_key = os.getenv("NEWS_API_KEY")
        if not api_key:
            logging.warning("No NEWS_API_KEY – skipping News search")
            return None
        try:
            url = "https://newsapi.org/v2/everything"
            params = {
                'q': query,
                'apiKey': api_key,
                'sortBy': 'relevancy',
                'pageSize': 20,
                'language': 'en',
                # Restrict to the last 30 days (UTC date).
                'from': time.strftime('%Y-%m-%d', time.gmtime(time.time() - 30 * 24 * 3600))
            }
            # (connect, read) timeouts so a stalled server can't hang us.
            r = self.session.get(url, params=params, timeout=(3, 10))
            r.raise_for_status()
            return r.json()
        except Exception as e:
            logging.error("News API error: %s", e)
            return None

    def _parallel_scrape(self, urls: List[str]) -> List[Dict[str, Any]]:
        """Fetch up to _MAX_SCRAPERS URLs concurrently.

        Returns:
            One {'url', 'content' (first 2000 chars), 'timestamp'} dict per
            successful fetch. Blocked (403), timed-out, tiny (<100 chars) or
            otherwise failing pages are skipped. Output order follows task
            completion, not input order.
        """
        out = []
        future_to_url = {}

        def crawl(u: str):
            # BUG FIX: the original slept in the result-collection loop,
            # after every request had already been fired — no politeness
            # effect. A pre-request jitter here actually staggers workers.
            time.sleep(random.uniform(0.5, 1.0))
            headers = {
                "User-Agent": random.choice(self.user_agents),
                "Accept-Language": "en-US,en;q=0.9",
                "Accept": "text/html,application/xhtml+xml;q=0.9,image/webp,*/*;q=0.8",
                "Connection": "keep-alive"
            }
            try:
                resp = self.session.get(u, headers=headers, timeout=(3, 10))
                if resp.status_code == 403:
                    logging.warning("Scraping blocked (403) for %s", u)
                    return None
                resp.raise_for_status()
                # requests defaults to ISO-8859-1 when the server omits a
                # charset; apparent_encoding is a better guess. BUG FIX:
                # resp.encoding may be None, which crashed .lower().
                if (resp.encoding or "").lower() in ("iso-8859-1", "latin-1"):
                    resp.encoding = resp.apparent_encoding
                # NOTE: the original also caught UnicodeEncodeError and read
                # `resp` there — `resp` could be unbound if .get() raised, and
                # resp.text never raises that error (requests decodes with
                # replacement), so the dead branch was removed.
                text = resp.text or ""
                if len(text) < 100:
                    # Too short to be useful article content.
                    return None
                return {'url': u, 'content': text[:2000], 'timestamp': time.time()}
            except requests.exceptions.ReadTimeout:
                logging.warning("Timeout scraping %s", u)
            except Exception as e:
                logging.warning("Scraping failure for %s: %s", u, e)
            return None

        with ThreadPoolExecutor(max_workers=self._MAX_SCRAPERS) as ex:
            for u in urls:
                future_to_url[ex.submit(crawl, u)] = u
            for f in as_completed(future_to_url):
                res = f.result()
                if res:
                    out.append(res)
        return out

    def extract_key_data_points(self, research_results: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Pull up to 10 numeric data points from snippets and scraped pages.

        Google snippets yield 'statistic' entries (snippets are listed first,
        so they win the cap); scraped pages yield 'detailed_analysis' entries.
        """
        data_points = []
        for itm in research_results.get('google_results', []):
            val = self._extract_numbers_and_stats(itm.get('snippet', ""))
            if val:
                data_points.append({
                    'value': val,
                    'source': itm.get('displayLink', ""),
                    'context': itm.get('snippet', ""),
                    'type': 'statistic'
                })
        for cnt in research_results.get('scraped_content', []):
            val = self._extract_numbers_and_stats(cnt.get('content', ""))
            if val:
                data_points.append({
                    'value': val,
                    'source': cnt.get('url', ""),
                    'context': cnt.get('content', "")[:200],
                    'type': 'detailed_analysis'
                })
        return data_points[:10]

    def _extract_numbers_and_stats(self, text: str) -> Optional[str]:
        """Return the first money amount, percentage, or large number in text.

        Patterns are tried most-specific first (dollar amounts, then
        percentages, then bare comma-grouped figures). Returns None when no
        numeric statistic is present. Uses the module-level `re` import
        (the original re-imported re on every call).
        """
        patlist = [
            r'\$[\d,]+(?:\.\d+)?(?:\s*(?:billion|million|trillion))?',
            r'\d+(?:\.\d+)?%',
            r'\d{1,3}(?:,\d{3})*(?:\.\d+)?(?:\s*(?:billion|million|thousand))?'
        ]
        for pat in patlist:
            m = re.search(pat, text, re.IGNORECASE)
            if m:
                return m.group()
        return None