# oapi / app.py
# sehsapneb's picture
# Update app.py
# f08a674 verified
import requests
import json
import uuid
import time
from flask import Flask, request, jsonify, Response
# 1. Initialize the Flask application
app = Flask(__name__)
# 2. Fixed configuration for the gpt-oss.com API (hard-coded Cookie removed;
#    the cookie is rebuilt per-request from the caller's API key further down)
GPT_OSS_API_URL = "https://api.gpt-oss.com/chatkit"
# Browser-like base headers attached to every upstream request so that
# gpt-oss.com accepts the call; 'x-selected-model' pins the backend model.
BASE_GPT_OSS_HEADERS = {
    'authority': 'api.gpt-oss.com',
    'accept': 'text/event-stream',
    'content-type': 'application/json',
    'origin': 'https://gpt-oss.com',
    'referer': 'https://gpt-oss.com/',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'x-selected-model': 'gpt-oss-120b',
}
# 3. Root route: serves usage instructions for the proxy.
@app.route('/', methods=['GET'])
def root():
    """Health/usage endpoint describing how to authenticate against this proxy."""
    usage_info = {
        "message": "欢迎使用 GPT-OSS to OpenAI 格式代理API",
        "status": "ok",
        "authentication_method": "使用动态API Key作为Session",
        "api_key_format": "在'Authorization: Bearer'后填入 '你的user_id|你的session令牌'",
    }
    return jsonify(usage_info)
# Helper: wrap one text delta in the OpenAI chat.completion.chunk envelope.
def create_openai_chunk(content, model="gpt-oss-120b"):
    """Build a single OpenAI-compatible streaming chunk carrying `content`.

    A fresh id and timestamp are generated on every call; the chunk holds one
    choice whose delta contains the given text and whose finish_reason is None.
    """
    delta_choice = {
        "index": 0,
        "delta": {"content": content},
        "finish_reason": None,
    }
    return {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [delta_choice],
    }
