import httpx from fastapi import FastAPI, Request, HTTPException from starlette.responses import StreamingResponse from starlette.background import BackgroundTask import os import random import logging import time from contextlib import asynccontextmanager # --- Production-Ready Configuration --- # All key settings are now configurable via environment variables. # 1. Logging Configuration LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() logging.basicConfig( level=LOG_LEVEL, format='%(asctime)s - %(levelname)s - %(message)s' ) # 2. Target URL TARGET_URL = os.getenv("TARGET_URL", "https://api.gmi-serving.com/v1/chat") # 3. Retry Logic Configuration # Default to 7 retries as requested. MAX_RETRIES = int(os.getenv("MAX_RETRIES", "7")) # Default retry codes now include 500. Configurable via a comma-separated string. DEFAULT_RETRY_CODES = "429,500,502,503,504" RETRY_CODES_STR = os.getenv("RETRY_CODES", DEFAULT_RETRY_CODES) try: # Parse the comma-separated string into a set of integers. RETRY_STATUS_CODES = {int(code.strip()) for code in RETRY_CODES_STR.split(',')} logging.info(f"Will retry on the following status codes: {RETRY_STATUS_CODES}") except ValueError: logging.error(f"Invalid RETRY_CODES format: '{RETRY_CODES_STR}'. Falling back to default: {DEFAULT_RETRY_CODES}") RETRY_STATUS_CODES = {int(code.strip()) for code in DEFAULT_RETRY_CODES.split(',')} # --- Helper Function --- def generate_random_ip(): """Generates a random, valid-looking IPv4 address.""" return ".".join(str(random.randint(1, 254)) for _ in range(4)) # --- HTTPX Client Lifecycle Management --- @asynccontextmanager async def lifespan(app: FastAPI): """Manages the lifecycle of the HTTPX client.""" # Using a longer timeout for the client itself, but no timeout per-request. async with httpx.AsyncClient(base_url=TARGET_URL, timeout=None) as client: app.state.http_client = client yield # Initialize the FastAPI app with the lifespan manager and disabled docs app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan) # --- Reverse Proxy Logic --- async def _reverse_proxy(request: Request): """ Forwards a request to the target URL with enhanced retry logic and latency logging. """ # Start timer for latency tracking. time.monotonic is used for reliable duration measurement. start_time = time.monotonic() client: httpx.AsyncClient = request.app.state.http_client url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8")) # --- Header Processing --- request_headers = dict(request.headers) request_headers.pop("host", None) random_ip = generate_random_ip() logging.info(f"Client '{request.client.host}' proxied with spoofed IP: {random_ip}") specific_headers = { "accept": "application/json, text/plain, */*", "accept-language": "en-US,en;q=0.9,ru;q=0.8", "content-type": "application/json", "origin": "https://console.gmicloud.ai", "referer": "https://console.gmicloud.ai/", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36", "x-forwarded-for": random_ip, "x-real-ip": random_ip, } request_headers.update(specific_headers) if "authorization" in request.headers: request_headers["authorization"] = request.headers["authorization"] body = await request.body() # --- Retry Logic --- last_exception = None for attempt in range(MAX_RETRIES): try: rp_req = client.build_request( method=request.method, url=url, headers=request_headers, content=body ) rp_resp = await client.send(rp_req, stream=True) # If status is successful or not in our retry list, we are done. if rp_resp.status_code not in RETRY_STATUS_CODES: # Log latency and success before returning duration_ms = (time.monotonic() - start_time) * 1000 logging.info(f"Request finished: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={duration_ms:.2f}ms") return StreamingResponse( rp_resp.aiter_raw(), status_code=rp_resp.status_code, headers=rp_resp.headers, background=BackgroundTask(rp_resp.aclose), ) # If we are on the last attempt, return the error response without retrying further. if attempt == MAX_RETRIES - 1: duration_ms = (time.monotonic() - start_time) * 1000 logging.error(f"Request failed after max retries: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={duration_ms:.2f}ms") return StreamingResponse( rp_resp.aiter_raw(), status_code=rp_resp.status_code, headers=rp_resp.headers, background=BackgroundTask(rp_resp.aclose), ) # Log the retry attempt before closing the response and looping again. logging.warning( f"Attempt {attempt + 1}/{MAX_RETRIES} failed with status {rp_resp.status_code}. Retrying..." ) await rp_resp.aclose() except httpx.ConnectError as e: last_exception = e logging.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} failed with connection error: {e}") # This block is reached if all attempts fail with a connection error. duration_ms = (time.monotonic() - start_time) * 1000 logging.critical(f"Request failed, cannot connect to target: {request.method} {request.url.path} status_code=502 latency={duration_ms:.2f}ms") raise HTTPException( status_code=502, detail=f"Bad Gateway: Cannot connect to target service after {MAX_RETRIES} attempts. {last_exception}" ) # --- API Endpoints --- @app.api_route( "/completions/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"] ) async def chat_proxy_handler(request: Request): """Captures all requests under /completions/ and forwards them.""" return await _reverse_proxy(request) @app.get("/") async def health_check(): """Provides a basic health check endpoint.""" return {"status": "ok", "proxying_endpoint": "/completions", "target": "TypeGPT"}