| | from collections.abc import AsyncIterator |
| | from typing import Any |
| | from unittest.mock import AsyncMock |
| |
|
| | from langchain_core.agents import AgentFinish |
| | from langflow.base.agents.agent import process_agent_events |
| | from langflow.base.agents.events import ( |
| | handle_on_chain_end, |
| | handle_on_chain_start, |
| | handle_on_chain_stream, |
| | handle_on_tool_end, |
| | handle_on_tool_error, |
| | handle_on_tool_start, |
| | ) |
| | from langflow.schema.content_block import ContentBlock |
| | from langflow.schema.content_types import ToolContent |
| | from langflow.schema.message import Message |
| | from langflow.utils.constants import MESSAGE_SENDER_AI |
| |
|
| |
|
async def create_event_iterator(events: list[dict[str, Any]]) -> AsyncIterator[dict[str, Any]]:
    """Expose a list of event dicts as an async iterator.

    Each element of ``events`` is yielded unchanged, in list order.
    """
    for event_payload in events:
        yield event_payload
| |
|
| |
|
async def test_chain_start_event():
    """Run a single ``on_chain_start`` event through ``process_agent_events``.

    The returned message is expected to keep its "Bot" icon and to still hold
    exactly one content block titled "Agent Steps".
    """
    # The side_effect echoes back whatever message it is given. A side_effect
    # callable's return value takes precedence over return_value, so no
    # return_value is configured on the mock.
    send_message = AsyncMock(side_effect=lambda message: message)

    events = [
        {"event": "on_chain_start", "data": {"input": {"input": "test input", "chat_history": []}}, "start_time": 0}
    ]

    # Initial partial message handed to the event processor.
    agent_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
        session_id="test_session_id",
    )

    result = await process_agent_events(create_event_iterator(events), agent_message, send_message)

    assert result.properties.icon == "Bot"
    assert len(result.content_blocks) == 1
    assert result.content_blocks[0].title == "Agent Steps"
| |
|
| |
|
async def test_chain_end_event():
    """Run a single ``on_chain_end`` event through ``process_agent_events``.

    The event carries an ``AgentFinish`` whose return values contain
    ``"final output"``; the returned message is expected to be in the
    "complete" state with that text.
    """
    # The side_effect echoes back whatever message it is given. A side_effect
    # callable's return value takes precedence over return_value, so no
    # return_value is configured on the mock.
    send_message = AsyncMock(side_effect=lambda message: message)

    # Output object placed under the event's "output" key.
    output = AgentFinish(return_values={"output": "final output"}, log="test log")

    events = [{"event": "on_chain_end", "data": {"output": output}, "start_time": 0}]

    # Initial partial message handed to the event processor.
    agent_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
        session_id="test_session_id",
    )

    result = await process_agent_events(create_event_iterator(events), agent_message, send_message)

    assert result.properties.icon == "Bot"
    assert result.properties.state == "complete"
    assert result.text == "final output"
| |
|
| |
|
async def test_tool_start_event():
    """Run a single ``on_tool_start`` event through ``process_agent_events``.

    The last entry of the "Agent Steps" block is expected to be a
    ``ToolContent`` carrying the tool name and input from the event.
    """

    def update_message(message):
        # Rebuild a Message from the dumped fields of the one received.
        return Message(**message.model_dump())

    send_message = AsyncMock(side_effect=update_message)

    initial_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
        session_id="test_session_id",
    )
    tool_start = {
        "event": "on_tool_start",
        "name": "test_tool",
        "run_id": "test_run",
        "data": {"input": {"query": "tool input"}},
        "start_time": 0,
    }

    processed = await process_agent_events(create_event_iterator([tool_start]), initial_message, send_message)

    assert processed.properties.icon == "Bot"
    assert len(processed.content_blocks) == 1
    assert processed.content_blocks[0].title == "Agent Steps"
    assert len(processed.content_blocks[0].contents) > 0
    tool_content = processed.content_blocks[0].contents[-1]
    assert isinstance(tool_content, ToolContent)
    assert tool_content.name == "test_tool"
    assert tool_content.tool_input == {"query": "tool input"}, tool_content
