| import inspect |
| from collections.abc import Callable |
| from datetime import datetime, timezone |
| from typing import Annotated, Literal |
| from uuid import UUID |
|
|
| from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator |
|
|
| from langflow.schema.content_block import ContentBlock |
| from langflow.schema.content_types import ErrorContent |
| from langflow.schema.properties import Properties |
| from langflow.schema.validators import timestamp_to_str_validator |
| from langflow.utils.constants import MESSAGE_SENDER_USER |
|
|
|
|
| class PlaygroundEvent(BaseModel): |
| model_config = ConfigDict(extra="allow", populate_by_name=True) |
| properties: Properties | None = Field(default=None) |
| sender_name: str | None = Field(default=None) |
| content_blocks: list[ContentBlock] | None = Field(default=None) |
| format_type: Literal["default", "error", "warning", "info"] = Field(default="default") |
| files: list[str] | None = Field(default=None) |
| text: str | None = Field(default=None) |
| timestamp: Annotated[str, timestamp_to_str_validator] = Field( |
| default_factory=lambda: datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") |
| ) |
| id_: UUID | str | None = Field(default=None, alias="id") |
|
|
| @field_serializer("timestamp") |
| @classmethod |
| def serialize_timestamp(cls, v: str) -> str: |
| return v |
|
|
| @field_validator("id_") |
| @classmethod |
| def validate_id(cls, v: UUID | str | None) -> str | None: |
| if isinstance(v, UUID): |
| return str(v) |
| return v |
|
|
|
|
| class MessageEvent(PlaygroundEvent): |
| category: Literal["message", "error", "warning", "info"] = "message" |
| format_type: Literal["default", "error", "warning", "info"] = Field(default="default") |
| session_id: str | None = Field(default=None) |
| error: bool = Field(default=False) |
| edit: bool = Field(default=False) |
| flow_id: UUID | str | None = Field(default=None) |
| sender: str = Field(default=MESSAGE_SENDER_USER) |
| sender_name: str = Field(default="User") |
|
|
| @field_validator("flow_id") |
| @classmethod |
| def validate_flow_id(cls, v: UUID | str | None) -> str | None: |
| if isinstance(v, UUID): |
| return str(v) |
| return v |
|
|
|
|
| class ErrorEvent(MessageEvent): |
| background_color: str = Field(default="#FF0000") |
| text_color: str = Field(default="#FFFFFF") |
| format_type: Literal["default", "error", "warning", "info"] = Field(default="error") |
| allow_markdown: bool = Field(default=False) |
| category: Literal["error"] = "error" |
|
|
|
|
| class WarningEvent(PlaygroundEvent): |
| background_color: str = Field(default="#FFA500") |
| text_color: str = Field(default="#000000") |
| format_type: Literal["default", "error", "warning", "info"] = Field(default="warning") |
|
|
|
|
| class InfoEvent(PlaygroundEvent): |
| background_color: str = Field(default="#0000FF") |
| text_color: str = Field(default="#FFFFFF") |
| format_type: Literal["default", "error", "warning", "info"] = Field(default="info") |
|
|
|
|
| class TokenEvent(BaseModel): |
| chunk: str = Field(...) |
| id: UUID | str | None = Field(alias="id") |
| timestamp: Annotated[str, timestamp_to_str_validator] = Field( |
| default_factory=lambda: datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") |
| ) |
|
|
|
|
| |
| def create_message( |
| text: str, |
| category: Literal["message", "error", "warning", "info"] = "message", |
| properties: dict | None = None, |
| content_blocks: list[ContentBlock] | None = None, |
| sender_name: str | None = None, |
| files: list[str] | None = None, |
| timestamp: str | None = None, |
| format_type: Literal["default", "error", "warning", "info"] = "default", |
| sender: str | None = None, |
| session_id: str | None = None, |
| id: UUID | str | None = None, |
| flow_id: UUID | str | None = None, |
| *, |
| error: bool = False, |
| edit: bool = False, |
| ) -> MessageEvent: |
| return MessageEvent( |
| text=text, |
| properties=properties, |
| category=category, |
| content_blocks=content_blocks, |
| sender_name=sender_name, |
| files=files, |
| timestamp=timestamp, |
| format_type=format_type, |
| sender=sender, |
| id=id, |
| session_id=session_id, |
| error=error, |
| edit=edit, |
| flow_id=flow_id, |
| ) |
|
|
|
|
| def create_error( |
| text: str, |
| properties: dict | None = None, |
| traceback: str | None = None, |
| title: str = "Error", |
| timestamp: str | None = None, |
| id: UUID | str | None = None, |
| flow_id: UUID | str | None = None, |
| session_id: str | None = None, |
| content_blocks: list[ContentBlock] | None = None, |
| ) -> ErrorEvent: |
| if traceback: |
| content_blocks = content_blocks or [] |
| content_blocks += [ContentBlock(title=title, contents=[ErrorContent(type="error", traceback=traceback)])] |
| return ErrorEvent( |
| text=text, |
| properties=properties, |
| content_blocks=content_blocks, |
| timestamp=timestamp, |
| id=id, |
| flow_id=flow_id, |
| session_id=session_id, |
| ) |
|
|
|
|
| def create_warning(message: str) -> WarningEvent: |
| return WarningEvent(text=message) |
|
|
|
|
| def create_info(message: str) -> InfoEvent: |
| return InfoEvent(text=message) |
|
|
|
|
| def create_token(chunk: str, id: str) -> TokenEvent: |
| return TokenEvent( |
| chunk=chunk, |
| id=id, |
| ) |
|
|
|
|
| _EVENT_CREATORS: dict[str, tuple[Callable, inspect.Signature]] = { |
| "message": (create_message, inspect.signature(create_message)), |
| "error": (create_error, inspect.signature(create_error)), |
| "warning": (create_warning, inspect.signature(create_warning)), |
| "info": (create_info, inspect.signature(create_info)), |
| "token": (create_token, inspect.signature(create_token)), |
| } |
|
|
|
|
| def create_event_by_type( |
| event_type: Literal["message", "error", "warning", "info", "token"], **kwargs |
| ) -> PlaygroundEvent | dict: |
| if event_type not in _EVENT_CREATORS: |
| return kwargs |
|
|
| creator_func, signature = _EVENT_CREATORS[event_type] |
| valid_params = {k: v for k, v in kwargs.items() if k in signature.parameters} |
| return creator_func(**valid_params) |
|
|