Spaces:
Running
Running
"""
Top Stories API Endpoint
Provides fresh news headlines for the landing page.
Hybrid approach: recent stories from the Kafka news.processed topic (pipeline-fresh)
are merged with live DuckDuckGo search results (live results listed first), and the
combined list is capped at 6 stories.
Fast, cached, and optimized for frontend display.
"""
| import logging | |
| import asyncio | |
| import json | |
| from typing import List, Optional | |
| from fastapi import APIRouter, Query, Depends | |
| from pydantic import BaseModel | |
| from datetime import datetime | |
| from src.api.dependencies import get_cache_port, get_live_search_port | |
| from src.core.ports.cache_port import CachePort | |
| from src.infrastructure.adapters.duckduckgo_adapter import DuckDuckGoAdapter | |
try:
    import msgpack  # optional: binary decoding of Kafka payloads
except ImportError:
    # Without msgpack, Kafka payloads are decoded as JSON only.
    HAS_MSGPACK = False
else:
    HAS_MSGPACK = True

logger = logging.getLogger(__name__)

# NOTE(review): no route decorators (e.g. @router.get) are visible in this copy of
# the file; confirm the handlers below are actually registered on this router.
router = APIRouter()
class TopStory(BaseModel):
    """One headline card as rendered on the landing page."""

    # Headline text shown to the user.
    title: str
    # Link to the full article.
    url: str
    # Publisher / outlet name.
    source: str
    # Publication time as a string (ISO-8601 when this module generates a fallback).
    published_at: str
    # Display label; this module uses "NEWS" for Kafka stories, "BREAKING" for live ones.
    category: str = "NEWS"
    # Optional short teaser text.
    excerpt: Optional[str] = None
    # Optional thumbnail URL.
    image_url: Optional[str] = None
    # Which source produced the story: "kafka" or "live".
    origin: str = "kafka"
class TopStoriesResponse(BaseModel):
    """Payload returned by the top-stories endpoint."""

    # Stories in display order.
    stories: List[TopStory]
    # When the stories were fetched (naive UTC ISO-8601 string).
    fetched_at: str
    # True when the payload was served from the cache instead of fetched fresh.
    cache_hit: bool = False
    # Stories returned by each source before merging/truncation.
    kafka_count: int = 0
    live_count: int = 0
| # Default TTL for top stories (15 minutes β balanced for performance) | |
| _cache_ttl = 900 | |
# -- Kafka: read the latest N messages from news.processed ---------------------

# Paths where SSL material is installed for librdkafka (referenced by path in
# the consumer config).
_KAFKA_CA_PATH = "/tmp/ca.pem"
_KAFKA_CERT_PATH = "/tmp/service.cert"
_KAFKA_KEY_PATH = "/tmp/service.key"


def _materialize_kafka_cert(env_content_key: str, env_path_key: str, dest_path: str) -> bool:
    """Install one SSL file at *dest_path* for the Kafka client.

    Lookup order: PEM content in the ``env_content_key`` env var (literal
    backslash-n sequences become newlines), then a file named by
    ``env_path_key``, then ``/app/certs/<name>`` and ``certs/<name>``.

    The data is written to a private (mode 0600) temp file in the destination
    directory and atomically renamed into place. As a result, concurrent
    requests never read a half-written file and the private key is not left
    world-readable.

    Returns:
        True if a file was installed, False if no source was found.

    Raises:
        OSError: if the source cannot be read or the destination written.
    """
    import contextlib
    import os
    import shutil
    import tempfile

    content = os.getenv(env_content_key, "")
    source_path = None
    if not content:
        name = os.path.basename(dest_path)
        candidates = (os.getenv(env_path_key, ""), f"/app/certs/{name}", f"certs/{name}")
        source_path = next((p for p in candidates if p and os.path.isfile(p)), None)
        if source_path is None:
            return False

    # mkstemp creates the file with mode 0600; same directory keeps os.replace atomic.
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(dest_path) or ".", prefix=".kafka-cert-")
    try:
        with os.fdopen(fd, "wb") as out:
            if content:
                out.write(content.replace("\\n", "\n").encode("utf-8"))
            else:
                with open(source_path, "rb") as src:
                    shutil.copyfileobj(src, out)
        os.replace(tmp_path, dest_path)
    except BaseException:
        with contextlib.suppress(OSError):
            os.unlink(tmp_path)
        raise
    return True


