CheckMat / chatmock /upstream.py
aiqknow's picture
Upload 97 files
35205e8 verified
from __future__ import annotations
import json
import time
from typing import Any, Dict, List, Tuple
from urllib.parse import urlparse, urlunparse
import requests
from flask import Response, current_app, jsonify, make_response
from .config import CHATGPT_RESPONSES_URL
from .http import build_cors_headers
from .model_registry import normalize_model_name
from .session import ensure_session_id
from flask import request as flask_request
from .utils import get_effective_chatgpt_auth
def _log_json(prefix: str, payload: Any) -> None:
try:
print(f"{prefix}\n{json.dumps(payload, indent=2, ensure_ascii=False)}")
except Exception:
try:
print(f"{prefix}\n{payload}")
except Exception:
pass
def start_upstream_request(
model: str,
input_items: List[Dict[str, Any]],
*,
instructions: str | None = None,
tools: List[Dict[str, Any]] | None = None,
tool_choice: Any | None = None,
parallel_tool_calls: bool = False,
reasoning_param: Dict[str, Any] | None = None,
service_tier: str | None = None,
):
access_token, account_id = get_effective_chatgpt_auth()
if not access_token or not account_id:
resp = make_response(
jsonify(
{
"error": {
"message": "Missing ChatGPT credentials. Run 'python3 chatmock.py login' first.",
}
}
),
401,
)
for k, v in build_cors_headers().items():
resp.headers.setdefault(k, v)
return None, resp
include: List[str] = []
if isinstance(reasoning_param, dict):
include.append("reasoning.encrypted_content")
client_session_id = None
try:
client_session_id = (
flask_request.headers.get("X-Session-Id")
or flask_request.headers.get("session_id")
or None
)
except Exception:
client_session_id = None
session_id = ensure_session_id(instructions, input_items, client_session_id)
responses_payload = {
"model": model,
"instructions": instructions if isinstance(instructions, str) and instructions.strip() else instructions,
"input": input_items,
"tools": tools or [],
"tool_choice": tool_choice if tool_choice in ("auto", "none") or isinstance(tool_choice, dict) else "auto",
"parallel_tool_calls": bool(parallel_tool_calls),
"store": False,
"stream": True,
"prompt_cache_key": session_id,
}
if include:
responses_payload["include"] = include
if reasoning_param is not None:
responses_payload["reasoning"] = reasoning_param
if isinstance(service_tier, str) and service_tier.strip():
responses_payload["service_tier"] = service_tier.strip().lower()
return start_upstream_raw_request(
responses_payload,
session_id=session_id,
stream=True,
)
def build_upstream_headers(
access_token: str,
account_id: str,
session_id: str,
*,
accept: str = "text/event-stream",
) -> Dict[str, str]:
return {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"Accept": accept,
"chatgpt-account-id": account_id,
"OpenAI-Beta": "responses=experimental",
"session_id": session_id,
}
def start_upstream_raw_request(
responses_payload: Dict[str, Any],
*,
session_id: str | None = None,
stream: bool = True,
):
access_token, account_id = get_effective_chatgpt_auth()
if not access_token or not account_id:
resp = make_response(
jsonify(
{
"error": {
"message": "Missing ChatGPT credentials. Run 'python3 chatmock.py login' first.",
}
}
),
401,
)
for k, v in build_cors_headers().items():
resp.headers.setdefault(k, v)
return None, resp
effective_session_id = session_id
if not isinstance(effective_session_id, str) or not effective_session_id.strip():
payload_prompt_cache_key = responses_payload.get("prompt_cache_key")
if isinstance(payload_prompt_cache_key, str) and payload_prompt_cache_key.strip():
effective_session_id = payload_prompt_cache_key.strip()
if not isinstance(effective_session_id, str) or not effective_session_id.strip():
effective_session_id = str(int(time.time() * 1000))
verbose = False
try:
verbose = bool(current_app.config.get("VERBOSE"))
except Exception:
verbose = False
if verbose:
_log_json("OUTBOUND >> ChatGPT Responses API payload", responses_payload)
headers = build_upstream_headers(
access_token,
account_id,
effective_session_id,
accept=("text/event-stream" if stream else "application/json"),
)
try:
upstream = requests.post(
CHATGPT_RESPONSES_URL,
headers=headers,
json=responses_payload,
stream=stream,
timeout=600,
)
except requests.RequestException as e:
resp = make_response(jsonify({"error": {"message": f"Upstream ChatGPT request failed: {e}"}}), 502)
for k, v in build_cors_headers().items():
resp.headers.setdefault(k, v)
return None, resp
return upstream, None
def build_upstream_websocket_url() -> str:
parsed = urlparse(CHATGPT_RESPONSES_URL)
scheme = parsed.scheme.lower()
if scheme == "https":
parsed = parsed._replace(scheme="wss")
elif scheme == "http":
parsed = parsed._replace(scheme="ws")
return urlunparse(parsed)