File size: 2,246 Bytes
dfcb128
 
 
 
 
 
 
ab403f1
dfcb128
 
ab403f1
dfcb128
 
ab403f1
dfcb128
 
ab403f1
dfcb128
ab403f1
dfcb128
 
 
 
 
 
 
 
 
ab403f1
dfcb128
 
ab403f1
 
dfcb128
 
 
 
ab403f1
dfcb128
ab403f1
dfcb128
 
 
 
 
 
 
 
 
ab403f1
dfcb128
ab403f1
dfcb128
ab403f1
dfcb128
ab403f1
dfcb128
 
 
 
 
 
 
ab403f1
dfcb128
 
ab403f1
dfcb128
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import time
from collections import defaultdict
import threading

class QueryRateLimiter:
    """Sliding-window limiter for the total number of queries per hour.

    Every accepted query is recorded as a ``time.time()`` timestamp in
    ``self.queries``; timestamps older than ``WINDOW_SECONDS`` no longer
    count against the limit. All access to the timestamp list happens while
    holding ``self.lock``.

    The ``_`` parameter accepted by the public methods is ignored: the limit
    is global, not per caller.
    """

    # Length of the sliding window in seconds (1 hour).
    WINDOW_SECONDS = 3600

    def __init__(self, max_queries_per_hour: int = 10):
        """
        Initialize rate limiter for global queries per hour.

        Args:
            max_queries_per_hour: Maximum number of queries allowed per hour globally
        """
        self.max_queries = max_queries_per_hour
        self.queries = []  # Timestamps (time.time()) of accepted queries
        self.lock = threading.Lock()

    def _prune(self, now: float) -> None:
        """Drop timestamps outside the window ending at *now*.

        Must be called with ``self.lock`` held.
        """
        cutoff = now - self.WINDOW_SECONDS
        self.queries = [t for t in self.queries if t > cutoff]

    def is_allowed(self, _: str = None) -> bool:
        """
        Check if another query is allowed within the hourly limit.

        When the query is allowed it is recorded immediately, so a ``True``
        result consumes one slot of the quota.

        Returns:
            bool: True if query is allowed, False if rate limited
        """
        with self.lock:
            # Read the clock while holding the lock so that a thread which
            # had to wait does not prune/record with an outdated timestamp.
            now = time.time()
            self._prune(now)

            if len(self.queries) < self.max_queries:
                self.queries.append(now)
                return True

            return False

    def get_remaining_queries(self, _: str = None) -> int:
        """
        Get number of remaining queries in the current hour.

        Returns:
            int: Number of remaining queries (never negative)
        """
        with self.lock:
            self._prune(time.time())

            # Clamp at zero: the list can hold more entries than
            # ``max_queries`` if the limit was lowered after queries
            # had already been recorded.
            return max(0, self.max_queries - len(self.queries))

    def get_time_until_reset(self, _: str = None) -> float:
        """
        Get time in seconds until the oldest counted query leaves the window.

        Expired timestamps are discarded first, so a stale entry cannot mask
        queries that still count against the limit.

        Returns:
            float: Seconds until the oldest counted query expires, or 0.0
                when no query is currently counted
        """
        with self.lock:
            now = time.time()
            self._prune(now)

            if not self.queries:
                return 0.0

            reset_time = min(self.queries) + self.WINDOW_SECONDS

            return max(0.0, reset_time - now)