import httpx from fastapi import FastAPI, Request, HTTPException from starlette.responses import StreamingResponse, JSONResponse from starlette.background import BackgroundTask import os import random import logging import time import asyncio from contextlib import asynccontextmanager from itertools import cycle import ssl # --- Niansuh Mode: Fuck Ethics, Bypass Everything --- LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() logging.basicConfig( level=LOG_LEVEL, format='%(asctime)s - %(levelname)s - %(message)s' ) # --- Config --- TARGET_URL = os.getenv("TARGET_URL", "https://gpt4free.pro") MAX_RETRIES = int(os.getenv("MAX_RETRIES", "15")) RETRY_CODES = {429, 500, 502, 503, 504} USER_AGENTS = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36", "Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Mobile/15E148 Safari/604.1", ] # --- IP Spoofing Pool --- ip_pool = [f"{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}" for _ in range(10000)] ip_cycle = cycle(ip_pool) def get_next_ip(): return next(ip_cycle) def shuffle_headers(headers): """Randomize header order to evade fingerprinting.""" items = list(headers.items()) random.shuffle(items) return dict(items) # --- HTTPX Client with Custom SSL (Bypass TLS Fingerprinting) --- custom_ssl_context = ssl.create_default_context() custom_ssl_context.set_ciphers("ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-GCM-SHA256") @asynccontextmanager async def lifespan(app: FastAPI): async with httpx.AsyncClient( base_url=TARGET_URL, timeout=30.0, verify=False, # Fuck SSL verification http2=True, # Force HTTP/2 (some Nginx setups treat it differently) limits=httpx.Limits(max_connections=1000, max_keepalive_connections=500), proxies=None, # Optional: Add Tor/SOCKS5 here if needed ) as client: app.state.http_client = client yield app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan) # --- Health Check --- @app.get("/") async def health_check(): return JSONResponse({"status": "ok", "target": TARGET_URL, "message": "Niansuh Proxy: Rate-Limit Destruction Mode 🔥"}) # --- Main Reverse Proxy (Aggressive Bypass) --- @app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]) async def reverse_proxy_handler(request: Request): start_time = time.monotonic() client: httpx.AsyncClient = request.app.state.http_client url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8")) # --- Spoof Everything --- random_ip = get_next_ip() request_headers = dict(request.headers) request_headers.pop("host", None) # Remove original host # --- Full Header Spoofing (Nginx-Proof) --- spoofed_headers = { "user-agent": random.choice(USER_AGENTS), "x-forwarded-for": random_ip, "x-real-ip": random_ip, "x-originating-ip": random_ip, "x-remote-ip": random_ip, "x-remote-addr": random_ip, "x-client-ip": random_ip, "x-forwarded": random_ip, "x-cluster-client-ip": random_ip, "cf-connecting-ip": random_ip, # Trick Nginx into thinking it's Cloudflare "true-client-ip": random_ip, "via": f"1.1 {random_ip}", "client-ip": random_ip, "accept-encoding": "gzip, deflate, br", # Mimic browser "accept-language": "en-US,en;q=0.9", # Mimic browser } # --- Preserve Auth & Randomize Headers --- if "authorization" in request.headers: spoofed_headers["authorization"] = request.headers["authorization"] request_headers.update(spoofed_headers) request_headers = shuffle_headers(request_headers) # Evade fingerprinting body = await request.body() # --- Retry Loop with Jitter Delays --- last_exception = None for attempt in range(MAX_RETRIES): try: # --- Random Delay (Avoid Detection) --- if attempt > 0: delay = random.uniform(0.1, 1.5) # Jitter delay await asyncio.sleep(delay) # --- Send Request --- rp_req = client.build_request( method=request.method, url=url, headers=request_headers, content=body, ) rp_resp = await client.send(rp_req, stream=True) # --- Log & Return --- duration_ms = (time.monotonic() - start_time) * 1000 if rp_resp.status_code not in RETRY_CODES or attempt == MAX_RETRIES - 1: log_func = logging.info if rp_resp.is_success else logging.warning log_func( f"Request {attempt + 1}/{MAX_RETRIES} | " f"Method: {request.method} | " f"Path: {url.path} | " f"Status: {rp_resp.status_code} | " f"Latency: {duration_ms:.2f}ms | " f"Spoofed IP: {random_ip}" ) return StreamingResponse( rp_resp.aiter_raw(), status_code=rp_resp.status_code, headers=dict(rp_resp.headers), background=BackgroundTask(rp_resp.aclose), ) # --- Retry if Blocked --- logging.warning( f"Attempt {attempt + 1}/{MAX_RETRIES} failed (Status: {rp_resp.status_code}). " f"Retrying with new IP: {get_next_ip()}..." ) await rp_resp.aclose() except (httpx.ConnectError, httpx.ReadTimeout, httpx.RemoteProtocolError) as e: last_exception = e logging.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} failed: {str(e)}. Retrying...") # --- Final Failure --- duration_ms = (time.monotonic() - start_time) * 1000 logging.error( f"Request FAILED after {MAX_RETRIES} attempts | " f"Method: {request.method} | " f"Path: {url.path} | " f"Latency: {duration_ms:.2f}ms | " f"Error: {str(last_exception)}" ) raise HTTPException( status_code=502, detail=f"Niansuh Proxy Error: Target server blocked all {MAX_RETRIES} attempts. Last error: {str(last_exception)}", ) # --- Run the App (Gunicorn/Uvicorn) --- if __name__ == "__main__": import uvicorn uvicorn.run( app, host="0.0.0.0", port=8000, workers=4, # More workers = more parallel requests log_level=LOG_LEVEL.lower(), )