"""Flask control API for a small multi-threaded HTTP load generator.

NOTE(review): this service sends traffic to whatever URL the caller supplies
and has no authentication, while ``app.run`` below binds to all interfaces.
Only run it against systems you own or are explicitly authorised to test,
and put it behind access control before exposing it to a network.
"""

import random
import socket
import sys
import threading
import time
from datetime import datetime
from urllib.parse import urlparse

from flask import Flask, jsonify, request

app = Flask(__name__)

# ====== Global state ======
# Shared between the Flask request threads and the worker threads; guard
# every mutation (and every multi-field read) with ``status_lock``.
attack_status = {
    'running': False,
    'target_url': '',
    'threads': 50,
    'connections_per_thread': 3,
    'duration': 30,
    'request_interval': 0.1,
    'start_time': None,
    'stats': {
        'total_requests': 0,
        'successful': 0,
        'failed': 0,
        'active_connections': 0,
        'current_rps': 0,
        'last_update': None
    }
}

# List of (PressureWorker, threading.Thread) pairs for the current run.
attack_threads = []
status_lock = threading.Lock()

# Keys of ``attack_status`` that make up the user-visible configuration.
CONFIG_KEYS = (
    'target_url',
    'threads',
    'connections_per_thread',
    'duration',
    'request_interval',
)

# Inclusive (minimum, maximum) for each numeric setting. These are the same
# bounds /config has always applied; /start now applies them as well.
CONFIG_LIMITS = {
    'threads': (1, 1000),
    'connections_per_thread': (1, 20),
    'duration': (1, 3600),
    'request_interval': (0.01, 1.0),
}

# Settings that must be whole numbers (they feed ``range()`` / loop counts).
INTEGER_SETTINGS = ('threads', 'connections_per_thread', 'duration')


def _json_body():
    """Return the request's JSON object, or ``{}`` if absent or not an object."""
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _clamped_settings(data):
    """Extract the numeric settings present in *data*, clamped to their limits.

    Args:
        data: Mapping parsed from the request body.

    Returns:
        A dict holding only the numeric settings that appear in *data*, each
        forced into its ``CONFIG_LIMITS`` range (and truncated to ``int`` for
        the whole-number settings).

    Raises:
        ValueError: If a supplied setting is not a number.
    """
    settings = {}
    for key, (low, high) in CONFIG_LIMITS.items():
        if key not in data:
            continue
        value = data[key]
        # bool is a subclass of int, so it has to be rejected explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise ValueError(f'参数{key}必须是数字')
        # Clamp before converting so inf/NaN can never reach int().
        value = max(low, min(value, high))
        if key in INTEGER_SETTINGS:
            value = int(value)
        settings[key] = value
    return settings


def _current_config():
    """Return the user-visible configuration as a new dict."""
    return {key: attack_status[key] for key in CONFIG_KEYS}


def _fresh_stats():
    """Return a zeroed statistics dict for a new run."""
    return {
        'total_requests': 0,
        'successful': 0,
        'failed': 0,
        'active_connections': 0,
        'current_rps': 0,
        'last_update': None
    }


