Janus-backend / backend /app /services /learning /knowledge_ingestor.py
DevodG's picture
deploy: Janus full system stabilization
24f95f0
"""
Knowledge ingestion from external sources.
Ingests knowledge from web search, URLs, and news sources,
compressing content larger than about 4 KB into shorter summaries for efficient storage
(content that is already small enough is stored unchanged).
"""
import logging
from datetime import datetime, timezone
from typing import Dict, Any, Optional

import httpx
# Module-level logger, named after this module so its records can be filtered by origin.
logger = logging.getLogger(__name__)
class KnowledgeIngestor:
    """Ingests knowledge from external sources.

    Content is pulled from Tavily web search, from individual URLs through
    the Jina Reader endpoint, and from NewsAPI. Each piece of content is
    reduced to at most ``MAX_SUMMARY_BYTES`` UTF-8 bytes, using the injected
    model function when the raw content is larger than that.

    Every ``ingest_*`` method is best-effort: failures are logged and an
    empty result (``[]`` or ``None``) is returned instead of raising.
    """

    # Upper bound, in UTF-8 bytes, of any summary returned by compress_content.
    MAX_SUMMARY_BYTES = 4096
    # Number of leading characters of the raw content included in the prompt.
    MAX_PROMPT_CHARS = 10000
    # Timeout, in seconds, applied to every outgoing HTTP request.
    HTTP_TIMEOUT_SECONDS = 30.0

    def __init__(self, tavily_key: Optional[str], newsapi_key: Optional[str], model_fn):
        """
        Store API credentials and the summarisation callable.

        Args:
            tavily_key: Tavily API key, or None to disable web-search ingestion.
            newsapi_key: NewsAPI key, or None to disable news ingestion.
            model_fn: Async callable invoked as
                ``await model_fn(prompt, max_tokens=...)``; its result is
                used as the summary text.
        """
        self.tavily_key = tavily_key
        self.newsapi_key = newsapi_key
        self.model_fn = model_fn
        self.jina_reader_base = "https://r.jina.ai/"

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as the deprecated ``datetime.utcnow()``
        (no UTC offset suffix) so previously stored timestamps stay comparable.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _truncate_utf8(text: str, max_bytes: int) -> str:
        """Return *text* cut so that its UTF-8 encoding is at most *max_bytes* long."""
        encoded = text.encode("utf-8")
        if len(encoded) <= max_bytes:
            return text
        # errors="ignore" drops a multi-byte character split by the cut.
        return encoded[:max_bytes].decode("utf-8", errors="ignore")

    @staticmethod
    def _article_text(article: Dict[str, Any]) -> str:
        """Join an article's title, description and content, skipping missing or null fields."""
        fields = (article.get(key) for key in ("title", "description", "content"))
        return " ".join(str(value) for value in fields if value)

    async def ingest_from_search(self, query: str, max_results: int = 5) -> list[Dict[str, Any]]:
        """
        Ingest knowledge from web search using Tavily API.

        Args:
            query: Search query
            max_results: Maximum number of results to ingest

        Returns:
            List of knowledge items with compressed content; empty when the
            Tavily key is not configured or the request fails.
        """
        if not self.tavily_key:
            logger.warning("Tavily API key not configured")
            return []

        try:
            async with httpx.AsyncClient(timeout=self.HTTP_TIMEOUT_SECONDS) as client:
                response = await client.post(
                    "https://api.tavily.com/search",
                    json={
                        "api_key": self.tavily_key,
                        "query": query,
                        "max_results": max_results,
                    },
                )
                response.raise_for_status()
                data = response.json()

            items = []
            # "or" guards against a key that is present but null.
            for result in data.get("results") or []:
                summary = await self.compress_content(result.get("content") or "")
                items.append({
                    "source": "tavily_search",
                    "query": query,
                    "url": result.get("url"),
                    "title": result.get("title"),
                    "summary": summary,
                    "ingested_at": self._utc_timestamp(),
                })

            logger.info("Ingested %d items from Tavily search: %s", len(items), query)
            return items
        except Exception as e:
            # Deliberate best-effort boundary: ingestion must not crash callers.
            logger.error("Failed to ingest from Tavily search: %s", e)
            return []

    async def ingest_from_url(self, url: str) -> Optional[Dict[str, Any]]:
        """
        Ingest knowledge from a specific URL using Jina Reader.

        Args:
            url: URL to ingest

        Returns:
            Knowledge item with compressed content, or None if the URL is
            empty or the request fails.
        """
        if not url:
            # Without a target the request would fetch the reader's own base page.
            logger.warning("No URL provided for ingestion")
            return None

        try:
            async with httpx.AsyncClient(timeout=self.HTTP_TIMEOUT_SECONDS) as client:
                response = await client.get(f"{self.jina_reader_base}{url}")
                response.raise_for_status()
                content = response.text

            summary = await self.compress_content(content)
            item = {
                "source": "jina_reader",
                "url": url,
                "summary": summary,
                "ingested_at": self._utc_timestamp(),
            }
            logger.info("Ingested content from URL: %s", url)
            return item
        except Exception as e:
            # Deliberate best-effort boundary: ingestion must not crash callers.
            logger.error("Failed to ingest from URL %s: %s", url, e)
            return None

    async def ingest_from_news(self, query: str, max_results: int = 10) -> list[Dict[str, Any]]:
        """
        Ingest knowledge from news sources using NewsAPI.

        Args:
            query: News search query
            max_results: Maximum number of articles to ingest

        Returns:
            List of knowledge items with compressed content; empty when the
            NewsAPI key is not configured or the request fails.
        """
        if not self.newsapi_key:
            logger.warning("NewsAPI key not configured")
            return []

        try:
            async with httpx.AsyncClient(timeout=self.HTTP_TIMEOUT_SECONDS) as client:
                response = await client.get(
                    "https://newsapi.org/v2/everything",
                    params={
                        "apiKey": self.newsapi_key,
                        "q": query,
                        "pageSize": max_results,
                        "sortBy": "publishedAt",
                    },
                )
                response.raise_for_status()
                data = response.json()

            items = []
            # "or" guards against a key that is present but null.
            for article in data.get("articles") or []:
                # Null fields are skipped so the text never contains "None".
                summary = await self.compress_content(self._article_text(article))
                items.append({
                    "source": "newsapi",
                    "query": query,
                    "url": article.get("url"),
                    "title": article.get("title"),
                    "published_at": article.get("publishedAt"),
                    "summary": summary,
                    "ingested_at": self._utc_timestamp(),
                })

            logger.info("Ingested %d articles from NewsAPI: %s", len(items), query)
            return items
        except Exception as e:
            # Deliberate best-effort boundary: ingestion must not crash callers.
            logger.error("Failed to ingest from NewsAPI: %s", e)
            return []

    async def compress_content(self, content: str) -> str:
        """
        Compress content to at most ``MAX_SUMMARY_BYTES`` UTF-8 bytes.

        Content that already fits is returned unchanged. Larger content is
        summarised with ``model_fn``; if the model call fails or yields no
        usable text, the raw content truncated to the byte limit is returned.

        Args:
            content: Raw content to compress

        Returns:
            Text whose UTF-8 encoding is no longer than ``MAX_SUMMARY_BYTES``
            ("" for empty input).
        """
        if not content:
            return ""

        limit = self.MAX_SUMMARY_BYTES
        # If already small enough, return as-is
        if len(content.encode("utf-8")) <= limit:
            return content

        prompt = (
            "Summarize the following content into a concise summary of 500-1000 words.\n"
            "Focus on key facts, insights, and actionable information.\n"
            "Content:\n"
            f"{content[:self.MAX_PROMPT_CHARS]}\n"
            "Summary:"
        )

        try:
            summary = await self.model_fn(prompt, max_tokens=1500)
            if not isinstance(summary, str) or not summary.strip():
                # An empty summary would silently discard the content.
                raise ValueError("model returned no usable summary")
            return self._truncate_utf8(summary, limit)
        except Exception as e:
            logger.error("Failed to compress content: %s", e)
            # Fall back to the raw content, cut by bytes (not characters) so
            # the size limit also holds for multi-byte text.
            return self._truncate_utf8(content, limit)