def _build_kafka_consumer_conf(bootstrap: str) -> dict:
    """Build the consumer config; SSL is enabled only if CA, cert and key all exist."""
    # All three are resolved unconditionally (no short-circuit) so every file is refreshed.
    has_ca = _materialize_kafka_cert("KAFKA_SSL_CA", "KAFKA_SSL_CA_PATH", _KAFKA_CA_PATH)
    has_cert = _materialize_kafka_cert("KAFKA_SSL_CERT", "KAFKA_SSL_CERT_PATH", _KAFKA_CERT_PATH)
    has_key = _materialize_kafka_cert("KAFKA_SSL_KEY", "KAFKA_SSL_KEY_PATH", _KAFKA_KEY_PATH)

    conf = {
        "bootstrap.servers": bootstrap,
        "group.id": "top-stories-reader",
        "auto.offset.reset": "latest",
        "enable.auto.commit": False,
        "log_level": 0,
        "session.timeout.ms": 10000,
    }
    if has_ca and has_cert and has_key:
        conf["security.protocol"] = "SSL"
        conf["ssl.ca.location"] = _KAFKA_CA_PATH
        conf["ssl.certificate.location"] = _KAFKA_CERT_PATH
        conf["ssl.key.location"] = _KAFKA_KEY_PATH
        logger.info("Kafka SSL configured for top stories consumer")
    else:
        logger.warning("Kafka SSL certs not found — connecting without SSL")
    return conf


def _decode_kafka_value(value):
    """Decode a message payload: msgpack first (if installed), JSON as the fallback.

    A msgpack result of ``None`` is treated as a failed decode, so JSON is tried.
    """
    if HAS_MSGPACK:
        try:
            event = msgpack.unpackb(value, raw=False)
        except Exception:
            event = None
        if event is not None:
            return event
    return json.loads(value.decode("utf-8", errors="ignore"))


def _kafka_event_to_story(event) -> Optional[TopStory]:
    """Map a decoded pipeline event to a TopStory, or None if it has no usable title."""
    if not isinstance(event, dict):
        return None
    # Title falls back to the first 80 chars of the content.
    raw_title = event.get("title") or (event.get("content") or "")[:80]
    title = str(raw_title).strip()[:200]
    if not title:
        return None
    content = event.get("content") or event.get("text") or ""
    url = event.get("url") or event.get("link") or ""
    source = event.get("source") or event.get("publisher") or "ARKI"
    published_at = event.get("published_at") or event.get("pub_date") or datetime.utcnow().isoformat()
    return TopStory(
        title=title,
        url=str(url).strip(),
        source=str(source).strip(),
        published_at=str(published_at),
        category="NEWS",
        excerpt=content[:150] if content else None,
        image_url=event.get("image_url") or event.get("thumbnail"),
        origin="kafka",
    )


def _fetch_kafka_stories_sync(n: int = 3) -> List[TopStory]:
    """Return up to *n* of the most recent stories from the processed-news topic.

    This function blocks; fetch_kafka_stories runs it in a worker thread via
    ``run_in_executor``.

    Each partition is assigned at offset ``max(low, high - n)`` and read forward
    to its high watermark, so at most ``n`` messages per partition are fetched.
    The collected messages are ordered newest-first by Kafka timestamp across
    all partitions, then decoded, de-duplicated by title and truncated to ``n``.

    Returns [] when KAFKA_BOOTSTRAP_SERVERS is unset, n <= 0, or the topic is
    missing or empty. Errors while reading are logged, not raised. Failures
    while installing SSL files or creating the consumer propagate to the caller.
    """
    import os
    import time
    from confluent_kafka import Consumer, TopicPartition

    bootstrap = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "")
    topic = os.getenv("KAFKA_TOPIC_PROCESSED", "news.processed")

    if not bootstrap:
        logger.warning("KAFKA_BOOTSTRAP_SERVERS not set — skipping Kafka top stories")
        return []
    if n <= 0:
        return []

    consumer = Consumer(_build_kafka_consumer_conf(bootstrap))
    stories: List[TopStory] = []
    try:
        meta = consumer.list_topics(topic, timeout=5)
        if topic not in meta.topics:
            logger.warning(f"Kafka topic '{topic}' not found")
            return []

        # Start each partition n messages before its end and count how many we expect.
        assigned = []
        expected = 0
        for partition_id in meta.topics[topic].partitions:
            low, high = consumer.get_watermark_offsets(TopicPartition(topic, partition_id), timeout=5)
            if high > 0:
                start = max(low, high - n)
                assigned.append(TopicPartition(topic, partition_id, start))
                expected += high - start
        if not expected:
            return []

        consumer.assign(assigned)

        # Drain the tails of all partitions (not just the first n messages to arrive),
        # so the newest messages overall can be chosen below.
        deadline = time.monotonic() + 5.0
        raw_messages = []
        while len(raw_messages) < expected and time.monotonic() < deadline:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                break  # nothing more arriving (e.g. control records inflate `expected`)
            if msg.error():
                continue
            raw_messages.append(msg)

        # Message.timestamp() is (type, millis); newest first across partitions.
        raw_messages.sort(key=lambda m: m.timestamp()[1], reverse=True)

        seen_titles: set = set()
        for msg in raw_messages:
            if len(stories) >= n:
                break
            try:
                story = _kafka_event_to_story(_decode_kafka_value(msg.value()))
            except Exception as e:
                logger.debug(f"Failed to parse Kafka message: {e}")
                continue
            if story is None or story.title in seen_titles:
                continue
            seen_titles.add(story.title)
            stories.append(story)
    except Exception as e:
        logger.error(f"Kafka top stories error: {e}")
    finally:
        consumer.close()

    logger.info(f"Kafka top stories: fetched {len(stories)} from '{topic}'")
    return stories[:n]
