agent-flow / src /backend /base /langflow /schema /playground_events.py
truthtaicom's picture
Upload folder using huggingface_hub
4b0794d verified
import inspect
from collections.abc import Callable
from datetime import datetime, timezone
from typing import Annotated, Literal
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator
from langflow.schema.content_block import ContentBlock
from langflow.schema.content_types import ErrorContent
from langflow.schema.properties import Properties
from langflow.schema.validators import timestamp_to_str_validator
from langflow.utils.constants import MESSAGE_SENDER_USER
class PlaygroundEvent(BaseModel):
    # Base payload shared by the playground event models in this module.
    # Keys that are not declared below are accepted (extra="allow"), and
    # `id_` can be populated by its field name or by the "id" alias
    # (populate_by_name=True).
    model_config = ConfigDict(extra="allow", populate_by_name=True)

    properties: Properties | None = None
    sender_name: str | None = None
    content_blocks: list[ContentBlock] | None = None
    format_type: Literal["default", "error", "warning", "info"] = "default"
    files: list[str] | None = None
    text: str | None = None
    # When omitted, the current UTC time formatted as "%Y-%m-%d %H:%M:%S %Z".
    timestamp: Annotated[str, timestamp_to_str_validator] = Field(
        default_factory=lambda: datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
    )
    id_: UUID | str | None = Field(default=None, alias="id")

    @field_serializer("timestamp")
    @classmethod
    def serialize_timestamp(cls, v: str) -> str:
        """Serialize the timestamp as the stored string, unchanged."""
        return v

    @field_validator("id_")
    @classmethod
    def validate_id(cls, v: UUID | str | None) -> str | None:
        """Turn a UUID into its string form; return any other value as is."""
        return str(v) if isinstance(v, UUID) else v
class MessageEvent(PlaygroundEvent):
    # Message-flavoured playground event: adds category, session/flow
    # identifiers, sender information and the error/edit flags on top of the
    # PlaygroundEvent fields.
    category: Literal["message", "error", "warning", "info"] = "message"
    format_type: Literal["default", "error", "warning", "info"] = "default"
    session_id: str | None = None
    error: bool = False
    edit: bool = False
    flow_id: UUID | str | None = None
    sender: str = MESSAGE_SENDER_USER
    # Narrows the base class field to a required-type `str` with a default.
    sender_name: str = "User"

    @field_validator("flow_id")
    @classmethod
    def validate_flow_id(cls, v: UUID | str | None) -> str | None:
        """Turn a UUID into its string form; return any other value as is."""
        return str(v) if isinstance(v, UUID) else v
class ErrorEvent(MessageEvent):
    # Message event preset for errors: category is pinned to "error",
    # format_type defaults to "error", and display colours default to
    # white text on a red background.
    background_color: str = "#FF0000"
    text_color: str = "#FFFFFF"
    format_type: Literal["default", "error", "warning", "info"] = "error"
    allow_markdown: bool = False
    category: Literal["error"] = "error"
class WarningEvent(PlaygroundEvent):
    # Playground event preset for warnings: format_type defaults to
    # "warning", with black text on an orange background by default.
    background_color: str = "#FFA500"
    text_color: str = "#000000"
    format_type: Literal["default", "error", "warning", "info"] = "warning"
class InfoEvent(PlaygroundEvent):
    # Playground event preset for informational notices: format_type
    # defaults to "info", with white text on a blue background by default.
    background_color: str = "#0000FF"
    text_color: str = "#FFFFFF"
    format_type: Literal["default", "error", "warning", "info"] = "info"
class TokenEvent(BaseModel):
    # Event carrying one text chunk together with an id and a timestamp.
    # It derives from BaseModel directly, not from PlaygroundEvent.
    chunk: str
    # No default is given, so `id` must always be supplied (None is allowed).
    id: UUID | str | None = Field(alias="id")
    # When omitted, the current UTC time formatted as "%Y-%m-%d %H:%M:%S %Z".
    timestamp: Annotated[str, timestamp_to_str_validator] = Field(
        default_factory=lambda: datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
    )
