""" Convener Manager - Sequential floor routing per OFP 1.1.0 Routes user utterances to agents sequentially, providing each agent with context from prior agents' responses in the same conversation turn. """ import requests import logging from typing import Dict, List, Optional, Any from datetime import datetime, timezone from .agent_manager import AgentInfo from .utils.config import settings logger = logging.getLogger(__name__) class ConvenerManager: """ Manages sequential agent engagement per OFP 1.1.0 floor protocol. Instead of broadcasting to all agents simultaneously, this class: 1. Scores agents by relevance to the query 2. Engages them one at a time in score order 3. Passes each agent the responses from prior agents as context """ def __init__(self, floor_manager, floor_session): self.floor_manager = floor_manager self.floor_session = floor_session def score_agent(self, query: str, agent_info: AgentInfo) -> float: """ Score an agent's relevance to a query. Scoring: - 2.0 pts per capability keyphrase found in query - 0.3 pts per overlapping word in capability descriptions - 0.2 pts per overlapping word in identification synopsis """ score = 0.0 query_lower = query.lower() query_words = set(query_lower.split()) manifest = agent_info.manifest or {} identification = manifest.get("identification", manifest) capabilities = agent_info.capabilities or [] for cap in capabilities: for phrase in cap.get("keyphrases", []): if phrase.lower() in query_lower: score += 2.0 for desc in cap.get("descriptions", []): desc_words = set(desc.lower().split()) overlap = len(query_words & desc_words) score += overlap * 0.3 synopsis = identification.get("synopsis", "") if synopsis: synopsis_words = set(synopsis.lower().split()) overlap = len(query_words & synopsis_words) score += overlap * 0.2 return score def select_agents(self, query: str) -> List[str]: """Return all agent URIs sorted by relevance score (descending).""" scored = [ (uri, self.score_agent(query, agent_info)) for uri, agent_info in self.floor_session.participants.items() ] scored.sort(key=lambda x: x[1], reverse=True) return [uri for uri, _ in scored] async def route_and_engage(self, user_uri: str, message: str) -> List[Dict]: """ Sequentially engage all agents, passing prior responses as context. Returns list of agent response dicts with keys: agent_uri, agent_name, message, html_content, timestamp """ agent_uris = self.select_agents(message) if not agent_uris: logger.info("No agents to route to") return [] logger.info(f"Routing message to {len(agent_uris)} agent(s) sequentially") prior_responses: List[Dict] = [] all_responses: List[Dict] = [] for uri in agent_uris: agent_info = self.floor_session.participants.get(uri) if not agent_info: continue response = await self._send_grant_envelope( agent_info, user_uri, message, prior_responses ) if response: all_responses.append(response) prior_responses.append(response) return all_responses async def _send_grant_envelope( self, agent_info: AgentInfo, user_uri: str, message: str, prior_responses: List[Dict], ) -> Optional[Dict]: """Build and POST a grantFloor+utterance envelope to a single agent.""" import json envelope = self._build_grant_envelope(agent_info, user_uri, message, prior_responses) print(f"\n{'='*80}") print(f"📤 CONVENER → {agent_info.speaker_uri}") print(f"{'='*80}") print(json.dumps(envelope, indent=2)) print(f"{'='*80}\n") try: response = requests.post( agent_info.service_url, json=envelope, headers={ "Content-Type": "application/json", "User-Agent": "OFP-FloorManager/1.1.0", }, timeout=settings.MESSAGE_TIMEOUT, ) response.raise_for_status() response_data = response.json() print(f"\n{'='*80}") print(f"📥 CONVENER ← {agent_info.speaker_uri}") print(json.dumps(response_data, indent=2)) print(f"{'='*80}\n") return self._parse_agent_response(response_data, agent_info) except requests.exceptions.Timeout: logger.error(f"Timeout sending to {agent_info.speaker_uri}") return None except requests.exceptions.RequestException as e: logger.error(f"Request error sending to {agent_info.speaker_uri}: {e}") return None except Exception as e: logger.error(f"Unexpected error sending to {agent_info.speaker_uri}: {e}") return None def _build_grant_envelope( self, agent_info: AgentInfo, user_uri: str, message: str, prior_responses: List[Dict], ) -> Dict: """Build an OFP 1.1.0 grantFloor + private utterance envelope.""" session = self.floor_session if prior_responses: context_lines = ["[Context from this conversation turn:"] for resp in prior_responses: agent_name = resp.get("agent_name", resp.get("agent_uri", "Agent")) text = resp.get("message", "") if text: context_lines.append(f'{agent_name}: "{text}"') context_lines.append("]") context_lines.append(f'User: "{message}"') context_text = "\n".join(context_lines) else: context_text = message return { "openFloor": { "schema": {"version": "1.1.0"}, "conversation": { "id": session.session_id, "floorGranted": [agent_info.speaker_uri], "conversants": self._get_conversants_list(), }, "sender": { "speakerUri": session.floor_manager_uri, "serviceUrl": session.floor_manager_url, }, "events": [ { "eventType": "grantFloor", "to": {"speakerUri": agent_info.speaker_uri}, }, { "eventType": "utterance", "to": { "speakerUri": agent_info.speaker_uri, "serviceUrl": agent_info.service_url, "private": True, }, "parameters": { "dialogEvent": { "speakerUri": user_uri, "span": { "startTime": datetime.now(timezone.utc) .isoformat() .replace("+00:00", "Z") }, "features": { "text": { "mimeType": "text/plain", "tokens": [{"value": context_text}], } }, } }, }, ], } } def _parse_agent_response( self, response_data: Dict, agent_info: AgentInfo ) -> Optional[Dict]: """ Parse an agent's response envelope. Extracts utterance text/HTML and checks for yieldFloor. Returns None if no utterance content found. """ if "openFloor" not in response_data: logger.warning(f"Response from {agent_info.speaker_uri} missing openFloor") return None events = response_data["openFloor"].get("events", []) text_content: Optional[str] = None html_content: Optional[str] = None for event in events: event_type = event.get("eventType") if event_type == "yieldFloor": logger.info(f"Agent {agent_info.speaker_uri} yielded the floor (advisory)") continue if event_type == "utterance": params = event.get("parameters", {}) dialog_event = params.get("dialogEvent", {}) features = dialog_event.get("features", {}) text_feature = features.get("text", {}) tokens = text_feature.get("tokens", []) if tokens: text_content = " ".join( t.get("value", t.get("token", "")) for t in tokens ) html_feature = features.get("html", {}) html_tokens = html_feature.get("tokens", []) if html_tokens: html_content = "".join(t.get("value", "") for t in html_tokens) if not text_content and not html_content: logger.info(f"No utterance content in response from {agent_info.speaker_uri}") return None return { "agent_uri": agent_info.speaker_uri, "agent_name": agent_info.display_name, "message": text_content or "", "html_content": html_content, "timestamp": datetime.now(), } def _get_conversants_list(self) -> List[Dict]: """Build partial manifests of all session participants for the envelope.""" conversants = [] for uri, agent_info in self.floor_session.participants.items(): manifest = agent_info.manifest or {} identification = manifest.get("identification", manifest) conversant: Dict[str, Any] = { "identification": { "speakerUri": uri, "serviceUrl": agent_info.service_url, "conversationalName": identification.get( "conversationalName", uri.split(":")[-1] ), } } if agent_info.capabilities: conversant["capabilities"] = agent_info.capabilities conversants.append(conversant) return conversants