| from typing import Any | |
| from langchain_core.messages import ChatMessage | |
| from langgraph.types import StreamWriter | |
| from pydantic import BaseModel, Field | |
| class CustomData(BaseModel): | |
| "Custom data being sent by an agent" | |
| data: dict[str, Any] = Field(description="The custom data") | |
| def to_langchain(self) -> ChatMessage: | |
| return ChatMessage(content=[self.data], role="custom") | |
| def dispatch(self, writer: StreamWriter) -> None: | |
| writer(self.to_langchain()) | |