# Factory functions come first: the _EVENT_CREATORS registry below references them at import time.
def create_message(
    text: str,
    category: Literal["message", "error", "warning", "info"] = "message",
    properties: dict | None = None,
    content_blocks: list[ContentBlock] | None = None,
    sender_name: str | None = None,
    files: list[str] | None = None,
    timestamp: str | None = None,
    format_type: Literal["default", "error", "warning", "info"] = "default",
    sender: str | None = None,
    session_id: str | None = None,
    id: UUID | str | None = None,  # noqa: A002
    flow_id: UUID | str | None = None,
    *,
    error: bool = False,
    edit: bool = False,
) -> MessageEvent:
    """Build a MessageEvent from the given values.

    Args:
        text: Message text.
        category: Event category.
        properties: Value for the event's ``properties`` field.
        content_blocks: Content blocks attached to the event.
        sender_name: Display name of the sender; when None the model default
            is used.
        files: File references attached to the event.
        timestamp: Timestamp string; when None the model generates one.
        format_type: Format type of the event.
        sender: Sender identifier; when None the model default is used.
        session_id: Session identifier.
        id: Event identifier, passed through the model's ``id`` alias.
        flow_id: Flow identifier.
        error: Value for the event's ``error`` flag.
        edit: Value for the event's ``edit`` flag.

    Returns:
        The constructed MessageEvent.
    """
    # MessageEvent declares `sender` and `sender_name` as non-optional `str`
    # and gives `timestamp` a default factory. Forwarding an explicit None
    # would override those defaults with a value the fields do not accept,
    # so only values the caller actually supplied are passed on.
    overrides = {"sender_name": sender_name, "timestamp": timestamp, "sender": sender}
    supplied = {name: value for name, value in overrides.items() if value is not None}
    return MessageEvent(
        text=text,
        properties=properties,
        category=category,
        content_blocks=content_blocks,
        files=files,
        format_type=format_type,
        id=id,
        session_id=session_id,
        error=error,
        edit=edit,
        flow_id=flow_id,
        **supplied,
    )
def create_error(
    text: str,
    properties: dict | None = None,
    traceback: str | None = None,
    title: str = "Error",
    timestamp: str | None = None,
    id: UUID | str | None = None,  # noqa: A002
    flow_id: UUID | str | None = None,
    session_id: str | None = None,
    content_blocks: list[ContentBlock] | None = None,
) -> ErrorEvent:
    """Build an ErrorEvent, optionally attaching a traceback content block.

    Args:
        text: Error text.
        properties: Value for the event's ``properties`` field.
        traceback: Traceback text. When truthy, a ContentBlock titled
            ``title`` holding an ErrorContent with this traceback is appended
            after any blocks given in ``content_blocks``.
        title: Title of the traceback content block.
        timestamp: Timestamp string; when None the model generates one.
        id: Event identifier, passed through the model's ``id`` alias.
        flow_id: Flow identifier.
        session_id: Session identifier.
        content_blocks: Existing content blocks. The caller's list is never
            modified.

    Returns:
        The constructed ErrorEvent.
    """
    if traceback:
        traceback_block = ContentBlock(title=title, contents=[ErrorContent(type="error", traceback=traceback)])
        # Build a new list: `+=` on a list extends it in place, which would
        # leak the extra block into the list object owned by the caller.
        content_blocks = [*(content_blocks or []), traceback_block]

    event_fields = {
        "text": text,
        "properties": properties,
        "content_blocks": content_blocks,
        "id": id,
        "flow_id": flow_id,
        "session_id": session_id,
    }
    # The timestamp field is typed `str` and has a default factory; passing an
    # explicit None would bypass that factory, so it is forwarded only when set.
    if timestamp is not None:
        event_fields["timestamp"] = timestamp
    return ErrorEvent(**event_fields)
def create_warning(message: str) -> WarningEvent:
    """Return a WarningEvent whose text is ``message``."""
    warning_event = WarningEvent(text=message)
    return warning_event
def create_info(message: str) -> InfoEvent:
    """Return an InfoEvent whose text is ``message``."""
    info_event = InfoEvent(text=message)
    return info_event
def create_token(chunk: str, id: str) -> TokenEvent:  # noqa: A002
    """Return a TokenEvent for ``chunk`` identified by ``id``."""
    token_fields = {"chunk": chunk, "id": id}
    return TokenEvent(**token_fields)
# Registry of event type name -> (factory function, its signature). The
# signature is computed once here so lookups do not re-inspect the factory.
_EVENT_CREATORS: dict[str, tuple[Callable, inspect.Signature]] = {
    event_name: (factory, inspect.signature(factory))
    for event_name, factory in (
        ("message", create_message),
        ("error", create_error),
        ("warning", create_warning),
        ("info", create_info),
        ("token", create_token),
    )
}
def create_event_by_type(
    event_type: Literal["message", "error", "warning", "info", "token"], **kwargs
) -> PlaygroundEvent | TokenEvent | dict:
    """Create the event registered for ``event_type`` from keyword arguments.

    Args:
        event_type: Key into the ``_EVENT_CREATORS`` registry.
        **kwargs: Candidate arguments. Only those whose names match a
            parameter of the selected factory are forwarded; the rest are
            dropped.

    Returns:
        The event built by the matching factory. For "token" this is a
        TokenEvent, which is not a PlaygroundEvent subclass. When
        ``event_type`` is not registered, ``kwargs`` is returned unchanged.
    """
    registered = _EVENT_CREATORS.get(event_type)
    if registered is None:
        # Unknown type: hand the raw payload back to the caller.
        return kwargs

    creator_func, signature = registered
    accepted = signature.parameters
    valid_params = {name: value for name, value in kwargs.items() if name in accepted}
    return creator_func(**valid_params)