SmartHeal committed on
Commit ac2933e · verified · 1 Parent(s): f38704f

Update services/research_engine.py

Files changed (1)
  1. services/research_engine.py +10 -24
services/research_engine.py CHANGED
@@ -16,7 +16,6 @@ from googleapiclient.errors import HttpError
 class ResearchEngine:
     """Enhanced research engine with improved scraping and error handling"""
 
-    # Global number of simultaneous scrapes (must match adapter pool_maxsize)
     _MAX_SCRAPERS: int = 5
 
     def __init__(self):
@@ -33,7 +32,6 @@ class ResearchEngine:
             logging.error("Error initializing Google Custom Search: %s", e)
             raise
 
-        # Prepare a single requests.Session with connection pool and retry/backoff:
         self.session = requests.Session()
         retries = Retry(
             total=3,
@@ -47,17 +45,13 @@
         self.session.mount("http://", adapter)
         self.session.mount("https://", adapter)
 
-        # Preload a small list of realistic user‑agents
         self.user_agents = [
-            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
-            "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
-            "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) "
-            "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Safari/605.1.15",
+            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
+            "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Safari/605.1.15",
             "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:126.0) Gecko/20100101 Firefox/126.0"
         ]
 
     def search_multiple_sources(self, query: str, context: str) -> Dict[str, Any]:
-        """Search multiple sources and aggregate results"""
         now = time.time()
         results = {
             'google_results': [],
@@ -67,7 +61,6 @@
             'metadata': {}
         }
 
-        # Google Search
         try:
             data = self._search_google(query, context)
             results['google_results'] = data['items']
@@ -77,15 +70,13 @@
             logging.error("Google Search API error: %s", e)
             raise RuntimeError("Google Search API failure")
 
-        # News API (if available)
         news = self._search_news_api(query)
         if news:
             results['news_results'] = news.get('articles', [])
             results['sources'].extend([a.get('url') for a in news['articles'] if a.get('url')])
             logging.info("Added %d news sources", len(results['news_results']))
 
-        # Scrape top N sources concurrently
-        unique_urls = list(dict.fromkeys(results['sources']))  # drop duplicates
+        unique_urls = list(dict.fromkeys(results['sources']))
         urls = unique_urls[:self._MAX_SCRAPERS]
         results['scraped_content'] = list(self._parallel_scrape(urls))
 
@@ -97,11 +88,8 @@
         return results
 
     def _search_google(self, query: str, context: str) -> Dict[str, Any]:
-        """Call Google Custom Search API for given query+context"""
         professional_query = f"{query} {context}"
-        resp = self.google_service.cse() \
-            .list(q=professional_query, cx=self.google_cx, num=10) \
-            .execute()
+        resp = self.google_service.cse().list(q=professional_query, cx=self.google_cx, num=10).execute()
 
         items = []
         sources = []
@@ -118,7 +106,6 @@
         return {'items': items, 'sources': sources}
 
     def _search_news_api(self, query: str) -> Optional[Dict[str, Any]]:
-        """Optionally call NewsAPI for current events (last 30 days relevancy)"""
        api_key = os.getenv("NEWS_API_KEY")
        if not api_key:
            logging.warning("No NEWS_API_KEY – skipping News search")
@@ -143,7 +130,6 @@
        return None
 
     def _parallel_scrape(self, urls: List[str]) -> List[Dict[str, Any]]:
-        """Fetch page content in parallel with rotating UA, timeouts, and quick failure on 403 or errors."""
        out = []
        future_to_url = {}
 
@@ -152,7 +138,7 @@
                "User-Agent": random.choice(self.user_agents),
                "Accept-Language": "en-US,en;q=0.9",
                "Accept": "text/html,application/xhtml+xml;q=0.9,image/webp,*/*;q=0.8",
-                "Connection": "keepalive"
+                "Connection": "keep-alive"
            }
            try:
                resp = self.session.get(u, headers=headers, timeout=(3, 10))
@@ -160,12 +146,16 @@
                    logging.warning("Scraping blocked (403) for %s", u)
                    return None
                resp.raise_for_status()
+                if resp.encoding.lower() in ("iso-8859-1", "latin-1"):
+                    resp.encoding = resp.apparent_encoding
                text = resp.text or ""
                if len(text) < 100:
                    return None
                return {'url': u, 'content': text[:2000], 'timestamp': time.time()}
            except requests.exceptions.ReadTimeout:
                logging.warning("Timeout scraping %s", u)
+            except UnicodeEncodeError:
+                return {'url': u, 'content': resp.content.decode('utf-8', errors='replace')[:2000], 'timestamp': time.time()}
            except Exception as e:
                logging.warning("Scraping failure for %s: %s", u, e)
            return None
@@ -173,17 +163,15 @@
        with ThreadPoolExecutor(max_workers=self._MAX_SCRAPERS) as ex:
            for u in urls:
                future_to_url[ex.submit(crawl, u)] = u
-
            for f in as_completed(future_to_url):
                res = f.result()
                if res:
                    out.append(res)
-                    time.sleep(random.uniform(0.5, 1.0))  # polite pacing
+                    time.sleep(random.uniform(0.5, 1.0))
 
        return out
 
     def extract_key_data_points(self, research_results: Dict[str, Any]) -> List[Dict[str, Any]]:
-        """Extract top 10 value‑type stats from snippets and content (monetary, % etc.)"""
        data_points = []
        for itm in research_results.get('google_results', []):
            val = self._extract_numbers_and_stats(itm.get('snippet', ""))
@@ -194,7 +182,6 @@
                    'context': itm.get('snippet', ""),
                    'type': 'statistic'
                })
-
        for cnt in research_results.get('scraped_content', []):
            val = self._extract_numbers_and_stats(cnt.get('content', ""))
            if val:
@@ -204,7 +191,6 @@
                    'context': cnt.get('content', "")[:200],
                    'type': 'detailed_analysis'
                })
-
        return data_points[:10]
 
     def _extract_numbers_and_stats(self, text: str) -> Optional[str]:
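
For reference, a minimal standalone sketch of the encoding fallback this commit adds to the scraper. The helper name and URL are illustrative, and the extra None-guard on resp.encoding is a defensive addition that is not part of the diff:

import requests
from typing import Optional

def fetch_text(url: str, session: Optional[requests.Session] = None) -> Optional[str]:
    """Fetch a page and re-decode it when requests falls back to Latin-1."""
    sess = session or requests.Session()
    resp = sess.get(url, timeout=(3, 10))
    resp.raise_for_status()
    # requests assumes ISO-8859-1 for text/* responses that omit a charset;
    # switching to apparent_encoding (charset detection) usually recovers
    # UTF-8 pages that would otherwise render as mojibake.
    if resp.encoding and resp.encoding.lower() in ("iso-8859-1", "latin-1"):
        resp.encoding = resp.apparent_encoding
    text = resp.text or ""
    return text if len(text) >= 100 else None

# Example (hypothetical URL):
# print(fetch_text("https://example.com"))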