|
|
import socket |
|
|
import threading |
|
|
import time |
|
|
from flask import Blueprint, request, jsonify |
|
|
from flask_cors import cross_origin |
|
|
|
|
|
# Flask blueprint on which this module's scanner routes are registered.
scanner_bp = Blueprint('scanner', __name__)
|
|
|
|
|
class PortScanner:
    """Multi-threaded TCP connect scanner for a single IPv4 target.

    The inclusive port range ``start_port..end_port`` is split into contiguous
    chunks, and each chunk is scanned by its own thread. Open ports are
    collected in ``open_ports`` and ``progress`` counts the ports attempted
    so far, so another thread can poll it while a scan is running.
    """

    def __init__(self, target_ip, start_port=1, end_port=1000, timeout=1, threads=100):
        """Store the scan parameters and reset the result/progress state.

        Args:
            target_ip: Address passed to ``socket.connect_ex``.
            start_port: First port of the range (inclusive).
            end_port: Last port of the range (inclusive).
            timeout: Per-connection socket timeout in seconds.
            threads: Upper bound on the number of worker threads.
        """
        self.target_ip = target_ip
        self.start_port = start_port
        self.end_port = end_port
        self.timeout = timeout
        self.threads = threads
        self.open_ports = []
        # Guards open_ports and progress, which worker threads update concurrently.
        self.lock = threading.Lock()
        self.progress = 0
        self.total_ports = end_port - start_port + 1

    def scan_port(self, port):
        """Try a TCP connection to ``port`` and record it if it is open.

        Failures are treated as "not open": the scan is best-effort and one
        bad probe must not abort the worker thread. ``progress`` is always
        incremented, whatever the outcome.
        """
        try:
            # The context manager closes the socket even if settimeout or
            # connect_ex raises.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(self.timeout)
                is_open = sock.connect_ex((self.target_ip, port)) == 0

            if is_open:
                try:
                    service = socket.getservbyport(port)
                except (OSError, OverflowError):
                    # No service name is registered for this port.
                    service = "unknown"

                with self.lock:
                    self.open_ports.append({
                        'port': port,
                        'service': service,
                        'status': 'open'
                    })
        except (OSError, OverflowError, TypeError, ValueError):
            # Deliberate best-effort: an unusable address, port or timeout
            # simply means this port is not reported as open.
            pass
        finally:
            with self.lock:
                self.progress += 1

    def scan_range(self, port_list):
        """Scan every port in ``port_list`` sequentially (one worker's share)."""
        for port in port_list:
            self.scan_port(port)

    def start_scan(self):
        """Scan the configured range and return the open ports sorted by port.

        The range is divided so that at most ``threads`` workers are started.
        Ceiling division is used for the chunk size; floor division would
        create more workers than requested whenever the range is not an
        exact multiple of ``threads``. A thread count below 1 is treated as 1.

        Returns:
            A list of ``{'port', 'service', 'status'}`` dicts ordered by port.
        """
        ports = list(range(self.start_port, self.end_port + 1))
        max_workers = max(1, int(self.threads))
        # -(-a // b) is ceiling division; never let the step drop to zero.
        chunk_size = max(1, -(-len(ports) // max_workers))

        workers = [
            threading.Thread(
                target=self.scan_range,
                args=(ports[offset:offset + chunk_size],),
            )
            for offset in range(0, len(ports), chunk_size)
        ]

        for worker in workers:
            worker.start()

        for worker in workers:
            worker.join()

        return sorted(self.open_ports, key=lambda entry: entry['port'])
|
|
|
|
|
|
|
|
# Most recently created PortScanner (None until the first scan request).
# Written by the /scan handler and read by the /progress handler.
current_scanner = None
|
|
|
|
|
@scanner_bp.route('/scan', methods=['POST'])
@cross_origin()
def scan_ports():
    """Run a blocking TCP port scan described by the JSON request body.

    Expected JSON fields: ``ip`` (required), ``start_port`` (default 1),
    ``end_port`` (default 1000), ``timeout`` in seconds (default 1) and
    ``threads`` (default 100).

    Returns:
        200 with the scan summary on success, 400 with ``{'error': ...}``
        for invalid input, 500 with ``{'error': ...}`` for unexpected errors.
    """
    global current_scanner

    try:
        # silent=True yields None for a missing or non-JSON body instead of
        # raising, so malformed requests get a 400 rather than a 500.
        data = request.get_json(silent=True)

        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        target_ip = data.get('ip')
        if not target_ip:
            return jsonify({'error': 'IP address is required'}), 400

        try:
            socket.inet_aton(target_ip)
        except (OSError, TypeError, ValueError):
            # OSError: malformed address; TypeError/ValueError: not a usable string.
            return jsonify({'error': 'Invalid IP address format'}), 400

        try:
            start_port = int(data.get('start_port', 1))
            end_port = int(data.get('end_port', 1000))
            timeout = float(data.get('timeout', 1))
            threads = int(data.get('threads', 100))
        except (TypeError, ValueError, OverflowError):
            return jsonify({'error': 'start_port, end_port, timeout and threads must be numeric'}), 400

        if start_port < 1 or end_port > 65535 or start_port > end_port:
            return jsonify({'error': 'Invalid port range'}), 400

        # The range is inclusive, hence the +1 when counting ports.
        if end_port - start_port + 1 > 10000:
            return jsonify({'error': 'Port range too large (max 10000 ports)'}), 400

        # The chained comparison also rejects NaN and infinity.
        if not (0 < timeout < float('inf')):
            return jsonify({'error': 'Timeout must be a positive number'}), 400

        if threads < 1:
            return jsonify({'error': 'Thread count must be at least 1'}), 400

        # Keep a local reference: the global may be replaced by a concurrent
        # request before this scan finishes.
        scanner = PortScanner(target_ip, start_port, end_port, timeout, threads)
        current_scanner = scanner

        start_time = time.time()
        open_ports = scanner.start_scan()
        scan_duration = round(time.time() - start_time, 2)

        return jsonify({
            'success': True,
            'target_ip': target_ip,
            'scan_range': f"{start_port}-{end_port}",
            'open_ports': open_ports,
            'total_open': len(open_ports),
            'total_scanned': scanner.total_ports,
            'scan_duration': scan_duration
        })

    except Exception as e:
        # Top-level boundary: report anything unexpected as a server error.
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
@scanner_bp.route('/progress', methods=['GET'])
@cross_origin()
def get_progress():
    """Report how many ports the most recent scan has attempted so far.

    Returns:
        JSON with ``progress``, ``total`` and ``percentage``; all zero when
        no scan has been started yet.
    """
    # Snapshot the global once so every field below describes the same
    # scanner even if a new scan request replaces it meanwhile.
    scanner = current_scanner

    if scanner is None:
        return jsonify({'progress': 0, 'total': 0, 'percentage': 0})

    done = scanner.progress
    total = scanner.total_ports
    # Guard against a non-positive total to avoid dividing by zero.
    percentage = round((done / total) * 100, 2) if total > 0 else 0

    return jsonify({
        'progress': done,
        'total': total,
        'percentage': percentage
    })
|
|
|
|
|
@scanner_bp.route('/common-ports', methods=['GET'])
@cross_origin()
def get_common_ports():
    """Return a fixed table of commonly used port numbers grouped by category."""
    # Ordered (category, ports) pairs; dict() preserves this order.
    categories = (
        ('web', [80, 443, 8080, 8443, 3000, 8000]),
        ('mail', [25, 110, 143, 993, 995, 587]),
        ('file', [21, 22, 23, 69, 115, 2049]),
        ('database', [1433, 1521, 3306, 5432, 6379, 27017]),
        ('remote', [22, 23, 3389, 5900, 5901]),
        ('other', [53, 67, 68, 123, 161, 162, 389, 636, 1723]),
    )
    return jsonify(dict(categories))
|
|
|
|
|
@scanner_bp.route('/ping', methods=['POST'])
@cross_origin()
def ping_host():
    """Check reachability of a host by attempting TCP connections.

    Reads ``ip`` from the JSON body and tries port 80, then port 443, each
    with a 3-second timeout. The host is reported as ``'reachable'`` as soon
    as one connection succeeds, otherwise ``'unreachable'``.

    Returns:
        200 with ``{'ip', 'status'}``, 400 when the body or ``ip`` is
        missing, 500 with ``{'error': ...}`` for unexpected errors.
    """
    try:
        # silent=True returns None for a missing/non-JSON body instead of raising.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        target_ip = data.get('ip')
        if not target_ip:
            return jsonify({'error': 'IP address is required'}), 400

        status = 'unreachable'
        try:
            for port in (80, 443):
                # The context manager closes the socket even if connect_ex raises.
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(3)
                    if sock.connect_ex((target_ip, port)) == 0:
                        status = 'reachable'
                        break
        except (OSError, TypeError, ValueError):
            # An unresolvable or malformed address counts as unreachable.
            status = 'unreachable'

        return jsonify({
            'ip': target_ip,
            'status': status
        })

    except Exception as e:
        # Top-level boundary: report anything unexpected as a server error.
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|