async def fetch_kafka_stories(n: int = 3, timeout: float = 6.0) -> List[TopStory]:
    """Async wrapper — runs the blocking Kafka read in the default thread pool.

    Args:
        n: maximum number of stories to return.
        timeout: seconds to wait for the worker before giving up.

    Returns:
        Up to ``n`` stories. Returns [] on timeout or on any error; this
        function does not raise.
    """
    loop = asyncio.get_running_loop()
    try:
        return await asyncio.wait_for(
            loop.run_in_executor(None, _fetch_kafka_stories_sync, n),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        # wait_for stops waiting but cannot interrupt the worker thread; it keeps
        # running until its own poll deadline and then closes its consumer.
        logger.warning("Kafka top stories timeout")
        return []
    except Exception as e:
        logger.error(f"Kafka top stories async error: {e}")
        return []
# -- DuckDuckGo: fetch N live stories ------------------------------------------

async def fetch_live_stories(n: int = 6, adapter: Optional[DuckDuckGoAdapter] = None) -> List[TopStory]:
    """Fetch up to *n* live stories via multi-region DuckDuckGo queries.

    Four searches with different regional focus run concurrently. A failed
    query is logged and skipped instead of discarding the others. Results are
    de-duplicated by URL and by a normalized 60-char title prefix. Titles
    shorter than 10 characters are dropped. The remaining stories are sorted
    with image-bearing stories first, then by ``published_at`` string
    (descending).

    Args:
        n: maximum number of stories to return.
        adapter: live-search adapter; if falsy, [] is returned.

    Returns:
        Up to ``n`` stories with ``origin="live"``. Returns [] on unexpected errors.
    """
    if not adapter:
        return []
    try:
        search_configs = [
            {"q": "Ethiopia news breaking today", "reg": "et-en"},    # Local focus
            {"q": "Ethiopia latest breaking news", "reg": "wt-wt"},   # Global focus (CNN, BBC, etc)
            {"q": "Addis Ababa news updates", "reg": "et-en"},        # Capital focus
            {"q": "Ethiopia world news reporting", "reg": "us-en"},   # International perspective
        ]
        all_results_lists = await asyncio.gather(
            *(adapter.search(conf["q"], region=conf["reg"], max_results=10) for conf in search_configs),
            return_exceptions=True,  # one failing region must not discard the others
        )

        stories: List[TopStory] = []
        seen_urls = set()
        seen_titles = set()
        for conf, results in zip(search_configs, all_results_lists):
            if isinstance(results, BaseException):
                logger.warning(f"Live search failed for '{conf['q']}': {results}")
                continue
            for r in results or []:
                # `or` fallbacks also cover keys that are present but None.
                url = r.get("url") or "#"
                title = r.get("title") or "Untitled"
                title_key = title.lower().strip()[:60]
                # Skip duplicates and near-empty titles.
                if url in seen_urls or title_key in seen_titles or len(title) < 10:
                    continue
                seen_urls.add(url)
                seen_titles.add(title_key)
                stories.append(TopStory(
                    title=title,
                    url=url,
                    source=r.get("source") or "Live News",
                    published_at=r.get("published_at") or datetime.utcnow().isoformat(),
                    category="BREAKING",
                    excerpt=(r.get("content") or "")[:150],
                    image_url=r.get("image_url") or r.get("thumbnail"),
                    origin="live",
                ))

        # Prioritize stories with images, then freshness (string comparison of published_at).
        stories.sort(key=lambda s: (1 if s.image_url else 0, s.published_at), reverse=True)
        logger.info(f"Multi-region search: collected {len(stories)} unique stories")
        return stories[:n]
    except Exception as e:
        logger.error(f"Live top stories error: {e}")
        return []
# -- Endpoint ------------------------------------------------------------------

async def get_top_stories(
    force_refresh: bool = Query(default=False, description="Force cache refresh"),
    cache: CachePort = Depends(get_cache_port),
    adapter: DuckDuckGoAdapter = Depends(get_live_search_port),
):
    """
    Get top 6 news stories for the landing page.

    Combines pipeline-fresh Kafka news with live-search results and caches the
    merged payload through ``CachePort`` for ``_cache_ttl``.

    Args:
        force_refresh: skip the cache read and refetch from both sources.
        cache: shared cache holding the merged payload.
        adapter: live-search adapter handed to fetch_live_stories.

    Returns:
        TopStoriesResponse with at most 6 stories. ``kafka_count`` and
        ``live_count`` count what each source returned *before* merging and
        truncation, not how many of those appear in ``stories``.

    Cache read/write failures are logged and never fail the request. An empty
    result is not cached, so if both sources fail, the next request retries
    instead of serving nothing for the whole TTL.
    """
    cache_key = "arki_top_stories_v2"

    if not force_refresh:
        try:
            cached = cache.get(cache_key)
        except Exception as e:
            logger.warning(f"Top stories cache read failed: {e}")
            cached = None
        if cached:
            try:
                data = json.loads(cached)
                logger.info("Top stories Redis cache HIT")
                return TopStoriesResponse(
                    stories=[TopStory(**s) for s in data["stories"]],
                    fetched_at=data["fetched_at"],
                    cache_hit=True,
                    kafka_count=data["kafka_count"],
                    live_count=data["live_count"],
                )
            except Exception as e:
                logger.warning(f"Failed to parse top stories cache: {e}")

    # Fetch both sources in parallel; each helper returns [] on failure.
    kafka_stories, live_stories = await asyncio.gather(
        fetch_kafka_stories(4),
        fetch_live_stories(6, adapter),
    )

    # Merge and de-duplicate on a normalized title prefix. Live results go first,
    # so when live search alone yields 6 unique stories no Kafka story is shown.
    all_stories: List[TopStory] = []
    seen_titles: set = set()
    for story in live_stories + kafka_stories:
        title_key = story.title.lower().strip()[:60]
        if title_key in seen_titles:
            continue
        seen_titles.add(title_key)
        all_stories.append(story)

    final_stories = all_stories[:6]
    logger.info(f"Final top stories count: {len(final_stories)}")

    now_iso = datetime.utcnow().isoformat()
    payload = {
        "stories": [s.dict() for s in final_stories],
        "fetched_at": now_iso,
        "kafka_count": len(kafka_stories),
        "live_count": len(live_stories),
    }

    if final_stories:
        try:
            cache.set(cache_key, json.dumps(payload), expiration=_cache_ttl)
        except Exception as e:
            logger.warning(f"Top stories cache write failed: {e}")
    else:
        logger.warning("No top stories fetched — not caching empty result")

    return TopStoriesResponse(
        stories=final_stories,
        fetched_at=now_iso,
        cache_hit=False,
        kafka_count=len(kafka_stories),
        live_count=len(live_stories),
    )
async def refresh_top_stories(cache: CachePort = Depends(get_cache_port)):
    """Invalidate the cached top-stories payload so the next request refetches.

    Top stories are cached through ``CachePort`` rather than in a module-level
    dict, so the entry under get_top_stories' key is invalidated here.

    Returns:
        ``{"success": True, "cleared_at": <naive UTC ISO timestamp>}``.
    """
    # Must match the key used by get_top_stories.
    cache_key = "arki_top_stories_v2"
    # Only get/set(expiration=...) are used on CachePort in this module, so the entry
    # is overwritten with an empty value: get_top_stories treats a falsy cached value
    # as a miss. NOTE(review): expiration appears to be seconds (_cache_ttl = 900 is
    # documented as 15 minutes) — confirm; use a delete method if CachePort has one.
    cache.set(cache_key, "", expiration=1)
    return {"success": True, "cleared_at": datetime.utcnow().isoformat()}
async def get_categories():
    """Return the fixed landing-page categories, each with the search query it maps to."""
    catalogue = (
        ("news", "News", "Ethiopia"),
        ("politics", "Politics", "Ethiopia politics"),
        ("economy", "Economy", "Ethiopia economy"),
        ("sports", "Sports", "Ethiopia sports"),
    )
    entries = [{"id": cid, "name": label, "query": query} for cid, label, query in catalogue]
    return {"categories": entries}