# 4. Core API endpoint (with the dynamic authentication logic)
@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions_proxy():
    """Proxy an OpenAI-style chat completion request to gpt-oss.com.

    Authentication: the Bearer token must have the form
    '<user_id>|<session_token>'; it is converted into the upstream Cookie
    header. Supports both SSE streaming and non-streaming responses.

    Returns a Flask Response (SSE stream) or a JSON body; 400/401/500 on
    malformed requests, bad credentials, or upstream failure.
    """
    # --- Parse the API key, which doubles as the upstream session ---
    auth_header = request.headers.get('Authorization')
    if not auth_header or not auth_header.startswith('Bearer '):
        return jsonify({"error": "缺少或格式错误的Authorization Header。请提供 'Bearer 你的组合密钥'。"}), 401
    # Fix: slice off the 'Bearer ' prefix instead of split('Bearer ')[1],
    # which silently truncated any key containing the substring 'Bearer '.
    combined_key = auth_header[len('Bearer '):]
    if '|' not in combined_key:
        return jsonify({"error": "API Key格式错误。正确格式为 '你的user_id|你的session令牌'。"}), 401
    # The '|' membership check above guarantees this split yields two parts,
    # so the former try/except ValueError was unreachable and is removed.
    user_id, session_token = combined_key.split('|', 1)
    # --- Build per-request headers: base headers + caller's session cookie ---
    request_headers = BASE_GPT_OSS_HEADERS.copy()
    request_headers['cookie'] = f"user_id={user_id}; session={session_token}"
    try:
        openai_request_data = request.json
        # Use the most recent 'user' message as the prompt.
        user_prompt = next((m['content'] for m in reversed(openai_request_data.get("messages", [])) if m.get('role') == 'user'), None)
        if not user_prompt:
            return jsonify({"error": "未找到用户消息。"}), 400
        stream_requested = openai_request_data.get("stream", False)
    except Exception as e:
        return jsonify({"error": f"请求格式无效: {e}"}), 400
    # Request chain-of-thought events from the backend only when streaming.
    request_headers['x-show-reasoning'] = 'true' if stream_requested else 'false'
    gpt_oss_payload = {
        "op": "threads.create",
        "params": {"input": {"text": user_prompt, "content": [{"type": "input_text", "text": user_prompt}]}}
    }

    def _internal_proxy_stream():
        # Yield parsed JSON payloads from the upstream SSE stream, skipping
        # malformed 'data:' lines; raises IOError on any transport failure.
        try:
            with requests.post(
                GPT_OSS_API_URL, headers=request_headers,
                json=gpt_oss_payload, stream=True, timeout=120
            ) as response:
                response.raise_for_status()
                for line in response.iter_lines():
                    if line and line.decode('utf-8').startswith('data: '):
                        try:
                            yield json.loads(line.decode('utf-8')[6:])
                        except json.JSONDecodeError:
                            continue
        except requests.exceptions.RequestException as e:
            raise IOError(f"与后端服务通信失败: {e}")

    if stream_requested:
        def stream_formatter():
            # Buffer chain-of-thought entries and emit them once, as a single
            # markdown block, just before the first answer token.
            thinking_buffer = []
            thinking_block_sent = False
            try:
                for gpt_oss_data in _internal_proxy_stream():
                    event_type = gpt_oss_data.get('type')
                    update_type = gpt_oss_data.get('update', {}).get('type')
                    if event_type == 'thread.item_updated' and update_type == 'cot.entry_added':
                        thinking_buffer.append(f"- {gpt_oss_data['update']['entry']['content']}")
                        continue
                    if event_type == 'thread.item_updated' and update_type == 'assistant_message.content_part.text_delta':
                        if not thinking_block_sent and thinking_buffer:
                            all_thoughts = "\n".join(thinking_buffer)
                            formatted_block = f"```markdown\n[思考过程]\n{all_thoughts}\n```\n\n"
                            yield f"data: {json.dumps(create_openai_chunk(formatted_block))}\n\n"
                            thinking_block_sent = True
                        yield f"data: {json.dumps(create_openai_chunk(gpt_oss_data['update'].get('delta', '')))}\n\n"
                # Fix: emit a terminal chunk with finish_reason='stop' —
                # OpenAI-compatible clients rely on it to detect normal
                # completion of the stream, before the [DONE] sentinel.
                final_chunk = create_openai_chunk("")
                final_chunk["choices"][0]["delta"] = {}
                final_chunk["choices"][0]["finish_reason"] = "stop"
                yield f"data: {json.dumps(final_chunk)}\n\n"
                yield "data: [DONE]\n\n"
            except IOError as e:
                yield f"data: {json.dumps({'error': str(e)})}\n\n"
        return Response(stream_formatter(), mimetype='text/event-stream')
    else:
        try:
            # Non-streaming: drain the upstream stream and concatenate all
            # answer deltas into one OpenAI chat.completion response.
            full_response_content = ""
            for gpt_oss_data in _internal_proxy_stream():
                if gpt_oss_data.get('type') == 'thread.item_updated' and gpt_oss_data.get('update', {}).get('type') == 'assistant_message.content_part.text_delta':
                    full_response_content += gpt_oss_data['update'].get('delta', '')
            final_response = {
                "id": f"chatcmpl-{uuid.uuid4()}", "object": "chat.completion", "created": int(time.time()),
                "model": "gpt-oss-120b",
                "choices": [{"index": 0, "message": {"role": "assistant", "content": full_response_content.strip()}, "finish_reason": "stop"}],
                # NOTE(review): upstream exposes no token counts, so usage is
                # left empty rather than fabricated.
                "usage": {}
            }
            return jsonify(final_response)
        except IOError as e:
            return jsonify({"error": str(e)}), 500
# 5. Application entry point
if __name__ == '__main__':
    # Bind on all interfaces; port 7860 — presumably the hosting platform's
    # expected port (TODO confirm deployment target).
    app.run(host='0.0.0.0', port=7860)