"""Serper-based news search and company-URL resolution utilities.

Two independent pipelines:

1. News search — ``search_news`` fans out one async request stream per
   topic against the Serper News API, paginates, deduplicates by URL, and
   returns a flat list of article dicts.
2. Company URLs — ``fill_missing_urls`` resolves official company websites
   via the Serper Search API, backed by a Google-Sheet cache (see the
   project-local ``cache`` module).
"""

import asyncio
import json
import math
import re
import ssl
import time
from urllib.parse import urlparse

import aiohttp
import certifi

from cache import append_to_cache, connect_to_sheet, load_cache_dict

# --- CONFIGURATION ---
NEWS_URL = "https://google.serper.dev/news"
SEARCH_URL = "https://google.serper.dev/search"
RESULTS_PER_PAGE = 100  # Serper max per request
MAX_CONCURRENCY = 10    # Avoid 429 errors

# Kept under its original name for backward compatibility with any importer.
BASE_URL = NEWS_URL

# Profile/aggregator domains that are never a company's own official site.
DOMAIN_BLACKLIST = (
    "wikipedia", "linkedin", "bloomberg", "crunchbase",
    "facebook", "instagram", "youtube",
)

# Matches URLs embedded in free text, e.g. "Website: www.99startups.com".
# Compiled once at import time instead of on every search call.
_URL_IN_TEXT_RE = re.compile(r'(?:www\.|https?://)[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')

# Strips non-word characters for fuzzy name/domain comparison.
_NON_WORD_RE = re.compile(r'\W+')


def _clean(text):
    """Normalize *text* for fuzzy matching: drop non-word chars, lowercase.

    e.g. "99 Startups" -> "99startups".
    """
    return _NON_WORD_RE.sub('', text).lower()


def _make_connector():
    """Build a TCPConnector using certifi's CA bundle for strict SSL."""
    ssl_context = ssl.create_default_context(cafile=certifi.where())
    return aiohttp.TCPConnector(ssl=ssl_context)


# --- WORKER: Fetch articles for one topic ---
async def fetch_topic(session, topic, sem, geo_code, days_back, max_articles,
                      api_key, country_name=""):
    """Fetch up to *max_articles* news items for one topic.

    Pages through the Serper News API (RESULTS_PER_PAGE items per request)
    while holding the shared semaphore *sem* to cap concurrency.

    Args:
        session: shared aiohttp.ClientSession.
        topic: search topic string.
        sem: asyncio.Semaphore limiting concurrent topics.
        geo_code: Serper ``gl`` country code.
        days_back: recency window in days (``tbs=qdr:dN``).
        max_articles: hard cap on articles returned for this topic.
        api_key: Serper API key.
        country_name: appended to the query unless empty or "Global".

    Returns:
        list of article dicts; possibly partial/empty on HTTP or network
        errors (failures are printed, never raised).
    """
    articles = []
    headers = {'X-API-KEY': api_key, 'Content-Type': 'application/json'}
    # Google "time-based search" filter, e.g. qdr:d7 = last 7 days.
    time_filter = f"qdr:d{days_back}"

    # Pages needed to cover max_articles
    # (e.g. max_articles=50 -> 1 page; 150 -> 2 pages).
    required_pages = math.ceil(max_articles / RESULTS_PER_PAGE)

    async with sem:
        print(f"--> Starting: {topic}")
        # Bias the query toward the requested country unless global.
        if country_name and country_name != "Global":
            query = f"{topic} {country_name}"
        else:
            query = topic

        for page in range(1, required_pages + 1):
            payload = {
                "q": query,
                "gl": geo_code,
                "tbs": time_filter,
                "num": RESULTS_PER_PAGE,
                "page": page,
            }
            try:
                async with session.post(NEWS_URL, headers=headers,
                                        json=payload) as resp:
                    if resp.status != 200:
                        print(f" x Error {topic} (Page {page}): "
                              f"Status {resp.status}")
                        break
                    data = await resp.json()
                    new_news = data.get("news", [])
                    if not new_news:
                        break  # No more results
                    articles.extend(new_news)
                    # Stop once the per-topic limit is reached.
                    if len(articles) >= max_articles:
                        articles = articles[:max_articles]
                        break
            except Exception as e:
                # Best-effort: keep whatever pages we already have.
                print(f" x Exception {topic}: {e}")
                break

    print(f"✅ Finished: {topic} ({len(articles)} articles)")
    return articles


