test0 / app.py
cnmksjs's picture
Update app.py
8075dba verified
from flask import Flask, jsonify, request
import threading
import time
import random
import socket
import sys
from urllib.parse import urlparse
from datetime import datetime
# Flask application object; the route handlers in this file register on it.
app = Flask(__name__)
# ====== Global state ======
# Run configuration plus live counters, shared between the request handlers
# and the worker threads. Code elsewhere in this file generally updates it
# while holding ``status_lock``.
attack_status = dict(
    running=False,
    target_url='',
    threads=50,
    connections_per_thread=3,
    duration=30,
    request_interval=0.1,
    start_time=None,
    stats=dict(
        total_requests=0,
        successful=0,
        failed=0,
        active_connections=0,
        current_rps=0,
        last_update=None,
    ),
)
# (worker, thread) pairs belonging to the most recently started run.
attack_threads = list()
# Guards updates to ``attack_status``.
status_lock = threading.Lock()
# ====== Load-generation worker ======
class PressureWorker:
    """Worker that opens TCP connections to one URL's host and sends GETs.

    Each instance is meant to be run on its own thread via ``run``. Counters
    in the module-level ``attack_status['stats']`` are updated under
    ``status_lock``.

    NOTE(review): nothing in this class verifies that the operator is
    authorized to send traffic to ``target_url``; that must be enforced by
    whoever deploys or calls this code.
    """

    def __init__(self, worker_id, target_url, config):
        """Store the settings, split the URL and resolve the host once.

        ``config`` is read in ``run`` for the keys ``connections_per_thread``,
        ``duration`` and ``request_interval``.
        """
        self.worker_id = worker_id
        self.target_url = target_url
        self.config = config
        # Cleared externally (by the /stop handler) to ask ``run`` to finish.
        self.running = True
        # Parse the URL.
        parsed = urlparse(target_url)
        self.hostname = parsed.hostname
        # Explicit port wins; otherwise 443 for https and 80 for anything else.
        self.port = parsed.port or (443 if parsed.scheme == 'https' else 80)
        self.path = parsed.path or '/'
        if parsed.query:
            self.path += f"?{parsed.query}"
        # Resolve the host name to an IPv4 address.
        try:
            self.ip = socket.gethostbyname(self.hostname)
        except:
            # Any failure leaves ``ip`` as None; ``create_connection`` then
            # returns None and ``run`` ends without sending anything.
            self.ip = None

    def create_connection(self):
        """Open a TCP connection to the resolved address.

        Returns the connected socket, or None when the host was not resolved
        or the connection attempt raised. On success the shared
        ``active_connections`` counter is incremented.
        """
        if not self.ip:
            return None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(5)
            sock.connect((self.ip, self.port))
            with status_lock:
                attack_status['stats']['active_connections'] += 1
            return sock
        except:
            # NOTE(review): a socket created above is not closed on this path.
            return None

    def create_request(self):
        """Build the raw bytes of one HTTP/1.1 GET request for ``self.path``.

        The User-Agent is picked at random from a short fixed list. The final
        list element makes the joined text end with an empty line.
        """
        user_agents = [
            "Mozilla/5.0",
            "TestBot/1.0",
            f"Worker-{self.worker_id}"
        ]
        headers = [
            f"GET {self.path} HTTP/1.1",
            f"Host: {self.hostname}",
            f"User-Agent: {random.choice(user_agents)}",
            "Accept: */*",
            "Connection: keep-alive",
            "\r\n"
        ]
        return "\r\n".join(headers).encode()

    def run(self):
        """Thread body: open the connections, send until done, then close.

        The send loop ends when ``self.running`` is cleared, when
        ``config['duration']`` seconds have elapsed, or when no open
        connection remains.
        """
        connections = []
        # Open the initial connections, pausing briefly between attempts.
        for _ in range(self.config['connections_per_thread']):
            sock = self.create_connection()
            if sock:
                connections.append(sock)
            time.sleep(0.05)
        end_time = time.time() + self.config['duration']
        # Main loop.
        while self.running and time.time() < end_time and connections:
            # Iterate over a copy because the list is modified on failure.
            for sock in connections[:]:
                try:
                    request = self.create_request()
                    sock.send(request)
                    # A send that did not raise is counted as successful;
                    # the response is not inspected.
                    with status_lock:
                        attack_status['stats']['total_requests'] += 1
                        attack_status['stats']['successful'] += 1
                        attack_status['stats']['last_update'] = time.time()
                    # Read (and discard) part of the response on roughly 70%
                    # of iterations.
                    if random.random() > 0.3:
                        sock.settimeout(1)
                        try:
                            sock.recv(1024)
                        except:
                            pass
                    # Sleep between 0.5x and 1.5x of the configured interval.
                    time.sleep(random.uniform(
                        self.config['request_interval'] * 0.5,
                        self.config['request_interval'] * 1.5
                    ))
                except Exception as e:
                    # The connection failed: drop it and update the counters.
                    # NOTE(review): the failed socket is removed but not
                    # closed here, and ``e`` is unused.
                    connections.remove(sock)
                    with status_lock:
                        attack_status['stats']['active_connections'] -= 1
                        attack_status['stats']['failed'] += 1
                    # Try to replace the dropped connection.
                    new_sock = self.create_connection()
                    if new_sock:
                        connections.append(new_sock)
        # Cleanup: close whatever is still open and adjust the counter.
        for sock in connections:
            try:
                sock.close()
                with status_lock:
                    attack_status['stats']['active_connections'] -= 1
            except:
                pass
