# GPT-OSS -> OpenAI-compatible chat-completions proxy (Flask).
import requests
import json
import uuid
import time
import jwt
from flask import Flask, request, jsonify, Response
# 1. Initialise the Flask application.
app = Flask(__name__)
# Random per-process secret used only to sign the JWT session tokens below;
# regenerated on every restart, so tokens do not survive a process restart.
app.config['SECRET_KEY'] = str(uuid.uuid4())
# 2. Fixed configuration for the gpt-oss.com API (the Cookie header is
# built per request, not stored here).
GPT_OSS_API_URL = "https://api.gpt-oss.com/chatkit"
# Browser-like headers so the upstream accepts the request; the model is
# pinned via the 'x-selected-model' header.
BASE_GPT_OSS_HEADERS = {
    'authority': 'api.gpt-oss.com',
    'accept': 'text/event-stream',
    'content-type': 'application/json',
    'origin': 'https://gpt-oss.com',
    'referer': 'https://gpt-oss.com/',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'x-selected-model': 'gpt-oss-120b',
}
# 3. Root route with usage instructions.
@app.route('/', methods=['GET'])
def root():
    """Health/usage endpoint: returns a demo user id and a matching JWT."""
    demo_uid = str(uuid.uuid4())
    demo_token = jwt.encode(
        {'user_id': demo_uid}, app.config['SECRET_KEY'], algorithm='HS256'
    )
    payload = {
        "message": "欢迎使用 GPT-OSS to OpenAI 格式代理API",
        "status": "ok",
        "authentication_method": "无需认证,系统会自动生成用户ID和会话令牌",
        "example_user_id": demo_uid,
        "example_session_token": demo_token,
    }
    return jsonify(payload)
# Helper: wrap one text delta in the OpenAI streaming-chunk envelope.
def create_openai_chunk(content, model="gpt-oss-120b"):
    """Return a chat.completion.chunk dict carrying *content* as the delta."""
    choice = {
        "index": 0,
        "delta": {"content": content},
        "finish_reason": None,
    }
    return {
        "id": f"chatcmpl-{str(uuid.uuid4())}",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [choice],
    }
# 4. Core API endpoint (no client-side authentication required).
@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions_proxy():
    """Proxy an OpenAI-style chat-completions request to gpt-oss.com.

    A fresh user id and signed session token are generated per request and
    sent upstream as cookies. Only the most recent 'user' message from the
    request body is forwarded. The upstream SSE stream is translated into
    either an OpenAI streaming response (stream=true) or one JSON completion.
    """
    # Auto-generate user_id and a signed session token for the upstream cookie.
    user_id = str(uuid.uuid4())
    session_token = jwt.encode({'user_id': user_id}, app.config['SECRET_KEY'], algorithm='HS256')
    # --- Build the headers for this specific request ---
    request_headers = BASE_GPT_OSS_HEADERS.copy()
    request_headers['cookie'] = f"user_id={user_id}; session={session_token}"
    try:
        openai_request_data = request.json
        # Scan messages newest-first for the last 'user' entry; earlier
        # conversation history is intentionally dropped.
        user_prompt = next((m['content'] for m in reversed(openai_request_data.get("messages", [])) if m.get('role') == 'user'), None)
        if not user_prompt: return jsonify({"error": "未找到用户消息。"}), 400
        stream_requested = openai_request_data.get("stream", False)
    except Exception as e:
        # Malformed body / non-JSON payload -> 400 rather than a 500.
        return jsonify({"error": f"请求格式无效: {e}"}), 400
    # NOTE(review): reasoning events are requested only for streaming
    # responses — presumably the upstream honours this header; confirm.
    request_headers['x-show-reasoning'] = 'true' if stream_requested else 'false'
    gpt_oss_payload = {
        "op": "threads.create",
        "params": {"input": {"text": user_prompt, "content": [{"type": "input_text", "text": user_prompt}]}}
    }
    def _internal_proxy_stream():
        # Generator: yields each parsed JSON object from the upstream SSE
        # stream (lines of the form "data: {...}"); undecodable JSON is skipped.
        try:
            with requests.post(
                GPT_OSS_API_URL, headers=request_headers,
                json=gpt_oss_payload, stream=True, timeout=120
            ) as response:
                response.raise_for_status()
                for line in response.iter_lines():
                    if line and line.decode('utf-8').startswith('data: '):
                        try:
                            yield json.loads(line.decode('utf-8')[6:])
                        except json.JSONDecodeError: continue
        except requests.exceptions.RequestException as e:
            # Re-raised as IOError so both consumers below catch one type.
            raise IOError(f"与后端服务通信失败: {e}")
    # Streaming and non-streaming handling logic kept as-is.
    if stream_requested:
        def stream_formatter():
            # Buffer chain-of-thought entries and emit them once, as a single
            # fenced markdown block, just before the first answer delta.
            thinking_buffer = []
            thinking_block_sent = False
            try:
                for gpt_oss_data in _internal_proxy_stream():
                    event_type = gpt_oss_data.get('type')
                    update_type = gpt_oss_data.get('update', {}).get('type')
                    if event_type == 'thread.item_updated' and update_type == 'cot.entry_added':
                        thinking_buffer.append(f"- {gpt_oss_data['update']['entry']['content']}")
                        continue
                    if event_type == 'thread.item_updated' and update_type == 'assistant_message.content_part.text_delta':
                        if not thinking_block_sent and thinking_buffer:
                            all_thoughts = "\n".join(thinking_buffer)
                            formatted_block = f"```markdown\n[思考过程]\n{all_thoughts}\n```\n\n"
                            yield f"data: {json.dumps(create_openai_chunk(formatted_block))}\n\n"
                            thinking_block_sent = True
                        yield f"data: {json.dumps(create_openai_chunk(gpt_oss_data['update'].get('delta', '')))}\n\n"
                yield "data: [DONE]\n\n"
            except IOError as e:
                # NOTE(review): on upstream failure the error is emitted as a
                # data event and no [DONE] sentinel follows — clients relying
                # on [DONE] may hang; confirm intended.
                yield f"data: {json.dumps({'error': str(e)})}\n\n"
        return Response(stream_formatter(), mimetype='text/event-stream')
    else:
        try:
            full_response_content = ""
            # Concatenate every text delta into one complete answer string.
            for gpt_oss_data in _internal_proxy_stream():
                if gpt_oss_data.get('type') == 'thread.item_updated' and gpt_oss_data.get('update', {}).get('type') == 'assistant_message.content_part.text_delta':
                    full_response_content += gpt_oss_data['update'].get('delta', '')
            final_response = {
                "id": f"chatcmpl-{str(uuid.uuid4())}", "object": "chat.completion", "created": int(time.time()),
                "model": "gpt-oss-120b",
                "choices": [{"index": 0, "message": {"role": "assistant", "content": full_response_content.strip()}, "finish_reason": "stop"}],
                # Token usage is not reported by the upstream; left empty.
                "usage": {}
            }
            return jsonify(final_response)
        except IOError as e:
            return jsonify({"error": str(e)}), 500
# 5. Application entry point.
if __name__ == '__main__':
    # Fix: removed a stray trailing '|' artifact that made this line a
    # syntax error. Bind to all interfaces on port 7860 (the conventional
    # Hugging Face Spaces port).
    app.run(host='0.0.0.0', port=7860)