|
|
import json |
|
|
import os |
|
|
from unittest.mock import patch |
|
|
|
|
|
import pytest |
|
|
from langflow.logging.logger import SizedLogBuffer |
|
|
|
|
|
|
|
|
@pytest.fixture |
|
|
def sized_log_buffer(): |
|
|
return SizedLogBuffer() |
|
|
|
|
|
|
|
|
def test_init_default(): |
|
|
buffer = SizedLogBuffer() |
|
|
assert buffer.max == 0 |
|
|
assert buffer._max_readers == 20 |
|
|
|
|
|
|
|
|
def test_init_with_env_variable(): |
|
|
with patch.dict(os.environ, {"LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE": "100"}): |
|
|
buffer = SizedLogBuffer() |
|
|
assert buffer.max == 100 |
|
|
|
|
|
|
|
|
def test_write(sized_log_buffer): |
|
|
message = json.dumps({"text": "Test log", "record": {"time": {"timestamp": 1625097600.1244334}}}) |
|
|
sized_log_buffer.max = 1 |
|
|
sized_log_buffer.write(message) |
|
|
assert len(sized_log_buffer.buffer) == 1 |
|
|
assert sized_log_buffer.buffer[0][0] == 1625097600124 |
|
|
assert sized_log_buffer.buffer[0][1] == "Test log" |
|
|
|
|
|
|
|
|
def test_write_overflow(sized_log_buffer): |
|
|
sized_log_buffer.max = 2 |
|
|
messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(3)] |
|
|
for message in messages: |
|
|
sized_log_buffer.write(message) |
|
|
|
|
|
assert len(sized_log_buffer.buffer) == 2 |
|
|
assert sized_log_buffer.buffer[0][0] == 1625097601000 |
|
|
assert sized_log_buffer.buffer[1][0] == 1625097602000 |
|
|
|
|
|
|
|
|
def test_len(sized_log_buffer): |
|
|
sized_log_buffer.max = 3 |
|
|
messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(3)] |
|
|
for message in messages: |
|
|
sized_log_buffer.write(message) |
|
|
|
|
|
assert len(sized_log_buffer) == 3 |
|
|
|
|
|
|
|
|
def test_get_after_timestamp(sized_log_buffer): |
|
|
sized_log_buffer.max = 5 |
|
|
messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(5)] |
|
|
for message in messages: |
|
|
sized_log_buffer.write(message) |
|
|
|
|
|
result = sized_log_buffer.get_after_timestamp(1625097602000, lines=2) |
|
|
assert len(result) == 2 |
|
|
assert 1625097603000 in result |
|
|
assert 1625097602000 in result |
|
|
|
|
|
|
|
|
def test_get_before_timestamp(sized_log_buffer): |
|
|
sized_log_buffer.max = 5 |
|
|
messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(5)] |
|
|
for message in messages: |
|
|
sized_log_buffer.write(message) |
|
|
|
|
|
result = sized_log_buffer.get_before_timestamp(1625097603000, lines=2) |
|
|
assert len(result) == 2 |
|
|
assert 1625097601000 in result |
|
|
assert 1625097602000 in result |
|
|
|
|
|
|
|
|
def test_get_last_n(sized_log_buffer): |
|
|
sized_log_buffer.max = 5 |
|
|
messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(5)] |
|
|
for message in messages: |
|
|
sized_log_buffer.write(message) |
|
|
|
|
|
result = sized_log_buffer.get_last_n(3) |
|
|
assert len(result) == 3 |
|
|
assert 1625097602000 in result |
|
|
assert 1625097603000 in result |
|
|
assert 1625097604000 in result |
|
|
|
|
|
|
|
|
def test_enabled(sized_log_buffer): |
|
|
assert not sized_log_buffer.enabled() |
|
|
sized_log_buffer.max = 1 |
|
|
assert sized_log_buffer.enabled() |
|
|
|
|
|
|
|
|
def test_max_size(sized_log_buffer): |
|
|
assert sized_log_buffer.max_size() == 0 |
|
|
sized_log_buffer.max = 100 |
|
|
assert sized_log_buffer.max_size() == 100 |
|
|
|