# FloorManager / src / convener.py
# BolyosCsaba
# implement OFP 1.1.0 sequential floor routing with default room
# b61c4de
"""
Convener Manager - Sequential floor routing per OFP 1.1.0
Routes user utterances to agents sequentially, providing each agent with
context from prior agents' responses in the same conversation turn.
"""
import requests
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime, timezone
from .agent_manager import AgentInfo
from .utils.config import settings
logger = logging.getLogger(__name__)
class ConvenerManager:
    """
    Manages sequential agent engagement per OFP 1.1.0 floor protocol.

    Instead of broadcasting to all agents simultaneously, this class:
    1. Scores agents by relevance to the query
    2. Engages them one at a time in score order
    3. Passes each agent the responses from prior agents as context
    """

    def __init__(self, floor_manager, floor_session):
        """Store the owning floor manager and the session being routed."""
        self.floor_manager = floor_manager
        self.floor_session = floor_session

    @staticmethod
    def _identification_of(agent_info: "AgentInfo") -> Dict[str, Any]:
        """Return the identification section of an agent's manifest.

        Falls back to the manifest itself when it has no ``identification``
        key (flat manifest), and to an empty dict when the section is present
        but is not a mapping (e.g. ``null``), so callers can always ``.get``.
        """
        manifest = agent_info.manifest or {}
        identification = manifest.get("identification", manifest)
        return identification if isinstance(identification, dict) else {}

    def score_agent(self, query: str, agent_info: "AgentInfo") -> float:
        """
        Score an agent's relevance to a query.

        Scoring:
        - 2.0 pts per capability keyphrase found in query
        - 0.3 pts per overlapping word in capability descriptions
        - 0.2 pts per overlapping word in identification synopsis

        Matching is case-insensitive. Keyphrases are matched as substrings of
        the query; descriptions and synopsis are compared as whitespace-split
        word sets. Missing or null manifest sections contribute nothing.
        """
        query_lower = query.lower()
        query_words = set(query_lower.split())
        score = 0.0

        for cap in agent_info.capabilities or []:
            for phrase in cap.get("keyphrases") or []:
                # A blank keyphrase is a substring of every query; skip it so
                # it cannot hand the agent a free 2.0 points.
                if phrase.strip() and phrase.lower() in query_lower:
                    score += 2.0
            for desc in cap.get("descriptions") or []:
                overlap = len(query_words & set(desc.lower().split()))
                score += overlap * 0.3

        synopsis = self._identification_of(agent_info).get("synopsis", "")
        if synopsis:
            overlap = len(query_words & set(synopsis.lower().split()))
            score += overlap * 0.2
        return score

    def select_agents(self, query: str) -> List[str]:
        """Return all agent URIs sorted by relevance score (descending).

        The sort is stable, so agents with equal scores keep the iteration
        order of ``floor_session.participants``.
        """
        scored = [
            (uri, self.score_agent(query, agent_info))
            for uri, agent_info in self.floor_session.participants.items()
        ]
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return [uri for uri, _ in scored]

    async def route_and_engage(self, user_uri: str, message: str) -> List[Dict]:
        """
        Sequentially engage all agents, passing prior responses as context.

        Returns list of agent response dicts with keys:
        agent_uri, agent_name, message, html_content, timestamp

        Agents that fail, time out, or return no utterance are skipped.
        """
        agent_uris = self.select_agents(message)
        if not agent_uris:
            logger.info("No agents to route to")
            return []

        logger.info(
            "Routing message to %d agent(s) sequentially", len(agent_uris)
        )

        # The responses gathered so far double as the context handed to the
        # next agent, so a single list serves both purposes.
        responses: List[Dict] = []
        for uri in agent_uris:
            # Re-resolve on every iteration: the lookup may come back empty
            # if the participant is no longer in the session.
            agent_info = self.floor_session.participants.get(uri)
            if not agent_info:
                continue
            response = await self._send_grant_envelope(
                agent_info, user_uri, message, responses
            )
            if response:
                responses.append(response)
        return responses

    async def _send_grant_envelope(
        self,
        agent_info: "AgentInfo",
        user_uri: str,
        message: str,
        prior_responses: List[Dict],
    ) -> Optional[Dict]:
        """Build and POST a grantFloor+utterance envelope to a single agent.

        Returns the parsed response dict, or None on timeout, HTTP/transport
        error, unparseable response, or a response without utterance content.
        Errors are logged, never raised.
        """
        import asyncio
        import functools
        import json

        envelope = self._build_grant_envelope(
            agent_info, user_uri, message, prior_responses
        )

        print(f"\n{'='*80}")
        print(f"📤 CONVENER → {agent_info.speaker_uri}")
        print(f"{'='*80}")
        print(json.dumps(envelope, indent=2))
        print(f"{'='*80}\n")

        try:
            post = functools.partial(
                requests.post,
                agent_info.service_url,
                json=envelope,
                headers={
                    "Content-Type": "application/json",
                    "User-Agent": "OFP-FloorManager/1.1.0",
                },
                timeout=settings.MESSAGE_TIMEOUT,
            )
            # requests is blocking; run it in the default executor so this
            # coroutine does not stall the event loop while the agent answers.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(None, post)
            response.raise_for_status()
            response_data = response.json()

            print(f"\n{'='*80}")
            print(f"📥 CONVENER ← {agent_info.speaker_uri}")
            print(json.dumps(response_data, indent=2))
            print(f"{'='*80}\n")

            return self._parse_agent_response(response_data, agent_info)
        except requests.exceptions.Timeout:
            logger.error("Timeout sending to %s", agent_info.speaker_uri)
            return None
        except requests.exceptions.RequestException as e:
            logger.error(
                "Request error sending to %s: %s", agent_info.speaker_uri, e
            )
            return None
        except Exception as e:
            # Best-effort boundary: one misbehaving agent must not abort the
            # turn. logger.exception keeps the traceback for diagnosis.
            logger.exception(
                "Unexpected error sending to %s: %s", agent_info.speaker_uri, e
            )
            return None

    def _build_grant_envelope(
        self,
        agent_info: "AgentInfo",
        user_uri: str,
        message: str,
        prior_responses: List[Dict],
    ) -> Dict:
        """Build an OFP 1.1.0 grantFloor + private utterance envelope.

        When earlier agents produced text in this turn, the utterance text is
        the user's message prefixed by a bracketed block quoting each of those
        responses; otherwise it is the bare message.
        """
        session = self.floor_session

        prior_lines = []
        for resp in prior_responses:
            text = resp.get("message", "")
            if text:
                speaker = resp.get("agent_name", resp.get("agent_uri", "Agent"))
                prior_lines.append(f'{speaker}: "{text}"')

        if prior_lines:
            context_text = "\n".join(
                [
                    "[Context from this conversation turn:",
                    *prior_lines,
                    "]",
                    f'User: "{message}"',
                ]
            )
        else:
            # No prior text (first agent, or only HTML-only replies so far):
            # send the message as-is rather than an empty context wrapper.
            context_text = message

        # UTC timestamp with a trailing "Z" instead of "+00:00".
        start_time = (
            datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        )

        return {
            "openFloor": {
                "schema": {"version": "1.1.0"},
                "conversation": {
                    "id": session.session_id,
                    "floorGranted": [agent_info.speaker_uri],
                    "conversants": self._get_conversants_list(),
                },
                "sender": {
                    "speakerUri": session.floor_manager_uri,
                    "serviceUrl": session.floor_manager_url,
                },
                "events": [
                    {
                        "eventType": "grantFloor",
                        "to": {"speakerUri": agent_info.speaker_uri},
                    },
                    {
                        "eventType": "utterance",
                        "to": {
                            "speakerUri": agent_info.speaker_uri,
                            "serviceUrl": agent_info.service_url,
                            "private": True,
                        },
                        "parameters": {
                            "dialogEvent": {
                                "speakerUri": user_uri,
                                "span": {"startTime": start_time},
                                "features": {
                                    "text": {
                                        "mimeType": "text/plain",
                                        "tokens": [{"value": context_text}],
                                    }
                                },
                            }
                        },
                    },
                ],
            }
        }

    def _parse_agent_response(
        self, response_data: Dict, agent_info: "AgentInfo"
    ) -> Optional[Dict]:
        """
        Parse an agent's response envelope.

        Extracts utterance text/HTML and checks for yieldFloor.
        Returns None if no utterance content found.

        Text tokens are joined with spaces, HTML tokens without a separator.
        If several utterance events carry content, the last one wins.
        """
        if "openFloor" not in response_data:
            logger.warning(
                "Response from %s missing openFloor", agent_info.speaker_uri
            )
            return None

        text_content: Optional[str] = None
        html_content: Optional[str] = None

        for event in response_data["openFloor"].get("events", []):
            event_type = event.get("eventType")
            if event_type == "yieldFloor":
                logger.info(
                    "Agent %s yielded the floor (advisory)",
                    agent_info.speaker_uri,
                )
                continue
            if event_type != "utterance":
                continue

            features = (
                event.get("parameters", {})
                .get("dialogEvent", {})
                .get("features", {})
            )
            tokens = features.get("text", {}).get("tokens", [])
            if tokens:
                # Accept either a "value" or a "token" key per token.
                text_content = " ".join(
                    t.get("value", t.get("token", "")) for t in tokens
                )
            html_tokens = features.get("html", {}).get("tokens", [])
            if html_tokens:
                html_content = "".join(t.get("value", "") for t in html_tokens)

        if not text_content and not html_content:
            logger.info(
                "No utterance content in response from %s",
                agent_info.speaker_uri,
            )
            return None

        return {
            "agent_uri": agent_info.speaker_uri,
            "agent_name": agent_info.display_name,
            "message": text_content or "",
            "html_content": html_content,
            # NOTE(review): naive local time, unlike the UTC startTime in the
            # outgoing envelope. Left unchanged because callers may rely on a
            # naive datetime here - confirm before switching to UTC.
            "timestamp": datetime.now(),
        }

    def _get_conversants_list(self) -> List[Dict]:
        """Build partial manifests of all session participants for the envelope.

        Each entry carries the participant's URI, service URL and
        conversational name (defaulting to the last ``:``-separated segment of
        the URI), plus its capabilities when it has any.
        """
        conversants = []
        for uri, agent_info in self.floor_session.participants.items():
            identification = self._identification_of(agent_info)
            conversant: Dict[str, Any] = {
                "identification": {
                    "speakerUri": uri,
                    "serviceUrl": agent_info.service_url,
                    "conversationalName": identification.get(
                        "conversationalName", uri.split(":")[-1]
                    ),
                }
            }
            if agent_info.capabilities:
                conversant["capabilities"] = agent_info.capabilities
            conversants.append(conversant)
        return conversants