Spaces:
Sleeping
Sleeping
File size: 4,688 Bytes
148a4a7 5fae0de 148a4a7 5fae0de 148a4a7 5fae0de 148a4a7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
"""OFP Dialog Event Objects"""
from typing import Any, Dict, Optional
from ..utils.helpers import get_timestamp
def create_dialog_event(
    event_type: str,
    event_data: Dict[str, Any],
    metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Assemble a DialogEventObject dictionary.

    Based on: https://openfloor.dev/protocol/specifications/dialog-event-object.md

    Args:
        event_type: Type of event (e.g., "GRANT_FLOOR", "GET_MANIFEST")
        event_data: Event-specific data, stored under "event_data" as given
        metadata: Optional metadata; attached under "metadata" only when
            it is truthy (None or an empty dict is left out)

    Returns:
        Dictionary with "event_type", "timestamp" (the value returned by
        get_timestamp()), "event_data" and, when supplied, "metadata"
    """
    dialog_event: Dict[str, Any] = {
        "event_type": event_type,
        "timestamp": get_timestamp(),
        "event_data": event_data,
    }
    # Guard clause: nothing more to attach when no metadata was supplied.
    if not metadata:
        return dialog_event
    dialog_event["metadata"] = metadata
    return dialog_event
def create_get_manifest_event(
    agent_id: str,
    requested_by: str = "floor_manager"
) -> Dict[str, Any]:
    """
    Build a GET_MANIFEST dialog event.

    Args:
        agent_id: ID of agent to get manifest from
        requested_by: Who is requesting the manifest

    Returns:
        DialogEventObject (as produced by create_dialog_event) whose
        event data carries "agent_id" and "requested_by"
    """
    payload = {"agent_id": agent_id, "requested_by": requested_by}
    return create_dialog_event("GET_MANIFEST", payload)
def create_grant_floor_event(
    agent_id: str,
    granted_by: str = "convener",
    duration: Optional[int] = None
) -> Dict[str, Any]:
    """
    Create GRANT_FLOOR event

    Args:
        agent_id: ID of agent receiving floor
        granted_by: Who granted the floor
        duration: Optional floor duration in seconds. Included in the
            event data whenever it is not None, so an explicit 0 is kept.

    Returns:
        DialogEventObject for GRANT_FLOOR (as produced by
        create_dialog_event)
    """
    # Values are mixed (str ids plus an optional int), hence Dict[str, Any].
    event_data: Dict[str, Any] = {
        "agent_id": agent_id,
        "granted_by": granted_by,
    }
    # Test against None rather than truthiness: a caller passing
    # duration=0 must not be treated the same as "no duration given".
    if duration is not None:
        event_data["duration"] = duration
    return create_dialog_event(
        event_type="GRANT_FLOOR",
        event_data=event_data,
    )
def create_revoke_floor_event(
    agent_id: str,
    revoked_by: str = "convener",
    reason: str = "floor_expired"
) -> Dict[str, Any]:
    """
    Build a REVOKE_FLOOR dialog event.

    Args:
        agent_id: ID of agent losing floor
        revoked_by: Who revoked the floor
        reason: Reason for revocation

    Returns:
        DialogEventObject (as produced by create_dialog_event) whose
        event data carries "agent_id", "revoked_by" and "reason"
    """
    revocation = {
        "agent_id": agent_id,
        "revoked_by": revoked_by,
        "reason": reason,
    }
    return create_dialog_event("REVOKE_FLOOR", revocation)
def create_invite_event(
    session_id: str,
    inviter: str = "floor_manager",
    join_url: Optional[str] = None
) -> Dict[str, Any]:
    """
    Build an INVITE dialog event.

    Args:
        session_id: Session to join
        inviter: Who is sending the invite
        join_url: Optional URL to join session; left out of the event
            data when it is None or an empty string

    Returns:
        DialogEventObject (as produced by create_dialog_event) whose
        event data carries "session_id", "inviter" and, when given,
        "join_url"
    """
    # Collect the optional field separately so the base payload stays flat.
    extras = {"join_url": join_url} if join_url else {}
    invitation = {"session_id": session_id, "inviter": inviter, **extras}
    return create_dialog_event(event_type="INVITE", event_data=invitation)
def create_floor_request_event(
    agent_id: str,
    priority: int = 0
) -> Dict[str, Any]:
    """
    Build a FLOOR_REQUEST dialog event.

    Args:
        agent_id: ID of agent requesting floor
        priority: Request priority (higher = more urgent)

    Returns:
        DialogEventObject (as produced by create_dialog_event) whose
        event data carries "agent_id" and "priority"
    """
    request = {"agent_id": agent_id, "priority": priority}
    return create_dialog_event("FLOOR_REQUEST", request)
def create_agent_joined_event(
    agent_id: str,
    agent_name: Optional[str] = None
) -> Dict[str, Any]:
    """
    Build an AGENT_JOINED dialog event.

    Args:
        agent_id: ID of agent that joined
        agent_name: Optional agent name; left out of the event data when
            it is None or an empty string

    Returns:
        DialogEventObject (as produced by create_dialog_event) whose
        event data carries "agent_id" and, when given, "agent_name"
    """
    joined = (
        {"agent_id": agent_id, "agent_name": agent_name}
        if agent_name
        else {"agent_id": agent_id}
    )
    return create_dialog_event("AGENT_JOINED", joined)
def create_agent_left_event(
    agent_id: str,
    reason: str = "disconnected"
) -> Dict[str, Any]:
    """
    Build an AGENT_LEFT dialog event.

    Args:
        agent_id: ID of agent that left
        reason: Reason for leaving

    Returns:
        DialogEventObject (as produced by create_dialog_event) whose
        event data carries "agent_id" and "reason"
    """
    departure = {"agent_id": agent_id, "reason": reason}
    return create_dialog_event("AGENT_LEFT", departure)
|