BolyosCsaba
updated to public
5fae0de
"""OFP Dialog Event Objects"""
from typing import Any, Dict, Optional
from ..utils.helpers import get_timestamp
def create_dialog_event(
    event_type: str,
    event_data: Dict[str, Any],
    metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Build a DialogEventObject dictionary.

    Based on: https://openfloor.dev/protocol/specifications/dialog-event-object.md

    Args:
        event_type: Type of event (e.g., "GRANT_FLOOR", "GET_MANIFEST")
        event_data: Event-specific data, stored as-is under "event_data"
        metadata: Optional metadata; the "metadata" key is only present
            when a non-empty value is supplied

    Returns:
        Dictionary with "event_type", "timestamp" (from get_timestamp())
        and "event_data", plus "metadata" when provided
    """
    envelope: Dict[str, Any] = {
        "event_type": event_type,
        "timestamp": get_timestamp(),
        "event_data": event_data,
    }
    # None and empty metadata are both left out of the envelope.
    if not metadata:
        return envelope
    envelope["metadata"] = metadata
    return envelope
def create_get_manifest_event(
    agent_id: str,
    requested_by: str = "floor_manager"
) -> Dict[str, Any]:
    """
    Build a GET_MANIFEST event.

    Args:
        agent_id: ID of the agent whose manifest is requested
        requested_by: Who is requesting the manifest

    Returns:
        DialogEventObject for GET_MANIFEST
    """
    payload: Dict[str, Any] = {
        "agent_id": agent_id,
        "requested_by": requested_by,
    }
    return create_dialog_event(event_type="GET_MANIFEST", event_data=payload)
def create_grant_floor_event(
    agent_id: str,
    granted_by: str = "convener",
    duration: Optional[int] = None
) -> Dict[str, Any]:
    """
    Create GRANT_FLOOR event

    Args:
        agent_id: ID of agent receiving floor
        granted_by: Who granted the floor
        duration: Optional floor duration in seconds. Included in the
            event whenever it is not None, so an explicit 0 is preserved

    Returns:
        DialogEventObject for GRANT_FLOOR
    """
    # Annotated as Dict[str, Any] because the payload mixes str and int values.
    event_data: Dict[str, Any] = {
        "agent_id": agent_id,
        "granted_by": granted_by,
    }
    # Compare against None rather than truthiness: a duration of 0 is a
    # value the caller passed explicitly and must not be silently dropped.
    if duration is not None:
        event_data["duration"] = duration
    return create_dialog_event(
        event_type="GRANT_FLOOR",
        event_data=event_data
    )
def create_revoke_floor_event(
    agent_id: str,
    revoked_by: str = "convener",
    reason: str = "floor_expired"
) -> Dict[str, Any]:
    """
    Build a REVOKE_FLOOR event.

    Args:
        agent_id: ID of the agent losing the floor
        revoked_by: Who revoked the floor
        reason: Reason for the revocation

    Returns:
        DialogEventObject for REVOKE_FLOOR
    """
    payload: Dict[str, Any] = {
        "agent_id": agent_id,
        "revoked_by": revoked_by,
        "reason": reason,
    }
    return create_dialog_event(event_type="REVOKE_FLOOR", event_data=payload)
def create_invite_event(
    session_id: str,
    inviter: str = "floor_manager",
    join_url: Optional[str] = None
) -> Dict[str, Any]:
    """
    Build an INVITE event.

    Args:
        session_id: Session to join
        inviter: Who is sending the invite
        join_url: Optional URL to join the session; left out of the
            event when None or empty

    Returns:
        DialogEventObject for INVITE
    """
    payload: Dict[str, Any] = {"session_id": session_id, "inviter": inviter}
    if join_url:
        payload.update(join_url=join_url)
    return create_dialog_event(event_type="INVITE", event_data=payload)
def create_floor_request_event(
    agent_id: str,
    priority: int = 0
) -> Dict[str, Any]:
    """
    Build a FLOOR_REQUEST event.

    Args:
        agent_id: ID of the agent requesting the floor
        priority: Request priority (higher = more urgent)

    Returns:
        DialogEventObject for FLOOR_REQUEST
    """
    payload: Dict[str, Any] = {
        "agent_id": agent_id,
        "priority": priority,
    }
    return create_dialog_event(event_type="FLOOR_REQUEST", event_data=payload)
def create_agent_joined_event(
    agent_id: str,
    agent_name: Optional[str] = None
) -> Dict[str, Any]:
    """
    Build an AGENT_JOINED event.

    Args:
        agent_id: ID of the agent that joined
        agent_name: Optional agent name; left out of the event when
            None or empty

    Returns:
        DialogEventObject for AGENT_JOINED
    """
    payload: Dict[str, Any] = {"agent_id": agent_id}
    if agent_name:
        payload.update(agent_name=agent_name)
    return create_dialog_event(event_type="AGENT_JOINED", event_data=payload)
def create_agent_left_event(
    agent_id: str,
    reason: str = "disconnected"
) -> Dict[str, Any]:
    """
    Build an AGENT_LEFT event.

    Args:
        agent_id: ID of the agent that left
        reason: Reason for leaving

    Returns:
        DialogEventObject for AGENT_LEFT
    """
    payload: Dict[str, Any] = {
        "agent_id": agent_id,
        "reason": reason,
    }
    return create_dialog_event(event_type="AGENT_LEFT", event_data=payload)