# ====== Flask routes ======
@app.route('/')
def index():
    """Home page: list the API endpoints and report whether a run is active."""
    endpoint_help = {
        '/': '此帮助信息',
        '/status': '获取当前状态',
        '/start': '启动压力测试 (POST)',
        '/stop': '停止压力测试',
        '/config': '查看/更新配置 (GET/POST)'
    }
    if attack_status['running']:
        state = 'running'
    else:
        state = 'stopped'
    payload = {
        'service': '压力测试API',
        'version': '1.0',
        'status': state,
        'endpoints': endpoint_help
    }
    return jsonify(payload)
@app.route('/status', methods=['GET'])
def get_status():
    """Return the current configuration, counters and uptime as JSON.

    ``stats['current_rps']`` is refreshed with the average request rate since
    ``start_time`` when the counters were updated within the last 5 seconds.
    The response is built from a snapshot taken under ``status_lock`` so that
    worker threads cannot change the counters while they are serialized.

    Returns:
        A JSON response holding every key of ``attack_status`` plus
        ``uptime`` (seconds since ``start_time``, or 0 when no run is active).
    """
    now = time.time()
    with status_lock:
        stats = attack_status['stats']
        started = attack_status['start_time']
        last_update = stats['last_update']
        # ``start_time`` is reset to None by /stop while ``last_update`` may
        # still be recent, so it has to be checked before subtracting.
        if started is not None and last_update and now - last_update < 5:
            stats['current_rps'] = stats['total_requests'] / max(now - started, 1)
        # Copy the nested dict too; a shallow copy would still share it.
        snapshot = dict(attack_status)
        snapshot['stats'] = dict(stats)
    snapshot['uptime'] = now - started if started else 0
    return jsonify(snapshot)
