# scanner / app.py
# NOTE: the original upload carried Hugging Face Space page text here
# ("wuhp's picture / Update app.py / f8e29da verified") — that was scraped
# web-UI residue, not Python source, and has been converted to this comment
# so the module parses.
import gradio as gr
import socket
import requests
import urllib3
import re
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
import tempfile
from typing import Dict, List, Tuple, Optional
import warnings
from collections import defaultdict
# Suppress all warnings — probes use requests with verify=False against
# self-signed certs, which would otherwise flood stdout with
# InsecureRequestWarning messages.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings('ignore')
# GitHub repository details: port-definition lists are fetched from here.
GITHUB_REPO_OWNER = "Wuhpondiscord"
GITHUB_REPO_NAME = "ports"
# --- CRITICAL FIX: Empty string if files are in repo root ---
GITHUB_PORTS_DIR_PATH = "" # The directory within the repo where port files are located. Use "" for root.
# Global cache of parsed port definitions from all files, populated once at
# startup by cache_all_github_port_files().
# Format: {filename: {port_num: description}}
ALL_GITHUB_PORT_DEFINITIONS: Dict[str, Dict[int, str]] = {}
AVAILABLE_PORT_FILES: List[str] = [] # List of filenames from GitHub (sorted)
# --- GitHub File Listing and Caching ---
def get_github_directory_contents(owner: str, repo: str, path: str) -> List[Dict]:
    """
    List the contents of a directory in a GitHub repository via the REST API.

    Args:
        owner: Repository owner (user or organisation).
        repo: Repository name.
        path: Directory path inside the repo; "" means the repository root.

    Returns:
        A list of dicts (one per file/directory entry) as returned by the
        GitHub contents API, or an empty list on any request failure.
    """
    base = f"https://api.github.com/repos/{owner}/{repo}/contents"
    # No trailing slash when path is empty — the API rejects it otherwise.
    api_url = f"{base}/{path}" if path else base
    try:
        print(f"🌐 Fetching GitHub directory listing from: {api_url}")
        resp = requests.get(api_url, timeout=10)
        resp.raise_for_status()  # Surface 4xx/5xx as exceptions
        return resp.json()
    except requests.exceptions.RequestException as e:
        print(f"⚠️ Error fetching GitHub directory listing for '{path or 'repo root'}': {e}")
        return []
def parse_single_port_file_content(content: str) -> Dict[int, str]:
    """
    Parse the content of a single port-definition file.

    Two formats are supported:
      * "Continuous": entries run together, e.g. "80 - HTTP443 - HTTPS".
      * Line-by-line: "MySQL: 3306", "port 22 SSH", "22/tcp SSH", "22 SSH".

    Ports outside 1-65535 are discarded. When one base service name maps to
    several ports, each description is annotated with "(also on ...)" listing
    up to three of the other ports.

    Returns:
        {port_number: description}
    """
    single_file_port_map: Dict[int, str] = {}
    # Ports seen per base service name, for the "(also on ...)" annotation.
    # FIX: use a set — with a list, a port appearing twice in the file was
    # appended twice and showed up duplicated in the "also on" list.
    service_ports: Dict[str, set] = defaultdict(set)

    # Strategy 1: continuous-format parser.
    continuous_pattern = r'(\d+)\s*-\s*([A-Za-z][\w\s\-\.\(\)]*?)(?=\d+\s*-|$)'
    for port_str, desc in re.findall(continuous_pattern, content, re.MULTILINE):
        try:
            port_num = int(port_str)
        except ValueError:
            continue
        if 1 <= port_num <= 65535:
            desc = desc.strip().strip('-').strip()
            desc = re.sub(r'\s+', ' ', desc)
            if desc:
                base_service = desc.split('-')[0].strip()
                service_ports[base_service].add(port_num)
                single_file_port_map[port_num] = desc

    # Strategy 2: line-by-line formats (patterns hoisted out of the loop).
    patterns = [
        r'^(.+?)\s*[:=]\s*(\d+)',        # e.g., MySQL: 3306
        r'port\s*[:=]?\s*(\d+)\s+(.+)',  # e.g., port 22 SSH
        r'(\d+)\s*/\s*\w+\s+(.+)',       # e.g., 22/tcp SSH
        r'^(\d+)\s+(.+)$',               # e.g., 22 SSH
    ]
    for line in content.split('\n'):
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        if re.match(r'^\d+\s*-\s*[A-Za-z]', line):
            # Already handled by the continuous-format pass above.
            continue
        for pattern in patterns:
            match = re.match(pattern, line, re.IGNORECASE)
            if not match:
                continue
            groups = match.groups()
            # The port may land in either capture group depending on pattern.
            if groups[0].isdigit():
                port_str, desc = groups[0], groups[1]
            else:
                desc, port_str = groups[0], groups[1]
            try:
                port_num = int(port_str)
            except ValueError:
                continue  # Try the next pattern
            if 1 <= port_num <= 65535:
                desc = re.sub(r'[^\w\s\-\.]+', '', desc).strip()
                desc = re.sub(r'\s+', ' ', desc)
                if desc:
                    base_service = desc.split('-')[0].strip()
                    service_ports[base_service].add(port_num)
                    single_file_port_map[port_num] = desc
                break  # Line consumed; stop trying further patterns

    # Annotate multi-port services (within this single file's context).
    for service, ports in service_ports.items():
        if len(ports) > 1:
            ports_sorted = sorted(ports)
            for port in ports_sorted:
                if port in single_file_port_map:
                    current_desc = single_file_port_map[port]
                    if "also on" not in current_desc.lower():
                        other_ports = [str(p) for p in ports_sorted if p != port]
                        if other_ports:
                            suffix = '...' if len(other_ports) > 3 else ''
                            single_file_port_map[port] = (
                                f"{current_desc} (also on "
                                f"{','.join(other_ports[:3])}{suffix})"
                            )
    return single_file_port_map
