# Source path: NSL/src/routes/scanner.py
# Hosting-page residue kept for provenance (not code):
#   "Fred808's picture" / "Upload 17 files" / "e1b4496 verified"
import socket
import threading
import time
from flask import Blueprint, request, jsonify
from flask_cors import cross_origin
# Blueprint on which this module's routes (/scan, /progress, /common-ports,
# /ping) are registered via the @scanner_bp.route decorators below.
scanner_bp = Blueprint('scanner', __name__)
class PortScanner:
    """Multi-threaded TCP connect scanner for one host and a contiguous port range.

    Every port in ``start_port..end_port`` (inclusive) is probed with
    ``socket.connect_ex``. Ports that accept the connection are appended to
    ``open_ports`` as ``{'port', 'service', 'status'}`` dicts. ``progress``
    counts the ports probed so far and can be read while a scan is running.
    """

    def __init__(self, target_ip, start_port=1, end_port=1000, timeout=1, threads=100):
        """Store the scan parameters and reset the result/progress state.

        Args:
            target_ip: Address handed to ``socket.connect_ex``.
            start_port: First port to probe (inclusive).
            end_port: Last port to probe (inclusive).
            timeout: Per-connection socket timeout, in seconds.
            threads: Upper bound on the number of worker threads.
        """
        self.target_ip = target_ip
        self.start_port = start_port
        self.end_port = end_port
        self.timeout = timeout
        self.threads = threads
        self.open_ports = []
        # Guards open_ports and progress, which worker threads update concurrently.
        self.lock = threading.Lock()
        self.progress = 0
        self.total_ports = end_port - start_port + 1

    def scan_port(self, port):
        """Probe a single port and record it if it accepts a TCP connection.

        Socket errors (refused, timed out, unreachable, invalid arguments) are
        treated as "not open". ``progress`` is incremented in every case so
        that it always ends up equal to the number of ports attempted.
        """
        try:
            # The context manager closes the socket even if settimeout or
            # connect_ex raises, which the manual close() did not guarantee.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(self.timeout)
                is_open = sock.connect_ex((self.target_ip, port)) == 0

            if is_open:
                try:
                    service = socket.getservbyport(port)
                except OSError:
                    # No well-known service name is registered for this port.
                    service = "unknown"
                with self.lock:
                    self.open_ports.append({
                        'port': port,
                        'service': service,
                        'status': 'open'
                    })
        except (OSError, OverflowError, TypeError, ValueError):
            # Best-effort scan: a port that cannot be probed is simply not
            # reported as open; the remaining ports are still scanned.
            pass
        finally:
            with self.lock:
                self.progress += 1

    def scan_range(self, port_list):
        """Probe every port in ``port_list`` sequentially (one worker's share)."""
        for port in port_list:
            self.scan_port(port)

    def start_scan(self):
        """Scan the configured range and return the open ports sorted by number.

        The range is split into contiguous chunks, one worker thread per
        chunk. At most ``threads`` workers are started; a thread count below 1
        is treated as 1 instead of failing or silently scanning nothing.

        Returns:
            list[dict]: entries of ``open_ports`` ordered by ``'port'``.
        """
        ports = range(self.start_port, self.end_port + 1)
        worker_count = max(1, min(int(self.threads), len(ports)))
        # Ceiling division keeps the number of chunks <= worker_count;
        # max(1, ...) keeps the range step valid when there are no ports.
        chunk_size = max(1, -(-len(ports) // worker_count))

        workers = [
            threading.Thread(
                target=self.scan_range,
                args=(ports[offset:offset + chunk_size],),
            )
            for offset in range(0, len(ports), chunk_size)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        return sorted(self.open_ports, key=lambda entry: entry['port'])
# Most recent PortScanner created by the /scan handler; the /progress handler
# reads it to report how far that scan has advanced. None until the first scan.
current_scanner = None
@scanner_bp.route('/scan', methods=['POST'])
@cross_origin()
def scan_ports():
    """Run a TCP port scan described by the JSON request body.

    Expected JSON object fields:
        ip (required): IPv4 address to scan.
        start_port / end_port: inclusive range, defaults 1 and 1000.
        timeout: per-connection timeout in seconds, default 1.
        threads: maximum number of worker threads, default 100.

    Returns:
        200 with the scan summary, 400 for invalid input, 500 for any
        unexpected failure (with the exception text in ``error``).
    """
    global current_scanner
    try:
        # silent=True returns None for a missing, malformed or non-JSON body
        # instead of raising, so those requests get a 400 rather than a 500.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({'error': 'No data provided'}), 400
        if not isinstance(data, dict):
            return jsonify({'error': 'JSON body must be an object'}), 400

        target_ip = data.get('ip')
        if not target_ip:
            return jsonify({'error': 'IP address is required'}), 400

        try:
            start_port = int(data.get('start_port', 1))
            end_port = int(data.get('end_port', 1000))
            timeout = float(data.get('timeout', 1))
            threads = int(data.get('threads', 100))
        except (TypeError, ValueError, OverflowError):
            return jsonify({
                'error': 'start_port, end_port, timeout and threads must be numeric'
            }), 400

        # Validate IP address (TypeError/ValueError cover non-string input
        # and strings inet_aton refuses to parse at all).
        try:
            socket.inet_aton(target_ip)
        except (OSError, TypeError, ValueError):
            return jsonify({'error': 'Invalid IP address format'}), 400

        # Validate port range
        if start_port < 1 or end_port > 65535 or start_port > end_port:
            return jsonify({'error': 'Invalid port range'}), 400

        # Limit port range for safety; the range is inclusive, hence the +1.
        if end_port - start_port + 1 > 10000:
            return jsonify({'error': 'Port range too large (max 10000 ports)'}), 400

        # "not (0 < t < inf)" also rejects NaN, which plain comparisons let through.
        if not (0 < timeout < float('inf')):
            return jsonify({'error': 'Timeout must be a positive number'}), 400
        if threads < 1:
            return jsonify({'error': 'Thread count must be at least 1'}), 400

        # Keep a local reference: the global may be replaced by a concurrent
        # request before this response is built.
        scanner = PortScanner(target_ip, start_port, end_port, timeout, threads)
        current_scanner = scanner

        start_time = time.time()
        open_ports = scanner.start_scan()
        scan_duration = round(time.time() - start_time, 2)

        return jsonify({
            'success': True,
            'target_ip': target_ip,
            'scan_range': f"{start_port}-{end_port}",
            'open_ports': open_ports,
            'total_open': len(open_ports),
            'total_scanned': scanner.total_ports,
            'scan_duration': scan_duration
        })
    except Exception as e:
        # Top-level boundary: report unexpected failures as a JSON 500.
        return jsonify({'error': str(e)}), 500
@scanner_bp.route('/progress', methods=['GET'])
@cross_origin()
def get_progress():
    """Report how many ports the most recent scan has probed so far.

    Returns a JSON object with ``progress`` (ports probed), ``total`` (ports
    in the scan) and ``percentage`` (0-100, two decimals). All three are 0
    when no scan has been started yet.
    """
    # Snapshot the global once so every field comes from the same scanner
    # even if another request replaces it while this response is built.
    scanner = current_scanner
    if scanner is None:
        return jsonify({'progress': 0, 'total': 0, 'percentage': 0})

    done = scanner.progress
    total = scanner.total_ports
    percentage = round((done / total) * 100, 2) if total > 0 else 0
    return jsonify({
        'progress': done,
        'total': total,
        'percentage': percentage
    })
@scanner_bp.route('/common-ports', methods=['GET'])
@cross_origin()
def get_common_ports():
    """Return a fixed JSON mapping of category name to a list of port numbers."""
    return jsonify({
        'web': [80, 443, 8080, 8443, 3000, 8000],
        'mail': [25, 110, 143, 993, 995, 587],
        'file': [21, 22, 23, 69, 115, 2049],
        'database': [1433, 1521, 3306, 5432, 6379, 27017],
        'remote': [22, 23, 3389, 5900, 5901],
        'other': [53, 67, 68, 123, 161, 162, 389, 636, 1723],
    })
def _tcp_port_accepts(host, port, timeout=3):
    """Return True if a TCP connection to (host, port) succeeds within ``timeout`` seconds."""
    # The context manager closes the socket even when connect_ex raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        return sock.connect_ex((host, port)) == 0


@scanner_bp.route('/ping', methods=['POST'])
@cross_origin()
def ping_host():
    """Check whether a host accepts TCP connections on port 80 or 443.

    Expects a JSON object with an ``ip`` field. This is a TCP connect test,
    not an ICMP ping: the host is reported ``'reachable'`` only if one of the
    two ports accepts a connection within 3 seconds, else ``'unreachable'``.

    Returns:
        200 with ``{'ip', 'status'}``, 400 when the body or ``ip`` is
        missing, 500 for any unexpected failure.
    """
    try:
        # silent=True returns None for a missing or malformed JSON body, so
        # that case becomes a 400 instead of an AttributeError-driven 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        target_ip = data.get('ip')
        if not target_ip:
            return jsonify({'error': 'IP address is required'}), 400

        # NOTE(review): unlike /scan, the address format is not validated
        # here, so host names are also accepted - confirm this is intended.
        try:
            # Port 443 is only tried when port 80 did not accept.
            reachable = any(
                _tcp_port_accepts(target_ip, port) for port in (80, 443)
            )
        except (OSError, OverflowError, TypeError, ValueError):
            # Any socket-level failure counts as unreachable.
            reachable = False

        return jsonify({
            'ip': target_ip,
            'status': 'reachable' if reachable else 'unreachable'
        })
    except Exception as e:
        # Top-level boundary: report unexpected failures as a JSON 500.
        return jsonify({'error': str(e)}), 500