import time
import requests
import threading
import statistics
from concurrent.futures import ThreadPoolExecutor
class LoadTester:
    """Thread-based HTTP load tester aggregating latency/throughput stats.

    Worker threads in a ThreadPoolExecutor repeatedly hit a single URL
    until stop() is called, either manually or by the auto-stop timer
    armed in start().
    """

    def __init__(self):
        self.running = False
        self.stats = self._fresh_stats()
        self.executor = None
        self._timer = None          # auto-stop timer; cancelled on manual stop
        self._lock = threading.Lock()  # guards all mutation of self.stats

    @staticmethod
    def _fresh_stats():
        """Return a zeroed stats dict (start_time is filled in by start())."""
        return {
            'total_requests': 0,
            'success_count': 0,
            'fail_count': 0,
            'start_time': 0,
            'end_time': 0,
            'latencies': [],
            'status_codes': {},
            'errors': []
        }

    def _make_request(self, url, method, headers, body, timeout):
        """Issue one HTTP request and record the outcome into self.stats.

        Transport-level failures (timeouts, DNS errors, ...) are counted
        as failures and logged to the bounded error list.
        """
        if not self.running:
            return
        start = time.time()
        try:
            if method == 'GET':
                # GET carries the body as query parameters, not a payload.
                resp = requests.get(url, headers=headers, params=body, timeout=timeout)
            elif method == 'POST':
                resp = requests.post(url, headers=headers, json=body, timeout=timeout)
            elif method == 'PUT':
                resp = requests.put(url, headers=headers, json=body, timeout=timeout)
            elif method == 'DELETE':
                resp = requests.delete(url, headers=headers, json=body, timeout=timeout)
            else:
                resp = requests.request(method, url, headers=headers, data=body, timeout=timeout)
            latency = (time.time() - start) * 1000  # ms
            with self._lock:
                self.stats['total_requests'] += 1
                self.stats['latencies'].append(latency)
                code = resp.status_code
                self.stats['status_codes'][code] = self.stats['status_codes'].get(code, 0) + 1
                # Anything below 400 reached the server and was accepted
                # (3xx counts as success; requests follows redirects anyway).
                if 200 <= code < 400:
                    self.stats['success_count'] += 1
                else:
                    self.stats['fail_count'] += 1
        except Exception as e:  # any transport failure counts as a "fail"
            with self._lock:
                self.stats['total_requests'] += 1
                self.stats['fail_count'] += 1
                self.stats['errors'].append(str(e))
                # Keep the error log bounded to the 50 most recent entries.
                if len(self.stats['errors']) > 50:
                    self.stats['errors'].pop(0)

    def _worker(self, url, method, headers, body, timeout):
        """Loop issuing requests until the tester is stopped."""
        while self.running:
            self._make_request(url, method, headers, body, timeout)
            # Small sleep so high concurrency doesn't peg the CPU.
            time.sleep(0.01)

    def start(self, config):
        """Begin a load test; returns False if one is already running.

        config keys (all optional except 'url'): url, method, headers,
        body, concurrency, duration (seconds), timeout (seconds).
        """
        if self.running:
            return False
        self.running = True
        self.stats = self._fresh_stats()
        self.stats['start_time'] = time.time()
        concurrency = int(config.get('concurrency', 10))
        duration = int(config.get('duration', 60))
        url = config.get('url')
        method = config.get('method', 'GET')
        headers = config.get('headers', {})
        body = config.get('body', {})
        timeout = int(config.get('timeout', 5))
        self.executor = ThreadPoolExecutor(max_workers=concurrency)
        for _ in range(concurrency):
            self.executor.submit(self._worker, url, method, headers, body, timeout)
        # Keep a handle on the auto-stop timer so a manual stop() can cancel
        # it — otherwise a stale timer from this run could fire later and
        # kill a subsequent run early.
        self._timer = threading.Timer(duration, self.stop)
        self._timer.daemon = True
        self._timer.start()
        return True

    def stop(self):
        """Stop the test. Idempotent: a second call (e.g. the auto-stop
        timer firing after a manual stop) does not clobber end_time."""
        if not self.running:
            return
        self.running = False
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        if self.executor:
            # Don't block the caller; workers exit once they see running=False.
            self.executor.shutdown(wait=False)
        with self._lock:  # get_stats() reads end_time under the same lock
            self.stats['end_time'] = time.time()

    def get_stats(self):
        """Return a thread-safe snapshot of aggregate metrics.

        Percentiles degrade gracefully: p95 falls back to the mean below
        20 samples; p99 falls back to p95 below 100 samples.
        """
        with self._lock:
            now = time.time()
            start = self.stats['start_time']
            # While still running end_time is 0 (falsy), so measure to now.
            duration = (self.stats['end_time'] or now) - start
            if duration <= 0:
                duration = 0.001  # avoid division by zero at the very start
            rps = self.stats['total_requests'] / duration
            latencies = self.stats['latencies']
            avg_latency = statistics.mean(latencies) if latencies else 0
            p95 = statistics.quantiles(latencies, n=20)[18] if len(latencies) >= 20 else avg_latency
            p99 = statistics.quantiles(latencies, n=100)[98] if len(latencies) >= 100 else p95
            return {
                'running': self.running,
                'duration': round(duration, 2),
                'total_requests': self.stats['total_requests'],
                'success_count': self.stats['success_count'],
                'fail_count': self.stats['fail_count'],
                'rps': round(rps, 2),
                'avg_latency': round(avg_latency, 2),
                'p95_latency': round(p95, 2),
                'p99_latency': round(p99, 2),
                'status_codes': self.stats['status_codes'],
                'recent_errors': self.stats['errors'][-5:]
            }
# Module-level singleton: importers share this one LoadTester instance,
# so at most one load test runs per process.
tester = LoadTester()
|