import gradio as gr
import socket
import requests
import urllib3
import re
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
import tempfile
from typing import Dict, List, Tuple, Optional
import warnings
from collections import defaultdict

# Suppress all warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings('ignore')

# GitHub repository details
GITHUB_REPO_OWNER = "Wuhpondiscord"
GITHUB_REPO_NAME = "ports"
# --- CRITICAL FIX: Empty string if files are in repo root ---
GITHUB_PORTS_DIR_PATH = ""  # The directory within the repo where port files are located. Use "" for root.

# Global variables to store parsed port definitions from all files
# Format: {filename: {port_num: description}}
ALL_GITHUB_PORT_DEFINITIONS: Dict[str, Dict[int, str]] = {}
AVAILABLE_PORT_FILES: List[str] = []  # List of filenames from GitHub


# --- GitHub File Listing and Caching ---

def get_github_directory_contents(owner: str, repo: str, path: str) -> List[Dict]:
    """
    Fetches the contents of a directory in a GitHub repository using the GitHub API.
    Returns a list of dictionaries, each representing a file or directory.
    If path is empty, it fetches the root contents.
    """
    # Construct API URL. No trailing slash if path is empty.
    if path:
        api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}"
    else:
        api_url = f"https://api.github.com/repos/{owner}/{repo}/contents"

    try:
        print(f"🌐 Fetching GitHub directory listing from: {api_url}")
        response = requests.get(api_url, timeout=10)
        response.raise_for_status()  # Raise an HTTPError for bad responses (4xx or 5xx)
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"⚠️ Error fetching GitHub directory listing for '{path or 'repo root'}': {e}")
        return []
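
# Illustrative shape of the GitHub contents API response consumed by
# get_github_directory_contents() (fields trimmed, file names and URLs made up):
#     [
#         {"name": "common-ports.txt", "type": "file",
#          "download_url": "https://raw.githubusercontent.com/.../common-ports.txt"},
#         {"name": "web-ports.txt", "type": "file", "download_url": "..."},
#     ]
# Only the "type", "name", and "download_url" fields are used by the caching logic below.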
""" single_file_port_map = {} service_ports = defaultdict(list) # For multi-port detection within this file's context # Strategy 1: Continuous format parser continuous_pattern = r'(\d+)\s*-\s*([A-Za-z][\w\s\-\.\(\)]*?)(?=\d+\s*-|$)' matches = re.findall(continuous_pattern, content, re.MULTILINE) for port_str, desc in matches: try: port_num = int(port_str) if 1 <= port_num <= 65535: desc = desc.strip().strip('-').strip() desc = re.sub(r'\s+', ' ', desc) if desc: base_service = desc.split('-')[0].strip() service_ports[base_service].append(port_num) single_file_port_map[port_num] = desc except ValueError: continue # Strategy 2: Line-by-line format for line in content.split('\n'): line = line.strip() if not line or line.startswith('#'): continue if re.match(r'^\d+\s*-\s*[A-Za-z]', line): # Skip if already matched by continuous pattern continue patterns = [ r'^(.+?)\s*[:=]\s*(\d+)', # e.g., MySQL: 3306 r'port\s*[:=]?\s*(\d+)\s+(.+)', # e.g., port 22 SSH r'(\d+)\s*/\s*\w+\s+(.+)', # e.g., 22/tcp SSH r'^(\d+)\s+(.+)$', # e.g., 22 SSH ] for pattern in patterns: match = re.match(pattern, line, re.IGNORECASE) if match: groups = match.groups() if groups[0].isdigit(): port_str, desc = groups[0], groups[1] else: desc, port_str = groups[0], groups[1] try: port_num = int(port_str) if 1 <= port_num <= 65535: desc = re.sub(r'[^\w\s\-\.]+', '', desc).strip() desc = re.sub(r'\s+', ' ', desc) if desc: base_service = desc.split('-')[0].strip() service_ports[base_service].append(port_num) single_file_port_map[port_num] = desc break except ValueError: continue # Add multi-port info to descriptions (within this single file's context) for service, ports in service_ports.items(): if len(ports) > 1: ports_sorted = sorted(ports) for port in ports_sorted: if port in single_file_port_map: current_desc = single_file_port_map[port] if "also on" not in current_desc.lower(): other_ports = [str(p) for p in ports_sorted if p != port] if other_ports: single_file_port_map[port] = f"{current_desc} (also on {','.join(other_ports[:3])}{'...' if len(other_ports) > 3 else ''})" return single_file_port_map def cache_all_github_port_files(): """ Fetches and parses all port definition files from GitHub, storing them in the global ALL_GITHUB_PORT_DEFINITIONS and AVAILABLE_PORT_FILES. 
""" global ALL_GITHUB_PORT_DEFINITIONS, AVAILABLE_PORT_FILES ALL_GITHUB_PORT_DEFINITIONS.clear() AVAILABLE_PORT_FILES.clear() # Pass GITHUB_PORTS_DIR_PATH (which is now empty for root) repo_contents = get_github_directory_contents(GITHUB_REPO_OWNER, GITHUB_REPO_NAME, GITHUB_PORTS_DIR_PATH) port_files_to_fetch = [] for item in repo_contents: if item.get("type") == "file" and item.get("name", "").endswith(('.txt', '.csv', '.conf', '.list')): port_files_to_fetch.append((item['name'], item['download_url'])) if not port_files_to_fetch: print(f"⚠️ No port definition files found in '{GITHUB_PORTS_DIR_PATH or 'repository root'}' on GitHub.") return print(f"✅ Found {len(port_files_to_fetch)} port definition files to cache.") for filename, file_url in port_files_to_fetch: try: print(f"🌐 Caching {filename} from {file_url}") response = requests.get(file_url, timeout=10) response.raise_for_status() content = response.text ALL_GITHUB_PORT_DEFINITIONS[filename] = parse_single_port_file_content(content) AVAILABLE_PORT_FILES.append(filename) print(f"✅ Cached {filename} with {len(ALL_GITHUB_PORT_DEFINITIONS[filename])} ports.") except requests.exceptions.RequestException as e: print(f"⚠️ Error fetching {filename} for caching: {e}") except Exception as e: print(f"⚠️ Error parsing {filename} for caching: {e}") AVAILABLE_PORT_FILES.sort() # Keep the list sorted for UI print(f"Finished caching. Total {len(AVAILABLE_PORT_FILES)} files available.") # --- Dynamic Port Map Construction --- def get_selected_port_map(selected_files: List[str]) -> Dict[int, str]: """ Combines port definitions from selected files into a single port_map. """ combined_port_map = {80: "HTTP", 443: "HTTPS"} # Always include defaults all_service_ports = defaultdict(list) # To track multi-port across all selected files if not selected_files: print("No port files selected. Using default ports (80, 443).") return combined_port_map for filename in selected_files: if filename in ALL_GITHUB_PORT_DEFINITIONS: file_ports = ALL_GITHUB_PORT_DEFINITIONS[filename] for port_num, desc in file_ports.items(): if port_num not in combined_port_map: combined_port_map[port_num] = desc else: # If port exists, append new description if different, or choose more verbose existing_desc = combined_port_map[port_num] if desc != existing_desc and len(desc) > len(existing_desc): combined_port_map[port_num] = desc # Update service_ports for multi-port detection across combined map base_service = desc.split('-')[0].strip() if port_num not in all_service_ports[base_service]: all_service_ports[base_service].append(port_num) # Re-apply multi-port info based on the combined map for service, ports in all_service_ports.items(): if len(ports) > 1: ports_sorted = sorted(ports) for port in ports_sorted: if port in combined_port_map: current_desc = combined_port_map[port] if "also on" not in current_desc.lower(): other_ports = [str(p) for p in ports_sorted if p != port] if other_ports: combined_port_map[port] = f"{current_desc} (also on {','.join(other_ports[:3])}{'...' 
if len(other_ports) > 3 else ''})" print(f"Generated port map from {len(selected_files)} selected files with {len(combined_port_map)} total port definitions.") return combined_port_map # --- IP Extraction --- def extract_ips(file_path: str) -> List[str]: """Enhanced IP extractor with validation.""" try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() found_ips = set(re.findall(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', content)) valid_ips = [] for ip in found_ips: octets = [int(x) for x in ip.split('.')] # Basic validation: octets must be 0-255 if any(o < 0 or o > 255 for o in octets): continue # Filter out private IPs, localhost, and broadcast addresses (IPv4) if (octets[0] == 0 or # 0.0.0.0/8 octets[0] == 10 or # 10.0.0.0/8 (octets[0] == 172 and 16 <= octets[1] <= 31) or # 172.16.0.0/12 (octets[0] == 192 and octets[1] == 168) or # 192.168.0.0/16 (octets[0] == 169 and octets[1] == 254) or # 169.254.0.0/16 (APIPA) octets[0] == 127 or # 127.0.0.0/8 (Loopback) all(o == 255 for o in octets)): # 255.255.255.255 (Broadcast) continue valid_ips.append(ip) return sorted(valid_ips) except Exception as e: print(f"Error extracting IPs: {e}") return [] # --- C2 Detection --- C2_SIGNATURES = { "Cobalt Strike": ["404 Not Found", "application/ocsp-response", "BeEF", "cobaltstrike"], "Metasploit": ["Metasploit", "Mettle", "meterpreter"], "Covenant": ["covenant", "GruntHTTP", "Auth/Login"], "Empire": ["Empire", "session_id", "admin/login.php"], "Sliver": ["sliver", "implant"], "Mythic": ["mythic", "agent_message"] } def probe_target(ip: str, port: int, port_desc: str) -> Optional[Dict]: """Enhanced probe with better banner detection and multiple protocol attempts.""" sock = None try: # Port connectivity check sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(1.2) result = sock.connect_ex((ip, port)) if result != 0: return None # Try to get raw banner first (works for many services) banner = "N/A" try: sock.send(b'\r\n') raw_banner = sock.recv(1024).decode('utf-8', errors='ignore').strip() if raw_banner and len(raw_banner) > 3: banner = raw_banner[:200] # First 200 chars except: pass finally: if sock: # Ensure socket is closed after raw banner attempt sock.close() sock = None # Try HTTP/HTTPS for protocol in ["https", "http"]: # Smart protocol selection: don't aggressively try HTTPS on non-standard ports if protocol == "https" and port not in [443, 8443, 2083, 2087, 9443, 8000, 4443]: # Skip HTTPS for low ports unless they are common HTTPS ports if port < 8000 and port != 443: continue try: resp = requests.get( f"{protocol}://{ip}:{port}", timeout=1.8, verify=False, # Disable SSL verification for self-signed certs allow_redirects=False, headers={'User-Agent': 'Mozilla/5.0'} ) # Get server banner server = resp.headers.get('Server', '') powered_by = resp.headers.get('X-Powered-By', '') if server or powered_by: banner = f"{server} {powered_by}".strip() elif banner == "N/A": banner = f"HTTP {resp.status_code}" content = resp.text[:5000] # Limit content to avoid memory issues category = f"Web ({port_desc})" # C2 Detection for name, sigs in C2_SIGNATURES.items(): if any(sig.lower() in content.lower() or sig.lower() in str(resp.headers).lower() for sig in sigs): category = f"🚨 POTENTIAL {name}" break return { "IP": ip, "Port": port, "Service": port_desc, "Type": category, "Banner": banner } except requests.exceptions.SSLError: # SSL error on HTTP, try HTTPS if protocol == "http": continue else: # SSL error on HTTPS, break and mark as open break except 
requests.exceptions.RequestException: # Catch all request errors continue # Port is open but not HTTP or encountered issues. Return with raw banner if available. return { "IP": ip, "Port": port, "Service": port_desc, "Type": "Open (Non-HTTP)", "Banner": banner if banner != "N/A" else "Unknown" } except Exception as e: # print(f"Error probing {ip}:{port}: {e}") # Uncomment for debugging return None finally: if sock: try: sock.close() except: pass def start_analysis(file_obj, max_threads: int, selected_port_files: List[str], progress=gr.Progress()): """Main analysis with pure threading.""" if file_obj is None: return pd.DataFrame([{"Error": "No file uploaded"}]), None if not selected_port_files: return pd.DataFrame([{"Error": "Please select at least one port list to scan."}]), None try: progress(0, desc="📋 Generating port map from selected lists...") port_map = get_selected_port_map(selected_port_files) progress(0.1, desc="🔍 Extracting IP addresses...") ips = extract_ips(file_obj.name) if not ips: return pd.DataFrame([{"Error": "No public IPs found in file."}]), None scan_list = [] for ip in ips: for port, desc in port_map.items(): scan_list.append((ip, port, desc)) total = len(scan_list) progress(0.2, desc=f"🎯 Scanning {len(ips)} IPs × {len(port_map)} ports = {total} probes...") results = [] with ThreadPoolExecutor(max_workers=max_threads) as executor: futures = { executor.submit(probe_target, ip, port, desc): (ip, port) for ip, port, desc in scan_list } completed = 0 for future in as_completed(futures): result = future.result() if result: results.append(result) completed += 1 if completed % 50 == 0 or completed == total: progress(0.2 + (0.7 * completed / total), desc=f"⚡ {completed}/{total} • Found {len(results)} active") progress(0.9, desc="📊 Generating report...") if not results: return pd.DataFrame([{"Result": "No active services found."}]), None df = pd.DataFrame(results) df['_threat'] = df['Type'].str.contains('POTENTIAL', case=False, na=False) df = df.sort_values(['_threat', 'IP', 'Port'], ascending=[False, True, True]) df = df.drop('_threat', axis=1) with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.csv') as tmp: df.to_csv(tmp.name, index=False) csv_path = tmp.name progress(1.0, desc="✅ Complete!") return df, csv_path except Exception as e: return pd.DataFrame([{"Error": f"Analysis failed: {str(e)}"}]), None # --- Gradio UI --- custom_css = """ .gradio-container { font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif; } .main-header { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 2rem; border-radius: 12px; color: white; margin-bottom: 2rem; box-shadow: 0 8px 16px rgba(0,0,0,0.1); } .main-header h1 { margin: 0; font-size: 2.2rem; font-weight: 700; } .main-header p { margin: 0.5rem 0 0 0; opacity: 0.95; } """ # --- CRITICAL CHANGE: Cache port files BEFORE Gradio UI is defined --- # This ensures AVAILABLE_PORT_FILES is populated when gr.CheckboxGroup is instantiated cache_all_github_port_files() demo = gr.Blocks(title="C2 Deep Scanner Pro", css=custom_css) with demo: gr.HTML("""

        <div class="main-header">
            <h1>🕵️‍♂️ C2 Infrastructure Deep Scanner Pro</h1>

            <p>Multi-threaded network reconnaissance with enhanced banner detection</p>
        </div>

""") # Removed the large feature/input files markdown sections gr.Markdown("---") with gr.Row(): with gr.Column(scale=2): file_input = gr.File( label="📤 Upload Target File", file_types=['.txt', '.json', '.log', '.csv'] ) max_threads = gr.Slider( minimum=30, maximum=100, value=60, step=10, label="⚙️ Concurrent Threads", info="60-80 recommended for best speed" ) run_btn = gr.Button( "🚀 Start Deep Analysis", variant="primary", size="lg" ) with gr.Column(scale=1): # Port List Selector # Update label to reflect root directory port_file_selector = gr.CheckboxGroup( choices=AVAILABLE_PORT_FILES, # Now populated value=AVAILABLE_PORT_FILES, # Select all by default, now populated label="🔧 Select Port Lists (from repository root on GitHub)", info="Choose which port definition files to use for scanning." ) gr.Markdown(""" **Port Format Support:** - **Continuous** (e.g., `135 - MS SQL1433 - MSSQL`) - **Standard formats** (e.g., `80 - HTTP`, `MySQL: 3306`, `22/tcp SSH`) Multi-port services shown as: `MySQL (also on 3307,3308)` """) gr.Markdown("---") gr.Markdown("### 📊 Scan Results") output_table = gr.Dataframe( label="Detected Infrastructure", wrap=True ) download_btn = gr.File(label="💾 Download CSV Report") run_btn.click( fn=start_analysis, inputs=[file_input, max_threads, port_file_selector], # Added port_file_selector outputs=[output_table, download_btn] ) gr.Markdown(""" --- 💡 **Banner Info:** Shows Server headers, X-Powered-By, or raw socket responses 🚨 **Threats:** Results marked with 🚨 indicate potential C2 infrastructure 📌 **Multi-Port:** Services on multiple ports show related ports in parentheses """) if __name__ == "__main__": demo.queue(max_size=10) demo.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True, theme=gr.themes.Soft(), ssr_mode=False )