| | import asyncio |
| | import json |
| | import time |
| | import uuid |
| |
|
| | import pytest |
| | from langflow.events.event_manager import EventManager |
| | from langflow.schema.log import LoggableType |
| |
|
| |
|
class TestEventManager:
    """Tests for ``EventManager``: event registration, attribute-style callback lookup and queue delivery."""

    # Upper bound (seconds) for any await on the queue. The tests enqueue before
    # they await, so reaching this limit means the event never arrived; failing
    # fast is preferable to hanging the whole test run.
    _QUEUE_TIMEOUT = 1.0

    def test_register_event_with_valid_name_and_callback_with_mock_callback(self):
        """Registering a valid name with a callback stores an entry whose ``func`` is that callback."""

        def mock_callback(event_type: str, data: LoggableType):
            pass

        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type", mock_callback)
        assert "on_test_event" in manager.events
        assert manager.events["on_test_event"].func == mock_callback

    def test_register_event_with_empty_name(self):
        """An empty event name is rejected with a ``ValueError``."""
        queue = asyncio.Queue()
        manager = EventManager(queue)
        with pytest.raises(ValueError, match="Event name cannot be empty"):
            manager.register_event("", "test_type")

    def test_register_event_with_valid_name_and_no_callback(self):
        """Without an explicit callback, the stored entry's ``func`` is the manager's ``send_event``."""
        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type")
        assert "on_test_event" in manager.events
        assert manager.events["on_test_event"].func == manager.send_event

    def test_accessing_non_registered_event_callback_with_recommended_fix(self):
        """Looking up an unregistered event name on the manager yields its ``noop`` method."""
        queue = asyncio.Queue()
        manager = EventManager(queue)
        # Plain attribute access exercises the same fallback as an explicit
        # ``__getattr__`` call, without invoking a dunder method directly.
        result = manager.non_registered_event
        assert result == manager.noop

    def test_accessing_registered_event_callback(self):
        """A registered event is reachable as an attribute named after the event."""

        def mock_callback(event_type: str, data: LoggableType):
            pass

        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type", mock_callback)
        assert manager.on_test_event.func == mock_callback

    def test_handling_large_number_of_events(self):
        """1000 distinct registrations using ``noop`` as the callback are all retained."""

        def mock_queue_put_nowait(item):
            pass

        queue = asyncio.Queue()
        # Stub out enqueueing so nothing can accumulate on the queue during the test.
        queue.put_nowait = mock_queue_put_nowait
        manager = EventManager(queue)

        for i in range(1000):
            manager.register_event(f"on_test_event_{i}", "test_type", manager.noop)

        assert len(manager.events) == 1000

    def test_register_event_with_invalid_name_fixed(self):
        """Both an empty name and a name lacking the ``on_`` prefix raise ``ValueError``."""

        def mock_callback(event_type, data):
            pass

        queue = asyncio.Queue()
        manager = EventManager(queue)
        with pytest.raises(ValueError, match="Event name cannot be empty"):
            manager.register_event("", "test_type", mock_callback)
        with pytest.raises(ValueError, match="Event name must start with 'on_'"):
            manager.register_event("invalid_name", "test_type", mock_callback)

    async def test_sending_event_with_complex_data(self):
        """Sending nested data puts an ``(event_id, payload, timestamp)`` tuple on the queue."""
        queue = asyncio.Queue()

        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type", manager.noop)
        data = {"key": "value", "nested": [1, 2, 3]}
        manager.send_event(event_type="test_type", data=data)
        # Bounded wait: a missing event should fail the test, not block it forever.
        event_id, str_data, event_time = await asyncio.wait_for(queue.get(), timeout=self._QUEUE_TIMEOUT)
        assert event_id is not None
        assert str_data is not None
        assert event_time <= time.time()

    def test_sending_event_with_none_data(self):
        """An event registered without a callback can be sent with ``data=None``."""
        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type")
        assert "on_test_event" in manager.events
        assert manager.events["on_test_event"].func.__name__ == "send_event"

        # Actually dispatch the event with no data, as the test name promises.
        manager.on_test_event(data=None)
        assert queue.qsize() == 1
        _, str_data, _ = queue.get_nowait()
        # NOTE(review): assumes the payload is JSON with a "data" key, as the
        # format test in this class expects -- confirm against EventManager.
        assert json.loads(str_data)["data"] is None

    async def test_thread_safety_accessing_events_dictionary(self):
        """Events registered in one coroutine are visible to another run via ``asyncio.gather``.

        NOTE(review): despite the name, no threads are involved. Neither
        coroutine awaits anything, so the assertions rely on ``gather``
        starting its arguments in the order given -- confirm this is the
        intended coverage.
        """

        def mock_callback(event_type: str, data: LoggableType):
            pass

        async def register_events(manager):
            manager.register_event("on_test_event_1", "test_type_1", mock_callback)
            manager.register_event("on_test_event_2", "test_type_2", mock_callback)

        async def access_events(manager):
            assert "on_test_event_1" in manager.events
            assert "on_test_event_2" in manager.events

        queue = asyncio.Queue()
        manager = EventManager(queue)

        await asyncio.gather(register_events(manager), access_events(manager))

    def test_performance_impact_frequent_registrations(self):
        """1000 registrations sharing one callback are all retained (no timing is measured)."""

        def mock_callback(event_type: str, data: LoggableType):
            pass

        queue = asyncio.Queue()
        manager = EventManager(queue)
        for i in range(1000):
            manager.register_event(f"on_test_event_{i}", "test_type", mock_callback)
        assert len(manager.events) == 1000

    async def test_event_id_uniqueness_with_await(self):
        """Two events sent back to back carry different event ids."""
        queue = asyncio.Queue()
        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type")
        manager.on_test_event(data={"data_1": "value_1"})
        manager.on_test_event(data={"data_2": "value_2"})
        try:
            # Without wait_for the TimeoutError handler below could never run.
            event_id_1, _, _ = await asyncio.wait_for(queue.get(), timeout=self._QUEUE_TIMEOUT)
            event_id_2, _, _ = await asyncio.wait_for(queue.get(), timeout=self._QUEUE_TIMEOUT)
        except asyncio.TimeoutError:
            pytest.fail("Test timed out while waiting for queue items")

        assert event_id_1 != event_id_2

    async def test_queue_receives_correct_event_data_format(self):
        """A sent event arrives on the real queue with a UUID-bearing id and a JSON bytes payload."""
        queue = asyncio.Queue()

        manager = EventManager(queue)
        manager.register_event("on_test_event", "test_type", manager.noop)
        event_data = "test_data"
        manager.send_event(event_type="test_type", data=event_data)

        # Read back what the manager actually enqueued rather than a stubbed value.
        event_id, str_data, _ = await asyncio.wait_for(queue.get(), timeout=self._QUEUE_TIMEOUT)

        # NOTE(review): another test in this class parses the id as
        # "<event_type>-<uuid>". Checking only the trailing 36 characters
        # accepts that form as well as a bare UUID -- tighten once confirmed.
        uuid_text = str(event_id)[-36:]
        try:
            uuid.UUID(uuid_text)
        except ValueError:
            pytest.fail(f"Invalid UUID: {uuid_text}")
        assert isinstance(str_data, bytes)
        assert json.loads(str_data.decode("utf-8")) == {"event": "test_type", "data": event_data}

    def test_register_event_without_event_type_argument_fixed(self):
        """``send_event`` enqueues ``(id, bytes, float)`` where the id is ``<event_type>-<uuid>``."""

        class MockQueue:
            def __init__(self):
                self.data = []

            def put_nowait(self, item):
                self.data.append(item)

        queue = MockQueue()
        event_manager = EventManager(queue)
        event_manager.register_event("on_test_event", "test_event_type", callback=event_manager.noop)
        event_manager.send_event(event_type="test_type", data={"key": "value"})

        assert len(queue.data) == 1
        event_id, str_data, timestamp = queue.data[0]

        # Split on the first hyphen only, so the UUID part carries no leading
        # "-" (uuid.UUID would silently tolerate one, hiding a malformed id).
        event_type_from_id, _, uuid_from_id = event_id.partition("-")
        assert event_type_from_id == "test_type"

        try:
            uuid.UUID(uuid_from_id)
        except ValueError:
            pytest.fail(f"Invalid UUID: {uuid_from_id}")
        assert isinstance(str_data, bytes)
        assert isinstance(timestamp, float)

    def test_accessing_non_registered_callback(self):
        """Attribute access for an unregistered event returns a callable named ``noop``."""

        class MockQueue:
            def __init__(self):
                pass

            def put_nowait(self, item):
                pass

        queue = MockQueue()
        event_manager = EventManager(queue)

        callback = event_manager.on_non_existing_event
        assert callback.__name__ == "noop"
| |
|