def cache_all_github_port_files():
    """
    Fetch and parse every port-definition file from GitHub, populating the
    module-level ALL_GITHUB_PORT_DEFINITIONS and AVAILABLE_PORT_FILES.

    Files are discovered in GITHUB_PORTS_DIR_PATH (repo root when empty) and
    recognised by extension (.txt/.csv/.conf/.list). Files that fail to
    download or parse are skipped with a warning rather than aborting.
    """
    global ALL_GITHUB_PORT_DEFINITIONS, AVAILABLE_PORT_FILES
    ALL_GITHUB_PORT_DEFINITIONS.clear()
    AVAILABLE_PORT_FILES.clear()
    # GITHUB_PORTS_DIR_PATH is "" when the files live in the repo root.
    repo_contents = get_github_directory_contents(GITHUB_REPO_OWNER, GITHUB_REPO_NAME, GITHUB_PORTS_DIR_PATH)
    port_files_to_fetch = []
    for item in repo_contents:
        if item.get("type") == "file" and item.get("name", "").endswith(('.txt', '.csv', '.conf', '.list')):
            port_files_to_fetch.append((item['name'], item['download_url']))
    if not port_files_to_fetch:
        print(f"⚠️ No port definition files found in '{GITHUB_PORTS_DIR_PATH or 'repository root'}' on GitHub.")
        return
    print(f"✅ Found {len(port_files_to_fetch)} port definition files to cache.")
    for filename, file_url in port_files_to_fetch:
        try:
            # FIX: log messages previously printed the literal "(unknown)"
            # instead of interpolating the filename being processed.
            print(f"🌐 Caching '{filename}' from {file_url}")
            response = requests.get(file_url, timeout=10)
            response.raise_for_status()
            ALL_GITHUB_PORT_DEFINITIONS[filename] = parse_single_port_file_content(response.text)
            AVAILABLE_PORT_FILES.append(filename)
            print(f"✅ Cached '{filename}' with {len(ALL_GITHUB_PORT_DEFINITIONS[filename])} ports.")
        except requests.exceptions.RequestException as e:
            print(f"⚠️ Error fetching '{filename}' for caching: {e}")
        except Exception as e:
            print(f"⚠️ Error parsing '{filename}' for caching: {e}")
    AVAILABLE_PORT_FILES.sort()  # Keep the list sorted for the UI
    print(f"Finished caching. Total {len(AVAILABLE_PORT_FILES)} files available.")
