# Phase 17: Rate Limiting with `limits` Library **Priority**: P0 CRITICAL - Prevents API blocks **Effort**: ~1 hour **Dependencies**: None --- ## CRITICAL: Async Safety Requirements **WARNING**: The rate limiter MUST be async-safe. Blocking the event loop will freeze: - The Gradio UI - All parallel searches - The orchestrator **Rules**: 1. **NEVER use `time.sleep()`** - Always use `await asyncio.sleep()` 2. **NEVER use blocking while loops** - Use async-aware polling 3. **The `limits` library check is synchronous** - Wrap it carefully The implementation below uses a polling pattern that: - Checks the limit (synchronous, fast) - If exceeded, `await asyncio.sleep()` (non-blocking) - Retry the check **Alternative**: If `limits` proves problematic, use `aiolimiter` which is pure-async. --- ## Overview Replace naive `asyncio.sleep` rate limiting with proper rate limiter using the `limits` library, which provides: - Moving window rate limiting - Per-API configurable limits - Thread-safe storage - Already used in reference repo **Why This Matters?** - NCBI will block us without proper rate limiting (3/sec without key, 10/sec with) - Current implementation only has simple sleep delay - Need coordinated limits across all PubMed calls - Professional-grade rate limiting prevents production issues --- ## Current State ### What We Have (`src/tools/pubmed.py:20-21, 34-41`) ```python RATE_LIMIT_DELAY = 0.34 # ~3 requests/sec without API key async def _rate_limit(self) -> None: """Enforce NCBI rate limiting.""" loop = asyncio.get_running_loop() now = loop.time() elapsed = now - self._last_request_time if elapsed < self.RATE_LIMIT_DELAY: await asyncio.sleep(self.RATE_LIMIT_DELAY - elapsed) self._last_request_time = loop.time() ``` ### Problems 1. **Not shared across instances**: Each `PubMedTool()` has its own counter 2. **Simple delay vs moving window**: Doesn't handle bursts properly 3. **Hardcoded rate**: Doesn't adapt to API key presence 4. **No backoff on 429**: Just retries blindly --- ## TDD Implementation Plan ### Step 1: Add Dependency **File**: `pyproject.toml` ```toml dependencies = [ # ... existing deps ... "limits>=3.0", ] ``` Then run: ```bash uv sync ``` --- ### Step 2: Write the Tests First **File**: `tests/unit/tools/test_rate_limiting.py` ```python """Tests for rate limiting functionality.""" import asyncio import time import pytest from src.tools.rate_limiter import RateLimiter, get_pubmed_limiter class TestRateLimiter: """Test suite for rate limiter.""" def test_create_limiter_without_api_key(self) -> None: """Should create 3/sec limiter without API key.""" limiter = RateLimiter(rate="3/second") assert limiter.rate == "3/second" def test_create_limiter_with_api_key(self) -> None: """Should create 10/sec limiter with API key.""" limiter = RateLimiter(rate="10/second") assert limiter.rate == "10/second" @pytest.mark.asyncio async def test_limiter_allows_requests_under_limit(self) -> None: """Should allow requests under the rate limit.""" limiter = RateLimiter(rate="10/second") # 3 requests should all succeed immediately for _ in range(3): allowed = await limiter.acquire() assert allowed is True @pytest.mark.asyncio async def test_limiter_blocks_when_exceeded(self) -> None: """Should wait when rate limit exceeded.""" limiter = RateLimiter(rate="2/second") # First 2 should be instant await limiter.acquire() await limiter.acquire() # Third should block briefly start = time.monotonic() await limiter.acquire() elapsed = time.monotonic() - start # Should have waited ~0.5 seconds (half second window for 2/sec) assert elapsed >= 0.3 @pytest.mark.asyncio async def test_limiter_resets_after_window(self) -> None: """Rate limit should reset after time window.""" limiter = RateLimiter(rate="5/second") # Use up the limit for _ in range(5): await limiter.acquire() # Wait for window to pass await asyncio.sleep(1.1) # Should be allowed again start = time.monotonic() await limiter.acquire() elapsed = time.monotonic() - start assert elapsed < 0.1 # Should be nearly instant class TestGetPubmedLimiter: """Test PubMed-specific limiter factory.""" def test_limiter_without_api_key(self) -> None: """Should return 3/sec limiter without key.""" limiter = get_pubmed_limiter(api_key=None) assert "3" in limiter.rate def test_limiter_with_api_key(self) -> None: """Should return 10/sec limiter with key.""" limiter = get_pubmed_limiter(api_key="my-api-key") assert "10" in limiter.rate def test_limiter_is_singleton(self) -> None: """Same API key should return same limiter instance.""" limiter1 = get_pubmed_limiter(api_key="key1") limiter2 = get_pubmed_limiter(api_key="key1") assert limiter1 is limiter2 def test_different_keys_different_limiters(self) -> None: """Different API keys should return different limiters.""" limiter1 = get_pubmed_limiter(api_key="key1") limiter2 = get_pubmed_limiter(api_key="key2") # Clear cache for clean test # Actually, different keys SHOULD share the same limiter # since we're limiting against the same API assert limiter1 is limiter2 # Shared NCBI rate limit ``` --- ### Step 3: Create Rate Limiter Module **File**: `src/tools/rate_limiter.py` ```python """Rate limiting utilities using the limits library.""" import asyncio from typing import ClassVar from limits import RateLimitItem, parse from limits.storage import MemoryStorage from limits.strategies import MovingWindowRateLimiter class RateLimiter: """ Async-compatible rate limiter using limits library. Uses moving window algorithm for smooth rate limiting. """ def __init__(self, rate: str) -> None: """ Initialize rate limiter. Args: rate: Rate string like "3/second" or "10/second" """ self.rate = rate self._storage = MemoryStorage() self._limiter = MovingWindowRateLimiter(self._storage) self._rate_limit: RateLimitItem = parse(rate) self._identity = "default" # Single identity for shared limiting async def acquire(self, wait: bool = True) -> bool: """ Acquire permission to make a request. ASYNC-SAFE: Uses asyncio.sleep(), never time.sleep(). The polling pattern allows other coroutines to run while waiting. Args: wait: If True, wait until allowed. If False, return immediately. Returns: True if allowed, False if not (only when wait=False) """ while True: # Check if we can proceed (synchronous, fast - ~microseconds) if self._limiter.hit(self._rate_limit, self._identity): return True if not wait: return False # CRITICAL: Use asyncio.sleep(), NOT time.sleep() # This yields control to the event loop, allowing other # coroutines (UI, parallel searches) to run await asyncio.sleep(0.1) def reset(self) -> None: """Reset the rate limiter (for testing).""" self._storage.reset() # Singleton limiter for PubMed/NCBI _pubmed_limiter: RateLimiter | None = None def get_pubmed_limiter(api_key: str | None = None) -> RateLimiter: """ Get the shared PubMed rate limiter. Rate depends on whether API key is provided: - Without key: 3 requests/second - With key: 10 requests/second Args: api_key: NCBI API key (optional) Returns: Shared RateLimiter instance """ global _pubmed_limiter if _pubmed_limiter is None: rate = "10/second" if api_key else "3/second" _pubmed_limiter = RateLimiter(rate) return _pubmed_limiter def reset_pubmed_limiter() -> None: """Reset the PubMed limiter (for testing).""" global _pubmed_limiter _pubmed_limiter = None # Factory for other APIs class RateLimiterFactory: """Factory for creating/getting rate limiters for different APIs.""" _limiters: ClassVar[dict[str, RateLimiter]] = {} @classmethod def get(cls, api_name: str, rate: str) -> RateLimiter: """ Get or create a rate limiter for an API. Args: api_name: Unique identifier for the API rate: Rate limit string (e.g., "10/second") Returns: RateLimiter instance (shared for same api_name) """ if api_name not in cls._limiters: cls._limiters[api_name] = RateLimiter(rate) return cls._limiters[api_name] @classmethod def reset_all(cls) -> None: """Reset all limiters (for testing).""" cls._limiters.clear() ``` --- ### Step 4: Update PubMed Tool **File**: `src/tools/pubmed.py` (replace rate limiting code) ```python # Replace imports and rate limiting from src.tools.rate_limiter import get_pubmed_limiter class PubMedTool: """Search tool for PubMed/NCBI.""" BASE_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils" HTTP_TOO_MANY_REQUESTS = 429 def __init__(self, api_key: str | None = None) -> None: self.api_key = api_key or settings.ncbi_api_key if self.api_key == "your-ncbi-key-here": self.api_key = None # Use shared rate limiter self._limiter = get_pubmed_limiter(self.api_key) async def _rate_limit(self) -> None: """Enforce NCBI rate limiting using shared limiter.""" await self._limiter.acquire() # ... rest of class unchanged ... ``` --- ### Step 5: Add Rate Limiters for Other APIs **File**: `src/tools/clinicaltrials.py` (optional) ```python from src.tools.rate_limiter import RateLimiterFactory class ClinicalTrialsTool: def __init__(self) -> None: # ClinicalTrials.gov doesn't document limits, but be conservative self._limiter = RateLimiterFactory.get("clinicaltrials", "5/second") async def search(self, query: str, max_results: int = 10) -> list[Evidence]: await self._limiter.acquire() # ... rest of method ... ``` **File**: `src/tools/europepmc.py` (optional) ```python from src.tools.rate_limiter import RateLimiterFactory class EuropePMCTool: def __init__(self) -> None: # Europe PMC is generous, but still be respectful self._limiter = RateLimiterFactory.get("europepmc", "10/second") async def search(self, query: str, max_results: int = 10) -> list[Evidence]: await self._limiter.acquire() # ... rest of method ... ``` --- ## Demo Script **File**: `examples/rate_limiting_demo.py` ```python #!/usr/bin/env python3 """Demo script to verify rate limiting works correctly.""" import asyncio import time from src.tools.rate_limiter import RateLimiter, get_pubmed_limiter, reset_pubmed_limiter from src.tools.pubmed import PubMedTool async def test_basic_limiter(): """Test basic rate limiter behavior.""" print("=" * 60) print("Rate Limiting Demo") print("=" * 60) # Test 1: Basic limiter print("\n[Test 1] Testing 3/second limiter...") limiter = RateLimiter("3/second") start = time.monotonic() for i in range(6): await limiter.acquire() elapsed = time.monotonic() - start print(f" Request {i+1} at {elapsed:.2f}s") total = time.monotonic() - start print(f" Total time for 6 requests: {total:.2f}s (expected ~2s)") async def test_pubmed_limiter(): """Test PubMed-specific limiter.""" print("\n[Test 2] Testing PubMed limiter (shared)...") reset_pubmed_limiter() # Clean state # Without API key: 3/sec limiter = get_pubmed_limiter(api_key=None) print(f" Rate without key: {limiter.rate}") # Multiple tools should share the same limiter tool1 = PubMedTool() tool2 = PubMedTool() # Verify they share the limiter print(f" Tools share limiter: {tool1._limiter is tool2._limiter}") async def test_concurrent_requests(): """Test rate limiting under concurrent load.""" print("\n[Test 3] Testing concurrent request limiting...") limiter = RateLimiter("5/second") async def make_request(i: int): await limiter.acquire() return time.monotonic() start = time.monotonic() # Launch 10 concurrent requests tasks = [make_request(i) for i in range(10)] times = await asyncio.gather(*tasks) # Calculate distribution relative_times = [t - start for t in times] print(f" Request times: {[f'{t:.2f}s' for t in sorted(relative_times)]}") total = max(relative_times) print(f" All 10 requests completed in {total:.2f}s (expected ~2s)") async def main(): await test_basic_limiter() await test_pubmed_limiter() await test_concurrent_requests() print("\n" + "=" * 60) print("Demo complete!") if __name__ == "__main__": asyncio.run(main()) ``` --- ## Verification Checklist ### Unit Tests ```bash # Run rate limiting tests uv run pytest tests/unit/tools/test_rate_limiting.py -v # Expected: All tests pass ``` ### Integration Test (Manual) ```bash # Run demo uv run python examples/rate_limiting_demo.py # Expected: Requests properly spaced ``` ### Full Test Suite ```bash make check # Expected: All tests pass, mypy clean ``` --- ## Success Criteria 1. **`limits` library installed**: Dependency added to pyproject.toml 2. **RateLimiter class works**: Can create and use limiters 3. **PubMed uses new limiter**: Shared limiter across instances 4. **Rate adapts to API key**: 3/sec without, 10/sec with 5. **Concurrent requests handled**: Multiple async requests properly queued 6. **No regressions**: All existing tests pass --- ## API Rate Limit Reference | API | Without Key | With Key | |-----|-------------|----------| | PubMed/NCBI | 3/sec | 10/sec | | ClinicalTrials.gov | Undocumented (~5/sec safe) | N/A | | Europe PMC | ~10-20/sec (generous) | N/A | | OpenAlex | ~100k/day (no per-sec limit) | Faster with `mailto` | --- ## Notes - `limits` library uses moving window algorithm (fairer than fixed window) - Singleton pattern ensures all PubMed calls share the limit - The factory pattern allows easy extension to other APIs - Consider adding 429 response detection + exponential backoff - In production, consider Redis storage for distributed rate limiting