docsifer / tests /unit /test_safety.py
lamhieu's picture
refactor(core): overhaul architecture for better performance, efficiency, and maintainability
41788c4
Raw
History Blame Contribute Delete
1.7 kB
import asyncio
import pytest
from docsifer.exceptions import (
CircuitOpenError,
QueueFullError,
TooManyRequestsError,
)
from docsifer.safety.circuit_breaker import CircuitBreaker
from docsifer.safety.conversion_gate import ConversionGate
from docsifer.safety.per_ip_limiter import PerIPLimiter
@pytest.mark.asyncio
async def test_conversion_gate_admits_up_to_max() -> None:
gate = ConversionGate(max_concurrent=2, max_queue=0)
async def _job() -> None:
async with gate.acquire():
await asyncio.sleep(0.05)
await asyncio.gather(_job(), _job()) # OK, capacity = 2
@pytest.mark.asyncio
async def test_conversion_gate_rejects_when_queue_full() -> None:
gate = ConversionGate(max_concurrent=1, max_queue=0)
async with gate.acquire():
with pytest.raises(QueueFullError):
async with gate.acquire():
pass
@pytest.mark.asyncio
async def test_per_ip_limiter() -> None:
lim = PerIPLimiter(max_per_ip=1)
async with lim.acquire("1.2.3.4"):
with pytest.raises(TooManyRequestsError):
async with lim.acquire("1.2.3.4"):
pass
# Same IP can acquire again after release
async with lim.acquire("1.2.3.4"):
pass
@pytest.mark.asyncio
async def test_circuit_breaker_opens_on_failures() -> None:
cb = CircuitBreaker(failure_threshold=2, reset_timeout_sec=60)
async def _bad() -> None:
raise RuntimeError("boom")
with pytest.raises(RuntimeError):
await cb.call(_bad)
with pytest.raises(RuntimeError):
await cb.call(_bad)
with pytest.raises(CircuitOpenError):
await cb.call(_bad)
assert cb.state == "open"