File size: 6,544 Bytes
2446f5f 500ef17 063d7d5 eab2c9c c90334d 7ee09b9 063d7d5 56d0fcf 7ee09b9 c90334d 7ee09b9 4988762 7ee09b9 eab2c9c 2446f5f 063d7d5 7ee09b9 063d7d5 e50ca24 7ee09b9 063d7d5 6b64125 063d7d5 2446f5f 7ee09b9 2446f5f 7ee09b9 063d7d5 7ee09b9 2446f5f eab2c9c 7ee09b9 eab2c9c 063d7d5 7ee09b9 d281b17 eab2c9c 063d7d5 6b64125 7ee09b9 6a8ddc4 eab2c9c 7ee09b9 eab2c9c 7ee09b9 eab2c9c 7ee09b9 eab2c9c 5e1596b eab2c9c 7ee09b9 eab2c9c 7ee09b9 063d7d5 e50ca24 7ee09b9 063d7d5 7ee09b9 063d7d5 7ee09b9 063d7d5 1c864be 063d7d5 5e1596b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
import httpx
from fastapi import FastAPI, Request, HTTPException
from starlette.responses import StreamingResponse
from starlette.background import BackgroundTask
import os
import random
import logging
import time
from contextlib import asynccontextmanager
# --- Production-Ready Configuration ---
# All key settings are now configurable via environment variables.
# NOTE: this section runs at import time and in order — logging must be
# configured before the retry-code parsing below, which logs its result.
# 1. Logging Configuration
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=LOG_LEVEL,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 2. Target URL
# Used as the httpx client's base_url; request paths are joined onto it.
TARGET_URL = os.getenv("TARGET_URL", "https://api.gmi-serving.com/v1/chat")
# 3. Retry Logic Configuration
# Default to 7 retries as requested.
MAX_RETRIES = int(os.getenv("MAX_RETRIES", "7"))
# Default retry codes now include 500. Configurable via a comma-separated string.
DEFAULT_RETRY_CODES = "429,500,502,503,504"
RETRY_CODES_STR = os.getenv("RETRY_CODES", DEFAULT_RETRY_CODES)
try:
# Parse the comma-separated string into a set of integers.
# NOTE: any single malformed entry (e.g. a trailing comma) discards the
# whole override and falls back to the defaults below.
RETRY_STATUS_CODES = {int(code.strip()) for code in RETRY_CODES_STR.split(',')}
logging.info(f"Will retry on the following status codes: {RETRY_STATUS_CODES}")
except ValueError:
logging.error(f"Invalid RETRY_CODES format: '{RETRY_CODES_STR}'. Falling back to default: {DEFAULT_RETRY_CODES}")
RETRY_STATUS_CODES = {int(code.strip()) for code in DEFAULT_RETRY_CODES.split(',')}
# --- Helper Function ---
def generate_random_ip():
    """Return a random dotted-quad IPv4 string, each octet in 1..254."""
    octets = (random.randint(1, 254) for _ in range(4))
    return "{}.{}.{}.{}".format(*octets)
