load-forge-pro / load_tester.py
Trae Assistant
Initial commit: Load Forge Pro with localized UI and robust features
a26cbc3
import time
import requests
import threading
import statistics
from concurrent.futures import ThreadPoolExecutor
class LoadTester:
def __init__(self):
self.running = False
self.stats = {
'total_requests': 0,
'success_count': 0,
'fail_count': 0,
'start_time': 0,
'end_time': 0,
'latencies': [],
'status_codes': {},
'errors': []
}
self.executor = None
self._lock = threading.Lock()
def _make_request(self, url, method, headers, body, timeout):
if not self.running:
return
start = time.time()
try:
if method == 'GET':
resp = requests.get(url, headers=headers, params=body, timeout=timeout)
elif method == 'POST':
resp = requests.post(url, headers=headers, json=body, timeout=timeout)
elif method == 'PUT':
resp = requests.put(url, headers=headers, json=body, timeout=timeout)
elif method == 'DELETE':
resp = requests.delete(url, headers=headers, json=body, timeout=timeout)
else:
resp = requests.request(method, url, headers=headers, data=body, timeout=timeout)
latency = (time.time() - start) * 1000 # ms
with self._lock:
self.stats['total_requests'] += 1
self.stats['latencies'].append(latency)
code = resp.status_code
self.stats['status_codes'][code] = self.stats['status_codes'].get(code, 0) + 1
if 200 <= code < 400:
self.stats['success_count'] += 1
else:
self.stats['fail_count'] += 1
except Exception as e:
with self._lock:
self.stats['total_requests'] += 1
self.stats['fail_count'] += 1
self.stats['errors'].append(str(e))
# Limit error log size
if len(self.stats['errors']) > 50:
self.stats['errors'].pop(0)
def _worker(self, url, method, headers, body, timeout):
while self.running:
self._make_request(url, method, headers, body, timeout)
# Small sleep to prevent complete CPU lockup if concurrency is high
time.sleep(0.01)
def start(self, config):
if self.running:
return False
self.running = True
self.stats = {
'total_requests': 0,
'success_count': 0,
'fail_count': 0,
'start_time': time.time(),
'end_time': 0,
'latencies': [],
'status_codes': {},
'errors': []
}
concurrency = int(config.get('concurrency', 10))
duration = int(config.get('duration', 60))
url = config.get('url')
method = config.get('method', 'GET')
headers = config.get('headers', {})
body = config.get('body', {})
timeout = int(config.get('timeout', 5))
self.executor = ThreadPoolExecutor(max_workers=concurrency)
for _ in range(concurrency):
self.executor.submit(self._worker, url, method, headers, body, timeout)
# Auto-stop timer
threading.Timer(duration, self.stop).start()
return True
def stop(self):
self.running = False
if self.executor:
self.executor.shutdown(wait=False)
self.stats['end_time'] = time.time()
def get_stats(self):
with self._lock:
now = time.time()
start = self.stats['start_time']
duration = (self.stats['end_time'] or now) - start
if duration <= 0:
duration = 0.001
rps = self.stats['total_requests'] / duration
latencies = self.stats['latencies']
avg_latency = statistics.mean(latencies) if latencies else 0
p95 = statistics.quantiles(latencies, n=20)[18] if len(latencies) >= 20 else avg_latency
p99 = statistics.quantiles(latencies, n=100)[98] if len(latencies) >= 100 else p95
return {
'running': self.running,
'duration': round(duration, 2),
'total_requests': self.stats['total_requests'],
'success_count': self.stats['success_count'],
'fail_count': self.stats['fail_count'],
'rps': round(rps, 2),
'avg_latency': round(avg_latency, 2),
'p95_latency': round(p95, 2),
'p99_latency': round(p99, 2),
'status_codes': self.stats['status_codes'],
'recent_errors': self.stats['errors'][-5:]
}
# Singleton instance
tester = LoadTester()