# flash2 / main.py — FastAPI reverse proxy (Hugging Face Space "flash2").
# Last updated by rkihacker, commit 7ee09b9 (verified), file size 6.54 kB.
import httpx
from fastapi import FastAPI, Request, HTTPException
from starlette.responses import StreamingResponse
from starlette.background import BackgroundTask
import os
import random
import logging
import time
from contextlib import asynccontextmanager
# --- Production-Ready Configuration ---
# All key settings are now configurable via environment variables.
# 1. Logging Configuration
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=LOG_LEVEL,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 2. Target URL
TARGET_URL = os.getenv("TARGET_URL", "https://api.gmi-serving.com/v1/chat")
# 3. Retry Logic Configuration
# Default to 7 retries as requested.
MAX_RETRIES = int(os.getenv("MAX_RETRIES", "7"))
# Default retry codes now include 500. Configurable via a comma-separated string.
DEFAULT_RETRY_CODES = "429,500,502,503,504"
RETRY_CODES_STR = os.getenv("RETRY_CODES", DEFAULT_RETRY_CODES)
try:
# Parse the comma-separated string into a set of integers.
RETRY_STATUS_CODES = {int(code.strip()) for code in RETRY_CODES_STR.split(',')}
logging.info(f"Will retry on the following status codes: {RETRY_STATUS_CODES}")
except ValueError:
logging.error(f"Invalid RETRY_CODES format: '{RETRY_CODES_STR}'. Falling back to default: {DEFAULT_RETRY_CODES}")
RETRY_STATUS_CODES = {int(code.strip()) for code in DEFAULT_RETRY_CODES.split(',')}
# --- Helper Function ---
def generate_random_ip():
    """Return a plausible dotted-quad IPv4 string; each octet is in 1-254."""
    octets = [str(random.randint(1, 254)) for _ in range(4)]
    return ".".join(octets)
# --- HTTPX Client Lifecycle Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create one shared HTTPX client at startup; close it on shutdown."""
    # timeout=None disables per-request timeouts entirely — streamed upstream
    # responses may take arbitrarily long.
    shared_client = httpx.AsyncClient(base_url=TARGET_URL, timeout=None)
    async with shared_client:
        app.state.http_client = shared_client
        yield
# Initialize the FastAPI app with the lifespan manager and disabled docs
# (docs_url/redoc_url set to None so no Swagger/ReDoc UI is exposed).
app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)
# --- Reverse Proxy Logic ---
def _passthrough_response(rp_resp):
    """Wrap an upstream httpx response as a streamed Starlette response.

    aiter_raw() forwards the body bytes untouched (no decompression), so the
    upstream content-encoding/content-length headers remain consistent.  The
    upstream response is closed by a background task once streaming finishes.
    """
    return StreamingResponse(
        rp_resp.aiter_raw(),
        status_code=rp_resp.status_code,
        headers=rp_resp.headers,
        background=BackgroundTask(rp_resp.aclose),
    )


async def _reverse_proxy(request: Request):
    """
    Forward *request* to the target URL with retry logic and latency logging.

    Retries up to MAX_RETRIES times when the upstream replies with a status in
    RETRY_STATUS_CODES, or when the transport itself fails (connection refused,
    reset, read/protocol error).  Raises HTTPException(502) only if every
    attempt failed at the transport level; otherwise the last upstream
    response is streamed back to the caller as-is.
    """
    # time.monotonic() is immune to wall-clock adjustments, so the measured
    # latency stays reliable even if the system clock changes mid-request.
    start_time = time.monotonic()
    client: httpx.AsyncClient = request.app.state.http_client
    # Build a relative URL so httpx merges it onto the client's base_url.
    url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))

    # --- Header Processing ---
    request_headers = dict(request.headers)
    request_headers.pop("host", None)  # let httpx set Host for the upstream
    random_ip = generate_random_ip()
    logging.info(f"Client '{request.client.host}' proxied with spoofed IP: {random_ip}")
    specific_headers = {
        "accept": "application/json, text/plain, */*",
        "accept-language": "en-US,en;q=0.9,ru;q=0.8",
        "content-type": "application/json",
        "origin": "https://console.gmicloud.ai",
        "referer": "https://console.gmicloud.ai/",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
        "x-forwarded-for": random_ip,
        "x-real-ip": random_ip,
    }
    request_headers.update(specific_headers)
    # Re-assert the caller's Authorization header; specific_headers never sets
    # it, but this guards the credential against future header overrides.
    if "authorization" in request.headers:
        request_headers["authorization"] = request.headers["authorization"]

    # Read the body once up front so it can be re-sent on every retry attempt.
    body = await request.body()

    # --- Retry Logic ---
    last_exception = None
    for attempt in range(MAX_RETRIES):
        try:
            rp_req = client.build_request(
                method=request.method, url=url, headers=request_headers, content=body
            )
            rp_resp = await client.send(rp_req, stream=True)
        except httpx.TransportError as e:
            # Broader than the former ConnectError-only handling: also retries
            # connection resets, read failures and protocol errors, which a
            # proxy should treat as transient (ConnectError is a subclass).
            last_exception = e
            logging.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} failed with connection error: {e}")
            continue

        duration_ms = (time.monotonic() - start_time) * 1000
        # Success, or a status we never retry: stream it straight back.
        if rp_resp.status_code not in RETRY_STATUS_CODES:
            logging.info(f"Request finished: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={duration_ms:.2f}ms")
            return _passthrough_response(rp_resp)
        # Retryable status but no attempts left: surface the error as-is.
        if attempt == MAX_RETRIES - 1:
            logging.error(f"Request failed after max retries: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={duration_ms:.2f}ms")
            return _passthrough_response(rp_resp)
        logging.warning(
            f"Attempt {attempt + 1}/{MAX_RETRIES} failed with status {rp_resp.status_code}. Retrying..."
        )
        # Close the discarded response before looping so the connection is
        # returned to the client's pool instead of leaking.
        await rp_resp.aclose()

    # Reached only when every attempt raised a transport error.
    duration_ms = (time.monotonic() - start_time) * 1000
    logging.critical(f"Request failed, cannot connect to target: {request.method} {request.url.path} status_code=502 latency={duration_ms:.2f}ms")
    raise HTTPException(
        status_code=502,
        detail=f"Bad Gateway: Cannot connect to target service after {MAX_RETRIES} attempts. {last_exception}"
    )
# --- API Endpoints ---
@app.api_route(
    "/completions/{full_path:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]
)
async def chat_proxy_handler(request: Request):
    """Forward every request under /completions/ to the upstream target."""
    proxied = await _reverse_proxy(request)
    return proxied
@app.get("/")
async def health_check():
    """Report liveness along with the proxy's routing summary."""
    payload = {"status": "ok", "proxying_endpoint": "/completions", "target": "TypeGPT"}
    return payload