| |
| """ |
| PyFlood Suite v1.2.5 |
| Educational Network Testing Tool |
| Command Line Interface Version |
| """ |
|
|
| import threading |
| import time |
| import json |
| import random |
| import os |
| from datetime import datetime |
| from typing import List, Dict, Optional |
| from dataclasses import dataclass, asdict |
| from enum import Enum |
| import argparse |
|
|
| |
class Colors:
    """ANSI escape sequences used to style terminal output.

    Every attribute is a raw escape string meant to be concatenated into
    printed text; ``END`` resets the styling again.
    """

    HEADER = '\033[95m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    MAGENTA = '\033[35m'
    # Same escape code under a second name, so that lookups of either
    # ``MAGENTA`` or ``PURPLE`` resolve instead of raising AttributeError.
    PURPLE = MAGENTA
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    END = '\033[0m'
|
|
class AttackType(Enum):
    """Symbolic names for the scenario kinds, each mapped to a string value.

    Member order is the iteration order of the enum.
    """

    # Steam scenarios
    STEAM_FRIEND = "steam_friend"
    STEAM_MESSAGE = "steam_message"
    STEAM_COMMENT = "steam_comment"

    # IP scenarios
    IP_UDP = "ip_udp"
    IP_SYN = "ip_syn"
    IP_HTTP = "ip_http"
    IP_ICMP = "ip_icmp"

    # Email scenario
    EMAIL_FLOOD = "email_flood"
|
|
@dataclass
class SteamConfig:
    """Settings for one Steam scenario.

    The field order defines the positional constructor signature.
    """

    # Steam IDs the scenario picks from.
    targets: List[str]
    # Variant label, e.g. "Friend Requests" or "Message Flood".
    attack_type: str
    # Number of requests to play back.
    count: int
    # Text quoted in the log lines of the "Message Flood" variant.
    message: str
|
|
@dataclass
class IPConfig:
    """Settings for one IP scenario.

    The field order defines the positional constructor signature.
    """

    # Addresses the scenario picks from.
    targets: List[str]
    # Variant label, e.g. "UDP Flood".
    attack_type: str
    # How long the scenario runs, in seconds.
    duration: int
    # Port number shown in the log lines.
    port: int = 80
|
|
@dataclass
class EmailConfig:
    """Settings for one email scenario.

    The field order defines the positional constructor signature.
    """

    # Addresses to iterate over.
    targets: List[str]
    # Number of messages played back for every address.
    emails_per_target: int
    # Pause between two messages, in milliseconds.
    delay: int
    # Subject line shown in the log.
    subject: str
    # Message body kept with the configuration.
    body: str
|
|
class ConsoleLogger:
    """Console writer that timestamps, colorizes and records each message.

    Every call prints one line and appends a matching record to
    ``log_history``.
    """

    def __init__(self):
        # One dict per emitted message, oldest first.
        self.log_history = []

    def log(self, message: str, color: str = Colors.END, prefix: str = ">>"):
        """Print ``message`` with a timestamp and remember it.

        Args:
            message: Text to display.
            color: ANSI escape sequence applied to the prefix and message.
            prefix: Marker printed in front of the message.
        """
        now = datetime.now().strftime("%H:%M:%S")
        stamp = f"{Colors.CYAN}[{now}]{Colors.END}"
        text = f"{color}{prefix} {message}{Colors.END}"
        print(f"{stamp} {text}")
        # The color code doubles as the record's "type" field.
        record = {"time": now, "message": message, "type": color}
        self.log_history.append(record)

    def success(self, message: str):
        """Log ``message`` in green."""
        self.log(message, color=Colors.GREEN)

    def error(self, message: str):
        """Log ``message`` in red."""
        self.log(message, color=Colors.RED)

    def warning(self, message: str):
        """Log ``message`` in yellow."""
        self.log(message, color=Colors.YELLOW)

    def info(self, message: str):
        """Log ``message`` in blue."""
        self.log(message, color=Colors.BLUE)

    def debug(self, message: str):
        """Log ``message`` in magenta."""
        self.log(message, color=Colors.MAGENTA)
|
|
class AttackSimulator:
    """Plays back the scripted Steam, IP and email scenarios.

    The methods of this class never open a network connection: they only
    write log lines through ``logger`` and pause between them.

    ``active_attacks`` maps an attack id to ``True`` while that scenario
    should keep running. Callers register the id before starting a scenario
    and remove it to request a stop. Each scenario also removes its own id
    when it ends, so finished runs no longer count as active.

    ``attack_counter`` is the total number of simulated requests, packets
    and emails logged by all scenarios; it is updated under ``lock``.
    """

    # Preparation line logged for each known IP scenario type.
    _IP_PREP_MESSAGES = {
        "UDP Flood": "Preparing UDP payload...",
        "SYN Flood": "Initializing SYN packet generator...",
        "HTTP Flood": "Preparing HTTP request headers...",
        "ICMP Flood": "Loading ICMP packet structure...",
    }

    def __init__(self, logger: "ConsoleLogger", sleep=time.sleep):
        """Create a simulator.

        Args:
            logger: Object providing ``info``, ``success``, ``warning`` and
                ``error`` methods that each take one message string.
            sleep: Callable taking a number of seconds, used for every
                pacing pause. Defaults to ``time.sleep``; pass a no-op to
                run without delays. ``ip_flood`` is bounded by the clock,
                so with a no-op it loops without pause until its duration
                has elapsed.
        """
        self.logger = logger
        self.active_attacks = {}
        self.attack_counter = 0
        self.lock = threading.Lock()
        self._sleep = sleep

    def _is_active(self, attack_id: str) -> bool:
        """Return True while ``attack_id`` is still registered."""
        return attack_id in self.active_attacks

    def _record_request(self):
        """Add one to the shared request counter."""
        with self.lock:
            self.attack_counter += 1

    def _release(self, attack_id: str):
        """Unregister ``attack_id``; a no-op when it is already gone."""
        self.active_attacks.pop(attack_id, None)

    @staticmethod
    def _steam_actions(config: "SteamConfig", target: str) -> List[str]:
        """Return the candidate log lines for one Steam request."""
        if config.attack_type == "Friend Requests":
            return [
                f"Sent friend request to {target}",
                f"Sending add friend packet to Steam ID {target}",
                f"Friend request queued for {target}",
            ]
        if config.attack_type == "Message Flood":
            return [
                f"Sent message to {target}: '{config.message}'",
                f"Message delivered to {target}",
                f"Chat packet sent to {target}",
            ]
        # Any other label is treated as the profile-comment variant.
        return [
            f"Posted comment on {target}'s profile",
            f"Profile comment submitted to {target}",
            f"Comment flood sent to {target}",
        ]

    @staticmethod
    def _ip_packet_message(config: "IPConfig", target: str) -> str:
        """Return the log line for one simulated packet."""
        if config.attack_type == "UDP Flood":
            size = random.randint(64, 1024)
            return f"UDP packet sent to {target}:{config.port} ({size} bytes)"
        if config.attack_type == "SYN Flood":
            return f"SYN request sent to {target}:{config.port} [SYN_SENT]"
        if config.attack_type == "HTTP Flood":
            path = random.choice(["/", "/index.html", "/api", "/login", "/admin"])
            return f"HTTP GET {path} sent to {target}:{config.port} [200 OK]"
        # Any other label is treated as the ICMP variant.
        return f"ICMP echo request to {target} [ttl=64]"

    def steam_flood(self, config: "SteamConfig", attack_id: str):
        """Simulate Steam ID flooding.

        Logs up to ``config.count`` lines, each for a randomly chosen
        target, and stops early once ``attack_id`` is no longer registered.
        The closing summary reports the number of lines actually logged.
        """
        log = self.logger
        log.info(f"Initializing Steam ID flood attack #{attack_id}...")
        log.info(f"Targets: {len(config.targets)} Steam IDs")
        log.info(f"Attack type: {config.attack_type}")

        sent = 0
        try:
            if not config.targets:
                # random.choice() raises IndexError on an empty sequence.
                log.error("No targets specified")
                return

            self._sleep(1)

            if config.attack_type == "Friend Requests":
                log.info("Connecting to Steam API proxy...")
                self._sleep(0.8)
                log.success("Connected to Steam relay servers")

            for _ in range(config.count):
                if not self._is_active(attack_id):
                    log.warning("Attack stopped by user")
                    break

                target = random.choice(config.targets)
                self._record_request()
                sent += 1
                log.success(random.choice(self._steam_actions(config, target)))
                self._sleep(random.uniform(0.5, 1.5))

            log.info(f"Steam attack #{attack_id} completed. Sent {sent} requests.")
        finally:
            # Drop the id on every exit path so the run stops counting as active.
            self._release(attack_id)

    def ip_flood(self, config: "IPConfig", attack_id: str):
        """Simulate IP flooding.

        Logs one line per simulated packet for ``config.duration`` seconds,
        or until ``attack_id`` is no longer registered.
        """
        log = self.logger
        log.info(f"Initializing IP flood attack #{attack_id}...")
        log.warning(f"Target IPs: {', '.join(config.targets)}")
        log.info(f"Attack vector: {config.attack_type}")

        try:
            if not config.targets:
                # random.choice() raises IndexError on an empty sequence.
                log.error("No targets specified")
                return

            self._sleep(1)

            prep = self._IP_PREP_MESSAGES.get(config.attack_type)
            if prep is not None:
                log.info(prep)

            log.success("Attack modules loaded successfully")
            self._sleep(0.5)

            # A monotonic clock keeps the deadline immune to wall-clock jumps.
            deadline = time.monotonic() + config.duration
            packet_count = 0
            interrupted = False

            while time.monotonic() < deadline:
                if not self._is_active(attack_id):
                    log.warning("Attack interrupted by user")
                    interrupted = True
                    break

                target = random.choice(config.targets)
                log.success(self._ip_packet_message(config, target))
                packet_count += 1
                self._record_request()
                self._sleep(random.uniform(0.01, 0.1))

            if interrupted:
                # The configured duration was not reached, so do not claim it.
                log.info(f"IP flood stopped early. Sent {packet_count} packets.")
            else:
                log.info(f"IP flood completed. Sent {packet_count} packets over {config.duration} seconds.")
        finally:
            self._release(attack_id)

    def email_flood(self, config: "EmailConfig", attack_id: str):
        """Simulate Email flooding.

        Logs ``config.emails_per_target`` lines for every target, pausing
        ``config.delay`` milliseconds between them, and returns early
        (without a summary) once ``attack_id`` is no longer registered.
        """
        log = self.logger
        log.info(f"Initializing Email flood attack #{attack_id}...")
        log.info(f"Target addresses: {len(config.targets)}")
        log.info(f"Emails per target: {config.emails_per_target}")

        try:
            self._sleep(1)

            log.info("Connecting to SMTP relay...")
            self._sleep(0.8)
            log.success("SMTP connection established")
            log.info(f"Subject: {config.subject}")

            sent_count = 0

            for target in config.targets:
                for i in range(config.emails_per_target):
                    if not self._is_active(attack_id):
                        log.warning("Attack stopped by user")
                        return

                    # sent_count is local to this call; only the shared
                    # counter needs the lock.
                    self._record_request()
                    sent_count += 1

                    log.success(f"Email #{i+1} sent to {target}")

                    if config.delay > 0:
                        self._sleep(config.delay / 1000)

            log.info(f"Email flood completed. Sent {sent_count} messages.")
        finally:
            self._release(attack_id)
|
|
class PyFloodSuite:
    """Interactive console front end for the simulator.

    Collects scenario settings from the user, starts each scenario on a
    daemon thread through ``AttackSimulator`` and can save or reload the
    settings as JSON in ``config_file``.
    """

    # Number of characters between the left and right banner borders.
    _BOX_WIDTH = 63

    # Banner art rows; padded to a common width and centered when printed
    # so that the box borders always line up.
    _BANNER_ART = (
        "██████╗ ██╗   ██╗███████╗██╗      ██████╗ ██████╗ ██████╗",
        "██╔══██╗╚██╗ ██╔╝██╔════╝██║     ██╔═══██╗██╔══██╗██╔══██╗",
        "██████╔╝ ╚████╔╝ █████╗  ██║     ██║   ██║██║  ██║██║  ██║",
        "██╔═══╝   ╚██╔╝  ██╔══╝  ██║     ██║   ██║██║  ██║██║  ██║",
        "██║        ██║   ██║     ███████╗╚██████╔╝██████╔╝██████╔╝",
        "╚═╝        ╚═╝   ╚═╝     ╚══════╝ ╚═════╝ ╚═════╝ ╚═════╝",
    )
    _BANNER_CAPTIONS = (
        "Steam ID/IP/Email Testing Suite v1.2.5",
        "[Educational Use Only]",
    )

    # (key, command name, description) rows of the main menu.
    _MENU_ENTRIES = (
        ("1", "Steam Tools", "Configure Steam ID flooding"),
        ("2", "IP Tools", "Configure IP address flooding"),
        ("3", "Email Tools", "Configure Email flooding"),
        ("4", "Start Attack", "Begin configured attack"),
        ("5", "Stop Attack", "Halt current operations"),
        ("6", "Status", "View attack statistics"),
        ("7", "Save Config", "Export current configuration"),
        ("8", "Load Config", "Import saved configuration"),
        ("9", "Clear Console", "Clear terminal output"),
        ("0", "Exit", "Close application"),
    )

    def __init__(self):
        self.logger = ConsoleLogger()
        self.simulator = AttackSimulator(self.logger)
        self.running = True
        self.config_file = "pyflood_config.json"
        # Initialised here (and reset by run()) so show_status() also works
        # when it is called before run().
        self.start_time = time.time()

    # ------------------------------------------------------------------
    # Screen output
    # ------------------------------------------------------------------

    def clear_screen(self):
        """Clear the terminal using the platform's clear command."""
        os.system('cls' if os.name == 'nt' else 'clear')

    def print_banner(self):
        """Print the boxed title banner."""
        inner = self._BOX_WIDTH
        art_width = max(len(row) for row in self._BANNER_ART)
        blank = "║" + " " * inner + "║"

        lines = ["╔" + "═" * inner + "╗", blank]
        lines.extend(
            "║" + row.ljust(art_width).center(inner) + "║"
            for row in self._BANNER_ART
        )
        lines.append(blank)
        lines.extend(
            "║" + caption.center(inner) + "║"
            for caption in self._BANNER_CAPTIONS
        )
        lines.append("╚" + "═" * inner + "╝")

        print(f"\n{Colors.BLUE}{Colors.BOLD}\n" + "\n".join(lines) + f"\n{Colors.END}")

    def print_menu(self):
        """Print the command list followed by the current status line."""
        rows = "\n".join(
            f"  {Colors.CYAN}{key}.{Colors.END} {name:<13} - {description}"
            for key, name, description in self._MENU_ENTRIES
        )
        print(
            f"\n{Colors.YELLOW}{Colors.BOLD}Available Commands:{Colors.END}\n"
            f"{rows}\n\n"
            f"{Colors.GREEN}Current Status:{Colors.END} {self.get_status()}\n"
        )

    def get_status(self):
        """Return a colored ATTACKING/IDLE label for the menu."""
        active = len(self.simulator.active_attacks)
        if active:
            return f"{Colors.GREEN}{Colors.BOLD}ATTACKING{Colors.END} ({active} active)"
        return f"{Colors.RED}IDLE{Colors.END}"

    # ------------------------------------------------------------------
    # Input helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _coerce_int(raw: str, default: int, minimum: int = 0,
                    maximum: Optional[int] = None) -> Optional[int]:
        """Convert typed text to an int within ``[minimum, maximum]``.

        Returns ``default`` for blank input and ``None`` when the text is
        not an integer or falls outside the allowed range.
        """
        text = raw.strip()
        if not text:
            return default
        try:
            value = int(text)
        except ValueError:
            return None
        if value < minimum or (maximum is not None and value > maximum):
            return None
        return value

    @staticmethod
    def _pick_option(choice: str, options: List[str]) -> str:
        """Map a 1-based menu answer to an option; fall back to the first."""
        by_number = {str(i): option for i, option in enumerate(options, 1)}
        return by_number.get(choice.strip(), options[0])

    def _prompt_int(self, prompt: str, default: int, minimum: int = 0,
                    maximum: Optional[int] = None) -> int:
        """Ask for a number; warn and use ``default`` when it is invalid."""
        value = self._coerce_int(input(prompt), default, minimum, maximum)
        if value is None:
            self.logger.warning(f"Invalid number entered; using default {default}")
            return default
        return value

    def _read_targets(self, prompt: str) -> List[str]:
        """Read stripped, non-empty lines until the first blank line."""
        targets = []
        line = input(f"{Colors.CYAN}{prompt}{Colors.END}\n")
        while line.strip():
            targets.append(line.strip())
            line = input()
        return targets

    def _choose(self, heading: str, options: List[str]) -> str:
        """Print a numbered list and return the option the user selects."""
        print(f"\n{Colors.YELLOW}{heading}{Colors.END}")
        for number, option in enumerate(options, 1):
            print(f"  {number}. {option}")
        return self._pick_option(input(f"Select (1-{len(options)}): "), options)

    # ------------------------------------------------------------------
    # Configuration menus
    # ------------------------------------------------------------------

    def steam_tools_menu(self):
        """Prompt for Steam settings; return a SteamConfig or None."""
        print(f"\n{Colors.BLUE}{Colors.BOLD}=== Steam ID Flooder Configuration ==={Colors.END}\n")

        targets = self._read_targets("Enter Steam IDs (one per line, empty line to finish):")
        if not targets:
            self.logger.error("No targets specified")
            return None

        attack_type = self._choose(
            "Request Type:",
            ["Friend Requests", "Message Flood", "Profile Comments"],
        )
        count = self._prompt_int("\nNumber of attacks per target [50]: ", 50, minimum=1)
        message = ""
        if attack_type == "Message Flood":
            message = input("Custom message [Hello from PyFlood]: ") or "Hello from PyFlood"

        config = SteamConfig(
            targets=targets,
            attack_type=attack_type,
            count=count,
            message=message,
        )

        self.logger.success(f"Steam configuration saved: {len(targets)} targets, {count} requests each")
        return config

    def ip_tools_menu(self):
        """Prompt for IP settings; return an IPConfig or None."""
        print(f"\n{Colors.MAGENTA}{Colors.BOLD}=== IP Flooder Configuration ==={Colors.END}\n")

        targets = self._read_targets("Enter IP addresses (one per line, empty line to finish):")
        if not targets:
            self.logger.error("No targets specified")
            return None

        attack_type = self._choose(
            "Attack Type:",
            ["UDP Flood", "SYN Flood", "HTTP Flood", "ICMP Flood"],
        )
        duration = self._prompt_int("\nDuration in seconds [60]: ", 60, minimum=1)
        port = 80
        # The ICMP variant has no port, so the question is skipped for it.
        if attack_type in ("UDP Flood", "SYN Flood", "HTTP Flood"):
            port = self._prompt_int("Target port [80]: ", 80, minimum=1, maximum=65535)

        config = IPConfig(
            targets=targets,
            attack_type=attack_type,
            duration=duration,
            port=port,
        )

        self.logger.success(f"IP configuration saved: {len(targets)} targets, {duration}s duration")
        return config

    def email_tools_menu(self):
        """Prompt for email settings; return an EmailConfig or None."""
        print(f"\n{Colors.GREEN}{Colors.BOLD}=== Email Flooder Configuration ==={Colors.END}\n")

        targets = self._read_targets("Enter email addresses (one per line, empty line to finish):")
        if not targets:
            self.logger.error("No targets specified")
            return None

        emails_per = self._prompt_int("\nEmails per target [20]: ", 20, minimum=1)
        delay = self._prompt_int("Delay between emails in ms [250]: ", 250, minimum=0)
        subject = input("Subject line [Important notification]: ") or "Important notification"
        body = input("Email body [Default message]: ") or "This is a test message from PyFlood Suite"

        config = EmailConfig(
            targets=targets,
            emails_per_target=emails_per,
            delay=delay,
            subject=subject,
            body=body,
        )

        self.logger.success(f"Email configuration saved: {len(targets)} targets, {emails_per} emails each")
        return config

    # ------------------------------------------------------------------
    # Starting and stopping
    # ------------------------------------------------------------------

    def _new_attack_id(self, prefix: str) -> str:
        """Return ``<prefix>_<unix seconds>``, suffixed if already in use."""
        base = f"{prefix}_{int(time.time())}"
        attack_id = base
        serial = 1
        # Two starts within the same second would otherwise share one id.
        while attack_id in self.simulator.active_attacks:
            serial += 1
            attack_id = f"{base}_{serial}"
        return attack_id

    def start_attack(self, config):
        """Run the scenario matching ``config`` on a daemon thread."""
        if isinstance(config, SteamConfig):
            label, prefix, worker = "Steam", "steam", self.simulator.steam_flood
        elif isinstance(config, IPConfig):
            label, prefix, worker = "IP", "ip", self.simulator.ip_flood
        elif isinstance(config, EmailConfig):
            label, prefix, worker = "Email", "email", self.simulator.email_flood
        else:
            self.logger.error(f"Unsupported configuration type: {type(config).__name__}")
            return

        attack_id = self._new_attack_id(prefix)
        # Register before starting: the worker stops once its id is missing.
        self.simulator.active_attacks[attack_id] = True
        thread = threading.Thread(target=worker, args=(config, attack_id), daemon=True)
        thread.start()
        self.logger.info(f"{label} attack thread started: {attack_id}")

    def stop_all_attacks(self):
        """Unregister every active id so the running scenarios stop."""
        count = len(self.simulator.active_attacks)
        self.simulator.active_attacks.clear()
        self.logger.warning(f"Stopped {count} active attack(s)")

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def save_configuration(self, configs: Dict):
        """Write the given configurations to ``config_file`` as JSON."""
        try:
            data = {key: asdict(config) for key, config in configs.items() if config}
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
        except (OSError, TypeError, ValueError) as e:
            self.logger.error(f"Failed to save config: {e}")
        else:
            self.logger.success(f"Configuration saved to {self.config_file}")

    def load_configuration(self):
        """Read ``config_file``; return a dict of configs, or {} on failure."""
        loaders = {'steam': SteamConfig, 'ip': IPConfig, 'email': EmailConfig}
        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # TypeError covers unexpected keys and non-mapping sections.
            configs = {key: cls(**data[key]) for key, cls in loaders.items() if key in data}
        except FileNotFoundError:
            self.logger.error("No configuration file found")
            return {}
        except (OSError, ValueError, TypeError) as e:
            self.logger.error(f"Failed to load config: {e}")
            return {}

        self.logger.success("Configuration loaded successfully")
        return configs

    def show_status(self):
        """Print the active count, the request total and the uptime."""
        uptime = time.strftime('%H:%M:%S', time.gmtime(time.time() - self.start_time))
        print(f"\n{Colors.BOLD}=== System Status ==={Colors.END}")
        print(f"Active Attacks: {len(self.simulator.active_attacks)}")
        print(f"Total Requests Sent: {self.simulator.attack_counter}")
        print(f"Uptime: {uptime}")
        print("Security: AES-256 Encrypted Connection")
        print("")

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------

    def _start_menu(self, current_config: Dict):
        """Let the user start one stored configuration, or all of them."""
        if not current_config:
            self.logger.error("No configuration loaded. Configure tools first.")
            return

        keys = list(current_config)
        print(f"\n{Colors.YELLOW}Available configurations:{Colors.END}")
        for number, key in enumerate(keys, 1):
            print(f"  {number}. {key.upper()}")
        print("  0. ALL")

        pick = input("Select attack to start: ").strip()
        if pick == '0':
            for config in current_config.values():
                self.start_attack(config)
            return

        # Look the answer up instead of calling int(), so no input can raise.
        selected = {str(i): key for i, key in enumerate(keys, 1)}.get(pick)
        if selected is None:
            self.logger.error(f"Invalid selection: {pick}")
        else:
            self.start_attack(current_config[selected])

    def _handle_command(self, choice: str, current_config: Dict):
        """Execute one main-menu command; ``current_config`` is updated in place."""
        if choice in ('1', 'steam', 'steam tools'):
            config = self.steam_tools_menu()
            if config:
                current_config['steam'] = config

        elif choice in ('2', 'ip', 'ip tools'):
            config = self.ip_tools_menu()
            if config:
                current_config['ip'] = config

        elif choice in ('3', 'email', 'email tools'):
            config = self.email_tools_menu()
            if config:
                current_config['email'] = config

        elif choice in ('4', 'start', 'start attack'):
            self._start_menu(current_config)

        elif choice in ('5', 'stop', 'stop attack'):
            self.stop_all_attacks()

        elif choice in ('6', 'status'):
            self.show_status()

        elif choice in ('7', 'save', 'save config'):
            self.save_configuration(current_config)

        elif choice in ('8', 'load', 'load config'):
            current_config.update(self.load_configuration())

        elif choice in ('9', 'clear', 'clear console'):
            self.clear_screen()
            self.print_banner()

        elif choice in ('0', 'exit', 'quit'):
            self.running = False
            self.stop_all_attacks()
            self.logger.info("Shutting down PyFlood Suite...")
            time.sleep(1)

        elif choice == 'help':
            print(f"\n{Colors.CYAN}Detailed Help:{Colors.END}")
            print("  This tool simulates network testing scenarios.")
            print("  Configure targets, select attack vectors, and monitor output.")
            print("  All traffic is simulated for educational demonstration.")
            print("")

        else:
            self.logger.error(f"Unknown command: {choice}")

    def run(self):
        """Show the banner, then process menu commands until exit."""
        self.start_time = time.time()
        current_config = {}

        self.clear_screen()
        self.print_banner()

        self.logger.info("PyFlood Suite v1.2.5 initialized")
        self.logger.info("Security protocol: AES-256 encrypted")
        self.logger.info("Ready to initiate flood attacks")
        self.logger.warning("Use responsibly - Educational purposes only")

        while self.running:
            self.print_menu()
            try:
                choice = input(f"{Colors.BOLD}pyflood>{Colors.END} ").strip().lower()
                self._handle_command(choice, current_config)
            except EOFError:
                # stdin was closed (e.g. piped input ran out): exit cleanly
                # instead of ending with a traceback.
                print()
                self._handle_command('exit', current_config)
|
|
def main():
    """Parse the command line and run the interactive console.

    Ctrl-C during the session stops all running scenarios before exiting.
    """
    parser = argparse.ArgumentParser(description='PyFlood Suite - Network Testing Tool')
    parser.add_argument(
        '--config', '-c',
        help='Configuration file used by the Save Config / Load Config commands',
    )
    # NOTE: the three mode flags are accepted but nothing reads them yet.
    parser.add_argument('--steam', '-s', action='store_true', help='Steam mode')
    parser.add_argument('--ip', '-i', action='store_true', help='IP mode')
    parser.add_argument('--email', '-e', action='store_true', help='Email mode')
    args = parser.parse_args()

    suite = PyFloodSuite()

    if args.config:
        # Point the suite at the requested file. Loading it here would be
        # pointless: run() starts from an empty configuration and clears
        # the screen, so the file is read via the "Load Config" command.
        suite.config_file = args.config

    try:
        suite.run()
    except KeyboardInterrupt:
        print(f"\n{Colors.YELLOW}Interrupted by user{Colors.END}")
        suite.stop_all_attacks()
        print(f"{Colors.GREEN}Cleanup complete. Exiting.{Colors.END}")


if __name__ == "__main__":
    main()