# Source: driver-union-ds / app.py (author: kevin1207)
# Commit fb0e3c4 (verified): Add Dockerfile and configure app port to 8080
from flask import Flask, request, Response, jsonify, stream_with_context
import requests
import json
import uuid
import time
import logging
import random
from functools import wraps
# Flask application object; the routes defined in this file register on it.
app = Flask(__name__)

# Configure the log format: "<time> - <logger name> - <level> - <message>".
handler = logging.StreamHandler()
log_formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(log_formatter)
app.logger.addHandler(handler)

# Records below INFO (e.g. the debug traces in this file) are dropped.
app.logger.setLevel(logging.INFO)
def validate_messages(f):
    """Decorator that validates the ``messages`` field of the JSON request body.

    The wrapped view is only called when the body is a JSON object and
    ``messages`` (if present) is a list of objects that each carry a ``role``
    key and a string ``content``. Any other shape is answered with a
    ``400`` JSON error instead of raising inside the view.

    Args:
        f: The Flask view function to protect.

    Returns:
        The wrapping view function, with ``f``'s metadata preserved.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # silent=True yields None for a missing/unparseable body, so malformed
        # requests get a 400 response here rather than an exception.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        messages = payload.get('messages', [])
        if not isinstance(messages, list):
            return jsonify({"error": "Invalid message format"}), 400

        for msg in messages:
            # The isinstance check matters: `'role' in msg` on a string would
            # be a substring test and on a number would raise TypeError.
            if not isinstance(msg, dict) or 'role' not in msg or 'content' not in msg:
                return jsonify({"error": "Invalid message format"}), 400
            if not isinstance(msg['content'], str):
                return jsonify({"error": "Content must be string"}), 400
        return f(*args, **kwargs)
    return decorated_function
def convert_history(messages):
    """Convert OpenAI-format messages into WoCloud history entries.

    Every ``user`` message that is immediately followed by an ``assistant``
    message becomes one history entry; all other messages are ignored.

    Args:
        messages: List of message dicts with ``role`` and ``content`` keys.

    Returns:
        A list of WoCloud history dicts, in conversation order. Malformed
        messages are skipped with a warning instead of raising.
    """
    history = []
    # Walk adjacent (message, next message) pairs.
    for current, following in zip(messages, messages[1:]):
        try:
            if current['role'] == 'user' and following['role'] == 'assistant':
                query = current['content'].strip()
                response = following['content'].strip()
                # Use history records that follow the WoCloud format.
                history.append({
                    "query": query,
                    "rewriteQuery": query,
                    "uploadFileUrl": "",
                    "response": response,
                    "reasoningContent": "",
                    "state": "finish",
                    # Random float rendered as a string; presumably an
                    # entry identifier for WoCloud -- TODO confirm.
                    "key": str(random.random())
                })
        except (KeyError, TypeError, AttributeError) as e:
            # Missing keys, non-dict messages or non-string content.
            app.logger.warning(f"Error processing message: {str(e)}")
            continue
    app.logger.debug(f"Converted history: {json.dumps(history, ensure_ascii=False)}")
    return history
def handle_wo_error(response):
    """Normalize a WoCloud error response into a ``{"code", "message"}`` dict.

    Two body formats are understood, selected by the ``Content-Type`` header:
    ``text/event-stream`` (the first ``data:`` line is parsed as JSON) and
    anything else (the whole body is parsed as JSON).

    Args:
        response: A response object exposing ``headers``, ``text`` and
            ``json()`` (e.g. ``requests.Response``).

    Returns:
        A dict with ``code`` and ``message`` keys. This function never raises
        and never returns ``None``; unparseable bodies are reported with the
        code ``"PARSE_ERROR"``.
    """
    try:
        if response.headers.get('Content-Type', '').startswith('text/event-stream'):
            error_line = next(
                (line for line in response.text.split('\n') if line.startswith('data:')),
                None
            )
            if error_line is None:
                # No data frame at all: still hand the caller a usable dict.
                return {
                    "code": "PARSE_ERROR",
                    "message": f"No data line in error response: {response.text[:100]}"
                }
            try:
                error_data = json.loads(error_line[5:].strip())
            except ValueError:
                return {
                    "code": "PARSE_ERROR",
                    "message": f"Invalid JSON in error response: {error_line[5:100]}"
                }
            return {
                "code": error_data.get('code'),
                "message": error_data.get('message', 'Unknown error')
            }

        try:
            error_data = response.json()
        except ValueError:
            # ValueError covers json.JSONDecodeError as well as the decode
            # errors of alternative JSON backends.
            return {
                "code": "PARSE_ERROR",
                "message": f"Invalid JSON in error response: {response.text[:100]}"
            }
        return {
            "code": error_data.get('code'),
            # Some error bodies carry the text under "response" instead.
            "message": error_data.get('message', error_data.get('response', 'Unknown error'))
        }
    except Exception as e:
        # Last-resort guard (e.g. the JSON payload is not an object).
        return {
            "code": "PARSE_ERROR",
            "message": f"Failed to parse error response: {str(e)}"
        }
# WoCloud assistant endpoint that every chat request is forwarded to.
_WO_QUERY_URL = 'https://panservice.mail.wo.cn/wohome/ai/assistant/query'
# Model name reported back to OpenAI-style clients.
_MODEL_NAME = 'DeepSeek-R1'
# Markers wrapped around the reasoning text inside the message content.
_THINK_OPEN = '<think>\n'
_THINK_CLOSE = '\n</think>\n\n'


def _sse_event(payload):
    """Serialize ``payload`` as one Server-Sent Events ``data:`` frame."""
    return f"data: {json.dumps(payload)}\n\n"


def _completion_chunk(response_id, created_time, delta, finish_reason=None):
    """Build one OpenAI-style ``chat.completion.chunk`` payload."""
    return {
        'id': response_id,
        'object': 'chat.completion.chunk',
        'created': created_time,
        'model': _MODEL_NAME,
        'choices': [{
            'index': 0,
            'delta': delta,
            'finish_reason': finish_reason
        }]
    }


def _wo_error_payload(data):
    """Return an error dict if ``data`` has a non-zero ``code``, else ``None``."""
    code = data.get('code')
    if code and code != 0 and code != '0':
        return {'code': code, 'message': data.get('message', 'Unknown error')}
    return None


def _with_thinking(reasoning, answer):
    """Prefix ``answer`` with the ``<think>`` block when reasoning exists."""
    if reasoning:
        return f"{_THINK_OPEN}{reasoning}{_THINK_CLOSE}{answer}"
    return answer


def _stream_wo_reply(wo_headers, wo_data):
    """Yield OpenAI-style SSE frames translated from the WoCloud stream."""
    response_id = f"chatcmpl-{uuid.uuid4()}"
    created_time = int(time.time())
    # Explicit state flags: the role delta must go out exactly once, and the
    # <think> block must be opened and closed exactly once.
    role_sent = False
    thinking_open = False

    def chunk(delta, finish_reason=None):
        return _sse_event(
            _completion_chunk(response_id, created_time, delta, finish_reason)
        )

    try:
        # The context manager releases the upstream connection even when the
        # loop exits early or the client disconnects mid-stream.
        with requests.post(
            _WO_QUERY_URL,
            headers=wo_headers,
            json=wo_data,
            stream=True,
            timeout=30
        ) as resp:
            app.logger.debug(f"WoCloud stream response status: {resp.status_code}")
            if resp.status_code != 200:
                error_info = handle_wo_error(resp)
                app.logger.error(f"WoCloud API error: {error_info}")
                yield _sse_event({'error': error_info})
                return

            for line in resp.iter_lines():
                if not line:
                    continue
                try:
                    line_str = line.decode('utf-8')
                    app.logger.debug(f"Received line: {line_str}")
                    if not line_str.startswith('data:'):
                        continue
                    data = json.loads(line_str[5:])
                except ValueError as e:
                    # Covers both UTF-8 and JSON decoding failures.
                    app.logger.error(f"Stream parsing error: {str(e)}")
                    continue
                if not isinstance(data, dict):
                    app.logger.error(f"Stream parsing error: unexpected payload {data!r}")
                    continue

                # Check for an error code.
                error_info = _wo_error_payload(data)
                if error_info:
                    app.logger.error(f"WoCloud stream error: {data}")
                    yield _sse_event({'error': error_info})
                    return

                # Reasoning ("thinking") text and answer text of this frame.
                content = data.get('response', '')
                reasoning = data.get('reasoningContent', '')

                # Emit the role once, on the first frame.
                if not role_sent:
                    role_sent = True
                    yield chunk({'role': 'assistant'})

                # Reasoning text.
                if reasoning:
                    if not thinking_open:  # reasoning starts
                        thinking_open = True
                        yield chunk({'content': _THINK_OPEN})
                    yield chunk({'content': reasoning})

                # Answer text.
                if content:
                    if thinking_open:  # close the reasoning block first
                        thinking_open = False
                        yield chunk({'content': _THINK_CLOSE})
                    yield chunk({'content': content})

                if data.get('finish') == 1:
                    break
    except requests.exceptions.RequestException as e:
        app.logger.error(f"Request failed: {str(e)}")
        yield _sse_event({'error': {'code': 'CONNECTION_ERROR', 'message': str(e)}})

    # Never leave a dangling <think> block if the stream ended mid-reasoning.
    if thinking_open:
        yield chunk({'content': _THINK_CLOSE})
    # End marker.
    yield chunk({}, finish_reason='stop')
    yield "data: [DONE]\n\n"


def _parse_wo_body(resp):
    """Extract ``(reasoning, answer, error)`` from a completed WoCloud response.

    ``error`` is ``None`` on success, otherwise a ``{"code", "message"}``
    dict and the two text values are empty.
    """
    # Parse according to the Content-Type of the response.
    if resp.headers.get('Content-Type', '').startswith('application/json'):
        try:
            body = resp.json()
        except ValueError:
            body = None
        if not isinstance(body, dict):
            app.logger.error(f"Invalid JSON response: {resp.text[:200]}")
            return '', '', {
                "code": "PARSE_ERROR",
                "message": "Failed to parse JSON response"
            }
        # Check for an error code.
        error_info = _wo_error_payload(body)
        if error_info:
            app.logger.error(f"WoCloud API error: {body}")
            return '', '', error_info
        # `or ''` guards against explicit JSON nulls.
        return body.get('reasoningContent') or '', body.get('response') or '', None

    # Streamed (SSE) or any other format: concatenate all data frames.
    try:
        reasoning_parts = []
        answer_parts = []
        for line in resp.text.split('\n'):
            if not line.startswith('data:'):
                continue
            try:
                data = json.loads(line[5:])
            except ValueError:
                app.logger.warning(f"Invalid JSON in stream line: {line[:100]}")
                continue
            if not isinstance(data, dict):
                app.logger.warning(f"Invalid JSON in stream line: {line[:100]}")
                continue
            error_info = _wo_error_payload(data)
            if error_info:
                app.logger.error(f"WoCloud API error in stream: {data}")
                return '', '', error_info
            answer_parts.append(data.get('response') or '')
            reasoning_parts.append(data.get('reasoningContent') or '')
        return ''.join(reasoning_parts), ''.join(answer_parts), None
    except Exception as e:
        app.logger.error(f"Error parsing stream response: {str(e)}")
        return '', '', {
            "code": "PARSE_ERROR",
            "message": f"Failed to parse stream response: {str(e)}"
        }


@app.route('/v1/chat/completions', methods=['POST'])
@validate_messages
def chat_completions():
    """OpenAI-compatible chat completion endpoint backed by WoCloud.

    The bearer token from the ``Authorization`` header is forwarded as the
    WoCloud access token. The last non-empty ``user`` message is sent as the
    prompt and the preceding messages as history. With ``"stream": true`` the
    reply is relayed as ``text/event-stream`` chunks; otherwise a single
    ``chat.completion`` JSON object is returned.

    Returns:
        A Flask response: 401 for a malformed ``Authorization`` header, 400
        for an unusable request body, 502 for upstream failures, 500 for
        unexpected errors, otherwise the completion.
    """
    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return jsonify({"error": "Invalid Authorization header format"}), 401
    access_token = auth_header[7:]

    openai_request = request.get_json(silent=True)
    if not isinstance(openai_request, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400
    stream_mode = openai_request.get('stream', False)

    try:
        messages = openai_request.get('messages')
        if not isinstance(messages, list):
            # Missing or malformed list: handled by the 400 just below.
            messages = []

        # Extract the last user message.
        user_message = next(
            (msg['content'] for msg in reversed(messages)
             if isinstance(msg, dict)
             and msg.get('role') == 'user' and msg.get('content')),
            None
        )
        if not user_message:
            return jsonify({"error": "No valid user message found"}), 400

        # Build the WoCloud request.
        wo_headers = {
            'content-type': 'application/json',
            'origin': 'https://panservice.mail.wo.cn',
            'referer': 'https://panservice.mail.wo.cn/h5/wocloud_ai/?modelType=1',
            'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1',
            'x-yp-access-token': access_token,
            'x-yp-client-id': '1001000035',
            'accept': 'text/event-stream' if stream_mode else 'application/json'
        }
        # Convert the history; the final message is excluded because it is
        # sent separately as "input".
        history = convert_history(messages[:-1])
        wo_data = {
            "modelId": 1,
            "input": user_message,
            "history": history
        }
        app.logger.debug(f"Forwarding request to WoCloud: {json.dumps(wo_data, ensure_ascii=False)}")

        if stream_mode:
            return Response(
                stream_with_context(_stream_wo_reply(wo_headers, wo_data)),
                content_type='text/event-stream'
            )

        try:
            resp = requests.post(
                _WO_QUERY_URL,
                headers=wo_headers,
                json=wo_data,
                timeout=30
            )
        except requests.exceptions.RequestException as e:
            app.logger.error(f"Request failed: {str(e)}")
            return jsonify({
                "error": {
                    "code": "NETWORK_ERROR",
                    "message": str(e)
                }
            }), 502

        app.logger.debug(f"WoCloud response status: {resp.status_code}")
        app.logger.debug(f"WoCloud response headers: {resp.headers}")
        # Any status other than 200 is reported as an upstream error.
        if resp.status_code != 200:
            error_info = handle_wo_error(resp)
            app.logger.error(f"WoCloud API error: {error_info}")
            return jsonify({"error": error_info}), 502

        reasoning, answer, error_info = _parse_wo_body(resp)
        if error_info:
            return jsonify({"error": error_info}), 502

        # Combine the reasoning text and the answer text.
        final_content = _with_thinking(reasoning, answer)
        if not final_content:
            app.logger.warning("Empty content in response")
            final_content = "抱歉,没有收到有效的回复。"

        return jsonify({
            "id": f"chatcmpl-{uuid.uuid4()}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": _MODEL_NAME,
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": final_content
                },
                "finish_reason": "stop"
            }],
            # Character counts, not real token counts.
            "usage": {
                "prompt_tokens": len(user_message),
                "completion_tokens": len(final_content),
                "total_tokens": len(user_message) + len(final_content)
            }
        })
    except Exception as e:
        app.logger.exception("Unexpected error")
        return jsonify({
            "error": {
                "code": "INTERNAL_ERROR",
                "message": f"Internal server error: {str(e)}"
            }
        }), 500
@app.route('/v1/models', methods=['GET'])
def list_models():
    """Return the OpenAI-style model list, containing the single DeepSeek-R1 entry."""
    # "created" is stamped with the current time on every call.
    model_entry = {
        "id": "DeepSeek-R1",
        "object": "model",
        "created": int(time.time()),
        "owned_by": "ChinaUnicom",
        "capabilities": ["chat", "completions"]
    }
    listing = {"object": "list", "data": [model_entry]}
    return jsonify(listing)
if __name__ == '__main__':
    # Script entry point: listen on all interfaces, port 8080, debug mode off.
    app.run(debug=False, port=8080, host='0.0.0.0')