| # Phase 17: Rate Limiting with `limits` Library |
|
|
| **Priority**: P0 CRITICAL - Prevents API blocks |
| **Effort**: ~1 hour |
| **Dependencies**: None |
|
|
| --- |
|
|
| ## CRITICAL: Async Safety Requirements |
|
|
| **WARNING**: The rate limiter MUST be async-safe. Blocking the event loop will freeze: |
| - The Gradio UI |
| - All parallel searches |
| - The orchestrator |
|
|
| **Rules**: |
| 1. **NEVER use `time.sleep()`** - Always use `await asyncio.sleep()` |
| 2. **NEVER use blocking while loops** - Use async-aware polling |
| 3. **The `limits` check used here is synchronous** - Keep each call short and do all waiting with `await asyncio.sleep()` |
|
|
| The implementation below uses a polling pattern that: |
| - Checks the limit (synchronous, fast) |
| - If the limit is exceeded, awaits `asyncio.sleep()` (non-blocking) |
| - Retries the check |
|
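The polling pattern above can be sketched without the `limits` dependency. In this minimal illustration, `PollingWindow` is a hypothetical stand-in for the real limiter (a simple in-memory moving window), and the heartbeat task stands in for the UI or other searches. It only aims to show that `await asyncio.sleep()` lets other coroutines keep running while a request waits.

```python
import asyncio
import time
from collections import deque


class PollingWindow:
    """Hypothetical stand-in: at most `limit` hits per `period` seconds."""

    def __init__(self, limit: int, period: float) -> None:
        self.limit = limit
        self.period = period
        self._hits: deque[float] = deque()

    def try_hit(self) -> bool:
        """Synchronous check: record a hit if the window has room."""
        now = time.monotonic()
        # Drop hits that have aged out of the window
        while self._hits and now - self._hits[0] >= self.period:
            self._hits.popleft()
        if len(self._hits) < self.limit:
            self._hits.append(now)
            return True
        return False

    async def acquire(self, poll: float = 0.02) -> None:
        """Poll until a slot frees up, yielding to the event loop between checks."""
        while not self.try_hit():
            await asyncio.sleep(poll)


async def demo() -> tuple[int, float]:
    window = PollingWindow(limit=1, period=0.2)
    ticks = 0

    async def heartbeat() -> None:
        # Stands in for any other coroutine that must stay responsive
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    beat = asyncio.create_task(heartbeat())
    start = time.monotonic()
    await window.acquire()  # first request passes immediately
    await window.acquire()  # second request waits for the window to slide
    waited = time.monotonic() - start
    beat.cancel()
    return ticks, waited


ticks, waited = asyncio.run(demo())
print(f"waited {waited:.2f}s while the heartbeat ran {ticks} times")
```

Replacing `await asyncio.sleep(poll)` with `time.sleep(poll)` in this sketch would stop the heartbeat from advancing during the wait, which is the failure mode the rules above guard against.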
|
| **Alternative**: If `limits` proves problematic, use `aiolimiter`, which is async-native. |
|
|
| --- |
|
|
| ## Overview |
|
|
| Replace the naive `asyncio.sleep` delay with a proper rate limiter built on the `limits` library, which provides: |
| - Moving window rate limiting |
| - Per-API configurable limits |
| - Thread-safe storage |
| - Already used in reference repo |
|
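To make "moving window" concrete, the sketch below compares it with a fixed window on a burst that straddles a one-second boundary. Both functions are simplified illustrations written for this document (they are not the `limits` implementation), and they take explicit timestamps so the result is deterministic.

```python
from collections import deque


def fixed_window_allows(timestamps: list[float], limit: int, period: float) -> list[bool]:
    """Fixed window: counters reset at period boundaries (0, period, 2*period, ...)."""
    counts: dict[int, int] = {}
    decisions = []
    for t in timestamps:
        bucket = int(t // period)
        allowed = counts.get(bucket, 0) < limit
        if allowed:
            counts[bucket] = counts.get(bucket, 0) + 1
        decisions.append(allowed)
    return decisions


def moving_window_allows(timestamps: list[float], limit: int, period: float) -> list[bool]:
    """Moving window: count only the hits made in the last `period` seconds."""
    recent: deque[float] = deque()
    decisions = []
    for t in timestamps:
        # Forget hits older than one full period
        while recent and t - recent[0] >= period:
            recent.popleft()
        allowed = len(recent) < limit
        if allowed:
            recent.append(t)
        decisions.append(allowed)
    return decisions


# Three requests at the end of second 0, three more at the start of second 1
burst = [0.8, 0.85, 0.9, 1.0, 1.05, 1.1]
fixed = fixed_window_allows(burst, limit=3, period=1.0)
moving = moving_window_allows(burst, limit=3, period=1.0)
print("fixed :", fixed)
print("moving:", moving)
```

The fixed window admits all six requests within 0.3 seconds, which is twice the intended 3/sec, while the moving window rejects the last three until the earlier hits age out.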
|
| **Why This Matters** |
| - NCBI will block us without proper rate limiting (3/sec without key, 10/sec with) |
| - The current implementation only has a simple sleep delay |
| - All PubMed calls need to share one coordinated limit |
| - A shared, library-backed limiter avoids API blocks in production |
|
|
| --- |
|
|
| ## Current State |
|
|
| ### What We Have (`src/tools/pubmed.py:20-21, 34-41`) |
|
|
| ```python |
| RATE_LIMIT_DELAY = 0.34  # ~3 requests/sec without API key |
|
| async def _rate_limit(self) -> None: |
|     """Enforce NCBI rate limiting.""" |
|     loop = asyncio.get_running_loop() |
|     now = loop.time() |
|     elapsed = now - self._last_request_time |
|     if elapsed < self.RATE_LIMIT_DELAY: |
|         await asyncio.sleep(self.RATE_LIMIT_DELAY - elapsed) |
|     self._last_request_time = loop.time() |
| ``` |
|
|
| ### Problems |
|
|
| 1. **Not shared across instances**: Each `PubMedTool()` tracks its own `_last_request_time`, so parallel tools can exceed the limit together |
| 2. **Fixed delay instead of a moving window**: Every request is spaced by 0.34 s, so bursts are not handled properly |
| 3. **Hardcoded rate**: Always ~3/sec, even when an API key allows 10/sec |
| 4. **No backoff on 429**: Just retries blindly |
|
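Problem 1 can be illustrated with a small, idealized calculation. No real I/O happens here: request timestamps are computed rather than measured, and the helper names (`per_instance_schedule`, `peak_per_second`) are hypothetical. Each tool spaces its own requests by `RATE_LIMIT_DELAY`, but two tools running in parallel double the combined rate seen by NCBI.

```python
RATE_LIMIT_DELAY = 0.34  # same per-instance delay as the current implementation


def per_instance_schedule(n_requests: int, delay: float = RATE_LIMIT_DELAY) -> list[float]:
    """Idealized send times for one tool that only enforces its own delay."""
    return [round(i * delay, 2) for i in range(n_requests)]


def peak_per_second(timestamps: list[float], window: float = 1.0) -> int:
    """Largest number of requests that fall inside any one-second span."""
    ordered = sorted(timestamps)
    return max(sum(1 for u in ordered if t <= u < t + window) for t in ordered)


tool_a = per_instance_schedule(6)
tool_b = per_instance_schedule(6)  # a second PubMedTool() with its own counter

single = peak_per_second(tool_a)
combined = peak_per_second(tool_a + tool_b)
print(f"one tool: {single}/sec, two tools: {combined}/sec")
```

Under these assumptions a single tool stays at 3 requests per second, while two independent tools reach 6, which is why the limiter has to be shared.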
|
| --- |
|
|
| ## TDD Implementation Plan |
|
|
| ### Step 1: Add Dependency |
|
|
| **File**: `pyproject.toml` |
|
|
| ```toml |
| dependencies = [ |
|     # ... existing deps ... |
|     "limits>=3.0", |
| ] |
| ``` |
|
|
| Then run: |
| ```bash |
| uv sync |
| ``` |
|
|
| --- |
|
|
| ### Step 2: Write the Tests First |
|
|
| **File**: `tests/unit/tools/test_rate_limiting.py` |
|
|
| ```python |
| """Tests for rate limiting functionality.""" |
|
| import asyncio |
| import time |
| from collections.abc import Iterator |
|
| import pytest |
|
| from src.tools.rate_limiter import ( |
|     RateLimiter, |
|     get_pubmed_limiter, |
|     reset_pubmed_limiter, |
| ) |
|
|
| @pytest.fixture(autouse=True) |
| def _clean_pubmed_limiter() -> Iterator[None]: |
|     """Reset the module-level singleton so tests do not leak state.""" |
|     reset_pubmed_limiter() |
|     yield |
|     reset_pubmed_limiter() |
|
|
| class TestRateLimiter: |
|     """Test suite for rate limiter.""" |
|
|     def test_create_limiter_without_api_key(self) -> None: |
|         """Should create 3/sec limiter without API key.""" |
|         limiter = RateLimiter(rate="3/second") |
|         assert limiter.rate == "3/second" |
|
|     def test_create_limiter_with_api_key(self) -> None: |
|         """Should create 10/sec limiter with API key.""" |
|         limiter = RateLimiter(rate="10/second") |
|         assert limiter.rate == "10/second" |
|
|     @pytest.mark.asyncio |
|     async def test_limiter_allows_requests_under_limit(self) -> None: |
|         """Should allow requests under the rate limit.""" |
|         limiter = RateLimiter(rate="10/second") |
|
|         # 3 requests should all succeed immediately |
|         for _ in range(3): |
|             allowed = await limiter.acquire() |
|             assert allowed is True |
|
|     @pytest.mark.asyncio |
|     async def test_limiter_blocks_when_exceeded(self) -> None: |
|         """Should wait when rate limit exceeded.""" |
|         limiter = RateLimiter(rate="2/second") |
|
|         # First 2 should be instant |
|         await limiter.acquire() |
|         await limiter.acquire() |
|
|         # Third should block briefly |
|         start = time.monotonic() |
|         await limiter.acquire() |
|         elapsed = time.monotonic() - start |
|
|         # With a moving window the third request waits until the first hit |
|         # is about 1 second old; the bound is deliberately loose |
|         assert elapsed >= 0.3 |
|
|     @pytest.mark.asyncio |
|     async def test_limiter_resets_after_window(self) -> None: |
|         """Rate limit should reset after time window.""" |
|         limiter = RateLimiter(rate="5/second") |
|
|         # Use up the limit |
|         for _ in range(5): |
|             await limiter.acquire() |
|
|         # Wait for window to pass |
|         await asyncio.sleep(1.1) |
|
|         # Should be allowed again |
|         start = time.monotonic() |
|         await limiter.acquire() |
|         elapsed = time.monotonic() - start |
|
|         assert elapsed < 0.1  # Should be nearly instant |
|
|
| class TestGetPubmedLimiter: |
|     """Test PubMed-specific limiter factory.""" |
|
|     def test_limiter_without_api_key(self) -> None: |
|         """Should return 3/sec limiter without key.""" |
|         limiter = get_pubmed_limiter(api_key=None) |
|         assert "3" in limiter.rate |
|
|     def test_limiter_with_api_key(self) -> None: |
|         """Should return 10/sec limiter with key.""" |
|         limiter = get_pubmed_limiter(api_key="my-api-key") |
|         assert "10" in limiter.rate |
|
|     def test_limiter_is_singleton(self) -> None: |
|         """Same API key should return same limiter instance.""" |
|         limiter1 = get_pubmed_limiter(api_key="key1") |
|         limiter2 = get_pubmed_limiter(api_key="key1") |
|         assert limiter1 is limiter2 |
|
|     def test_different_keys_share_limiter(self) -> None: |
|         """Different API keys should still share one limiter.""" |
|         limiter1 = get_pubmed_limiter(api_key="key1") |
|         limiter2 = get_pubmed_limiter(api_key="key2") |
|         # All calls go to the same API, so they share one NCBI rate limit |
|         assert limiter1 is limiter2 |
| ``` |
|
|
| --- |
|
|
| ### Step 3: Create Rate Limiter Module |
|
|
| **File**: `src/tools/rate_limiter.py` |
|
|
| ```python |
| """Rate limiting utilities using the limits library.""" |
|
| import asyncio |
| from typing import ClassVar |
|
| from limits import RateLimitItem, parse |
| from limits.storage import MemoryStorage |
| from limits.strategies import MovingWindowRateLimiter |
|
|
| class RateLimiter: |
|     """ |
|     Async-compatible rate limiter using limits library. |
|
|     Uses moving window algorithm for smooth rate limiting. |
|     """ |
|
|     def __init__(self, rate: str) -> None: |
|         """ |
|         Initialize rate limiter. |
|
|         Args: |
|             rate: Rate string like "3/second" or "10/second" |
|         """ |
|         self.rate = rate |
|         self._storage = MemoryStorage() |
|         self._limiter = MovingWindowRateLimiter(self._storage) |
|         self._rate_limit: RateLimitItem = parse(rate) |
|         self._identity = "default"  # Single identity for shared limiting |
|
|     async def acquire(self, wait: bool = True) -> bool: |
|         """ |
|         Acquire permission to make a request. |
|
|         ASYNC-SAFE: Uses asyncio.sleep(), never time.sleep(). |
|         The polling pattern allows other coroutines to run while waiting. |
|
|         Args: |
|             wait: If True, wait until allowed. If False, return immediately. |
|
|         Returns: |
|             True if allowed, False if not (only when wait=False) |
|         """ |
|         while True: |
|             # hit() checks the window and records the request if allowed |
|             # (synchronous, fast - ~microseconds) |
|             if self._limiter.hit(self._rate_limit, self._identity): |
|                 return True |
|
|             if not wait: |
|                 return False |
|
|             # CRITICAL: Use asyncio.sleep(), NOT time.sleep() |
|             # This yields control to the event loop, allowing other |
|             # coroutines (UI, parallel searches) to run |
|             await asyncio.sleep(0.1) |
|
|     def reset(self) -> None: |
|         """Reset the rate limiter (for testing).""" |
|         self._storage.reset() |
|
|
| # Singleton limiter for PubMed/NCBI |
| _pubmed_limiter: RateLimiter | None = None |
|
|
| def get_pubmed_limiter(api_key: str | None = None) -> RateLimiter: |
|     """ |
|     Get the shared PubMed rate limiter. |
|
|     Rate depends on whether API key is provided: |
|     - Without key: 3 requests/second |
|     - With key: 10 requests/second |
|
|     Note: the rate is fixed by the first call. Later calls return the |
|     existing limiter regardless of api_key until reset_pubmed_limiter() |
|     is called. |
|
|     Args: |
|         api_key: NCBI API key (optional) |
|
|     Returns: |
|         Shared RateLimiter instance |
|     """ |
|     global _pubmed_limiter |
|
|     if _pubmed_limiter is None: |
|         rate = "10/second" if api_key else "3/second" |
|         _pubmed_limiter = RateLimiter(rate) |
|
|     return _pubmed_limiter |
|
|
| def reset_pubmed_limiter() -> None: |
|     """Reset the PubMed limiter (for testing).""" |
|     global _pubmed_limiter |
|     _pubmed_limiter = None |
|
|
| # Factory for other APIs |
| class RateLimiterFactory: |
|     """Factory for creating/getting rate limiters for different APIs.""" |
|
|     _limiters: ClassVar[dict[str, RateLimiter]] = {} |
|
|     @classmethod |
|     def get(cls, api_name: str, rate: str) -> RateLimiter: |
|         """ |
|         Get or create a rate limiter for an API. |
|
|         Args: |
|             api_name: Unique identifier for the API |
|             rate: Rate limit string (e.g., "10/second") |
|
|         Returns: |
|             RateLimiter instance (shared for same api_name) |
|         """ |
|         if api_name not in cls._limiters: |
|             cls._limiters[api_name] = RateLimiter(rate) |
|         return cls._limiters[api_name] |
|
|     @classmethod |
|     def reset_all(cls) -> None: |
|         """Reset all limiters (for testing).""" |
|         cls._limiters.clear() |
| ``` |
|
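One consequence of the singleton in `get_pubmed_limiter` is that the first caller decides the rate. The sketch below reproduces that caching logic with a stub class so it runs without `limits`; `StubLimiter`, `get_limiter`, and `get_limiter_checked` are hypothetical names, and the "checked" variant is only one possible way to handle a rate change, not part of the plan above.

```python
from __future__ import annotations


class StubLimiter:
    """Stand-in for RateLimiter that only records its rate string."""

    def __init__(self, rate: str) -> None:
        self.rate = rate


_limiter: StubLimiter | None = None


def get_limiter(api_key: str | None = None) -> StubLimiter:
    """Same caching logic as get_pubmed_limiter: first call fixes the rate."""
    global _limiter
    if _limiter is None:
        _limiter = StubLimiter("10/second" if api_key else "3/second")
    return _limiter


def get_limiter_checked(api_key: str | None = None) -> StubLimiter:
    """Hypothetical variant: rebuild the limiter when the wanted rate changes."""
    global _limiter
    wanted = "10/second" if api_key else "3/second"
    if _limiter is None or _limiter.rate != wanted:
        # Rebuilding discards the recorded window, so a short burst is possible
        _limiter = StubLimiter(wanted)
    return _limiter


first = get_limiter(api_key=None)          # created at 3/second
second = get_limiter(api_key="some-key")   # key is ignored, same object
stale_rate = second.rate

upgraded = get_limiter_checked(api_key="some-key")
print(stale_rate, "->", upgraded.rate)
```

If every `PubMedTool` in a process resolves the same API key from settings, the simpler singleton is sufficient. The checked variant only matters when tools with and without a key are created in the same process.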
|
| --- |
|
|
| ### Step 4: Update PubMed Tool |
|
|
| **File**: `src/tools/pubmed.py` (replace rate limiting code) |
|
|
| ```python |
| # Replace imports and rate limiting |
| # (`settings` comes from the module's existing imports, which stay unchanged) |
|
| from src.tools.rate_limiter import get_pubmed_limiter |
|
|
| class PubMedTool: |
|     """Search tool for PubMed/NCBI.""" |
|
|     BASE_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils" |
|     HTTP_TOO_MANY_REQUESTS = 429 |
|
|     def __init__(self, api_key: str | None = None) -> None: |
|         self.api_key = api_key or settings.ncbi_api_key |
|         if self.api_key == "your-ncbi-key-here": |
|             self.api_key = None |
|         # Use shared rate limiter |
|         self._limiter = get_pubmed_limiter(self.api_key) |
|
|     async def _rate_limit(self) -> None: |
|         """Enforce NCBI rate limiting using shared limiter.""" |
|         await self._limiter.acquire() |
|
|     # ... rest of class unchanged ... |
| ``` |
|
|
| --- |
|
|
| ### Step 5: Add Rate Limiters for Other APIs |
|
|
| **File**: `src/tools/clinicaltrials.py` (optional) |
|
|
| ```python |
| from src.tools.rate_limiter import RateLimiterFactory |
|
|
| class ClinicalTrialsTool: |
|     def __init__(self) -> None: |
|         # ClinicalTrials.gov doesn't document limits, but be conservative |
|         self._limiter = RateLimiterFactory.get("clinicaltrials", "5/second") |
|
|     async def search(self, query: str, max_results: int = 10) -> list[Evidence]: |
|         await self._limiter.acquire() |
|         # ... rest of method ... |
| ``` |
|
|
| **File**: `src/tools/europepmc.py` (optional) |
|
|
| ```python |
| from src.tools.rate_limiter import RateLimiterFactory |
|
|
| class EuropePMCTool: |
|     def __init__(self) -> None: |
|         # Europe PMC is generous, but still be respectful |
|         self._limiter = RateLimiterFactory.get("europepmc", "10/second") |
|
|     async def search(self, query: str, max_results: int = 10) -> list[Evidence]: |
|         await self._limiter.acquire() |
|         # ... rest of method ... |
| ``` |
|
|
| --- |
|
|
| ## Demo Script |
|
|
| **File**: `examples/rate_limiting_demo.py` |
|
|
| ```python |
| #!/usr/bin/env python3 |
| """Demo script to verify rate limiting works correctly.""" |
|
| import asyncio |
| import time |
|
| from src.tools.rate_limiter import RateLimiter, get_pubmed_limiter, reset_pubmed_limiter |
| from src.tools.pubmed import PubMedTool |
|
|
| async def test_basic_limiter(): |
|     """Test basic rate limiter behavior.""" |
|     print("=" * 60) |
|     print("Rate Limiting Demo") |
|     print("=" * 60) |
|
|     # Test 1: Basic limiter |
|     print("\n[Test 1] Testing 3/second limiter...") |
|     limiter = RateLimiter("3/second") |
|
|     start = time.monotonic() |
|     for i in range(6): |
|         await limiter.acquire() |
|         elapsed = time.monotonic() - start |
|         print(f"  Request {i+1} at {elapsed:.2f}s") |
|
|     # First 3 pass at once; the next 3 wait for the 1-second window to slide |
|     total = time.monotonic() - start |
|     print(f"  Total time for 6 requests: {total:.2f}s (expected ~1s)") |
|
|
| async def test_pubmed_limiter(): |
|     """Test PubMed-specific limiter.""" |
|     print("\n[Test 2] Testing PubMed limiter (shared)...") |
|
|     reset_pubmed_limiter()  # Clean state |
|
|     # Without API key: 3/sec |
|     limiter = get_pubmed_limiter(api_key=None) |
|     print(f"  Rate without key: {limiter.rate}") |
|
|     # Multiple tools should share the same limiter |
|     tool1 = PubMedTool() |
|     tool2 = PubMedTool() |
|
|     # Verify they share the limiter |
|     print(f"  Tools share limiter: {tool1._limiter is tool2._limiter}") |
|
|
| async def test_concurrent_requests(): |
|     """Test rate limiting under concurrent load.""" |
|     print("\n[Test 3] Testing concurrent request limiting...") |
|
|     limiter = RateLimiter("5/second") |
|
|     async def make_request(i: int): |
|         await limiter.acquire() |
|         return time.monotonic() |
|
|     start = time.monotonic() |
|     # Launch 10 concurrent requests |
|     tasks = [make_request(i) for i in range(10)] |
|     times = await asyncio.gather(*tasks) |
|
|     # Calculate distribution |
|     relative_times = [t - start for t in times] |
|     print(f"  Request times: {[f'{t:.2f}s' for t in sorted(relative_times)]}") |
|
|     # First 5 pass at once; the other 5 are released about 1 second later |
|     total = max(relative_times) |
|     print(f"  All 10 requests completed in {total:.2f}s (expected ~1s)") |
|
|
| async def main(): |
|     await test_basic_limiter() |
|     await test_pubmed_limiter() |
|     await test_concurrent_requests() |
|
|     print("\n" + "=" * 60) |
|     print("Demo complete!") |
|
|
| if __name__ == "__main__": |
|     asyncio.run(main()) |
| ``` |
|
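As a rough guide to what the demo should print, the release times under a moving window can be estimated with simple arithmetic. This sketch assumes all requests are issued at t=0 and ignores the 0.1 s polling interval, so real timings will be slightly later; `expected_release_times` is a hypothetical helper, not part of the module.

```python
def expected_release_times(n_requests: int, limit: int, period: float = 1.0) -> list[float]:
    """Earliest time each request can pass when `limit` hits fit in one window."""
    # Requests go through in batches of `limit`, one batch per period
    return [(i // limit) * period for i in range(n_requests)]


basic = expected_release_times(6, limit=3)        # Test 1: 3/second, 6 requests
concurrent = expected_release_times(10, limit=5)  # Test 3: 5/second, 10 requests

print("Test 1:", basic, "-> done after", max(basic), "s")
print("Test 3:", concurrent, "-> done after", max(concurrent), "s")
```

Both demo scenarios need only two batches, so each should finish roughly one second after it starts.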
|
| --- |
|
|
| ## Verification Checklist |
|
|
| ### Unit Tests |
| ```bash |
| # Run rate limiting tests |
| uv run pytest tests/unit/tools/test_rate_limiting.py -v |
| |
| # Expected: All tests pass |
| ``` |
|
|
| ### Integration Test (Manual) |
| ```bash |
| # Run demo |
| uv run python examples/rate_limiting_demo.py |
| |
| # Expected: Requests properly spaced |
| ``` |
|
|
| ### Full Test Suite |
| ```bash |
| make check |
| # Expected: All tests pass, mypy clean |
| ``` |
|
|
| --- |
|
|
| ## Success Criteria |
|
|
| 1. **`limits` library installed**: Dependency added to pyproject.toml |
| 2. **RateLimiter class works**: Can create and use limiters |
| 3. **PubMed uses new limiter**: Shared limiter across instances |
| 4. **Rate adapts to API key**: 3/sec without, 10/sec with |
| 5. **Concurrent requests handled**: Multiple async requests properly queued |
| 6. **No regressions**: All existing tests pass |
|
|
| --- |
|
|
| ## API Rate Limit Reference |
|
|
| | API | Without Key | With Key | |
| |-----|-------------|----------| |
| | PubMed/NCBI | 3/sec | 10/sec | |
| | ClinicalTrials.gov | Undocumented (~5/sec safe) | N/A | |
| | Europe PMC | ~10-20/sec (generous) | N/A | |
| | OpenAlex | ~100k/day (no per-sec limit) | Faster with `mailto` | |
|
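A daily budget such as the OpenAlex figure in the table can be turned into a sustained per-second average with simple arithmetic. The helper below is a hypothetical sketch: `sustained_per_second` and its unit table are illustrative and not part of `limits`.

```python
SECONDS_PER_UNIT = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}


def sustained_per_second(rate: str) -> float:
    """Convert a "count/unit" string into an average requests-per-second figure."""
    count, unit = rate.split("/")
    return int(count) / SECONDS_PER_UNIT[unit]


openalex = sustained_per_second("100000/day")
pubmed = sustained_per_second("3/second")
print(f"OpenAlex budget: {openalex:.2f} requests/sec sustained")
```

Under that assumption, a limiter of about `"1/second"` would stay inside a 100k/day quota even under continuous load, since it allows at most 86,400 requests per day.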
|
| --- |
|
|
| ## Notes |
|
|
| - This design uses the moving-window strategy from `limits`, which avoids the boundary bursts a fixed window allows |
| - Singleton pattern ensures all PubMed calls share the limit |
| - The factory pattern allows easy extension to other APIs |
| - Consider adding 429 response detection + exponential backoff |
| - In production, consider Redis storage for distributed rate limiting |
|
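The 429 handling suggested in the notes could look roughly like the following. This is a minimal sketch under stated assumptions: `call_with_backoff` is a hypothetical helper, `send` is any coroutine that performs the request and returns an HTTP status code, and the delay values are placeholders rather than NCBI recommendations.

```python
import asyncio
import random
from collections.abc import Awaitable, Callable

HTTP_TOO_MANY_REQUESTS = 429


async def call_with_backoff(
    send: Callable[[], Awaitable[int]],
    max_retries: int = 4,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> int:
    """Retry `send` while it reports HTTP 429, doubling the delay each time."""
    attempt = 0
    while True:
        status = await send()
        if status != HTTP_TOO_MANY_REQUESTS or attempt >= max_retries:
            return status
        delay = min(max_delay, base_delay * 2**attempt)
        # Add up to 10% jitter so parallel callers do not retry in lockstep
        await sleep(delay + random.uniform(0, delay / 10))
        attempt += 1


async def demo() -> tuple[int, list[float]]:
    responses = iter([429, 429, 200])  # two rejections, then success
    slept: list[float] = []

    async def fake_send() -> int:
        return next(responses)

    async def fake_sleep(seconds: float) -> None:
        slept.append(seconds)  # record instead of actually waiting

    status = await call_with_backoff(fake_send, sleep=fake_sleep)
    return status, slept


status, slept = asyncio.run(demo())
print(status, [round(s, 2) for s in slept])
```

The `sleep` parameter is injectable only so the behaviour can be checked without waiting. In the tool itself the default `asyncio.sleep` keeps the backoff non-blocking, in line with the async safety rules at the top of this document.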
|