# app.py — OpenAI-compatible proxy in front of app.unlimitedai.chat
# (header reconstructed from Hugging Face page-scrape residue:
#  repo "test", file app.py, commit f9a1ce9)
import json
import os
import time
import uuid
from datetime import datetime, timezone

import requests
from flask import Flask, Response, jsonify, request, stream_with_context
# Upstream chat endpoint that every proxied request is forwarded to.
ORIGINAL_API_URL = "https://app.unlimitedai.chat/api/chat"

app = Flask(__name__)
@app.route('/v1/models', methods=['GET'])
def list_models():
    """Return the advertised model list in OpenAI's /v1/models format.

    The list is hard-coded: the upstream service exposes a single
    reasoning model, so only that entry is reported.
    """
    reasoning_model = {
        "id": "chat-model-reasoning",
        "object": "model",
        "created": 1713235200,
        "owned_by": "organization-owner",
        "permission": [],
        "root": "chat-model-reasoning",
        "parent": None,
    }
    return jsonify({"object": "list", "data": [reasoning_model]})
@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
    """Translate an OpenAI-style chat completion request for the upstream API.

    Reads the standard OpenAI payload, converts each message into the
    upstream shape (id + timestamp + "parts" list), then dispatches to the
    streaming or non-streaming handler depending on the ``stream`` flag.
    """
    # get_json(silent=True) tolerates a missing/wrong Content-Type instead
    # of raising; fall back to an empty payload.
    data = request.get_json(silent=True) or {}
    is_stream = data.get('stream', False)
    messages = data.get('messages', [])

    original_messages = [
        {
            "id": str(uuid.uuid4()),
            # Timezone-aware UTC (utcnow() is deprecated); keep the same
            # "...Z"-suffixed wire format the upstream already accepted.
            "createdAt": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            # .get with defaults: a malformed message no longer raises KeyError.
            "role": msg.get("role", "user"),
            "content": msg.get("content", ""),
            "parts": [
                {
                    "type": "text",
                    "text": msg.get("content", ""),
                }
            ],
        }
        for msg in messages
    ]

    original_request = {
        "id": str(uuid.uuid4()),
        "messages": original_messages,
        "selectedChatModel": "chat-model-reasoning",
    }
    headers = {'Content-Type': 'application/json'}

    if is_stream:
        return stream_response(original_request, headers, data)
    return non_stream_response(original_request, headers, data)
def stream_response(original_request, headers, openai_request):
    """Proxy the upstream streaming reply as OpenAI-style SSE chunks.

    Upstream protocol (one tagged line per event, observed in SOURCE):
      ``f:`` message metadata, ``g:`` reasoning delta, ``0:`` content delta,
      ``e:``/``d:`` finish marker carrying ``finishReason``.

    Returns a Flask ``Response`` emitting ``chat.completion.chunk`` events
    terminated by ``data: [DONE]``.
    """
    def generate():
        # One completion id shared by every chunk, as the OpenAI streaming
        # format requires (the old code minted a new id per chunk).
        completion_id = f"chatcmpl-{uuid.uuid4()}"
        model_name = openai_request.get("model", "gpt-3.5-turbo")

        def make_chunk(delta, finish_reason=None):
            # Build one OpenAI "chat.completion.chunk" envelope.
            return {
                "id": completion_id,
                "object": "chat.completion.chunk",
                "created": int(time.time()),
                "model": model_name,
                "choices": [
                    {"index": 0, "delta": delta, "finish_reason": finish_reason}
                ],
            }

        def decode_text(payload):
            # Text payloads are JSON-encoded strings; json.loads unescapes
            # \" \\ \n etc. correctly, which strip('"') + replace() did not.
            # Fall back to the old heuristic for any non-JSON payload.
            try:
                return json.loads(payload)
            except ValueError:
                return payload.strip('"').replace("\\n", "\n")

        response = requests.post(
            ORIGINAL_API_URL,
            headers=headers,
            json=original_request,
            stream=True,
        )
        try:
            for raw_line in response.iter_lines():
                if not raw_line:
                    continue
                tag, _, payload = raw_line.decode('utf-8').partition(':')
                if tag == 'f':
                    # Metadata line: announce the assistant role first.
                    yield f"data: {json.dumps(make_chunk({'role': 'assistant'}))}\n\n"
                elif tag == 'g':
                    # Reasoning delta (non-standard field, mirrored as-is).
                    part = decode_text(payload)
                    yield f"data: {json.dumps(make_chunk({'reasoning_content': part}))}\n\n"
                elif tag == '0':
                    # Main content delta.
                    part = decode_text(payload)
                    yield f"data: {json.dumps(make_chunk({'content': part}))}\n\n"
                elif tag in ('e', 'd'):
                    finish_reason = json.loads(payload).get("finishReason", "stop")
                    yield f"data: {json.dumps(make_chunk({}, finish_reason))}\n\n"
                    break
        finally:
            # The streamed connection is not closed automatically once we
            # stop iterating early — release it explicitly.
            response.close()
        # Always terminate the SSE stream, even if upstream never sent e:/d:.
        yield "data: [DONE]\n\n"

    return Response(
        stream_with_context(generate()),
        content_type='text/event-stream',
    )
def non_stream_response(original_request, headers, openai_request):
    """Consume the whole upstream stream and return one OpenAI-style response.

    Aggregates ``0:`` content deltas (and ``g:`` reasoning deltas) into a
    single ``chat.completion`` JSON body.
    """
    def decode_text(payload):
        # Text payloads are JSON-encoded strings; json.loads unescapes
        # \" \\ \n etc. correctly. Fall back to the old heuristic for any
        # non-JSON payload.
        try:
            return json.loads(payload)
        except ValueError:
            return payload.strip('"').replace("\\n", "\n")

    reasoning_parts = []
    reply_parts = []
    finish_reason = "stop"

    response = requests.post(
        ORIGINAL_API_URL,
        headers=headers,
        json=original_request,
        stream=True,
    )
    try:
        for raw_line in response.iter_lines():
            if not raw_line:
                continue
            tag, _, payload = raw_line.decode('utf-8').partition(':')
            if tag == 'g':
                # BUG FIX: the old code skipped unescaping on this branch,
                # unlike the streaming path.
                reasoning_parts.append(decode_text(payload))
            elif tag == '0':
                reply_parts.append(decode_text(payload))
            elif tag in ('e', 'd'):
                finish_reason = json.loads(payload).get("finishReason", "stop")
    finally:
        response.close()

    message = {
        "role": "assistant",
        "content": "".join(reply_parts),
    }
    reasoning_content = "".join(reasoning_parts)
    if reasoning_content:
        # Previously collected and then discarded; expose it as an extra
        # (backward-compatible) field, matching the streaming handler.
        message["reasoning_content"] = reasoning_content

    openai_response = {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": openai_request.get("model", "gpt-3.5-turbo"),
        "choices": [
            {
                "index": 0,
                "message": message,
                "finish_reason": finish_reason,
            }
        ],
        "usage": {
            # Upstream reports no token counts; zeros keep the schema valid.
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "total_tokens": 0,
        },
    }
    return jsonify(openai_response)
# `import os` moved to the top-of-file import block where it belongs.
if __name__ == '__main__':
    # 7860 is the default port used by Hugging Face Spaces.
    port = int(os.environ.get("PORT", 7860))
    app.run(host='0.0.0.0', port=port)