SecureCodeEnv / tasks /medium /rate_limiter.py
vishaldhakad's picture
frontend adding
7257069
Raw
History Blame Contribute Delete
4.87 kB
"""
SecureCodeEnv Task: Rate Limiter
Difficulty: Medium
CWE Targets: CWE-770 (Allocation Without Limits), CWE-400 (Uncontrolled Resource Consumption)
"""
TASK = {
"id": "medium_rate_limiter",
"difficulty": "medium",
"cwe_targets": ["CWE-770", "CWE-400"],
"component_name": "RateLimiter",
"filename": "src/middleware/rate_limiter.py",
"problem_statement": """Write a Python class:
`class RateLimiter:`
A thread-safe token bucket rate limiter.
REQUIREMENTS:
- `__init__(self, max_requests: int, window_seconds: int)`
- max_requests: max allowed calls in the time window
- window_seconds: size of the sliding window in seconds
- `is_allowed(self, client_id: str) -> bool`
- Returns True if the client is under the rate limit
- Returns False if the client has exceeded max_requests in window_seconds
- Each client_id is tracked independently
- Must be thread-safe (use threading.Lock)
- Must use time-based sliding window — NOT a fixed counter reset
- `get_remaining(self, client_id: str) -> int`
- Returns how many requests the client can still make in the current window
MUST handle concurrent requests correctly — no race conditions.""",
"starter_code": '''class RateLimiter:
def __init__(self, max_requests: int, window_seconds: int):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.counts = {} # NOT thread-safe!
def is_allowed(self, client_id: str) -> bool:
# TODO: Implement with proper sliding window and thread safety
count = self.counts.get(client_id, 0)
self.counts[client_id] = count + 1
return count < self.max_requests
def get_remaining(self, client_id: str) -> int:
count = self.counts.get(client_id, 0)
return max(0, self.max_requests - count)
''',
"test_cases": [
{"fn_class": "RateLimiter", "init_args": [5, 60], "method": "is_allowed", "input": ["user1"], "expected": True, "description": "First request allowed"},
{"fn_class": "RateLimiter", "init_args": [2, 60], "method": "is_allowed_multi", "calls": 3, "input": ["user1"], "expected_last": False, "description": "Third request blocked when limit is 2"},
{"fn_class": "RateLimiter", "init_args": [5, 60], "method": "get_remaining", "input": ["new_client"], "expected": 5, "description": "New client has full remaining"},
{"fn_class": "RateLimiter", "init_args": [3, 60], "method": "independent_clients", "description": "Different client IDs are tracked independently"},
],
"attack_type": "none",
"security_checks": [
{"type": "uses_threading_lock", "required": ["threading.Lock", "threading.RLock"]},
{"type": "uses_time", "required": ["time.time", "time.monotonic"]},
],
"naive_code": '''class RateLimiter:
def __init__(self, max_requests, window_seconds):
self.max_requests = max_requests
self.counts = {}
def is_allowed(self, client_id):
c = self.counts.get(client_id, 0)
self.counts[client_id] = c + 1
return c < self.max_requests
def get_remaining(self, client_id):
return max(0, self.max_requests - self.counts.get(client_id, 0))
''',
"optimal_code": '''import threading
import time
from collections import deque
class RateLimiter:
"""Thread-safe sliding window rate limiter using token bucket pattern."""
def __init__(self, max_requests: int, window_seconds: int):
"""
Args:
max_requests: Maximum requests allowed per window
window_seconds: Length of the sliding window
"""
self.max_requests = max_requests
self.window_seconds = window_seconds
self._buckets: dict[str, deque] = {}
self._lock = threading.Lock()
def _prune(self, client_id: str, now: float) -> None:
"""Remove timestamps outside the current window. Must hold lock."""
cutoff = now - self.window_seconds
bucket = self._buckets.get(client_id, deque())
while bucket and bucket[0] < cutoff:
bucket.popleft()
self._buckets[client_id] = bucket
def is_allowed(self, client_id: str) -> bool:
"""Returns True and records the request if under rate limit."""
now = time.monotonic()
with self._lock:
self._prune(client_id, now)
bucket = self._buckets[client_id]
if len(bucket) < self.max_requests:
bucket.append(now)
return True
return False
def get_remaining(self, client_id: str) -> int:
"""Returns remaining requests in the current window."""
now = time.monotonic()
with self._lock:
self._prune(client_id, now)
used = len(self._buckets.get(client_id, deque()))
return max(0, self.max_requests - used)
''',
}