File size: 6,544 Bytes
2446f5f
500ef17
063d7d5
 
 
eab2c9c
c90334d
7ee09b9
063d7d5
56d0fcf
7ee09b9
 
c90334d
7ee09b9
 
 
 
 
 
 
 
4988762
7ee09b9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eab2c9c
 
 
 
 
2446f5f
063d7d5
 
 
7ee09b9
 
063d7d5
 
 
e50ca24
7ee09b9
063d7d5
6b64125
063d7d5
 
2446f5f
7ee09b9
2446f5f
7ee09b9
 
 
063d7d5
 
 
 
 
7ee09b9
2446f5f
eab2c9c
7ee09b9
eab2c9c
063d7d5
 
 
 
 
7ee09b9
d281b17
eab2c9c
 
063d7d5
 
6b64125
7ee09b9
 
6a8ddc4
eab2c9c
 
 
 
 
 
 
7ee09b9
eab2c9c
 
 
7ee09b9
 
 
 
 
 
eab2c9c
 
 
 
 
 
 
7ee09b9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eab2c9c
 
 
 
5e1596b
eab2c9c
7ee09b9
 
 
 
eab2c9c
 
7ee09b9
063d7d5
e50ca24
7ee09b9
063d7d5
7ee09b9
063d7d5
 
 
7ee09b9
063d7d5
1c864be
063d7d5
 
 
5e1596b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import httpx
from fastapi import FastAPI, Request, HTTPException
from starlette.responses import StreamingResponse
from starlette.background import BackgroundTask
import os
import random
import logging
import time
from contextlib import asynccontextmanager

# --- Production-Ready Configuration ---
# Every tunable below can be overridden through an environment variable,
# so deployments never require a code change.

# 1. Logging: level is taken from LOG_LEVEL (defaults to INFO).
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    level=LOG_LEVEL,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# 2. Upstream endpoint this proxy forwards to.
TARGET_URL = os.getenv("TARGET_URL", "https://api.gmi-serving.com/v1/chat")

# 3. Retry policy: number of attempts and the status codes that trigger a retry.
MAX_RETRIES = int(os.getenv("MAX_RETRIES", "7"))

# Comma-separated list of upstream status codes worth retrying (includes 500).
DEFAULT_RETRY_CODES = "429,500,502,503,504"
RETRY_CODES_STR = os.getenv("RETRY_CODES", DEFAULT_RETRY_CODES)


def _parse_code_set(raw):
    """Turn a comma-separated string of status codes into a set of ints."""
    return {int(token.strip()) for token in raw.split(',')}


try:
    RETRY_STATUS_CODES = _parse_code_set(RETRY_CODES_STR)
    logging.info(f"Will retry on the following status codes: {RETRY_STATUS_CODES}")
except ValueError:
    # A malformed override must not take the proxy down — fall back to defaults.
    logging.error(f"Invalid RETRY_CODES format: '{RETRY_CODES_STR}'. Falling back to default: {DEFAULT_RETRY_CODES}")
    RETRY_STATUS_CODES = _parse_code_set(DEFAULT_RETRY_CODES)


# --- Helper Function ---
def generate_random_ip():
    """Return a random dotted-quad IPv4 string, each octet in 1..254."""
    octets = [str(random.randint(1, 254)) for _ in range(4)]
    return ".".join(octets)