# --- HTTPX Client Lifecycle Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create one shared HTTPX client at startup; close it at shutdown.

    The client carries the target base URL and disables the per-request
    timeout (proxied streaming responses may be long-lived).
    """
    client = httpx.AsyncClient(base_url=TARGET_URL, timeout=None)
    app.state.http_client = client
    try:
        yield
    finally:
        # Equivalent to exiting the `async with` block: releases pooled connections.
        await client.aclose()
# Initialize the FastAPI app with the lifespan manager and disabled docs
# (docs_url/redoc_url set to None so the proxy exposes no interactive schema UI).
app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)
# --- Reverse Proxy Logic ---
async def _reverse_proxy(request: Request):
    """
    Forward *request* to the target URL with retry logic and latency logging.

    Behavior:
      * Spoofs x-forwarded-for / x-real-ip with a random IP and overrides
        browser-identifying headers before forwarding.
      * Retries up to MAX_RETRIES times on any status in RETRY_STATUS_CODES
        and on connection failures; on the final attempt an error status is
        streamed back to the caller as-is.
      * Raises HTTPException(502) if every attempt fails to connect.

    NOTE(review): only httpx.ConnectError triggers a retry — read timeouts
    and other transport errors propagate as 500s. Confirm whether retrying
    on httpx.TransportError is desired before widening the except clause.
    """
    # time.monotonic is used for reliable duration measurement.
    start_time = time.monotonic()
    client: httpx.AsyncClient = request.app.state.http_client
    # Relative URL: httpx joins it onto the client's base_url (TARGET_URL).
    url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))

    # --- Header Processing ---
    request_headers = dict(request.headers)
    # Drop the inbound Host header so httpx sets the target's host instead.
    request_headers.pop("host", None)
    random_ip = generate_random_ip()
    logging.info(f"Client '{request.client.host}' proxied with spoofed IP: {random_ip}")
    specific_headers = {
        "accept": "application/json, text/plain, */*",
        "accept-language": "en-US,en;q=0.9,ru;q=0.8",
        "content-type": "application/json",
        "origin": "https://console.gmicloud.ai",
        "referer": "https://console.gmicloud.ai/",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
        "x-forwarded-for": random_ip,
        "x-real-ip": random_ip,
    }
    request_headers.update(specific_headers)
    # Preserve the caller's credentials even though other headers are overridden.
    if "authorization" in request.headers:
        request_headers["authorization"] = request.headers["authorization"]
    body = await request.body()

    def _stream_back(rp_resp: httpx.Response) -> StreamingResponse:
        """Stream the upstream response to the client, closing it afterwards."""
        return StreamingResponse(
            rp_resp.aiter_raw(),
            status_code=rp_resp.status_code,
            headers=rp_resp.headers,
            background=BackgroundTask(rp_resp.aclose),
        )

    # --- Retry Logic ---
    last_exception = None
    for attempt in range(MAX_RETRIES):
        try:
            rp_req = client.build_request(
                method=request.method, url=url, headers=request_headers, content=body
            )
            # stream=True defers body download so we can pipe it to the client.
            rp_resp = await client.send(rp_req, stream=True)
            duration_ms = (time.monotonic() - start_time) * 1000
            # If status is successful or not in our retry list, we are done.
            if rp_resp.status_code not in RETRY_STATUS_CODES:
                logging.info(f"Request finished: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={duration_ms:.2f}ms")
                return _stream_back(rp_resp)
            # Last attempt: give up and return the error response as-is.
            if attempt == MAX_RETRIES - 1:
                logging.error(f"Request failed after max retries: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={duration_ms:.2f}ms")
                return _stream_back(rp_resp)
            # Log the retry attempt before closing the response and looping again.
            logging.warning(
                f"Attempt {attempt + 1}/{MAX_RETRIES} failed with status {rp_resp.status_code}. Retrying..."
            )
            # Must close the unconsumed streamed response before retrying.
            await rp_resp.aclose()
        except httpx.ConnectError as e:
            last_exception = e
            logging.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} failed with connection error: {e}")
    # This block is reached only if all attempts fail with a connection error.
    duration_ms = (time.monotonic() - start_time) * 1000
    logging.critical(f"Request failed, cannot connect to target: {request.method} {request.url.path} status_code=502 latency={duration_ms:.2f}ms")
    raise HTTPException(
        status_code=502,
        detail=f"Bad Gateway: Cannot connect to target service after {MAX_RETRIES} attempts. {last_exception}"
    )
# --- API Endpoints ---
# Catch-all route: {full_path:path} matches any suffix (including slashes)
# under /completions/, for every common HTTP method.
@app.api_route(
"/completions/{full_path:path}",
methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]
)
async def chat_proxy_handler(request: Request):
    """Captures all requests under /completions/ and forwards them.

    Thin wrapper — all retry/streaming logic lives in _reverse_proxy,
    which receives the raw Request (path, query, headers, and body).
    """
    return await _reverse_proxy(request)
@app.get("/")
async def health_check():
    """Report liveness along with the proxied path and upstream alias."""
    payload = dict(status="ok", proxying_endpoint="/completions", target="TypeGPT")
    return payload