""" Top Stories API Endpoint Provides fresh news headlines for the landing page. Hybrid approach: 3 from Kafka news.processed (pipeline-fresh) + 3 from DuckDuckGo (live). Fast, cached, and optimized for frontend display. """ import logging import asyncio import json from typing import List, Optional from fastapi import APIRouter, Query, Depends from pydantic import BaseModel from datetime import datetime from src.api.dependencies import get_cache_port, get_live_search_port from src.core.ports.cache_port import CachePort from src.infrastructure.adapters.duckduckgo_adapter import DuckDuckGoAdapter try: import msgpack HAS_MSGPACK = True except ImportError: HAS_MSGPACK = False logger = logging.getLogger(__name__) router = APIRouter() class TopStory(BaseModel): """Single news story for frontend display""" title: str url: str source: str published_at: str category: str = "NEWS" excerpt: Optional[str] = None image_url: Optional[str] = None origin: str = "kafka" # "kafka" or "live" class TopStoriesResponse(BaseModel): """Response with top stories""" stories: List[TopStory] fetched_at: str cache_hit: bool = False kafka_count: int = 0 live_count: int = 0 # Default TTL for top stories (15 minutes — balanced for performance) _cache_ttl = 900 # ── Kafka: read latest N messages from news.processed ──────────────────────── def _fetch_kafka_stories_sync(n: int = 3) -> List[TopStory]: """ Read the N most recent messages from the news.processed Kafka topic. Uses a temporary consumer that seeks to the end of each partition, then reads backwards to get the latest messages. Runs synchronously (called via executor). """ import os from confluent_kafka import Consumer, TopicPartition bootstrap = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "") topic = os.getenv("KAFKA_TOPIC_PROCESSED", "news.processed") if not bootstrap: logger.warning("KAFKA_BOOTSTRAP_SERVERS not set — skipping Kafka top stories") return [] # SSL certs: support both env-var content and file paths # Priority: env var content → file path → skip SSL def _write_cert(env_content_key: str, env_path_key: str, tmp_path: str) -> bool: content = os.getenv(env_content_key, "") if content: with open(tmp_path, "w") as f: f.write(content.replace("\\n", "\n")) return True file_path = os.getenv(env_path_key, "") if file_path and os.path.exists(file_path): import shutil shutil.copy(file_path, tmp_path) return True # Try default cert locations (HF Spaces mounts certs here) default_paths = [ f"/app/certs/{os.path.basename(tmp_path)}", f"certs/{os.path.basename(tmp_path)}", ] for dp in default_paths: if os.path.exists(dp): import shutil shutil.copy(dp, tmp_path) return True return False has_ca = _write_cert("KAFKA_SSL_CA", "KAFKA_SSL_CA_PATH", "/tmp/ca.pem") has_cert = _write_cert("KAFKA_SSL_CERT", "KAFKA_SSL_CERT_PATH", "/tmp/service.cert") has_key = _write_cert("KAFKA_SSL_KEY", "KAFKA_SSL_KEY_PATH", "/tmp/service.key") conf = { "bootstrap.servers": bootstrap, "group.id": "top-stories-reader", "auto.offset.reset": "latest", "enable.auto.commit": False, "log_level": 0, "session.timeout.ms": 10000, } if has_ca and has_cert and has_key: conf["security.protocol"] = "SSL" conf["ssl.ca.location"] = "/tmp/ca.pem" conf["ssl.certificate.location"] = "/tmp/service.cert" conf["ssl.key.location"] = "/tmp/service.key" logger.info("Kafka SSL configured for top stories consumer") else: logger.warning("Kafka SSL certs not found — connecting without SSL") consumer = Consumer(conf) stories: List[TopStory] = [] try: # Get partition metadata meta = consumer.list_topics(topic, timeout=5) if topic not in meta.topics: logger.warning(f"Kafka topic '{topic}' not found") return [] partitions = [ TopicPartition(topic, p) for p in meta.topics[topic].partitions.keys() ] # Get high watermarks and seek to (high - n) per partition assigned = [] for tp in partitions: low, high = consumer.get_watermark_offsets(tp, timeout=5) if high > 0: start = max(low, high - n) assigned.append(TopicPartition(topic, tp.partition, start)) if not assigned: return [] consumer.assign(assigned) # Poll until we have n messages or timeout import time deadline = time.time() + 5.0 raw_messages = [] while len(raw_messages) < n and time.time() < deadline: msg = consumer.poll(timeout=1.0) if msg is None: break if msg.error(): continue raw_messages.append(msg) # Parse messages seen_titles: set = set() for msg in raw_messages: try: value = msg.value() try: event = msgpack.unpackb(value, raw=False) if HAS_MSGPACK else None if event is None: raise ValueError("msgpack not available") except Exception: event = json.loads(value.decode("utf-8", errors="ignore")) title = event.get("title") or event.get("content", "")[:80] url = event.get("url") or event.get("link") or "" source = event.get("source") or event.get("publisher") or "ARKI" pub_at = event.get("published_at") or event.get("pub_date") or datetime.utcnow().isoformat() content = event.get("content") or event.get("text") or "" excerpt = content[:150] if content else None if not title or title in seen_titles: continue seen_titles.add(title) stories.append(TopStory( title=title.strip()[:200], url=url.strip(), source=source.strip(), published_at=pub_at, category="NEWS", excerpt=excerpt, image_url=event.get("image_url") or event.get("thumbnail"), origin="kafka", )) except Exception as e: logger.debug(f"Failed to parse Kafka message: {e}") continue except Exception as e: logger.error(f"Kafka top stories error: {e}") finally: consumer.close() logger.info(f"Kafka top stories: fetched {len(stories)} from '{topic}'") return stories[:n] async def fetch_kafka_stories(n: int = 3) -> List[TopStory]: """Async wrapper — runs Kafka consumer in thread pool""" loop = asyncio.get_event_loop() try: return await asyncio.wait_for( loop.run_in_executor(None, _fetch_kafka_stories_sync, n), timeout=6.0 ) except asyncio.TimeoutError: logger.warning("Kafka top stories timeout") return [] except Exception as e: logger.error(f"Kafka top stories async error: {e}") return [] # ── DuckDuckGo: fetch N live stories ───────────────────────────────────────── async def fetch_live_stories(n: int = 6, adapter: DuckDuckGoAdapter = None) -> List[TopStory]: """Fetch N live stories from DuckDuckGo using the dedicated adapter""" if not adapter: return [] async def fetch_live_stories(n: int = 6, adapter: DuckDuckGoAdapter = None) -> List[TopStory]: """Fetch N live stories from DuckDuckGo using multi-region queries for maximum yield""" if not adapter: return [] try: # We run 4 parallel searches with different regional focuses search_configs = [ {"q": "Ethiopia news breaking today", "reg": "et-en"}, # Local Focus {"q": "Ethiopia latest breaking news", "reg": "wt-wt"}, # Global Focus (CNN, BBC, etc) {"q": "Addis Ababa news updates", "reg": "et-en"}, # Capital Focus {"q": "Ethiopia world news reporting", "reg": "us-en"} # International Perspective ] search_tasks = [ adapter.search(conf["q"], region=conf["reg"], max_results=10) for conf in search_configs ] all_results_lists = await asyncio.gather(*search_tasks) # Flatten and deduplicate stories = [] seen_urls = set() seen_titles = set() for results in all_results_lists: for r in results: url = r.get("url", "#") title = r.get("title", "Untitled") title_key = title.lower().strip()[:60] # Check for duplicates or empty titles if url in seen_urls or title_key in seen_titles or len(title) < 10: continue seen_urls.add(url) seen_titles.add(title_key) stories.append(TopStory( title=title, url=url, source=r.get("source", "Live News"), published_at=r.get("published_at", datetime.utcnow().isoformat()), category="BREAKING", excerpt=r.get("content", "")[:150], image_url=r.get("image_url") or r.get("thumbnail"), origin="live", )) # Sorting: Prioritize those with images, then by freshness stories.sort(key=lambda s: (1 if s.image_url else 0, s.published_at), reverse=True) logger.info(f"Multi-region search: collected {len(stories)} unique stories") return stories[:n] except Exception as e: logger.error(f"Live top stories error: {e}") return [] # ── Endpoint ────────────────────────────────────────────────────────────────── @router.get("/top-stories", response_model=TopStoriesResponse) async def get_top_stories( force_refresh: bool = Query(default=False, description="Force cache refresh"), cache: CachePort = Depends(get_cache_port), adapter: DuckDuckGoAdapter = Depends(get_live_search_port) ): """ Get top 6 news stories for the landing page. Combines pipeline-fresh Kafka news with live-search results. Uses Redis for global caching. """ cache_key = "arki_top_stories_v2" if not force_refresh: cached = cache.get(cache_key) if cached: try: data = json.loads(cached) logger.info("Top stories Redis cache HIT") return TopStoriesResponse( stories=[TopStory(**s) for s in data["stories"]], fetched_at=data["fetched_at"], cache_hit=True, kafka_count=data["kafka_count"], live_count=data["live_count"], ) except Exception as e: logger.warning(f"Failed to parse top stories cache: {e}") # Fetch both sources in parallel kafka_stories, live_stories = await asyncio.gather( fetch_kafka_stories(4), fetch_live_stories(6, adapter), ) # Merge and deduplicate all_stories: List[TopStory] = [] seen_titles: set = set() for story in live_stories + kafka_stories: # Prioritize live title_key = story.title.lower().strip()[:60] if title_key not in seen_titles: seen_titles.add(title_key) all_stories.append(story) # Ensure exactly 6 final_stories = all_stories[:6] logger.info(f"Final top stories count: {len(final_stories)}") now_iso = datetime.utcnow().isoformat() payload = { "stories": [s.dict() for s in final_stories], "fetched_at": now_iso, "kafka_count": len(kafka_stories), "live_count": len(live_stories), } # Store in Redis cache.set(cache_key, json.dumps(payload), expiration=_cache_ttl) return TopStoriesResponse( stories=final_stories, fetched_at=now_iso, cache_hit=False, kafka_count=len(kafka_stories), live_count=len(live_stories), ) @router.post("/top-stories/refresh") async def refresh_top_stories(): """Clear the top stories cache""" global _cache _cache.clear() return {"success": True, "cleared_at": datetime.utcnow().isoformat()} @router.get("/top-stories/categories") async def get_categories(): return { "categories": [ {"id": "news", "name": "News", "query": "Ethiopia"}, {"id": "politics", "name": "Politics", "query": "Ethiopia politics"}, {"id": "economy", "name": "Economy", "query": "Ethiopia economy"}, {"id": "sports", "name": "Sports", "query": "Ethiopia sports"}, ] }