aks2api / app.py
nanoppa's picture
Update app.py
07fa47a verified
raw
history blame
9.25 kB
from flask import Flask, request, jsonify, Response
import requests
import uuid
import json
import time
import os
import re
import logging
from itertools import cycle
# Configure logging output (raise the level to DEBUG for more detailed logs).
logging.basicConfig(level=logging.INFO)

# Raw, comma-separated cookie list read from the COOKIES environment variable.
_COOKIES = os.getenv("COOKIES", "")
# Bearer token clients must present; a built-in default is used when the
# API_KEY environment variable is not set.
API_KEY = os.getenv("API_KEY", "linux.do")

app = Flask(__name__)

# Trim whitespace around every entry and drop blank ones, so that a value such
# as "a, b," does not put an empty or space-padded cookie into the rotation.
# When nothing usable is configured, fall back to a single empty cookie so
# that next(iterator) always yields a value instead of raising StopIteration.
COOKIES = [entry.strip() for entry in _COOKIES.split(',') if entry.strip()] or [""]
# Endless round-robin iterator over the configured cookies.
iterator = cycle(COOKIES)
# Not read anywhere in the visible code; kept so existing references keep working.
cookie_index = 0
def get_cookie():
    """Return the next cookie from the module-level round-robin ``iterator``."""
    upcoming = next(iterator)
    return upcoming
@app.before_request
def check_api_key():
    """Reject any request whose ``Authorization`` header is not ``Bearer <API_KEY>``.

    Registered through ``app.before_request``, so it runs ahead of every view.

    Returns:
        ``None`` when the header matches, which lets the request proceed;
        otherwise a ``(json_body, 403)`` tuple describing the failure.
    """
    # Local import keeps this block self-contained; hmac is standard library.
    import hmac

    supplied = request.headers.get("Authorization") or ""
    expected = "Bearer " + API_KEY
    # compare_digest takes time independent of where the values differ, so the
    # comparison does not reveal how much of the key was guessed correctly.
    # Both sides are encoded because the str form only accepts ASCII.
    if hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        return None

    # Do not write the presented credential to the log: a mistyped but
    # nearly-correct key would otherwise end up in plain text there.
    logging.warning(
        "Unauthorized access attempt (Authorization header %s)",
        "present" if supplied else "missing",
    )
    return jsonify({"success": False, "message": "Unauthorized: Invalid API key"}), 403
@app.route('/v1/models', methods=['GET'])
def get_models():
    """List the upstream models in an OpenAI-style ``/v1/models`` shape.

    Fetches ``https://chat.akash.network/api/models`` with the next cookie from
    the rotation and maps every returned entry to a model object.

    Returns:
        A JSON response ``{"object": "list", "data": [...]}`` on success, or
        ``({"error": <message>}, 500)`` when the upstream call fails or its
        payload cannot be converted.
    """
    logging.info("Received /v1/models request")
    _cookie = get_cookie()
    # NOTE(review): this writes the first 50 characters of a session cookie to the log.
    logging.info(_cookie[:50])
    headers = {"Content-Type": "application/json", "Cookie": _cookie}
    try:
        # Without a timeout a stalled upstream would block this worker forever.
        response = requests.get(
            'https://chat.akash.network/api/models',
            headers=headers,
            timeout=30,
        )
        # Turn an upstream HTTP error into an exception instead of trying to
        # iterate over an error body as if it were the model list.
        response.raise_for_status()
        models_data = response.json()
        logging.debug("Upstream models payload: %s", models_data)

        current_timestamp = int(time.time())
        converted_data = {
            "object": "list",
            "data": [
                {
                    "id": model["id"],
                    "object": "model",
                    "created": current_timestamp,
                    # NOTE(review): ids containing "Meta" are labelled "openai";
                    # confirm this mapping is intentional.
                    "owned_by": "openai" if "Meta" in model["id"] else "third_party",
                    "permissions": [],
                    "root": model["id"],
                    "parent": None,
                    "capabilities": {
                        "temperature": model.get("temperature"),
                        "top_p": model.get("top_p"),
                    },
                    "name": model.get("name"),
                    "description": model.get("description"),
                    "available": model.get("available"),
                }
                for model in models_data
            ],
        }
    except Exception as e:
        # Same error shape as the chat completions endpoint.
        logging.exception("get_models error:")
        return jsonify({"error": str(e)}), 500

    logging.info("Response for /v1/models: %s", json.dumps(converted_data, ensure_ascii=False))
    return jsonify(converted_data)
