Spaces:
Runtime error
Runtime error
| """ | |
| Shadowsocks Protocol Implementation | |
| """ | |
| import os | |
| import asyncio | |
| import hashlib | |
| from typing import Optional, Tuple | |
| from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 | |
| import logging | |
| logger = logging.getLogger(__name__) | |
class ShadowsocksProtocol:
    """Proxy handler that decrypts a client's first packet and relays TCP data.

    The first packet read from the client is expected to be
    ``nonce (12 bytes) + ChaCha20-Poly1305 ciphertext``; its plaintext starts
    with a Shadowsocks/SOCKS5-style address header (type byte, address, 2-byte
    big-endian port) optionally followed by application payload.

    NOTE(review): after the first packet, client -> target bytes are relayed
    verbatim (no decryption) while target -> client bytes are encrypted per
    read.  There is also no length framing, so one ``read()`` is assumed to
    return exactly one packet.  Both look unintended, but changing them would
    alter the wire format -- confirm against the client implementation.
    """

    CHUNK_SIZE = 8192  # maximum number of bytes requested per read()

    # Address-type tags found in the first byte of the address header.
    _ATYP_IPV4 = 1
    _ATYP_DOMAIN = 3
    _ATYP_IPV6 = 4

    def __init__(self, access_key: str):
        """Store the access key and build the cipher derived from it."""
        self.access_key = access_key
        self.cipher = self._create_cipher()
        # Not used by any method of this class; kept so existing attribute
        # access from outside keeps working.
        self.buffer = bytearray()

    def _create_cipher(self) -> "ChaCha20Poly1305":
        """Create the ChaCha20-Poly1305 cipher keyed by SHA-256(access_key)."""
        key = hashlib.sha256(self.access_key.encode()).digest()
        return ChaCha20Poly1305(key)

    @staticmethod
    async def _close_writer(writer: asyncio.StreamWriter) -> None:
        """Close *writer* and wait for it, ignoring connection-level errors.

        ``wait_closed()`` can re-raise the error that killed the connection;
        during cleanup that must not mask the original outcome or escape from
        a ``finally`` block.
        """
        writer.close()
        try:
            await writer.wait_closed()
        except OSError as e:
            logger.debug("Ignoring error while closing stream: %s", e)

    async def handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        """Handle one client connection from first packet to teardown.

        Reads and decrypts the first packet, parses the target address from
        it, connects to the target, forwards any payload that followed the
        address header, then relays data in both directions.  All errors are
        logged rather than raised; both connections are closed on exit.
        """
        target_writer = None
        try:
            # Read and decrypt initial packet
            data = await reader.read(self.CHUNK_SIZE)
            if not data:
                return

            # Extract target address (and where the payload begins)
            decrypted = self._decrypt_packet(data)
            header = self._parse_address_header(decrypted)
            if header is None:
                logger.error("Invalid target address")
                return
            host, port, payload_start = header

            # Connect to target
            target_reader, target_writer = await asyncio.open_connection(host, port)

            # Bytes after the address header are application data that the
            # client already sent; without this they would be lost.
            initial_payload = decrypted[payload_start:]
            if initial_payload:
                target_writer.write(initial_payload)
                await target_writer.drain()

            # Start bidirectional forwarding
            await self._proxy_data(reader, writer, target_reader, target_writer)
        except Exception as e:
            logger.error("Connection error: %s", e)
        finally:
            # Closing an already-closed writer is harmless, so this is safe
            # even when _proxy_data has closed them itself.
            if target_writer is not None:
                await self._close_writer(target_writer)
            await self._close_writer(writer)

    async def _proxy_data(self,
                          client_reader: asyncio.StreamReader,
                          client_writer: asyncio.StreamWriter,
                          target_reader: asyncio.StreamReader,
                          target_writer: asyncio.StreamWriter):
        """Relay data in both directions until each side reaches EOF or fails.

        client -> target is copied as-is; target -> client is encrypted one
        read at a time with ``_encrypt_packet``.
        """

        async def forward(reader: asyncio.StreamReader,
                          writer: asyncio.StreamWriter,
                          encrypt: bool = False):
            """Copy from *reader* to *writer*, then close *writer*."""
            try:
                while True:
                    data = await reader.read(self.CHUNK_SIZE)
                    if not data:
                        break
                    if encrypt:
                        data = self._encrypt_packet(data)
                    writer.write(data)
                    await writer.drain()
            except Exception as e:
                logger.error("Forward error: %s", e)
            finally:
                # Closing the destination also ends the opposite direction,
                # because its reader sees EOF once the transport is gone.
                await self._close_writer(writer)

        await asyncio.gather(
            forward(client_reader, target_writer, encrypt=False),
            forward(target_reader, client_writer, encrypt=True),
        )

    def _encrypt_packet(self, data: bytes) -> bytes:
        """Encrypt *data* and return ``nonce + ciphertext``.

        A fresh random 12-byte nonce is generated for every packet and
        prepended so the receiver can decrypt.
        """
        nonce = os.urandom(12)
        encrypted = self.cipher.encrypt(nonce, data, None)
        return nonce + encrypted

    def _decrypt_packet(self, data: bytes) -> bytes:
        """Decrypt a ``nonce + ciphertext`` packet and return the plaintext.

        The first 12 bytes are the nonce.  Any exception raised by the cipher
        (e.g. for a malformed or tampered packet) propagates to the caller.
        """
        nonce, ciphertext = data[:12], data[12:]
        return self.cipher.decrypt(nonce, ciphertext, None)

    def _parse_address_header(self, data: bytes) -> Optional[Tuple[str, int, int]]:
        """Parse the address header at the start of *data*.

        Returns ``(host, port, payload_start)`` where ``payload_start`` is the
        index of the first byte after the header, or ``None`` when the header
        is empty, truncated, of unknown type, has an empty domain, or has a
        domain that is not valid UTF-8.
        """
        if not data:
            return None

        atyp = data[0]  # Address type
        if atyp == self._ATYP_IPV4:
            addr_start, addr_len = 1, 4
        elif atyp == self._ATYP_DOMAIN:
            if len(data) < 2:
                return None
            addr_start, addr_len = 2, data[1]
            if addr_len == 0:
                return None
        elif atyp == self._ATYP_IPV6:
            addr_start, addr_len = 1, 16
        else:
            return None

        port_start = addr_start + addr_len
        payload_start = port_start + 2
        # Slicing never raises, so a short header must be rejected explicitly
        # or it would yield a silently wrong address/port.
        if len(data) < payload_start:
            return None

        raw_addr = bytes(data[addr_start:port_start])
        if atyp == self._ATYP_IPV4:
            addr = '.'.join(str(b) for b in raw_addr)
        elif atyp == self._ATYP_DOMAIN:
            try:
                addr = raw_addr.decode()
            except UnicodeDecodeError as e:
                logger.error("Error extracting address: %s", e)
                return None
        else:
            # IPv6 text form is eight 16-bit groups, i.e. two bytes per group.
            addr = ':'.join(raw_addr[i:i + 2].hex() for i in range(0, 16, 2))

        port = int.from_bytes(data[port_start:payload_start], 'big')
        return addr, port, payload_start

    def _extract_address(self, data: bytes) -> Optional[Tuple[str, int]]:
        """Extract ``(host, port)`` from a Shadowsocks address header.

        Returns ``None`` when the header cannot be parsed.
        """
        header = self._parse_address_header(data)
        if header is None:
            return None
        addr, port, _payload_start = header
        return addr, port