Spaces:
Runtime error
Runtime error
| """CORE.XDA module implementation.""" | |
| from __future__ import annotations | |
| import asyncio | |
| from dataclasses import dataclass | |
| from typing import Dict, List | |
| from typing import Any, Dict | |
| from jenaai.core.module import BaseModule | |
@dataclass
class SubAgent:
    """Descriptor for one sub-agent that tasks can be dispatched to.

    Declared as a dataclass so instances can be built with keyword
    arguments (``SubAgent(name=..., role=...)``) and compared by value.
    """

    # Short identifier; also used as the lookup key for the agent's route.
    name: str
    # Human-readable description of what the agent is responsible for.
    role: str
class Module(BaseModule):
    """Central AI brain with tool-using agent loop and sub-agent coordination.

    The sub-agent roster and the event-bus topic each agent listens on are
    read from ``config`` as comma-separated strings (keys ``agents`` and
    ``agent_routes``).

    NOTE(review): ``self.event_bus`` and ``self.metadata`` are used below but
    never assigned here; presumably ``BaseModule`` provides them -- confirm.
    """

    def __init__(self, event_bus, config):
        """Build the sub-agent list and the agent-to-topic routing table.

        Args:
            event_bus: Forwarded unchanged to ``BaseModule.__init__``.
            config: Object exposing ``get(key, default)`` that returns
                strings. ``agents`` is a comma-separated list of agent names;
                ``agent_routes`` is a comma-separated list of
                ``name=topic`` pairs.
        """
        super().__init__(event_bus, config)
        self.agents: List[SubAgent] = []
        self.agent_routes: Dict[str, str] = {}

        for raw in config.get("agents", "searcher,vision,avatar,ops").split(","):
            name = raw.strip()
            if not name:
                # Tolerate stray commas / blank entries in the config string.
                continue
            self.agents.append(SubAgent(name=name, role=f"{name} agent"))

        for raw in config.get(
            "agent_routes",
            "searcher=tools.search,vision=tools.vision,avatar=avatar.control,ops=ops.command",
        ).split(","):
            if "=" not in raw:
                continue
            # Split on the first "=" only so a topic may itself contain "=".
            name, route = (part.strip() for part in raw.split("=", 1))
            if not name or not route:
                # Same tolerance as for agent names: a blank key can never
                # match an agent, and a blank topic should fall back to the
                # default route in run_task rather than publish to "".
                continue
            self.agent_routes[name] = route

    async def start(self) -> None:
        """Publish a ``system.log`` event announcing that the module started."""
        await self.event_bus.publish(
            "system.log",
            {"message": "CORE.XDA started", "module": self.metadata.name},
        )

    async def stop(self) -> None:
        """Publish a ``system.log`` event announcing that the module stopped."""
        await self.event_bus.publish(
            "system.log",
            {"message": "CORE.XDA stopped", "module": self.metadata.name},
        )

    async def generate(self, prompt: str) -> str:
        """Return ``prompt`` prefixed with ``[JenaAI] `` after a 0.1 s pause.

        NOTE(review): this looks like a stand-in for a real model call --
        confirm before relying on the output format.
        """
        await asyncio.sleep(0.1)
        return f"[JenaAI] {prompt}"

    async def run_task(self, task: str) -> Dict[str, Any]:
        """Dispatch work to sub-agents via the event bus.

        Every configured agent receives the same ``task``. Each agent's event
        is published on its configured route, or on ``agent.task`` when no
        route is configured for it. Events are published one at a time, in
        the order the agents were configured.

        Args:
            task: Description of the work to hand to every sub-agent.

        Returns:
            ``{"task": task, "plan": [...]}`` where ``plan`` holds one
            ``{"agent", "role", "task"}`` entry per agent, in dispatch order.
        """
        plan = []
        for agent in self.agents:
            route = self.agent_routes.get(agent.name, "agent.task")
            plan.append({"agent": agent.name, "role": agent.role, "task": task})
            await self.event_bus.publish(
                route,
                {
                    "agent": agent.name,
                    "role": agent.role,
                    "task": task,
                    "route": route,
                    "module": self.metadata.name,
                },
            )
        return {"task": task, "plan": plan}

    async def health_check(self) -> bool:
        """Report the module as healthy; always returns ``True``."""
        return True