Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| gemini-web2api - Gemini Web to OpenAI API proxy. | |
| Converts Google Gemini's web interface into an OpenAI-compatible API server. | |
| Uses curl_cffi for Chrome TLS fingerprint impersonation to avoid bot detection. | |
| Usage: | |
| python gemini_web2api.py [--port 8081] [--config config.json] | |
| Client configuration (Cherry Studio, ChatBox, etc.): | |
| Base URL: http://localhost:8081/v1 | |
| API Key: Set via API_KEY environment variable (Hugging Face Secrets) | |
| """ | |
| import json | |
| import urllib.request | |
| import urllib.parse | |
| import time | |
| import ssl | |
| import sys | |
| import uuid | |
| import re | |
| import os | |
| import random | |
| import hashlib | |
| import argparse | |
| from http.server import HTTPServer, BaseHTTPRequestHandler | |
| from socketserver import ThreadingMixIn | |
| from collections import OrderedDict | |
| __version__ = "2.0.0" | |
| # βββ curl_cffi with fallback ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # curl_cffi provides Chrome TLS fingerprint impersonation, making requests | |
| # indistinguishable from real Chrome browsers at the TLS layer. | |
| # Falls back to stdlib urllib if not available (less stealthy). | |
| try: | |
| from curl_cffi.requests import Session as CurlSession | |
| HAS_CURL_CFFI = True | |
| except ImportError: | |
| HAS_CURL_CFFI = False | |
| CurlSession = None | |
| # βββ Configuration βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| DEFAULT_CONFIG = { | |
| "port": 8081, | |
| "host": "0.0.0.0", | |
| "retry_attempts": 3, | |
| "retry_delay_sec": 2, | |
| "request_timeout_sec": 180, | |
| "gemini_bl": "boq_assistant-bard-web-server_20260525.09_p0", | |
| "default_model": "gemini-flash", | |
| "log_requests": True, | |
| "cookie_file": None, | |
| "proxy": None, | |
| "api_key": os.environ.get("API_KEY"), # Set via Hugging Face Secrets | |
| # Chrome fingerprint settings | |
| "chrome_version": 124, | |
| "impersonate_target": "chrome124", | |
| # Request jitter (ms) - randomized delays to mimic human behavior | |
| "jitter_min_ms": 50, | |
| "jitter_max_ms": 300, | |
| "debug_mode": False, | |
| } | |
| CONFIG = dict(DEFAULT_CONFIG) | |
| # βββ Models (synced from upstream xwteam/gemini2api v1.6.15) βββββββββββββββββ | |
| # Model selection via x-goog-ext-525001261-jspb header with hex model IDs. | |
| # This replaces the old integer mode category approach. | |
| MODEL_HEADER_KEY = "x-goog-ext-525001261-jspb" | |
| GEMINI_MODELS = { | |
| # Internal model name β routing info | |
| # think: 0 = thinking enabled (for thinking models), 4 = thinking disabled (for standard models) | |
| "gemini-3-pro": { | |
| "id": "9d8ca3786ebdfbea", "capacity": 1, "think": 4, | |
| "desc": "Pro model (free tier)", | |
| }, | |
| "gemini-3-flash": { | |
| "id": "fbb127bbb056c959", "capacity": 1, "think": 4, | |
| "desc": "Fast general-purpose model", | |
| }, | |
| "gemini-3-flash-thinking": { | |
| "id": "5bf011840784117a", "capacity": 1, "think": 0, | |
| "desc": "Deep thinking mode", | |
| }, | |
| # Pro-only (paid tier) models | |
| "gemini-3-pro-plus": { | |
| "id": "e6fa609c3fa255c0", "capacity": 4, "think": 4, | |
| "desc": "Pro+ model (requires subscription)", | |
| }, | |
| "gemini-3-flash-plus": { | |
| "id": "56fdd199312815e2", "capacity": 4, "think": 4, | |
| "desc": "Flash+ model (requires subscription)", | |
| }, | |
| "gemini-3-flash-thinking-plus": { | |
| "id": "e051ce1aa80aa576", "capacity": 4, "think": 0, | |
| "desc": "Thinking+ model (requires subscription)", | |
| }, | |
| } | |
| # Stable public model names (API contract - never change these) | |
| # Maps public name β family β resolved to internal name | |
| PUBLIC_MODELS = { | |
| "gemini-pro": {"family": "pro", "default": "gemini-3-pro", | |
| "desc": "Pro model for complex tasks"}, | |
| "gemini-flash": {"family": "flash", "default": "gemini-3-flash", | |
| "desc": "Fast general-purpose model"}, | |
| "gemini-flash-thinking": {"family": "flash-thinking", "default": "gemini-3-flash-thinking", | |
| "desc": "Deep thinking with extended output"}, | |
| } | |
| # Legacy model name aliases β stable public name | |
| MODEL_ALIASES = { | |
| # Old names from your v1.0.0 | |
| "gemini-3.5-flash": "gemini-flash", | |
| "gemini-3.5-flash-thinking": "gemini-flash-thinking", | |
| "gemini-3.5-flash-thinking-lite": "gemini-flash-thinking", | |
| "gemini-3.1-pro": "gemini-pro", | |
| "gemini-auto": "gemini-flash", | |
| "gemini-flash-lite": "gemini-flash", | |
| # Upstream aliases | |
| "gemini-2.5-pro": "gemini-pro", | |
| "gemini-2.5-flash": "gemini-flash", | |
| "gemini-2.5-flash-thinking": "gemini-flash-thinking", | |
| "gemini-2.5-pro-preview-05-06": "gemini-pro", | |
| "gemini-2.5-flash-preview-04-17": "gemini-flash", | |
| "gemini-2.5-flash-preview-05-20": "gemini-flash", | |
| "gemini-2.0-flash": "gemini-flash", | |
| "gemini-2.0-flash-thinking": "gemini-flash-thinking", | |
| "gemini-2.0-flash-lite": "gemini-flash", | |
| "gemini-1.5-pro": "gemini-pro", | |
| "gemini-1.5-flash": "gemini-flash", | |
| } | |
| # All model names exposed to clients (public names only for API stability) | |
| EXPOSED_MODELS = PUBLIC_MODELS | |
| def resolve_model(model_name: str) -> tuple: | |
| """Resolve any model name to (public_name, internal_name, model_info, error). | |
| Resolution chain: | |
| 1. Legacy alias β public name | |
| 2. Public name β internal name (via family default) | |
| 3. Already an internal name β use directly | |
| """ | |
| # Step 1: resolve aliases | |
| name = MODEL_ALIASES.get(model_name, model_name) | |
| # Step 2: if it's a public name, map to internal | |
| if name in PUBLIC_MODELS: | |
| pub = PUBLIC_MODELS[name] | |
| internal = pub["default"] | |
| info = GEMINI_MODELS.get(internal) | |
| if not info: | |
| return None, None, None, f"Internal model {internal} not found" | |
| return name, internal, info, None | |
| # Step 3: if it's already an internal name | |
| if name in GEMINI_MODELS: | |
| info = GEMINI_MODELS[name] | |
| # Find the public name for this internal model | |
| pub_name = name | |
| for pn, pv in PUBLIC_MODELS.items(): | |
| if pv["default"] == name: | |
| pub_name = pn | |
| break | |
| return pub_name, name, info, None | |
| return None, None, None, f"Unknown model: {model_name}. Available: {', '.join(PUBLIC_MODELS.keys())}" | |
| def build_model_headers(model_info: dict) -> dict: | |
| """Build the x-goog-ext headers for model selection.""" | |
| if not model_info: | |
| return {} | |
| return { | |
| MODEL_HEADER_KEY: f'[1,null,null,null,"{model_info["id"]}",null,null,0,[4],null,null,{model_info["capacity"]}]', | |
| "x-goog-ext-73010989-jspb": "[0]", | |
| "x-goog-ext-73010990-jspb": "[0]", | |
| } | |
| # βββ Utilities βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def log(msg: str): | |
| if CONFIG["log_requests"]: | |
| sys.stderr.write(f"[{time.strftime('%H:%M:%S')}] {msg}\n") | |
| sys.stderr.flush() | |
| def apply_jitter(): | |
| """Random delay to mimic human behavior.""" | |
| delay = random.uniform(CONFIG["jitter_min_ms"], CONFIG["jitter_max_ms"]) / 1000.0 | |
| time.sleep(delay) | |
| def load_cookie() -> tuple: | |
| """Load cookie from file. Returns (cookie_str, sapisid).""" | |
| cookie_file = CONFIG.get("cookie_file") | |
| if not cookie_file: | |
| return "", None | |
| if not os.path.exists(cookie_file): | |
| return "", None | |
| try: | |
| with open(cookie_file, "r") as f: | |
| content = f.read().strip() | |
| if content.startswith("{"): | |
| data = json.loads(content) | |
| cookie_str = data.get("cookie", "") | |
| sapisid = data.get("sapisid", "") | |
| else: | |
| cookie_str = content | |
| pairs = dict(p.split("=", 1) for p in cookie_str.split("; ") if "=" in p) | |
| sapisid = pairs.get("SAPISID", "") | |
| return cookie_str, sapisid if sapisid else None | |
| except Exception as e: | |
| log(f"Cookie load error: {e}") | |
| return "", None | |
| def make_sapisidhash(sapisid: str) -> str: | |
| ts = int(time.time()) | |
| h = hashlib.sha1(f"{ts} {sapisid} https://gemini.google.com".encode()).hexdigest() | |
| return f"SAPISIDHASH {ts}_{h}" | |
| # βββ Chrome-like Request Headers βββββββββββββββββββββββββββββββββββββββββββββ | |
| def build_chrome_headers(method: str = "POST", content_type: str = None) -> OrderedDict: | |
| """Build Chrome-like request headers in the correct order. | |
| Chrome sends headers in a specific order that differs from Python defaults. | |
| Matching this order helps avoid fingerprint-based detection. | |
| """ | |
| ver = CONFIG["chrome_version"] | |
| headers = OrderedDict() | |
| # Chrome header order (important for fingerprint matching) | |
| headers["Host"] = "gemini.google.com" | |
| if content_type: | |
| headers["Content-Type"] = content_type | |
| headers["Sec-Ch-Ua"] = f'"Chromium";v="{ver}", "Google Chrome";v="{ver}", "Not-A.Brand";v="99"' | |
| headers["Sec-Ch-Ua-Mobile"] = "?0" | |
| headers["Sec-Ch-Ua-Platform"] = '"Windows"' | |
| headers["User-Agent"] = ( | |
| f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) " | |
| f"AppleWebKit/537.36 (KHTML, like Gecko) " | |
| f"Chrome/{ver}.0.0.0 Safari/537.36" | |
| ) | |
| headers["X-Same-Domain"] = "1" | |
| headers["Origin"] = "https://gemini.google.com" | |
| headers["Referer"] = "https://gemini.google.com/app" | |
| # Sec-Fetch headers (differ by method) | |
| if method == "POST": | |
| headers["Sec-Fetch-Site"] = "same-origin" | |
| headers["Sec-Fetch-Mode"] = "cors" | |
| headers["Sec-Fetch-Dest"] = "empty" | |
| else: | |
| headers["Sec-Fetch-Site"] = "same-origin" | |
| headers["Sec-Fetch-Mode"] = "navigate" | |
| headers["Sec-Fetch-Dest"] = "document" | |
| headers["Sec-Fetch-User"] = "?1" | |
| headers["Accept-Language"] = "en-US,en;q=0.9" | |
| headers["Accept"] = "*/*" | |
| return headers | |
| # βββ HTTP Transport Layer ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class GeminiHTTPClient: | |
| """HTTP client with Chrome TLS fingerprint impersonation. | |
| Uses curl_cffi when available for real Chrome TLS fingerprints. | |
| Falls back to urllib.request (less stealthy but functional). | |
| """ | |
| def __init__(self): | |
| self._session = None | |
| if HAS_CURL_CFFI: | |
| target = CONFIG.get("impersonate_target", "chrome124") | |
| self._session = CurlSession( | |
| impersonate=target, | |
| timeout=CONFIG["request_timeout_sec"], | |
| ) | |
| log(f"HTTP transport: curl_cffi (impersonating {target})") | |
| else: | |
| log("HTTP transport: urllib (no TLS fingerprinting - less stealthy)") | |
| def get(self, url: str, headers: dict = None, cookies: dict = None) -> str: | |
| """GET request with Chrome fingerprint. Returns response text.""" | |
| if self._session: | |
| return self._get_curl(url, headers, cookies) | |
| else: | |
| return self._get_urllib(url, headers, cookies) | |
| def _get_curl(self, url: str, headers: dict = None, cookies: dict = None) -> str: | |
| self._session.cookies.clear() | |
| proxy = CONFIG.get("proxy") | |
| proxies = {"http": proxy, "https": proxy} if proxy else None | |
| resp = self._session.get( | |
| url, | |
| headers=dict(headers) if headers else {}, | |
| cookies=cookies or {}, | |
| proxies=proxies, | |
| allow_redirects=True, | |
| ) | |
| if resp.status_code != 200: | |
| raise Exception(f"HTTP {resp.status_code}: {resp.text[:200]}") | |
| return resp.text | |
| def _get_urllib(self, url: str, headers: dict = None, cookies: dict = None) -> str: | |
| all_headers = dict(headers) if headers else {} | |
| if cookies: | |
| cookie_str = "; ".join(f"{k}={v}" for k, v in cookies.items()) | |
| existing = all_headers.get("Cookie", "") | |
| if existing: | |
| all_headers["Cookie"] = existing + "; " + cookie_str | |
| else: | |
| all_headers["Cookie"] = cookie_str | |
| req = urllib.request.Request(url, headers=all_headers, method="GET") | |
| ctx = ssl.create_default_context() | |
| proxy = CONFIG.get("proxy") | |
| if proxy: | |
| opener = urllib.request.build_opener( | |
| urllib.request.ProxyHandler({"http": proxy, "https": proxy}), | |
| urllib.request.HTTPSHandler(context=ctx) | |
| ) | |
| urllib.request.install_opener(opener) | |
| else: | |
| opener = urllib.request.build_opener(urllib.request.HTTPSHandler(context=ctx)) | |
| urllib.request.install_opener(opener) | |
| try: | |
| with urllib.request.urlopen(req, timeout=CONFIG["request_timeout_sec"]) as resp: | |
| return resp.read().decode("utf-8") | |
| except urllib.error.HTTPError as e: | |
| raise Exception(f"HTTP {e.code}: {e.read().decode('utf-8')[:200]}") | |
| def post(self, url: str, data: bytes, headers: dict, cookies: dict = None) -> str: | |
| """POST request with Chrome fingerprint. Returns response text.""" | |
| if self._session: | |
| return self._post_curl(url, data, headers, cookies) | |
| else: | |
| return self._post_urllib(url, data, headers, cookies) | |
| def _post_curl(self, url: str, data: bytes, headers: dict, cookies: dict = None) -> str: | |
| """POST via curl_cffi with Chrome TLS impersonation.""" | |
| # Clear internal cookie jar to prevent cross-domain cookie conflicts | |
| # (same fix as upstream: google.com / gemini.google.com / accounts.google.com) | |
| self._session.cookies.clear() | |
| proxy = CONFIG.get("proxy") | |
| proxies = {"http": proxy, "https": proxy} if proxy else None | |
| resp = self._session.post( | |
| url, | |
| data=data, | |
| headers=dict(headers), # curl_cffi needs plain dict | |
| cookies=cookies or {}, | |
| proxies=proxies, | |
| allow_redirects=True, | |
| ) | |
| if resp.status_code != 200: | |
| raise Exception(f"HTTP {resp.status_code}: {resp.text[:200]}") | |
| return resp.text | |
| def _post_urllib(self, url: str, data: bytes, headers: dict, cookies: dict = None) -> str: | |
| """Fallback POST via urllib (no TLS fingerprinting).""" | |
| # Merge cookies into headers | |
| all_headers = dict(headers) | |
| if cookies: | |
| cookie_str = "; ".join(f"{k}={v}" for k, v in cookies.items()) | |
| existing = all_headers.get("Cookie", "") | |
| if existing: | |
| all_headers["Cookie"] = existing + "; " + cookie_str | |
| else: | |
| all_headers["Cookie"] = cookie_str | |
| req = urllib.request.Request(url, data=data, headers=all_headers, method="POST") | |
| ctx = ssl.create_default_context() | |
| proxy = CONFIG.get("proxy") | |
| if proxy: | |
| opener = urllib.request.build_opener( | |
| urllib.request.ProxyHandler({"http": proxy, "https": proxy}), | |
| urllib.request.HTTPSHandler(context=ctx) | |
| ) | |
| resp = opener.open(req, timeout=CONFIG["request_timeout_sec"]) | |
| else: | |
| resp = urllib.request.urlopen(req, context=ctx, timeout=CONFIG["request_timeout_sec"]) | |
| return resp.read().decode("utf-8", errors="replace") | |
| def post_stream(self, url: str, data: bytes, headers: dict, cookies: dict = None): | |
| """POST request that yields streaming chunks.""" | |
| if self._session: | |
| return self._post_curl_stream(url, data, headers, cookies) | |
| else: | |
| return self._post_urllib_stream(url, data, headers, cookies) | |
| def _post_curl_stream(self, url: str, data: bytes, headers: dict, cookies: dict = None): | |
| self._session.cookies.clear() | |
| proxy = CONFIG.get("proxy") | |
| proxies = {"http": proxy, "https": proxy} if proxy else None | |
| resp = self._session.post( | |
| url, | |
| data=data, | |
| headers=dict(headers), | |
| cookies=cookies or {}, | |
| proxies=proxies, | |
| allow_redirects=True, | |
| stream=True | |
| ) | |
| if resp.status_code != 200: | |
| raise Exception(f"HTTP {resp.status_code}") | |
| for line in resp.iter_lines(): | |
| if line: | |
| yield line.decode("utf-8", errors="replace") | |
| def _post_urllib_stream(self, url: str, data: bytes, headers: dict, cookies: dict = None): | |
| all_headers = dict(headers) | |
| if cookies: | |
| cookie_str = "; ".join(f"{k}={v}" for k, v in cookies.items()) | |
| existing = all_headers.get("Cookie", "") | |
| if existing: | |
| all_headers["Cookie"] = existing + "; " + cookie_str | |
| else: | |
| all_headers["Cookie"] = cookie_str | |
| req = urllib.request.Request(url, data=data, headers=all_headers, method="POST") | |
| ctx = ssl.create_default_context() | |
| proxy = CONFIG.get("proxy") | |
| if proxy: | |
| opener = urllib.request.build_opener( | |
| urllib.request.ProxyHandler({"http": proxy, "https": proxy}), | |
| urllib.request.HTTPSHandler(context=ctx) | |
| ) | |
| resp = opener.open(req, timeout=CONFIG["request_timeout_sec"]) | |
| else: | |
| resp = urllib.request.urlopen(req, context=ctx, timeout=CONFIG["request_timeout_sec"]) | |
| for line in resp: | |
| if line: | |
| yield line.decode("utf-8", errors="replace") | |
| def close(self): | |
| if self._session: | |
| try: | |
| self._session.close() | |
| except Exception: | |
| pass | |
| # Global HTTP client (initialized in main) | |
| _http_client: GeminiHTTPClient = None | |
| def get_http_client() -> GeminiHTTPClient: | |
| global _http_client | |
| if _http_client is None: | |
| _http_client = GeminiHTTPClient() | |
| return _http_client | |
| # βββ Gemini Protocol βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def gemini_stream_generate(prompt: str, model_info: dict, stream: bool = False): | |
| """Send prompt to Gemini StreamGenerate with retry. | |
| Uses the x-goog-ext-525001261-jspb header for model selection | |
| (upstream approach) instead of the old integer mode category. | |
| """ | |
| # Build the inner payload array | |
| # The payload structure is from Gemini's batchexecute protocol | |
| inner = [None] * 80 | |
| inner[0] = [prompt, 0, None, None, None, None, 0] | |
| inner[1] = ["en"] | |
| inner[2] = ["", "", "", None, None, None, None, None, None, ""] | |
| inner[6] = [0] | |
| inner[7] = 1 | |
| inner[10] = 1 | |
| inner[11] = 0 | |
| # Think mode: 0 = thinking enabled, 4 = thinking disabled | |
| # Each model carries its own think value | |
| think_mode = model_info.get("think", 4) | |
| inner[17] = [[think_mode]] | |
| inner[18] = 0 | |
| inner[27] = 1 | |
| inner[30] = [4] | |
| inner[41] = [2] | |
| inner[53] = 0 | |
| inner[59] = str(uuid.uuid4()) | |
| inner[61] = [] | |
| inner[68] = 1 | |
| # Model is now set via HTTP header, not payload slot 79 | |
| # inner[79] is left as None | |
| outer = [None, json.dumps(inner)] | |
| body = urllib.parse.urlencode({"f.req": json.dumps(outer)}).encode() | |
| reqid = random.randint(10000, 99999) | |
| url = ( | |
| "https://gemini.google.com/_/BardChatUi/data/" | |
| "assistant.lamda.BardFrontendService/StreamGenerate" | |
| f"?bl={CONFIG['gemini_bl']}&hl=en&_reqid={reqid}&rt=c" | |
| ) | |
| # Build Chrome-like headers | |
| headers = build_chrome_headers( | |
| method="POST", | |
| content_type="application/x-www-form-urlencoded", | |
| ) | |
| # Add model selection headers | |
| model_headers = build_model_headers(model_info) | |
| headers.update(model_headers) | |
| # Load and apply cookie | |
| cookie_str, sapisid = load_cookie() | |
| cookies = {} | |
| if cookie_str: | |
| # Parse cookie string into dict for curl_cffi | |
| for pair in cookie_str.split("; "): | |
| if "=" in pair: | |
| k, v = pair.split("=", 1) | |
| cookies[k.strip()] = v.strip() | |
| # Also set as header for urllib fallback | |
| headers["Cookie"] = cookie_str | |
| if sapisid: | |
| headers["Authorization"] = make_sapisidhash(sapisid) | |
| client = get_http_client() | |
| last_err = None | |
| for attempt in range(CONFIG["retry_attempts"]): | |
| try: | |
| # Apply request jitter to mimic human behavior | |
| if attempt > 0: | |
| time.sleep(CONFIG["retry_delay_sec"]) | |
| apply_jitter() | |
| if stream: | |
| return client.post_stream(url, data=body, headers=headers, cookies=cookies) | |
| else: | |
| return client.post(url, data=body, headers=headers, cookies=cookies) | |
| except Exception as e: | |
| last_err = e | |
| if attempt < CONFIG["retry_attempts"] - 1: | |
| log(f"Retry {attempt+1}/{CONFIG['retry_attempts']}: {e}") | |
| raise last_err | |
| def clean_gemini_text(text: str) -> str: | |
| """Remove internal code execution artifacts and image placeholders.""" | |
| # Convert code execution blocks to standard markdown | |
| text = re.sub( | |
| r'\?code_(?:reference|stdout)&code_event_index=\d+', | |
| '', text | |
| ) | |
| # Remove googleusercontent placeholder URLs (image gen/retrieval/collection) | |
| text = re.sub( | |
| r'https?://googleusercontent\.com/(?:image_generation_content|image_retrieval|image_collection)[/\w]*\d*', | |
| '', text | |
| ) | |
| return text | |
| def _scan_complete_wrb_frames(buf: str) -> list: | |
| """Extract complete wrb.fr frames using bracket-depth scanning. | |
| This is the upstream's improved parser that correctly handles | |
| partial chunks and escape sequences, replacing the old line-by-line approach. | |
| """ | |
| frames = [] | |
| i = 0 | |
| n = len(buf) | |
| while i < n: | |
| start = buf.find('["wrb.fr"', i) | |
| if start == -1: | |
| break | |
| # Bracket-depth scan to find matching close bracket | |
| depth = 0 | |
| in_str = False | |
| esc = False | |
| end = -1 | |
| j = start | |
| while j < n: | |
| c = buf[j] | |
| if in_str: | |
| if esc: | |
| esc = False | |
| elif c == '\\': | |
| esc = True | |
| elif c == '"': | |
| in_str = False | |
| else: | |
| if c == '"': | |
| in_str = True | |
| elif c == '[': | |
| depth += 1 | |
| elif c == ']': | |
| depth -= 1 | |
| if depth == 0: | |
| end = j | |
| break | |
| j += 1 | |
| if end == -1: | |
| break # Incomplete frame | |
| elem_str = buf[start:end + 1] | |
| try: | |
| elem = json.loads(elem_str) | |
| frames.append(elem) | |
| except (json.JSONDecodeError, ValueError): | |
| pass | |
| i = end + 1 | |
| return frames | |
| def gemini_stream_parse(stream_generator, model_info: dict = None): | |
| """Consume network chunks, parse wrb.fr frames, and yield text deltas incrementally.""" | |
| buf = "" | |
| emitted_raw = "" | |
| first_chunk = True | |
| for chunk in stream_generator: | |
| if not chunk: continue | |
| buf += chunk | |
| frames = _scan_complete_wrb_frames(buf) | |
| if not frames: continue | |
| # In stream context, the parser needs to extract the latest text | |
| texts = [] | |
| for elem in frames: | |
| try: | |
| if not isinstance(elem, list) or len(elem) < 3 or elem[0] != "wrb.fr": | |
| continue | |
| rp = elem[2] | |
| if not isinstance(rp, str) or len(rp) < 50: | |
| continue | |
| payload = json.loads(rp) | |
| if isinstance(payload, list) and len(payload) > 4 and payload[4]: | |
| for part in payload[4]: | |
| if isinstance(part, list) and len(part) > 1 and part[1]: | |
| if isinstance(part[1], list): | |
| for t in part[1]: | |
| if isinstance(t, str) and len(t) > 0: | |
| texts.append(t) | |
| except (json.JSONDecodeError, IndexError, TypeError): | |
| pass | |
| current_full_text = "" | |
| for t in reversed(texts): | |
| if t.strip(): | |
| current_full_text = t | |
| break | |
| if current_full_text == emitted_raw: | |
| continue | |
| if current_full_text.startswith(emitted_raw): | |
| raw_delta = current_full_text[len(emitted_raw):] | |
| emitted_raw = current_full_text | |
| if raw_delta: | |
| cleaned_delta = clean_gemini_text(raw_delta) | |
| first_chunk = False | |
| if cleaned_delta: | |
| yield cleaned_delta | |
| def extract_response_text(raw: str, model_info: dict = None) -> str: | |
| """Parse StreamGenerate response to extract final text. (Backwards compatible)""" | |
| gen = gemini_stream_parse([raw], model_info) | |
| return "".join(list(gen)) | |
| # βββ OpenAI Format Helpers βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def messages_to_prompt(messages: list, tools: list = None) -> str: | |
| """Convert OpenAI messages to prompt string.""" | |
| parts = [] | |
| if tools: | |
| tool_defs = [] | |
| for tool in tools: | |
| fn = tool.get("function", tool) if tool.get("type") == "function" else tool | |
| tool_defs.append({ | |
| "name": fn.get("name", tool.get("name", "")), | |
| "description": fn.get("description", tool.get("description", "")), | |
| "parameters": fn.get("parameters", tool.get("parameters", {})), | |
| }) | |
| if tool_defs: | |
| parts.append( | |
| "[System instruction]: You have access to tools. " | |
| "To call a tool, respond with:\n" | |
| '```tool_call\n{"name": "func_name", "arguments": {...}}\n```\n' | |
| "Only use tool_call blocks when needed.\n\n" | |
| f"Available tools:\n{json.dumps(tool_defs, indent=2)}" | |
| ) | |
| for msg in messages: | |
| role = msg.get("role", "user") | |
| content = msg.get("content", "") | |
| if isinstance(content, list): | |
| content = " ".join( | |
| c.get("text", "") for c in content | |
| if c.get("type") in ("text", "input_text") | |
| ) | |
| if role == "system": | |
| parts.append(f"[System instruction]: {content}") | |
| elif role == "assistant": | |
| if msg.get("tool_calls"): | |
| tc_strs = [] | |
| for tc in msg["tool_calls"]: | |
| fn = tc.get("function", {}) | |
| tc_strs.append( | |
| f'```tool_call\n{{"name": "{fn.get("name")}", ' | |
| f'"arguments": {fn.get("arguments", "{}}")}}}\n```' | |
| ) | |
| parts.append(f"[Assistant]: {content or ''}\n" + "\n".join(tc_strs)) | |
| else: | |
| parts.append(f"[Assistant]: {content}") | |
| elif role == "tool": | |
| parts.append(f"[Tool result for {msg.get('name', '')}]: {content}") | |
| else: | |
| parts.append(content if content else "") | |
| return "\n\n".join(p for p in parts if p) | |
| def parse_tool_calls(text: str) -> tuple: | |
| """Extract tool_call blocks. Returns (clean_text, tool_calls_list).""" | |
| tool_calls = [] | |
| pattern = r'```tool_call\s*\n(.*?)\n```' | |
| for match in re.findall(pattern, text, re.DOTALL): | |
| try: | |
| data = json.loads(match.strip()) | |
| tool_calls.append({ | |
| "id": f"call_{uuid.uuid4().hex[:8]}", | |
| "type": "function", | |
| "function": { | |
| "name": data["name"], | |
| "arguments": json.dumps(data.get("arguments", {}), ensure_ascii=False), | |
| }, | |
| }) | |
| except (json.JSONDecodeError, KeyError): | |
| pass | |
| clean = re.sub(pattern, '', text, flags=re.DOTALL).strip() | |
| return clean, tool_calls | |
| # βββ HTTP Handler ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class GeminiHandler(BaseHTTPRequestHandler): | |
| def log_message(self, fmt, *args): | |
| log(fmt % args) | |
| def validate_api_key(self) -> bool: | |
| """Validate the API key from Authorization header against configured key. | |
| Returns True if valid or if no API key is configured (open access).""" | |
| configured_key = CONFIG.get("api_key") | |
| if not configured_key: | |
| return True # No key configured = open access | |
| auth_header = self.headers.get("Authorization", "") | |
| if auth_header.startswith("Bearer "): | |
| provided_key = auth_header[7:].strip() | |
| else: | |
| provided_key = auth_header.strip() | |
| if provided_key == configured_key: | |
| return True | |
| log(f"API key rejected from {self.client_address[0]}") | |
| self.send_json( | |
| {"error": {"message": "Invalid API key. Provide a valid key via 'Authorization: Bearer <key>' header.", | |
| "type": "authentication_error", "code": "invalid_api_key"}}, 401) | |
| return False | |
| def send_json(self, data, status=200): | |
| body = json.dumps(data, ensure_ascii=False).encode() | |
| self.send_response(status) | |
| self.send_header("Content-Type", "application/json") | |
| self.send_header("Access-Control-Allow-Origin", "*") | |
| self.send_header("Content-Length", str(len(body))) | |
| self.end_headers() | |
| self.wfile.write(body) | |
| def do_OPTIONS(self): | |
| self.send_response(204) | |
| self.send_header("Access-Control-Allow-Origin", "*") | |
| self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") | |
| self.send_header("Access-Control-Allow-Headers", "*") | |
| self.end_headers() | |
| def do_GET(self): | |
| try: | |
| if self.path == "/v1/models": | |
| if not self.validate_api_key(): | |
| return | |
| self.send_json({"object": "list", "data": [ | |
| {"id": n, "object": "model", "created": 1700000000, | |
| "owned_by": "google", "description": c["desc"]} | |
| for n, c in EXPOSED_MODELS.items() | |
| ]}) | |
| elif self.path == "/": | |
| self.send_json({ | |
| "status": "ok", | |
| "version": __version__, | |
| "transport": "curl_cffi" if HAS_CURL_CFFI else "urllib", | |
| "models": list(EXPOSED_MODELS.keys()), | |
| "aliases": list(MODEL_ALIASES.keys()), | |
| }) | |
| else: | |
| self.send_json({"error": "not found"}, 404) | |
| except (BrokenPipeError, ConnectionResetError): | |
| pass | |
| except Exception as e: | |
| log(f"GET error: {e}") | |
| def do_POST(self): | |
| try: | |
| if not self.validate_api_key(): | |
| return | |
| if self.headers.get("Transfer-Encoding", "").lower() == "chunked": | |
| body = b"" | |
| while True: | |
| line = self.rfile.readline().strip() | |
| if not line: | |
| break | |
| chunk_size = int(line, 16) | |
| if chunk_size == 0: | |
| self.rfile.readline() # Read trailing \r\n | |
| break | |
| body += self.rfile.read(chunk_size) | |
| self.rfile.readline() # Read trailing \r\n | |
| else: | |
| length = int(self.headers.get("Content-Length", 0)) | |
| body = self.rfile.read(length) if length else b"" | |
| if self.path == "/v1/chat/completions": | |
| self.handle_chat(body) | |
| elif self.path == "/v1/responses": | |
| self.handle_responses(body) | |
| else: | |
| self.send_json({"error": "not found"}, 404) | |
| except (BrokenPipeError, ConnectionResetError): | |
| pass | |
| except Exception as e: | |
| log(f"POST error: {e}") | |
| try: | |
| self.send_json({"error": {"message": str(e)}}, 500) | |
| except: | |
| pass | |
| def _resolve_model(self, model_name): | |
| pub_name, internal_name, model_info, err = resolve_model(model_name) | |
| if err: | |
| return None, None, err | |
| return pub_name, model_info, None | |
| def _call_gemini(self, prompt, model_info, tools, stream=False): | |
| raw = gemini_stream_generate(prompt, model_info, stream=stream) | |
| if stream: | |
| return gemini_stream_parse(raw, model_info) | |
| else: | |
| text = extract_response_text(raw, model_info) | |
| tool_calls = None | |
| if tools and text: | |
| text, tool_calls = parse_tool_calls(text) | |
| return text or "", tool_calls | |
| def handle_chat(self, body: bytes): | |
| try: | |
| req = json.loads(body) | |
| if CONFIG.get("debug_mode"): | |
| log(f"DEBUG [CHAT] REQUEST: {json.dumps(req, ensure_ascii=False)[:2000]}") | |
| except json.JSONDecodeError as e: | |
| self.send_json({"error": {"message": f"Invalid JSON payload: {e}. Body received: {body.decode('utf-8', errors='replace')}"}}, 400) | |
| return | |
| model_name, model_info, err = self._resolve_model( | |
| req.get("model", CONFIG["default_model"])) | |
| if err: | |
| self.send_json({"error": {"message": err}}, 400) | |
| return | |
| tools = req.get("tools") | |
| if CONFIG.get("debug_mode"): | |
| think_status = "Enabled" if model_info.get("think") == 0 else "Disabled" | |
| log(f"DEBUG [CHAT] MODEL: {model_name} (Think Mode: {think_status})") | |
| if tools: | |
| log(f"DEBUG [CHAT] TOOLS PROVIDED: {len(tools)} tools") | |
| prompt = messages_to_prompt(req.get("messages", []), tools) | |
| if not prompt.strip(): | |
| self.send_json({"error": {"message": "empty prompt"}}, 400) | |
| return | |
| is_stream = bool(req.get("stream")) | |
| try: | |
| # If tools are provided, we must collect full text first to parse them, so disable network streaming | |
| if tools: | |
| text, tool_calls = self._call_gemini(prompt, model_info, tools, stream=False) | |
| if CONFIG.get("debug_mode"): | |
| log(f"DEBUG [CHAT] RESPONSE TEXT: {text}") | |
| log(f"DEBUG [CHAT] RESPONSE TOOLS: {tool_calls}") | |
| else: | |
| result = self._call_gemini(prompt, model_info, tools, stream=is_stream) | |
| if not is_stream and CONFIG.get("debug_mode"): | |
| log(f"DEBUG [CHAT] RESPONSE: {result}") | |
| except Exception as e: | |
| self.send_json({"error": {"message": f"upstream error: {e}"}}, 502) | |
| return | |
| cid = f"chatcmpl-{uuid.uuid4().hex[:12]}" | |
| if is_stream: | |
| self.send_response(200) | |
| self.send_header("Content-Type", "text/event-stream") | |
| self.send_header("Cache-Control", "no-cache") | |
| self.send_header("Access-Control-Allow-Origin", "*") | |
| self.end_headers() | |
| if tools: | |
| # Tools were present, so we ran synchronously. We yield the tool calls in streaming format. | |
| if tool_calls: | |
| for tc in tool_calls: | |
| chunk = {"id": cid, "object": "chat.completion.chunk", "created": int(time.time()), | |
| "model": model_name, "choices": [{"index": 0, "delta": {"tool_calls": [tc]}}]} | |
| self.wfile.write(f"data: {json.dumps(chunk)}\n\n".encode()) | |
| chunk = {"id": cid, "object": "chat.completion.chunk", "created": int(time.time()), | |
| "model": model_name, "choices": [{"index": 0, "delta": {}, "finish_reason": "tool_calls"}]} | |
| self.wfile.write(f"data: {json.dumps(chunk)}\n\n".encode()) | |
| else: | |
| msg = {"role": "assistant", "content": text or ""} | |
| chunk = {"id": cid, "object": "chat.completion.chunk", "created": int(time.time()), | |
| "model": model_name, "choices": [{"index": 0, "delta": msg, "finish_reason": "stop"}]} | |
| self.wfile.write(f"data: {json.dumps(chunk)}\n\n".encode()) | |
| else: | |
| # Real streaming | |
| first = True | |
| for delta in result: | |
| if CONFIG.get("debug_mode") and delta: | |
| log(f"DEBUG [CHAT] CHUNK: {delta}") | |
| msg = {"role": "assistant"} if first else {} | |
| if delta: msg["content"] = delta | |
| first = False | |
| chunk = {"id": cid, "object": "chat.completion.chunk", "created": int(time.time()), | |
| "model": model_name, "choices": [{"index": 0, "delta": msg, "finish_reason": None}]} | |
| self.wfile.write(f"data: {json.dumps(chunk)}\n\n".encode()) | |
| self.wfile.flush() | |
| chunk = {"id": cid, "object": "chat.completion.chunk", "created": int(time.time()), | |
| "model": model_name, "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]} | |
| self.wfile.write(f"data: {json.dumps(chunk)}\n\n".encode()) | |
| self.wfile.write(b"data: [DONE]\n\n") | |
| self.wfile.flush() | |
| else: | |
| if tools: | |
| msg = {"role": "assistant", "content": text or None} | |
| if tool_calls: | |
| msg["tool_calls"] = tool_calls | |
| finish = "tool_calls" if tool_calls else "stop" | |
| else: | |
| msg = {"role": "assistant", "content": result or None} | |
| finish = "stop" | |
| self.send_json({ | |
| "id": cid, "object": "chat.completion", "created": int(time.time()), | |
| "model": model_name, | |
| "choices": [{"index": 0, "message": msg, "finish_reason": finish}], | |
| "usage": {"prompt_tokens": len(prompt)//4, "completion_tokens": 0, | |
| "total_tokens": len(prompt)//4}, | |
| }) | |
| def handle_responses(self, body: bytes): | |
| """OpenAI Responses API for Codex CLI compatibility.""" | |
| try: | |
| req = json.loads(body) | |
| if CONFIG.get("debug_mode"): | |
| log(f"DEBUG [RESP] REQUEST: {json.dumps(req, ensure_ascii=False)[:2000]}") | |
| except json.JSONDecodeError as e: | |
| self.send_json({"error": {"message": f"Invalid JSON payload: {e}. Body received: {body.decode('utf-8', errors='replace')}"}}, 400) | |
| return | |
| model_name, model_info, err = self._resolve_model( | |
| req.get("model", CONFIG["default_model"])) | |
| if err: | |
| self.send_json({"error": {"message": err}}, 400) | |
| return | |
| input_items = req.get("input", []) | |
| tools = req.get("tools") | |
| if CONFIG.get("debug_mode"): | |
| think_status = "Enabled" if model_info.get("think") == 0 else "Disabled" | |
| log(f"DEBUG [RESP] MODEL: {model_name} (Think Mode: {think_status})") | |
| if tools: | |
| log(f"DEBUG [RESP] TOOLS PROVIDED: {len(tools)} tools") | |
| messages = [] | |
| if req.get("instructions"): | |
| messages.append({"role": "system", "content": req["instructions"]}) | |
| if isinstance(input_items, str): | |
| messages.append({"role": "user", "content": input_items}) | |
| elif isinstance(input_items, list): | |
| for item in input_items: | |
| if isinstance(item, str): | |
| messages.append({"role": "user", "content": item}) | |
| elif isinstance(item, dict): | |
| if item.get("type") == "function_call_output": | |
| messages.append({"role": "tool", "tool_call_id": item.get("call_id", ""), | |
| "name": item.get("name", ""), "content": item.get("output", "")}) | |
| elif item.get("role") == "assistant" or (item.get("type") == "message" and item.get("role") == "assistant"): | |
| cp = item.get("content", []) | |
| text_acc, tc_list = "", [] | |
| if isinstance(cp, list): | |
| for c in cp: | |
| if isinstance(c, dict): | |
| if c.get("type") == "output_text": text_acc += c.get("text", "") | |
| elif c.get("type") == "function_call": tc_list.append(c) | |
| elif isinstance(cp, str): | |
| text_acc = cp | |
| m = {"role": "assistant", "content": text_acc or None} | |
| if tc_list: | |
| m["tool_calls"] = [{"id": tc.get("call_id", f"call_{i}"), "type": "function", | |
| "function": {"name": tc.get("name",""), "arguments": tc.get("arguments","{}")}} | |
| for i, tc in enumerate(tc_list)] | |
| messages.append(m) | |
| else: | |
| role = item.get("role", "user") | |
| content = item.get("content", "") | |
| if isinstance(content, list): | |
| content = " ".join(c.get("text", "") for c in content if c.get("type") in ("text", "input_text")) | |
| messages.append({"role": role, "content": content}) | |
| if tools: | |
| tools = [{"type": "function", "function": {"name": t["name"], "description": t.get("description", ""), "parameters": t.get("parameters", {})}} | |
| if t.get("type") == "function" and "function" not in t else t for t in tools] | |
| prompt = messages_to_prompt(messages, tools) | |
| if not prompt.strip(): | |
| self.send_json({"error": {"message": "empty input"}}, 400) | |
| return | |
| try: | |
| text, tool_calls = self._call_gemini(prompt, model_info, tools) | |
| if CONFIG.get("debug_mode"): | |
| log(f"DEBUG [RESP] RESPONSE TEXT: {text}") | |
| log(f"DEBUG [RESP] RESPONSE TOOLS: {tool_calls}") | |
| except Exception as e: | |
| self.send_json({"error": {"message": f"upstream error: {e}"}}, 502) | |
| return | |
| rid = f"resp_{uuid.uuid4().hex[:16]}" | |
| mid = f"msg_{uuid.uuid4().hex[:12]}" | |
| output = [] | |
| if tool_calls: | |
| for tc in tool_calls: | |
| output.append({"type": "function_call", "id": tc["id"], "call_id": tc["id"], | |
| "name": tc["function"]["name"], "arguments": tc["function"]["arguments"], "status": "completed"}) | |
| if text or not tool_calls: | |
| output.append({"type": "message", "id": mid, "role": "assistant", "status": "completed", | |
| "content": [{"type": "output_text", "text": text or "", "annotations": []}]}) | |
| if req.get("stream"): | |
| self.send_response(200) | |
| self.send_header("Content-Type", "text/event-stream") | |
| self.send_header("Cache-Control", "no-cache") | |
| self.send_header("Access-Control-Allow-Origin", "*") | |
| self.end_headers() | |
| ev = {"type": "response.created", "response": {"id": rid, "object": "response", "status": "in_progress", "model": model_name, "output": []}} | |
| self.wfile.write(f"event: response.created\ndata: {json.dumps(ev)}\n\n".encode()) | |
| for item in output: | |
| if item["type"] == "function_call": | |
| ev = {"type": "response.function_call_arguments.done", "item_id": item["id"], "call_id": item["call_id"], "name": item["name"], "arguments": item["arguments"]} | |
| self.wfile.write(f"event: response.function_call_arguments.done\ndata: {json.dumps(ev)}\n\n".encode()) | |
| elif item["type"] == "message": | |
| for ci, cp in enumerate(item["content"]): | |
| ev = {"type": "response.output_text.done", "item_id": item["id"], "content_index": ci, "text": cp["text"]} | |
| self.wfile.write(f"event: response.output_text.done\ndata: {json.dumps(ev)}\n\n".encode()) | |
| resp_obj = {"id": rid, "object": "response", "status": "completed", "model": model_name, "output": output, | |
| "usage": {"input_tokens": len(prompt)//4, "output_tokens": len(text)//4, "total_tokens": (len(prompt)+len(text))//4}} | |
| self.wfile.write(f"event: response.completed\ndata: {json.dumps({'type': 'response.completed', 'response': resp_obj})}\n\n".encode()) | |
| self.wfile.flush() | |
| else: | |
| self.send_json({"id": rid, "object": "response", "created_at": int(time.time()), "status": "completed", | |
| "model": model_name, "output": output, | |
| "usage": {"input_tokens": len(prompt)//4, "output_tokens": len(text)//4, "total_tokens": (len(prompt)+len(text))//4}}) | |
| # βββ Main ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def load_config(path: str): | |
| if path and os.path.exists(path): | |
| with open(path) as f: | |
| CONFIG.update(json.load(f)) | |
| log(f"Config loaded: {path}") | |
| def update_gemini_bl(): | |
| """Scrape the gemini.google.com homepage to extract the latest gemini_bl parameter.""" | |
| try: | |
| log("Fetching latest gemini_bl parameter from gemini.google.com...") | |
| client = get_http_client() | |
| headers = build_chrome_headers(method="GET") | |
| html = client.get("https://gemini.google.com/app", headers=headers) | |
| # Look for the bl string in the HTML (usually under cfb2h or SNlM0e) | |
| match = re.search(r'"cfb2h":"([^"]+)"', html) | |
| if not match: | |
| match = re.search(r'"SNlM0e":"([^"]+)"', html) | |
| if match: | |
| CONFIG["gemini_bl"] = match.group(1) | |
| log(f"Successfully updated gemini_bl to: {CONFIG['gemini_bl']}") | |
| else: | |
| log("Warning: Could not extract gemini_bl from homepage. Using fallback.") | |
| except Exception as e: | |
| log(f"Error fetching gemini_bl: {e}. Using fallback.") | |
| def main(): | |
| parser = argparse.ArgumentParser(description="Gemini Web to OpenAI API") | |
| parser.add_argument("--port", type=int, default=None) | |
| parser.add_argument("--config", type=str, default=None) | |
| parser.add_argument("--cookie-file", type=str, default=None, help="Path to cookie file") | |
| parser.add_argument("--proxy", type=str, default=None, help="HTTP proxy, e.g. http://127.0.0.1:7890") | |
| parser.add_argument("--debug", action="store_true", help="Enable debug logging of requests/responses") | |
| parser.add_argument("--version", action="version", version=f"gemini-web2api {__version__}") | |
| args = parser.parse_args() | |
| config_path = args.config or os.environ.get("GEMINI_WEB2API_CONFIG") | |
| if not config_path: | |
| for p in ["./config.json", os.path.expanduser("~/.config/gemini-web2api/config.json")]: | |
| if os.path.exists(p): | |
| config_path = p | |
| break | |
| load_config(config_path) | |
| if args.port: | |
| CONFIG["port"] = args.port | |
| if args.cookie_file: | |
| CONFIG["cookie_file"] = args.cookie_file | |
| if args.proxy: | |
| CONFIG["proxy"] = args.proxy | |
| if args.debug: | |
| CONFIG["debug_mode"] = True | |
| # Initialize HTTP client | |
| get_http_client() | |
| update_gemini_bl() | |
| class ThreadedServer(ThreadingMixIn, HTTPServer): | |
| daemon_threads = True | |
| allow_reuse_address = True | |
| port = CONFIG["port"] | |
| server = ThreadedServer((CONFIG["host"], port), GeminiHandler) | |
| print(f"gemini-web2api v{__version__}") | |
| print(f" Listening: http://0.0.0.0:{port}") | |
| print(f" Base URL: http://localhost:{port}/v1") | |
| print(f" Transport: {'curl_cffi (Chrome TLS fingerprint)' if HAS_CURL_CFFI else 'urllib (no fingerprint - install curl_cffi for stealth)'}") | |
| print(f" Models: {', '.join(EXPOSED_MODELS.keys())}") | |
| print(f" Aliases: {len(MODEL_ALIASES)} legacy names supported") | |
| print(f" API Key: {'configured (set via API_KEY env)' if CONFIG.get('api_key') else 'none (open access)'}") | |
| print(f" Cookie: {'yes (' + CONFIG['cookie_file'] + ')' if CONFIG.get('cookie_file') else 'none (anonymous)'}") | |
| print(f" Proxy: {CONFIG.get('proxy') or 'none (uses system env HTTP_PROXY/HTTPS_PROXY)'}") | |
| print(f" Retry: {CONFIG['retry_attempts']}x / {CONFIG['retry_delay_sec']}s") | |
| print(f" Jitter: {CONFIG['jitter_min_ms']}-{CONFIG['jitter_max_ms']}ms") | |
| print(f" Debug: {'enabled' if CONFIG.get('debug_mode') else 'disabled'}") | |
| print() | |
| try: | |
| server.serve_forever() | |
| except KeyboardInterrupt: | |
| print("\nStopped.") | |
| get_http_client().close() | |
| server.server_close() | |
| if __name__ == "__main__": | |
| main() | |