Spaces:
Running
Running
| import xml.etree.ElementTree as ET | |
| from typing import Any, Dict, Optional | |
| import httpx | |
| from app.tool.base import BaseTool | |
| from app.utils.spatial_grid import SpatialGridManager | |
| from app.logger import logger | |
# Upper bound on the summary text stored per item (truncated for context efficiency).
_SUMMARY_MAX_CHARS = 500
# Number of per-item assignment lines echoed back in the result message.
_MAX_REPORTED_ASSIGNMENTS = 10


def _child_text(item: ET.Element, tag: str, default: str = "") -> str:
    """Return the stripped text of the first ``tag`` child of ``item``.

    Falls back to ``default`` when the child is missing, empty (``<tag/>``),
    or whitespace-only, so callers never receive ``None``.
    """
    text = item.findtext(tag)
    if text is None:
        return default
    return text.strip() or default


class RSSMatrixSync(BaseTool):
    """
    Synchronizes RSS feed items to the Agent Matrix via spatial indexing.
    Mimics MCP functionality by populating grid nodes with live data.
    """

    name: str = "rss_matrix_sync"
    description: str = "Fetch RSS feed and sync items to matrix nodes using URI-based spatial indexing."
    parameters: dict = {
        "type": "object",
        "properties": {
            "url": {
                "type": "string",
                "description": "The URL of the RSS feed to synchronize.",
            }
        },
        "required": ["url"],
    }

    async def execute(self, url: str) -> str:
        """Fetch the RSS feed at ``url`` and push each item to the spatial grid.

        Every ``<item>`` with a non-empty ``<link>`` is handed to
        ``SpatialGridManager.sync_data_to_node`` keyed by that link, together
        with its title, publication date and a truncated summary.

        Args:
            url: Address of the RSS feed to download.

        Returns:
            A summary line with the number of synchronized items followed by
            up to the first ten item-to-node assignments, or an error message
            if fetching, parsing or syncing raised. Failures are logged and
            reported in the return value rather than raised.
        """
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.get(url)
                response.raise_for_status()

            # NOTE(review): the feed is untrusted input and the stdlib
            # ElementTree parser is not hardened against maliciously
            # constructed XML (e.g. entity-expansion attacks). Consider a
            # hardened parser if feeds from arbitrary sources are allowed.
            root = ET.fromstring(response.content)

            grid_mgr = SpatialGridManager()
            assignments = []
            for item in root.findall(".//item"):
                # The link doubles as the spatial-index key, so surrounding
                # whitespace from pretty-printed feeds is stripped, and items
                # without a usable link are skipped.
                link = _child_text(item, "link")
                if not link:
                    continue

                content = {
                    "title": _child_text(item, "title", "No Title"),
                    "published": _child_text(item, "pubDate"),
                    "summary": _child_text(item, "description")[:_SUMMARY_MAX_CHARS],
                }

                # A falsy result means the item was not placed on any node.
                result = grid_mgr.sync_data_to_node(link, content)
                if result:
                    agent_key, coords = result
                    assignments.append(
                        f"Item '{content['title']}' -> Node {coords} ({agent_key})"
                    )

            sync_count = len(assignments)
            return (
                f"Successfully synchronized {sync_count} items to the spatial grid.\n"
                + "\n".join(assignments[:_MAX_REPORTED_ASSIGNMENTS])
            )
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logger.error(f"RSS Spatial Sync failed: {e}")
            return f"Error during spatial synchronization: {str(e)}"