# newFinderAgent_v2 / src / search.py
# Uploaded by chummchumm ("Upload 6 files", commit 8b425b2, verified)
import asyncio
import aiohttp
import time
import json
import math
import certifi
import ssl
from cache import connect_to_sheet, load_cache_dict, append_to_cache
import re
from urllib.parse import urlparse
# --- CONFIGURATION ---
BASE_URL = "https://google.serper.dev/news"  # Serper Google-News search endpoint
RESULTS_PER_PAGE = 100 # Serper max per request
MAX_CONCURRENCY = 10 # Avoid 429 errors
# --- WORKER: Fetch articles for one topic ---
async def fetch_topic(session, topic, sem, geo_code, days_back, max_articles, api_key, country_name=""):
    """Fetch up to ``max_articles`` news results for one topic from Serper.

    Pages through the /news endpoint under the shared semaphore ``sem`` until
    the article limit is reached, a page comes back empty, or an error occurs.
    Returns the list of article dicts collected (possibly empty).
    """
    headers = {'X-API-KEY': api_key, 'Content-Type': 'application/json'}
    time_filter = f"qdr:d{days_back}"
    # Pages needed to satisfy max_articles, given Serper's 100-per-page cap
    # (e.g. 50 articles -> 1 page, 150 articles -> 2 pages).
    pages_needed = math.ceil(max_articles / RESULTS_PER_PAGE)
    collected = []
    async with sem:
        print(f"--> Starting: {topic}")
        # Narrow the query by country unless searching globally.
        query = f"{topic} {country_name}" if country_name and country_name != "Global" else topic
        for page_num in range(1, pages_needed + 1):
            payload = {
                "q": query,
                "gl": geo_code,
                "tbs": time_filter,
                "num": RESULTS_PER_PAGE,
                "page": page_num
            }
            try:
                async with session.post(BASE_URL, headers=headers, json=payload) as resp:
                    if resp.status != 200:
                        print(f" x Error {topic} (Page {page_num}): Status {resp.status}")
                        break
                    data = await resp.json()
                    batch = data.get("news", [])
                    if not batch:
                        # Serper returned nothing — no further pages exist.
                        break
                    collected.extend(batch)
                    if len(collected) >= max_articles:
                        # Trim overshoot from the last page and stop paging.
                        collected = collected[:max_articles]
                        break
            except Exception as e:
                print(f" x Exception {topic}: {e}")
                break
        print(f"โœ… Finished: {topic} ({len(collected)} articles)")
    return collected
# --- MAIN ORCHESTRATOR ---
async def start_async_search(topics: list, geo_code: str, days_back: int, max_articles: int, api_key: str,
                             country_name: str):
    """Run a concurrent Serper news search over all topics and deduplicate.

    Fans out one fetch_topic task per topic (bounded by MAX_CONCURRENCY via a
    shared semaphore), merges all results, drops duplicate links, dumps the
    final list to unique_articles.json for debugging, and returns it.
    """
    started_at = time.time()

    # 1. Concurrency limit + TLS verified against certifi's CA bundle.
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    ssl_context = ssl.create_default_context(cafile=certifi.where())
    connector = aiohttp.TCPConnector(ssl=ssl_context)
    # connector = aiohttp.TCPConnector(ssl=False) # Ignore SSL errors if necessary

    # 2. One task per topic, all awaited together.
    async with aiohttp.ClientSession(connector=connector) as session:
        per_topic = await asyncio.gather(*(
            fetch_topic(session, t, sem, geo_code, days_back, max_articles, api_key, country_name)
            for t in topics
        ))

    # 3. Deduplicate: key by article URL so each link appears exactly once.
    seen_by_link = {}
    for batch in per_topic:
        for article in batch:
            link = article.get('link')
            if link and link not in seen_by_link:
                seen_by_link[link] = article
    deduped = list(seen_by_link.values())

    # 4. Debug aid: persist the merged result set to disk.
    with open("unique_articles.json", "w", encoding="utf-8") as f:
        json.dump(deduped, f, indent=2, ensure_ascii=False)

    print("\n" + "=" * 40)
    print(f"Total Time: {time.time() - started_at:.2f} seconds")
    print(f"Total Unique Articles: {len(deduped)}")
    print("=" * 40)
    return deduped
def search_news(topic_list, geo_code, days_back, max_news, SERPER_API_KEY, country_name):
    """Synchronous entry point: run the async news search and return its articles."""
    found = asyncio.run(start_async_search(
        topics=topic_list,
        geo_code=geo_code,
        days_back=days_back,
        max_articles=max_news,
        api_key=SERPER_API_KEY,
        country_name=country_name
    ))
    # Quick sanity log of what came back.
    print(f"Search_news captured {len(found)} articles.")
    if found:
        print(f"Sample title: {found[0].get('title')}")
    return found