@app.route('/start', methods=['POST'])
def start_attack():
    """Validate the request, record the run configuration and start workers.

    The JSON body may contain ``target_url``, ``threads``,
    ``connections_per_thread``, ``duration`` and ``request_interval``; any
    key that is missing falls back to the value stored in ``attack_status``.
    Numeric values are type-checked and clamped to the same bounds that the
    /config handler applies, and everything is validated *before* the
    service is marked as running so that a bad request cannot leave it stuck
    in the running state.

    NOTE(review): this endpoint accepts any host name and performs no
    authentication. Deployments must restrict who can call it and which
    targets are permitted; load tests may only be run against systems the
    operator is authorized to test.

    Returns:
        A JSON description of the started run, or a JSON error with HTTP 400
        when a run is already active or the input is invalid.
    """
    global attack_threads

    # Cheap early exit; the authoritative check is repeated under the lock.
    if attack_status['running']:
        return jsonify({'error': '压力测试已在运行中'}), 400

    # Tolerate a missing, malformed or non-object JSON body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Target URL.
    target_url = data.get('target_url', attack_status['target_url'])
    if not target_url:
        return jsonify({'error': '必须提供target_url参数'}), 400
    if not isinstance(target_url, str):
        return jsonify({'error': '无法解析URL'}), 400
    try:
        parsed = urlparse(target_url)
        hostname = parsed.hostname
        # Accessing ``port`` raises ValueError for a malformed port; catching
        # it here keeps the worker constructor from failing later.
        _ = parsed.port
    except ValueError:
        return jsonify({'error': '无法解析URL'}), 400
    if not hostname:
        return jsonify({'error': '无效的URL格式'}), 400

    # Numeric settings: (key, accepted types, lower bound, upper bound).
    limits = (
        ('threads', int, 1, 1000),
        ('connections_per_thread', int, 1, 20),
        ('duration', (int, float), 1, 3600),
        ('request_interval', (int, float), 0.01, 1.0),
    )
    config = {}
    for key, kinds, low, high in limits:
        value = data.get(key, attack_status[key])
        # bool is a subclass of int but is never a meaningful value here.
        if isinstance(value, bool) or not isinstance(value, kinds):
            return jsonify({'error': f'参数 {key} 类型无效'}), 400
        config[key] = max(low, min(value, high))

    with status_lock:
        # Check and set under one lock so two concurrent requests cannot
        # both start a run.
        if attack_status['running']:
            return jsonify({'error': '压力测试已在运行中'}), 400
        attack_status['running'] = True
        attack_status['target_url'] = target_url
        attack_status.update(config)
        attack_status['start_time'] = time.time()
        # Reset the counters.
        attack_status['stats'] = {
            'total_requests': 0,
            'successful': 0,
            'failed': 0,
            'active_connections': 0,
            'current_rps': 0,
            'last_update': None
        }

    # Create and start the worker threads outside the lock: the worker
    # constructor performs a DNS lookup and must not block other handlers.
    attack_threads = []
    try:
        for i in range(config['threads']):
            worker = PressureWorker(
                worker_id=i,
                target_url=target_url,
                config={
                    'connections_per_thread': config['connections_per_thread'],
                    'duration': config['duration'],
                    'request_interval': config['request_interval']
                }
            )
            thread = threading.Thread(target=worker.run, daemon=True)
            thread.start()
            attack_threads.append((worker, thread))
    except Exception:
        # Roll back so a partial start does not leave the state inconsistent.
        for worker, _thread in attack_threads:
            worker.running = False
        with status_lock:
            attack_status['running'] = False
            attack_status['start_time'] = None
        raise

    return jsonify({
        'message': '压力测试已启动',
        'config': {
            'target_url': target_url,
            'threads': config['threads'],
            'connections_per_thread': config['connections_per_thread'],
            'duration': config['duration'],
            'request_interval': config['request_interval']
        }
    })