# ====== Load-test worker ======
class PressureWorker:
    """One worker thread's worth of connections to the target.

    Args:
        worker_id: Index of this worker; also used in one User-Agent value.
        target_url: URL to request. Only the host, port, path and query are
            used. NOTE: connections are plain TCP, so an ``https`` URL gets
            unencrypted bytes on port 443.
        config: Dict with ``connections_per_thread``, ``duration`` (seconds)
            and ``request_interval`` (seconds).
    """

    def __init__(self, worker_id, target_url, config):
        self.worker_id = worker_id
        self.target_url = target_url
        self.config = config
        # Cleared by /stop to ask ``run`` to finish.
        self.running = True

        # Split the URL into the pieces needed to build a request line.
        parsed = urlparse(target_url)
        self.hostname = parsed.hostname
        self.port = parsed.port or (443 if parsed.scheme == 'https' else 80)
        self.path = parsed.path or '/'
        if parsed.query:
            self.path += f"?{parsed.query}"

        # Resolve once up front; ``None`` means "could not resolve" and makes
        # ``create_connection`` a no-op.
        try:
            self.ip = socket.gethostbyname(self.hostname)
        except (OSError, TypeError, UnicodeError):
            self.ip = None

    def create_connection(self):
        """Open a TCP connection to the target.

        Returns:
            The connected socket, or ``None`` if the host did not resolve or
            the connection could not be established.
        """
        if not self.ip:
            return None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except OSError:
            return None
        try:
            sock.settimeout(5)
            sock.connect((self.ip, self.port))
        except OSError:
            # Do not leak the descriptor when the connect fails.
            sock.close()
            return None
        with status_lock:
            attack_status['stats']['active_connections'] += 1
        return sock

    def create_request(self):
        """Build the raw bytes of one HTTP/1.1 GET request."""
        user_agents = [
            "Mozilla/5.0",
            "TestBot/1.0",
            f"Worker-{self.worker_id}"
        ]
        # The trailing "\r\n" element plus the join separator produces the
        # blank line that terminates the header block.
        headers = [
            f"GET {self.path} HTTP/1.1",
            f"Host: {self.hostname}",
            f"User-Agent: {random.choice(user_agents)}",
            "Accept: */*",
            "Connection: keep-alive",
            "\r\n"
        ]
        return "\r\n".join(headers).encode()

    def run(self):
        """Worker thread entry point.

        Opens the configured number of connections, then sends requests on
        them until the duration elapses, ``self.running`` is cleared, or no
        connection is left. All sockets still open are closed on exit.
        """
        connections = []

        # Establish the initial connections.
        for _ in range(self.config['connections_per_thread']):
            sock = self.create_connection()
            if sock:
                connections.append(sock)
            time.sleep(0.05)

        end_time = time.time() + self.config['duration']
        interval = self.config['request_interval']

        # Main loop. Iterate over a copy because the list is edited inside.
        while self.running and time.time() < end_time and connections:
            for sock in connections[:]:
                try:
                    sock.send(self.create_request())

                    with status_lock:
                        stats = attack_status['stats']
                        stats['total_requests'] += 1
                        stats['successful'] += 1
                        stats['last_update'] = time.time()

                    # Read (and discard) a response for a random subset of
                    # the requests.
                    if random.random() > 0.3:
                        sock.settimeout(1)
                        try:
                            sock.recv(1024)
                        except OSError:
                            pass

                    time.sleep(random.uniform(interval * 0.5, interval * 1.5))

                except OSError:
                    # The connection failed: drop it, close it, count it.
                    connections.remove(sock)
                    try:
                        sock.close()
                    except OSError:
                        pass
                    with status_lock:
                        attack_status['stats']['active_connections'] -= 1
                        attack_status['stats']['failed'] += 1

                    # Try to replace the lost connection.
                    new_sock = self.create_connection()
                    if new_sock:
                        connections.append(new_sock)

        # Cleanup.
        for sock in connections:
            try:
                sock.close()
            except OSError:
                continue
            with status_lock:
                attack_status['stats']['active_connections'] -= 1


# ====== Flask routes ======
@app.route('/')
def index():
    """Landing page: describe the API."""
    return jsonify({
        'service': '压力测试API',
        'version': '1.0',
        'status': 'running' if attack_status['running'] else 'stopped',
        'endpoints': {
            '/': '此帮助信息',
            '/status': '获取当前状态',
            '/start': '启动压力测试 (POST)',
            '/stop': '停止压力测试',
            '/config': '查看/更新配置 (GET/POST)'
        }
    })


@app.route('/status', methods=['GET'])
def get_status():
    """Return the current configuration, statistics and uptime."""
    now = time.time()
    with status_lock:
        stats = attack_status['stats']
        started = attack_status['start_time']
        last_update = stats['last_update']

        # Refresh the average RPS only while requests are still flowing
        # (an update within the last 5 seconds). ``started`` is None after
        # /stop, so it must be checked before subtracting.
        if last_update and started and now - last_update < 5:
            stats['current_rps'] = (
                stats['total_requests'] / max(now - started, 1)
            )

        # Copy the nested dict too, so serialisation below does not race
        # with worker threads updating the counters.
        status_copy = dict(attack_status)
        status_copy['stats'] = dict(stats)

    status_copy['uptime'] = now - started if started else 0
    return jsonify(status_copy)


