File size: 2,956 Bytes
148a4a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""OFP Envelope handling"""

from typing import Any, Dict, List, Optional
from dataclasses import dataclass, asdict
from datetime import datetime

from ..utils.helpers import generate_message_id, get_timestamp
from ..utils.config import settings


@dataclass
class Envelope:
    """
    OpenFloor Protocol Envelope structure
    Based on: https://openfloor.dev/protocol/specifications/inter-agent-message.md
    """
    version: str
    sender: str
    recipients: List[str]
    message: Dict[str, Any]
    timestamp: str
    message_id: str
    correlation_id: Optional[str] = None
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert envelope to dictionary"""
        data = asdict(self)
        # Remove None values
        return {k: v for k, v in data.items() if v is not None}
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Envelope":
        """Create envelope from dictionary"""
        return cls(
            version=data.get("version", settings.OFP_VERSION),
            sender=data["sender"],
            recipients=data["recipients"],
            message=data["message"],
            timestamp=data.get("timestamp", get_timestamp()),
            message_id=data.get("message_id", generate_message_id()),
            correlation_id=data.get("correlation_id")
        )


def create_envelope(
    sender: str,
    recipients: List[str],
    message: Dict[str, Any],
    correlation_id: Optional[str] = None
) -> Envelope:
    """
    Create a new OFP envelope
    
    Args:
        sender: Agent ID of sender
        recipients: List of recipient agent IDs or ["broadcast"]
        message: Message content (InterAgentMessage)
        correlation_id: Optional correlation ID for request-response
    
    Returns:
        Envelope object
    """
    return Envelope(
        version=settings.OFP_VERSION,
        sender=sender,
        recipients=recipients,
        message=message,
        timestamp=get_timestamp(),
        message_id=generate_message_id(),
        correlation_id=correlation_id
    )


def create_inter_agent_message(
    message_type: str,
    content: Any,
    dialog_event: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Create InterAgentMessage structure
    Based on: https://openfloor.dev/protocol/specifications/inter-agent-message.md
    
    Args:
        message_type: Type of message (e.g., "agent_response", "floor_request")
        content: Message content (can be text, JSON, etc.)
        dialog_event: Optional DialogEventObject
        meta Optional metadata
    
    Returns:
        InterAgentMessage dictionary
    """
    message = {
        "message_type": message_type,
        "content": content,
        "timestamp": get_timestamp()
    }
    
    if dialog_event:
        message["dialog_event"] = dialog_event
    
    if metadata:
        message["metadata"] = metadata
    
    return message