Spaces:
Sleeping
Sleeping
| import time | |
| import requests | |
| import threading | |
| import statistics | |
| from concurrent.futures import ThreadPoolExecutor | |
class LoadTester:
    """Thread-based HTTP load generator that collects live statistics.

    A run is started with :meth:`start`, which spawns ``concurrency`` worker
    threads that repeatedly issue the configured request until the run is
    stopped, either explicitly via :meth:`stop` or automatically once the
    configured duration elapses.  :meth:`get_stats` can be polled at any time
    for an aggregated snapshot.

    Public attributes:
        running: ``True`` while a run is in progress.
        stats: Raw counters of the current (or most recent) run.  Mutated by
            worker threads under an internal lock.
        executor: The ``ThreadPoolExecutor`` of the current/most recent run,
            or ``None`` if no run was ever started.
    """

    # Maximum number of error messages retained in ``stats['errors']``.
    _MAX_ERRORS = 50

    # Keyword used to hand ``body`` to ``requests.request`` for each HTTP verb.
    # Verbs that are not listed send the body through ``data=``.
    _BODY_KWARG = {
        'GET': 'params',
        'POST': 'json',
        'PUT': 'json',
        'DELETE': 'json',
    }

    def __init__(self):
        self.running = False
        self.stats = self._new_stats()
        self.executor = None
        self._lock = threading.Lock()
        # Per-run stop token; lets workers and timers of an old run be told
        # apart from those of the current run.
        self._stop_event = None
        # Auto-stop timer of the current run (cancelled on manual stop).
        self._timer = None

    @staticmethod
    def _new_stats(start_time=0):
        """Return a fresh, zeroed statistics dictionary."""
        return {
            'total_requests': 0,
            'success_count': 0,
            'fail_count': 0,
            'start_time': start_time,
            'end_time': 0,
            'latencies': [],
            'status_codes': {},
            'errors': [],
        }

    def _make_request(self, url, method, headers, body, timeout, stop_event=None):
        """Issue one HTTP request and record its outcome in ``self.stats``.

        Args:
            url: Target URL.
            method: HTTP verb (case-insensitive).  ``GET`` sends ``body`` as
                query parameters, ``POST``/``PUT``/``DELETE`` send it as JSON,
                any other verb sends it as ``data``.
            headers: Request headers.
            body: Request payload, interpreted according to ``method``.
            timeout: Request timeout in seconds.
            stop_event: Stop token of the run this request belongs to.  When
                given and a newer run has started in the meantime, the result
                is discarded instead of being counted towards the new run.

        Any exception raised while performing the request is counted as a
        failure and its message is stored (only the most recent
        ``_MAX_ERRORS`` messages are kept).  Responses with a status code in
        ``[200, 400)`` count as successes, all others as failures.
        """
        if not self.running:
            return

        body_kwarg = self._BODY_KWARG.get(str(method).upper(), 'data')
        status = None
        error = None
        # perf_counter is monotonic, so latency is immune to wall-clock jumps.
        began = time.perf_counter()
        try:
            resp = requests.request(
                method, url, headers=headers, timeout=timeout, **{body_kwarg: body}
            )
            status = resp.status_code
        except Exception as exc:  # best effort: every failure is just a data point
            error = str(exc)
        latency_ms = (time.perf_counter() - began) * 1000

        with self._lock:
            if stop_event is not None and stop_event is not self._stop_event:
                # The run this request belonged to has been superseded.
                return
            stats = self.stats
            stats['total_requests'] += 1
            if error is not None:
                stats['fail_count'] += 1
                errors = stats['errors']
                errors.append(error)
                # Limit error log size
                if len(errors) > self._MAX_ERRORS:
                    del errors[:-self._MAX_ERRORS]
                return
            stats['latencies'].append(latency_ms)
            codes = stats['status_codes']
            codes[status] = codes.get(status, 0) + 1
            if 200 <= status < 400:
                stats['success_count'] += 1
            else:
                stats['fail_count'] += 1

    def _worker(self, url, method, headers, body, timeout, stop_event=None):
        """Send requests in a loop until the run is stopped.

        ``stop_event`` is the stop token of the run that spawned this worker;
        once it is set the worker exits even if a *new* run has flipped
        ``self.running`` back to ``True`` in the meantime.
        """
        while self.running and not (stop_event is not None and stop_event.is_set()):
            self._make_request(url, method, headers, body, timeout, stop_event)
            # Small sleep to prevent complete CPU lockup if concurrency is high
            time.sleep(0.01)

    def start(self, config):
        """Start a load-test run.

        Args:
            config: Mapping with the optional keys ``concurrency`` (worker
                count, default 10), ``duration`` (seconds until auto-stop,
                default 60), ``url``, ``method`` (default ``'GET'``),
                ``headers`` (default ``{}``), ``body`` (default ``{}``) and
                ``timeout`` (seconds, default 5; fractional values allowed).

        Returns:
            ``True`` if a run was started, ``False`` if one is already
            running.

        Raises:
            ValueError: If a numeric setting cannot be parsed or
                ``concurrency`` is smaller than 1.  The tester is left idle
                in that case so a corrected config can be submitted.
        """
        if self.running:
            return False

        # Parse and validate everything *before* touching any state, so a bad
        # config can never leave the tester stuck in the "running" state.
        concurrency = int(config.get('concurrency', 10))
        duration = int(config.get('duration', 60))
        url = config.get('url')
        method = config.get('method', 'GET')
        headers = config.get('headers', {})
        body = config.get('body', {})
        # float, not int: truncating e.g. 0.5 to 0 would make every request fail.
        timeout = float(config.get('timeout', 5))
        if concurrency < 1:
            raise ValueError('concurrency must be at least 1')

        with self._lock:
            if self.running:  # lost a race against a concurrent start()
                return False
            stop_event = threading.Event()
            executor = ThreadPoolExecutor(max_workers=concurrency)
            self.stats = self._new_stats(time.time())
            self.executor = executor
            self._stop_event = stop_event
            self.running = True
            for _ in range(concurrency):
                executor.submit(
                    self._worker, url, method, headers, body, timeout, stop_event
                )
            # Auto-stop timer, bound to this run's token so it cannot stop a
            # later run if this one is stopped manually first.
            self._timer = threading.Timer(duration, self._stop_run, args=(stop_event,))
            self._timer.start()
        return True

    def stop(self):
        """Stop the current run, if any.  Calling it while idle is a no-op."""
        self._stop_run(None)

    def _stop_run(self, stop_event):
        """Stop the current run.

        When ``stop_event`` is given (auto-stop timer), the run is only
        stopped if that token still identifies the current run.
        """
        with self._lock:
            if stop_event is not None and stop_event is not self._stop_event:
                return  # stale timer from an earlier run
            if not self.running:
                return  # already stopped: keep the recorded end_time intact
            self.running = False
            self.stats['end_time'] = time.time()
            if self._stop_event is not None:
                self._stop_event.set()
            timer, self._timer = self._timer, None
            executor = self.executor
        if timer is not None:
            timer.cancel()
        if executor is not None:
            # wait=False: do not block the caller on in-flight requests.
            executor.shutdown(wait=False)

    def get_stats(self):
        """Return an aggregated snapshot of the current/most recent run.

        Returns:
            A dict with ``running``, ``duration`` (seconds), the request
            counters, ``rps``, ``avg_latency`` / ``p95_latency`` /
            ``p99_latency`` (milliseconds), ``status_codes`` (a copy, safe to
            iterate or mutate) and ``recent_errors`` (last five messages).
            ``p95`` falls back to the average below 20 samples and ``p99``
            falls back to ``p95`` below 100 samples.  Before the first run
            ``duration`` and ``rps`` are 0.
        """
        # Copy what we need under the lock, then do the (pure-Python, slow)
        # statistics outside of it so workers are not stalled.
        with self._lock:
            running = self.running
            stats = self.stats
            start = stats['start_time']
            end = stats['end_time']
            total = stats['total_requests']
            successes = stats['success_count']
            failures = stats['fail_count']
            latencies = list(stats['latencies'])
            status_codes = dict(stats['status_codes'])
            recent_errors = stats['errors'][-5:]

        # A start_time of 0 means "never started"; do not measure from epoch.
        duration = ((end or time.time()) - start) if start else 0.0
        if duration <= 0:
            duration = 0.001
        rps = total / duration

        avg_latency = statistics.mean(latencies) if latencies else 0
        if len(latencies) >= 20:
            p95 = statistics.quantiles(latencies, n=20)[18]
        else:
            p95 = avg_latency
        if len(latencies) >= 100:
            p99 = statistics.quantiles(latencies, n=100)[98]
        else:
            p99 = p95

        return {
            'running': running,
            'duration': round(duration, 2),
            'total_requests': total,
            'success_count': successes,
            'fail_count': failures,
            'rps': round(rps, 2),
            'avg_latency': round(avg_latency, 2),
            'p95_latency': round(p95, 2),
            'p99_latency': round(p99, 2),
            'status_codes': status_codes,
            'recent_errors': recent_errors,
        }
# Singleton instance: a single LoadTester created once at import time.
tester = LoadTester()