5b89d45 | """ | |
| Smart Rate Limiter with Adaptive Delays and Caching | |
| Helps maximize chat usage within free tier limits | |
| """ | |
| import time | |
| import logging | |
| from typing import Optional, Dict, Any | |
| from datetime import datetime, timedelta | |
| from functools import lru_cache | |
| import hashlib | |
| logger = logging.getLogger(__name__) | |

class RateLimiter:
    """
    Adaptive rate limiter that:
    1. Tracks API usage per provider
    2. Implements smart delays
    3. Caches responses for repeated queries
    4. Provides usage statistics
    """

    def __init__(self, provider: str = "gemini"):
        self.provider = provider
        self.request_times = []
        self.token_usage = {"input": 0, "output": 0, "total": 0}
        self.last_request_time = None

        # Load configuration (fall back to defaults if the config file is missing)
        try:
            import rate_limit_config as config
        except ImportError:
            class config:  # Defaults used when rate_limit_config is absent
                GEMINI_RPM = 15
                GEMINI_MIN_DELAY = 2.0
                GEMINI_BURST_DELAY = 8.0
                GROQ_RPM = 30
                GROQ_MIN_DELAY = 1.0
                GROQ_BURST_DELAY = 10.0
                ENABLE_CACHE = True
                CACHE_TTL = 300  # seconds

        # Provider-specific limits
        self.limits = {
            "gemini": {
                "rpm": config.GEMINI_RPM,
                "min_delay": config.GEMINI_MIN_DELAY,
                "burst_delay": config.GEMINI_BURST_DELAY,
            },
            "groq": {
                "rpm": config.GROQ_RPM,
                "min_delay": config.GROQ_MIN_DELAY,
                "burst_delay": config.GROQ_BURST_DELAY,
            },
        }

        self.response_cache = {} if config.ENABLE_CACHE else None
        self.cache_ttl = config.CACHE_TTL

    def get_cache_key(self, query: str, context_hash: str = "") -> str:
        """Generate a cache key for a query."""
        combined = f"{query}:{context_hash}"
        return hashlib.md5(combined.encode()).hexdigest()

    def get_cached_response(self, cache_key: str) -> Optional[Dict[str, Any]]:
        """Return the cached response for a key, or None if missing or expired."""
        if self.response_cache is None:
            return None
        if cache_key in self.response_cache:
            cached_data, timestamp = self.response_cache[cache_key]
            if time.time() - timestamp < self.cache_ttl:
                logger.info("🎯 Cache hit! Saved an API call.")
                return cached_data
            else:
                # Expired entry: remove it
                del self.response_cache[cache_key]
        return None

    def cache_response(self, cache_key: str, response: Dict[str, Any]):
        """Cache a response."""
        if self.response_cache is None:
            return
        self.response_cache[cache_key] = (response, time.time())
        # Keep the cache size manageable
        if len(self.response_cache) > 100:
            # Remove the 20 oldest entries (sorted by stored timestamp)
            sorted_items = sorted(self.response_cache.items(), key=lambda x: x[1][1])
            for key, _ in sorted_items[:20]:
                del self.response_cache[key]

    def calculate_smart_delay(self) -> float:
        """
        Calculate the optimal delay based on recent usage.
        Returns the delay in seconds.
        """
        config = self.limits.get(self.provider, self.limits["gemini"])

        # Drop request timestamps older than one minute
        cutoff = time.time() - 60
        self.request_times = [t for t in self.request_times if t > cutoff]

        # Back off progressively as we approach the rate limit
        requests_last_minute = len(self.request_times)
        if requests_last_minute >= config["rpm"] * 0.9:  # 90% of limit
            logger.warning(f"⚠️ Approaching rate limit ({requests_last_minute}/{config['rpm']} RPM)")
            return config["burst_delay"]
        elif requests_last_minute >= config["rpm"] * 0.7:  # 70% of limit
            return config["min_delay"] * 1.5
        else:
            return config["min_delay"]

    def wait_if_needed(self):
        """
        Smart wait that adapts to usage patterns.
        Only waits when necessary to avoid rate limits.
        """
        if self.last_request_time is None:
            # First request: record it and proceed without waiting
            self.last_request_time = time.time()
            self.request_times.append(time.time())
            return

        delay = self.calculate_smart_delay()
        elapsed = time.time() - self.last_request_time

        if elapsed < delay:
            wait_time = delay - elapsed
            logger.info(f"⏱️ Smart delay: waiting {wait_time:.1f}s to avoid rate limit...")
            time.sleep(wait_time)

        self.last_request_time = time.time()
        self.request_times.append(time.time())

    def record_usage(self, input_tokens: int = 0, output_tokens: int = 0):
        """Track token usage for statistics."""
        self.token_usage["input"] += input_tokens
        self.token_usage["output"] += output_tokens
        self.token_usage["total"] += input_tokens + output_tokens

    def get_usage_stats(self) -> Dict[str, Any]:
        """Get current usage statistics."""
        cutoff = time.time() - 60
        recent_requests = len([t for t in self.request_times if t > cutoff])
        return {
            "provider": self.provider,
            "requests_last_minute": recent_requests,
            "total_tokens": self.token_usage["total"],
            "input_tokens": self.token_usage["input"],
            "output_tokens": self.token_usage["output"],
            "cache_size": len(self.response_cache) if self.response_cache else 0,
        }

    def reset_stats(self):
        """Reset usage statistics."""
        self.token_usage = {"input": 0, "output": 0, "total": 0}
        self.request_times = []
        logger.info("📊 Usage statistics reset")

# Global rate limiters (one per provider)
_rate_limiters: Dict[str, RateLimiter] = {}


def get_rate_limiter(provider: str) -> RateLimiter:
    """Get or create the rate limiter for a provider."""
    if provider not in _rate_limiters:
        _rate_limiters[provider] = RateLimiter(provider)
    return _rate_limiters[provider]
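
# Minimal usage sketch (illustrative only): `call_model` is a hypothetical
# placeholder for the real Gemini/Groq request, and the token counts passed
# to record_usage() are made-up example values.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    def call_model(prompt: str) -> Dict[str, Any]:
        # Hypothetical stand-in for an actual provider call
        return {"text": f"echo: {prompt}"}

    limiter = get_rate_limiter("gemini")
    for prompt in ["hello", "hello", "how are you?"]:
        key = limiter.get_cache_key(prompt)
        response = limiter.get_cached_response(key)  # second "hello" hits the cache
        if response is None:
            limiter.wait_if_needed()  # adaptive delay before the API call
            response = call_model(prompt)
            limiter.cache_response(key, response)
            limiter.record_usage(input_tokens=10, output_tokens=20)
        print(response["text"])

    print(limiter.get_usage_stats())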