# --- MAIN ORCHESTRATOR ---
async def start_async_search(topics: list, geo_code: str, days_back: int,
                             max_articles: int, api_key: str,
                             country_name: str):
    """Run all topic searches concurrently and return unique articles.

    Articles are deduplicated by their ``link`` field (first occurrence
    wins). The combined result is also dumped to ``unique_articles.json``
    for debugging.
    """
    start_time = time.time()

    # 1. Setup Concurrency (strict SSL via certifi's CA bundle).
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    connector = _make_connector()

    # 2. Run Tasks
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [
            fetch_topic(session, topic, sem, geo_code, days_back,
                        max_articles, api_key, country_name)
            for topic in topics
        ]
        results = await asyncio.gather(*tasks)

    # 3. Flatten and Deduplicate
    # A dict keyed by URL guarantees every article is unique.
    unique_articles_map = {}
    for topic_articles in results:
        for article in topic_articles:
            link = article.get('link')
            if link and link not in unique_articles_map:
                unique_articles_map[link] = article

    final_articles_list = list(unique_articles_map.values())

    # 4. Optional: Save to file for debug
    with open("unique_articles.json", "w", encoding="utf-8") as f:
        json.dump(final_articles_list, f, indent=2, ensure_ascii=False)

    print("\n" + "=" * 40)
    print(f"Total Time: {time.time() - start_time:.2f} seconds")
    print(f"Total Unique Articles: {len(final_articles_list)}")
    print("=" * 40)

    return final_articles_list


def search_news(topic_list, geo_code, days_back, max_news, SERPER_API_KEY,
                country_name):
    """Synchronous entry point: run the async news search to completion.

    Returns the deduplicated article list from ``start_async_search``.
    """
    articles = asyncio.run(start_async_search(
        topics=topic_list,
        geo_code=geo_code,
        days_back=days_back,
        max_articles=max_news,
        api_key=SERPER_API_KEY,
        country_name=country_name,
    ))

    # Verify output
    print(f"Search_news captured {len(articles)} articles.")
    if articles:
        print(f"Sample title: {articles[0].get('title')}")
    return articles


# ************** Company URL part
async def fetch_url_from_serper(session, company_name, api_key):
    """Async worker: find the official website for one company.

    Three strategies, in order:
      1. First organic result whose (non-blacklisted) domain contains the
         cleaned company name — fixes generic-name false positives.
      2. URL hidden inside a result snippet that matches the company name
         (the "99 Startups" fix, e.g. "Website: www.99startups.com").
      3. Fallback: first organic result not on a blacklisted domain.

    Returns the URL string, or "" when nothing matched or the request
    failed (errors are printed, never raised).
    """
    payload = {"q": f"{company_name} official website", "num": 1}
    headers = {'X-API-KEY': api_key, 'Content-Type': 'application/json'}

    target_name = _clean(company_name)

    try:
        async with session.post(SEARCH_URL, headers=headers,
                                json=payload) as response:
            if response.status == 200:
                data = await response.json()
                if "organic" not in data:
                    return ""

                results = data["organic"]

                # --- STRATEGY 1: High-quality matches in result links ---
                for res in results:
                    link = res.get("link", "")
                    domain = urlparse(link).netloc.lower()

                    # Skip blacklisted profile/aggregator sites.
                    if any(b in domain for b in DOMAIN_BLACKLIST):
                        continue

                    # Domain contains the company name
                    # (e.g. 'sabre.com' contains 'sabre').
                    if target_name in _clean(domain):
                        return link

                # --- STRATEGY 2: Snippet hunting ---
                # Look for URLs hidden inside the text snippets.
                for res in results:
                    snippet = res.get("snippet", "")
                    hidden_urls = _URL_IN_TEXT_RE.findall(snippet)
                    for hidden in hidden_urls:
                        # A hidden URL matching the name is likely official.
                        if target_name in _clean(hidden):
                            # Ensure it has a scheme.
                            if not hidden.startswith("http"):
                                return f"https://{hidden}"
                            return hidden

                # --- STRATEGY 3: Fallback (best guess) ---
                # First non-blacklisted result.
                for res in results:
                    link = res.get("link", "")
                    if not any(b in link for b in DOMAIN_BLACKLIST):
                        return link
    except Exception as e:
        print(f"⚠️ Serper error for {company_name}: {e}")

    return ""