@app.route('/stop', methods=['POST'])
def stop_attack():
    """Signal every worker to stop, wait briefly for them, return the stats.

    Workers are joined against a shared 2-second deadline instead of a fixed
    sleep, so the call returns as soon as they have finished and never waits
    longer than before. Threads still alive at the deadline are left to end
    on their own (they are daemon threads).

    Returns:
        A JSON response with a snapshot of the counters, or a JSON error
        with HTTP 400 when no run is active.
    """
    with status_lock:
        if not attack_status['running']:
            return jsonify({'error': '没有正在运行的压力测试'}), 400
        attack_status['running'] = False
        # Work on a copy so a later /start cannot swap the list underneath us.
        workers = list(attack_threads)

    # Ask all workers to stop.
    for worker, _thread in workers:
        worker.running = False

    # Wait for the threads, at most 2 seconds in total.
    deadline = time.monotonic() + 2
    for _worker, thread in workers:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        thread.join(remaining)

    with status_lock:
        attack_status['start_time'] = None
        # Snapshot: workers that are still winding down may keep updating
        # the live dict while the response is serialized.
        final_stats = dict(attack_status['stats'])

    return jsonify({
        'message': '压力测试已停止',
        'final_stats': final_stats
    })
@app.route('/config', methods=['GET', 'POST'])
def manage_config():
    """Show the stored configuration (GET) or update it (POST).

    POST accepts a JSON object with any of ``target_url``, ``threads``,
    ``connections_per_thread``, ``duration`` and ``request_interval``.
    Numeric values are clamped to fixed bounds. Values of the wrong type are
    rejected with HTTP 400 instead of raising, and nothing is stored unless
    every supplied value is valid.

    Returns:
        GET: JSON with ``current_config`` and ``running``.
        POST: JSON with the updated configuration, or a JSON error with
        HTTP 400 when a run is active or a value is invalid.
    """
    config_keys = (
        'target_url',
        'threads',
        'connections_per_thread',
        'duration',
        'request_interval',
    )

    if request.method == 'GET':
        with status_lock:
            current = {key: attack_status[key] for key in config_keys}
            running = attack_status['running']
        return jsonify({
            'current_config': current,
            'running': running
        })

    # POST
    # Cheap early exit; the authoritative check is repeated under the lock.
    if attack_status['running']:
        return jsonify({'error': '运行中无法修改配置'}), 400

    # Tolerate a missing, malformed or non-object JSON body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    updates = {}
    if 'target_url' in data:
        if not isinstance(data['target_url'], str):
            return jsonify({'error': '参数 target_url 类型无效'}), 400
        updates['target_url'] = data['target_url']

    # (key, accepted types, lower bound, upper bound). The two counts must
    # be integers because they are later passed to range().
    limits = (
        ('threads', int, 1, 1000),
        ('connections_per_thread', int, 1, 20),
        ('duration', (int, float), 1, 3600),
        ('request_interval', (int, float), 0.01, 1.0),
    )
    for key, kinds, low, high in limits:
        if key not in data:
            continue
        value = data[key]
        # bool is a subclass of int but is never a meaningful value here.
        if isinstance(value, bool) or not isinstance(value, kinds):
            return jsonify({'error': f'参数 {key} 类型无效'}), 400
        updates[key] = max(low, min(value, high))

    with status_lock:
        # Re-check under the lock so a run started in the meantime does not
        # have its recorded configuration changed.
        if attack_status['running']:
            return jsonify({'error': '运行中无法修改配置'}), 400
        attack_status.update(updates)
        new_config = {key: attack_status[key] for key in config_keys}

    return jsonify({
        'message': '配置已更新',
        'new_config': new_config
    })
# ====== Entry point ======
if __name__ == '__main__':
    # Startup banner, printed one line at a time.
    rule = "=" * 60
    banner = (
        rule,
        "🚀 压力测试API服务器",
        rule,
        "API端点:",
        " GET / - 查看API信息",
        " GET /status - 获取当前状态",
        " POST /start - 启动压力测试",
        " POST /stop - 停止压力测试",
        " GET/POST /config - 查看/更新配置",
        rule,
        "⚠️ 警告: 仅用于合法的压力测试!",
        rule,
    )
    for line in banner:
        print(line)
    # Start the Flask server on all interfaces, port 7860.
    server_options = dict(
        host='0.0.0.0',
        port=7860,
        debug=False,
        threaded=True,
    )
    app.run(**server_options)