Spaces:
Running
Running
File size: 887 Bytes
1d47e3c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | import asyncio
import pytest
from app.main import _qdrant_keepalive_loop
class _FakeQdrant:
def __init__(self) -> None:
self.calls = 0
def get_collections(self) -> None:
self.calls += 1
@pytest.mark.asyncio
async def test_keepalive_loop_pings_qdrant() -> None:
    """Run the keepalive loop for just over one interval and expect a ping.

    The loop is started as a background task with a one-second interval,
    left running for 1.2 seconds, then asked to stop via ``stop_event``.
    The test requires the task to finish within one second of the stop
    request and ``get_collections`` to have been called at least once.
    """
    qdrant = _FakeQdrant()
    stop_event = asyncio.Event()
    task = asyncio.create_task(
        _qdrant_keepalive_loop(qdrant=qdrant, interval_seconds=1, stop_event=stop_event)
    )
    try:
        # Slightly longer than the configured interval, so one ping is due.
        await asyncio.sleep(1.2)
        stop_event.set()
        # Raises on timeout if the loop does not finish after the stop request.
        await asyncio.wait_for(task, timeout=1)
    finally:
        # Never let the background task outlive the test, even if the test
        # itself is cancelled or fails before the task has finished.
        if not task.done():
            task.cancel()
    assert qdrant.calls >= 1
@pytest.mark.asyncio
async def test_keepalive_loop_disabled_when_interval_non_positive() -> None:
    """With ``interval_seconds=0`` the loop must return without pinging.

    The loop is awaited directly with a stop event that is never set, so
    it has to return on its own. The call is bounded by a timeout so that
    a regression shows up as a test failure rather than a hung test run.
    """
    qdrant = _FakeQdrant()
    stop_event = asyncio.Event()
    await asyncio.wait_for(
        _qdrant_keepalive_loop(qdrant=qdrant, interval_seconds=0, stop_event=stop_event),
        timeout=1,
    )
    assert qdrant.calls == 0
|