import os
import time
import logging
import random
import platform
from typing import Dict, List, Optional, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from utils.web_scraper import get_website_text_content
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
class ResearchEngine:
    """Enhanced research engine with improved scraping and error handling.

    Aggregates Google Custom Search results with optional NewsAPI articles,
    then scrapes the top unique URLs in parallel through a shared, retrying
    ``requests.Session``.
    """

    # Upper bound on concurrent scraper threads AND on the HTTP connection
    # pool size (the two are kept equal so workers never block on the pool).
    _MAX_SCRAPERS: int = 5

    def __init__(self):
        """Read API credentials from the environment and build the HTTP stack.

        Raises:
            EnvironmentError: if GOOGLE_API_KEY or GOOGLE_CX is missing/blank.
            Exception: re-raised from the Google API client on init failure.
        """
        # Secrets set via dashboards/CI sometimes carry a literal "\n" or a
        # real trailing newline; scrub both so the API client gets clean keys.
        self.google_api_key = os.getenv("GOOGLE_API_KEY", "").strip().replace("\\n", "").replace("\n", "")
        self.google_cx = os.getenv("GOOGLE_CX", "").strip().replace("\\n", "").replace("\n", "")
        if not (self.google_api_key and self.google_cx):
            raise EnvironmentError("GOOGLE_API_KEY and GOOGLE_CX must be set (no trailing newlines)")
        try:
            self.google_service = build("customsearch", "v1", developerKey=self.google_api_key)
            logging.info("Google Search service initialized successfully")
        except Exception as e:
            logging.error("Error initializing Google Custom Search: %s", e)
            raise
        # One shared session: bounded connection pool plus automatic retries
        # on transient HTTP failures (including 429 rate limiting, honoring
        # any Retry-After header the server sends).
        self.session = requests.Session()
        retries = Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[413, 429, 500, 502, 503, 504],
            respect_retry_after_header=True
        )
        adapter = HTTPAdapter(pool_connections=self._MAX_SCRAPERS,
                              pool_maxsize=self._MAX_SCRAPERS,
                              max_retries=retries, pool_block=True)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        # Rotated per scrape request to reduce trivial bot blocking.
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Safari/605.1.15",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:126.0) Gecko/20100101 Firefox/126.0"
        ]

    def search_multiple_sources(self, query: str, context: str) -> Dict[str, Any]:
        """Aggregate Google results, news articles, and scraped page content.

        Args:
            query: free-text search query.
            context: extra terms appended to the Google query.

        Returns:
            Dict with keys 'google_results', 'news_results',
            'scraped_content', 'sources' (ordered URL list, pre-dedup) and
            'metadata' (timestamp and counts).

        Raises:
            RuntimeError: if the Google Custom Search call fails. News and
                scraping failures are logged and tolerated (best-effort).
        """
        now = time.time()
        results = {
            'google_results': [],
            'news_results': [],
            'scraped_content': [],
            'sources': [],
            'metadata': {}
        }
        try:
            data = self._search_google(query, context)
            results['google_results'] = data['items']
            results['sources'].extend(data['sources'])
            logging.info("Google search gave %d results", len(data['items']))
        except Exception as e:
            logging.error("Google Search API error: %s", e)
            # Chain the original exception so callers can see the root cause.
            raise RuntimeError("Google Search API failure") from e
        news = self._search_news_api(query)
        if news:
            # BUG FIX: use .get() here as well — the original indexed
            # news['articles'] directly and raised KeyError on a NewsAPI
            # response missing that key.
            articles = news.get('articles', [])
            results['news_results'] = articles
            results['sources'].extend(a['url'] for a in articles if a.get('url'))
            logging.info("Added %d news sources", len(results['news_results']))
        # De-duplicate while preserving order, then scrape only the top few.
        unique_urls = list(dict.fromkeys(results['sources']))
        results['scraped_content'] = self._parallel_scrape(unique_urls[:self._MAX_SCRAPERS])
        results['metadata'] = {
            'search_timestamp': now,
            'total_sources': len(results['sources']),
            'scraped_count': len(results['scraped_content'])
        }
        return results

    def _search_google(self, query: str, context: str) -> Dict[str, Any]:
        """Run one Custom Search query (query + context, up to 10 hits).

        Returns:
            {'items': [{'title', 'snippet', 'link', 'displayLink'}, ...],
             'sources': [link, ...]} — entries without a link are dropped.
        """
        professional_query = f"{query} {context}"
        resp = self.google_service.cse().list(q=professional_query, cx=self.google_cx, num=10).execute()
        items = []
        sources = []
        for x in resp.get("items", []):
            link = x.get("link")
            if link:
                items.append({
                    'title': x.get("title", ""),
                    'snippet': x.get("snippet", ""),
                    'link': link,
                    'displayLink': x.get("displayLink", "")
                })
                sources.append(link)
        return {'items': items, 'sources': sources}

    def _search_news_api(self, query: str) -> Optional[Dict[str, Any]]:
        """Query NewsAPI for recent English articles (last 30 days).

        Returns:
            The parsed JSON response, or None when NEWS_API_KEY is unset or
            the request fails (best-effort: errors are logged, not raised).
        """
        api_key = os.getenv("NEWS_API_KEY")
        if not api_key:
            logging.warning("No NEWS_API_KEY – skipping News search")
            return None
        try:
            url = "https://newsapi.org/v2/everything"
            params = {
                'q': query,
                'apiKey': api_key,
                'sortBy': 'relevancy',
                'pageSize': 20,
                'language': 'en',
                # Only articles from the last 30 days (UTC).
                'from': time.strftime('%Y-%m-%d', time.gmtime(time.time() - 30 * 24 * 3600))
            }
            r = self.session.get(url, params=params, timeout=(3, 10))
            r.raise_for_status()
            return r.json()
        except Exception as e:
            logging.error("News API error: %s", e)
            return None

    def _parallel_scrape(self, urls: List[str]) -> List[Dict[str, Any]]:
        """Fetch *urls* concurrently; return payloads for successful pages.

        Each payload is {'url', 'content' (first 2000 chars), 'timestamp'}.
        Failures (timeouts, 403 blocks, pages under 100 chars) are logged
        and skipped; result order follows completion order, not input order.
        """
        def crawl(u: str) -> Optional[Dict[str, Any]]:
            headers = {
                "User-Agent": random.choice(self.user_agents),
                "Accept-Language": "en-US,en;q=0.9",
                "Accept": "text/html,application/xhtml+xml;q=0.9,image/webp,*/*;q=0.8",
                "Connection": "keep-alive"
            }
            try:
                resp = self.session.get(u, headers=headers, timeout=(3, 10))
                if resp.status_code == 403:
                    logging.warning("Scraping blocked (403) for %s", u)
                    return None
                resp.raise_for_status()
                # BUG FIX: resp.encoding is None when the server sends no
                # charset header; the original called .lower() on it and
                # crashed with AttributeError.
                enc = (resp.encoding or "").lower()
                if enc in ("iso-8859-1", "latin-1"):
                    # Servers often default to latin-1; trust content
                    # sniffing for a better guess instead.
                    resp.encoding = resp.apparent_encoding
                text = resp.text or ""
                if len(text) < 100:
                    return None  # too short to be useful content
                return {'url': u, 'content': text[:2000], 'timestamp': time.time()}
            except requests.exceptions.ReadTimeout:
                logging.warning("Timeout scraping %s", u)
            except Exception as e:
                # NOTE: the original also caught UnicodeEncodeError and read
                # `resp` there — dead code (resp.text cannot raise it) that
                # produced NameError when the GET itself failed; removed.
                logging.warning("Scraping failure for %s: %s", u, e)
            return None

        out: List[Dict[str, Any]] = []
        with ThreadPoolExecutor(max_workers=self._MAX_SCRAPERS) as ex:
            future_to_url = {ex.submit(crawl, u): u for u in urls}
            for f in as_completed(future_to_url):
                res = f.result()
                if res:
                    out.append(res)
            # BUG FIX: removed the original per-result sleep(0.5-1.0s) —
            # every request was already submitted by then, so it only
            # delayed result collection without rate-limiting anything.
        return out

    def extract_key_data_points(self, research_results: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Pull up to 10 numeric/statistical facts from search results.

        Scans Google snippets first (tagged 'statistic'), then scraped page
        content (tagged 'detailed_analysis'), keeping the first regex match
        per item.
        """
        data_points = []
        for itm in research_results.get('google_results', []):
            val = self._extract_numbers_and_stats(itm.get('snippet', ""))
            if val:
                data_points.append({
                    'value': val,
                    'source': itm.get('displayLink', ""),
                    'context': itm.get('snippet', ""),
                    'type': 'statistic'
                })
            for cnt in []:  # placeholder removed
                pass
        for cnt in research_results.get('scraped_content', []):
            val = self._extract_numbers_and_stats(cnt.get('content', ""))
            if val:
                data_points.append({
                    'value': val,
                    'source': cnt.get('url', ""),
                    'context': cnt.get('content', "")[:200],
                    'type': 'detailed_analysis'
                })
        return data_points[:10]

    def _extract_numbers_and_stats(self, text: str) -> Optional[str]:
        """Return the first money/percentage/number match in *text*, or None.

        Patterns are tried in priority order: currency amounts, percentages,
        then plain (possibly comma-grouped) numbers with magnitude words.
        """
        import re  # local import: the module does not import re at top level
        patterns = [
            r'\$[\d,]+(?:\.\d+)?(?:\s*(?:billion|million|trillion))?',
            r'\d+(?:\.\d+)?%',
            r'\d{1,3}(?:,\d{3})*(?:\.\d+)?(?:\s*(?:billion|million|thousand))?'
        ]
        for pat in patterns:
            m = re.search(pat, text, re.IGNORECASE)
            if m:
                return m.group()
        return None