| |
|
| |
|
async def test_tool_end_event():
    """Run ``on_tool_start`` followed by ``on_tool_end`` through ``process_agent_events``.

    The last entry of the single content block is expected to carry the tool
    name and the output supplied by the end event.
    """
    tool_start = {
        "event": "on_tool_start",
        "name": "test_tool",
        "run_id": "test_run",
        "data": {"input": {"query": "tool input"}},
        "start_time": 0,
    }
    tool_end = {
        "event": "on_tool_end",
        "name": "test_tool",
        "run_id": "test_run",
        "data": {"output": "tool output"},
        "start_time": 0,
    }
    initial_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
        session_id="test_session_id",
    )
    # Mock sender that hands back the message it was called with.
    send_message = AsyncMock(side_effect=lambda message: message)

    processed = await process_agent_events(
        create_event_iterator([tool_start, tool_end]), initial_message, send_message
    )

    assert len(processed.content_blocks) == 1
    tool_content = processed.content_blocks[0].contents[-1]
    assert tool_content.name == "test_tool"
    assert tool_content.output == "tool output"
| |
|
| |
|
async def test_tool_error_event():
    """Run ``on_tool_start`` followed by ``on_tool_error`` through ``process_agent_events``.

    The last entry of the first content block is expected to carry the tool
    name, the error text from the event, and an error header title.
    """
    tool_start = {
        "event": "on_tool_start",
        "name": "test_tool",
        "run_id": "test_run",
        "data": {"input": {"query": "tool input"}},
        "start_time": 0,
    }
    tool_error = {
        "event": "on_tool_error",
        "name": "test_tool",
        "run_id": "test_run",
        "data": {"error": "error message"},
        "start_time": 0,
    }
    initial_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
        session_id="test_session_id",
    )
    # Mock sender that hands back the message it was called with.
    send_message = AsyncMock(side_effect=lambda message: message)

    processed = await process_agent_events(
        create_event_iterator([tool_start, tool_error]), initial_message, send_message
    )

    tool_content = processed.content_blocks[0].contents[-1]
    assert tool_content.name == "test_tool"
    assert tool_content.error == "error message"
    assert tool_content.header["title"] == "Error using **test_tool**"
| |
|
| |
|
async def test_chain_stream_event():
    """Run a single ``on_chain_stream`` event through ``process_agent_events``.

    The chunk carries ``"streamed output"``; the returned message is expected
    to be "complete" and to have that text.
    """
    stream_event = {"event": "on_chain_stream", "data": {"chunk": {"output": "streamed output"}}, "start_time": 0}
    initial_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
        session_id="test_session_id",
    )
    # Mock sender that hands back the message it was called with.
    send_message = AsyncMock(side_effect=lambda message: message)

    processed = await process_agent_events(create_event_iterator([stream_event]), initial_message, send_message)

    assert processed.properties.state == "complete"
    assert processed.text == "streamed output"
| |
|
| |
|
async def test_multiple_events():
    """Run a chain-start, tool-start, tool-end, chain-end sequence through ``process_agent_events``.

    The returned message is expected to be "complete", keep its "Bot" icon,
    hold exactly one content block, and carry the ``AgentFinish`` output text.
    """
    # The side_effect echoes back whatever message it is given. A side_effect
    # callable's return value takes precedence over return_value, so no
    # return_value is configured on the mock.
    send_message = AsyncMock(side_effect=lambda message: message)

    # Output object placed under the final event's "output" key.
    output = AgentFinish(return_values={"output": "final output"}, log="test log")

    events = [
        {"event": "on_chain_start", "data": {"input": {"input": "initial input", "chat_history": []}}, "start_time": 0},
        {
            "event": "on_tool_start",
            "name": "test_tool",
            "run_id": "test_run",
            "data": {"input": {"query": "tool input"}},
            "start_time": 0,
        },
        {
            "event": "on_tool_end",
            "name": "test_tool",
            "run_id": "test_run",
            "data": {"output": "tool output"},
            "start_time": 0,
        },
        {"event": "on_chain_end", "data": {"output": output}, "start_time": 0},
    ]

    # Initial partial message handed to the event processor.
    agent_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
    )

    result = await process_agent_events(create_event_iterator(events), agent_message, send_message)

    assert result.properties.state == "complete"
    assert result.properties.icon == "Bot"
    assert len(result.content_blocks) == 1
    assert result.text == "final output"
