| import asyncio |
| import aiohttp |
| import time |
| import json |
| import math |
| import certifi |
| import ssl |
| from cache import connect_to_sheet, load_cache_dict, append_to_cache |
| import re |
| from urllib.parse import urlparse |
|
|
| |
BASE_URL = "https://google.serper.dev/news"  # Serper news-search endpoint (POST)
RESULTS_PER_PAGE = 100  # articles requested per page ("num" in the payload)
MAX_CONCURRENCY = 10  # max simultaneous topic fetches (semaphore size)
|
|
|
|
| |
async def fetch_topic(session, topic, sem, geo_code, days_back, max_articles, api_key, country_name=""):
    """
    Fetch up to `max_articles` news articles for a single topic from the
    Serper news endpoint, paginating RESULTS_PER_PAGE at a time.

    Args:
        session: shared aiohttp.ClientSession used for every request.
        topic: the search topic string.
        sem: asyncio.Semaphore bounding concurrent topic fetches.
        geo_code: Serper "gl" geolocation code.
        days_back: restrict results to the last N days (Google "tbs" filter).
        max_articles: hard cap on articles returned for this topic.
        api_key: Serper API key (sent in the X-API-KEY header).
        country_name: optional country name appended to the query; skipped
            when empty or "Global".

    Returns:
        list of article dicts (possibly fewer than max_articles; empty on
        HTTP error or exception — errors are logged, never raised).
    """
    articles = []
    headers = {'X-API-KEY': api_key, 'Content-Type': 'application/json'}
    # Google time-based-search filter, e.g. "qdr:d7" == past 7 days.
    time_filter = f"qdr:d{days_back}"

    # Number of pages needed to reach max_articles at RESULTS_PER_PAGE each.
    required_pages = math.ceil(max_articles / RESULTS_PER_PAGE)

    async with sem:
        print(f"--> Starting: {topic}")

        if country_name and country_name != "Global":
            query = f"{topic} {country_name}"
        else:
            query = topic

        for page in range(1, required_pages + 1):
            payload = {
                "q": query,
                "gl": geo_code,
                "tbs": time_filter,
                "num": RESULTS_PER_PAGE,
                "page": page
            }

            try:
                async with session.post(BASE_URL, headers=headers, json=payload) as resp:
                    if resp.status != 200:
                        print(f" x Error {topic} (Page {page}): Status {resp.status}")
                        break

                    data = await resp.json()
                    new_news = data.get("news", [])

                    # An empty page means there are no further results.
                    if not new_news:
                        break

                    articles.extend(new_news)

                    # Enforce the caller's cap and stop paginating.
                    if len(articles) >= max_articles:
                        articles = articles[:max_articles]
                        break

            except Exception as e:
                print(f" x Exception {topic}: {e}")
                break

    # FIX: the original log prefix was a mojibake-garbled emoji that split
    # the f-string across two physical lines; restored as a valid string.
    print(f"✅ Finished: {topic} ({len(articles)} articles)")
    return articles
|
|
|
|
| |
async def start_async_search(topics: list, geo_code: str, days_back: int, max_articles: int, api_key: str,
                             country_name: str):
    """
    Fan out one fetch_topic task per topic, merge the results while
    de-duplicating by article link, persist them to unique_articles.json,
    and return the unique article list.
    """
    started_at = time.time()

    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    # Verified TLS via certifi's CA bundle.
    ssl_context = ssl.create_default_context(cafile=certifi.where())
    connector = aiohttp.TCPConnector(ssl=ssl_context)

    async with aiohttp.ClientSession(connector=connector) as session:
        per_topic = await asyncio.gather(*(
            fetch_topic(session, topic, sem, geo_code, days_back, max_articles, api_key, country_name)
            for topic in topics
        ))

    # First occurrence of each link wins; later duplicates are dropped.
    seen_by_link = {}
    for batch in per_topic:
        for article in batch:
            link = article.get('link')
            if link and link not in seen_by_link:
                seen_by_link[link] = article

    final_articles_list = list(seen_by_link.values())

    with open("unique_articles.json", "w", encoding="utf-8") as f:
        json.dump(final_articles_list, f, indent=2, ensure_ascii=False)

    print("\n" + "=" * 40)
    print(f"Total Time: {time.time() - started_at:.2f} seconds")
    print(f"Total Unique Articles: {len(final_articles_list)}")
    print("=" * 40)

    return final_articles_list
|
|
|
|
def search_news(topic_list, geo_code, days_back, max_news, SERPER_API_KEY, country_name):
    """Synchronous entry point: run the async news search and return its articles."""
    found = asyncio.run(
        start_async_search(
            topics=topic_list,
            geo_code=geo_code,
            days_back=days_back,
            max_articles=max_news,
            api_key=SERPER_API_KEY,
            country_name=country_name,
        )
    )

    print(f"Search_news captured {len(found)} articles.")
    if found:
        print(f"Sample title: {found[0].get('title')}")

    return found