@app.route('/start', methods=['POST'])
def start_attack():
    """Start a load-test run.

    The JSON body may carry ``target_url`` and any of the numeric settings;
    omitted values fall back to the stored configuration. Numeric settings
    are clamped to ``CONFIG_LIMITS``.

    NOTE(review): ``running`` is not cleared when the duration elapses on its
    own, so /stop must be called before another run can be started.
    """
    global attack_threads

    # Early answer for the common case; the authoritative check is repeated
    # under the lock below.
    if attack_status['running']:
        return jsonify({'error': '压力测试已在运行中'}), 400

    data = _json_body()

    # Target URL.
    target_url = data.get('target_url', attack_status['target_url'])
    if not target_url:
        return jsonify({'error': '必须提供target_url参数'}), 400

    # Validate the URL, including the port, which urlparse checks lazily.
    if not isinstance(target_url, str):
        return jsonify({'error': '无效的URL格式'}), 400
    try:
        parsed = urlparse(target_url)
        hostname = parsed.hostname
        parsed.port
    except ValueError:
        return jsonify({'error': '无法解析URL'}), 400
    if not hostname:
        return jsonify({'error': '无效的URL格式'}), 400

    try:
        overrides = _clamped_settings(data)
    except ValueError as exc:
        return jsonify({'error': str(exc)}), 400

    # Claim the "running" flag and update the configuration atomically.
    with status_lock:
        if attack_status['running']:
            return jsonify({'error': '压力测试已在运行中'}), 400
        attack_status['running'] = True
        attack_status['target_url'] = target_url
        attack_status.update(overrides)
        attack_status['start_time'] = time.time()
        attack_status['stats'] = _fresh_stats()
        run_config = _current_config()

    worker_config = {
        'connections_per_thread': run_config['connections_per_thread'],
        'duration': run_config['duration'],
        'request_interval': run_config['request_interval']
    }

    # Create and start the worker threads. The list is published first so a
    # concurrent /stop can see workers that have already been started.
    workers = []
    attack_threads = workers
    try:
        for i in range(run_config['threads']):
            worker = PressureWorker(
                worker_id=i,
                target_url=target_url,
                config=worker_config
            )
            thread = threading.Thread(target=worker.run, daemon=True)
            thread.start()
            workers.append((worker, thread))
    except Exception:
        # Roll back so the service is not left stuck in the running state.
        for worker, _ in workers:
            worker.running = False
        with status_lock:
            attack_status['running'] = False
            attack_status['start_time'] = None
        raise

    return jsonify({
        'message': '压力测试已启动',
        'config': run_config
    })


@app.route('/stop', methods=['POST'])
def stop_attack():
    """Stop the current run and return its final statistics."""
    with status_lock:
        if not attack_status['running']:
            return jsonify({'error': '没有正在运行的压力测试'}), 400
        attack_status['running'] = False

    # Ask every worker to stop.
    workers = list(attack_threads)
    for worker, _ in workers:
        worker.running = False

    # Wait for the workers, but never longer than 2 seconds in total: a
    # worker can be blocked in a sleep or a socket timeout.
    deadline = time.time() + 2
    for _, thread in workers:
        thread.join(timeout=max(0.0, deadline - time.time()))

    with status_lock:
        attack_status['start_time'] = None
        final_stats = dict(attack_status['stats'])

    return jsonify({
        'message': '压力测试已停止',
        'final_stats': final_stats
    })


@app.route('/config', methods=['GET', 'POST'])
def manage_config():
    """Return the configuration (GET) or update it while stopped (POST)."""
    if request.method == 'GET':
        with status_lock:
            return jsonify({
                'current_config': _current_config(),
                'running': attack_status['running']
            })

    # POST
    if attack_status['running']:
        return jsonify({'error': '运行中无法修改配置'}), 400

    data = _json_body()
    try:
        updates = _clamped_settings(data)
    except ValueError as exc:
        return jsonify({'error': str(exc)}), 400

    with status_lock:
        # Re-check under the lock in case a run started in the meantime.
        if attack_status['running']:
            return jsonify({'error': '运行中无法修改配置'}), 400
        if 'target_url' in data:
            attack_status['target_url'] = data['target_url']
        attack_status.update(updates)
        new_config = _current_config()

    return jsonify({
        'message': '配置已更新',
        'new_config': new_config
    })


# ====== Entry point ======
if __name__ == '__main__':
    # Startup banner.
    print("=" * 60)
    print("🚀 压力测试API服务器")
    print("=" * 60)
    print("API端点:")
    print(" GET / - 查看API信息")
    print(" GET /status - 获取当前状态")
    print(" POST /start - 启动压力测试")
    print(" POST /stop - 停止压力测试")
    print(" GET/POST /config - 查看/更新配置")
    print("=" * 60)
    print("⚠️ 警告: 仅用于合法的压力测试!")
    print("=" * 60)

    # Start the Flask server.
    app.run(
        host='0.0.0.0',
        port=7860,
        debug=False,
        threaded=True
    )