Peterase committed on
Commit
7a3c674
·
1 Parent(s): 470548c

feat: visual top stories, redis caching, and automated 4-hour refresh

Browse files
Files changed (1) hide show
  1. src/api/routes/top_stories.py +65 -83
src/api/routes/top_stories.py CHANGED
@@ -14,6 +14,10 @@ from fastapi import APIRouter, Query
14
  from pydantic import BaseModel
15
  from datetime import datetime
16
 
 
 
 
 
17
  try:
18
  import msgpack
19
  HAS_MSGPACK = True
@@ -33,6 +37,7 @@ class TopStory(BaseModel):
33
  published_at: str
34
  category: str = "NEWS"
35
  excerpt: Optional[str] = None
 
36
  origin: str = "kafka" # "kafka" or "live"
37
 
38
 
@@ -45,9 +50,8 @@ class TopStoriesResponse(BaseModel):
45
  live_count: int = 0
46
 
47
 
48
- # Simple in-memory cache (2 minutes — shorter TTL for freshness)
49
- _cache: dict = {}
50
- _cache_ttl = 120 # 2 minutes
51
 
52
 
53
  # ── Kafka: read latest N messages from news.processed ────────────────────────
@@ -187,6 +191,7 @@ def _fetch_kafka_stories_sync(n: int = 3) -> List[TopStory]:
187
  published_at=pub_at,
188
  category="NEWS",
189
  excerpt=excerpt,
 
190
  origin="kafka",
191
  ))
192
 
@@ -221,46 +226,32 @@ async def fetch_kafka_stories(n: int = 3) -> List[TopStory]:
221
 
222
  # ── DuckDuckGo: fetch N live stories ─────────────────────────────────────────
223
 
224
- async def fetch_ddg_stories(n: int = 3) -> List[TopStory]:
225
- """Fetch N live stories from DuckDuckGo"""
 
 
 
226
  try:
227
- from ddgs import DDGS
228
-
229
- loop = asyncio.get_event_loop()
230
-
231
- def _search():
232
- ddgs = DDGS()
233
- return list(ddgs.news("Ethiopia", region="et-en", max_results=n))
234
-
235
- raw = await asyncio.wait_for(
236
- loop.run_in_executor(None, _search),
237
- timeout=5.0
238
- )
239
-
240
  stories = []
241
- for r in raw:
242
- title = r.get("title", "").strip()
243
- url = r.get("url", "").strip()
244
- if not title or not url:
245
- continue
246
  stories.append(TopStory(
247
- title=title,
248
- url=url,
249
- source=r.get("source", "Unknown").strip(),
250
- published_at=r.get("date", datetime.utcnow().isoformat()),
251
- category="NEWS",
252
- excerpt=r.get("body", "")[:150] if r.get("body") else None,
 
253
  origin="live",
254
  ))
255
-
256
- logger.info(f"DuckDuckGo top stories: fetched {len(stories)}")
257
- return stories[:n]
258
-
259
- except asyncio.TimeoutError:
260
- logger.warning("DuckDuckGo top stories timeout")
261
- return []
262
  except Exception as e:
263
- logger.error(f"DuckDuckGo top stories error: {e}")
264
  return []
265
 
266
 
@@ -269,76 +260,67 @@ async def fetch_ddg_stories(n: int = 3) -> List[TopStory]:
269
  @router.get("/top-stories", response_model=TopStoriesResponse)
270
  async def get_top_stories(
271
  force_refresh: bool = Query(default=False, description="Force cache refresh"),
 
 
272
  ):
273
  """
274
  Get top 6 news stories for the landing page.
275
-
276
- **Sources:**
277
- - 3 from Kafka `news.processed` topic (pipeline-fresh, multilingual)
278
- - 3 from DuckDuckGo live search (real-time, English)
279
-
280
- **Cache:** 2-minute TTL for freshness.
281
  """
282
- cache_key = "top_stories_hybrid"
283
-
284
- if not force_refresh and cache_key in _cache:
285
- cached_data, cached_time = _cache[cache_key]
286
- age = (datetime.utcnow() - cached_time).total_seconds()
287
- if age < _cache_ttl:
288
- logger.info(f"Top stories cache HIT (age={age:.0f}s)")
289
- return TopStoriesResponse(
290
- stories=cached_data["stories"],
291
- fetched_at=cached_time.isoformat(),
292
- cache_hit=True,
293
- kafka_count=cached_data["kafka_count"],
294
- live_count=cached_data["live_count"],
295
- )
 
 
 
296
 
297
  # Fetch both sources in parallel
298
- kafka_stories, ddg_stories = await asyncio.gather(
299
  fetch_kafka_stories(3),
300
- fetch_ddg_stories(3),
301
  )
302
 
303
- # Merge: Kafka first (pipeline-fresh), then DuckDuckGo (live)
304
- # Deduplicate by title similarity
305
  all_stories: List[TopStory] = []
306
  seen_titles: set = set()
307
 
308
- for story in kafka_stories + ddg_stories:
309
  title_key = story.title.lower()[:60]
310
  if title_key not in seen_titles:
311
  seen_titles.add(title_key)
312
  all_stories.append(story)
313
 
314
- # Fallback: if Kafka returned nothing, fill with more DuckDuckGo
315
- if len(kafka_stories) == 0:
316
- extra_ddg = await fetch_ddg_stories(6)
317
- for story in extra_ddg:
318
- title_key = story.title.lower()[:60]
319
- if title_key not in seen_titles and len(all_stories) < 6:
320
- seen_titles.add(title_key)
321
- all_stories.append(story)
322
-
323
- now = datetime.utcnow()
324
  payload = {
325
- "stories": all_stories[:6],
 
326
  "kafka_count": len(kafka_stories),
327
- "live_count": len(ddg_stories),
328
  }
