# NOTE(review): non-code page chrome ("Spaces / Sleeping" status text) from the
# hosting platform was captured by the extraction; removed from the module.
"""
Rate limiting handler for a web search API.

This module provides:
1. Exponential backoff retry logic
2. Request caching (TTL-based)
3. Rate limit detection and handling
4. Request queuing
"""
import asyncio
import os
import threading
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Optional
@dataclass
class RateLimitConfig:
    """Tunable limits for the rate-limited web search.

    Fix: the original declared annotated defaults but omitted the
    ``@dataclass`` decorator (despite importing it), so the class had no
    generated ``__init__`` and could not be configured per instance the way
    the ``config or RateLimitConfig()`` call sites imply.
    """

    max_requests_per_minute: int = 30  # token-bucket capacity and refill rate
    max_requests_per_hour: int = 500   # advisory; not enforced by RateLimiter
    retry_base_delay: float = 2.0      # seconds; base for exponential backoff
    max_retry_attempts: int = 3
    cache_ttl_seconds: int = 300       # lifetime of cached search results
| class RateLimiter: | |
| """Token bucket rate limiter""" | |
| def __init__(self, config: RateLimitConfig = None): | |
| self.config = config or RateLimitConfig() | |
| self.tokens = self.config.max_requests_per_minute | |
| self.last_update = datetime.now() | |
| self.lock = threading.Lock() | |
| def acquire(self) -> bool: | |
| with self.lock: | |
| now = datetime.now() | |
| elapsed = (now - self.last_update).total_seconds() | |
| # Refill tokens | |
| tokens_to_add = elapsed * (self.config.max_requests_per_minute / 60) | |
| self.tokens = min( | |
| self.config.max_requests_per_minute, self.tokens + tokens_to_add | |
| ) | |
| if self.tokens >= 1: | |
| self.tokens -= 1 | |
| self.last_update = now | |
| return True | |
| return False | |
| def wait_for_token(self, timeout: float = 60) -> bool: | |
| start = time.time() | |
| while time.time() - start < timeout: | |
| if self.acquire(): | |
| return True | |
| time.sleep(0.1) | |
| return False | |
class RequestCache:
    """Thread-safe cache whose entries expire after a fixed TTL (seconds)."""

    def __init__(self, ttl_seconds: int = 300):
        self.ttl = ttl_seconds
        self._cache: Dict[str, tuple] = {}
        self.lock = threading.Lock()

    def get(self, key: str) -> Optional[List[Dict]]:
        """Return the cached value for *key*, or None if absent or expired."""
        with self.lock:
            entry = self._cache.get(key)
            if entry is None:
                return None
            data, stored_at = entry
            age = (datetime.now() - stored_at).total_seconds()
            if age < self.ttl:
                return data
            # Expired: drop the entry so the dict does not accumulate dead keys.
            del self._cache[key]
            return None

    def set(self, key: str, data: List[Dict]):
        """Store *data* under *key*; past 100 entries, evict the 10 oldest."""
        with self.lock:
            self._cache[key] = (data, datetime.now())
            if len(self._cache) > 100:
                by_age = sorted(self._cache, key=lambda k: self._cache[k][1])
                for stale_key in by_age[:10]:
                    del self._cache[stale_key]

    def clear(self):
        """Drop every cached entry."""
        with self.lock:
            self._cache.clear()
| class RateLimitedWebSearch: | |
| """Web search with rate limiting and caching""" | |
| def __init__(self, search_func: Callable, config: RateLimitConfig = None): | |
| self.config = config or RateLimitConfig() | |
| self.rate_limiter = RateLimiter(self.config) | |
| self.cache = RequestCache(self.config.cache_ttl_seconds) | |
| self.search_func = search_func | |
| async def search( | |
| self, query: str, max_results: int = 5, use_cache: bool = True | |
| ) -> List[Dict]: | |
| cache_key = f"{query}_{max_results}" | |
| # Check cache | |
| if use_cache: | |
| cached = self.cache.get(cache_key) | |
| if cached: | |
| return cached | |
| # Wait for rate limit | |
| if not self.rate_limiter.wait_for_token(): | |
| return [] | |
| # Retry with exponential backoff | |
| for attempt in range(self.config.max_retry_attempts): | |
| try: | |
| results = await self.search_func(query, max_results) | |
| if results: | |
| if use_cache: | |
| self.cache.set(cache_key, results) | |
| return results | |
| # If empty results, might be rate limited | |
| await asyncio.sleep(self.config.retry_base_delay * (attempt + 1)) | |
| except Exception as e: | |
| if attempt < self.config.max_retry_attempts - 1: | |
| await asyncio.sleep(self.config.retry_base_delay * (2**attempt)) | |
| continue | |
| return [] | |
| def get_cache_stats(self) -> Dict: | |
| return { | |
| "cached_items": len(self.cache._cache), | |
| "ttl_seconds": self.config.cache_ttl_seconds, | |
| "rate_limit_per_minute": self.config.max_requests_per_minute, | |
| } | |
| def clear_cache(self): | |
| self.cache.clear() | |
# Example usage with the SerpAPI Google Search endpoint.
async def serper_search(query: str, max_results: int = 5) -> List[Dict]:
    """Query SerpAPI (serpapi.com) and normalize the organic results.

    Security fix: the API key is read from the SERPAPI_API_KEY environment
    variable instead of being hard-coded — the original embedded a secret in
    source, which must be considered leaked and rotated. Returns [] when no
    key is configured or on any non-200 response (best-effort, matching the
    rest of the module).

    NOTE(review): despite the name, the endpoint is SerpAPI, not serper.dev.
    """
    import aiohttp  # third-party; imported lazily so the module loads without it

    api_key = os.environ.get("SERPAPI_API_KEY", "")
    if not api_key:
        # No credentials available: fail soft rather than sending a bad request.
        return []
    url = "https://serpapi.com/search"
    params = {
        "engine": "google",
        "q": query,
        "api_key": api_key,
        "num": max_results,
    }
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=params) as response:
            if response.status == 200:
                data = await response.json()
                organic = data.get("organic_results", [])
                return [
                    {
                        "title": r.get("title", ""),
                        "url": r.get("link", ""),
                        "snippet": r.get("snippet", ""),
                        "score": 0.8,  # flat placeholder relevance score
                    }
                    for r in organic[:max_results]
                ]
            return []
# Module-level singleton: the SerpAPI helper wrapped with rate limiting,
# caching, and retry behavior (default RateLimitConfig).
rate_limited_search = RateLimitedWebSearch(serper_search)
if __name__ == "__main__":
    # Manual smoke test: first call exercises the network path, the repeat
    # call with identical arguments should be served from the cache.
    async def test():
        results = await rate_limited_search.search("Python programming", 3)
        print(f"Found {len(results)} results")
        print(f"Cache stats: {rate_limited_search.get_cache_stats()}")
        results2 = await rate_limited_search.search("Python programming", 3)
        print(f"Cache hit: {len(results2)} results")

    asyncio.run(test())