File size: 8,316 Bytes
a19173c
 
 
f38704f
 
 
 
a19173c
 
f38704f
 
 
 
 
b98cf69
a19173c
f38704f
 
 
b98cf69
a19173c
f38704f
 
 
 
 
b98cf69
f38704f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ac2933e
 
f38704f
 
 
 
 
a19173c
 
b0776ed
a19173c
 
b0776ed
a19173c
b98cf69
f38704f
 
 
 
 
 
 
 
b98cf69
f38704f
 
 
 
 
b98cf69
ac2933e
f38704f
 
b98cf69
a19173c
f38704f
a19173c
 
 
 
b98cf69
f38704f
 
ac2933e
f38704f
 
 
 
 
 
 
 
 
 
 
b0776ed
b98cf69
f38704f
b98cf69
f38704f
a19173c
 
f38704f
a19173c
b98cf69
a19173c
 
 
 
 
 
 
 
f38704f
a19173c
f38704f
 
 
 
a19173c
f38704f
 
b98cf69
f38704f
 
 
 
 
 
 
 
 
ac2933e
f38704f
b98cf69
f38704f
 
 
 
 
ac2933e
 
f38704f
 
b98cf69
f38704f
 
 
ac2933e
 
b98cf69
f38704f
b98cf69
 
f38704f
 
 
 
 
 
 
ac2933e
b98cf69
f38704f
 
 
a19173c
b0776ed
 
 
a19173c
b0776ed
 
 
a19173c
 
b0776ed
 
 
a19173c
b0776ed
 
 
a19173c
 
b98cf69
 
a19173c
 
f38704f
b98cf69
 
f38704f
a19173c
f38704f
b0776ed
 
 
b98cf69
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import os
import time
import logging
import random
import platform
from typing import Dict, List, Optional, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from utils.web_scraper import get_website_text_content

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

