| import gradio as gr |
| import socket |
| import requests |
| import urllib3 |
| import re |
| import pandas as pd |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| import os |
| import tempfile |
| from typing import Dict, List, Tuple, Optional |
| import warnings |
| from collections import defaultdict |
|
|
| |
# Probes deliberately hit unknown hosts with verify=False, so silence
# urllib3's InsecureRequestWarning (and all other warnings) up front.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings('ignore')


# GitHub repository that hosts the port-definition list files.
GITHUB_REPO_OWNER = "Wuhpondiscord"
GITHUB_REPO_NAME = "ports"
# Directory inside the repo to scan for lists; "" means the repository root.
GITHUB_PORTS_DIR_PATH = ""


# Module-level caches filled once by cache_all_github_port_files():
# filename -> {port: description} for every fetched definition file.
ALL_GITHUB_PORT_DEFINITIONS: Dict[str, Dict[int, str]] = {}
# Sorted names of successfully cached files (feeds the UI checkbox group).
AVAILABLE_PORT_FILES: List[str] = []
|
|
| |
|
|
def get_github_directory_contents(owner: str, repo: str, path: str) -> List[Dict]:
    """
    List a directory of a GitHub repository via the REST "contents" API.

    Args:
        owner: Repository owner or organisation.
        repo: Repository name.
        path: Directory path inside the repo; an empty string means the root.

    Returns:
        The decoded JSON listing (one dict per file/directory entry), or an
        empty list when the request or JSON decoding fails.
    """
    base = f"https://api.github.com/repos/{owner}/{repo}/contents"
    api_url = f"{base}/{path}" if path else base

    try:
        print(f"π Fetching GitHub directory listing from: {api_url}")
        resp = requests.get(api_url, timeout=10)
        resp.raise_for_status()
        # .json() stays inside the try: a JSON decode error is a
        # RequestException subclass in modern requests and is handled below.
        return resp.json()
    except requests.exceptions.RequestException as e:
        print(f"β οΈ Error fetching GitHub directory listing for '{path or 'repo root'}': {e}")
        return []
|
|
def parse_single_port_file_content(content: str) -> Dict[int, str]:
    """
    Parse the text of one port-definition file into ``{port: description}``.

    Two passes are made over the content:
      1. a "continuous" pattern for run-together ``N - Name`` entries, and
      2. a per-line pass trying several common ``name:port`` / ``port desc``
         layouts (the first matching pattern wins per line).
    Finally, services that appear on several ports get an ``(also on ...)``
    suffix listing up to three sibling ports.
    """
    port_map: Dict[int, str] = {}
    grouped_by_service = defaultdict(list)

    # --- Pass 1: continuous "N - Name" entries (possibly run together) ---
    continuous_pattern = r'(\d+)\s*-\s*([A-Za-z][\w\s\-\.\(\)]*?)(?=\d+\s*-|$)'
    for raw_port, raw_desc in re.findall(continuous_pattern, content, re.MULTILINE):
        try:
            number = int(raw_port)
        except ValueError:
            continue
        if not (1 <= number <= 65535):
            continue
        cleaned = re.sub(r'\s+', ' ', raw_desc.strip().strip('-').strip())
        if cleaned:
            grouped_by_service[cleaned.split('-')[0].strip()].append(number)
            port_map[number] = cleaned

    # --- Pass 2: line-oriented formats ---
    line_patterns = [
        r'^(.+?)\s*[:=]\s*(\d+)',
        r'port\s*[:=]?\s*(\d+)\s+(.+)',
        r'(\d+)\s*/\s*\w+\s+(.+)',
        r'^(\d+)\s+(.+)$',
    ]
    for raw_line in content.split('\n'):
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue
        # Lines in the continuous "N - Name" shape were covered by pass 1.
        if re.match(r'^\d+\s*-\s*[A-Za-z]', line):
            continue

        for pattern in line_patterns:
            match = re.match(pattern, line, re.IGNORECASE)
            if not match:
                continue
            first, second = match.groups()
            # The numeric group may come first or second depending on pattern.
            port_str, desc = (first, second) if first.isdigit() else (second, first)
            try:
                number = int(port_str)
            except ValueError:
                continue
            if not (1 <= number <= 65535):
                continue
            cleaned = re.sub(r'\s+', ' ', re.sub(r'[^\w\s\-\.]+', '', desc).strip())
            if cleaned:
                grouped_by_service[cleaned.split('-')[0].strip()].append(number)
                port_map[number] = cleaned
                break  # first matching pattern wins for this line

    # --- Annotate multi-port services with their sibling ports ---
    for service_name, members in grouped_by_service.items():
        if len(members) <= 1:
            continue
        ordered = sorted(members)
        for number in ordered:
            if number not in port_map:
                continue
            text = port_map[number]
            if "also on" in text.lower():
                continue
            siblings = [str(p) for p in ordered if p != number]
            if siblings:
                shown = ','.join(siblings[:3])
                tail = '...' if len(siblings) > 3 else ''
                port_map[number] = f"{text} (also on {shown}{tail})"

    return port_map
