"""Small threaded HTTP load tester that collects live request statistics."""

import statistics
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import requests


class LoadTester:
    """Fire repeated HTTP requests from a pool of worker threads.

    A run is started with :meth:`start`, ends either through :meth:`stop` or
    through the auto-stop timer armed by :meth:`start`, and can be observed at
    any time with :meth:`get_stats`.

    Public attributes:
        running: ``True`` while a run is active.
        stats: Raw counters of the current (or most recent) run.
        executor: ``ThreadPoolExecutor`` of the current (or most recent) run.
    """

    # Maximum number of error messages retained in ``stats['errors']``.
    MAX_ERRORS = 50
    # Number of trailing error messages reported by ``get_stats``.
    RECENT_ERRORS = 5

    def __init__(self):
        self.running = False
        self.stats = self._new_stats()
        self.executor = None
        # Guards every read/write of the contents of ``self.stats``.
        self._lock = threading.Lock()
        # Serialises start/stop so the lifecycle transitions are atomic.
        # Lock order is always _state_lock -> _lock, never the reverse.
        self._state_lock = threading.Lock()
        # Per-run stop signal; identifies which run a worker/timer belongs to.
        self._stop_event = None
        # Auto-stop timer of the current run, kept so stop() can cancel it.
        self._timer = None

    @staticmethod
    def _new_stats(start_time=0):
        """Return a fresh, zeroed statistics dictionary."""
        return {
            'total_requests': 0,
            'success_count': 0,
            'fail_count': 0,
            'start_time': start_time,
            'end_time': 0,
            'latencies': [],
            'status_codes': {},
            'errors': [],
        }

    def _make_request(self, url, method, headers, body, timeout):
        """Send one request and record its outcome.

        ``body`` is sent as query parameters for GET, as JSON for
        POST/PUT/DELETE and as ``data`` for any other method. Status codes in
        ``[200, 400)`` count as success; any other status code, and any
        exception raised while sending, counts as a failure.
        """
        if not self.running:
            return

        # Bind the stats dict of the run this request belongs to. If a new
        # run starts while the request is in flight, the late result lands in
        # the old dict instead of polluting the new run's numbers.
        stats = self.stats

        # Match methods case-insensitively so 'get' behaves like 'GET'.
        verb = method.upper() if isinstance(method, str) else method
        if verb == 'GET':
            payload = {'params': body}
        elif verb in ('POST', 'PUT', 'DELETE'):
            payload = {'json': body}
        else:
            payload = {'data': body}

        # Monotonic clock: immune to wall-clock adjustments during the call.
        began = time.perf_counter()
        try:
            resp = requests.request(
                verb, url, headers=headers, timeout=timeout, **payload
            )
        except Exception as exc:
            # Deliberately broad: an exception escaping a worker would be
            # swallowed by the executor and silently end that worker, so
            # every failure is recorded here instead.
            with self._lock:
                stats['total_requests'] += 1
                stats['fail_count'] += 1
                errors = stats['errors']
                errors.append(str(exc))
                # Keep only the newest MAX_ERRORS messages.
                del errors[:-self.MAX_ERRORS]
            return

        latency = (time.perf_counter() - began) * 1000  # ms
        code = resp.status_code
        with self._lock:
            stats['total_requests'] += 1
            stats['latencies'].append(latency)
            codes = stats['status_codes']
            codes[code] = codes.get(code, 0) + 1
            if 200 <= code < 400:
                stats['success_count'] += 1
            else:
                stats['fail_count'] += 1

    def _worker(self, url, method, headers, body, timeout, stop_event=None):
        """Issue requests back to back until the run is stopped.

        ``stop_event`` is the stop signal of the run that spawned this worker.
        It makes a worker of a stopped run exit even if ``self.running`` has
        already been set to ``True`` again by a newer run.
        """
        while self.running and not (stop_event is not None and stop_event.is_set()):
            self._make_request(url, method, headers, body, timeout)
            # Small pause to prevent complete CPU lockup if concurrency is
            # high; waiting on the event lets the worker exit promptly.
            if stop_event is None:
                time.sleep(0.01)
            else:
                stop_event.wait(0.01)

    def start(self, config):
        """Start a load-test run described by ``config``.

        Recognised keys (defaults in parentheses): ``url``, ``method``
        ('GET'), ``headers`` ({}), ``body`` ({}), ``concurrency`` (10),
        ``duration`` in seconds (60) and ``timeout`` in seconds (5).

        Returns:
            ``True`` if a run was started, ``False`` if one is already active.

        Raises:
            ValueError: if a numeric option cannot be converted to ``int`` or
                ``concurrency`` is smaller than 1.
            TypeError: if a numeric option has a type ``int()`` cannot convert.
        """
        # Parse and validate BEFORE touching any state, so a bad config
        # cannot leave the tester stuck in a "running" state without workers.
        concurrency = int(config.get('concurrency', 10))
        duration = int(config.get('duration', 60))
        url = config.get('url')
        method = config.get('method', 'GET')
        headers = config.get('headers', {})
        body = config.get('body', {})
        timeout = int(config.get('timeout', 5))
        if concurrency < 1:
            raise ValueError('concurrency must be at least 1')

        with self._state_lock:
            if self.running:
                return False

            # Retire leftovers of a previous run that was ended by assigning
            # ``running = False`` directly instead of calling stop().
            if self._stop_event is not None:
                self._stop_event.set()
            if self._timer is not None:
                self._timer.cancel()

            stop_event = threading.Event()
            executor = ThreadPoolExecutor(max_workers=concurrency)
            with self._lock:
                self.stats = self._new_stats(start_time=time.time())
            self._stop_event = stop_event
            self.executor = executor
            self.running = True

            for _ in range(concurrency):
                executor.submit(
                    self._worker, url, method, headers, body, timeout, stop_event
                )

            # Auto-stop timer, bound to this run's event so that it can never
            # stop a later run.
            self._timer = threading.Timer(
                duration, self._stop, args=(stop_event,)
            )
            self._timer.start()
        return True

    def stop(self):
        """Stop the current run; a no-op for statistics if none is active."""
        self._stop()

    def _stop(self, run_event=None):
        """Stop the active run.

        When ``run_event`` is given (auto-stop timer path) the call is ignored
        unless it refers to the run that is currently active.
        """
        with self._state_lock:
            if run_event is not None and run_event is not self._stop_event:
                return  # stale timer from an earlier run

            was_running = self.running
            self.running = False
            if self._stop_event is not None:
                self._stop_event.set()
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None
            if self.executor:
                self.executor.shutdown(wait=False)
            # Record the end time once per run; repeated stop() calls must not
            # keep pushing it forward and distorting duration / RPS.
            if was_running:
                with self._lock:
                    self.stats['end_time'] = time.time()

    def get_stats(self):
        """Return a snapshot of the current (or most recent) run.

        Returns:
            dict with keys ``running``, ``duration`` (s), ``total_requests``,
            ``success_count``, ``fail_count``, ``rps``, ``avg_latency``,
            ``p95_latency``, ``p99_latency`` (all latencies in ms),
            ``status_codes`` and ``recent_errors``. With fewer than 20 samples
            ``p95_latency`` falls back to the average; with fewer than 100
            samples ``p99_latency`` falls back to ``p95_latency``.
        """
        # Copy everything needed under the lock, then compute outside it: the
        # statistics functions sort the data in pure Python and would
        # otherwise block every worker for the duration of the call.
        with self._lock:
            stats = self.stats
            now = time.time()
            started = stats['start_time']
            ended = stats['end_time']
            total = stats['total_requests']
            succeeded = stats['success_count']
            failed = stats['fail_count']
            latencies = list(stats['latencies'])
            status_codes = dict(stats['status_codes'])
            recent_errors = stats['errors'][-self.RECENT_ERRORS:]
            running = self.running

        # A tester that was never started has no meaningful duration.
        duration = ((ended or now) - started) if started else 0.0
        if duration <= 0:
            duration = 0.001  # avoid division by zero
        rps = total / duration

        avg_latency = statistics.mean(latencies) if latencies else 0
        if len(latencies) >= 20:
            p95 = statistics.quantiles(latencies, n=20)[18]
        else:
            p95 = avg_latency
        if len(latencies) >= 100:
            p99 = statistics.quantiles(latencies, n=100)[98]
        else:
            p99 = p95

        return {
            'running': running,
            'duration': round(duration, 2),
            'total_requests': total,
            'success_count': succeeded,
            'fail_count': failed,
            'rps': round(rps, 2),
            'avg_latency': round(avg_latency, 2),
            'p95_latency': round(p95, 2),
            'p99_latency': round(p99, 2),
            'status_codes': status_codes,
            'recent_errors': recent_errors,
        }


# Singleton instance
tester = LoadTester()