SignalMesh / tools /rss_matrix_sync.py
Ig0tU
Initial Release: Signal Mesh Spatially Indexed Orchestrator
dba8b8e
Raw
History Blame Contribute Delete
2.51 kB
import xml.etree.ElementTree as ET
from typing import Any, Dict, Optional
import httpx
from app.tool.base import BaseTool
from app.utils.spatial_grid import SpatialGridManager
from app.logger import logger
class RSSMatrixSync(BaseTool):
"""
Synchronizes RSS feed items to the Agent Matrix via spatial indexing.
Mimics MCP functionality by populating grid nodes with live data.
"""
name: str = "rss_matrix_sync"
description: str = "Fetch RSS feed and sync items to matrix nodes using URI-based spatial indexing."
parameters: dict = {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "The URL of the RSS feed to synchronize."
}
},
"required": ["url"]
}
async def execute(self, url: str) -> str:
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(url)
response.raise_for_status()
root = ET.fromstring(response.content)
items = root.findall(".//item")
grid_mgr = SpatialGridManager()
sync_count = 0
assignments = []
for item in items:
title = item.find("title").text if item.find("title") is not None else "No Title"
link = item.find("link").text if item.find("link") is not None else ""
pub_date = item.find("pubDate").text if item.find("pubDate") is not None else ""
description = item.find("description").text if item.find("description") is not None else ""
if not link:
continue
content = {
"title": title,
"published": pub_date,
"summary": description[:500] if description else "" # Truncate for context efficiency
}
result = grid_mgr.sync_data_to_node(link, content)
if result:
agent_key, coords = result
assignments.append(f"Item '{title}' -> Node {coords} ({agent_key})")
sync_count += 1
return f"Successfully synchronized {sync_count} items to the spatial grid.\n" + "\n".join(assignments[:10])
except Exception as e:
logger.error(f"RSS Spatial Sync failed: {e}")
return f"Error during spatial synchronization: {str(e)}"