test2 / app.py
kavein44's picture
Update app.py
a250b99 verified
import requests
import json
import os
from flask import Flask, request, Response, stream_with_context
app = Flask(__name__)
# --- 配置 (环境变量) ---
CODESTORY_API_BASE_URL = os.environ.get("CODESTORY_API_BASE_URL", "https://codestory-provider-dot-anton-390822.ue.r.appspot.com")
DEFAULT_SYSTEM_MESSAGE = os.environ.get("DEFAULT_SYSTEM_MESSAGE", "You are a helpful assistant that follows all user instructions.")
MODEL_NAME = os.environ.get("MODEL_NAME", "anthropic/claude-3-opus:beta")
# Hugging Face Spaces 会自动设置 PORT 环境变量
PORT = int(os.environ.get("PORT", 7860)) # 默认端口通常是 7860
# --- 访问令牌 (从环境变量获取) ---
# 在 Hugging Face Spaces 中,你将在 Space 的 Secrets 中设置 CODESTORY_ACCESS_TOKEN。
access_token = os.environ.get("CODESTORY_ACCESS_TOKEN")
if not access_token:
raise ValueError("CODESTORY_ACCESS_TOKEN environment variable must be set in Hugging Face Space Secrets.")
# --- 请求函数 ---
def chat_request(messages, temp, system):
if not system:
system = [{"type": "text", "text": DEFAULT_SYSTEM_MESSAGE}]
payload = {
"model": MODEL_NAME,
"temperature": temp,
"stream": True,
"messages": [
{"role": "system", "content": system},
*messages
]
}
try:
resp = requests.post(f"{CODESTORY_API_BASE_URL}/openrouter-api",
headers={"authorization": f"Bearer {access_token}", "content-type": "application/json"},
json=payload,
stream=True,
timeout=60)
resp.raise_for_status()
return resp
except requests.exceptions.RequestException as e:
print(f"Error during API request: {e}") # 错误日志
return None
# --- 路由 ---
# 健康检查端点
@app.route('/healthz')
def health_check():
return {"status": "ok"}
@app.route("/messages", methods=["POST"])
def handle_chat():
print(f"Received request: {request.json}") # 记录收到的请求
data = request.json
streaming = data.get("stream", True)
result = chat_request(
messages=data.get("messages"),
temp=data.get("temperature", 0.7),
system=data.get("system")
)
if not result:
print("chat_request failed") # 错误日志
return {"error": "Request failed"}, 500
if streaming:
def generate():
try:
for l in result.iter_lines():
if not l:
continue
try:
d = json.loads(l.decode('utf-8').replace('data: ', ''))
if 'choices' in d and len(d['choices']) > 0:
chunk = d['choices'][0].get('delta', {}).get('content', '')
if chunk:
resp = {
'type': 'content_block_delta',
'delta': {'type': 'text_delta', 'text': chunk},
'index': 0
}
yield f"event: content_block_delta\ndata: {json.dumps(resp, ensure_ascii=False)}\n\n"
if d.get('choices', [{}])[0].get('finish_reason') is not None:
yield 'event: message_delta\ndata: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null}}\n\n'
yield 'event: message_stop\ndata: {"type":"message_stop"}\n\n'
break
except (json.JSONDecodeError, KeyError) as e:
print(f"Error processing streaming response: {e}") # 错误日志
continue
except Exception as e:
print(f"Error in stream: {e}") # 错误日志
yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
return Response(stream_with_context(generate()), content_type='text/event-stream', headers={'Cache-Control': 'no-cache', 'Connection': 'keep-alive'})
else:
try:
txt = ""
for l in result.iter_lines():
if not l: continue
try:
d = json.loads(l.decode('utf-8').replace('data: ', ''))
if 'choices' in d and len(d['choices']) > 0:
chunk = d['choices'][0].get('delta', {}).get('content', '')
if chunk: txt += chunk
if d.get('choices', [{}])[0].get('finish_reason') is not None: break
except (json.JSONDecodeError,KeyError) as e:
print(f"Error processing response: {e}") #错误日志
return {"error": "Error processing response"},500
return {"type": "message", "content": [{"type": "text", "text": txt}]}
except Exception as e:
print(f"Error in processing: {e}") #错误日志
return {"error": str(e)},500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=PORT, debug=False) #生产环境不要用debug模式