| |
|
| |
|
async def test_unknown_event():
    """Run an event with an unrecognised type through ``process_agent_events``.

    The returned message is expected to be "complete" and its single content
    block is expected to stay empty.
    """
    # The side_effect echoes back whatever message it is given. A side_effect
    # callable's return value takes precedence over return_value, so no
    # return_value is configured on the mock.
    send_message = AsyncMock(side_effect=lambda message: message)
    agent_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
    )

    events = [{"event": "unknown_event", "data": {"some": "data"}, "start_time": 0}]

    result = await process_agent_events(create_event_iterator(events), agent_message, send_message)

    # The message is still marked complete after processing.
    assert result.properties.state == "complete"
    # Nothing was added to the existing content block.
    assert len(result.content_blocks) == 1
    assert len(result.content_blocks[0].contents) == 0
| |
|
| |
|
| | |
| |
|
| |
|
async def test_handle_on_chain_start_with_input():
    """Call ``handle_on_chain_start`` directly with an event that has input data.

    The returned message is expected to keep its icon and its single
    "Agent Steps" block, and the returned start time is expected to be a float.
    """
    chain_start = {
        "event": "on_chain_start",
        "data": {"input": {"input": "test input", "chat_history": []}},
        "start_time": 0,
    }
    initial_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
    )
    # Mock sender that hands back the message it was called with.
    send_message = AsyncMock(side_effect=lambda message: message)

    handled = await handle_on_chain_start(chain_start, initial_message, send_message, 0.0)
    updated_message, start_time = handled

    assert updated_message.properties.icon == "Bot"
    assert len(updated_message.content_blocks) == 1
    assert updated_message.content_blocks[0].title == "Agent Steps"
    assert isinstance(start_time, float)
| |
|
| |
|
async def test_handle_on_chain_start_no_input():
    """Call ``handle_on_chain_start`` directly with an event whose ``data`` is empty.

    The single content block is expected to stay empty and the returned start
    time is expected to be a float.
    """
    chain_start = {"event": "on_chain_start", "data": {}, "start_time": 0}
    initial_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
    )
    # Mock sender that hands back the message it was called with.
    send_message = AsyncMock(side_effect=lambda message: message)

    handled = await handle_on_chain_start(chain_start, initial_message, send_message, 0.0)
    updated_message, start_time = handled

    assert updated_message.properties.icon == "Bot"
    assert len(updated_message.content_blocks) == 1
    assert len(updated_message.content_blocks[0].contents) == 0
    assert isinstance(start_time, float)
| |
|
| |
|
async def test_handle_on_chain_end_with_output():
    """Call ``handle_on_chain_end`` directly with an ``AgentFinish`` output.

    The returned message is expected to be "complete" with the text
    ``"final output"``, and the returned start time is expected to be a float.
    """
    finish = AgentFinish(return_values={"output": "final output"}, log="test log")
    chain_end = {"event": "on_chain_end", "data": {"output": finish}, "start_time": 0}

    initial_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
    )
    # Mock sender that hands back the message it was called with.
    send_message = AsyncMock(side_effect=lambda message: message)

    handled = await handle_on_chain_end(chain_end, initial_message, send_message, 0.0)
    updated_message, start_time = handled

    assert updated_message.properties.icon == "Bot"
    assert updated_message.properties.state == "complete"
    assert updated_message.text == "final output"
    assert isinstance(start_time, float)
