File size: 4,914 Bytes
a26cbc3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import time
import requests
import threading
import statistics
from concurrent.futures import ThreadPoolExecutor

class LoadTester:
    def __init__(self):
        self.running = False
        self.stats = {
            'total_requests': 0,
            'success_count': 0,
            'fail_count': 0,
            'start_time': 0,
            'end_time': 0,
            'latencies': [],
            'status_codes': {},
            'errors': []
        }
        self.executor = None
        self._lock = threading.Lock()

    def _make_request(self, url, method, headers, body, timeout):
        if not self.running:
            return

        start = time.time()
        try:
            if method == 'GET':
                resp = requests.get(url, headers=headers, params=body, timeout=timeout)
            elif method == 'POST':
                resp = requests.post(url, headers=headers, json=body, timeout=timeout)
            elif method == 'PUT':
                resp = requests.put(url, headers=headers, json=body, timeout=timeout)
            elif method == 'DELETE':
                resp = requests.delete(url, headers=headers, json=body, timeout=timeout)
            else:
                resp = requests.request(method, url, headers=headers, data=body, timeout=timeout)
            
            latency = (time.time() - start) * 1000  # ms
            
            with self._lock:
                self.stats['total_requests'] += 1
                self.stats['latencies'].append(latency)
                code = resp.status_code
                self.stats['status_codes'][code] = self.stats['status_codes'].get(code, 0) + 1
                
                if 200 <= code < 400:
                    self.stats['success_count'] += 1
                else:
                    self.stats['fail_count'] += 1

        except Exception as e:
            with self._lock:
                self.stats['total_requests'] += 1
                self.stats['fail_count'] += 1
                self.stats['errors'].append(str(e))
                # Limit error log size
                if len(self.stats['errors']) > 50:
                    self.stats['errors'].pop(0)

    def _worker(self, url, method, headers, body, timeout):
        while self.running:
            self._make_request(url, method, headers, body, timeout)
            # Small sleep to prevent complete CPU lockup if concurrency is high
            time.sleep(0.01)

    def start(self, config):
        if self.running:
            return False

        self.running = True
        self.stats = {
            'total_requests': 0,
            'success_count': 0,
            'fail_count': 0,
            'start_time': time.time(),
            'end_time': 0,
            'latencies': [],
            'status_codes': {},
            'errors': []
        }
        
        concurrency = int(config.get('concurrency', 10))
        duration = int(config.get('duration', 60))
        url = config.get('url')
        method = config.get('method', 'GET')
        headers = config.get('headers', {})
        body = config.get('body', {})
        timeout = int(config.get('timeout', 5))

        self.executor = ThreadPoolExecutor(max_workers=concurrency)
        
        for _ in range(concurrency):
            self.executor.submit(self._worker, url, method, headers, body, timeout)

        # Auto-stop timer
        threading.Timer(duration, self.stop).start()
        return True

    def stop(self):
        self.running = False
        if self.executor:
            self.executor.shutdown(wait=False)
        self.stats['end_time'] = time.time()

    def get_stats(self):
        with self._lock:
            now = time.time()
            start = self.stats['start_time']
            duration = (self.stats['end_time'] or now) - start
            if duration <= 0:
                duration = 0.001

            rps = self.stats['total_requests'] / duration
            
            latencies = self.stats['latencies']
            avg_latency = statistics.mean(latencies) if latencies else 0
            p95 = statistics.quantiles(latencies, n=20)[18] if len(latencies) >= 20 else avg_latency
            p99 = statistics.quantiles(latencies, n=100)[98] if len(latencies) >= 100 else p95
            
            return {
                'running': self.running,
                'duration': round(duration, 2),
                'total_requests': self.stats['total_requests'],
                'success_count': self.stats['success_count'],
                'fail_count': self.stats['fail_count'],
                'rps': round(rps, 2),
                'avg_latency': round(avg_latency, 2),
                'p95_latency': round(p95, 2),
                'p99_latency': round(p99, 2),
                'status_codes': self.stats['status_codes'],
                'recent_errors': self.stats['errors'][-5:]
            }

# Singleton instance
tester = LoadTester()