# Spaces: Sleeping / Sleeping  (non-code page residue captured ahead of the module source)
"""
Convener Manager - Sequential floor routing per OFP 1.1.0

Routes user utterances to agents sequentially, providing each agent with
context from prior agents' responses in the same conversation turn.
"""
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import requests

from .agent_manager import AgentInfo
from .utils.config import settings

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class ConvenerManager:
    """
    Manages sequential agent engagement per OFP 1.1.0 floor protocol.

    Instead of broadcasting to all agents simultaneously, this class:
    1. Scores agents by relevance to the query
    2. Engages them one at a time in score order
    3. Passes each agent the responses from prior agents as context
    """

    def __init__(self, floor_manager, floor_session):
        """Store the floor manager and the session whose participants are routed to."""
        self.floor_manager = floor_manager
        self.floor_session = floor_session

    def score_agent(self, query: str, agent_info: "AgentInfo") -> float:
        """
        Score an agent's relevance to a query.

        Scoring:
        - 2.0 pts per capability keyphrase found in query (substring match)
        - 0.3 pts per overlapping word in capability descriptions
        - 0.2 pts per overlapping word in identification synopsis

        Matching is case-insensitive; words are whitespace-separated, so
        punctuation attached to a word is part of that word.
        """
        score = 0.0
        query_lower = query.lower()
        query_words = set(query_lower.split())

        manifest = agent_info.manifest or {}
        # Manifests without an "identification" section are treated as flat:
        # the manifest itself is searched for the synopsis.
        identification = manifest.get("identification", manifest)
        capabilities = agent_info.capabilities or []

        for cap in capabilities:
            for phrase in cap.get("keyphrases", []):
                if phrase.lower() in query_lower:
                    score += 2.0
            for desc in cap.get("descriptions", []):
                desc_words = set(desc.lower().split())
                score += len(query_words & desc_words) * 0.3

        synopsis = identification.get("synopsis", "")
        if synopsis:
            synopsis_words = set(synopsis.lower().split())
            score += len(query_words & synopsis_words) * 0.2

        return score

    def select_agents(self, query: str) -> List[str]:
        """Return all agent URIs sorted by relevance score (descending).

        The sort is stable, so agents with equal scores keep the iteration
        order of ``floor_session.participants``.
        """
        scored = [
            (uri, self.score_agent(query, agent_info))
            for uri, agent_info in self.floor_session.participants.items()
        ]
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return [uri for uri, _ in scored]

    async def route_and_engage(self, user_uri: str, message: str) -> List[Dict]:
        """
        Sequentially engage all agents, passing prior responses as context.

        Agents are contacted one at a time in ``select_agents`` order; an
        agent that fails or returns no utterance content is skipped.

        Returns list of agent response dicts with keys:
        agent_uri, agent_name, message, html_content, timestamp
        """
        agent_uris = self.select_agents(message)
        if not agent_uris:
            logger.info("No agents to route to")
            return []

        logger.info("Routing message to %d agent(s) sequentially", len(agent_uris))

        prior_responses: List[Dict] = []
        all_responses: List[Dict] = []
        for uri in agent_uris:
            agent_info = self.floor_session.participants.get(uri)
            if not agent_info:
                continue
            response = await self._send_grant_envelope(
                agent_info, user_uri, message, prior_responses
            )
            if response:
                all_responses.append(response)
                prior_responses.append(response)
        return all_responses

    async def _send_grant_envelope(
        self,
        agent_info: "AgentInfo",
        user_uri: str,
        message: str,
        prior_responses: List[Dict],
    ) -> Optional[Dict]:
        """Build and POST a grantFloor+utterance envelope to a single agent.

        The blocking HTTP call runs in the event loop's default executor so
        this coroutine does not stall the loop while waiting on the agent.

        Returns the parsed response dict, or None on timeout, request
        failure, unexpected error, or a response without utterance content.
        """
        import asyncio
        import json

        envelope = self._build_grant_envelope(
            agent_info, user_uri, message, prior_responses
        )

        rule = "=" * 80
        # Serialising the envelope is only worth the cost when it will be emitted.
        debug_enabled = logger.isEnabledFor(logging.DEBUG)
        if debug_enabled:
            logger.debug(
                "\n%s\n📤 CONVENER → %s\n%s\n%s\n%s\n",
                rule,
                agent_info.speaker_uri,
                rule,
                json.dumps(envelope, indent=2),
                rule,
            )

        def _post():
            # Runs in a worker thread; any exception surfaces at the await below.
            return requests.post(
                agent_info.service_url,
                json=envelope,
                headers={
                    "Content-Type": "application/json",
                    "User-Agent": "OFP-FloorManager/1.1.0",
                },
                timeout=settings.MESSAGE_TIMEOUT,
            )

        try:
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(None, _post)
            response.raise_for_status()
            response_data = response.json()
            if debug_enabled:
                logger.debug(
                    "\n%s\n📥 CONVENER ← %s\n%s\n%s\n",
                    rule,
                    agent_info.speaker_uri,
                    json.dumps(response_data, indent=2),
                    rule,
                )
            return self._parse_agent_response(response_data, agent_info)
        except requests.exceptions.Timeout:
            logger.error("Timeout sending to %s", agent_info.speaker_uri)
            return None
        except requests.exceptions.RequestException as e:
            logger.error(
                "Request error sending to %s: %s", agent_info.speaker_uri, e
            )
            return None
        except Exception as e:
            # Best-effort boundary: one misbehaving agent must not abort the
            # whole turn, but keep the traceback for diagnosis.
            logger.exception(
                "Unexpected error sending to %s: %s", agent_info.speaker_uri, e
            )
            return None

    def _build_grant_envelope(
        self,
        agent_info: "AgentInfo",
        user_uri: str,
        message: str,
        prior_responses: List[Dict],
    ) -> Dict:
        """Build an OFP 1.1.0 grantFloor + private utterance envelope.

        When earlier agents produced text in this turn, the utterance text is
        the user's message prefixed with a bracketed context block quoting
        those responses. Otherwise (no prior responses, or none with text)
        the utterance text is the user's message unchanged.
        """
        session = self.floor_session

        quoted_lines = []
        for resp in prior_responses:
            text = resp.get("message", "")
            if text:
                agent_name = resp.get("agent_name", resp.get("agent_uri", "Agent"))
                quoted_lines.append(f'{agent_name}: "{text}"')

        if quoted_lines:
            context_text = "\n".join(
                ["[Context from this conversation turn:"]
                + quoted_lines
                + ["]", f'User: "{message}"']
            )
        else:
            # Nothing to quote (e.g. HTML-only replies): do not send an empty
            # context wrapper, just the plain message.
            context_text = message

        # UTC timestamp in ISO-8601 with a trailing "Z" instead of "+00:00".
        start_time = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

        return {
            "openFloor": {
                "schema": {"version": "1.1.0"},
                "conversation": {
                    "id": session.session_id,
                    "floorGranted": [agent_info.speaker_uri],
                    "conversants": self._get_conversants_list(),
                },
                "sender": {
                    "speakerUri": session.floor_manager_uri,
                    "serviceUrl": session.floor_manager_url,
                },
                "events": [
                    {
                        "eventType": "grantFloor",
                        "to": {"speakerUri": agent_info.speaker_uri},
                    },
                    {
                        "eventType": "utterance",
                        "to": {
                            "speakerUri": agent_info.speaker_uri,
                            "serviceUrl": agent_info.service_url,
                            "private": True,
                        },
                        "parameters": {
                            "dialogEvent": {
                                "speakerUri": user_uri,
                                "span": {"startTime": start_time},
                                "features": {
                                    "text": {
                                        "mimeType": "text/plain",
                                        "tokens": [{"value": context_text}],
                                    }
                                },
                            }
                        },
                    },
                ],
            }
        }

    def _parse_agent_response(
        self, response_data: Dict, agent_info: "AgentInfo"
    ) -> Optional[Dict]:
        """
        Parse an agent's response envelope.

        Extracts utterance text/HTML and checks for yieldFloor.
        Text tokens are joined with spaces, HTML tokens without a separator.
        If several utterance events carry the same feature, the last one wins.
        Returns None if no utterance content found.
        """
        if "openFloor" not in response_data:
            logger.warning(
                "Response from %s missing openFloor", agent_info.speaker_uri
            )
            return None

        events = response_data["openFloor"].get("events", [])
        text_content: Optional[str] = None
        html_content: Optional[str] = None

        for event in events:
            event_type = event.get("eventType")
            if event_type == "yieldFloor":
                logger.info(
                    "Agent %s yielded the floor (advisory)", agent_info.speaker_uri
                )
                continue
            if event_type != "utterance":
                continue

            features = (
                event.get("parameters", {})
                .get("dialogEvent", {})
                .get("features", {})
            )

            tokens = features.get("text", {}).get("tokens", [])
            if tokens:
                # Accept either a "value" or a "token" key on each token.
                text_content = " ".join(
                    t.get("value", t.get("token", "")) for t in tokens
                )

            html_tokens = features.get("html", {}).get("tokens", [])
            if html_tokens:
                html_content = "".join(t.get("value", "") for t in html_tokens)

        if not text_content and not html_content:
            logger.info(
                "No utterance content in response from %s", agent_info.speaker_uri
            )
            return None

        return {
            "agent_uri": agent_info.speaker_uri,
            "agent_name": agent_info.display_name,
            "message": text_content or "",
            "html_content": html_content,
            # NOTE(review): naive local time, unlike the UTC startTime written
            # into outgoing envelopes. Left as-is because callers may compare
            # it with other naive datetimes; confirm before switching to UTC.
            "timestamp": datetime.now(),
        }

    def _get_conversants_list(self) -> List[Dict]:
        """Build partial manifests of all session participants for the envelope.

        Each entry carries speakerUri, serviceUrl and conversationalName
        (falling back to the last ':'-separated segment of the URI), plus the
        agent's capabilities when it has any.
        """
        conversants = []
        for uri, agent_info in self.floor_session.participants.items():
            manifest = agent_info.manifest or {}
            identification = manifest.get("identification", manifest)
            conversant: Dict[str, Any] = {
                "identification": {
                    "speakerUri": uri,
                    "serviceUrl": agent_info.service_url,
                    "conversationalName": identification.get(
                        "conversationalName", uri.split(":")[-1]
                    ),
                }
            }
            if agent_info.capabilities:
                conversant["capabilities"] = agent_info.capabilities
            conversants.append(conversant)
        return conversants