| |
|
| |
|
async def test_handle_on_chain_end_no_output():
    """Call ``handle_on_chain_end`` with an event whose ``data`` has no ``output`` key.

    The message is expected to stay "partial" with empty text, and the
    returned start time is expected to be a float.
    """
    chain_end = {"event": "on_chain_end", "data": {}, "start_time": 0}
    initial_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
    )
    # Mock sender that hands back the message it was called with.
    send_message = AsyncMock(side_effect=lambda message: message)

    handled = await handle_on_chain_end(chain_end, initial_message, send_message, 0.0)
    updated_message, start_time = handled

    assert updated_message.properties.icon == "Bot"
    assert updated_message.properties.state == "partial"
    assert updated_message.text == ""
    assert isinstance(start_time, float)
| |
|
| |
|
async def test_handle_on_chain_end_empty_data():
    """Call ``handle_on_chain_end`` with an event whose ``output`` is ``None``.

    The message is expected to stay "partial" with empty text, and the
    returned start time is expected to be a float.
    """
    chain_end = {"event": "on_chain_end", "data": {"output": None}, "start_time": 0}
    initial_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
    )
    # Mock sender that hands back the message it was called with.
    send_message = AsyncMock(side_effect=lambda message: message)

    handled = await handle_on_chain_end(chain_end, initial_message, send_message, 0.0)
    updated_message, start_time = handled

    assert updated_message.properties.icon == "Bot"
    assert updated_message.properties.state == "partial"
    assert updated_message.text == ""
    assert isinstance(start_time, float)
| |
|
| |
|
async def test_handle_on_chain_end_with_empty_return_values():
    """Call ``handle_on_chain_end`` with an output object whose ``return_values`` is empty.

    The message is expected to stay "partial" with empty text, and the
    returned start time is expected to be a float.
    """

    class MockOutputEmptyReturnValues:
        # Stand-in output object exposing only an empty ``return_values`` dict.
        def __init__(self):
            self.return_values = {}

    chain_end = {"event": "on_chain_end", "data": {"output": MockOutputEmptyReturnValues()}, "start_time": 0}

    initial_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
    )
    # Mock sender that hands back the message it was called with.
    send_message = AsyncMock(side_effect=lambda message: message)

    handled = await handle_on_chain_end(chain_end, initial_message, send_message, 0.0)
    updated_message, start_time = handled

    assert updated_message.properties.icon == "Bot"
    assert updated_message.properties.state == "partial"
    assert updated_message.text == ""
    assert isinstance(start_time, float)
| |
|
| |
|
async def test_handle_on_tool_start():
    """Call ``handle_on_tool_start`` directly and inspect the recorded tool content.

    The last entry of the content block is expected to be a ``ToolContent``
    equal to the one stored in ``tool_blocks_map`` under ``"<name>_<run_id>"``.
    """
    event = {
        "event": "on_tool_start",
        "name": "test_tool",
        "run_id": "test_run",
        "data": {"input": {"query": "tool input"}},
        "start_time": 0,
    }
    initial_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
    )
    tool_blocks_map = {}
    # Mock sender that hands back the message it was called with.
    send_message = AsyncMock(side_effect=lambda message: message)

    handled = await handle_on_tool_start(event, initial_message, tool_blocks_map, send_message, 0.0)
    updated_message, start_time = handled

    assert len(updated_message.content_blocks) == 1
    assert len(updated_message.content_blocks[0].contents) > 0
    # Key under which the test looks up the tool content in the map.
    tool_key = f"{event['name']}_{event['run_id']}"
    tool_content = updated_message.content_blocks[0].contents[-1]
    assert tool_content == tool_blocks_map.get(tool_key)
    assert isinstance(tool_content, ToolContent)
    assert tool_content.name == "test_tool"
    assert tool_content.tool_input == {"query": "tool input"}
    assert isinstance(tool_content.duration, int)
    assert isinstance(start_time, float)
