Spaces:
Running
Running
| """ | |
| DuckDuckGo Live Search Adapter | |
| Provides real-time news search using DuckDuckGo's news search API. | |
| Integrates with the hybrid RAG pipeline to supplement database results | |
| with fresh, live content. | |
| Features: | |
| - Async execution with timeout (1.5s default) | |
| - Ethiopia-focused filtering | |
| - Error handling and graceful fallbacks | |
| - Result normalization for hybrid ranking | |
| """ | |
| import logging | |
| import asyncio | |
| from typing import List, Dict, Any, Optional | |
| from datetime import datetime | |
| import traceback | |
| logger = logging.getLogger(__name__) | |
| try: | |
| from ddgs import DDGS | |
| HAS_DDGS = True | |
| except ImportError: | |
| # Fallback to old package name for backward compatibility | |
| try: | |
| from duckduckgo_search import DDGS | |
| HAS_DDGS = True | |
| except ImportError: | |
| HAS_DDGS = False | |
| logger.warning("ddgs (duckduckgo-search) not installed. Live search disabled.") | |
class DuckDuckGoAdapter:
    """
    Adapter for DuckDuckGo news search.

    Provides real-time news results to complement database search.
    Designed to be fast (short per-attempt timeout, 1.5s by default) and
    resilient: every failure path of ``search`` returns an empty list
    instead of raising.
    """

    def __init__(self, timeout: float = 1.5, max_results: int = 15):
        """
        Initialize DuckDuckGo adapter.

        Args:
            timeout: Maximum time to wait for one search attempt (seconds)
            max_results: Maximum number of results to return
        """
        self.timeout = timeout
        self.max_results = max_results
        self.ddgs = DDGS() if HAS_DDGS else None
        self.retry_count = 1  # Retry once on failure

        # Ethiopia context detection - multi-tier approach.
        # All terms are lowercase; queries are lowercased before matching.

        # Tier 1: Direct Ethiopia mentions
        self.ethiopia_direct = {
            "ethiopia", "ethiopian", "ethiopians", "addis ababa"
        }
        # Tier 2: Ethiopian regions (strong Ethiopia context)
        self.ethiopia_regions = {
            "amhara", "tigray", "oromia", "somali region", "afar",
            "sidama", "snnpr", "benishangul", "gambela", "harari", "dire dawa"
        }
        # Tier 3: Ethiopian political entities (strong Ethiopia context)
        self.ethiopia_political = {
            "abiy ahmed", "endf", "tplf", "fano", "oneg", "olf",
            "prosperity party", "eprdf", "ethiopian government"
        }
        # Tier 4: Horn of Africa context (weak Ethiopia context - needs boost)
        self.horn_africa = {
            "horn of africa", "east africa", "nile dam", "gerd", "renaissance dam"
        }
        # Tier 5: Neighboring countries (NO Ethiopia context - don't add filter)
        self.neighboring_countries = {
            "somalia", "somali", "kenya", "kenyan", "sudan", "sudanese",
            "south sudan", "eritrea", "eritrean", "djibouti"
        }

        if not HAS_DDGS:
            logger.error(
                "DuckDuckGo search unavailable. "
                "Install with: pip install duckduckgo-search"
            )

    @staticmethod
    def _contains_term(text: str, terms) -> bool:
        """
        Return True if any term occurs in ``text`` as a whole word or phrase.

        A plain substring test is too loose for short terms: "olf" would hit
        "golf" and "afar" would hit "safari". Terms must therefore be
        delimited by non-word characters (or the string edges); a single
        trailing "s" is tolerated so simple plurals ("kenyans") still match.
        Other derived forms (e.g. "tigrayan") match only if listed explicitly.

        Args:
            text: Lowercased text to inspect
            terms: Iterable of lowercase terms/phrases

        Returns:
            True when at least one term is present, False otherwise
        """
        import re  # local import: `re` is not imported at the top of the file

        if not terms:
            return False
        # Longest first so multi-word phrases are preferred over their
        # prefixes; the stable order also keeps re's pattern cache effective.
        ordered = sorted(terms, key=lambda term: (-len(term), term))
        alternation = "|".join(re.escape(term) for term in ordered)
        pattern = rf"(?<!\w)(?:{alternation})s?(?!\w)"
        return re.search(pattern, text) is not None

    def _analyze_ethiopia_context(self, query: str) -> Dict[str, Any]:
        """
        Analyze query to determine Ethiopia context and optimal search strategy.

        Tiers are checked in order and the first match wins, so a query that
        mentions both Ethiopia and a neighboring country counts as Ethiopian.

        Returns:
            {
                "has_ethiopia_context": bool,
                "context_strength": str,  # "strong", "medium", "weak", "none"
                "should_add_filter": bool,
                "search_modifier": str,  # What to add to query
                "reason": str
            }
        """
        query_lower = query.lower()

        # Tiers 1-3: STRONG context, the query is already specific enough.
        strong_tiers = (
            (self.ethiopia_direct, "Direct Ethiopia mention detected"),
            (self.ethiopia_regions, "Ethiopian region detected"),
            (self.ethiopia_political, "Ethiopian political entity detected"),
        )
        for terms, reason in strong_tiers:
            if self._contains_term(query_lower, terms):
                return {
                    "has_ethiopia_context": True,
                    "context_strength": "strong",
                    "should_add_filter": False,
                    "search_modifier": "",
                    "reason": reason
                }

        # Tier 4: Horn of Africa - MEDIUM context, add Ethiopia for specificity
        if self._contains_term(query_lower, self.horn_africa):
            return {
                "has_ethiopia_context": True,
                "context_strength": "medium",
                "should_add_filter": True,
                "search_modifier": "Ethiopia",
                "reason": "Horn of Africa context - adding Ethiopia for specificity"
            }

        # Tier 5: Neighboring countries - NO Ethiopia context, don't add filter
        if self._contains_term(query_lower, self.neighboring_countries):
            return {
                "has_ethiopia_context": False,
                "context_strength": "none",
                "should_add_filter": False,
                "search_modifier": "",
                "reason": "Neighboring country detected - respecting user intent"
            }

        # Default: No Ethiopia context - WEAK, add filter for Ethiopia focus
        return {
            "has_ethiopia_context": False,
            "context_strength": "weak",
            "should_add_filter": True,
            "search_modifier": "Ethiopia OR \"Horn of Africa\"",
            "reason": "No Ethiopia context - adding broad filter"
        }

    async def search(
        self,
        query: str,
        max_results: Optional[int] = None,
        region: str = "et-en",  # Ethiopia English
        add_ethiopia_filter: Optional[bool] = None  # Auto-detect if None
    ) -> List[Dict[str, Any]]:
        """
        Search DuckDuckGo news for the given query with smart Ethiopia filtering.

        Never raises: timeouts and errors are logged and yield an empty list.

        Args:
            query: Search query
            max_results: Override default max_results
            region: DuckDuckGo region code (et-en = Ethiopia English)
            add_ethiopia_filter: Override auto-detection (None = auto-detect)

        Returns:
            List of normalized search results
        """
        if not self.ddgs:
            logger.warning("DuckDuckGo unavailable - returning empty results")
            return []

        max_results = max_results or self.max_results

        # Smart Ethiopia filtering with context analysis
        if add_ethiopia_filter is None:
            # Auto-detect using multi-tier analysis
            context = self._analyze_ethiopia_context(query)
            logger.info(
                f"[DDG] Context analysis: {context['context_strength']} "
                f"({context['reason']})"
            )
            if context["should_add_filter"]:
                search_query = f"{query} {context['search_modifier']}"
                logger.info(f"[DDG] Enhanced query: '{search_query}'")
            else:
                search_query = query
                logger.info("[DDG] Using original query (sufficient context)")
        else:
            # Manual override
            search_query = f"{query} Ethiopia" if add_ethiopia_filter else query
            logger.info(f"[DDG] Manual filter override: {add_ethiopia_filter}")

        # We are inside a coroutine, so a running loop always exists.
        loop = asyncio.get_running_loop()

        # Try search with retry
        for attempt in range(self.retry_count + 1):
            try:
                # Run sync DuckDuckGo search in thread pool with timeout.
                # NOTE: on timeout only the await is abandoned; a worker
                # thread that already started keeps running to completion.
                results = await asyncio.wait_for(
                    loop.run_in_executor(
                        None,
                        self._search_sync,
                        search_query,
                        max_results,
                        region
                    ),
                    timeout=self.timeout
                )
                logger.info(
                    f"[DDG] Search completed: '{query[:50]}' → {len(results)} results "
                    f"(attempt {attempt + 1}/{self.retry_count + 1})"
                )
                return results
            except asyncio.TimeoutError:
                if attempt < self.retry_count:
                    logger.warning(
                        f"[DDG] Timeout ({self.timeout}s) - retrying ({attempt + 1}/{self.retry_count})"
                    )
                    await asyncio.sleep(0.5)  # Brief delay before retry
                    continue
                else:
                    logger.warning(
                        f"[DDG] Search timeout ({self.timeout}s) after {self.retry_count + 1} attempts"
                    )
                    return []
            except Exception as e:
                # _search_sync catches its own exceptions and returns [], so
                # this branch is only reached for failures raised outside it.
                if attempt < self.retry_count:
                    logger.warning(
                        f"[DDG] Error: {e} - retrying ({attempt + 1}/{self.retry_count})"
                    )
                    await asyncio.sleep(0.5)
                    continue
                else:
                    logger.error(
                        f"[DDG] Search error after {self.retry_count + 1} attempts: {e}\n"
                        f"{traceback.format_exc()}"
                    )
                    return []

        # Only reachable when retry_count is negative (loop body never runs).
        return []

    def _search_sync(
        self,
        query: str,
        max_results: int,
        region: str
    ) -> List[Dict[str, Any]]:
        """
        Synchronous DuckDuckGo search (runs in thread pool).

        All exceptions are logged and converted into an empty list so the
        caller can fall back to database results.

        Args:
            query: Search query
            max_results: Maximum results to return
            region: DuckDuckGo region code

        Returns:
            List of normalized results
        """
        results = []
        try:
            # DuckDuckGo news search (ddgs package uses query as first positional arg)
            raw_results = self.ddgs.news(
                query,  # First positional argument
                region=region,
                max_results=max_results
            )
            # Normalize results to common format; tolerate a None response.
            for r in raw_results or []:
                normalized = self._normalize_result(r)
                if normalized:
                    results.append(normalized)
        except Exception as e:
            # Handle specific DuckDuckGo errors gracefully
            error_msg = str(e)
            if "DecodeError" in error_msg or "Body collection error" in error_msg:
                logger.warning(f"DuckDuckGo decode error (likely rate limit or API issue): {e}")
            elif "No results found" in error_msg:
                logger.debug(f"DuckDuckGo: No results for query '{query[:50]}'")
            else:
                logger.error(f"DuckDuckGo API error: {e}")
            # Return empty results for graceful degradation
            return []
        return results

    def _normalize_result(self, raw_result: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Normalize DuckDuckGo result to common format.

        Args:
            raw_result: Raw result from DuckDuckGo API

        Returns:
            Normalized result dict or None if invalid
        """
        try:
            # Extract fields (DuckDuckGo news format). `or ""` covers keys
            # that are present but null, which `.get(key, "")` does not.
            title = (raw_result.get("title") or "").strip()
            url = (raw_result.get("url") or "").strip()
            snippet = (raw_result.get("body") or "").strip()
            source = (raw_result.get("source") or "").strip()
            date_str = raw_result.get("date")

            # Validate required fields
            if not title or not url:
                logger.debug("Skipping invalid result: missing title or URL")
                return None

            # Parse date
            published_at = self._parse_date(date_str)
            # Calculate freshness score (live results are freshest)
            freshness_score = self._calculate_freshness(published_at)
            image_url = raw_result.get("image") or raw_result.get("thumbnail")

            return {
                "title": title,
                "url": url,
                "content": snippet or title,  # Use title if no snippet
                "snippet": snippet,
                "source": source or self._extract_domain(url),
                "published_at": published_at,
                "image_url": image_url,
                "source_type": "live",
                "is_live": True,
                "freshness_score": freshness_score,
                "language": "en",  # DuckDuckGo returns English
                "metadata": {
                    "title": title,
                    "url": url,
                    "source": source,
                    "published_at": published_at,
                    "image_url": image_url,
                    "search_engine": "duckduckgo"
                }
            }
        except Exception as e:
            logger.warning(f"Failed to normalize result: {e}")
            return None

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string."""
        from datetime import timezone

        # Same output format as the deprecated datetime.utcnow().isoformat().
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def _parse_date(self, date_str: Optional[str]) -> str:
        """
        Parse date string to ISO format.

        Args:
            date_str: Date string from DuckDuckGo

        Returns:
            ISO format date string or current time if parsing fails
        """
        if not date_str:
            return self._utc_now_iso()

        # Fast path: ISO-8601 strings need no third-party parser, so valid
        # dates survive even when python-dateutil is not installed.
        try:
            return datetime.fromisoformat(date_str).isoformat()
        except (TypeError, ValueError):
            pass

        try:
            # Fall back to dateutil for other common formats
            from dateutil import parser
            return parser.parse(date_str).isoformat()
        except Exception:
            # Fallback to current time (also covers a missing dateutil)
            return self._utc_now_iso()

    def _calculate_freshness(self, published_at: str) -> float:
        """
        Calculate freshness score based on article age.

        Timezone-aware timestamps are converted to UTC before comparison;
        naive timestamps are assumed to already be in UTC.

        Args:
            published_at: ISO format date string

        Returns:
            Freshness score (0.0 to 1.0)
        """
        from datetime import timezone

        try:
            pub_date = datetime.fromisoformat(published_at.replace('Z', '+00:00'))
            if pub_date.tzinfo is None:
                pub_date = pub_date.replace(tzinfo=timezone.utc)
            age = datetime.now(timezone.utc) - pub_date
            age_minutes = age.total_seconds() / 60

            # Live results are very fresh
            if age_minutes < 10:
                return 1.0  # < 10 min
            elif age_minutes < 60:
                return 0.95  # < 1 hour
            elif age_minutes < 360:
                return 0.9  # < 6 hours
            elif age_minutes < 1440:
                return 0.85  # < 24 hours
            else:
                return 0.8  # Older but still from live search
        except Exception:
            return 1.0  # Default to fresh for live results

    def _extract_domain(self, url: str) -> str:
        """
        Extract domain name from URL.

        Args:
            url: Full URL

        Returns:
            Domain name (e.g., "bbc.com"), or "unknown" if the URL cannot
            be parsed
        """
        try:
            from urllib.parse import urlparse
            domain = urlparse(url).netloc
            # Remove www. prefix
            if domain.startswith("www."):
                domain = domain[4:]
            return domain
        except Exception:
            return "unknown"

    def is_available(self) -> bool:
        """
        Check if DuckDuckGo search is available.

        Returns:
            True if available, False otherwise
        """
        return HAS_DDGS and self.ddgs is not None
# Shared adapter instance, created lazily on first request
_default_adapter = None


def get_duckduckgo_adapter(timeout: float = 1.5, max_results: int = 15) -> DuckDuckGoAdapter:
    """
    Return the module-wide DuckDuckGo adapter, building it on first use.

    The arguments only take effect on the call that creates the instance;
    once an adapter exists it is returned as-is and they are ignored.

    Args:
        timeout: Search timeout in seconds
        max_results: Maximum results to return

    Returns:
        DuckDuckGoAdapter instance
    """
    global _default_adapter
    adapter = _default_adapter
    if adapter is None:
        adapter = DuckDuckGoAdapter(timeout=timeout, max_results=max_results)
        _default_adapter = adapter
    return adapter