329
- _cache[cache_key] = (payload, now)
330
-
331
- logger.info(
332
- f"Top stories: {len(kafka_stories)} Kafka + {len(ddg_stories)} DuckDuckGo "
333
- f"= {len(all_stories[:6])} total"
334
- )
335
 
336
  return TopStoriesResponse(
337
- stories=all_stories[:6],
338
- fetched_at=now.isoformat(),
339
  cache_hit=False,
340
  kafka_count=len(kafka_stories),
341
- live_count=len(ddg_stories),
342
  )
343
 
344
 
 
14
  from pydantic import BaseModel
15
  from datetime import datetime
16
 
17
+ from src.api.dependencies import get_cache_port, get_live_search_port
18
+ from src.core.ports.cache_port import CachePort
19
+ from src.infrastructure.adapters.duckduckgo_adapter import DuckDuckGoAdapter
20
+
21
  try:
22
  import msgpack
23
  HAS_MSGPACK = True
 
37
  published_at: str
38
  category: str = "NEWS"
39
  excerpt: Optional[str] = None
40
+ image_url: Optional[str] = None
41
  origin: str = "kafka" # "kafka" or "live"
42
 
43
 
 
50
  live_count: int = 0
51
 
52
 
53
+ # Default TTL for top stories (15 minutes — balanced for performance)
54
+ _cache_ttl = 900
 
55
 
56
 
57
  # ── Kafka: read latest N messages from news.processed ────────────────────────
 
191
  published_at=pub_at,
192
  category="NEWS",
193
  excerpt=excerpt,
194
+ image_url=event.get("image_url") or event.get("thumbnail"),
195
  origin="kafka",
196
  ))
197
 
 
226
 
227
  # ── DuckDuckGo: fetch N live stories ─────────────────────────────────────────
228
 
229
+ async def fetch_live_stories(n: int = 4, adapter: DuckDuckGoAdapter = None) -> List[TopStory]:
230
+ """Fetch N live stories from DuckDuckGo using the dedicated adapter"""
231
+ if not adapter:
232
+ return []
233
+
234
  try:
235
+ # Search for fresh Ethiopia news
236
+ query = "Ethiopia news today"
237
+ results = await adapter.search(query)
238
+
 
 
 
 
 
 
 
 
 
239
  stories = []
240
+ for r in results[:n]:
 
 
 
 
241
  stories.append(TopStory(
242
+ title=r.get("title", "Untitled"),
243
+ url=r.get("url", "#"),
244
+ source=r.get("source", "Live News"),
245
+ published_at=r.get("published_at", datetime.utcnow().isoformat()),
246
+ category="BREAKING",
247
+ excerpt=r.get("content", "")[:150],
248
+ image_url=r.get("image_url") or r.get("thumbnail"),
249
  origin="live",
250
  ))
251
+
252
+ return stories
 
 
 
 
 
253
  except Exception as e:
254
+ logger.error(f"Live top stories error: {e}")
255
  return []
256
 
257
 
 
260
  @router.get("/top-stories", response_model=TopStoriesResponse)
261
  async def get_top_stories(
262
  force_refresh: bool = Query(default=False, description="Force cache refresh"),
263
+ cache: CachePort = Depends(get_cache_port),
264
+ adapter: DuckDuckGoAdapter = Depends(get_live_search_port)
265
  ):
266
  """
267
  Get top 6 news stories for the landing page.
268
+ Combines pipeline-fresh Kafka news with live-search results.
269
+ Uses Redis for global caching.
 
 
 
 
270
  """
271
+ cache_key = "arki_top_stories_v2"
272
+
273
+ if not force_refresh:
274
+ cached = cache.get(cache_key)
275
+ if cached:
276
+ try:
277
+ data = json.loads(cached)
278
+ logger.info("Top stories Redis cache HIT")
279
+ return TopStoriesResponse(
280
+ stories=[TopStory(**s) for s in data["stories"]],
281
+ fetched_at=data["fetched_at"],
282
+ cache_hit=True,
283
+ kafka_count=data["kafka_count"],
284
+ live_count=data["live_count"],
285
+ )
286
+ except Exception as e:
287
+ logger.warning(f"Failed to parse top stories cache: {e}")
288
 
289
  # Fetch both sources in parallel
290
+ kafka_stories, live_stories = await asyncio.gather(
291
  fetch_kafka_stories(3),
292
+ fetch_live_stories(4, adapter),
293
  )
294
 
295
+ # Merge and deduplicate
 
296
  all_stories: List[TopStory] = []
297
  seen_titles: set = set()
298
 
299
+ for story in live_stories + kafka_stories: # Prioritize live for today's top stories
300
  title_key = story.title.lower()[:60]
301
  if title_key not in seen_titles:
302
  seen_titles.add(title_key)
303
  all_stories.append(story)
304
 
305
+ now_iso = datetime.utcnow().isoformat()
306
+ final_stories = all_stories[:6]
307
+
 
 
 
 
 
 
 
308
  payload = {
309
+ "stories": [s.dict() for s in final_stories],
310
+ "fetched_at": now_iso,
311
  "kafka_count": len(kafka_stories),
312
+ "live_count": len(live_stories),
313
  }
314
+
315
+ # Store in Redis
316
+ cache.set(cache_key, json.dumps(payload), expiration=_cache_ttl)
 
 
 
317
 
318
  return TopStoriesResponse(
319
+ stories=final_stories,
320
+ fetched_at=now_iso,
321
  cache_hit=False,
322
  kafka_count=len(kafka_stories),
323
+ live_count=len(live_stories),
324
  )
325
 
326