import requests import json import os from flask import Flask, request, Response, stream_with_context app = Flask(__name__) # --- 配置 (环境变量) --- CODESTORY_API_BASE_URL = os.environ.get("CODESTORY_API_BASE_URL", "https://codestory-provider-dot-anton-390822.ue.r.appspot.com") DEFAULT_SYSTEM_MESSAGE = os.environ.get("DEFAULT_SYSTEM_MESSAGE", "You are a helpful assistant that follows all user instructions.") MODEL_NAME = os.environ.get("MODEL_NAME", "anthropic/claude-3-opus:beta") # Hugging Face Spaces 会自动设置 PORT 环境变量 PORT = int(os.environ.get("PORT", 7860)) # 默认端口通常是 7860 # --- 访问令牌 (从环境变量获取) --- # 在 Hugging Face Spaces 中,你将在 Space 的 Secrets 中设置 CODESTORY_ACCESS_TOKEN。 access_token = os.environ.get("CODESTORY_ACCESS_TOKEN") if not access_token: raise ValueError("CODESTORY_ACCESS_TOKEN environment variable must be set in Hugging Face Space Secrets.") # --- 请求函数 --- def chat_request(messages, temp, system): if not system: system = [{"type": "text", "text": DEFAULT_SYSTEM_MESSAGE}] payload = { "model": MODEL_NAME, "temperature": temp, "stream": True, "messages": [ {"role": "system", "content": system}, *messages ] } try: resp = requests.post(f"{CODESTORY_API_BASE_URL}/openrouter-api", headers={"authorization": f"Bearer {access_token}", "content-type": "application/json"}, json=payload, stream=True, timeout=60) resp.raise_for_status() return resp except requests.exceptions.RequestException as e: print(f"Error during API request: {e}") # 错误日志 return None # --- 路由 --- # 健康检查端点 @app.route('/healthz') def health_check(): return {"status": "ok"} @app.route("/messages", methods=["POST"]) def handle_chat(): print(f"Received request: {request.json}") # 记录收到的请求 data = request.json streaming = data.get("stream", True) result = chat_request( messages=data.get("messages"), temp=data.get("temperature", 0.7), system=data.get("system") ) if not result: print("chat_request failed") # 错误日志 return {"error": "Request failed"}, 500 if streaming: def generate(): try: for l in result.iter_lines(): if not l: continue try: d = json.loads(l.decode('utf-8').replace('data: ', '')) if 'choices' in d and len(d['choices']) > 0: chunk = d['choices'][0].get('delta', {}).get('content', '') if chunk: resp = { 'type': 'content_block_delta', 'delta': {'type': 'text_delta', 'text': chunk}, 'index': 0 } yield f"event: content_block_delta\ndata: {json.dumps(resp, ensure_ascii=False)}\n\n" if d.get('choices', [{}])[0].get('finish_reason') is not None: yield 'event: message_delta\ndata: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null}}\n\n' yield 'event: message_stop\ndata: {"type":"message_stop"}\n\n' break except (json.JSONDecodeError, KeyError) as e: print(f"Error processing streaming response: {e}") # 错误日志 continue except Exception as e: print(f"Error in stream: {e}") # 错误日志 yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" return Response(stream_with_context(generate()), content_type='text/event-stream', headers={'Cache-Control': 'no-cache', 'Connection': 'keep-alive'}) else: try: txt = "" for l in result.iter_lines(): if not l: continue try: d = json.loads(l.decode('utf-8').replace('data: ', '')) if 'choices' in d and len(d['choices']) > 0: chunk = d['choices'][0].get('delta', {}).get('content', '') if chunk: txt += chunk if d.get('choices', [{}])[0].get('finish_reason') is not None: break except (json.JSONDecodeError,KeyError) as e: print(f"Error processing response: {e}") #错误日志 return {"error": "Error processing response"},500 return {"type": "message", "content": [{"type": "text", "text": txt}]} except Exception as e: print(f"Error in processing: {e}") #错误日志 return {"error": str(e)},500 if __name__ == '__main__': app.run(host='0.0.0.0', port=PORT, debug=False) #生产环境不要用debug模式