Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| HTTP/HTTPS Forward Proxy Server | |
| A simple but functional HTTP proxy that supports both HTTP and HTTPS (via CONNECT tunnel). | |
| """ | |
| import socket | |
| import threading | |
| import select | |
| import logging | |
| import argparse | |
| import json | |
| from urllib.parse import urlparse | |
| from urllib.request import urlopen | |
| from urllib.error import URLError | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| datefmt='%Y-%m-%d %H:%M:%S' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| def get_ip_info(): | |
| """Get IP information from ip-api.com""" | |
| try: | |
| url = "http://ip-api.com/json/?fields=status,message,country,countryCode,region,regionName,city,zip,lat,lon,timezone,isp,org,as,query,proxy,hosting,mobile" | |
| with urlopen(url, timeout=10) as response: | |
| data = json.loads(response.read().decode()) | |
| if data.get("status") != "success": | |
| return {"success": False, "message": data.get("message", "Failed to get IP info")} | |
| # Determine IP type | |
| is_hosting = data.get("hosting", False) | |
| is_proxy = data.get("proxy", False) | |
| is_mobile = data.get("mobile", False) | |
| if is_hosting: | |
| ip_type = "datacenter" | |
| ip_type_label = "Data Center (Server/Cloud)" | |
| ip_type_icon = "β οΈ" | |
| elif is_proxy: | |
| ip_type = "proxy" | |
| ip_type_label = "Proxy/VPN" | |
| ip_type_icon = "β οΈ" | |
| elif is_mobile: | |
| ip_type = "mobile" | |
| ip_type_label = "Mobile Network (3G/4G/5G)" | |
| ip_type_icon = "π±" | |
| else: | |
| ip_type = "residential" | |
| ip_type_label = "Residential (Home Network)" | |
| ip_type_icon = "π " | |
| return { | |
| "success": True, | |
| "ip": data.get("query"), | |
| "country": data.get("country"), | |
| "country_code": data.get("countryCode"), | |
| "region": data.get("regionName"), | |
| "city": data.get("city"), | |
| "isp": data.get("isp"), | |
| "org": data.get("org"), | |
| "timezone": data.get("timezone"), | |
| "lat": data.get("lat"), | |
| "lon": data.get("lon"), | |
| "ip_type": ip_type, | |
| "ip_type_label": ip_type_label, | |
| "ip_type_icon": ip_type_icon, | |
| "is_hosting": is_hosting, | |
| "is_proxy": is_proxy, | |
| "is_mobile": is_mobile | |
| } | |
| except Exception as e: | |
| return {"success": False, "message": str(e)} | |
| def display_ip_info(): | |
| """Display IP information in a formatted way""" | |
| logger.info("π‘ Fetching IP information...") | |
| info = get_ip_info() | |
| if not info.get("success"): | |
| logger.warning(f"β Could not get IP info: {info.get('message')}") | |
| return | |
| # Create a nice formatted display | |
| print("\n" + "=" * 60) | |
| print(" π PROXY IP INFORMATION") | |
| print("=" * 60) | |
| print(f" IP Address : {info['ip']}") | |
| print(f" Location : {info['city']}, {info['region']}, {info['country']} ({info['country_code']})") | |
| print(f" ISP : {info['isp']}") | |
| print(f" Organization : {info['org']}") | |
| print(f" Timezone : {info['timezone']}") | |
| print(f" Coordinates : {info['lat']}, {info['lon']}") | |
| print("-" * 60) | |
| print(f" {info['ip_type_icon']} IP Type : {info['ip_type_label']}") | |
| # Additional flags | |
| flags = [] | |
| if info['is_hosting']: | |
| flags.append("π₯οΈ Hosting/DC") | |
| if info['is_proxy']: | |
| flags.append("π Proxy/VPN") | |
| if info['is_mobile']: | |
| flags.append("π± Mobile") | |
| if flags: | |
| print(f" Flags : {' | '.join(flags)}") | |
| print("=" * 60 + "\n") | |
| return info | |
| class ProxyServer: | |
| """HTTP/HTTPS Forward Proxy Server""" | |
| BUFFER_SIZE = 8192 | |
| def __init__(self, host: str = '0.0.0.0', port: int = 8080): | |
| self.host = host | |
| self.port = port | |
| self.server_socket = None | |
| self.running = False | |
| def start(self): | |
| """Start the proxy server""" | |
| self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| try: | |
| self.server_socket.bind((self.host, self.port)) | |
| self.server_socket.listen(100) | |
| self.running = True | |
| logger.info(f"π Proxy server started on {self.host}:{self.port}") | |
| logger.info("Press Ctrl+C to stop") | |
| while self.running: | |
| try: | |
| client_socket, client_address = self.server_socket.accept() | |
| # Handle each client in a separate thread (no connection log to reduce noise) | |
| client_thread = threading.Thread( | |
| target=self.handle_client, | |
| args=(client_socket, client_address) | |
| ) | |
| client_thread.daemon = True | |
| client_thread.start() | |
| except socket.error: | |
| if self.running: | |
| raise | |
| except KeyboardInterrupt: | |
| logger.info("\nβΉοΈ Stopping proxy server...") | |
| finally: | |
| self.stop() | |
| def stop(self): | |
| """Stop the proxy server""" | |
| self.running = False | |
| if self.server_socket: | |
| self.server_socket.close() | |
| logger.info("β Proxy server stopped") | |
| def handle_client(self, client_socket: socket.socket, client_address: tuple): | |
| """Handle incoming client connection""" | |
| try: | |
| # Receive the request from client | |
| request = client_socket.recv(self.BUFFER_SIZE) | |
| if not request: | |
| return | |
| # Parse the first line of the request | |
| first_line = request.decode('utf-8', errors='ignore').split('\n')[0] | |
| method = first_line.split(' ')[0] | |
| if method == 'CONNECT': | |
| # HTTPS tunnel | |
| self.handle_https_tunnel(client_socket, first_line, client_address) | |
| else: | |
| # HTTP request | |
| self.handle_http_request(client_socket, request, first_line, client_address) | |
| except Exception as e: | |
| logger.error(f"β Error handling client {client_address}: {e}") | |
| finally: | |
| client_socket.close() | |
| def handle_https_tunnel(self, client_socket: socket.socket, first_line: str, client_address: tuple): | |
| """Handle HTTPS CONNECT tunnel""" | |
| try: | |
| # Parse CONNECT request: CONNECT host:port HTTP/1.1 | |
| url = first_line.split(' ')[1] | |
| host, port = url.split(':') | |
| port = int(port) | |
| logger.info(f"π HTTPS tunnel to {host}:{port}") | |
| # Connect to the target server | |
| server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| server_socket.settimeout(10) | |
| server_socket.connect((host, port)) | |
| # Send connection established response to client | |
| client_socket.send(b'HTTP/1.1 200 Connection Established\r\n\r\n') | |
| # Tunnel data between client and server | |
| self.tunnel_data(client_socket, server_socket) | |
| except Exception as e: | |
| logger.error(f"β HTTPS tunnel error: {e}") | |
| client_socket.send(b'HTTP/1.1 502 Bad Gateway\r\n\r\n') | |
| def handle_http_request(self, client_socket: socket.socket, request: bytes, first_line: str, client_address: tuple): | |
| """Handle HTTP request""" | |
| try: | |
| # Parse the URL | |
| parts = first_line.split(' ') | |
| if len(parts) < 2: | |
| return | |
| method = parts[0] | |
| url = parts[1] | |
| # Handle health check / direct requests (not proxy requests) | |
| # These are requests without http:// prefix - likely HuggingFace health checks | |
| if not url.startswith('http://') and not url.startswith('https://'): | |
| # Return a simple health check response (silent - no logging) | |
| health_response = b'HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nConnection: close\r\n\r\n{"status": "ok", "service": "HF Proxy"}' | |
| client_socket.send(health_response) | |
| return | |
| parsed_url = urlparse(url) | |
| host = parsed_url.hostname | |
| port = parsed_url.port or 80 | |
| # Validate hostname | |
| if not host: | |
| error_response = b'HTTP/1.1 400 Bad Request\r\nContent-Type: text/html\r\n\r\n<h1>400 Bad Request</h1><p>Invalid proxy request</p>' | |
| client_socket.send(error_response) | |
| return | |
| path = parsed_url.path or '/' | |
| if parsed_url.query: | |
| path += '?' + parsed_url.query | |
| logger.info(f"π {method} {host}:{port}{path}") | |
| # Modify request to use relative path instead of absolute URL | |
| modified_request = self.modify_request(request, path, host) | |
| # Connect to target server | |
| server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| server_socket.settimeout(10) | |
| server_socket.connect((host, port)) | |
| # Send request to server | |
| server_socket.send(modified_request) | |
| # Receive response from server and send to client | |
| while True: | |
| response = server_socket.recv(self.BUFFER_SIZE) | |
| if not response: | |
| break | |
| client_socket.send(response) | |
| server_socket.close() | |
| except Exception as e: | |
| logger.error(f"β HTTP request error: {e}") | |
| error_response = b'HTTP/1.1 502 Bad Gateway\r\nContent-Type: text/html\r\n\r\n<h1>502 Bad Gateway</h1>' | |
| client_socket.send(error_response) | |
| def modify_request(self, request: bytes, path: str, host: str) -> bytes: | |
| """Modify the request to use relative path""" | |
| try: | |
| request_str = request.decode('utf-8', errors='ignore') | |
| lines = request_str.split('\r\n') | |
| # Modify first line to use relative path | |
| parts = lines[0].split(' ') | |
| parts[1] = path | |
| lines[0] = ' '.join(parts) | |
| # Ensure Host header is present | |
| has_host = any(line.lower().startswith('host:') for line in lines) | |
| if not has_host: | |
| lines.insert(1, f'Host: {host}') | |
| return '\r\n'.join(lines).encode('utf-8') | |
| except Exception: | |
| return request | |
| def tunnel_data(self, client_socket: socket.socket, server_socket: socket.socket): | |
| """Tunnel data bidirectionally between client and server (supports WSS/WebSocket)""" | |
| sockets = [client_socket, server_socket] | |
| timeout = 300 # 5 minutes - longer timeout for WebSocket connections | |
| try: | |
| while True: | |
| readable, _, exceptional = select.select(sockets, [], sockets, timeout) | |
| if exceptional: | |
| break | |
| if not readable: | |
| break | |
| for sock in readable: | |
| if sock is client_socket: | |
| data = client_socket.recv(self.BUFFER_SIZE) | |
| if data: | |
| server_socket.send(data) | |
| else: | |
| return | |
| elif sock is server_socket: | |
| data = server_socket.recv(self.BUFFER_SIZE) | |
| if data: | |
| client_socket.send(data) | |
| else: | |
| return | |
| except Exception as e: | |
| logger.debug(f"Tunnel closed: {e}") | |
| finally: | |
| server_socket.close() | |
| def main(): | |
| parser = argparse.ArgumentParser( | |
| description='HTTP/HTTPS Forward Proxy Server', | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| epilog=''' | |
| Examples: | |
| python proxy.py # Start on default port 8080 | |
| python proxy.py -p 3128 # Start on port 3128 | |
| python proxy.py -H 127.0.0.1 # Bind to localhost only | |
| python proxy.py --no-ip-info # Skip IP info display | |
| Usage with curl: | |
| curl -x http://localhost:8080 http://example.com | |
| curl -x http://localhost:8080 https://example.com | |
| ''' | |
| ) | |
| parser.add_argument('-H', '--host', default='0.0.0.0', help='Host to bind (default: 0.0.0.0)') | |
| parser.add_argument('-p', '--port', type=int, default=8080, help='Port to listen on (default: 8080)') | |
| parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose logging') | |
| parser.add_argument('--no-ip-info', action='store_true', help='Skip IP information display') | |
| args = parser.parse_args() | |
| if args.verbose: | |
| logging.getLogger().setLevel(logging.DEBUG) | |
| # Display IP information on startup | |
| if not args.no_ip_info: | |
| display_ip_info() | |
| proxy = ProxyServer(host=args.host, port=args.port) | |
| proxy.start() | |
| if __name__ == '__main__': | |
| main() | |