# --- Dynamic Port Map Construction ---
def get_selected_port_map(selected_files: List[str]) -> Dict[int, str]:
    """
    Merge the port definitions of the chosen files into one {port: desc} map.

    80/HTTP and 443/HTTPS are always present. When several files define the
    same port with different text, the longer (more descriptive) one wins.
    Services spanning multiple ports get an "(also on ...)" annotation
    listing up to three sibling ports.
    """
    combined_port_map = {80: "HTTP", 443: "HTTPS"}  # Always include defaults
    all_service_ports = defaultdict(list)  # base service name -> ports seen
    if not selected_files:
        print("No port files selected. Using default ports (80, 443).")
        return combined_port_map
    for filename in selected_files:
        file_ports = ALL_GITHUB_PORT_DEFINITIONS.get(filename)
        if file_ports is None:
            continue  # Filename not in the cache — skip silently
        for port_num, desc in file_ports.items():
            previous = combined_port_map.get(port_num)
            # New port, or a strictly more verbose description: take it.
            if previous is None or (desc != previous and len(desc) > len(previous)):
                combined_port_map[port_num] = desc
            # Track service -> ports for cross-file multi-port detection.
            base_service = desc.split('-')[0].strip()
            bucket = all_service_ports[base_service]
            if port_num not in bucket:
                bucket.append(port_num)
    # Re-apply multi-port annotations over the combined map.
    for service, ports in all_service_ports.items():
        if len(ports) <= 1:
            continue
        ordered = sorted(ports)
        for port in ordered:
            current_desc = combined_port_map.get(port)
            if current_desc is None or "also on" in current_desc.lower():
                continue
            others = [str(p) for p in ordered if p != port]
            if others:
                tail = '...' if len(others) > 3 else ''
                combined_port_map[port] = f"{current_desc} (also on {','.join(others[:3])}{tail})"
    print(f"Generated port map from {len(selected_files)} selected files with {len(combined_port_map)} total port definitions.")
    return combined_port_map