async def run_batch_search(company_names, api_key):
    """Resolve many company URLs in parallel with a secure SSL context.

    Returns a dict mapping each name in *company_names* to its resolved
    URL ("" on failure).
    """
    results = {}

    # certifi-backed SSL context (avoids local CA-store issues).
    connector = _make_connector()

    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [
            fetch_url_from_serper(session, name, api_key)
            for name in company_names
        ]
        # Run them all at once
        urls = await asyncio.gather(*tasks)

    for name, url in zip(company_names, urls):
        results[name] = url

    return results


def fill_missing_urls(data_list, sheet_name, serper_api_key):
    """Resolve every row whose company_url is 'SEARCH_REQUIRED' in place.

    Pipeline:
      1. Check the Google-Sheet cache (keyed by lowercased name).
      2. Batch-search Serper for the cache misses.
      3. Write resolved URLs back into *data_list* rows.
      4. Append newly found URLs to the cache.

    Returns *data_list* (mutated).
    """
    # A. Identify targets — rows explicitly flagged for search.
    target_indices = [i for i, row in enumerate(data_list)
                      if row.get('company_url') == 'SEARCH_REQUIRED']

    if not target_indices:
        print("✅ No searches required.")
        return data_list

    print(f"🔍 Processing {len(target_indices)} missing URLs...")

    # Unique company names needing search (deduplication).
    companies_to_resolve = {data_list[i]['company_name']
                            for i in target_indices}

    # B. Connect & Check Cache (best-effort: fall back to empty cache).
    try:
        sheet = connect_to_sheet(sheet_name)
        cache_dict = load_cache_dict(sheet)
    except Exception as e:
        print(f"⚠️ Cache connection failed, skipping cache: {e}")
        sheet = None
        cache_dict = {}

    # Separate into Found vs Missing.
    found_in_cache = {}
    missing_from_cache = []

    for company in companies_to_resolve:
        norm_name = company.lower().strip()
        if norm_name in cache_dict:
            found_in_cache[company] = cache_dict[norm_name]
        else:
            missing_from_cache.append(company)

    print(f"   - Found in Cache: {len(found_in_cache)}")
    print(f"   - Need API Search: {len(missing_from_cache)}")

    # C. Perform API Search (only for cache misses).
    search_results = {}
    if missing_from_cache:
        print(f"🌍 Searching internet for {len(missing_from_cache)} "
              f"companies...")
        search_results = asyncio.run(
            run_batch_search(missing_from_cache, serper_api_key))

    # D. Update the original data list.
    # Combine all known URLs (Cache + Search); search wins on collision.
    full_knowledge_base = {**found_in_cache, **search_results}

    for i in target_indices:
        comp_name = data_list[i]['company_name']
        # Default to empty string if the search failed.
        data_list[i]['company_url'] = full_knowledge_base.get(comp_name, "")

    # E. Update Cache (save only what we just searched and actually found).
    if sheet and search_results:
        # GSheet row shape: [{'Company': 'Name', 'Website': 'URL'}]
        new_cache_entries = [
            {'Company': name, 'Website': url}
            for name, url in search_results.items()
            if url  # Only cache if we actually found a URL
        ]
        append_to_cache(sheet, new_cache_entries)
    else:
        print('Nothing appended to cache')

    return data_list