Spaces:
Sleeping
Sleeping
File size: 2,956 Bytes
148a4a7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
"""OFP Envelope handling"""
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
from ..utils.helpers import generate_message_id, get_timestamp
from ..utils.config import settings
@dataclass
class Envelope:
    """
    OpenFloor Protocol Envelope structure

    Based on: https://openfloor.dev/protocol/specifications/inter-agent-message.md

    Attributes:
        version: Protocol version string
        sender: Agent ID of sender
        recipients: List of recipient agent IDs
        message: Message content (InterAgentMessage dictionary)
        timestamp: Timestamp string for the envelope
        message_id: Identifier of this envelope
        correlation_id: Optional correlation ID for request-response
    """
    version: str
    sender: str
    recipients: List[str]
    message: Dict[str, Any]
    timestamp: str
    message_id: str
    correlation_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert envelope to dictionary

        Returns:
            Dictionary of the envelope fields; top-level fields whose value
            is None (e.g. an unset correlation_id) are omitted.
        """
        data = asdict(self)
        # Remove None values
        return {k: v for k, v in data.items() if v is not None}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Envelope":
        """
        Create envelope from dictionary

        Args:
            data: Dictionary with at least "sender", "recipients" and
                "message". "version", "timestamp" and "message_id" are
                filled in with defaults when the key is absent;
                "correlation_id" defaults to None.

        Returns:
            Envelope object

        Raises:
            KeyError: If "sender", "recipients" or "message" is missing.
        """
        # Defaults are computed only when the key is absent. Passing them as
        # dict.get() defaults would evaluate them on every call, generating
        # (and discarding) a timestamp and a message id for complete input.
        return cls(
            version=data["version"] if "version" in data else settings.OFP_VERSION,
            sender=data["sender"],
            recipients=data["recipients"],
            message=data["message"],
            timestamp=data["timestamp"] if "timestamp" in data else get_timestamp(),
            message_id=(
                data["message_id"] if "message_id" in data else generate_message_id()
            ),
            correlation_id=data.get("correlation_id"),
        )
def create_envelope(
    sender: str,
    recipients: List[str],
    message: Dict[str, Any],
    correlation_id: Optional[str] = None
) -> Envelope:
    """
    Build a fresh OFP envelope around a message

    The protocol version is read from settings; the timestamp and message
    ID are generated at call time.

    Args:
        sender: Agent ID of sender
        recipients: List of recipient agent IDs or ["broadcast"]
        message: Message content (InterAgentMessage)
        correlation_id: Optional correlation ID for request-response

    Returns:
        Envelope object
    """
    # Gather the generated fields first, then assemble the envelope.
    protocol_version = settings.OFP_VERSION
    created_at = get_timestamp()
    envelope_id = generate_message_id()
    return Envelope(
        version=protocol_version,
        sender=sender,
        recipients=recipients,
        message=message,
        timestamp=created_at,
        message_id=envelope_id,
        correlation_id=correlation_id,
    )
def create_inter_agent_message(
    message_type: str,
    content: Any,
    dialog_event: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Build an InterAgentMessage dictionary

    Based on: https://openfloor.dev/protocol/specifications/inter-agent-message.md

    Args:
        message_type: Type of message (e.g., "agent_response", "floor_request")
        content: Message content (can be text, JSON, etc.)
        dialog_event: Optional DialogEventObject
        metadata: Optional metadata

    Returns:
        InterAgentMessage dictionary with "message_type", "content" and
        "timestamp", plus "dialog_event" / "metadata" when a non-empty
        value was supplied.
    """
    payload: Dict[str, Any] = {
        "message_type": message_type,
        "content": content,
        "timestamp": get_timestamp(),
    }
    # Optional sections are attached only when truthy, so None and empty
    # values are left out of the message entirely.
    optional_sections = (
        ("dialog_event", dialog_event),
        ("metadata", metadata),
    )
    for key, value in optional_sections:
        if value:
            payload[key] = value
    return payload
|