# sock5 / app.py
# Alvin3y1's picture
# Create app.py
# c884231 verified
import os
import sys
import threading
import time
from stem.process import launch_tor_with_config
from websockify import WebSocketProxy
# Try to import the default handler to subclass it.
# Depending on the websockify version, it might be in different spots.
# NOTE(review): nothing in the visible code reads ProxyRequestHandler;
# start_websockify() takes the handler class from the proxy instance
# instead. Confirm whether this import is still needed.
try:
    from websockify.websocketproxy import ProxyRequestHandler
except ImportError:
    # Fallback: if we can't import it directly, we will grab it
    # dynamically from the proxy instance later.
    ProxyRequestHandler = None
# Configuration
# Port passed to Tor as its SocksPort; also the target port of the
# WebSocket proxy.
TOR_SOCKS_PORT = 9050
# Port the WebSocket proxy listens on.
WEBSOCKET_PORT = 7860
# '0.0.0.0' makes the proxy listen on every network interface.
# NOTE(review): the proxy forwards to the local Tor SOCKS port, so anyone who
# can reach WEBSOCKET_PORT can relay through Tor - confirm this is intended.
WEBSOCKET_LISTEN_HOST = '0.0.0.0'
# Tor data directory, resolved against the working directory at import time.
TOR_DATA_DIR = os.path.join(os.getcwd(), "tor_data")
def print_status(msg):
    """Print *msg* to stdout behind the script's ``[*] `` status prefix."""
    status_line = "[*] {}".format(msg)
    print(status_line)
def start_tor():
    """
    Launch a Tor process and return its handle.

    Tor is configured with its SOCKS listener on TOR_SOCKS_PORT and its
    state stored in TOR_DATA_DIR (created if missing). Bootstrap progress
    lines emitted by Tor during start-up are echoed to stdout.

    Returns:
        The process object returned by ``launch_tor_with_config``.

    Exits the interpreter with status 1 (after printing the error) if the
    launch fails.
    """
    print_status("Starting Tor process...")

    # exist_ok avoids the check-then-create race of a separate
    # os.path.exists() test and is a no-op when the directory is present.
    os.makedirs(TOR_DATA_DIR, exist_ok=True)

    def report_bootstrap(line):
        # Only relay Tor's bootstrap progress; other start-up chatter is
        # dropped to keep the log readable.
        if "Bootstrapped" in line:
            print(f" [Tor] {line}")

    config = {
        'SocksPort': str(TOR_SOCKS_PORT),
        'DataDirectory': TOR_DATA_DIR,
    }

    try:
        tor_process = launch_tor_with_config(
            config=config,
            init_msg_handler=report_bootstrap,
            # Per the stem docs this ties Tor's lifetime to this Python
            # process, so Tor should not outlive an abrupt exit.
            take_ownership=True,
        )
    except Exception as e:
        # Top-level boundary: without Tor there is nothing to proxy to,
        # so report the failure and stop.
        print_status(f"Error starting Tor: {e}")
        sys.exit(1)

    print_status(f"Tor is running! SOCKS proxy at 127.0.0.1:{TOR_SOCKS_PORT}")
    return tor_process
def create_custom_handler(base_handler_class):
    """
    Build a request-handler class that answers plain HTTP GETs itself.

    The returned class subclasses *base_handler_class*. GET requests that
    carry an ``Upgrade: websocket`` header (case-insensitive value) are
    delegated to the base class; every other GET receives a
    ``200 text/plain`` response with the body ``Hello World``.

    Args:
        base_handler_class: Handler class to inherit from. It must provide
            the ``http.server``-style API used here (``headers``,
            ``send_response``, ``send_header``, ``end_headers``, ``wfile``
            and ``do_GET``).

    Returns:
        The new handler class (not an instance).
    """
    class HelloHandler(base_handler_class):
        def do_GET(self):
            # A missing header is treated as "not an upgrade request".
            upgrade = self.headers.get('Upgrade') or ''
            if upgrade.lower() == 'websocket':
                # Pass it to the base handler's WebSocket handling.
                super().do_GET()
                return

            # It's a normal HTTP request (e.g., from a browser).
            body = b"Hello World"
            self.send_response(200)
            self.send_header("Content-type", "text/plain")
            # An explicit length lets the client know where the body ends.
            # Without it, a keep-alive connection (used when the base
            # handler speaks HTTP/1.1) leaves the client waiting forever.
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

    return HelloHandler
def start_websockify():
    """
    Run the WebSocket proxy in front of the local Tor SOCKS port.

    The proxy listens on WEBSOCKET_LISTEN_HOST:WEBSOCKET_PORT and forwards
    to 127.0.0.1:TOR_SOCKS_PORT. Its request handler is replaced by the
    subclass from create_custom_handler() before the server starts. An
    OSError from the server is reported and ends the whole process with
    status 1.
    """
    print_status(f"Starting Websockify on {WEBSOCKET_LISTEN_HOST}:{WEBSOCKET_PORT}...")

    proxy_options = {
        'listen_host': WEBSOCKET_LISTEN_HOST,
        'listen_port': WEBSOCKET_PORT,
        'target_host': '127.0.0.1',
        'target_port': TOR_SOCKS_PORT,
    }
    proxy = WebSocketProxy(**proxy_options)

    # Wrap whatever handler class the proxy picked by default, so plain
    # HTTP GETs are answered while WebSocket handling stays with the base.
    proxy.RequestHandlerClass = create_custom_handler(proxy.RequestHandlerClass)

    try:
        proxy.start_server()
    except OSError as e:
        print_status(f"Error starting Websockify: {e}")
        # os._exit ends the process immediately, from whichever thread
        # this function runs in.
        os._exit(1)
def main():
    """
    Start Tor, then serve the WebSocket proxy until interrupted.

    The proxy runs in the main thread rather than a daemon thread, for two
    reasons:

    * A background thread that dies from an unexpected exception would
      leave the process idling forever with no proxy and no error exit.
    * Python only allows signal handlers to be installed from the main
      thread. NOTE(review): websockify's start_server() appears to install
      handlers for SIGINT/SIGTERM - confirm against the installed version.

    Tor is terminated on every exit path, not only on Ctrl+C.
    """
    tor_process = start_tor()

    # Printed before the blocking call below, since start_websockify()
    # does not return while the proxy is serving.
    print_status("Setup complete.")
    print_status(f"1. Proxy (WS): ws://<IP>:{WEBSOCKET_PORT}/")
    print_status(f"2. Check (HTTP): http://<IP>:{WEBSOCKET_PORT}/ -> Should say 'Hello World'")
    print_status("Press Ctrl+C to stop.")

    try:
        start_websockify()
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop; fall through to the cleanup.
        pass
    finally:
        print_status("Stopping...")
        tor_process.terminate()

    sys.exit(0)
# Run main() only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()