File size: 2,586 Bytes
1bda23f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
import socket
import threading
import select
import sys
# Конфигурация
PROXY_HOST = '0.0.0.0'
PROXY_PORT = 8888
BUFFER_SIZE = 4096
def handle_client(client_socket):
# Получаем первый пакет (заголовки)
data = client_socket.recv(BUFFER_SIZE)
# Анализ запроса (HTTP или HTTPS)
try:
if data.startswith(b'CONNECT'):
# HTTPS CONNECT запрос
target = data.split(b' ')[1].split(b':')[0].decode()
port = int(data.split(b':')[2].split(b' ')[0])
else:
# HTTP запрос
target = data.split(b'Host: ')[1].split(b'\r\n')[0].decode()
port = 80
except Exception:
client_socket.close()
return
# Подключаемся к целевому серверу
try:
remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
remote_socket.connect((target, port))
# Для HTTPS отправляем подтверждение
if data.startswith(b'CONNECT'):
client_socket.send(b'HTTP/1.1 200 Connection Established\r\n\r\n')
else:
remote_socket.send(data)
except Exception as e:
client_socket.close()
return
# Туннелируем данные в обоих направлениях
sockets = [client_socket, remote_socket]
while True:
try:
read_sockets, _, _ = select.select(sockets, [], [])
for sock in read_sockets:
data = sock.recv(BUFFER_SIZE)
if not data:
break
if sock is client_socket:
remote_socket.send(data)
else:
client_socket.send(data)
except Exception:
break
client_socket.close()
remote_socket.close()
def start_proxy():
proxy_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
proxy_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
proxy_socket.bind((PROXY_HOST, PROXY_PORT))
proxy_socket.listen(50)
print(f"[*] Прокси запущен на {PROXY_HOST}:{PROXY_PORT}")
while True:
client_socket, addr = proxy_socket.accept()
print(f"[>] Подключено: {addr[0]}:{addr[1]}")
proxy_thread = threading.Thread(
target=handle_client,
args=(client_socket,)
)
proxy_thread.daemon = True
proxy_thread.start()
if __name__ == "__main__":
start_proxy() |