| |
|
| |
|
async def test_handle_on_tool_end():
    """Call ``handle_on_tool_start`` then ``handle_on_tool_end`` for the same tool run.

    The last entry of the first content block is expected to carry the tool
    name, the output from the end event, and an integer duration; the returned
    start time is expected to be a float.
    """
    send_message = AsyncMock(side_effect=lambda message: message)
    tool_blocks_map = {}
    agent_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
    )

    # Run the start handler first so the end handler is called with the same
    # tool_blocks_map and the message returned by the start handler.
    start_event = {
        "event": "on_tool_start",
        "name": "test_tool",
        "run_id": "test_run",
        "data": {"input": {"query": "tool input"}},
    }
    agent_message, _ = await handle_on_tool_start(start_event, agent_message, tool_blocks_map, send_message, 0.0)

    end_event = {
        "event": "on_tool_end",
        "name": "test_tool",
        "run_id": "test_run",
        "data": {"output": "tool output"},
        "start_time": 0,
    }

    updated_message, start_time = await handle_on_tool_end(end_event, agent_message, tool_blocks_map, send_message, 0.0)

    tool_content = updated_message.content_blocks[0].contents[-1]
    assert tool_content.name == "test_tool"
    assert tool_content.output == "tool output"
    assert isinstance(tool_content.duration, int)
    assert isinstance(start_time, float)
| |
|
| |
|
async def test_handle_on_tool_error():
    """Call ``handle_on_tool_start`` then ``handle_on_tool_error`` for the same tool run.

    The last entry of the first content block is expected to carry the tool
    name, the error text, an error header title, and an integer duration.
    """
    tool_start = {
        "event": "on_tool_start",
        "name": "test_tool",
        "run_id": "test_run",
        "data": {"input": {"query": "tool input"}},
    }
    tool_error = {
        "event": "on_tool_error",
        "name": "test_tool",
        "run_id": "test_run",
        "data": {"error": "error message"},
        "start_time": 0,
    }
    tool_blocks_map = {}
    # Mock sender that hands back the message it was called with.
    send_message = AsyncMock(side_effect=lambda message: message)
    initial_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
    )

    # Start handler runs first; its returned message is passed to the error handler.
    started = await handle_on_tool_start(tool_start, initial_message, tool_blocks_map, send_message, 0.0)
    started_message, _ = started

    handled = await handle_on_tool_error(tool_error, started_message, tool_blocks_map, send_message, 0.0)
    updated_message, start_time = handled

    tool_content = updated_message.content_blocks[0].contents[-1]
    assert tool_content.name == "test_tool"
    assert tool_content.error == "error message"
    assert tool_content.header["title"] == "Error using **test_tool**"
    assert isinstance(tool_content.duration, int)
    assert isinstance(start_time, float)
| |
|
| |
|
async def test_handle_on_chain_stream_with_output():
    """Call ``handle_on_chain_stream`` directly with a chunk that has an ``output``.

    The returned message is expected to carry ``"streamed output"`` as text
    and be "complete"; the returned start time is expected to be a float.
    """
    stream_event = {
        "event": "on_chain_stream",
        "data": {"chunk": {"output": "streamed output"}},
    }
    initial_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
    )
    # Mock sender that hands back the message it was called with.
    send_message = AsyncMock(side_effect=lambda message: message)

    handled = await handle_on_chain_stream(stream_event, initial_message, send_message, 0.0)
    updated_message, start_time = handled

    assert updated_message.text == "streamed output"
    assert updated_message.properties.state == "complete"
    assert isinstance(start_time, float)
| |
|
| |
|
async def test_handle_on_chain_stream_no_output():
    """Call ``handle_on_chain_stream`` directly with an empty chunk.

    The message is expected to stay "partial" with empty text; the returned
    start time is expected to be a float.
    """
    stream_event = {
        "event": "on_chain_stream",
        "data": {"chunk": {}},
    }
    initial_message = Message(
        sender=MESSAGE_SENDER_AI,
        sender_name="Agent",
        properties={"icon": "Bot", "state": "partial"},
        content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
        session_id="test_session_id",
    )
    # Mock sender that hands back the message it was called with.
    send_message = AsyncMock(side_effect=lambda message: message)

    handled = await handle_on_chain_stream(stream_event, initial_message, send_message, 0.0)
    updated_message, start_time = handled

    assert updated_message.text == ""
    assert updated_message.properties.state == "partial"
    assert isinstance(start_time, float)
| |
|