class ResearchEngine:
    """Research pipeline combining Google Custom Search, News API and scraping.

    Requires the GOOGLE_API_KEY and GOOGLE_CX environment variables;
    NEWS_API_KEY is optional (news search is skipped without it).
    """

    # Upper bound on concurrent scraper threads; also sizes the HTTP pool.
    _MAX_SCRAPERS: int = 5

    def __init__(self):
        """Build the Google CSE client and a retrying requests session.

        Raises:
            EnvironmentError: if GOOGLE_API_KEY or GOOGLE_CX is missing/empty.
            Exception: propagated from googleapiclient if the service build fails.
        """
        # Strip whitespace plus literal and escaped newlines that often leak
        # into env vars pasted from config files.
        self.google_api_key = os.getenv("GOOGLE_API_KEY", "").strip().replace("\\n", "").replace("\n", "")
        self.google_cx = os.getenv("GOOGLE_CX", "").strip().replace("\\n", "").replace("\n", "")

        if not (self.google_api_key and self.google_cx):
            raise EnvironmentError("GOOGLE_API_KEY and GOOGLE_CX must be set (no trailing newlines)")

        try:
            self.google_service = build("customsearch", "v1", developerKey=self.google_api_key)
            logging.info("Google Search service initialized successfully")
        except Exception as e:
            logging.error("Error initializing Google Custom Search: %s", e)
            raise

        # One shared session: automatic retries for transient HTTP failures,
        # with a bounded, blocking connection pool sized to the scraper count.
        self.session = requests.Session()
        retries = Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[413, 429, 500, 502, 503, 504],
            respect_retry_after_header=True
        )
        adapter = HTTPAdapter(pool_connections=self._MAX_SCRAPERS,
                              pool_maxsize=self._MAX_SCRAPERS,
                              max_retries=retries, pool_block=True)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        # Rotated per request to look less like a single automated client.
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Safari/605.1.15",
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:126.0) Gecko/20100101 Firefox/126.0"
        ]

    def search_multiple_sources(self, query: str, context: str) -> Dict[str, Any]:
        """Run Google + News searches for *query*, then scrape top sources.

        Args:
            query: free-text search query.
            context: extra terms appended to the Google query.

        Returns:
            Dict with keys 'google_results', 'news_results', 'scraped_content',
            'sources' (all result URLs, ordered) and 'metadata'.

        Raises:
            RuntimeError: if the Google search fails. News-search failures are
                non-fatal (logged inside `_search_news_api`).
        """
        now = time.time()
        results: Dict[str, Any] = {
            'google_results': [],
            'news_results': [],
            'scraped_content': [],
            'sources': [],
            'metadata': {}
        }

        try:
            data = self._search_google(query, context)
            results['google_results'] = data['items']
            results['sources'].extend(data['sources'])
            logging.info("Google search gave %d results", len(data['items']))
        except Exception as e:
            logging.error("Google Search API error: %s", e)
            # Chain the original exception so callers can see the root cause.
            raise RuntimeError("Google Search API failure") from e

        news = self._search_news_api(query)
        if news:
            # BUG FIX: use .get() consistently — a response without an
            # 'articles' key previously raised KeyError on the extend() line.
            articles = news.get('articles', [])
            results['news_results'] = articles
            results['sources'].extend(a['url'] for a in articles if a.get('url'))
            logging.info("Added %d news sources", len(results['news_results']))

        # De-duplicate while preserving order, then scrape only the first few.
        unique_urls = list(dict.fromkeys(results['sources']))
        results['scraped_content'] = list(self._parallel_scrape(unique_urls[:self._MAX_SCRAPERS]))

        results['metadata'] = {
            'search_timestamp': now,
            'total_sources': len(results['sources']),
            'scraped_count': len(results['scraped_content'])
        }
        return results

    def _search_google(self, query: str, context: str) -> Dict[str, Any]:
        """Query Google Custom Search (10 results) for "<query> <context>".

        Returns:
            {'items': [{'title', 'snippet', 'link', 'displayLink'}, ...],
             'sources': [link, ...]} — entries without a link are dropped.
        """
        professional_query = f"{query} {context}"
        resp = self.google_service.cse().list(q=professional_query, cx=self.google_cx, num=10).execute()

        items = []
        sources = []

        for x in resp.get("items", []):
            link = x.get("link")
            if not link:
                continue  # a result without a URL cannot be scraped or cited
            items.append({
                'title': x.get("title", ""),
                'snippet': x.get("snippet", ""),
                'link': link,
                'displayLink': x.get("displayLink", "")
            })
            sources.append(link)

        return {'items': items, 'sources': sources}

    def _search_news_api(self, query: str) -> Optional[Dict[str, Any]]:
        """Search NewsAPI /v2/everything for the last 30 days.

        Returns the decoded JSON response, or None when NEWS_API_KEY is unset
        or the request fails (best-effort: errors are logged, not raised).
        """
        api_key = os.getenv("NEWS_API_KEY")
        if not api_key:
            logging.warning("No NEWS_API_KEY – skipping News search")
            return None

        try:
            url = "https://newsapi.org/v2/everything"
            params = {
                'q': query,
                'apiKey': api_key,
                'sortBy': 'relevancy',
                'pageSize': 20,
                'language': 'en',
                # Restrict to the last 30 days (UTC date).
                'from': time.strftime('%Y-%m-%d', time.gmtime(time.time() - 30 * 24 * 3600))
            }
            r = self.session.get(url, params=params, timeout=(3, 10))
            r.raise_for_status()
            return r.json()

        except Exception as e:
            logging.error("News API error: %s", e)
            return None

    def _parallel_scrape(self, urls: List[str]) -> List[Dict[str, Any]]:
        """Fetch *urls* concurrently; return per-URL content records.

        Each successful fetch yields {'url', 'content' (first 2000 chars),
        'timestamp'}; failures, 403s and near-empty pages are skipped.
        """
        out: List[Dict[str, Any]] = []
        future_to_url: Dict[Any, str] = {}

        def crawl(u: str) -> Optional[Dict[str, Any]]:
            # Random jitter *before* the request staggers the workers.
            # (Previously the sleep lived in the result-collection loop,
            # where it only delayed harvesting and throttled nothing.)
            time.sleep(random.uniform(0.5, 1.0))
            headers = {
                "User-Agent": random.choice(self.user_agents),
                "Accept-Language": "en-US,en;q=0.9",
                "Accept": "text/html,application/xhtml+xml;q=0.9,image/webp,*/*;q=0.8",
                "Connection": "keep-alive"
            }
            try:
                resp = self.session.get(u, headers=headers, timeout=(3, 10))
                if resp.status_code == 403:
                    logging.warning("Scraping blocked (403) for %s", u)
                    return None
                resp.raise_for_status()
                # requests defaults to ISO-8859-1 when the server omits a
                # charset, and may leave encoding None for some content types
                # (the old code crashed on None.lower()); in either case fall
                # back to the sniffed encoding.
                if resp.encoding is None or resp.encoding.lower() in ("iso-8859-1", "latin-1"):
                    resp.encoding = resp.apparent_encoding
                text = resp.text or ""
                if len(text) < 100:
                    return None  # too short to be useful article content
                return {'url': u, 'content': text[:2000], 'timestamp': time.time()}
            except requests.exceptions.ReadTimeout:
                logging.warning("Timeout scraping %s", u)
            except Exception as e:
                # NOTE: the old UnicodeEncodeError branch referenced `resp`,
                # which is unbound when session.get itself raises — removed;
                # any decode trouble lands here and is logged.
                logging.warning("Scraping failure for %s: %s", u, e)
            return None

        with ThreadPoolExecutor(max_workers=self._MAX_SCRAPERS) as ex:
            for u in urls:
                future_to_url[ex.submit(crawl, u)] = u
            for f in as_completed(future_to_url):
                res = f.result()
                if res:
                    out.append(res)

        return out

    def extract_key_data_points(self, research_results: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Pull numeric/statistical facts out of search snippets and pages.

        Args:
            research_results: dict as produced by `search_multiple_sources`.

        Returns:
            Up to 10 dicts {'value', 'source', 'context', 'type'}; snippets
            yield type 'statistic', scraped pages 'detailed_analysis'.
        """
        data_points: List[Dict[str, Any]] = []
        for itm in research_results.get('google_results', []):
            val = self._extract_numbers_and_stats(itm.get('snippet', ""))
            if val:
                data_points.append({
                    'value': val,
                    'source': itm.get('displayLink', ""),
                    'context': itm.get('snippet', ""),
                    'type': 'statistic'
                })
        for cnt in research_results.get('scraped_content', []):
            val = self._extract_numbers_and_stats(cnt.get('content', ""))
            if val:
                data_points.append({
                    'value': val,
                    'source': cnt.get('url', ""),
                    'context': cnt.get('content', "")[:200],
                    'type': 'detailed_analysis'
                })
        return data_points[:10]

    def _extract_numbers_and_stats(self, text: str) -> Optional[str]:
        """Return the first dollar amount, percentage or large number in *text*.

        Patterns are tried in priority order (money, percent, plain number);
        returns None when no pattern matches.
        """
        import re
        patlist = [
            r'\$[\d,]+(?:\.\d+)?(?:\s*(?:billion|million|trillion))?',
            r'\d+(?:\.\d+)?%',
            r'\d{1,3}(?:,\d{3})*(?:\.\d+)?(?:\s*(?:billion|million|thousand))?'
        ]
        for pat in patlist:
            m = re.search(pat, text, re.IGNORECASE)
            if m:
                return m.group()
        return None