"""Asynchronous reverse proxy.

Forwards every request to ``TARGET_URL`` with exponential-backoff retries,
per-request spoofed client-IP headers, and a randomized User-Agent.
"""

import asyncio
import logging
import os
import random
import time
import uuid
from contextlib import asynccontextmanager
from typing import Dict, Set, Tuple

import httpx
from faker import Faker
from fake_useragent import UserAgent
from fastapi import FastAPI, HTTPException, Request
from pydantic import Field
from starlette.background import BackgroundTask
from starlette.responses import JSONResponse, StreamingResponse

# --- Pydantic V1/V2 Compatibility ---
# ``BaseSettings`` moved to the separate ``pydantic-settings`` package in V2;
# this block makes the code resilient to either major version.
try:
    from pydantic_settings import BaseSettings
    print("Using pydantic_settings.BaseSettings")
except ImportError:
    from pydantic import BaseSettings
    print("pydantic_settings not found, falling back to pydantic.BaseSettings")

# Default retry set, used when RETRY_CODES cannot be parsed.
_DEFAULT_RETRY_CODES: Set[int] = {429, 500, 502, 503, 504}


# --- Structured Configuration using Pydantic ---
class Settings(BaseSettings):
    """Manages application settings and configuration from environment variables.

    NOTE(fix): the original parsed ``RETRY_CODES`` with a Pydantic-V2-only
    ``field_validator`` that never executed (validators are skipped for
    defaulted fields), so the env var was silently ignored — and importing
    ``field_validator`` broke the advertised Pydantic-V1 fallback. Parsing now
    happens lazily in the ``RETRY_STATUS_CODES`` property, which works on both
    major versions.
    """

    LOG_LEVEL: str = Field(default="INFO")
    TARGET_URL: str = Field(default="https://api.gmi-serving.com")
    MAX_RETRIES: int = Field(default=5, gt=0)
    # Raw comma-separated list from the RETRY_CODES env var; the parsed form
    # is exposed via the RETRY_STATUS_CODES property below.
    RETRY_CODES_STR: str = Field(default="429,500,502,503,504", alias="RETRY_CODES")
    BACKOFF_FACTOR: float = Field(default=0.5, gt=0)
    JITTER_FACTOR: float = Field(default=0.2, ge=0)

    @property
    def RETRY_STATUS_CODES(self) -> Set[int]:
        """Parses the comma-separated string of retry codes into a set of integers."""
        try:
            return {int(code.strip()) for code in self.RETRY_CODES_STR.split(',')}
        except (ValueError, AttributeError) as e:
            logging.error(
                f"Invalid RETRY_CODES format: '{self.RETRY_CODES_STR}'. Error: {e}. Using default."
            )
            return set(_DEFAULT_RETRY_CODES)

    class Config:
        env_file = '.env'
        env_file_encoding = 'utf-8'


# --- Initialize Settings and Global Services ---
settings = Settings()


class RequestIdFilter(logging.Filter):
    """Guarantees every log record carries a ``request_id`` attribute."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = getattr(record, 'request_id', 'main')
        return True


logging.basicConfig(
    level=settings.LOG_LEVEL.upper(),
    format='%(asctime)s - %(levelname)s - [%(request_id)s] - %(message)s'
)
logger = logging.getLogger(__name__)
logger.addFilter(RequestIdFilter())
# NOTE(fix): the format string references %(request_id)s, so a record emitted
# through any *other* logger (httpx, bare logging.error, ...) would crash the
# root handler's formatter. Attach the filter to the root handlers too, so
# every record gets a default request_id.
for _handler in logging.getLogger().handlers:
    _handler.addFilter(RequestIdFilter())

# Initialize data generation tools used for header spoofing.
faker = Faker()
try:
    ua = UserAgent()
except Exception:
    # fake-useragent fetches its data remotely; degrade gracefully if its
    # server is down and rely on the static fallback UA instead.
    ua = None
    logger.warning("Could not initialize UserAgent. A fallback will be used.")


# --- HTTPX Client Lifecycle Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manages the lifecycle of the HTTPX client for the application."""
    logger.info(f"Starting application. Proxying to {settings.TARGET_URL}")
    logger.info(f"Will retry on status codes: {settings.RETRY_STATUS_CODES}")
    async with httpx.AsyncClient(base_url=settings.TARGET_URL, timeout=None) as client:
        app.state.http_client = client
        yield
    logger.info("Application shut down.")


# Initialize the FastAPI app
app = FastAPI(docs_url=None, redoc_url=None, lifespan=lifespan)

# Static fallback UA string, used when fake-useragent is unavailable.
_FALLBACK_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36"
)


# --- Helper Functions ---
def get_random_user_agent() -> str:
    """Returns a random User-Agent, with a fallback."""
    if ua:
        try:
            return ua.random
        except Exception:
            logger.warning("Failed to get random User-Agent, using fallback.")
    return _FALLBACK_USER_AGENT