# --- IP Extraction ---
def extract_ips(file_path: str) -> List[str]:
    """
    Extract unique public IPv4 addresses from a text file.

    Candidates are dotted quads found by regex; any with an octet above 255
    is rejected, as are private (10/8, 172.16/12, 192.168/16), loopback
    (127/8), link-local (169.254/16), 0.0.0.0/8 and 255.255.255.255.

    Returns:
        Lexicographically sorted list of addresses, or [] on any error.
    """
    def _is_public(octets: List[int]) -> bool:
        # Reject malformed octets first, then the reserved/private ranges.
        if any(o > 255 for o in octets):
            return False
        first, second = octets[0], octets[1]
        if first in (0, 10, 127):
            return False
        if first == 172 and 16 <= second <= 31:
            return False
        if first == 192 and second == 168:
            return False
        if first == 169 and second == 254:
            return False
        if octets == [255, 255, 255, 255]:
            return False
        return True

    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as fh:
            text = fh.read()
        candidates = set(re.findall(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', text))
        return sorted(
            ip for ip in candidates
            if _is_public([int(part) for part in ip.split('.')])
        )
    except Exception as e:
        print(f"Error extracting IPs: {e}")
        return []
# --- C2 Detection ---
# Substring signatures matched (case-insensitively) by probe_target against
# both the HTTP response body and headers to flag likely command-and-control
# frameworks.
# NOTE(review): "404 Not Found" is a very weak indicator for Cobalt Strike
# and will match many ordinary servers — expect false positives; verify hits
# manually.
C2_SIGNATURES = {
    "Cobalt Strike": ["404 Not Found", "application/ocsp-response", "BeEF", "cobaltstrike"],
    "Metasploit": ["Metasploit", "Mettle", "meterpreter"],
    "Covenant": ["covenant", "GruntHTTP", "Auth/Login"],
    "Empire": ["Empire", "session_id", "admin/login.php"],
    "Sliver": ["sliver", "implant"],
    "Mythic": ["mythic", "agent_message"]
}
def probe_target(ip: str, port: int, port_desc: str) -> Optional[Dict]:
    """
    Probe a single ip:port and classify what is listening there.

    Steps:
      1. TCP connect_ex with a 1.2 s timeout; closed ports return None.
      2. Grab a raw banner from the socket if the service volunteers one.
      3. Try HTTPS then HTTP; scan headers/body for known C2 signatures.

    Returns:
        Dict with keys IP, Port, Service, Type, Banner — or None when the
        port is closed or the probe fails entirely.
    """
    sock = None
    try:
        # Step 1: plain TCP connectivity check.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1.2)
        result = sock.connect_ex((ip, port))
        if result != 0:
            return None
        # Step 2: raw banner grab (works for many services: SSH, SMTP, FTP...).
        banner = "N/A"
        try:
            sock.send(b'\r\n')
            raw_banner = sock.recv(1024).decode('utf-8', errors='ignore').strip()
            if raw_banner and len(raw_banner) > 3:
                banner = raw_banner[:200]  # First 200 chars
        except OSError:
            # FIX: was a bare `except:` (also caught KeyboardInterrupt).
            # Service sent nothing / reset — banner stays "N/A".
            pass
        finally:
            if sock:  # Close before the HTTP attempts open their own conns
                sock.close()
                sock = None
        # Step 3: try HTTPS first, then HTTP.
        for protocol in ["https", "http"]:
            # Smart protocol selection: don't aggressively try HTTPS on
            # non-standard ports.
            if protocol == "https" and port not in [443, 8443, 2083, 2087, 9443, 8000, 4443]:
                # Skip HTTPS for low ports unless they are common HTTPS ports
                if port < 8000 and port != 443:
                    continue
            try:
                resp = requests.get(
                    f"{protocol}://{ip}:{port}",
                    timeout=1.8,
                    verify=False,  # Disable SSL verification for self-signed certs
                    allow_redirects=False,
                    headers={'User-Agent': 'Mozilla/5.0'}
                )
                # Prefer identifying headers over the raw socket banner.
                server = resp.headers.get('Server', '')
                powered_by = resp.headers.get('X-Powered-By', '')
                if server or powered_by:
                    banner = f"{server} {powered_by}".strip()
                elif banner == "N/A":
                    banner = f"HTTP {resp.status_code}"
                content = resp.text[:5000]  # Limit content to avoid memory issues
                category = f"Web ({port_desc})"
                # C2 detection against body and headers.
                for name, sigs in C2_SIGNATURES.items():
                    if any(sig.lower() in content.lower() or
                           sig.lower() in str(resp.headers).lower() for sig in sigs):
                        category = f"🚨 POTENTIAL {name}"
                        break
                return {
                    "IP": ip,
                    "Port": port,
                    "Service": port_desc,
                    "Type": category,
                    "Banner": banner
                }
            except requests.exceptions.SSLError:
                if protocol == "http":
                    # TLS-flavoured failure on plain HTTP — nothing left to
                    # try (HTTPS was already attempted first in this loop).
                    continue
                else:
                    # HTTPS handshake failed: stop and report port as open.
                    break
            except requests.exceptions.RequestException:  # All other request errors
                continue
        # Port is open but not speaking HTTP(S); report raw banner if any.
        return {
            "IP": ip,
            "Port": port,
            "Service": port_desc,
            "Type": "Open (Non-HTTP)",
            "Banner": banner if banner != "N/A" else "Unknown"
        }
    except Exception:
        # Unexpected failure (resolution, socket teardown, ...) — no result.
        return None
    finally:
        if sock:
            try:
                sock.close()
            except OSError:  # FIX: was a bare `except:`
                pass
def start_analysis(file_obj, max_threads: int, selected_port_files: List[str], progress=gr.Progress()):
    """
    Run the full scan pipeline for the Gradio UI.

    Builds the port map from the selected lists, extracts public IPs from
    the uploaded file, probes every (ip, port) pair in a thread pool, and
    returns the results (threat hits sorted first) plus a CSV download path.

    Returns:
        (DataFrame, csv_path) on success, or (error DataFrame, None).
    """
    if file_obj is None:
        return pd.DataFrame([{"Error": "No file uploaded"}]), None
    if not selected_port_files:
        return pd.DataFrame([{"Error": "Please select at least one port list to scan."}]), None
    try:
        # FIX: progress/desc strings below were mojibake (UTF-8 emoji
        # mis-decoded, e.g. "πŸ“‹"); restored to the intended characters.
        progress(0, desc="📋 Generating port map from selected lists...")
        port_map = get_selected_port_map(selected_port_files)
        progress(0.1, desc="🔍 Extracting IP addresses...")
        ips = extract_ips(file_obj.name)
        if not ips:
            return pd.DataFrame([{"Error": "No public IPs found in file."}]), None
        # Cartesian product: every IP is probed on every selected port.
        scan_list = [(ip, port, desc) for ip in ips for port, desc in port_map.items()]
        total = len(scan_list)
        progress(0.2, desc=f"🎯 Scanning {len(ips)} IPs × {len(port_map)} ports = {total} probes...")
        results = []
        with ThreadPoolExecutor(max_workers=max_threads) as executor:
            futures = {
                executor.submit(probe_target, ip, port, desc): (ip, port)
                for ip, port, desc in scan_list
            }
            completed = 0
            for future in as_completed(futures):
                result = future.result()
                if result:
                    results.append(result)
                completed += 1
                # Throttle UI updates to one per 50 probes (plus the last).
                if completed % 50 == 0 or completed == total:
                    progress(0.2 + (0.7 * completed / total),
                             desc=f"⚡ {completed}/{total} • Found {len(results)} active")
        progress(0.9, desc="📊 Generating report...")
        if not results:
            return pd.DataFrame([{"Result": "No active services found."}]), None
        df = pd.DataFrame(results)
        # Sort potential C2 hits to the top, then by IP and port.
        df['_threat'] = df['Type'].str.contains('POTENTIAL', case=False, na=False)
        df = df.sort_values(['_threat', 'IP', 'Port'], ascending=[False, True, True])
        df = df.drop('_threat', axis=1)
        # Persist the CSV so the download widget can serve it.
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.csv') as tmp:
            df.to_csv(tmp.name, index=False)
            csv_path = tmp.name
        progress(1.0, desc="✅ Complete!")
        return df, csv_path
    except Exception as e:
        return pd.DataFrame([{"Error": f"Analysis failed: {str(e)}"}]), None
# --- Gradio UI ---
custom_css = """
.gradio-container {
font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif;
}
.main-header {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
padding: 2rem;
border-radius: 12px;
color: white;
margin-bottom: 2rem;
box-shadow: 0 8px 16px rgba(0,0,0,0.1);
}
.main-header h1 {
margin: 0;
font-size: 2.2rem;
font-weight: 700;
}
.main-header p {
margin: 0.5rem 0 0 0;
opacity: 0.95;
}
"""
# --- CRITICAL CHANGE: Cache port files BEFORE Gradio UI is defined ---
# This ensures AVAILABLE_PORT_FILES is populated when gr.CheckboxGroup is instantiated
cache_all_github_port_files()
# FIX: apply the theme at Blocks construction time — `theme` is a gr.Blocks()
# argument, not a launch() argument.
demo = gr.Blocks(title="C2 Deep Scanner Pro", css=custom_css, theme=gr.themes.Soft())
with demo:
    # Header banner; styled by .main-header rules in custom_css.
    gr.HTML("""
    <div class="main-header">
        <h1>πŸ•΅οΈβ€β™‚οΈ C2 Infrastructure Deep Scanner Pro</h1>
        <p>Multi-threaded network reconnaissance with enhanced banner detection</p>
    </div>
    """)
    # Removed the large feature/input files markdown sections
    gr.Markdown("---")
    with gr.Row():
        with gr.Column(scale=2):
            # Left column: scan inputs (target file, thread count, run button).
            file_input = gr.File(
                label="πŸ“€ Upload Target File",
                file_types=['.txt', '.json', '.log', '.csv']
            )
            max_threads = gr.Slider(
                minimum=30,
                maximum=100,
                value=60,
                step=10,
                label="βš™οΈ Concurrent Threads",
                info="60-80 recommended for best speed"
            )
            run_btn = gr.Button(
                "πŸš€ Start Deep Analysis",
                variant="primary",
                size="lg"
            )
        with gr.Column(scale=1):
            # Right column: port-list selection.
            # Choices were populated by cache_all_github_port_files() above,
            # before this widget was constructed.
            port_file_selector = gr.CheckboxGroup(
                choices=AVAILABLE_PORT_FILES,  # populated at module import time
                value=AVAILABLE_PORT_FILES,  # all lists selected by default
                label="πŸ”§ Select Port Lists (from repository root on GitHub)",
                info="Choose which port definition files to use for scanning."
            )
            gr.Markdown("""
    **Port Format Support:**
    - **Continuous** (e.g., `135 - MS SQL1433 - MSSQL`)
    - **Standard formats** (e.g., `80 - HTTP`, `MySQL: 3306`, `22/tcp SSH`)
    Multi-port services shown as: `MySQL (also on 3307,3308)`
    """)
    gr.Markdown("---")
    gr.Markdown("### πŸ“Š Scan Results")
    # Results table and CSV download, filled by start_analysis.
    output_table = gr.Dataframe(
        label="Detected Infrastructure",
        wrap=True
    )
    download_btn = gr.File(label="πŸ’Ύ Download CSV Report")
    # Wire the run button to the analysis pipeline.
    run_btn.click(
        fn=start_analysis,
        inputs=[file_input, max_threads, port_file_selector],  # port lists included
        outputs=[output_table, download_btn]
    )
    gr.Markdown("""
    ---
    πŸ’‘ **Banner Info:** Shows Server headers, X-Powered-By, or raw socket responses
    🚨 **Threats:** Results marked with 🚨 indicate potential C2 infrastructure
    πŸ“Œ **Multi-Port:** Services on multiple ports show related ports in parentheses
    """)
if __name__ == "__main__":
    demo.queue(max_size=10)
    # FIX: removed `theme=gr.themes.Soft()` — `theme` is a gr.Blocks()
    # constructor argument; Blocks.launch() does not accept it and raises
    # TypeError, so the app crashed on startup.
    demo.launch(
        server_name="0.0.0.0",  # Listen on all interfaces (container/Space)
        server_port=7860,
        share=False,
        show_error=True,
        ssr_mode=False
    )