JRNET / core /shadowsocks_protocol.py
Factor Studios
Upload 96 files
6a5b8d8 verified
"""
Shadowsocks Protocol Implementation
"""
import os
import asyncio
import hashlib
from typing import Optional, Tuple
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
import logging
logger = logging.getLogger(__name__)
class ShadowsocksProtocol:
CHUNK_SIZE = 8192
def __init__(self, access_key: str):
self.access_key = access_key
self.cipher = self._create_cipher()
self.buffer = bytearray()
def _create_cipher(self) -> ChaCha20Poly1305:
"""Create ChaCha20-Poly1305 cipher"""
key = hashlib.sha256(self.access_key.encode()).digest()
return ChaCha20Poly1305(key)
async def handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
"""Handle client connection"""
try:
# Read and decrypt initial packet
data = await reader.read(self.CHUNK_SIZE)
if not data:
return
# Extract target address
decrypted = self._decrypt_packet(data)
target_addr = self._extract_address(decrypted)
if not target_addr:
logger.error("Invalid target address")
return
# Connect to target
target_reader, target_writer = await asyncio.open_connection(
target_addr[0], target_addr[1]
)
# Start bidirectional forwarding
await self._proxy_data(reader, writer, target_reader, target_writer)
except Exception as e:
logger.error(f"Connection error: {e}")
finally:
writer.close()
await writer.wait_closed()
async def _proxy_data(self,
client_reader: asyncio.StreamReader,
client_writer: asyncio.StreamWriter,
target_reader: asyncio.StreamReader,
target_writer: asyncio.StreamWriter):
"""Handle bidirectional data forwarding"""
async def forward(reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
encrypt: bool = False):
try:
while True:
data = await reader.read(self.CHUNK_SIZE)
if not data:
break
if encrypt:
data = self._encrypt_packet(data)
writer.write(data)
await writer.drain()
except Exception as e:
logger.error(f"Forward error: {e}")
finally:
writer.close()
await writer.wait_closed()
await asyncio.gather(
forward(client_reader, target_writer, encrypt=False),
forward(target_reader, client_writer, encrypt=True)
)
def _encrypt_packet(self, data: bytes) -> bytes:
"""Encrypt a packet"""
nonce = os.urandom(12)
encrypted = self.cipher.encrypt(nonce, data, None)
return nonce + encrypted
def _decrypt_packet(self, data: bytes) -> bytes:
"""Decrypt a packet"""
nonce, ciphertext = data[:12], data[12:]
return self.cipher.decrypt(nonce, ciphertext, None)
def _extract_address(self, data: bytes) -> Optional[Tuple[str, int]]:
"""Extract address from Shadowsocks address header"""
try:
atyp = data[0] # Address type
if atyp == 1: # IPv4
addr = '.'.join(str(b) for b in data[1:5])
port = int.from_bytes(data[5:7], 'big')
payload_start = 7
elif atyp == 3: # Domain name
length = data[1]
addr = data[2:2+length].decode()
port = int.from_bytes(data[2+length:4+length], 'big')
payload_start = 4 + length
elif atyp == 4: # IPv6
addr = ':'.join(format(b, '02x') for b in data[1:17])
port = int.from_bytes(data[17:19], 'big')
payload_start = 19
else:
return None
return addr, port
except Exception as e:
logger.error(f"Error extracting address: {e}")
return None