|
|
def cache_all_github_port_files():
    """
    Fetch every port-definition file from the configured GitHub repository,
    parse it, and store the results in the module-level caches
    ALL_GITHUB_PORT_DEFINITIONS (name -> {port: description}) and
    AVAILABLE_PORT_FILES (sorted names). Safe to call repeatedly: both caches
    are cleared first. Network/parse failures skip the offending file.
    """
    global ALL_GITHUB_PORT_DEFINITIONS, AVAILABLE_PORT_FILES
    ALL_GITHUB_PORT_DEFINITIONS.clear()
    AVAILABLE_PORT_FILES.clear()

    repo_contents = get_github_directory_contents(GITHUB_REPO_OWNER, GITHUB_REPO_NAME, GITHUB_PORTS_DIR_PATH)

    # Only plain-text list formats are considered port-definition files.
    port_files_to_fetch = []
    for item in repo_contents:
        if item.get("type") == "file" and item.get("name", "").endswith(('.txt', '.csv', '.conf', '.list')):
            port_files_to_fetch.append((item['name'], item['download_url']))

    if not port_files_to_fetch:
        print(f"β οΈ No port definition files found in '{GITHUB_PORTS_DIR_PATH or 'repository root'}' on GitHub.")
        return

    # BUGFIX: this literal was split across two physical lines (a mangled
    # character swallowed the closing quote), which was a SyntaxError.
    print(f"β Found {len(port_files_to_fetch)} port definition files to cache.")

    for filename, file_url in port_files_to_fetch:
        try:
            # BUGFIX: these log messages printed the literal text "(unknown)"
            # instead of interpolating the file name.
            print(f"π Caching {filename} from {file_url}")
            response = requests.get(file_url, timeout=10)
            response.raise_for_status()
            content = response.text
            ALL_GITHUB_PORT_DEFINITIONS[filename] = parse_single_port_file_content(content)
            AVAILABLE_PORT_FILES.append(filename)
            print(f"β Cached {filename} with {len(ALL_GITHUB_PORT_DEFINITIONS[filename])} ports.")
        except requests.exceptions.RequestException as e:
            print(f"β οΈ Error fetching {filename} for caching: {e}")
        except Exception as e:
            print(f"β οΈ Error parsing {filename} for caching: {e}")

    # Sorted order keeps the UI checkbox list stable between runs.
    AVAILABLE_PORT_FILES.sort()
    print(f"Finished caching. Total {len(AVAILABLE_PORT_FILES)} files available.")
|
|
| |
|
|
def get_selected_port_map(selected_files: List[str]) -> Dict[int, str]:
    """
    Merge the cached port definitions of the chosen files into a single map.

    Ports 80/443 are always present as a baseline. When two files define the
    same port with different text, the longer (more descriptive) text wins.
    Services seen on several ports get an "(also on ...)" suffix listing up
    to three sibling ports.

    Args:
        selected_files: Names previously cached by
            cache_all_github_port_files(); unknown names are ignored.

    Returns:
        Mapping of port number -> description.
    """
    merged: Dict[int, str] = {80: "HTTP", 443: "HTTPS"}
    service_members = defaultdict(list)

    if not selected_files:
        print("No port files selected. Using default ports (80, 443).")
        return merged

    for filename in selected_files:
        if filename not in ALL_GITHUB_PORT_DEFINITIONS:
            continue
        for port_num, desc in ALL_GITHUB_PORT_DEFINITIONS[filename].items():
            if port_num not in merged:
                merged[port_num] = desc
            elif desc != merged[port_num] and len(desc) > len(merged[port_num]):
                # Conflicting definitions: keep the more descriptive one.
                merged[port_num] = desc

            service_name = desc.split('-')[0].strip()
            if port_num not in service_members[service_name]:
                service_members[service_name].append(port_num)

    # Annotate services that listen on several ports with their siblings.
    for service_name, members in service_members.items():
        if len(members) <= 1:
            continue
        ordered = sorted(members)
        for port_num in ordered:
            if port_num not in merged:
                continue
            text = merged[port_num]
            if "also on" in text.lower():
                continue
            siblings = [str(p) for p in ordered if p != port_num]
            if siblings:
                shown = ','.join(siblings[:3])
                tail = '...' if len(siblings) > 3 else ''
                merged[port_num] = f"{text} (also on {shown}{tail})"

    print(f"Generated port map from {len(selected_files)} selected files with {len(merged)} total port definitions.")
    return merged