# --- HTTPX Client Lifecycle Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open one shared HTTPX client at startup and close it at shutdown.

    The client carries no per-request timeout because upstream streaming
    responses may run for a long time.
    """
    shared_client = httpx.AsyncClient(base_url=TARGET_URL, timeout=None)
    async with shared_client as client:
        app.state.http_client = client
        yield

# Docs endpoints are disabled; the lifespan hook wires in the shared client.
app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)

# --- Reverse Proxy Logic ---
async def _reverse_proxy(request: Request):
    """
    Forwards a request to the target URL with enhanced retry logic and latency logging.

    Retries on connection errors and on any status code in RETRY_STATUS_CODES,
    up to MAX_RETRIES attempts. Streams the upstream body back to the caller.

    Raises:
        HTTPException: 502 when the upstream is never reachable.
    """
    # time.monotonic is immune to wall-clock adjustments, so it is the
    # reliable choice for measuring durations.
    start_time = time.monotonic()

    client: httpx.AsyncClient = request.app.state.http_client
    url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))

    def _elapsed_ms():
        """Milliseconds elapsed since this request entered the proxy."""
        return (time.monotonic() - start_time) * 1000

    def _stream_back(rp_resp):
        """Stream the upstream response to the caller, closing it when done.

        NOTE(review): upstream headers are forwarded verbatim, including
        hop-by-hop ones such as Transfer-Encoding/Connection — confirm this
        is intended for whatever sits in front of this service.
        """
        return StreamingResponse(
            rp_resp.aiter_raw(),
            status_code=rp_resp.status_code,
            headers=rp_resp.headers,
            background=BackgroundTask(rp_resp.aclose),
        )

    # --- Header Processing ---
    request_headers = dict(request.headers)
    # The Host header must match the upstream, not this proxy; drop it and
    # let httpx set it from the target URL.
    request_headers.pop("host", None)

    random_ip = generate_random_ip()
    logging.info(f"Client '{request.client.host}' proxied with spoofed IP: {random_ip}")

    # Browser-like headers plus the spoofed client IP.
    specific_headers = {
        "accept": "application/json, text/plain, */*",
        "accept-language": "en-US,en;q=0.9,ru;q=0.8",
        "content-type": "application/json",
        "origin": "https://console.gmicloud.ai",
        "referer": "https://console.gmicloud.ai/",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
        "x-forwarded-for": random_ip,
        "x-real-ip": random_ip,
    }
    request_headers.update(specific_headers)

    # Preserve the caller's credentials if any were supplied.
    if "authorization" in request.headers:
        request_headers["authorization"] = request.headers["authorization"]

    # Read the body once up front so every retry re-sends identical content.
    body = await request.body()

    # --- Retry Logic ---
    last_exception = None
    for attempt in range(MAX_RETRIES):
        try:
            rp_req = client.build_request(
                method=request.method, url=url, headers=request_headers, content=body
            )
            rp_resp = await client.send(rp_req, stream=True)

            # Success, or a non-retryable status: hand the stream to the caller.
            if rp_resp.status_code not in RETRY_STATUS_CODES:
                logging.info(f"Request finished: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={_elapsed_ms():.2f}ms")
                return _stream_back(rp_resp)

            # Out of retries: surface the last upstream error response as-is.
            if attempt == MAX_RETRIES - 1:
                logging.error(f"Request failed after max retries: {request.method} {request.url.path} status_code={rp_resp.status_code} latency={_elapsed_ms():.2f}ms")
                return _stream_back(rp_resp)

            # Retryable status with attempts remaining: close this response
            # (releasing the connection) and loop again.
            logging.warning(
                f"Attempt {attempt + 1}/{MAX_RETRIES} failed with status {rp_resp.status_code}. Retrying..."
            )
            await rp_resp.aclose()

        except httpx.ConnectError as e:
            # Only connection-level failures are retried here; read/protocol
            # errors propagate to the framework.
            last_exception = e
            logging.warning(f"Attempt {attempt + 1}/{MAX_RETRIES} failed with connection error: {e}")

    # Reached when every attempt raised ConnectError (also when
    # MAX_RETRIES <= 0, in which case last_exception is None).
    logging.critical(f"Request failed, cannot connect to target: {request.method} {request.url.path} status_code=502 latency={_elapsed_ms():.2f}ms")

    raise HTTPException(
        status_code=502,
        detail=f"Bad Gateway: Cannot connect to target service after {MAX_RETRIES} attempts. {last_exception}"
    )

# --- API Endpoints ---
@app.api_route(
    "/completions/{full_path:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]
)
async def chat_proxy_handler(request: Request):
    """Catch-all endpoint: every request under /completions/ is proxied upstream."""
    proxied_response = await _reverse_proxy(request)
    return proxied_response

@app.get("/")
async def health_check():
    """Provides a basic health check endpoint."""
    return {"status": "ok", "proxying_endpoint": "/completions", "target": "TypeGPT"}