"""
Knowledge ingestion from external sources.
Ingests knowledge from web search, URLs, and news sources,
compressing content to 2-4KB summaries for efficient storage.
"""
import logging
from typing import Dict, Any, Optional
import httpx
from datetime import datetime
logger = logging.getLogger(__name__)


class KnowledgeIngestor:
    """Ingest knowledge from external sources and compress it for storage.

    Three sources are supported: Tavily web search, arbitrary URLs fetched
    through the Jina Reader proxy, and NewsAPI article search. The text of
    every item is passed through ``compress_content``, which caps the stored
    summary at 4 KB of UTF-8.

    All ``ingest_*`` methods are best-effort: failures are logged and
    reported as an empty result (``[]`` or ``None``) instead of being raised.
    """

    # Hard upper bound, in UTF-8 bytes, for any summary that is returned.
    _MAX_SUMMARY_BYTES = 4096
    # Only this many leading characters of the raw content go into the prompt.
    _MAX_PROMPT_CHARS = 10000
    # Token budget handed to ``model_fn`` when requesting a summary.
    _SUMMARY_MAX_TOKENS = 1500
    # Timeout, in seconds, applied to every outbound HTTP request.
    _HTTP_TIMEOUT = 30.0

    def __init__(self, tavily_key: Optional[str], newsapi_key: Optional[str], model_fn):
        """Store the API credentials and the summarisation callable.

        Args:
            tavily_key: Tavily API key; search ingestion is skipped when falsy.
            newsapi_key: NewsAPI key; news ingestion is skipped when falsy.
            model_fn: Callable awaited as
                ``await model_fn(prompt, max_tokens=...)``; its result is
                used as the summary text.
        """
        self.tavily_key = tavily_key
        self.newsapi_key = newsapi_key
        self.model_fn = model_fn
        self.jina_reader_base = "https://r.jina.ai/"

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string."""
        # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
        # from an aware "now" keeps the emitted format exactly as before
        # (no "+00:00" suffix), so stored items stay comparable.
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @classmethod
    def _truncate_utf8(cls, text: str) -> str:
        """Cut *text* so that its UTF-8 encoding fits in the summary limit."""
        encoded = text.encode("utf-8")
        if len(encoded) <= cls._MAX_SUMMARY_BYTES:
            return text
        # errors="ignore" drops a multibyte character split by the byte cut.
        return encoded[: cls._MAX_SUMMARY_BYTES].decode("utf-8", errors="ignore")

    @staticmethod
    def _article_text(article: Dict[str, Any]) -> str:
        """Join an article's title, description and content, skipping blanks."""
        # Fields can be present but null; ``or`` filtering keeps the literal
        # string "None" (and stray separator spaces) out of the text.
        parts = (
            article.get("title"),
            article.get("description"),
            article.get("content"),
        )
        return " ".join(str(part) for part in parts if part)

    async def ingest_from_search(self, query: str, max_results: int = 5) -> list[Dict[str, Any]]:
        """
        Ingest knowledge from web search using Tavily API.

        Args:
            query: Search query
            max_results: Maximum number of results requested from Tavily

        Returns:
            List of knowledge items with compressed content; empty when the
            API key is missing or the request fails.
        """
        if not self.tavily_key:
            logger.warning("Tavily API key not configured")
            return []

        try:
            async with httpx.AsyncClient(timeout=self._HTTP_TIMEOUT) as client:
                response = await client.post(
                    "https://api.tavily.com/search",
                    json={
                        "api_key": self.tavily_key,
                        "query": query,
                        "max_results": max_results,
                    },
                )
                response.raise_for_status()
                data = response.json()

            # The HTTP client is no longer needed while summaries are built.
            items = []
            for result in data.get("results") or []:
                summary = await self.compress_content(result.get("content") or "")
                items.append({
                    "source": "tavily_search",
                    "query": query,
                    "url": result.get("url"),
                    "title": result.get("title"),
                    "summary": summary,
                    "ingested_at": self._utc_timestamp(),
                })
            logger.info("Ingested %d items from Tavily search: %s", len(items), query)
            return items
        except Exception as e:
            # Best-effort boundary: report the failure, never propagate it.
            logger.error("Failed to ingest from Tavily search: %s", e)
            return []

    async def ingest_from_url(self, url: str) -> Optional[Dict[str, Any]]:
        """
        Ingest knowledge from a specific URL using Jina Reader.

        Args:
            url: URL to ingest

        Returns:
            Knowledge item with compressed content, or None if the URL is
            empty or the fetch failed
        """
        if not url:
            # An empty target would fetch the reader's own base page.
            logger.warning("No URL provided for ingestion")
            return None

        try:
            async with httpx.AsyncClient(timeout=self._HTTP_TIMEOUT) as client:
                response = await client.get(f"{self.jina_reader_base}{url}")
                response.raise_for_status()
                content = response.text

            summary = await self.compress_content(content)
            item = {
                "source": "jina_reader",
                "url": url,
                "summary": summary,
                "ingested_at": self._utc_timestamp(),
            }
            logger.info("Ingested content from URL: %s", url)
            return item
        except Exception as e:
            # Best-effort boundary: report the failure, never propagate it.
            logger.error("Failed to ingest from URL %s: %s", url, e)
            return None

    async def ingest_from_news(self, query: str, max_results: int = 10) -> list[Dict[str, Any]]:
        """
        Ingest knowledge from news sources using NewsAPI.

        Args:
            query: News search query
            max_results: Maximum number of articles requested (page size)

        Returns:
            List of knowledge items with compressed content; empty when the
            API key is missing or the request fails.
        """
        if not self.newsapi_key:
            logger.warning("NewsAPI key not configured")
            return []

        try:
            async with httpx.AsyncClient(timeout=self._HTTP_TIMEOUT) as client:
                response = await client.get(
                    "https://newsapi.org/v2/everything",
                    params={
                        "apiKey": self.newsapi_key,
                        "q": query,
                        "pageSize": max_results,
                        "sortBy": "publishedAt",
                    },
                )
                response.raise_for_status()
                data = response.json()

            items = []
            for article in data.get("articles") or []:
                summary = await self.compress_content(self._article_text(article))
                items.append({
                    "source": "newsapi",
                    "query": query,
                    "url": article.get("url"),
                    "title": article.get("title"),
                    "published_at": article.get("publishedAt"),
                    "summary": summary,
                    "ingested_at": self._utc_timestamp(),
                })
            logger.info("Ingested %d articles from NewsAPI: %s", len(items), query)
            return items
        except Exception as e:
            # Best-effort boundary: report the failure, never propagate it.
            logger.error("Failed to ingest from NewsAPI: %s", e)
            return []

    async def compress_content(self, content: str) -> str:
        """
        Compress content to a summary of at most 4 KB (UTF-8) using the LLM.

        Content that already fits in 4 KB is returned unchanged. Larger
        content is summarised by ``model_fn``; if that call fails, the raw
        content truncated to the byte limit is returned instead.

        Args:
            content: Raw content to compress

        Returns:
            Text whose UTF-8 encoding is no longer than 4096 bytes
        """
        if not content:
            return ""

        # If already small enough, return as-is
        if len(content.encode("utf-8")) <= self._MAX_SUMMARY_BYTES:
            return content

        prompt = (
            "Summarize the following content into a concise summary of 500-1000 words.\n"
            "Focus on key facts, insights, and actionable information.\n"
            "Content:\n"
            f"{content[:self._MAX_PROMPT_CHARS]}\n"
            "Summary:"
        )
        try:
            summary = await self.model_fn(prompt, max_tokens=self._SUMMARY_MAX_TOKENS)
            # The model may overshoot; enforce the byte limit. A non-string
            # result raises here and is handled by the fallback below.
            return self._truncate_utf8(summary)
        except Exception as e:
            logger.error("Failed to compress content: %s", e)
            # Fall back to the raw text, truncated by bytes (not characters)
            # so multibyte content still respects the limit.
            return self._truncate_utf8(content)
|