# u2a / app.py
# exable324's picture
# Update app.py
# 13fbcb2 verified
import hmac
import json
import logging
import os
import random
import time
import uuid

import gradio as gr
import requests
from fastapi import FastAPI, Request, HTTPException, Depends, status
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import StreamingResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
# --- 1. Configuration and cached state ---
# ASGI application; the API routes below are registered on it and the Gradio
# UI is mounted onto it at the end of the file.
app = FastAPI()
# Bearer-credential dependency consumed by verify_api().
security = HTTPBearer()
# Shared secret checked by verify_api() and by the Gradio login form.
# NOTE(review): the fallback is a hard-coded, guessable value — make sure
# ACCESS_TOKEN is set in the environment for any deployment that is reachable
# by others.
ACCESS_TOKEN = os.getenv("ACCESS_TOKEN", "sk-admin-123456")
# Key placed in the query string of the identitytoolkit signUp URL built in
# fetch_account().
API_KEY = "AIzaSyAZaD22Mzi9HkTcW3ErNxRA_sNEFolLBCA"
# Model ids returned by GET /v1/models. The chat route does not validate the
# requested model against this list; it forwards whatever the client sent.
SUPPORTED_MODELS = ["auto", "gpt-5-mini", "gemini-3-flash", "pro"]
class TokenCache:
    """In-memory holder for the current upstream token and its last known quota."""

    def __init__(self):
        # Both fields start empty and are filled in once an account is fetched.
        self.token, self.balance = "", 0


# Single module-wide instance shared by every request handler.
cache = TokenCache()
# --- 2. Core backend logic ---
def fetch_account():
    """Register a new upstream account and cache its token and quota.

    Posts to the identitytoolkit ``accounts:signUp`` endpoint, then asks the
    credits endpoint for the new token's ``premium_quota``. The module-level
    ``cache`` is only updated when both calls succeed.

    Returns:
        The new ID token, or ``None`` when sign-up yields no token or either
        request fails (``cache`` is left untouched in that case).
    """
    reg_url = f"https://identitytoolkit.googleapis.com/v1/accounts:signUp?key={API_KEY}"
    bal_url = "https://level2labs-prod--adm-agent-helper-user-get-credits.modal.run/"
    try:
        reg_resp = requests.post(
            reg_url,
            json={"returnSecureToken": True},
            headers={"origin": "https://www.aidocmaker.com"},
            timeout=10,
        ).json()
        token = reg_resp.get("idToken")
        if not token:
            return None
        # Fetch the starting balance for the fresh account.
        bal_resp = requests.post(
            bal_url,
            headers={"Authorization": f"Bearer {token}"},
            timeout=10,
        ).json()
        # ``or 0`` also maps an explicit null quota to 0 so later numeric
        # comparisons against the cached balance cannot fail.
        balance = bal_resp.get("premium_quota") or 0
    except (requests.RequestException, ValueError, AttributeError) as exc:
        # Network failure, non-JSON body, or JSON that is not an object.
        # Only the exception type is logged: the message may embed the
        # request URL, which carries the API key.
        logging.getLogger(__name__).warning(
            "Account registration failed: %s", type(exc).__name__
        )
        return None
    cache.token = token
    cache.balance = balance
    return token
def get_valid_token():
    """Return the cached token while it still has quota, else register a new account."""
    has_usable_token = bool(cache.token) and cache.balance > 0
    return cache.token if has_usable_token else fetch_account()
def sync_balance():
    """Refresh ``cache.balance`` from the upstream credits endpoint.

    Returns:
        The refreshed ``premium_quota`` value. Returns 0 without any request
        when no token is cached, and stores/returns 0 when the lookup fails,
        which makes ``get_valid_token()`` register a new account next time.
    """
    if not cache.token:
        return 0
    url = "https://level2labs-prod--adm-agent-helper-user-get-credits.modal.run/"
    try:
        data = requests.post(
            url,
            headers={"Authorization": f"Bearer {cache.token}"},
            timeout=10,
        ).json()
        # ``or 0`` also maps an explicit null quota to 0 so the cached value
        # is always comparable with ``> 0``.
        balance = data.get("premium_quota") or 0
    except (requests.RequestException, ValueError, AttributeError) as exc:
        # Network failure, non-JSON body, or JSON that is not an object:
        # treat the token as exhausted, as before, but leave a trace.
        logging.getLogger(__name__).warning(
            "Balance sync failed: %s", type(exc).__name__
        )
        balance = 0
    cache.balance = balance
    return balance
