rag-api-node-1 / src /infrastructure /adapters /duckduckgo_adapter.py
Peterase's picture
feat: implement semantic TL;DR citations and live image support
d8f8038
"""
DuckDuckGo Live Search Adapter
Provides real-time news search using DuckDuckGo's news search API.
Integrates with the hybrid RAG pipeline to supplement database results
with fresh, live content.
Features:
- Async execution with timeout (1.5s default)
- Ethiopia-focused filtering
- Error handling and graceful fallbacks
- Result normalization for hybrid ranking
"""
import asyncio
import logging
import re
import traceback
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
# Module-wide logger used by the import guard below and by the adapter.
logger = logging.getLogger(__name__)

# Resolve the DDGS client class. The current package name is tried first,
# then the legacy one; HAS_DDGS records whether either import succeeded.
try:
    from ddgs import DDGS
except ImportError:
    # Fallback to old package name for backward compatibility
    try:
        from duckduckgo_search import DDGS
    except ImportError:
        HAS_DDGS = False
        logger.warning("ddgs (duckduckgo-search) not installed. Live search disabled.")
    else:
        HAS_DDGS = True
else:
    HAS_DDGS = True
class DuckDuckGoAdapter:
    """
    Adapter for DuckDuckGo news search.

    Provides real-time news results to complement database search.
    Designed to be fast (short per-attempt timeout, 1.5s by default) and
    resilient: every failure path degrades to an empty result list
    instead of raising to the caller.
    """

    def __init__(self, timeout: float = 1.5, max_results: int = 15, retry_count: int = 1):
        """
        Initialize DuckDuckGo adapter.

        Args:
            timeout: Maximum time to wait for results, per attempt (seconds)
            max_results: Maximum number of results to return
            retry_count: Number of additional attempts after the first one
                fails (default 1, i.e. retry once). Negative values are
                treated as 0.
        """
        self.timeout = timeout
        self.max_results = max_results
        self.ddgs = DDGS() if HAS_DDGS else None
        self.retry_count = max(0, retry_count)  # Retry once on failure by default

        # Ethiopia context detection - multi-tier approach.
        # All terms are lowercase because queries are lowercased before matching.

        # Tier 1: Direct Ethiopia mentions
        self.ethiopia_direct = {
            "ethiopia", "ethiopian", "ethiopians", "addis ababa"
        }
        # Tier 2: Ethiopian regions (strong Ethiopia context)
        self.ethiopia_regions = {
            "amhara", "tigray", "oromia", "somali region", "afar",
            "sidama", "snnpr", "benishangul", "gambela", "harari", "dire dawa"
        }
        # Tier 3: Ethiopian political entities (strong Ethiopia context)
        self.ethiopia_political = {
            "abiy ahmed", "endf", "tplf", "fano", "oneg", "olf",
            "prosperity party", "eprdf", "ethiopian government"
        }
        # Tier 4: Horn of Africa context (weak Ethiopia context - needs boost)
        self.horn_africa = {
            "horn of africa", "east africa", "nile dam", "gerd", "renaissance dam"
        }
        # Tier 5: Neighboring countries (NO Ethiopia context - don't add filter)
        self.neighboring_countries = {
            "somalia", "somali", "kenya", "kenyan", "sudan", "sudanese",
            "south sudan", "eritrea", "eritrean", "djibouti"
        }

        if not HAS_DDGS:
            logger.error(
                "DuckDuckGo search unavailable. "
                "Install with: pip install duckduckgo-search"
            )

    @staticmethod
    def _mentions_any(text: str, terms) -> bool:
        """
        Check whether any term occurs in the text as a whole word or phrase.

        A plain substring test would classify "golf" as the political
        entity "olf" and "safari" as the region "afar". Terms must
        therefore start and end on a word boundary; an optional plural or
        possessive "s" is tolerated so "kenyans" or "ethiopia's" still match.

        Args:
            text: Lowercased query text
            terms: Iterable of lowercase terms or phrases

        Returns:
            True if at least one term is present, False otherwise
        """
        return any(
            re.search(r"(?<!\w)" + re.escape(term) + r"(?:'?s)?(?!\w)", text)
            for term in terms
        )

    def _analyze_ethiopia_context(self, query: str) -> Dict[str, Any]:
        """
        Analyze query to determine Ethiopia context and optimal search strategy.

        Tiers are checked in priority order and the first match wins, so a
        query naming both Ethiopia and a neighboring country is still
        treated as having strong Ethiopia context.

        Args:
            query: Raw user query

        Returns:
            {
                "has_ethiopia_context": bool,
                "context_strength": str,  # "strong", "medium", "weak", "none"
                "should_add_filter": bool,
                "search_modifier": str,  # What to add to query
                "reason": str
            }
        """
        query_lower = query.lower()

        # (terms, has_context, strength, add_filter, modifier, reason)
        tiers = (
            # Tier 1: Direct Ethiopia mention - STRONG context, no filter needed
            (self.ethiopia_direct, True, "strong", False, "",
             "Direct Ethiopia mention detected"),
            # Tier 2: Ethiopian regions - STRONG context, no filter needed
            (self.ethiopia_regions, True, "strong", False, "",
             "Ethiopian region detected"),
            # Tier 3: Ethiopian political entities - STRONG context, no filter needed
            (self.ethiopia_political, True, "strong", False, "",
             "Ethiopian political entity detected"),
            # Tier 4: Horn of Africa - MEDIUM context, add Ethiopia for specificity
            (self.horn_africa, True, "medium", True, "Ethiopia",
             "Horn of Africa context - adding Ethiopia for specificity"),
            # Tier 5: Neighboring countries - NO Ethiopia context, don't add filter
            (self.neighboring_countries, False, "none", False, "",
             "Neighboring country detected - respecting user intent"),
        )
        for terms, has_context, strength, add_filter, modifier, reason in tiers:
            if self._mentions_any(query_lower, terms):
                return {
                    "has_ethiopia_context": has_context,
                    "context_strength": strength,
                    "should_add_filter": add_filter,
                    "search_modifier": modifier,
                    "reason": reason
                }

        # Default: No Ethiopia context - WEAK, add filter for Ethiopia focus
        return {
            "has_ethiopia_context": False,
            "context_strength": "weak",
            "should_add_filter": True,
            "search_modifier": "Ethiopia OR \"Horn of Africa\"",
            "reason": "No Ethiopia context - adding broad filter"
        }

    def _build_search_query(self, query: str, add_ethiopia_filter: Optional[bool]) -> str:
        """
        Build the query string actually sent to DuckDuckGo.

        Args:
            query: Raw user query
            add_ethiopia_filter: True/False forces the filter on/off;
                None selects it automatically from the query's context

        Returns:
            The query, possibly extended with an Ethiopia-focused modifier
        """
        if add_ethiopia_filter is not None:
            # Manual override
            search_query = f"{query} Ethiopia" if add_ethiopia_filter else query
            logger.info(f"[DDG] Manual filter override: {add_ethiopia_filter}")
            return search_query

        # Auto-detect using multi-tier analysis
        context = self._analyze_ethiopia_context(query)
        logger.info(
            f"[DDG] Context analysis: {context['context_strength']} "
            f"({context['reason']})"
        )
        if not context["should_add_filter"]:
            logger.info("[DDG] Using original query (sufficient context)")
            return query

        search_query = f"{query} {context['search_modifier']}"
        logger.info(f"[DDG] Enhanced query: '{search_query}'")
        return search_query

    async def search(
        self,
        query: str,
        max_results: Optional[int] = None,
        region: str = "et-en",  # Ethiopia English
        add_ethiopia_filter: Optional[bool] = None  # Auto-detect if None
    ) -> List[Dict[str, Any]]:
        """
        Search DuckDuckGo news for the given query with smart Ethiopia filtering.

        Never raises for search failures: timeouts and errors are logged
        and result in an empty list after the configured retries.

        Args:
            query: Search query
            max_results: Override default max_results
            region: DuckDuckGo region code (et-en = Ethiopia English)
            add_ethiopia_filter: Override auto-detection (None = auto-detect)

        Returns:
            List of normalized search results
        """
        if not self.ddgs:
            logger.warning("DuckDuckGo unavailable - returning empty results")
            return []

        max_results = max_results or self.max_results
        search_query = self._build_search_query(query, add_ethiopia_filter)

        # get_running_loop() is the supported way to obtain the loop from
        # inside a coroutine; get_event_loop() is deprecated for this use.
        loop = asyncio.get_running_loop()
        total_attempts = self.retry_count + 1

        # Try search with retry
        for attempt in range(total_attempts):
            try:
                # Run sync DuckDuckGo search in thread pool with timeout.
                # NOTE(review): on timeout only the await is abandoned; the
                # worker thread is not interrupted and may still be running
                # while the next attempt starts.
                results = await asyncio.wait_for(
                    loop.run_in_executor(
                        None,
                        self._search_sync,
                        search_query,
                        max_results,
                        region
                    ),
                    timeout=self.timeout
                )
            except asyncio.TimeoutError:
                if attempt < self.retry_count:
                    logger.warning(
                        f"[DDG] Timeout ({self.timeout}s) - retrying ({attempt + 1}/{self.retry_count})"
                    )
                    await asyncio.sleep(0.5)  # Brief delay before retry
                    continue
                logger.warning(
                    f"[DDG] Search timeout ({self.timeout}s) after {total_attempts} attempts"
                )
                return []
            except Exception as e:
                # NOTE(review): _search_sync swallows its own exceptions, so
                # this branch only sees failures raised outside it (e.g. by
                # the executor). Kept as a last line of defence.
                if attempt < self.retry_count:
                    logger.warning(
                        f"[DDG] Error: {e} - retrying ({attempt + 1}/{self.retry_count})"
                    )
                    await asyncio.sleep(0.5)
                    continue
                logger.error(
                    f"[DDG] Search error after {total_attempts} attempts: {e}\n"
                    f"{traceback.format_exc()}"
                )
                return []
            else:
                logger.info(
                    f"[DDG] Search completed: '{query[:50]}' → {len(results)} results "
                    f"(attempt {attempt + 1}/{total_attempts})"
                )
                return results
        return []

    def _search_sync(
        self,
        query: str,
        max_results: int,
        region: str
    ) -> List[Dict[str, Any]]:
        """
        Synchronous DuckDuckGo search (runs in thread pool).

        All exceptions are caught and logged; the method returns an empty
        list on failure so the caller can fall back to database results.

        Args:
            query: Search query
            max_results: Maximum results to return
            region: DuckDuckGo region code

        Returns:
            List of normalized results
        """
        try:
            # DuckDuckGo news search (ddgs package uses query as first positional arg)
            raw_results = self.ddgs.news(
                query,  # First positional argument
                region=region,
                max_results=max_results
            )
            # Normalize results to common format, dropping invalid entries.
            # Done inside the try because iterating the response may itself raise.
            normalized = (self._normalize_result(r) for r in raw_results or [])
            return [item for item in normalized if item]
        except Exception as e:
            # Handle specific DuckDuckGo errors gracefully; every case
            # degrades to an empty list and differs only in log level.
            error_msg = str(e)
            if "DecodeError" in error_msg or "Body collection error" in error_msg:
                logger.warning(f"DuckDuckGo decode error (likely rate limit or API issue): {e}")
            elif "No results found" in error_msg:
                logger.debug(f"DuckDuckGo: No results for query '{query[:50]}'")
            else:
                logger.error(f"DuckDuckGo API error: {e}")
            return []

    @staticmethod
    def _clean_text(value: Any) -> str:
        """Return the value as a stripped string, mapping None to ""."""
        if value is None:
            return ""
        return str(value).strip()

    def _normalize_result(self, raw_result: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Normalize DuckDuckGo result to common format.

        Fields that are missing or explicitly None are treated as empty
        strings, so a result with e.g. a null body is kept as long as it
        has a title and URL.

        Args:
            raw_result: Raw result from DuckDuckGo API

        Returns:
            Normalized result dict or None if invalid
        """
        try:
            # Extract fields (DuckDuckGo news format)
            title = self._clean_text(raw_result.get("title"))
            url = self._clean_text(raw_result.get("url"))
            snippet = self._clean_text(raw_result.get("body"))
            source = self._clean_text(raw_result.get("source"))
            date_str = raw_result.get("date")

            # Validate required fields
            if not title or not url:
                logger.debug("Skipping invalid result: missing title or URL")
                return None

            # Parse date
            published_at = self._parse_date(date_str)
            # Calculate freshness score (live results are freshest)
            freshness_score = self._calculate_freshness(published_at)
            image_url = raw_result.get("image") or raw_result.get("thumbnail")

            return {
                "title": title,
                "url": url,
                "content": snippet or title,  # Use title if no snippet
                "snippet": snippet,
                "source": source or self._extract_domain(url),
                "published_at": published_at,
                "image_url": image_url,
                "source_type": "live",
                "is_live": True,
                "freshness_score": freshness_score,
                "language": "en",  # DuckDuckGo returns English
                "metadata": {
                    "title": title,
                    "url": url,
                    "source": source,
                    "published_at": published_at,
                    "image_url": image_url,
                    "search_engine": "duckduckgo"
                }
            }
        except Exception as e:
            # Deliberate best-effort: one malformed item must not abort the batch.
            logger.warning(f"Failed to normalize result: {e}")
            return None

    @staticmethod
    def _from_iso(text: str) -> datetime:
        """Parse an ISO-8601 string, accepting a trailing "Z" as UTC."""
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        return datetime.fromisoformat(text)

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a naive ISO string (no offset suffix)."""
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def _parse_date(self, date_str: Optional[str]) -> str:
        """
        Parse date string to ISO format.

        ISO-8601 input is handled with the standard library; other formats
        fall back to dateutil when it is installed.

        Args:
            date_str: Date string from DuckDuckGo

        Returns:
            ISO format date string or current time if parsing fails
        """
        if not date_str:
            return self._utc_now_iso()

        # DuckDuckGo returns ISO-like format
        try:
            return self._from_iso(date_str).isoformat()
        except (AttributeError, TypeError, ValueError):
            pass

        # Try parsing common non-ISO formats
        try:
            from dateutil import parser
            return parser.parse(date_str).isoformat()
        except (ImportError, ValueError, OverflowError, TypeError):
            # Fallback to current time
            return self._utc_now_iso()

    def _calculate_freshness(self, published_at: str) -> float:
        """
        Calculate freshness score based on article age.

        Timestamps carrying a UTC offset are compared as absolute instants;
        timestamps without an offset are assumed to be UTC.

        Args:
            published_at: ISO format date string

        Returns:
            Freshness score (0.0 to 1.0)
        """
        try:
            pub_date = self._from_iso(published_at)
        except (AttributeError, TypeError, ValueError):
            return 1.0  # Default to fresh for live results

        if pub_date.tzinfo is None:
            pub_date = pub_date.replace(tzinfo=timezone.utc)
        age_minutes = (datetime.now(timezone.utc) - pub_date).total_seconds() / 60

        # Live results are very fresh: (age limit in minutes, score)
        age_bands = (
            (10, 1.0),     # < 10 min
            (60, 0.95),    # < 1 hour
            (360, 0.9),    # < 6 hours
            (1440, 0.85),  # < 24 hours
        )
        for limit_minutes, score in age_bands:
            if age_minutes < limit_minutes:
                return score
        return 0.8  # Older but still from live search

    def _extract_domain(self, url: str) -> str:
        """
        Extract domain name from URL.

        Args:
            url: Full URL

        Returns:
            Domain name (e.g., "bbc.com"), or "unknown" if the URL cannot
            be parsed
        """
        from urllib.parse import urlparse

        try:
            domain = urlparse(url).netloc
            # Remove www. prefix
            if domain.startswith("www."):
                domain = domain[4:]
            return domain
        except (AttributeError, TypeError, ValueError):
            return "unknown"

    def is_available(self) -> bool:
        """
        Check if DuckDuckGo search is available.

        Returns:
            True if available, False otherwise
        """
        return HAS_DDGS and self.ddgs is not None
# Lazily created shared adapter, exposed through get_duckduckgo_adapter().
_default_adapter = None


def get_duckduckgo_adapter(timeout: float = 1.5, max_results: int = 15) -> DuckDuckGoAdapter:
    """
    Return the shared DuckDuckGo adapter, building it on first use.

    The arguments are only applied when the instance is first created;
    later calls return the existing adapter unchanged.

    Args:
        timeout: Search timeout in seconds
        max_results: Maximum results to return

    Returns:
        DuckDuckGoAdapter instance
    """
    global _default_adapter
    if _default_adapter is not None:
        return _default_adapter
    _default_adapter = DuckDuckGoAdapter(timeout=timeout, max_results=max_results)
    return _default_adapter