# src/infrastructure/adapters/jina_reader_adapter.py
# commit d13f5bc ("fix: live doc dedup + Jina boilerplate stripping", Peterase)
"""
Jina AI Reader Adapter
Extracts clean, full article content from URLs using Jina AI Reader API.
Removes ads, navigation, boilerplate, and returns markdown-formatted text.
Features:
- Async execution with timeout
- Parallel extraction for multiple URLs
- Graceful fallback to snippets on failure
- Works without an API key, but most sites return 401 unless JINA_API_KEY is set
- 71x more content than snippets (14,000 vs 200 chars)
Integration:
- Enhances DuckDuckGo live search results
- Replaces 200-char snippets with full articles
- Improves LLM context quality dramatically
"""
import asyncio
import logging
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import httpx
logger = logging.getLogger(__name__)
class JinaReaderAdapter:
    """
    Adapter for the Jina AI Reader API (https://r.jina.ai).

    Fetches a URL through the Reader endpoint, which returns the page as
    clean markdown, then strips residual navigation/footer boilerplate so
    only the article body reaches the LLM context.
    """

    # Pre-compiled markers that signal the end of real article content.
    # Jina returns the *whole* page as markdown, so everything after one of
    # these headings/links is navigation, social links, or footer chrome.
    _CUTOFF_RE = [
        re.compile(p, re.IGNORECASE)
        for p in (
            r'\n## (Post navigation|Archives|Categories|Recent Posts|Search|Newsletter|Socials|Tags|Related)',
            r'\n### (Post navigation|Archives|Categories|Recent Posts|Related)',
            r'\n\* \[Home\]\(',       # navigation list starting with Home
            r'\n\* \[Facebook\]\(',   # social links
            r'\nCopyright ©',
            r'\n---\n.*\n---',        # horizontal rules often mark a footer
            r'\nShare on (Facebook|Twitter|X|LinkedIn)',
            r'\n## Search\n',
            r'\n## Newsletter\n',
            r'\n## Socials\n',
        )
    ]

    def __init__(
        self,
        timeout: float = 10.0,
        max_concurrent: int = 5,
        base_url: str = "https://r.jina.ai"
    ):
        """
        Initialize Jina Reader adapter.

        Args:
            timeout: Maximum time to wait per article (seconds)
            max_concurrent: Maximum parallel extractions
            base_url: Jina Reader API base URL
        """
        self.base_url = base_url
        self.timeout = timeout
        self.max_concurrent = max_concurrent
        # HTTP client is created lazily (see _ensure_client) so construction
        # stays cheap and synchronous.
        self.client: Optional[httpx.AsyncClient] = None
        logger.info(
            f"Jina Reader initialized: timeout={timeout}s, "
            f"max_concurrent={max_concurrent}"
        )

    async def _ensure_client(self):
        """Lazily create the shared AsyncClient, attaching the Jina API key if configured."""
        if self.client is not None:
            return
        headers = {
            "User-Agent": "ARKI-AI-RAG/2.4 (Ethiopia News Assistant)",
            "Accept": "text/plain, text/markdown",
        }
        # Best-effort: pull the key from project settings. The adapter still
        # works without it, but most sites then answer 401.
        try:
            from src.core.config import settings
            jina_key = getattr(settings, "JINA_API_KEY", "")
            if jina_key and jina_key not in ("", "your-jina-api-key-here"):
                headers["Authorization"] = f"Bearer {jina_key}"
                logger.info("Jina Reader: using API key authentication")
            else:
                logger.warning("Jina Reader: no API key set — most sites will return 401. Get free key at https://jina.ai")
        except Exception:
            # Settings module unavailable (e.g. tests) — run unauthenticated.
            pass
        self.client = httpx.AsyncClient(
            timeout=self.timeout,
            follow_redirects=True,
            headers=headers
        )

    @staticmethod
    def _split_title_body(markdown: str):
        """
        Split Jina markdown into (title, body).

        The first line is treated as the title ('# ' / 'Title: ' prefixes
        removed); the body starts at the first non-empty line after it.
        NOTE: .replace() removes the prefix anywhere in the line — kept for
        backward compatibility with the original parsing.
        """
        lines = markdown.split('\n')
        title = ""
        if lines:
            title = (
                lines[0]
                .replace('# ', '')
                .replace('Title: ', '')
                .strip()
            )
        body_lines = []
        for i, line in enumerate(lines):
            if i == 0:  # skip title line
                continue
            if line.strip():  # skip leading empty lines
                body_lines = lines[i:]
                break
        return title, '\n'.join(body_lines).strip()

    async def extract_article(self, url: str) -> Dict[str, Any]:
        """
        Extract clean article content from a single URL.

        Args:
            url: Article URL to extract

        Returns:
            Dict with:
                - success: bool
                - url: str
                - title: str (if success)
                - content: str (if success)
                - length: int (if success)
                - extracted_at: str ISO timestamp (if success)
                - error: str (if failure)
        """
        await self._ensure_client()
        logger.debug(f"Extracting article: {url[:80]}")
        try:
            # Jina Reader API: https://r.jina.ai/{url}
            jina_url = f"{self.base_url}/{url}"
            response = await self.client.get(jina_url)
            if response.status_code == 200:
                title, body = self._split_title_body(response.text)
                # Jina extracts the full page markdown including nav/footer;
                # cut at the first boilerplate marker and cap the length.
                body = self._strip_boilerplate(body)
                if not body or len(body) < 100:
                    logger.warning(
                        f"Jina returned insufficient content for {url[:50]} "
                        f"({len(body)} chars)"
                    )
                    return {
                        "success": False,
                        "url": url,
                        "error": "Insufficient content extracted"
                    }
                logger.info(
                    f"✅ Jina extracted {len(body):,} chars from {url[:50]}"
                )
                return {
                    "success": True,
                    "url": url,
                    "title": title or "Untitled",
                    "content": body,
                    "length": len(body),
                    # timezone-aware replacement for deprecated datetime.utcnow()
                    "extracted_at": datetime.now(timezone.utc).isoformat()
                }
            if response.status_code == 451:
                # 451 Unavailable For Legal Reasons (geo-blocking)
                logger.debug(f"Jina: 451 geo-blocked for {url[:50]}")
                return {
                    "success": False,
                    "url": url,
                    "error": "Content geo-blocked"
                }
            if response.status_code == 404:
                logger.debug(f"Jina: 404 not found for {url[:50]}")
                return {
                    "success": False,
                    "url": url,
                    "error": "Article not found"
                }
            logger.debug(
                f"Jina returned status {response.status_code} for {url[:50]}"
            )
            return {
                "success": False,
                "url": url,
                "error": f"HTTP {response.status_code}"
            }
        except (asyncio.TimeoutError, httpx.TimeoutException):
            # BUG FIX: httpx raises httpx.TimeoutException, not
            # asyncio.TimeoutError; the original only caught the latter, so
            # timeouts fell into the generic handler and lost the
            # "Extraction timeout" label.
            logger.debug(f"Jina timeout ({self.timeout}s) for {url[:50]}")
            return {
                "success": False,
                "url": url,
                "error": "Extraction timeout"
            }
        except Exception as e:
            logger.debug(f"Jina extraction error for {url[:50]}: {e}")
            return {
                "success": False,
                "url": url,
                "error": str(e)
            }

    async def extract_multiple(
        self,
        urls: List[str],
        max_articles: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Extract content from multiple URLs in parallel.

        Args:
            urls: List of article URLs
            max_articles: Maximum articles to extract (default: max_concurrent)

        Returns:
            List of extraction results (same order as input URLs)
        """
        if not urls:
            return []
        max_articles = max_articles or self.max_concurrent
        urls_to_extract = urls[:max_articles]
        logger.info(
            f"Extracting {len(urls_to_extract)} articles in parallel "
            f"(max_concurrent={self.max_concurrent})"
        )
        # Semaphore bounds concurrency; the coroutine is only created inside
        # the bounded wrapper, so no coroutine sits un-awaited on error.
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def bounded_extract(url: str) -> Dict[str, Any]:
            async with semaphore:
                return await self.extract_article(url)

        results = await asyncio.gather(
            *(bounded_extract(u) for u in urls_to_extract),
            return_exceptions=True
        )
        # Convert stray exceptions into failure dicts so callers get a
        # uniform shape.
        processed_results = []
        for url, result in zip(urls_to_extract, results):
            if isinstance(result, Exception):
                logger.error(f"Extraction failed for {url[:50]}: {result}")
                processed_results.append({
                    "success": False,
                    "url": url,
                    "error": str(result)
                })
            else:
                processed_results.append(result)
        successful = sum(1 for r in processed_results if r.get("success"))
        total_chars = sum(r.get("length", 0) for r in processed_results if r.get("success"))
        logger.info(
            f"Jina extraction complete: {successful}/{len(processed_results)} successful, "
            f"{total_chars:,} total chars"
        )
        return processed_results

    async def enhance_search_results(
        self,
        search_results: List[Dict[str, Any]],
        fallback_to_snippet: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Enhance search results by replacing snippets with full articles.

        Args:
            search_results: List of search results with URLs and snippets
            fallback_to_snippet: Keep original snippet if extraction fails

        Returns:
            Enhanced search results with full article content
        """
        if not search_results:
            return []
        # BUG FIX: pair each extractable URL with the index of its search
        # result. The original filtered URL-less results out of the list but
        # then merged extractions back by raw position, mis-aligning every
        # result after a missing URL.
        indexed_urls = [
            (i, r["url"]) for i, r in enumerate(search_results) if r.get("url")
        ]
        if not indexed_urls:
            logger.warning("No URLs found in search results")
            return search_results
        extractions = await self.extract_multiple([u for _, u in indexed_urls])
        # extract_multiple may return fewer entries than urls (max_articles cap)
        extraction_for = {
            indexed_urls[j][0]: extractions[j] for j in range(len(extractions))
        }
        enhanced_results = []
        for i, result in enumerate(search_results):
            enhanced = dict(result)  # copy original
            extraction = extraction_for.get(i)
            if extraction is not None:
                if extraction.get("success"):
                    # Replace snippet with full article
                    enhanced["content"] = extraction["content"]
                    enhanced["full_article"] = True
                    enhanced["content_length"] = extraction["length"]
                    enhanced["jina_title"] = extraction.get("title")
                    enhanced["extracted_at"] = extraction.get("extracted_at")
                    logger.debug(
                        f"Enhanced result {i+1}: {extraction['length']:,} chars "
                        f"(was {len(result.get('content', ''))} chars)"
                    )
                else:
                    # Extraction failed — keep or drop snippet per flag
                    enhanced["full_article"] = False
                    enhanced["jina_error"] = extraction.get("error")
                    if not fallback_to_snippet:
                        logger.debug(
                            f"Skipping result {i+1}: Jina failed and fallback disabled"
                        )
                        continue
                    logger.debug(
                        f"Keeping snippet for result {i+1}: {extraction.get('error')}"
                    )
            enhanced_results.append(enhanced)
        full_articles = sum(1 for r in enhanced_results if r.get("full_article"))
        snippets = len(enhanced_results) - full_articles
        logger.info(
            f"Enhanced {len(enhanced_results)} results: "
            f"{full_articles} full articles, {snippets} snippets"
        )
        return enhanced_results

    async def close(self):
        """Close the HTTP client (safe to call when never opened)."""
        if self.client:
            await self.client.aclose()
            self.client = None
            logger.debug("Jina Reader client closed")

    def _strip_boilerplate(self, content: str, max_chars: int = 8000) -> str:
        """
        Strip navigation, footer, archives and other boilerplate from
        Jina-extracted markdown. Keeps only the article body.

        Strategy:
            1. Cut at the first boilerplate section marker (_CUTOFF_RE)
            2. Hard cap at max_chars to avoid flooding the LLM context
        """
        for pattern in self._CUTOFF_RE:
            match = pattern.search(content)
            if match:
                content = content[:match.start()].strip()
                break
        # Hard cap — LLM context window protection
        if len(content) > max_chars:
            # Prefer a paragraph boundary so the text doesn't end mid-sentence
            cutoff = content[:max_chars].rfind('\n\n')
            if cutoff > max_chars * 0.7:
                content = content[:cutoff].strip()
            else:
                content = content[:max_chars].strip()
        return content

    def is_available(self) -> bool:
        """
        Always True: per-URL failures are reported in-band by the extract
        methods. NOTE: without JINA_API_KEY many sites return 401 (see
        _ensure_client), so "available" does not mean "authenticated".
        """
        return True
# Module-level singleton so callers can share one adapter instance.
_default_adapter = None


def get_jina_reader_adapter(
    timeout: float = 10.0,
    max_concurrent: int = 5
) -> JinaReaderAdapter:
    """
    Get or create the default Jina Reader adapter instance.

    Note: the arguments only take effect on the first call; later calls
    return the already-built singleton unchanged.

    Args:
        timeout: Extraction timeout in seconds
        max_concurrent: Maximum parallel extractions

    Returns:
        JinaReaderAdapter instance
    """
    global _default_adapter
    if _default_adapter is not None:
        return _default_adapter
    _default_adapter = JinaReaderAdapter(
        timeout=timeout,
        max_concurrent=max_concurrent
    )
    return _default_adapter