|
|
| |
|
|
def extract_ips(file_path: str) -> List[str]:
    """
    Pull candidate IPv4 addresses out of a text file, keeping only public ones.

    Private (RFC 1918), loopback, link-local, 0.x, and broadcast addresses
    are dropped, as are candidates with any octet above 255.

    Args:
        file_path: Path to the file to scan (read as UTF-8, errors ignored).

    Returns:
        Sorted (lexicographically) list of unique public IPv4 strings; an
        empty list on any read error.
    """
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as handle:
            text = handle.read()

        candidates = set(re.findall(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', text))

        public_ips = []
        for candidate in candidates:
            a, b, c, d = (int(part) for part in candidate.split('.'))

            # The regex allows 1-3 digit groups, so octets can exceed 255.
            if max(a, b, c, d) > 255:
                continue

            # Skip non-routable / reserved ranges.
            reserved = (
                a == 0
                or a == 10
                or (a == 172 and 16 <= b <= 31)
                or (a == 192 and b == 168)
                or (a == 169 and b == 254)
                or a == 127
                or (a == 255 and b == 255 and c == 255 and d == 255)
            )
            if reserved:
                continue

            public_ips.append(candidate)

        return sorted(public_ips)
    except Exception as e:
        print(f"Error extracting IPs: {e}")
        return []
|
|
|
|
| |
|
|
# Case-insensitive substrings that probe_target() searches for in HTTP
# response bodies and headers; a hit tags the result as potential C2
# infrastructure for the named framework.
# NOTE(review): these are heuristic indicators, not definitive fingerprints —
# e.g. "404 Not Found" will also match many benign servers.
C2_SIGNATURES = {
    "Cobalt Strike": ["404 Not Found", "application/ocsp-response", "BeEF", "cobaltstrike"],
    "Metasploit": ["Metasploit", "Mettle", "meterpreter"],
    "Covenant": ["covenant", "GruntHTTP", "Auth/Login"],
    "Empire": ["Empire", "session_id", "admin/login.php"],
    "Sliver": ["sliver", "implant"],
    "Mythic": ["mythic", "agent_message"]
}
|
|
|
|
def probe_target(ip: str, port: int, port_desc: str) -> Optional[Dict]:
    """
    Probe a single (ip, port) pair and classify what is listening.

    Flow: raw TCP connect (1.2s timeout) -> best-effort banner grab ->
    HTTPS then HTTP GET, matching the response against C2_SIGNATURES.

    Args:
        ip: Target IPv4 address.
        port: TCP port to probe.
        port_desc: Human-readable service description for the port.

    Returns:
        A dict with keys IP/Port/Service/Type/Banner when the port is open,
        or None when it is closed or the probe fails unexpectedly.
    """
    sock = None
    try:
        # Fast reachability check; connect_ex returns 0 only on success.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1.2)
        result = sock.connect_ex((ip, port))

        if result != 0:
            return None

        # Try to coax a raw banner out of the socket before the HTTP probes.
        banner = "N/A"
        try:
            sock.send(b'\r\n')
            raw_banner = sock.recv(1024).decode('utf-8', errors='ignore').strip()
            if raw_banner and len(raw_banner) > 3:  # ignore trivially short noise
                banner = raw_banner[:200]
        except:
            pass  # banner grabbing is best-effort; keep "N/A" on failure
        finally:
            # Close the raw socket; requests opens its own connection below.
            if sock:
                sock.close()
                sock = None

        for protocol in ["https", "http"]:
            # Skip HTTPS for ports unlikely to speak TLS, but still try it on
            # well-known TLS ports and high ports (>= 8000).
            if protocol == "https" and port not in [443, 8443, 2083, 2087, 9443, 8000, 4443]:
                if port < 8000 and port != 443:
                    continue

            try:
                resp = requests.get(
                    f"{protocol}://{ip}:{port}",
                    timeout=1.8,
                    verify=False,  # self-signed certs are expected on such hosts
                    allow_redirects=False,
                    headers={'User-Agent': 'Mozilla/5.0'}
                )

                # Prefer identifying headers over the raw socket banner.
                server = resp.headers.get('Server', '')
                powered_by = resp.headers.get('X-Powered-By', '')

                if server or powered_by:
                    banner = f"{server} {powered_by}".strip()
                elif banner == "N/A":
                    banner = f"HTTP {resp.status_code}"

                content = resp.text[:5000]  # cap body scan for speed

                category = f"Web ({port_desc})"

                # Heuristic C2 detection: any signature substring in body or headers.
                for name, sigs in C2_SIGNATURES.items():
                    if any(sig.lower() in content.lower() or
                           sig.lower() in str(resp.headers).lower() for sig in sigs):
                        category = f"π¨ POTENTIAL {name}"
                        break

                return {
                    "IP": ip,
                    "Port": port,
                    "Service": port_desc,
                    "Type": category,
                    "Banner": banner
                }
            except requests.exceptions.SSLError:
                # NOTE(review): on an HTTPS handshake failure this `break`
                # skips the plain-HTTP attempt entirely — confirm that is
                # intended (a `continue` would fall through to "http").
                if protocol == "http":
                    continue
                else:
                    break
            except requests.exceptions.RequestException:
                continue

        # Port is open but did not answer HTTP(S).
        return {
            "IP": ip,
            "Port": port,
            "Service": port_desc,
            "Type": "Open (Non-HTTP)",
            "Banner": banner if banner != "N/A" else "Unknown"
        }

    except Exception as e:
        # Any unexpected failure is treated as "nothing interesting here".
        return None
    finally:
        if sock:
            try:
                sock.close()
            except:
                pass
|
|
|
|
def start_analysis(file_obj, max_threads: int, selected_port_files: List[str], progress=gr.Progress()):
    """
    Run the full scan pipeline and return results for the UI.

    Steps: build the port map from the selected lists, extract public IPs
    from the uploaded file, probe every (ip, port) pair concurrently, then
    sort results (potential C2 hits first) and write a CSV report.

    Args:
        file_obj: Gradio file-upload object (``.name`` is the temp-file path).
        max_threads: Worker count for the thread pool.
        selected_port_files: Names of cached GitHub port lists to scan with.
        progress: Gradio progress tracker (injected by the UI).

    Returns:
        (results DataFrame, csv_path) on success; (error DataFrame, None)
        on any failure or empty input.
    """
    if file_obj is None:
        return pd.DataFrame([{"Error": "No file uploaded"}]), None

    if not selected_port_files:
        return pd.DataFrame([{"Error": "Please select at least one port list to scan."}]), None

    try:
        progress(0, desc="π Generating port map from selected lists...")
        port_map = get_selected_port_map(selected_port_files)

        progress(0.1, desc="π Extracting IP addresses...")
        ips = extract_ips(file_obj.name)

        if not ips:
            return pd.DataFrame([{"Error": "No public IPs found in file."}]), None

        # Cartesian product of targets: every IP is probed on every port.
        scan_list = []
        for ip in ips:
            for port, desc in port_map.items():
                scan_list.append((ip, port, desc))

        total = len(scan_list)
        progress(0.2, desc=f"π― Scanning {len(ips)} IPs Γ {len(port_map)} ports = {total} probes...")

        results = []

        with ThreadPoolExecutor(max_workers=max_threads) as executor:
            futures = {
                executor.submit(probe_target, ip, port, desc): (ip, port)
                for ip, port, desc in scan_list
            }

            completed = 0
            for future in as_completed(futures):
                result = future.result()
                if result:
                    results.append(result)

                completed += 1
                # Throttle UI updates to every 50 probes (plus the final one).
                if completed % 50 == 0 or completed == total:
                    progress(0.2 + (0.7 * completed / total),
                             desc=f"β‘ {completed}/{total} β’ Found {len(results)} active")

        progress(0.9, desc="π Generating report...")

        if not results:
            return pd.DataFrame([{"Result": "No active services found."}]), None

        df = pd.DataFrame(results)
        # Surface potential C2 hits first, then order by IP and port.
        df['_threat'] = df['Type'].str.contains('POTENTIAL', case=False, na=False)
        df = df.sort_values(['_threat', 'IP', 'Port'], ascending=[False, True, True])
        df = df.drop('_threat', axis=1)

        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.csv') as tmp:
            df.to_csv(tmp.name, index=False)
            csv_path = tmp.name

        # BUGFIX: this string literal was split across two physical lines
        # (a mangled character swallowed the closing quote) — SyntaxError.
        progress(1.0, desc="β Complete!")
        return df, csv_path

    except Exception as e:
        return pd.DataFrame([{"Error": f"Analysis failed: {str(e)}"}]), None
|
|
|
|
| |
|
|
# CSS injected into the Blocks app: base font stack plus the gradient
# header card targeted by the .main-header <div> rendered via gr.HTML().
custom_css = """
.gradio-container {
    font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif;
}
.main-header {
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    padding: 2rem;
    border-radius: 12px;
    color: white;
    margin-bottom: 2rem;
    box-shadow: 0 8px 16px rgba(0,0,0,0.1);
}
.main-header h1 {
    margin: 0;
    font-size: 2.2rem;
    font-weight: 700;
}
.main-header p {
    margin: 0.5rem 0 0 0;
    opacity: 0.95;
}
"""
|
|
| |
| |
# Populate the GitHub port-definition caches at import time so the UI can
# offer the available files as checkbox choices.
cache_all_github_port_files()


# BUGFIX: `theme` is a gr.Blocks() constructor argument, not a launch()
# argument — it was previously passed to launch(), which rejects it.
demo = gr.Blocks(title="C2 Deep Scanner Pro", css=custom_css, theme=gr.themes.Soft())
|
|
with demo:
    # Page header card (styled by .main-header in custom_css).
    gr.HTML("""
    <div class="main-header">
        <h1>π΅οΈββοΈ C2 Infrastructure Deep Scanner Pro</h1>
        <p>Multi-threaded network reconnaissance with enhanced banner detection</p>
    </div>
    """)

    gr.Markdown("---")

    with gr.Row():
        with gr.Column(scale=2):
            # Left column: target upload, thread count, and the run button.
            file_input = gr.File(
                label="π€ Upload Target File",
                file_types=['.txt', '.json', '.log', '.csv']
            )

            max_threads = gr.Slider(
                minimum=30,
                maximum=100,
                value=60,
                step=10,
                label="βοΈ Concurrent Threads",
                info="60-80 recommended for best speed"
            )

            run_btn = gr.Button(
                "π Start Deep Analysis",
                variant="primary",
                size="lg"
            )

        with gr.Column(scale=1):
            # Right column: which cached GitHub port lists to scan with.
            # Choices and the default selection both come from the cache
            # populated by cache_all_github_port_files() at import time.
            port_file_selector = gr.CheckboxGroup(
                choices=AVAILABLE_PORT_FILES,
                value=AVAILABLE_PORT_FILES,
                label="π§ Select Port Lists (from repository root on GitHub)",
                info="Choose which port definition files to use for scanning."
            )
            gr.Markdown("""
            **Port Format Support:**
            - **Continuous** (e.g., `135 - MS SQL1433 - MSSQL`)
            - **Standard formats** (e.g., `80 - HTTP`, `MySQL: 3306`, `22/tcp SSH`)

            Multi-port services shown as: `MySQL (also on 3307,3308)`
            """)

    gr.Markdown("---")
    gr.Markdown("### π Scan Results")

    output_table = gr.Dataframe(
        label="Detected Infrastructure",
        wrap=True
    )

    download_btn = gr.File(label="πΎ Download CSV Report")

    # Wire the run button to the analysis pipeline.
    run_btn.click(
        fn=start_analysis,
        inputs=[file_input, max_threads, port_file_selector],
        outputs=[output_table, download_btn]
    )

    gr.Markdown("""
    ---
    π‘ **Banner Info:** Shows Server headers, X-Powered-By, or raw socket responses
    π¨ **Threats:** Results marked with π¨ indicate potential C2 infrastructure
    π **Multi-Port:** Services on multiple ports show related ports in parentheses
    """)
|
|
|
|
if __name__ == "__main__":
    # Cap the request queue so long scans can't pile up unboundedly.
    demo.queue(max_size=10)
    demo.launch(
        server_name="0.0.0.0",  # listen on all interfaces
        server_port=7860,
        share=False,
        show_error=True,
        # BUGFIX: `theme=gr.themes.Soft()` was passed here, but launch() does
        # not accept a `theme` argument (TypeError at startup); the theme
        # belongs on the gr.Blocks() constructor instead.
        ssr_mode=False
    )