# src/infrastructure/adapters/newsapi_adapter.py
# Last change: "fix: NewsAPI query precision + non-news domain filtering" (commit 54dfb7e, Peterase)
"""
NewsAPI.org Adapter
Provides real-time news from 80,000+ sources worldwide.
Best for temporal queries requiring fresh, breaking news.
Features:
- Real-time updates (15-minute refresh)
- 80,000+ sources including African outlets
- Structured data (title, description, content, source, publishedAt)
- Free tier: 100 requests/day
- Paid tier: $449/month for production
Get API key: https://newsapi.org/register
"""
import asyncio
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import httpx
logger = logging.getLogger(__name__)
class NewsAPIAdapter:
    """
    Adapter for NewsAPI.org real-time news search.

    Provides fresh news results to complement database search.
    Designed to be fast (2s timeout) and resilient: every failure mode
    logs and returns an empty list rather than raising to the caller.
    """

    BASE_URL = "https://newsapi.org/v2"

    # Domains that are NOT news sources — results from these are filtered out.
    # Matched by exact domain or subdomain suffix (see _is_non_news_domain),
    # so e.g. "x.com" does not accidentally match "netflix.com".
    _NON_NEWS_DOMAINS = {
        "pypi.org", "github.com", "stackoverflow.com", "reddit.com",
        "wikipedia.org", "arxiv.org", "researchgate.net", "academia.edu",
        "linkedin.com", "facebook.com", "twitter.com", "x.com",
        "youtube.com", "instagram.com", "tiktok.com",
        "amazon.com", "ebay.com", "etsy.com",
        "plos.org", "pubmed.ncbi.nlm.nih.gov", "springer.com",
        "stemlynsblog.org",
    }

    def __init__(
        self,
        api_key: str,
        timeout: float = 2.0,
        max_results: int = 20
    ):
        """
        Initialize NewsAPI adapter.

        Args:
            api_key: NewsAPI.org API key. A missing or placeholder key
                disables the adapter (all searches return []).
            timeout: Maximum time to wait for results (seconds)
            max_results: Maximum number of results to return
        """
        self.api_key = api_key
        self.timeout = timeout
        self.max_results = max_results
        # HTTP client is created lazily on first use (see _ensure_client)
        self.client: Optional[httpx.AsyncClient] = None
        if not api_key or api_key == "your-newsapi-key-here":
            logger.warning("NewsAPI key not configured - adapter disabled")
            self.api_key = None
        else:
            logger.info(f"NewsAPI adapter initialized (timeout={timeout}s, max={max_results})")

    async def _ensure_client(self):
        """Lazy initialization of the shared HTTP client."""
        if self.client is None:
            self.client = httpx.AsyncClient(
                timeout=self.timeout,
                headers={
                    "X-Api-Key": self.api_key,
                    "User-Agent": "ARKI-AI-RAG/2.5 (Ethiopia News Assistant)"
                }
            )

    def _is_non_news_domain(self, domain: str) -> bool:
        """
        Return True if domain is (or is a subdomain of) a known non-news site.

        Uses exact/suffix matching rather than substring matching, so
        legitimate news domains that merely *contain* a blocked domain
        (e.g. "netflix.com" contains "x.com") are not filtered.
        """
        return any(
            domain == blocked or domain.endswith("." + blocked)
            for blocked in self._NON_NEWS_DOMAINS
        )

    async def search(
        self,
        query: str,
        language: str = "en",
        sort_by: str = "publishedAt",
        from_date: Optional[str] = None,
        max_results: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Search NewsAPI for the given query.

        Always anchors single-word queries to Ethiopia/Africa context.
        Filters out non-news domains (pypi, github, academic, social media).

        Args:
            query: Free-text search query.
            language: ISO language code passed to NewsAPI.
            sort_by: NewsAPI sort order (publishedAt, relevancy, popularity).
            from_date: Optional ISO date lower bound for articles.
            max_results: Override the adapter-level default.

        Returns:
            List of normalized result dicts; empty list on any failure.
        """
        if not self.api_key:
            logger.warning("NewsAPI unavailable - returning empty results")
            return []
        await self._ensure_client()
        max_results = max_results or self.max_results

        # Build search query — always ensure Ethiopia/Africa context
        words = query.strip().split()
        if len(words) == 1:
            # Single word: anchor to Ethiopia news explicitly
            search_q = f'"{query}" AND ("Ethiopia" OR "Africa" OR "Horn of Africa")'
        elif len(words) <= 3:
            # Short query: AND all terms
            search_q = " AND ".join(f'"{w}"' for w in words)
        else:
            # Longer query: use first 3 key terms
            search_q = " AND ".join(f'"{w}"' for w in words[:3])

        try:
            url = f"{self.BASE_URL}/everything"
            params = {
                "q": search_q,
                "language": language,
                "sortBy": sort_by,
                "pageSize": min(max_results * 2, 100),  # Fetch extra to allow filtering
            }
            if from_date:
                params["from"] = from_date
            logger.info(f"[NewsAPI] Searching: '{search_q}' (lang={language})")
            response = await self.client.get(url, params=params)
            if response.status_code == 200:
                data = response.json()
                if data.get("status") != "ok":
                    logger.warning(f"NewsAPI error: {data.get('message', 'unknown')}")
                    return []
                articles = data.get("articles", [])
                results = []
                filtered_out = 0
                for article in articles:
                    # Filter non-news domains (exact or subdomain match)
                    url_str = article.get("url") or ""
                    domain = self._extract_domain(url_str)
                    if self._is_non_news_domain(domain):
                        filtered_out += 1
                        logger.debug(f"[NewsAPI] Filtered non-news: {domain}")
                        continue
                    normalized = self._normalize_result(article)
                    if normalized:
                        results.append(normalized)
                    if len(results) >= max_results:
                        break
                if filtered_out:
                    logger.info(f"[NewsAPI] Filtered {filtered_out} non-news articles")
                logger.info(
                    f"[NewsAPI] '{query[:50]}' → {len(results)} results "
                    f"(total available: {data.get('totalResults', 0)})"
                )
                return results
            elif response.status_code == 401:
                logger.error("NewsAPI: Invalid API key")
                return []
            elif response.status_code == 429:
                logger.warning("NewsAPI: Rate limit exceeded (100 requests/day on free tier)")
                return []
            elif response.status_code == 426:
                logger.warning("NewsAPI: Upgrade required (free tier limitations)")
                return []
            else:
                logger.warning(f"NewsAPI returned status {response.status_code}: {response.text[:200]}")
                return []
        except (httpx.TimeoutException, asyncio.TimeoutError):
            # httpx raises its own TimeoutException, not asyncio.TimeoutError;
            # catch both so timeouts are logged as timeouts, not generic errors.
            logger.warning(f"NewsAPI timeout ({self.timeout}s)")
            return []
        except Exception as e:
            logger.error(f"NewsAPI search error: {e}")
            return []

    async def search_top_headlines(
        self,
        country: str = "us",
        category: Optional[str] = None,
        max_results: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Get top headlines from NewsAPI.

        Args:
            country: Country code (us, gb, etc.) - Note: Ethiopia (et) not supported
            category: Category (business, entertainment, general, health, science, sports, technology)
            max_results: Override default max_results

        Returns:
            List of normalized search results; empty list on any failure.
        """
        if not self.api_key:
            logger.warning("NewsAPI unavailable - returning empty results")
            return []
        await self._ensure_client()
        max_results = max_results or self.max_results
        try:
            url = f"{self.BASE_URL}/top-headlines"
            params = {
                "country": country,
                "pageSize": max_results
            }
            if category:
                params["category"] = category
            logger.info(f"[NewsAPI] Fetching top headlines (country={country}, category={category})")
            response = await self.client.get(url, params=params)
            if response.status_code == 200:
                data = response.json()
                articles = data.get("articles", [])
                results = []
                for article in articles:
                    normalized = self._normalize_result(article)
                    if normalized:
                        results.append(normalized)
                logger.info(f"[NewsAPI] Top headlines: {len(results)} results")
                return results
            else:
                logger.warning(f"NewsAPI top headlines returned status {response.status_code}")
                return []
        except Exception as e:
            logger.error(f"NewsAPI top headlines error: {e}")
            return []

    def _normalize_result(self, article: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Normalize a raw NewsAPI article to the common result format.

        NewsAPI returns JSON null for many optional fields (description,
        content, author, urlToImage, source), so every field is coerced to
        "" / {} before use — previously a null description raised
        AttributeError on .strip() and silently dropped the article.

        Args:
            article: Raw article dict from NewsAPI

        Returns:
            Normalized result dict, or None if the article lacks a
            title/URL or cannot be parsed.
        """
        try:
            # Coerce JSON nulls to empty values before strip()
            title = (article.get("title") or "").strip()
            url = (article.get("url") or "").strip()
            description = (article.get("description") or "").strip()
            content = (article.get("content") or "").strip()
            source_name = ((article.get("source") or {}).get("name") or "").strip()
            published_at = article.get("publishedAt") or ""
            author = article.get("author") or ""
            url_to_image = article.get("urlToImage") or ""
            # Validate required fields
            if not title or not url:
                logger.debug(f"Skipping invalid result: missing title or URL")
                return None
            # Combine description + content for better context
            full_content = description
            if content and content != description:
                # NewsAPI truncates content with [+X chars]
                # We'll use Jina Reader to get full article later
                full_content = f"{description}\n\n{content}"
            # Calculate freshness score
            freshness_score = self._calculate_freshness(published_at)
            return {
                "title": title,
                "url": url,
                "content": full_content or title,  # Use title if no content
                "snippet": description,
                "source": source_name or self._extract_domain(url),
                "published_at": published_at,
                "author": author,
                "image_url": url_to_image,
                "source_type": "live",
                "is_live": True,
                "freshness_score": freshness_score,
                "language": "en",  # NewsAPI returns language in query
                "metadata": {
                    "title": title,
                    "url": url,
                    "source": source_name,
                    "published_at": published_at,
                    "author": author,
                    "search_engine": "newsapi"
                }
            }
        except Exception as e:
            logger.warning(f"Failed to normalize NewsAPI result: {e}")
            return None

    def _calculate_freshness(self, published_at: str) -> float:
        """
        Calculate freshness score based on article age.

        Args:
            published_at: ISO-8601 date string (possibly with trailing 'Z')

        Returns:
            Freshness score in [0.8, 1.0]; missing or unparseable
            dates default to 0.8 (assumed recent).
        """
        if not published_at:
            return 0.8  # Unknown age, assume recent
        try:
            pub_date = datetime.fromisoformat(published_at.replace('Z', '+00:00'))
            if pub_date.tzinfo is None:
                # Naive timestamps are assumed to be UTC
                pub_date = pub_date.replace(tzinfo=timezone.utc)
            # Timezone-aware arithmetic (utcnow() is deprecated and naive)
            age = datetime.now(timezone.utc) - pub_date
            age_minutes = age.total_seconds() / 60
            # NewsAPI results are very fresh
            if age_minutes < 10:
                return 1.0  # < 10 min
            elif age_minutes < 60:
                return 0.98  # < 1 hour
            elif age_minutes < 360:
                return 0.95  # < 6 hours
            elif age_minutes < 1440:
                return 0.9  # < 24 hours
            else:
                return 0.85  # Older but still from live search
        except (ValueError, TypeError):
            return 0.8  # Unparseable date, default to recent

    def _extract_domain(self, url: str) -> str:
        """
        Extract the domain name from a URL, without a leading "www.".

        Args:
            url: Full URL

        Returns:
            Domain name (e.g., "bbc.com"), or "unknown" on parse failure.
        """
        try:
            domain = urlparse(url).netloc
            # Remove www. prefix
            if domain.startswith("www."):
                domain = domain[4:]
            return domain
        except Exception:
            return "unknown"

    def is_available(self) -> bool:
        """
        Check if NewsAPI is available.

        Returns:
            True if an API key is configured, False otherwise
        """
        return self.api_key is not None

    async def close(self):
        """Close the HTTP client and release its connections."""
        if self.client:
            await self.client.aclose()
            self.client = None
            logger.debug("NewsAPI client closed")
# ═══════════════════════════════════════════════════════════════════════════
# SINGLETON INSTANCE
# ═══════════════════════════════════════════════════════════════════════════
_default_adapter = None


def get_newsapi_adapter(
    api_key: str,
    timeout: float = 2.0,
    max_results: int = 20
) -> NewsAPIAdapter:
    """
    Return the process-wide NewsAPI adapter, building it on first use.

    Note: the configuration arguments only take effect on the first call;
    later calls return the already-constructed singleton unchanged.

    Args:
        api_key: NewsAPI.org API key
        timeout: Search timeout in seconds
        max_results: Maximum results to return

    Returns:
        The shared NewsAPIAdapter instance
    """
    global _default_adapter
    adapter = _default_adapter
    if adapter is None:
        adapter = NewsAPIAdapter(
            api_key=api_key,
            timeout=timeout,
            max_results=max_results
        )
        _default_adapter = adapter
    return adapter