Spaces:
Runtime error
Runtime error
File size: 6,710 Bytes
66227af | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 | from datetime import datetime
from typing import Any, Literal
from pydantic import BaseModel, ConfigDict
from src.schemas import DreamType, ReconcilerType, ResolvedConfiguration
class BasePayload(BaseModel):
"""Base payload with common fields."""
model_config = ConfigDict(extra="forbid") # pyright: ignore[reportUnannotatedClassAttribute]
class RepresentationPayload(BasePayload):
"""Payload for representation tasks."""
task_type: Literal["representation"] = "representation"
session_name: str
content: str
observers: list[str]
observed: str
created_at: datetime
configuration: ResolvedConfiguration
class RepresentationPayloads(BasePayload):
"""Payload for a batch of representation tasks."""
payloads: list[RepresentationPayload]
class SummaryPayload(BasePayload):
"""Payload for summary tasks."""
task_type: Literal["summary"] = "summary"
session_name: str
message_seq_in_session: int
configuration: ResolvedConfiguration
# Optional for backward compatibility with older queue items
message_public_id: str | None = None
class WebhookPayload(BasePayload):
"""Payload for webhook delivery tasks."""
task_type: Literal["webhook"] = "webhook"
event_type: str
data: dict[str, Any]
class DreamPayload(BasePayload):
"""Payload for dream tasks."""
task_type: Literal["dream"] = "dream"
dream_type: DreamType
observer: str
observed: str
session_name: str | None = None
class DeletionPayload(BasePayload):
"""Payload for deletion tasks."""
task_type: Literal["deletion"] = "deletion"
deletion_type: Literal["session", "observation", "workspace"]
resource_id: str
class ReconcilerPayload(BasePayload):
"""Payload for reconciler tasks (vector sync, queue cleanup, self-healing)."""
task_type: Literal["reconciler"] = "reconciler"
reconciler_type: ReconcilerType
def create_webhook_payload(
event_type: str,
data: dict[str, Any],
) -> dict[str, Any]:
"""Create a webhook payload."""
return WebhookPayload(event_type=event_type, data=data).model_dump(
mode="json", exclude_none=True
)
def create_dream_payload(
dream_type: DreamType,
*,
observer: str,
observed: str,
session_name: str | None = None,
) -> dict[str, Any]:
"""Create a dream payload."""
return DreamPayload(
dream_type=dream_type,
observer=observer,
observed=observed,
session_name=session_name,
).model_dump(mode="json", exclude_none=True)
def create_deletion_payload(
deletion_type: Literal["session", "observation", "workspace"],
resource_id: str,
) -> dict[str, Any]:
"""Create a deletion payload."""
return DeletionPayload(
deletion_type=deletion_type,
resource_id=resource_id,
).model_dump(mode="json", exclude_none=True)
def create_payload(
message: dict[str, Any],
configuration: ResolvedConfiguration,
task_type: Literal["representation", "summary"],
message_seq_in_session: int | None = None,
*,
observers: list[str] | None = None,
observed: str | None = None,
) -> dict[str, Any]:
"""
Create a processed payload from a message for queue processing.
Note: workspace_name and message_id are no longer included in the returned payload
as they are now stored in dedicated columns on the queue table. The caller is
responsible for extracting and passing these values separately.
Args:
message: The original message dictionary
task_type: Type of task ('representation' or 'summary')
message_seq_in_session: Required for summary tasks, must be None for representation
observers: List of observer peer names (required for representation tasks)
observed: Name of the observed peer (*always* the peer who sent the message) (required for representation tasks)
Returns:
Processed payload dictionary ready for queue processing (without workspace_name and message_id)
Raises:
ValueError: If the payload doesn't match the expected schema
"""
workspace_name = message.get("workspace_name")
session_name = message.get("session_name")
message_id = message.get("message_id")
if not isinstance(workspace_name, str):
raise TypeError("Workspace name must be a string")
if not isinstance(session_name, str):
raise TypeError("Session name must be a string")
if not isinstance(message_id, int):
raise TypeError("Message ID must be an integer")
# Create the appropriate payload type based on task_type
try:
if task_type == "representation":
content = message.get("content")
created_at = message.get("created_at")
if not isinstance(content, str):
raise TypeError("Message content must be a string")
if not isinstance(created_at, datetime):
raise TypeError("created_at must be a datetime object")
if observers is None or len(observers) == 0:
raise ValueError("observers is required for representation tasks")
if observed is None:
raise ValueError("observed is required for representation tasks")
validated_payload = RepresentationPayload(
content=content,
session_name=session_name,
created_at=created_at,
observers=observers,
observed=observed,
configuration=configuration,
)
elif task_type == "summary":
if message_seq_in_session is None:
raise ValueError("message_seq_in_session is required for summary tasks")
message_public_id = message.get("message_public_id")
if message_public_id is not None and (
not isinstance(message_public_id, str) or not message_public_id.strip()
):
raise ValueError(
"message_public_id must be a non-empty string if provided"
)
validated_payload = SummaryPayload(
session_name=session_name,
message_seq_in_session=message_seq_in_session,
configuration=configuration,
message_public_id=message_public_id,
)
# Convert back to dict for compatibility with JSON serialization
# mode='json' ensures datetime is converted to ISO string
payload = validated_payload.model_dump(mode="json", exclude_none=True)
except Exception as e:
raise ValueError(f"Failed to create valid payload: {str(e)}") from e
return payload
|