| import asyncio |
|
|
| import pytest |
|
|
| from docsifer.exceptions import ( |
| CircuitOpenError, |
| QueueFullError, |
| TooManyRequestsError, |
| ) |
| from docsifer.safety.circuit_breaker import CircuitBreaker |
| from docsifer.safety.conversion_gate import ConversionGate |
| from docsifer.safety.per_ip_limiter import PerIPLimiter |
|
|
|
|
@pytest.mark.asyncio
async def test_conversion_gate_admits_up_to_max() -> None:
    """Run two jobs at once through a gate built with ``max_concurrent=2``.

    The gate is created with ``max_queue=0``; the test passes as long as
    neither job raises while going through ``gate.acquire()``.
    """
    limit = 2
    gate = ConversionGate(max_concurrent=limit, max_queue=0)

    async def _hold_slot() -> None:
        # Stay inside the gate for a moment so the jobs are meant to overlap.
        async with gate.acquire():
            await asyncio.sleep(0.05)

    # Launch exactly as many jobs as the configured concurrency limit.
    await asyncio.gather(*(_hold_slot() for _ in range(limit)))
|
|
|
|
@pytest.mark.asyncio
async def test_conversion_gate_rejects_when_queue_full() -> None:
    """Expect ``QueueFullError`` on a second acquire while the only slot is held.

    The gate is built with ``max_concurrent=1`` and ``max_queue=0``.
    """
    gate = ConversionGate(max_concurrent=1, max_queue=0)

    async def _enter_again() -> None:
        # Attempt to pass through the gate a second time; the body is a no-op.
        async with gate.acquire():
            pass

    async with gate.acquire():
        # The first acquisition is still active here, so the retry must fail.
        with pytest.raises(QueueFullError):
            await _enter_again()
|
|
|
|
@pytest.mark.asyncio
async def test_per_ip_limiter() -> None:
    """Exercise ``PerIPLimiter(max_per_ip=1)`` with a single client address.

    While one acquisition for the address is active, a second one must raise
    ``TooManyRequestsError``; after the first has exited, acquiring again for
    the same address must succeed.
    """
    client_ip = "1.2.3.4"
    limiter = PerIPLimiter(max_per_ip=1)

    async with limiter.acquire(client_ip):
        # Same address, first acquisition still held: expect a refusal.
        with pytest.raises(TooManyRequestsError):
            async with limiter.acquire(client_ip):
                pass

    # The earlier hold has exited, so this acquire is expected not to raise.
    async with limiter.acquire(client_ip):
        pass
|
|
|
|
@pytest.mark.asyncio
async def test_circuit_breaker_opens_on_failures() -> None:
    """Drive a breaker with ``failure_threshold=2`` past its threshold.

    The first two calls must surface the callee's ``RuntimeError``; the third
    call must raise ``CircuitOpenError`` and the breaker must report the
    ``"open"`` state afterwards.
    """
    threshold = 2
    breaker = CircuitBreaker(failure_threshold=threshold, reset_timeout_sec=60)

    async def _always_fails() -> None:
        raise RuntimeError("boom")

    # Each call up to the threshold propagates the underlying error unchanged.
    for _ in range(threshold):
        with pytest.raises(RuntimeError):
            await breaker.call(_always_fails)

    # One more call after the threshold is reached must be rejected.
    with pytest.raises(CircuitOpenError):
        await breaker.call(_always_fails)

    assert breaker.state == "open"
|
|