|
|
|
|
| |
async def fetch_url_from_serper(session, company_name, api_key):
    """
    Async worker: find the official website URL for one company via a
    Serper web search.

    Resolution order:
      1. First organic result whose domain contains the cleaned company
         name (aggregator/social domains are skipped).
      2. A URL embedded in a result snippet that matches the company name.
      3. First organic result whose link contains no blacklisted term.

    Returns:
        The resolved URL string, or "" when nothing suitable is found
        (including on non-200 responses and network errors, which are
        logged rather than raised).
    """
    url = "https://google.serper.dev/search"
    payload = json.dumps({"q": f"{company_name} official website", "num": 1})

    headers = {'X-API-KEY': api_key, 'Content-Type': 'application/json'}

    def clean(text):
        # Strip every non-alphanumeric character and lowercase, so that
        # e.g. "Acme, Inc." can match the "acmeinc" portion of a domain.
        return re.sub(r'\W+', '', text).lower()

    target_name = clean(company_name)

    # Domains that are never a company's own site.
    blacklist = ["wikipedia", "linkedin", "bloomberg", "crunchbase", "facebook", "instagram", "youtube"]

    try:
        async with session.post(url, headers=headers, data=payload) as response:
            if response.status == 200:
                data = await response.json()
                if "organic" not in data:
                    return ""

                results = data["organic"]

                # Pass 1: domain itself contains the company name.
                for res in results:
                    link = res.get("link", "")
                    domain = urlparse(link).netloc.lower()

                    if any(b in domain for b in blacklist):
                        continue

                    if target_name in clean(domain):
                        return link

                # Pass 2: a URL hidden inside a snippet matches the name.
                for res in results:
                    snippet = res.get("snippet", "")
                    hidden_urls = re.findall(r'(?:www\.|https?://)[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', snippet)

                    for hidden in hidden_urls:
                        if target_name in clean(hidden):
                            # Snippet URLs may lack a scheme; make absolute.
                            if not hidden.startswith("http"):
                                return f"https://{hidden}"
                            return hidden

                # Pass 3: fall back to the first non-blacklisted link.
                for res in results:
                    link = res.get("link", "")
                    if not any(b in link for b in blacklist):
                        return link

    except Exception as e:
        # FIX: the original prefix was a mojibake-garbled warning emoji; restored.
        print(f"⚠️ Serper error for {company_name}: {e}")

    return ""
|
|
|
|
async def run_batch_search(company_names, api_key):
    """
    Orchestrator: resolve website URLs for many companies concurrently
    over a single TLS-verified aiohttp session.

    Returns:
        dict mapping each company name to its resolved URL ("" on failure).
    """
    ssl_context = ssl.create_default_context(cafile=certifi.where())
    connector = aiohttp.TCPConnector(ssl=ssl_context)

    async with aiohttp.ClientSession(connector=connector) as session:
        resolved = await asyncio.gather(
            *(fetch_url_from_serper(session, name, api_key) for name in company_names)
        )

    # gather preserves input order, so names and URLs line up.
    return dict(zip(company_names, resolved))
|
|
|
|
def fill_missing_urls(data_list, sheet_name, serper_api_key):
    """
    Main function to process the data list.
    1. Checks Cache
    2. Searches Serper for missing
    3. Updates Data
    4. Saves new finds to Cache

    Args:
        data_list: list of row dicts; rows whose 'company_url' equals
            'SEARCH_REQUIRED' are resolved and updated in place.
        sheet_name: name of the Google Sheet backing the URL cache.
        serper_api_key: Serper API key used for live searches.

    Returns:
        data_list with 'company_url' filled in ("" when unresolved).
    """
    # Rows flagged by an upstream step as needing a URL lookup.
    target_indices = [i for i, row in enumerate(data_list) if row.get('company_url') == 'SEARCH_REQUIRED']

    if not target_indices:
        # FIX: log prefixes in this function were mojibake-garbled emoji
        # (one even split this print across two physical lines); restored.
        print("✅ No searches required.")
        return data_list

    print(f"🔍 Processing {len(target_indices)} missing URLs...")

    companies_to_resolve = {data_list[i]['company_name'] for i in target_indices}

    # Cache is best-effort: on any failure fall back to an empty cache.
    try:
        sheet = connect_to_sheet(sheet_name)
        cache_dict = load_cache_dict(sheet)
    except Exception as e:
        print(f"⚠️ Cache connection failed, skipping cache: {e}")
        sheet = None
        cache_dict = {}

    found_in_cache = {}
    missing_from_cache = []

    for company in companies_to_resolve:
        # Cache keys are normalized to lowercased, stripped names.
        norm_name = company.lower().strip()
        if norm_name in cache_dict:
            found_in_cache[company] = cache_dict[norm_name]
        else:
            missing_from_cache.append(company)

    print(f" - Found in Cache: {len(found_in_cache)}")
    print(f" - Need API Search: {len(missing_from_cache)}")

    search_results = {}
    if missing_from_cache:
        print(f"🔍 Searching internet for {len(missing_from_cache)} companies...")
        search_results = asyncio.run(run_batch_search(missing_from_cache, serper_api_key))

    # Fresh search results win over cached values on key collision.
    full_knowledge_base = {**found_in_cache, **search_results}

    for i in target_indices:
        comp_name = data_list[i]['company_name']
        data_list[i]['company_url'] = full_knowledge_base.get(comp_name, "")

    # Persist only newly found, non-empty URLs back to the cache sheet.
    # FIX: guard against calling append_to_cache with an empty entry list
    # (all searches may have returned "").
    new_cache_entries = [
        {'Company': name, 'Website': url}
        for name, url in search_results.items()
        if url
    ]
    if sheet and new_cache_entries:
        append_to_cache(sheet, new_cache_entries)
    else:
        print('Nothing appended to cache')

    return data_list
|
|