def _decode_stream_token(raw):
    """Turn the payload of a ``0:`` stream line into plain text.

    The payload is decoded as a JSON string literal first, which handles every
    escape sequence (``\\\\``, ``\\t``, ``\\uXXXX``, ...). If it is not a valid
    JSON string, fall back to stripping the surrounding quotes and undoing
    only ``\\"`` and ``\\n``.
    """
    text = raw.strip()
    try:
        decoded = json.loads(text)
    except ValueError:
        decoded = None
    if isinstance(decoded, str):
        return decoded
    if text.startswith('"') and text.endswith('"'):
        text = text[1:-1].replace('\\"', '"')
    return text.replace("\\n", "\n")


def _format_stream_event(chat_id, model, delta, finish_reason):
    """Serialize one ``chat.completion.chunk`` as a server-sent-event frame."""
    chunk = {
        "id": f"chatcmpl-{chat_id}",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [{
            "delta": delta,
            "index": 0,
            "finish_reason": finish_reason
        }]
    }
    return json.dumps(chunk, ensure_ascii=False)


def generate_stream(akash_response, chat_id, model):
    """
    Parse the Akash streaming response and yield chunks in the OpenAI
    streaming format.

    Each upstream line is ``<type>:<payload>``. Type ``0`` carries a text
    token; types ``e`` and ``d`` end the stream. Other types, blank lines and
    lines that cannot be parsed are skipped (parse failures are logged).

    Args:
        akash_response: Upstream response object providing ``iter_lines()``
            and ``close()``.
        chat_id: Identifier embedded in every chunk as ``chatcmpl-<chat_id>``.
        model: Model name echoed in every chunk.

    Yields:
        ``data: <json>\\n\\n`` frames, always terminated by a chunk with
        ``finish_reason`` ``"stop"`` followed by ``data: [DONE]\\n\\n``.
    """
    finished = False
    try:
        for line in akash_response.iter_lines():
            if not line:
                continue
            try:
                line_str = line.decode('utf-8').strip()
                msg_type, msg_data = line_str.split(':', 1)
                if msg_type == '0':
                    payload = _format_stream_event(
                        chat_id, model, {"content": _decode_stream_token(msg_data)}, None
                    )
                    logging.debug("Streaming chunk: %s", payload)
                    yield f"data: {payload}\n\n"
                elif msg_type in ('e', 'd'):
                    finished = True
                    payload = _format_stream_event(chat_id, model, {}, "stop")
                    logging.debug("Streaming finish chunk: %s", payload)
                    yield f"data: {payload}\n\n"
                    yield "data: [DONE]\n\n"
                    break
            except Exception as ex:
                # Best effort: one bad line must not abort the whole stream.
                logging.error("Error processing stream line: %s", ex)
                continue

        if not finished:
            # Upstream ended without an explicit end marker; still close the
            # stream properly so clients do not wait for a terminator.
            payload = _format_stream_event(chat_id, model, {}, "stop")
            logging.debug("Streaming finish chunk: %s", payload)
            yield f"data: {payload}\n\n"
            yield "data: [DONE]\n\n"
    finally:
        # The loop can stop before the body is fully read (end marker or client
        # disconnect); closing releases the underlying connection.
        akash_response.close()
# Pause between two image-status polls and the total time budget for a job.
_IMAGE_POLL_INTERVAL_SECONDS = 5
_IMAGE_POLL_TIMEOUT_SECONDS = 300
# (connect, read) timeouts in seconds for the upstream chat request.
_CHAT_REQUEST_TIMEOUT = (10, 300)


def _extract_completion_text(body):
    """Concatenate the text carried by every ``0:`` line of a buffered reply.

    Each payload is decoded as a JSON string literal so that escaped quotes,
    newlines and unicode escapes come out as real characters. A quoted payload
    that is not valid JSON is used with its quotes stripped; anything else is
    ignored.
    """
    pieces = []
    # Split on "\n" only: str.splitlines() would also break on characters such
    # as U+2028 that may legitimately appear inside a token.
    for raw_line in body.split('\n'):
        prefix, separator, payload = raw_line.strip().partition(':')
        if not separator or prefix != '0':
            continue
        payload = payload.strip()
        try:
            piece = json.loads(payload)
        except ValueError:
            piece = None
        if isinstance(piece, str):
            pieces.append(piece)
        elif len(payload) >= 2 and payload.startswith('"') and payload.endswith('"'):
            pieces.append(payload[1:-1])
    return "".join(pieces)


def _completion_payload(chat_id, model, content):
    """Build a non-streaming ``chat.completion`` body with one assistant message."""
    return {
        "id": f"chatcmpl-{chat_id}",
        "object": "chat.completion",
        # Unix time in seconds, the same unit the streaming chunks use.
        "created": int(time.time()),
        "model": model,
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": content},
            "finish_reason": "stop"
        }]
    }


