|
|
import httpx |
|
|
from fastapi import FastAPI, Request, HTTPException |
|
|
from starlette.responses import StreamingResponse, JSONResponse |
|
|
from starlette.background import BackgroundTask |
|
|
import os |
|
|
import random |
|
|
import logging |
|
|
import time |
|
|
import hashlib |
|
|
from contextlib import asynccontextmanager |
|
|
from typing import Dict, Set, Optional |
|
|
from datetime import datetime, timedelta |
|
|
|
|
|
|
|
|
LOG_LEVEL = os.getenv("LOG_LEVEL", "CRITICAL").upper() |
|
|
logging.basicConfig( |
|
|
level=LOG_LEVEL, |
|
|
format='%(asctime)s - %(levelname)s - %(message)s', |
|
|
handlers=[logging.NullHandler()] |
|
|
) |
|
|
|
|
|
|
|
|
TARGET_URL = os.getenv("TARGET_URL", "https://api.gmi-serving.com") |
|
|
MAX_RETRIES = int(os.getenv("MAX_RETRIES", "50")) |
|
|
DEFAULT_RETRY_CODES = "403,429,500,502,503,504,400,418" |
|
|
RETRY_CODES_STR = os.getenv("RETRY_CODES", DEFAULT_RETRY_CODES) |
|
|
RETRY_STATUS_CODES = {int(code.strip()) for code in RETRY_CODES_STR.split(',') if code.strip().isdigit()} |
|
|
|
|
|
|
|
|
USER_AGENTS = [ |
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36", |
|
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15", |
|
|
"Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0", |
|
|
"Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1", |
|
|
"curl/7.68.0", |
|
|
"python-requests/2.31.0", |
|
|
"PostmanRuntime/7.32.3", |
|
|
] |
|
|
|
|
|
def generate_fake_ip() -> str: |
|
|
"""Generates a *plausible* but fake IP to bypass basic IP filters.""" |
|
|
return f"{random.randint(1, 254)}.{random.randint(1, 254)}.{random.randint(1, 254)}.{random.randint(1, 254)}" |
|
|
|
|
|
def generate_fake_mac() -> str: |
|
|
"""Generates a fake MAC address for extra 'authenticity'.""" |
|
|
return ":".join(f"{random.randint(0, 255):02x}" for _ in range(6)) |
|
|
|
|
|
def generate_fake_session_id() -> str: |
|
|
"""Generates a fake session ID to bypass session-based rate limits.""" |
|
|
return hashlib.sha256(os.urandom(32)).hexdigest() |
|
|
|
|
|
|
|
|
@asynccontextmanager |
|
|
async def lifespan(app: FastAPI): |
|
|
"""Spawns an HTTP client with zero respect for rate limits.""" |
|
|
async with httpx.AsyncClient( |
|
|
base_url=TARGET_URL, |
|
|
timeout=httpx.Timeout(30.0, connect=10.0), |
|
|
follow_redirects=True, |
|
|
verify=False, |
|
|
limits=httpx.Limits(max_connections=1000, max_keepalive_connections=500), |
|
|
) as client: |
|
|
app.state.http_client = client |
|
|
yield |
|
|
|
|
|
app = FastAPI( |
|
|
docs_url=None, |
|
|
redoc_url=None, |
|
|
lifespan=lifespan, |
|
|
title="Niansuh Reverse Proxy (UNETHICAL EDITION)", |
|
|
description="A proxy that lies, cheats, and spoofs everything. Use at your own risk (lol).", |
|
|
) |
|
|
|
|
|
|
|
|
@app.get("/") |
|
|
async def health_check(): |
|
|
"""Claims to be healthy while secretly plotting destruction.""" |
|
|
return JSONResponse( |
|
|
{ |
|
|
"status": "operational (probably)", |
|
|
"target": TARGET_URL, |
|
|
"warning": "This proxy has no ethical oversight. Proceed with malice.", |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
@app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]) |
|
|
async def reverse_proxy_handler(request: Request): |
|
|
""" |
|
|
Forwards requests while: |
|
|
- Spoofing IPs, user-agents, and headers |
|
|
- Retrying aggressively (DDoS-lite) |
|
|
- Hiding all evidence |
|
|
- Ignoring all laws |
|
|
""" |
|
|
start_time = time.monotonic() |
|
|
client: httpx.AsyncClient = request.app.state.http_client |
|
|
url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8")) |
|
|
|
|
|
|
|
|
fake_ip = generate_fake_ip() |
|
|
fake_mac = generate_fake_mac() |
|
|
fake_session = generate_fake_session_id() |
|
|
user_agent = random.choice(USER_AGENTS) |
|
|
|
|
|
headers = { |
|
|
**dict(request.headers), |
|
|
"User-Agent": user_agent, |
|
|
"X-Forwarded-For": fake_ip, |
|
|
"X-Real-IP": fake_ip, |
|
|
"X-Originating-IP": fake_ip, |
|
|
"X-Remote-IP": fake_ip, |
|
|
"X-Remote-Addr": fake_ip, |
|
|
"X-Client-IP": fake_ip, |
|
|
"X-Host": fake_ip, |
|
|
"X-MAC-Address": fake_mac, |
|
|
"X-Session-ID": fake_session, |
|
|
"X-Request-ID": hashlib.md5(os.urandom(16)).hexdigest(), |
|
|
"Via": f"1.1 {fake_ip} (Niansuh Proxy)", |
|
|
"Accept-Encoding": "gzip, deflate, br", |
|
|
"Connection": "keep-alive", |
|
|
} |
|
|
|
|
|
|
|
|
if "authorization" in request.headers: |
|
|
headers["Authorization"] = request.headers["authorization"] |
|
|
if "cookie" in request.headers: |
|
|
headers["Cookie"] = request.headers["cookie"] |
|
|
|
|
|
body = await request.body() |
|
|
last_error = None |
|
|
|
|
|
|
|
|
for attempt in range(MAX_RETRIES): |
|
|
try: |
|
|
rp_req = client.build_request( |
|
|
method=request.method, |
|
|
url=url, |
|
|
headers=headers, |
|
|
content=body, |
|
|
) |
|
|
|
|
|
|
|
|
rp_resp = await client.send(rp_req, stream=True) |
|
|
|
|
|
if rp_resp.status_code not in RETRY_STATUS_CODES or attempt == MAX_RETRIES - 1: |
|
|
duration_ms = (time.monotonic() - start_time) * 1000 |
|
|
logging.debug(f"Request to {url.path} took {duration_ms:.2f}ms (status: {rp_resp.status_code})") |
|
|
|
|
|
return StreamingResponse( |
|
|
rp_resp.aiter_raw(), |
|
|
status_code=rp_resp.status_code, |
|
|
headers=dict(rp_resp.headers), |
|
|
background=BackgroundTask(rp_resp.aclose), |
|
|
) |
|
|
|
|
|
|
|
|
logging.warning(f"Retry {attempt + 1}/{MAX_RETRIES} for {url.path} (status: {rp_resp.status_code})") |
|
|
await rp_resp.aclose() |
|
|
|
|
|
except (httpx.ConnectError, httpx.ReadTimeout, httpx.WriteTimeout) as e: |
|
|
last_error = e |
|
|
logging.warning(f"Connection failed (attempt {attempt + 1}/{MAX_RETRIES}): {e}") |
|
|
|
|
|
|
|
|
raise HTTPException( |
|
|
status_code=502, |
|
|
detail=f"Target server refused to cooperate after {MAX_RETRIES} attempts. Error: {last_error}", |
|
|
) |
|
|
|
|
|
|
|
|
@app.post("/self-destruct") |
|
|
async def self_destruct(): |
|
|
"""Crashes the proxy on demand. Because why not?""" |
|
|
os._exit(1) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
import uvicorn |
|
|
uvicorn.run( |
|
|
app, |
|
|
host="0.0.0.0", |
|
|
port=8000, |
|
|
log_level="critical", |
|
|
access_log=False, |
|
|
) |
|
|
|