|
|
import asyncio |
|
|
from typing import Any |
|
|
from unittest.mock import MagicMock |
|
|
|
|
|
import pytest |
|
|
from langflow.custom.custom_component.component import Component |
|
|
from langflow.events.event_manager import EventManager |
|
|
from langflow.schema.content_block import ContentBlock |
|
|
from langflow.schema.content_types import TextContent, ToolContent |
|
|
from langflow.schema.message import Message |
|
|
from langflow.schema.properties import Source |
|
|
from langflow.template.field.base import Output |
|
|
|
|
|
|
|
|
async def create_event_queue(): |
|
|
"""Create a queue for testing events.""" |
|
|
return asyncio.Queue() |
|
|
|
|
|
|
|
|
class ComponentForTesting(Component): |
|
|
"""Test component that implements basic functionality.""" |
|
|
|
|
|
def build(self) -> None: |
|
|
pass |
|
|
|
|
|
def get_text(self) -> str: |
|
|
"""Return a simple text output.""" |
|
|
return "test output" |
|
|
|
|
|
def get_tool(self) -> dict[str, Any]: |
|
|
"""Return a tool output.""" |
|
|
return {"name": "test_tool", "description": "A test tool"} |
|
|
|
|
|
|
|
|
@pytest.mark.usefixtures("client") |
|
|
async def test_component_message_sending(): |
|
|
"""Test component's message sending functionality.""" |
|
|
|
|
|
queue = await create_event_queue() |
|
|
event_manager = EventManager(queue) |
|
|
|
|
|
|
|
|
component = ComponentForTesting() |
|
|
component.set_event_manager(event_manager) |
|
|
|
|
|
|
|
|
message = Message( |
|
|
sender="test_sender", |
|
|
session_id="test_session", |
|
|
sender_name="test_sender_name", |
|
|
content_blocks=[ContentBlock(title="Test Block", contents=[TextContent(type="text", text="Test message")])], |
|
|
) |
|
|
|
|
|
|
|
|
sent_message = await component.send_message(message) |
|
|
|
|
|
|
|
|
assert sent_message.id is not None |
|
|
assert len(sent_message.content_blocks) == 1 |
|
|
assert isinstance(sent_message.content_blocks[0].contents[0], TextContent) |
|
|
|
|
|
|
|
|
@pytest.mark.usefixtures("client") |
|
|
async def test_component_tool_output(): |
|
|
"""Test component's tool output functionality.""" |
|
|
|
|
|
queue = await create_event_queue() |
|
|
event_manager = EventManager(queue) |
|
|
|
|
|
|
|
|
component = ComponentForTesting() |
|
|
component.set_event_manager(event_manager) |
|
|
|
|
|
|
|
|
message = Message( |
|
|
sender="test_sender", |
|
|
session_id="test_session", |
|
|
sender_name="test_sender_name", |
|
|
content_blocks=[ |
|
|
ContentBlock( |
|
|
title="Tool Output", |
|
|
contents=[ToolContent(type="tool_use", name="test_tool", tool_input={"query": "test input"})], |
|
|
) |
|
|
], |
|
|
) |
|
|
|
|
|
|
|
|
sent_message = await component.send_message(message) |
|
|
|
|
|
|
|
|
assert sent_message.id is not None |
|
|
assert len(sent_message.content_blocks) == 1 |
|
|
assert isinstance(sent_message.content_blocks[0].contents[0], ToolContent) |
|
|
|
|
|
|
|
|
@pytest.mark.usefixtures("client") |
|
|
async def test_component_error_handling(): |
|
|
"""Test component's error handling.""" |
|
|
|
|
|
queue = await create_event_queue() |
|
|
event_manager = EventManager(queue) |
|
|
|
|
|
|
|
|
component = ComponentForTesting() |
|
|
component.set_event_manager(event_manager) |
|
|
|
|
|
|
|
|
class CustomError(Exception): |
|
|
pass |
|
|
|
|
|
try: |
|
|
msg = "Test error" |
|
|
raise CustomError(msg) |
|
|
except CustomError as e: |
|
|
sent_message = await component.send_error( |
|
|
exception=e, |
|
|
session_id="test_session", |
|
|
trace_name="test_trace", |
|
|
source=Source(id="test_id", display_name="Test Component", source="Test Component"), |
|
|
) |
|
|
|
|
|
|
|
|
assert sent_message is not None |
|
|
assert "Test error" in str(sent_message.text) |
|
|
|
|
|
|
|
|
@pytest.mark.usefixtures("client") |
|
|
async def test_component_build_results(): |
|
|
"""Test component's build_results functionality.""" |
|
|
|
|
|
queue = await create_event_queue() |
|
|
event_manager = EventManager(queue) |
|
|
|
|
|
|
|
|
component = ComponentForTesting() |
|
|
component.set_event_manager(event_manager) |
|
|
|
|
|
|
|
|
component._outputs_map = { |
|
|
"text_output": Output(name="text_output", method="get_text"), |
|
|
"tool_output": Output(name="tool_output", method="get_tool"), |
|
|
} |
|
|
|
|
|
|
|
|
results, artifacts = await component._build_results() |
|
|
|
|
|
|
|
|
assert "text_output" in results |
|
|
assert results["text_output"] == "test output" |
|
|
assert "tool_output" in results |
|
|
assert results["tool_output"]["name"] == "test_tool" |
|
|
|
|
|
|
|
|
assert "text_output" in artifacts |
|
|
assert "tool_output" in artifacts |
|
|
assert artifacts["text_output"]["type"] == "text" |
|
|
|
|
|
|
|
|
@pytest.mark.usefixtures("client") |
|
|
async def test_component_logging(): |
|
|
"""Test component's logging functionality.""" |
|
|
|
|
|
queue = await create_event_queue() |
|
|
event_manager = EventManager(queue) |
|
|
|
|
|
|
|
|
component = ComponentForTesting() |
|
|
component.set_event_manager(event_manager) |
|
|
|
|
|
|
|
|
component._current_output = "test_output" |
|
|
component._id = "test_component_id" |
|
|
|
|
|
|
|
|
def log_callback(*, manager: EventManager, event_type: str, data: dict): |
|
|
manager.send_event( |
|
|
event_type="info", data={"message": data["message"], "id": data.get("component_id", "test_id")} |
|
|
) |
|
|
|
|
|
|
|
|
event_manager.register_event("on_log", "info", callback=log_callback) |
|
|
|
|
|
|
|
|
await asyncio.to_thread(component.log, "Test log message") |
|
|
|
|
|
|
|
|
event_id, event_data, _ = queue.get_nowait() |
|
|
event = event_data.decode("utf-8") |
|
|
|
|
|
assert "Test log message" in event |
|
|
assert event_id.startswith("info-") |
|
|
|
|
|
|
|
|
@pytest.mark.usefixtures("client") |
|
|
async def test_component_streaming_message(): |
|
|
"""Test component's streaming message functionality.""" |
|
|
queue = await create_event_queue() |
|
|
event_manager = EventManager(queue) |
|
|
event_manager.register_event("on_token", "token") |
|
|
|
|
|
|
|
|
vertex = MagicMock() |
|
|
mock_graph = MagicMock() |
|
|
mock_graph.flow_id = "12345678-1234-5678-1234-567812345678" |
|
|
vertex.graph = mock_graph |
|
|
|
|
|
component = ComponentForTesting(_vertex=vertex) |
|
|
component.set_event_manager(event_manager) |
|
|
|
|
|
|
|
|
class StreamChunk: |
|
|
def __init__(self, content: str): |
|
|
self.content = content |
|
|
|
|
|
async def text_generator(): |
|
|
chunks = ["Hello", " ", "World", "!"] |
|
|
for chunk in chunks: |
|
|
yield StreamChunk(chunk) |
|
|
|
|
|
|
|
|
message = Message( |
|
|
sender="test_sender", |
|
|
session_id="test_session", |
|
|
sender_name="test_sender_name", |
|
|
text=text_generator(), |
|
|
) |
|
|
|
|
|
|
|
|
sent_message = await component.send_message(message) |
|
|
|
|
|
|
|
|
assert sent_message.id is not None |
|
|
assert sent_message.text == "Hello World!" |
|
|
|
|
|
|
|
|
tokens = [] |
|
|
while not queue.empty(): |
|
|
_, event_data, _ = queue.get_nowait() |
|
|
event = event_data.decode("utf-8") |
|
|
if "token" in event: |
|
|
tokens.append(event) |
|
|
|
|
|
assert len(tokens) > 0 |
|
|
|