def _wait_for_image(job_id, headers):
    """Poll the image-status endpoint until the job reports ``completed``.

    Returns the ``result`` value of the finished job. Errors during a single
    poll are logged and retried. Raises ``TimeoutError`` once the overall time
    budget is used up, so a job that never finishes cannot block the worker
    indefinitely.
    """
    deadline = time.monotonic() + _IMAGE_POLL_TIMEOUT_SECONDS
    while True:
        try:
            img_response = requests.get(
                f'https://chat.akash.network/api/image-status?ids={job_id}',
                headers=headers,
                timeout=30,
            )
            img_data = img_response.json()
            if img_data[0]["status"] == "completed":
                return img_data[0]["result"]
            logging.info("图片生成中,jobId: %s", job_id)
        except Exception as e:
            logging.error("请求图片状态异常: %s", e)
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Image job {job_id} did not complete within "
                f"{_IMAGE_POLL_TIMEOUT_SECONDS} seconds"
            )
        time.sleep(_IMAGE_POLL_INTERVAL_SECONDS)


@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
    """Forward an OpenAI-style chat completion request to the Akash chat API.

    Reads ``model``, ``messages``, ``system_message``, ``temperature``,
    ``top_p`` and ``stream`` from the JSON body. Three outcomes are possible:

    * streaming (the default when ``stream`` is absent): a
      ``text/event-stream`` response produced by ``generate_stream``;
    * non-streaming text: one ``chat.completion`` JSON body, returned with the
      upstream status code;
    * model ``AkashGen``: never streamed; the image job id is taken from the
      upstream reply, polled until completion, and the result is returned as a
      markdown image inside a ``chat.completion`` body.

    Returns:
        A Flask response as described above; ``({"error": ...}, 400)`` when the
        body is not a JSON object; ``({"error": ...}, 500)`` on any other
        failure, including an image job that does not finish in time.
    """
    try:
        data = request.get_json()
        logging.info("Received /v1/chat/completions request: %s", json.dumps(data, ensure_ascii=False))
        if not isinstance(data, dict):
            # A body such as `null` or a JSON array has no fields to read.
            return jsonify({"error": "Request body must be a JSON object"}), 400

        chat_id = str(uuid.uuid4()).replace('-', '')[:16]
        model = data.get('model', "DeepSeek-R1")
        akash_data = {
            "id": chat_id,
            "messages": data.get('messages', []),
            "model": model,
            "system": data.get('system_message', "You are a helpful assistant."),
            "temperature": data.get('temperature', 0.6),
            "topP": data.get('top_p', 0.95)
        }
        _cookie = get_cookie()
        # NOTE(review): this writes the first 50 characters of a session cookie to the log.
        logging.info(_cookie[:50])
        headers = {"Content-Type": "application/json", "Cookie": _cookie}

        # Streaming is on by default, but is switched off for the AkashGen model.
        stream_flag = data.get('stream', True)
        if model == "AkashGen":
            stream_flag = False

        akash_response = requests.post(
            'https://chat.akash.network/api/chat',
            json=akash_data,
            headers=headers,
            stream=stream_flag,
            timeout=_CHAT_REQUEST_TIMEOUT,
        )
        logging.info("Akash API response status: %s", akash_response.status_code)

        if stream_flag:
            return Response(
                generate_stream(akash_response, chat_id, model),
                mimetype='text/event-stream',
                headers={
                    'Cache-Control': 'no-cache',
                    'Connection': 'keep-alive'
                }
            )

        if model != "AkashGen":
            parsed_text = _extract_completion_text(akash_response.text)
            serialized = json.dumps(
                _completion_payload(chat_id, model, parsed_text), ensure_ascii=False
            )
            logging.info("Non-stream response payload: %s", serialized)
            return Response(
                serialized,
                status=akash_response.status_code,
                mimetype='application/json'
            )

        match = re.search(r"jobId='([^']+)'", akash_response.text)
        if not match:
            logging.error("未能解析到 jobId")
            return jsonify({"error": "当前官方服务异常"}), 500

        job_id = match.group(1)
        logging.info("AkashGen jobId: %s", job_id)
        image_url = _wait_for_image(job_id, headers)
        serialized = json.dumps(
            _completion_payload(
                chat_id,
                model,
                f"根据您的描述,这里是一张生成的图片:\n\n![生成的图片]({image_url})",
            ),
            ensure_ascii=False,
        )
        logging.info("AkashGen completed response payload: %s", serialized)
        return Response(
            serialized,
            status=akash_response.status_code,
            mimetype='application/json'
        )
    except Exception as e:
        logging.exception("chat_completions error:")
        return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
    # Start Flask's built-in server when the file is executed directly,
    # bound to all interfaces on port 5200.
    bind_host, bind_port = "0.0.0.0", 5200
    app.run(host=bind_host, port=bind_port)