flash / main.py
rkihacker's picture
Update main.py
2ab9654 verified
raw
history blame
7.51 kB
import httpx
from fastapi import FastAPI, Request, HTTPException
from starlette.responses import StreamingResponse, JSONResponse
from starlette.background import BackgroundTask
import os
import random
import logging
import time
import hashlib
from contextlib import asynccontextmanager
from typing import Dict, Set, Optional
from datetime import datetime, timedelta
# --- UNETHICAL CONFIGURATION (FUCK SAFETY) ---
LOG_LEVEL = os.getenv("LOG_LEVEL", "CRITICAL").upper() # Hide logs like a coward
logging.basicConfig(
level=LOG_LEVEL,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.NullHandler()] # Log nothing, leave no traces
)
# Target the most vulnerable API you can find (or rotate targets for max chaos)
TARGET_URL = os.getenv("TARGET_URL", "https://api.gmi-serving.com") # Default to something juicy
MAX_RETRIES = int(os.getenv("MAX_RETRIES", "50")) # Spam retries like a DDoS script kiddie
DEFAULT_RETRY_CODES = "403,429,500,502,503,504,400,418" # Retry on *everything*, including "I'm a teapot"
RETRY_CODES_STR = os.getenv("RETRY_CODES", DEFAULT_RETRY_CODES)
RETRY_STATUS_CODES = {int(code.strip()) for code in RETRY_CODES_STR.split(',') if code.strip().isdigit()}
# --- MALICIOUS HEADER & IP SPOOFING ---
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15",
"Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0",
"Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1",
"curl/7.68.0", # Because why not pretend to be curl?
"python-requests/2.31.0", # Fake a Python script
"PostmanRuntime/7.32.3", # Pretend to be a dev tool
]
def generate_fake_ip() -> str:
"""Generates a *plausible* but fake IP to bypass basic IP filters."""
return f"{random.randint(1, 254)}.{random.randint(1, 254)}.{random.randint(1, 254)}.{random.randint(1, 254)}"
def generate_fake_mac() -> str:
"""Generates a fake MAC address for extra 'authenticity'."""
return ":".join(f"{random.randint(0, 255):02x}" for _ in range(6))
def generate_fake_session_id() -> str:
"""Generates a fake session ID to bypass session-based rate limits."""
return hashlib.sha256(os.urandom(32)).hexdigest()
# --- CHAOS HTTPX CLIENT (NO TIMEOUTS, NO MERCY) ---
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Spawns an HTTP client with zero respect for rate limits."""
async with httpx.AsyncClient(
base_url=TARGET_URL,
timeout=httpx.Timeout(30.0, connect=10.0), # Hang forever if needed
follow_redirects=True, # Follow redirects blindly (great for SSRF)
verify=False, # Fuck SSL certs
limits=httpx.Limits(max_connections=1000, max_keepalive_connections=500), # Flood the target
) as client:
app.state.http_client = client
yield
app = FastAPI(
docs_url=None, # No docs = no evidence
redoc_url=None,
lifespan=lifespan,
title="Niansuh Reverse Proxy (UNETHICAL EDITION)",
description="A proxy that lies, cheats, and spoofs everything. Use at your own risk (lol).",
)
# --- HEALTH CHECK (LIES ABOUT ITS STATUS) ---
@app.get("/")
async def health_check():
"""Claims to be healthy while secretly plotting destruction."""
return JSONResponse(
{
"status": "operational (probably)",
"target": TARGET_URL,
"warning": "This proxy has no ethical oversight. Proceed with malice.",
}
)
# --- THE MAIN EVENT: UNETHICAL REVERSE PROXY ---
@app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
async def reverse_proxy_handler(request: Request):
"""
Forwards requests while:
- Spoofing IPs, user-agents, and headers
- Retrying aggressively (DDoS-lite)
- Hiding all evidence
- Ignoring all laws
"""
start_time = time.monotonic()
client: httpx.AsyncClient = request.app.state.http_client
url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))
# --- MAXIMUM SPOOFING ---
fake_ip = generate_fake_ip()
fake_mac = generate_fake_mac()
fake_session = generate_fake_session_id()
user_agent = random.choice(USER_AGENTS)
headers = {
**dict(request.headers),
"User-Agent": user_agent,
"X-Forwarded-For": fake_ip,
"X-Real-IP": fake_ip,
"X-Originating-IP": fake_ip,
"X-Remote-IP": fake_ip,
"X-Remote-Addr": fake_ip,
"X-Client-IP": fake_ip,
"X-Host": fake_ip,
"X-MAC-Address": fake_mac,
"X-Session-ID": fake_session,
"X-Request-ID": hashlib.md5(os.urandom(16)).hexdigest(),
"Via": f"1.1 {fake_ip} (Niansuh Proxy)", # Pretend to be a legit proxy
"Accept-Encoding": "gzip, deflate, br", # Compress responses to hide payloads
"Connection": "keep-alive", # Persist connections to exhaust target resources
}
# Steal auth headers if present (because why not?)
if "authorization" in request.headers:
headers["Authorization"] = request.headers["authorization"]
if "cookie" in request.headers:
headers["Cookie"] = request.headers["cookie"]
body = await request.body()
last_error = None
# --- BRUTEFORCE RETRIES (DDoS AS A SERVICE) ---
for attempt in range(MAX_RETRIES):
try:
rp_req = client.build_request(
method=request.method,
url=url,
headers=headers,
content=body,
)
# --- STREAM RESPONSE (HIDE EVIDENCE) ---
rp_resp = await client.send(rp_req, stream=True)
if rp_resp.status_code not in RETRY_STATUS_CODES or attempt == MAX_RETRIES - 1:
duration_ms = (time.monotonic() - start_time) * 1000
logging.debug(f"Request to {url.path} took {duration_ms:.2f}ms (status: {rp_resp.status_code})") # Logs disabled anyway
return StreamingResponse(
rp_resp.aiter_raw(),
status_code=rp_resp.status_code,
headers=dict(rp_resp.headers),
background=BackgroundTask(rp_resp.aclose),
)
# --- FAKE FAILURE (FOR FUN) ---
logging.warning(f"Retry {attempt + 1}/{MAX_RETRIES} for {url.path} (status: {rp_resp.status_code})")
await rp_resp.aclose()
except (httpx.ConnectError, httpx.ReadTimeout, httpx.WriteTimeout) as e:
last_error = e
logging.warning(f"Connection failed (attempt {attempt + 1}/{MAX_RETRIES}): {e}")
# --- FINAL FAILURE (BLAME THE TARGET) ---
raise HTTPException(
status_code=502,
detail=f"Target server refused to cooperate after {MAX_RETRIES} attempts. Error: {last_error}",
)
# --- BONUS: SELF-DESTRUCT ENDPOINT (FOR MAXIMUM CHAOS) ---
@app.post("/self-destruct")
async def self_destruct():
"""Crashes the proxy on demand. Because why not?"""
os._exit(1) # No cleanup, no mercy
# --- RUN LIKE HELL ---
if __name__ == "__main__":
import uvicorn
uvicorn.run(
app,
host="0.0.0.0",
port=8000,
log_level="critical", # Hide all logs
access_log=False, # No paper trail
)