def prepare_forward_headers(incoming_headers: Dict, client_host: str) -> Tuple[Dict, str]:
    """Prepares headers for the downstream request.

    Strips connection-specific / identifying headers from the incoming request
    and injects a comprehensive set of spoofed client-IP headers plus a random
    User-Agent.

    Args:
        incoming_headers: Headers of the incoming request.
        client_host: Real client address (currently unused; kept for
            interface compatibility).

    Returns:
        Tuple of (headers to forward, the spoofed IP that was injected).

    NOTE(fix): the original return annotation ``(Dict, str)`` was a tuple
    *object*, not a valid type annotation.
    """
    # Start with a clean slate of lower-cased headers.
    forward_headers = {k.lower(): v for k, v in incoming_headers.items()}

    # Remove headers specific to the incoming connection or that could leak
    # information about the true origin.
    headers_to_remove = [
        "host", "content-length", "x-forwarded-for", "x-real-ip",
        "forwarded", "via", "x-client-ip", "x-forwarded-proto",
        "x-forwarded-host",
    ]
    for h in headers_to_remove:
        forward_headers.pop(h, None)

    spoofed_ip = faker.ipv4_public()

    # Add a comprehensive set of headers to mask the origin.
    override_headers = {
        # Standard headers; append a second hop to fake a proxy chain.
        "x-forwarded-for": f"{spoofed_ip}, {faker.ipv4_public()}",
        "x-real-ip": spoofed_ip,
        # RFC 7239 standard, more structured.
        "forwarded": f"for={spoofed_ip};proto=https",
        # Common non-standard headers.
        "x-client-ip": spoofed_ip,
        "x-originating-ip": spoofed_ip,
        "x-remote-ip": spoofed_ip,
        "x-remote-addr": spoofed_ip,
        # Cloudflare-specific headers.
        "cf-connecting-ip": spoofed_ip,
        "true-client-ip": spoofed_ip,
        # Fake a passthrough via a common service.
        "via": "1.1 google",
        # Dynamic User-Agent.
        "user-agent": get_random_user_agent(),
    }
    forward_headers.update(override_headers)
    return forward_headers, spoofed_ip


# --- API Endpoints ---
@app.get("/", include_in_schema=False)
async def health_check():
    """Provides a basic health check endpoint."""
    return JSONResponse({
        "status": "ok",
        "target_url": settings.TARGET_URL,
        "max_retries": settings.MAX_RETRIES,
    })


# Hop-by-hop headers must not be copied from the upstream response to ours:
# the local server manages its own connection framing (RFC 7230 section 6.1).
_HOP_BY_HOP_HEADERS = {
    "connection", "keep-alive", "proxy-authenticate", "proxy-authorization",
    "te", "trailer", "transfer-encoding", "upgrade",
}


@app.api_route(
    "/{full_path:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"],
    include_in_schema=False,
)
async def reverse_proxy_handler(request: Request):
    """Catch-all reverse proxy with retry, backoff, and header spoofing.

    Retries on the configured status codes and on transport errors, using
    exponential backoff with symmetric jitter, then streams the final
    upstream response back to the caller.

    Raises:
        HTTPException: 502 once every attempt has been exhausted.
    """
    start_time = time.monotonic()
    request_id = str(uuid.uuid4())
    log_extra = {'request_id': request_id}

    client: httpx.AsyncClient = request.app.state.http_client
    url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))
    forward_headers, spoofed_ip = prepare_forward_headers(
        dict(request.headers), request.client.host
    )
    logger.info(
        f"Incoming: {request.method} {url.path} from {request.client.host}. Spoofed IP: {spoofed_ip}",
        extra=log_extra,
    )

    # Buffer the body once so it can be re-sent identically on every retry.
    request_body = await request.body()
    # Hoisted: RETRY_STATUS_CODES is now a parsing property; resolve it once.
    retry_codes = settings.RETRY_STATUS_CODES
    last_exception = None

    for attempt in range(settings.MAX_RETRIES):
        if attempt > 0:
            # Exponential backoff with symmetric jitter around the delay.
            backoff_delay = settings.BACKOFF_FACTOR * (2 ** (attempt - 1))
            jitter = random.uniform(-settings.JITTER_FACTOR, settings.JITTER_FACTOR) * backoff_delay
            sleep_duration = max(0, backoff_delay + jitter)
            logger.warning(
                f"Attempt {attempt}/{settings.MAX_RETRIES} failed. Retrying in {sleep_duration:.2f}s...",
                extra=log_extra,
            )
            await asyncio.sleep(sleep_duration)

        try:
            req = client.build_request(
                method=request.method,
                url=url,
                headers=forward_headers,
                content=request_body,
            )
            resp = await client.send(req, stream=True)

            # Success, a non-retryable status, or the final attempt: stream
            # this response back to the caller as-is.
            if resp.status_code not in retry_codes or attempt == settings.MAX_RETRIES - 1:
                duration_ms = (time.monotonic() - start_time) * 1000
                log_func = logger.info if resp.is_success else logger.warning
                log_func(
                    f"Finished: {request.method} {url.path} status={resp.status_code} attempt={attempt+1} latency={duration_ms:.2f}ms",
                    extra=log_extra,
                )
                # NOTE(fix): hop-by-hop headers (e.g. Transfer-Encoding) were
                # previously forwarded verbatim, which can corrupt response
                # framing when the upstream used chunked encoding.
                response_headers = {
                    k: v for k, v in resp.headers.items()
                    if k.lower() not in _HOP_BY_HOP_HEADERS
                }
                return StreamingResponse(
                    resp.aiter_raw(),
                    status_code=resp.status_code,
                    headers=response_headers,
                    # Close the upstream response after the stream finishes.
                    background=BackgroundTask(resp.aclose),
                )

            # Retryable status: release the connection before the next try.
            await resp.aclose()
            last_exception = f"Last failed attempt returned status code {resp.status_code}"

        except httpx.RequestError as e:
            last_exception = e
            logger.warning(f"HTTPX RequestError on attempt {attempt + 1}: {e}", extra=log_extra)
            if attempt == settings.MAX_RETRIES - 1:
                break

    duration_ms = (time.monotonic() - start_time) * 1000
    logger.critical(
        f"Failed permanently: {request.method} {url.path} after {settings.MAX_RETRIES} attempts. latency={duration_ms:.2f}ms. Last error: {last_exception}",
        extra=log_extra,
    )
    raise HTTPException(
        status_code=502,
        detail=f"Bad Gateway: The server was unable to process the request after {settings.MAX_RETRIES} attempts. Last error: {last_exception}",
    )