Spaces:
Sleeping
Sleeping
| """OFP Envelope handling""" | |
| from typing import Any, Dict, List, Optional | |
| from dataclasses import dataclass, asdict | |
| from datetime import datetime | |
| from ..utils.helpers import generate_message_id, get_timestamp | |
| from ..utils.config import settings | |
@dataclass
class Envelope:
    """
    OpenFloor Protocol Envelope structure

    Based on: https://openfloor.dev/protocol/specifications/inter-agent-message.md

    Attributes:
        version: OFP protocol version string
        sender: Agent ID of sender
        recipients: List of recipient agent IDs
        message: Message content carried by the envelope
        timestamp: Creation timestamp, stored as a string
        message_id: Identifier of this envelope
        correlation_id: Optional ID linking this envelope to another one
    """

    version: str
    sender: str
    recipients: List[str]
    message: Dict[str, Any]
    timestamp: str
    message_id: str
    correlation_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert envelope to dictionary

        Returns:
            Dictionary of the envelope fields. Fields whose value is None
            (e.g. an unset correlation_id) are omitted. Only top-level
            fields are filtered; None values nested inside ``message`` are
            kept as-is.
        """
        data = asdict(self)
        # Remove None values so optional fields do not appear as null
        return {k: v for k, v in data.items() if v is not None}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Envelope":
        """
        Create envelope from dictionary

        Args:
            data: Mapping with the required keys "sender", "recipients" and
                "message". "version", "timestamp" and "message_id" are
                filled in with defaults when absent; "correlation_id" is
                optional.

        Returns:
            Envelope object

        Raises:
            KeyError: If "sender", "recipients" or "message" is missing
        """
        # Fallbacks are computed only when the key is actually absent, so no
        # throw-away timestamp or message ID is generated for complete input.
        version = data["version"] if "version" in data else settings.OFP_VERSION
        timestamp = data["timestamp"] if "timestamp" in data else get_timestamp()
        message_id = (
            data["message_id"] if "message_id" in data else generate_message_id()
        )
        return cls(
            version=version,
            sender=data["sender"],
            recipients=data["recipients"],
            message=data["message"],
            timestamp=timestamp,
            message_id=message_id,
            correlation_id=data.get("correlation_id"),
        )
def create_envelope(
    sender: str,
    recipients: List[str],
    message: Dict[str, Any],
    correlation_id: Optional[str] = None
) -> Envelope:
    """
    Build a fresh OFP envelope around a message

    The protocol version is read from ``settings.OFP_VERSION``; the
    timestamp and message ID are generated at call time via
    ``get_timestamp()`` and ``generate_message_id()``.

    Args:
        sender: Agent ID of sender
        recipients: List of recipient agent IDs or ["broadcast"]
        message: Message content (InterAgentMessage)
        correlation_id: Optional correlation ID for request-response

    Returns:
        Envelope object
    """
    # Collect the fields first, then construct the envelope in one step.
    envelope_fields = {
        "version": settings.OFP_VERSION,
        "sender": sender,
        "recipients": recipients,
        "message": message,
        "timestamp": get_timestamp(),
        "message_id": generate_message_id(),
        "correlation_id": correlation_id,
    }
    return Envelope(**envelope_fields)
def create_inter_agent_message(
    message_type: str,
    content: Any,
    dialog_event: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Assemble an InterAgentMessage dictionary

    Based on: https://openfloor.dev/protocol/specifications/inter-agent-message.md

    Args:
        message_type: Type of message (e.g., "agent_response", "floor_request")
        content: Message content (can be text, JSON, etc.)
        dialog_event: Optional DialogEventObject
        metadata: Optional metadata

    Returns:
        InterAgentMessage dictionary with "message_type", "content" and
        "timestamp", plus "dialog_event" / "metadata" when those arguments
        are truthy (None or an empty dict is left out).
    """
    payload: Dict[str, Any] = {
        "message_type": message_type,
        "content": content,
        "timestamp": get_timestamp(),
    }
    # Optional sections are attached in a fixed order, and only when non-empty.
    optional_sections = (
        ("dialog_event", dialog_event),
        ("metadata", metadata),
    )
    for key, value in optional_sections:
        if value:
            payload[key] = value
    return payload