Spaces:
Running
Running
File size: 2,612 Bytes
c8cd75e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | import asyncio
import time
import json
import logging
from typing import List, Dict, Any
from agent_plugins.search_agent import ResearchAgent
# Import-time side effect: request INFO-level basic configuration of the
# root logger so the ingestion messages below are emitted.
logging.basicConfig(level=logging.INFO)
# Named logger used for every message emitted by this module.
logger = logging.getLogger("IngestionCluster")
class RealtimeIngestionCluster:
    """Background poller that caches live search results per topic.

    A single asyncio task repeatedly calls ``search_agent.search_live`` for
    every entry in ``topics`` and stores the raw result, the capture time and
    a short preview in ``intelligence_cache``.  ``query`` only reads from that
    cache; it never triggers a search itself.
    """

    def __init__(self):
        self.search_agent = ResearchAgent()
        # topic -> {"data": raw result, "timestamp": epoch seconds, "summary": preview}
        self.intelligence_cache: Dict[str, Any] = {}
        self.is_running = False
        self.topics = ["IPL 2026 Live Scores", "AI Technology News", "Global Market Trends"]
        # Strong reference to the background task.  The event loop itself only
        # keeps weak references to tasks, so a task whose handle is dropped
        # can be garbage-collected before it finishes.
        self._ingestion_task = None

    async def start(self):
        """Starts the realtime ingestion loop.

        Must be awaited from a running event loop.  Calling it while the
        cluster is already running is a no-op.
        """
        if self.is_running:
            return
        self.is_running = True
        logger.info("Neural Ingestion Cluster Activated.")
        previous = self._ingestion_task
        if previous is not None and not previous.done():
            # ``is_running`` was cleared and set again before the old loop
            # woke up from its sleep; that loop will simply carry on, so
            # spawning a second one would duplicate every search.
            return
        self._ingestion_task = asyncio.create_task(self._ingestion_loop())

    async def _ingestion_loop(self):
        """Refresh every topic, then sleep, until ``is_running`` is cleared.

        A full pass is followed by a 5 minute pause; if anything failed during
        the pass the pause is shortened to 60 seconds so the retry comes sooner.
        """
        while self.is_running:
            try:
                pass_ok = True
                for topic in self.topics:
                    if not self._ingest_topic(topic):
                        pass_ok = False
                    # Small delay between topics to avoid rate limits
                    await asyncio.sleep(2)
                # Global sleep before next full sync (e.g., every 5 minutes)
                delay = 300 if pass_ok else 60
            except Exception as e:
                # Anything unexpected outside a single topic's fetch (for
                # example ``topics`` not being iterable) must not kill the loop.
                logger.error("Ingestion Error: %s", e)
                delay = 60
            await asyncio.sleep(delay)

    def _ingest_topic(self, topic: str) -> bool:
        """Fetch one topic and cache the result; return False if it failed.

        Failures are logged and isolated so that one misbehaving topic does
        not prevent the remaining topics from being refreshed in this pass.
        """
        logger.info("Ingesting live intelligence for: %s", topic)
        try:
            # NOTE(review): search_live is called synchronously, so the event
            # loop is blocked for as long as it takes.  Consider moving it to
            # an executor once ResearchAgent is confirmed to be thread-safe.
            results = self.search_agent.search_live(topic)
            entry = {
                "data": results,
                "timestamp": time.time(),
                "summary": self._generate_quick_summary(results),
            }
        except Exception as e:
            logger.error("Ingestion Error: %s", e)
            return False
        # Only replace the cached entry once a complete one has been built,
        # so a failed refresh leaves the previous data in place.
        self.intelligence_cache[topic] = entry
        return True

    def _generate_quick_summary(self, results: str) -> str:
        """Return ``results`` truncated to 500 characters plus ``"..."``.

        Text of 500 characters or fewer is returned unchanged.
        """
        # In a real scenario, we could use an LLM here to summarize.
        # For now, we'll just take the first 500 chars as a "neural preview".
        if len(results) > 500:
            return results[:500] + "..."
        return results

    def query(self, query: str) -> List[Dict[str, Any]]:
        """Returns relevant intelligence from the cluster.

        A cached topic matches when any whitespace-separated word of ``query``
        occurs as a case-insensitive substring of the topic name.  Each match
        is reported as a dict with the keys ``topic``, ``intelligence`` (the
        cached raw data) and ``freshness`` (age of the entry, e.g. ``"12s ago"``).
        An empty or whitespace-only query matches nothing.
        """
        words = query.lower().split()
        if not words:
            return []
        now = time.time()
        relevant_data = []
        for topic, info in self.intelligence_cache.items():
            topic_lower = topic.lower()
            if any(word in topic_lower for word in words):
                relevant_data.append({
                    "topic": topic,
                    "intelligence": info["data"],
                    "freshness": f"{int(now - info['timestamp'])}s ago",
                })
        return relevant_data
# Singleton instance: one module-level cluster shared by everything that
# imports this module. Creating it runs RealtimeIngestionCluster.__init__,
# which constructs a ResearchAgent at import time; the ingestion loop itself
# does not begin until start() is awaited.
ingestion_cluster = RealtimeIngestionCluster()
|