import xml.etree.ElementTree as ET from typing import Any, Dict, Optional import httpx from app.tool.base import BaseTool from app.utils.spatial_grid import SpatialGridManager from app.logger import logger class RSSMatrixSync(BaseTool): """ Synchronizes RSS feed items to the Agent Matrix via spatial indexing. Mimics MCP functionality by populating grid nodes with live data. """ name: str = "rss_matrix_sync" description: str = "Fetch RSS feed and sync items to matrix nodes using URI-based spatial indexing." parameters: dict = { "type": "object", "properties": { "url": { "type": "string", "description": "The URL of the RSS feed to synchronize." } }, "required": ["url"] } async def execute(self, url: str) -> str: try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get(url) response.raise_for_status() root = ET.fromstring(response.content) items = root.findall(".//item") grid_mgr = SpatialGridManager() sync_count = 0 assignments = [] for item in items: title = item.find("title").text if item.find("title") is not None else "No Title" link = item.find("link").text if item.find("link") is not None else "" pub_date = item.find("pubDate").text if item.find("pubDate") is not None else "" description = item.find("description").text if item.find("description") is not None else "" if not link: continue content = { "title": title, "published": pub_date, "summary": description[:500] if description else "" # Truncate for context efficiency } result = grid_mgr.sync_data_to_node(link, content) if result: agent_key, coords = result assignments.append(f"Item '{title}' -> Node {coords} ({agent_key})") sync_count += 1 return f"Successfully synchronized {sync_count} items to the spatial grid.\n" + "\n".join(assignments[:10]) except Exception as e: logger.error(f"RSS Spatial Sync failed: {e}") return f"Error during spatial synchronization: {str(e)}"