# FloorManager / src/agent_manager.py
# BolyosCsaba
# ofp agent issue fixed
# 4d7b8a6
"""
Agent Manager for OpenFloor Protocol
Handles agent discovery, invitation, and manifest management
"""
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional, Any

import requests

from .protocol.envelope import create_envelope, create_inter_agent_message
from .protocol.events import create_invite_event, create_get_manifest_event
from .utils.helpers import generate_message_id
from .utils.config import settings
logger = logging.getLogger(__name__)
@dataclass
class AgentInfo:
    """Record describing a single agent.

    Only ``speaker_uri`` and ``service_url`` are required; the remaining
    fields default to ``None`` until they are filled in.
    """

    # Speaker URI of the agent.
    speaker_uri: str
    # Service URL of the agent.
    service_url: str
    # Manifest dictionary for the agent, if one has been attached.
    manifest: Optional[Dict[str, Any]] = None
    # ID of the conversation associated with this agent, if any.
    conversation_id: Optional[str] = None
    # Timestamp of the invitation, if any.
    invited_at: Optional[datetime] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dictionary.

        ``invited_at`` is rendered with ``datetime.isoformat()`` when set and
        as ``None`` otherwise; every other field is passed through unchanged
        (the manifest is the same object, not a copy).
        """
        invited_iso = None
        if self.invited_at:
            invited_iso = self.invited_at.isoformat()

        return dict(
            speaker_uri=self.speaker_uri,
            service_url=self.service_url,
            manifest=self.manifest,
            conversation_id=self.conversation_id,
            invited_at=invited_iso,
        )
class AgentManager:
    """
    Manages agent discovery and invitation for OpenFloor Protocol

    Based on OFP 1.0.0 specification:
    - getManifests event: Discover agent capabilities
    - invite event: Invite agents to join conversations

    Invited agents are tracked in ``self.agents``, keyed by speaker URI.
    The HTTP calls are made with the synchronous ``requests`` library, so
    they are dispatched to the event loop's default executor to keep the
    coroutines from blocking the loop while waiting on the network.
    """

    def __init__(self, floor_manager_uri: str, floor_manager_url: str):
        """
        Initialize Agent Manager

        Args:
            floor_manager_uri: Speaker URI of the floor manager
            floor_manager_url: Service URL of the floor manager
        """
        self.floor_manager_uri = floor_manager_uri
        self.floor_manager_url = floor_manager_url
        self.agents: Dict[str, "AgentInfo"] = {}
        logger.info(f"Agent Manager initialized: {floor_manager_uri}")

    def _build_envelope(self, conversation_id: str, events: list) -> Dict[str, Any]:
        """Build an OFP envelope sent by the floor manager carrying ``events``."""
        return {
            "openFloor": {
                "schema": {
                    "version": settings.OFP_VERSION
                },
                "conversation": {
                    "id": conversation_id
                },
                "sender": {
                    "speakerUri": self.floor_manager_uri,
                    "serviceUrl": self.floor_manager_url
                },
                "events": events
            }
        }

    async def _post_envelope(
        self,
        service_url: str,
        envelope: Dict[str, Any],
        timeout: int
    ) -> "requests.Response":
        """POST ``envelope`` as JSON to ``service_url`` without blocking the event loop.

        Raises whatever ``requests.post`` / ``raise_for_status`` raise
        (``requests.exceptions.RequestException`` subclasses); callers handle them.
        """
        def _send() -> "requests.Response":
            response = requests.post(
                service_url,
                json=envelope,
                headers={
                    'Content-Type': 'application/json',
                    'User-Agent': f'OFP-FloorManager/{settings.OFP_VERSION}'
                },
                timeout=timeout
            )
            response.raise_for_status()
            return response

        # requests is synchronous: run it on a worker thread so other
        # coroutines keep running while we wait for the agent to answer.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _send)

    @staticmethod
    def _extract_manifest(response_data: Any) -> Optional[Dict[str, Any]]:
        """Pull the agent manifest out of a decoded getManifests response.

        Accepts both ``publishManifests`` and ``publishManifest`` events. For each
        such event the direct ``parameters.manifest`` field is tried first, then
        the ``identification`` of the first ``parameters.servicingManifests``
        entry. Malformed pieces are skipped instead of raising.

        Returns:
            The first non-empty manifest dictionary found, or None.
        """
        if not isinstance(response_data, dict):
            return None
        open_floor = response_data.get("openFloor")
        if not isinstance(open_floor, dict):
            return None
        events = open_floor.get("events", [])
        if not isinstance(events, list):
            return None

        for event in events:
            if not isinstance(event, dict):
                continue
            # Handle both publishManifests (plural) and publishManifest (singular)
            if event.get("eventType") not in ("publishManifests", "publishManifest"):
                continue
            parameters = event.get("parameters")
            if not isinstance(parameters, dict):
                continue

            # Try direct manifest field
            manifest = parameters.get("manifest")
            if isinstance(manifest, dict) and manifest:
                return manifest

            # Try servicingManifests array (OFP spec format); only the first
            # entry is consulted and its identification serves as the manifest.
            servicing_manifests = parameters.get("servicingManifests")
            if isinstance(servicing_manifests, list) and servicing_manifests:
                first_manifest = servicing_manifests[0]
                if isinstance(first_manifest, dict):
                    identification = first_manifest.get("identification")
                    if isinstance(identification, dict) and identification:
                        return identification
        return None

    async def get_agent_manifest(
        self,
        agent_service_url: str,
        timeout: int = 10
    ) -> Optional[Dict[str, Any]]:
        """
        Get an agent's manifest by sending getManifests event

        Per OFP spec: https://openfloor.dev/protocol/specifications/inter-agent-message#h2-117-getmanifests-event

        Args:
            agent_service_url: URL of the agent's service
            timeout: Request timeout in seconds

        Returns:
            Agent manifest dictionary or None if failed (timeout, HTTP/network
            error, undecodable body, or no manifest in the response). Errors
            are logged, never raised.
        """
        try:
            logger.info(f"Requesting manifest from agent: {agent_service_url}")
            # Each manifest request uses its own throwaway conversation id
            conversation_id = f"manifest_request_{generate_message_id()}"
            envelope_dict = self._build_envelope(
                conversation_id,
                [{"eventType": "getManifests"}]
            )
            response = await self._post_envelope(agent_service_url, envelope_dict, timeout)
            # Response should contain a publishManifests or publishManifest event
            manifest = self._extract_manifest(response.json())
            if manifest is not None:
                logger.info(f"✓ Received manifest from {agent_service_url}")
                return manifest
            logger.warning(f"No manifest found in response from {agent_service_url}")
            return None
        except requests.exceptions.Timeout:
            logger.error(f"✗ Timeout getting manifest from {agent_service_url}")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"✗ Failed to get manifest from {agent_service_url}: {e}")
            return None
        except Exception as e:
            # Unexpected failure: keep the traceback in the log
            logger.exception(f"✗ Unexpected error getting manifest: {e}")
            return None

    async def invite_agent(
        self,
        agent_speaker_uri: str,
        agent_service_url: str,
        conversation_id: str,
        timeout: int = 10
    ) -> bool:
        """
        Invite an agent to join a conversation

        Per OFP spec: https://openfloor.dev/protocol/specifications/inter-agent-message#h2-113-invite-event

        On success the agent is recorded in ``self.agents`` under its speaker
        URI, replacing any previous record for that URI.

        Args:
            agent_speaker_uri: Speaker URI of the agent to invite
            agent_service_url: Service URL of the agent
            conversation_id: Conversation ID to invite agent to
            timeout: Request timeout in seconds

        Returns:
            True if invite was sent successfully, False otherwise
        """
        try:
            logger.info(f"Inviting agent {agent_speaker_uri} to conversation {conversation_id}")
            invite_event = {
                "eventType": "invite",
                "to": {
                    "serviceUrl": agent_service_url,
                    "speakerUri": agent_speaker_uri
                }
            }
            envelope_dict = self._build_envelope(conversation_id, [invite_event])
            await self._post_envelope(agent_service_url, envelope_dict, timeout)

            # Store agent info only once the agent accepted the request
            self.agents[agent_speaker_uri] = AgentInfo(
                speaker_uri=agent_speaker_uri,
                service_url=agent_service_url,
                conversation_id=conversation_id,
                invited_at=datetime.now()
            )
            logger.info(f"✓ Successfully invited agent {agent_speaker_uri}")
            return True
        except requests.exceptions.Timeout:
            logger.error(f"✗ Timeout inviting agent {agent_speaker_uri}")
            return False
        except requests.exceptions.RequestException as e:
            logger.error(f"✗ Failed to invite agent {agent_speaker_uri}: {e}")
            return False
        except Exception as e:
            # Unexpected failure: keep the traceback in the log
            logger.exception(f"✗ Unexpected error inviting agent: {e}")
            return False

    async def add_agent(
        self,
        agent_service_url: str,
        conversation_id: str,
        timeout: int = 10,
        session_participants: Optional[Dict[str, "AgentInfo"]] = None
    ) -> Optional["AgentInfo"]:
        """
        Add an agent by getting their manifest and inviting them to a conversation

        This is a convenience method that combines:
        1. Getting the agent's manifest (to discover their speaker URI)
        2. Inviting the agent to the conversation
        3. Optionally adding to session participants

        Args:
            agent_service_url: Service URL of the agent
            conversation_id: Conversation ID to invite agent to
            timeout: Request timeout in seconds (applied to each request)
            session_participants: Optional dict to add the agent to (mutated
                in place, keyed by speaker URI)

        Returns:
            AgentInfo object if successful, None otherwise
        """
        try:
            # Step 1: Get agent's manifest
            manifest = await self.get_agent_manifest(agent_service_url, timeout)
            if not manifest:
                logger.error(f"Failed to get manifest for agent: {agent_service_url}")
                return None

            # Extract speaker URI from manifest
            agent_speaker_uri = manifest.get("speakerUri")
            if not agent_speaker_uri:
                logger.error(f"Manifest missing speakerUri: {agent_service_url}")
                return None

            # Step 2: Invite the agent
            success = await self.invite_agent(
                agent_speaker_uri,
                agent_service_url,
                conversation_id,
                timeout
            )
            if not success:
                logger.error(f"Failed to invite agent: {agent_speaker_uri}")
                return None

            # invite_agent stored the record; attach the manifest to it
            agent_info = self.agents[agent_speaker_uri]
            agent_info.manifest = manifest

            # Add to session participants if provided
            if session_participants is not None:
                session_participants[agent_speaker_uri] = agent_info
                logger.info(f"Added {agent_speaker_uri} to session participants")

            logger.info(f"✓ Successfully added agent {agent_speaker_uri} to conversation {conversation_id}")
            return agent_info
        except Exception as e:
            # Unexpected failure: keep the traceback in the log
            logger.exception(f"✗ Error adding agent: {e}")
            return None

    def get_agent_info(self, agent_speaker_uri: str) -> Optional["AgentInfo"]:
        """Get information about an agent, or None if the URI is unknown"""
        return self.agents.get(agent_speaker_uri)

    def list_agents(self) -> Dict[str, "AgentInfo"]:
        """List all known agents (shallow copy of the internal registry)"""
        return self.agents.copy()

    def remove_agent(self, agent_speaker_uri: str) -> bool:
        """Remove an agent from the manager; return True if it was present"""
        if agent_speaker_uri in self.agents:
            del self.agents[agent_speaker_uri]
            logger.info(f"Removed agent: {agent_speaker_uri}")
            return True
        return False