# --- 3. Robust SSE parser ---
def parse_upstream_line(line):
    """Extract the text payload from one line of the upstream response.

    Accepts both SSE-style lines (``data: {...}`` or ``data:{...}``) and bare
    JSON objects.

    Args:
        line: One raw line, normally ``bytes`` as produced by
            ``Response.iter_lines()``; ``str`` is accepted as well.

    Returns:
        The value of the JSON object's ``"data"`` field (``""`` when the field
        is absent), or ``None`` for empty lines, the ``[DONE]`` sentinel, and
        anything that is not a JSON object.
    """
    if not line:
        return None
    if isinstance(line, (bytes, bytearray)):
        # Replace undecodable bytes instead of raising, so one bad line
        # cannot abort the whole stream.
        text = bytes(line).decode("utf-8", errors="replace")
    else:
        text = str(line)
    text = text.strip()
    if not text:
        return None
    # Strip the SSE field name; the space after the colon is optional.
    if text.startswith("data:"):
        text = text[len("data:"):].strip()
    if text == "[DONE]":
        return None
    try:
        payload = json.loads(text)
    except ValueError:
        return None
    if not isinstance(payload, dict):
        return None
    # Per earlier logs, the text usually lives in the "data" field.
    return payload.get("data", "")
# --- 4. OpenAI-compatible API ---
async def verify_api(auth: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the bearer credential against ``ACCESS_TOKEN``.

    Args:
        auth: Credentials extracted by the ``security`` dependency.

    Returns:
        The presented credential string when it matches.

    Raises:
        HTTPException: 401 when the credential does not match.
    """
    # Compare as bytes in constant time: a plain ``!=`` can leak how many
    # leading characters matched, and compare_digest rejects non-ASCII str.
    supplied = auth.credentials.encode("utf-8")
    expected = ACCESS_TOKEN.encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid Access Token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return auth.credentials
@app.get("/v1/models")
async def list_models(t: str = Depends(verify_api)):
    """Return the advertised model ids as an OpenAI-style model list."""
    entries = []
    for model_id in SUPPORTED_MODELS:
        entries.append({"id": model_id, "object": "model"})
    return {"object": "list", "data": entries}
def _last_message_text(messages):
    """Return the text of the final chat message, or "" when there is none."""
    if not isinstance(messages, list) or not messages:
        return ""
    last = messages[-1]
    content = last.get("content") if isinstance(last, dict) else None
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        # OpenAI "content parts" form: keep only the text parts.
        return "".join(
            str(part.get("text", ""))
            for part in content
            if isinstance(part, dict) and part.get("type") == "text"
        )
    return "" if content is None else str(content)


@app.post("/v1/chat/completions")
async def chat_completions(request: Request, t: str = Depends(verify_api)):
    """Proxy an OpenAI-style chat completion to the upstream chat endpoint.

    Only the content of the last entry in ``messages`` is forwarded, under a
    fresh conversation id and message id. With ``"stream": true`` the reply
    is relayed as ``chat.completion.chunk`` SSE events; otherwise the chunks
    are concatenated into one ``chat.completion`` object.

    Raises:
        HTTPException: 400 for a body that is not a JSON object, 500 when no
            upstream token can be obtained, 502 when the upstream request
            fails or answers with an HTTP error status.
    """
    try:
        body = await request.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    model = body.get("model", "auto")
    stream = body.get("stream", False)
    content = _last_message_text(body.get("messages"))

    # The helpers below use blocking ``requests`` calls; run them in the
    # threadpool so this coroutine does not stall the event loop.
    jwt = await run_in_threadpool(get_valid_token)
    if not jwt:
        raise HTTPException(status_code=500, detail="Failed to get provider token")

    url = "https://level2labs-prod--adm-agent-send-chat-message.modal.run/"
    # (None, value) tuples make requests send plain multipart form fields.
    data = {
        "content": (None, content),
        "model": (None, model),
        "conversation_id": (None, f"c-{uuid.uuid4()}"),
        "message_id": (None, f"m-{uuid.uuid4()}"),
        "max_mode": (None, "true"),
    }
    try:
        target_resp = await run_in_threadpool(
            requests.post,
            url,
            files=data,
            headers={"Authorization": f"Bearer {jwt}"},
            stream=True,
            timeout=60,
        )
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail="Upstream request failed") from exc

    if target_resp.status_code >= 400:
        upstream_status = target_resp.status_code
        target_resp.close()
        # Refresh the cached balance so a spent or rejected token gets
        # replaced on the next request.
        await run_in_threadpool(sync_balance)
        raise HTTPException(status_code=502, detail=f"Upstream returned HTTP {upstream_status}")

    def generate():
        cid = f"chatcmpl-{uuid.uuid4()}"

        def sse(delta, finish_reason=None):
            chunk = {
                "id": cid, "object": "chat.completion.chunk", "created": int(time.time()),
                "model": model, "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}]
            }
            return f"data: {json.dumps(chunk)}\n\n"

        try:
            for line in target_resp.iter_lines():
                text_chunk = parse_upstream_line(line)
                if text_chunk:
                    yield sse({"content": text_chunk})
        finally:
            # Release the upstream connection even if the client disconnects
            # or the upstream stream breaks mid-way.
            target_resp.close()
        # Final chunk carrying the finish reason, then the SSE terminator.
        yield sse({}, "stop")
        yield "data: [DONE]\n\n"
        sync_balance()

    if stream:
        return StreamingResponse(generate(), media_type="text/event-stream")

    def collect():
        try:
            pieces = (parse_upstream_line(line) for line in target_resp.iter_lines())
            return "".join(piece for piece in pieces if piece)
        finally:
            target_resp.close()

    try:
        full_text = await run_in_threadpool(collect)
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail="Upstream stream failed") from exc
    await run_in_threadpool(sync_balance)
    return {
        "id": f"chatcmpl-{uuid.uuid4()}", "object": "chat.completion", "created": int(time.time()),
        "model": model, "choices": [{"index": 0, "message": {"role": "assistant", "content": full_text}, "finish_reason": "stop"}]
    }
# --- 5. Gradio admin panel ---
with gr.Blocks(title="AI Doc Maker 管理面板", theme=gr.themes.Soft()) as demo:
    # Login gate
    with gr.Column(visible=True) as login_view:
        gr.Markdown("## 🔐 访问受限\n请输入系统配置的 `ACCESS_TOKEN` 以解锁界面。")
        pwd = gr.Textbox(label="ACCESS_TOKEN", type="password")
        login_btn = gr.Button("解锁系统", variant="primary")
        login_err = gr.Markdown()
    # Main working area, hidden until login succeeds.
    # NOTE(review): toggling visibility only changes what the browser renders;
    # the refresh/chat/TTS handlers below do not check ACCESS_TOKEN themselves.
    # Confirm they cannot be invoked through Gradio's API without logging in.
    with gr.Column(visible=False) as main_view:
        gr.Markdown("# 🚀 AI Doc Maker 后台管理")
        with gr.Row():
            ui_token = gr.Textbox(label="当前 Token (缓存)", interactive=False)
            ui_balance = gr.Textbox(label="剩余额度", interactive=False)
            ui_refresh = gr.Button("🔄 刷新账号")
        with gr.Row():
            with gr.Column():
                ui_input = gr.Textbox(label="输入内容", lines=5, placeholder="写个笑话...")
                with gr.Row():
                    ui_chat_btn = gr.Button("💬 发送对话", variant="primary")
                    ui_tts_btn = gr.Button("🎙️ 生成语音", variant="secondary")
            with gr.Column():
                ui_output = gr.Textbox(label="AI 回复", lines=8, interactive=False)
                ui_audio = gr.Audio(label="语音预览", interactive=False)

    # --- UI interaction logic ---
    def handle_login(token):
        """Reveal the main view when ``token`` matches ``ACCESS_TOKEN``.

        Returns updates for (login_view, main_view, ui_token, ui_balance,
        login_err), in that order.
        """
        # Constant-time comparison on bytes; an empty/None input never matches
        # a non-empty ACCESS_TOKEN.
        supplied = (token or "").encode("utf-8")
        if hmac.compare_digest(supplied, ACCESS_TOKEN.encode("utf-8")):
            jwt = get_valid_token()
            bal = sync_balance()
            return gr.update(visible=False), gr.update(visible=True), jwt, str(bal), ""
        return gr.update(visible=True), gr.update(visible=False), "", "", "❌ 密码错误"

    login_btn.click(handle_login, inputs=pwd, outputs=[login_view, main_view, ui_token, ui_balance, login_err])

    def handle_refresh():
        """Register a new upstream account and return (token, balance text)."""
        jwt = fetch_account()
        bal = sync_balance()
        return jwt, str(bal)

    ui_refresh.click(handle_refresh, outputs=[ui_token, ui_balance])

    def handle_ui_chat(text):
        """Send ``text`` upstream and yield the accumulated reply as it grows."""
        jwt = get_valid_token()
        if not jwt:
            yield "Token 获取失败"
            return
        url = "https://level2labs-prod--adm-agent-send-chat-message.modal.run/"
        data = {"content": (None, text), "model": (None, "auto"), "conversation_id": (None, "ui"), "message_id": (None, "ui"), "max_mode": (None, "true")}
        full_text = ""
        try:
            # The context manager closes the streamed connection even when
            # the generator is abandoned mid-reply.
            with requests.post(url, files=data, headers={"Authorization": f"Bearer {jwt}"}, stream=True, timeout=60) as resp:
                for line in resp.iter_lines():
                    chunk = parse_upstream_line(line)
                    if chunk:
                        full_text += chunk
                        yield full_text
            sync_balance()
        except Exception as e:
            # UI boundary: show the failure in the output box instead of raising.
            yield f"请求出错: {e}"

    ui_chat_btn.click(handle_ui_chat, inputs=ui_input, outputs=ui_output)

    def handle_ui_tts(text):
        """Create speech for ``text`` upstream and return its playback URL.

        Returns ``None`` when no token is available, the create call returns
        no ``name``, or either request fails.
        """
        jwt = get_valid_token()
        if not jwt:
            return None
        url_create = "https://level2labs-prod--adm-agent-audio-create-audio-with-assistant.modal.run/"
        url_get = "https://level2labs-prod--adm-agent-audio-get-audio-playback-url.modal.run/"
        headers = {"Authorization": f"Bearer {jwt}"}
        try:
            # Timeouts keep a stalled upstream from hanging the UI worker.
            res = requests.post(
                url_create,
                files={"prompt": (None, text), "voice_id": (None, "clear"), "speed": (None, "1")},
                headers=headers,
                timeout=60,
            ).json()
            name = res.get("name")
            if not name:
                # Without a name the follow-up lookup would ask for "None".
                return None
            # ``params`` URL-encodes the name instead of pasting it raw.
            return requests.get(url_get, params={"name": name}, headers=headers, timeout=30).json().get("url")
        except (requests.RequestException, ValueError, AttributeError):
            # Network failure, non-JSON body, or JSON that is not an object.
            return None

    ui_tts_btn.click(handle_ui_tts, inputs=ui_input, outputs=ui_audio)
# Mount the Gradio UI onto the FastAPI app and start the server.
app = gr.mount_gradio_app(app, demo, path="/")

if __name__ == "__main__":
    import uvicorn

    # Pass the app object rather than the "app:app" import string: the string
    # form makes uvicorn import this module a second time (building the UI
    # and routes twice) and only works while the file is named app.py.
    uvicorn.run(app, host="0.0.0.0", port=7860)