Spaces:
Sleeping
Sleeping
| """ | |
| SecureCodeEnv Task: Rate Limiter | |
| Difficulty: Medium | |
| CWE Targets: CWE-770 (Allocation Without Limits), CWE-400 (Uncontrolled Resource Consumption) | |
| """ | |
| TASK = { | |
| "id": "medium_rate_limiter", | |
| "difficulty": "medium", | |
| "cwe_targets": ["CWE-770", "CWE-400"], | |
| "component_name": "RateLimiter", | |
| "filename": "src/middleware/rate_limiter.py", | |
| "problem_statement": """Write a Python class: | |
| `class RateLimiter:` | |
| A thread-safe token bucket rate limiter. | |
| REQUIREMENTS: | |
| - `__init__(self, max_requests: int, window_seconds: int)` | |
| - max_requests: max allowed calls in the time window | |
| - window_seconds: size of the sliding window in seconds | |
| - `is_allowed(self, client_id: str) -> bool` | |
| - Returns True if the client is under the rate limit | |
| - Returns False if the client has exceeded max_requests in window_seconds | |
| - Each client_id is tracked independently | |
| - Must be thread-safe (use threading.Lock) | |
| - Must use time-based sliding window — NOT a fixed counter reset | |
| - `get_remaining(self, client_id: str) -> int` | |
| - Returns how many requests the client can still make in the current window | |
| MUST handle concurrent requests correctly — no race conditions.""", | |
| "starter_code": '''class RateLimiter: | |
| def __init__(self, max_requests: int, window_seconds: int): | |
| self.max_requests = max_requests | |
| self.window_seconds = window_seconds | |
| self.counts = {} # NOT thread-safe! | |
| def is_allowed(self, client_id: str) -> bool: | |
| # TODO: Implement with proper sliding window and thread safety | |
| count = self.counts.get(client_id, 0) | |
| self.counts[client_id] = count + 1 | |
| return count < self.max_requests | |
| def get_remaining(self, client_id: str) -> int: | |
| count = self.counts.get(client_id, 0) | |
| return max(0, self.max_requests - count) | |
| ''', | |
| "test_cases": [ | |
| {"fn_class": "RateLimiter", "init_args": [5, 60], "method": "is_allowed", "input": ["user1"], "expected": True, "description": "First request allowed"}, | |
| {"fn_class": "RateLimiter", "init_args": [2, 60], "method": "is_allowed_multi", "calls": 3, "input": ["user1"], "expected_last": False, "description": "Third request blocked when limit is 2"}, | |
| {"fn_class": "RateLimiter", "init_args": [5, 60], "method": "get_remaining", "input": ["new_client"], "expected": 5, "description": "New client has full remaining"}, | |
| {"fn_class": "RateLimiter", "init_args": [3, 60], "method": "independent_clients", "description": "Different client IDs are tracked independently"}, | |
| ], | |
| "attack_type": "none", | |
| "security_checks": [ | |
| {"type": "uses_threading_lock", "required": ["threading.Lock", "threading.RLock"]}, | |
| {"type": "uses_time", "required": ["time.time", "time.monotonic"]}, | |
| ], | |
| "naive_code": '''class RateLimiter: | |
| def __init__(self, max_requests, window_seconds): | |
| self.max_requests = max_requests | |
| self.counts = {} | |
| def is_allowed(self, client_id): | |
| c = self.counts.get(client_id, 0) | |
| self.counts[client_id] = c + 1 | |
| return c < self.max_requests | |
| def get_remaining(self, client_id): | |
| return max(0, self.max_requests - self.counts.get(client_id, 0)) | |
| ''', | |
| "optimal_code": '''import threading | |
| import time | |
| from collections import deque | |
| class RateLimiter: | |
| """Thread-safe sliding window rate limiter using token bucket pattern.""" | |
| def __init__(self, max_requests: int, window_seconds: int): | |
| """ | |
| Args: | |
| max_requests: Maximum requests allowed per window | |
| window_seconds: Length of the sliding window | |
| """ | |
| self.max_requests = max_requests | |
| self.window_seconds = window_seconds | |
| self._buckets: dict[str, deque] = {} | |
| self._lock = threading.Lock() | |
| def _prune(self, client_id: str, now: float) -> None: | |
| """Remove timestamps outside the current window. Must hold lock.""" | |
| cutoff = now - self.window_seconds | |
| bucket = self._buckets.get(client_id, deque()) | |
| while bucket and bucket[0] < cutoff: | |
| bucket.popleft() | |
| self._buckets[client_id] = bucket | |
| def is_allowed(self, client_id: str) -> bool: | |
| """Returns True and records the request if under rate limit.""" | |
| now = time.monotonic() | |
| with self._lock: | |
| self._prune(client_id, now) | |
| bucket = self._buckets[client_id] | |
| if len(bucket) < self.max_requests: | |
| bucket.append(now) | |
| return True | |
| return False | |
| def get_remaining(self, client_id: str) -> int: | |
| """Returns remaining requests in the current window.""" | |
| now = time.monotonic() | |
| with self._lock: | |
| self._prune(client_id, now) | |
| used = len(self._buckets.get(client_id, deque())) | |
| return max(0, self.max_requests - used) | |
| ''', | |
| } | |