# ************** Company URL part
async def fetch_url_from_serper(session, company_name, api_key):
    """Resolve a company's official website URL via a Serper web search.

    Strategy (first hit wins):
      1. A result whose *domain* contains the normalized company name
         (handles generic names when the official site ranks high).
      2. A URL embedded in a result *snippet* that contains the name
         (the "99 Startups" fix — e.g. "Website: www.99startups.com").
      3. Fallback: the first result whose link is not a blacklisted
         profile/aggregator site.

    Returns "" when nothing suitable is found, the response is non-200,
    or the request raises.
    """
    url = "https://google.serper.dev/search"
    payload = json.dumps({"q": f"{company_name} official website", "num": 1})
    headers = {'X-API-KEY': api_key, 'Content-Type': 'application/json'}

    # Normalize names for comparison (e.g. "99 Startups" -> "99startups").
    def clean(text):
        return re.sub(r'\W+', '', text).lower()

    target_name = clean(company_name)
    # Profile/aggregator domains that are never the official site.
    blacklist = ["wikipedia", "linkedin", "bloomberg", "crunchbase", "facebook", "instagram", "youtube"]
    try:
        async with session.post(url, headers=headers, data=payload) as response:
            if response.status == 200:
                data = await response.json()
                if "organic" not in data:
                    return ""
                results = data["organic"]
                # --- STRATEGY 1: domain contains the company name ---
                for res in results:
                    link = res.get("link", "")
                    domain = urlparse(link).netloc.lower()
                    # Skip blacklisted profile sites.
                    if any(b in domain for b in blacklist):
                        continue
                    # e.g. 'sabre.com' contains 'sabre'.
                    if target_name in clean(domain):
                        return link
                # --- STRATEGY 2: snippet hunting ---
                # Strategy 1 failed; look for URLs hidden inside snippets.
                for res in results:
                    snippet = res.get("snippet", "")
                    hidden_urls = re.findall(r'(?:www\.|https?://)[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', snippet)
                    for hidden in hidden_urls:
                        # A hidden URL matching the name is likely the real one.
                        if target_name in clean(hidden):
                            # Ensure the URL carries a scheme.
                            if not hidden.startswith("http"):
                                return f"https://{hidden}"
                            return hidden
                # --- STRATEGY 3: fallback (best guess) ---
                # No confident match: first non-blacklisted result.
                for res in results:
                    link = res.get("link", "")
                    if not any(b in link for b in blacklist):
                        return link
    except Exception as e:
        print(f"โš ๏ธ Serper error for {company_name}: {e}")
    return ""
async def run_batch_search(company_names, api_key):
    """
    Orchestrator: resolve every company name to a URL in parallel.

    Uses one aiohttp session with a certifi-backed SSL context and returns a
    {company_name: url} mapping (url is "" when a lookup fails).
    """
    # --- SSL FIX: trust certifi's CA bundle instead of the system default ---
    ssl_context = ssl.create_default_context(cafile=certifi.where())
    connector = aiohttp.TCPConnector(ssl=ssl_context)

    # Single shared session; one lookup task per company, gathered at once.
    async with aiohttp.ClientSession(connector=connector) as session:
        lookups = [fetch_url_from_serper(session, name, api_key) for name in company_names]
        resolved = await asyncio.gather(*lookups)

    return dict(zip(company_names, resolved))
def fill_missing_urls(data_list, sheet_name, serper_api_key):
    """
    Fill in 'company_url' for every row marked 'SEARCH_REQUIRED'.

    Order of operations:
      1. Check the spreadsheet cache for known companies.
      2. Search Serper for the cache misses.
      3. Write resolved URLs back into data_list in place.
      4. Append newly found URLs to the cache.

    Returns the (mutated) data_list.
    """
    # A. Rows that still need a URL.
    pending = [i for i, row in enumerate(data_list) if row.get('company_url') == 'SEARCH_REQUIRED']
    if not pending:
        print("โœ… No searches required.")
        return data_list
    print(f"๐Ÿ” Processing {len(pending)} missing URLs...")

    # Deduplicate company names before hitting cache/API.
    names_needed = {data_list[i]['company_name'] for i in pending}

    # B. Try the cache first; degrade gracefully if the sheet is unreachable.
    try:
        sheet = connect_to_sheet(sheet_name)
        cache_dict = load_cache_dict(sheet)
    except Exception as e:
        print(f"โš ๏ธ Cache connection failed, skipping cache: {e}")
        sheet = None
        cache_dict = {}

    # Split into cache hits vs. names that need an API search.
    cached_hits = {}
    to_search = []
    for name in names_needed:
        key = name.lower().strip()
        if key in cache_dict:
            cached_hits[name] = cache_dict[key]
        else:
            to_search.append(name)
    print(f" - Found in Cache: {len(cached_hits)}")
    print(f" - Need API Search: {len(to_search)}")

    # C. Hit the API only for cache misses.
    searched = {}
    if to_search:
        print(f"๐ŸŒ Searching internet for {len(to_search)} companies...")
        searched = asyncio.run(run_batch_search(to_search, serper_api_key))

    # D. Write every resolved URL back into the original rows.
    known = {**cached_hits, **searched}
    for i in pending:
        name = data_list[i]['company_name']
        # Defaults to "" when the search came up empty.
        data_list[i]['company_url'] = known.get(name, "")

    # E. Persist only the freshly searched, successful lookups.
    if sheet and searched:
        # GSheet row shape: [{'Company': 'Name', 'Website': 'URL'}]
        new_rows = [
            {'Company': name, 'Website': url}
            for name, url in searched.items()
            if url  # Only cache if we actually found a URL
        ]
        append_to_cache(sheet, new_rows)
    else:
        print('Nothing appended to cache')
    return data_list