|
|
import json
import logging
import os
import random
import secrets
import time
import uuid

import gradio as gr
import requests
from fastapi import Depends, FastAPI, HTTPException, Request, status
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import StreamingResponse
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
|
|
|
|
|
|
# ASGI application; the API routes are registered on it and the Gradio UI is
# mounted onto it further down in this file.
app = FastAPI()

# Dependency that extracts the "Authorization: Bearer <token>" credentials.
security = HTTPBearer()

# Shared secret required by the API routes and by the UI login form.
# ``or`` (instead of a getenv default) also covers ACCESS_TOKEN being set to
# an empty string, which would otherwise make the empty password valid.
# NOTE(review): the fallback is a well-known default; set ACCESS_TOKEN in the
# environment for any real deployment.
ACCESS_TOKEN = os.getenv("ACCESS_TOKEN") or "sk-admin-123456"

# Key appended to the Identity Toolkit sign-up URL. It can be overridden
# through PROVIDER_API_KEY; the previous hard-coded value stays the default.
API_KEY = os.getenv("PROVIDER_API_KEY") or "AIzaSyAZaD22Mzi9HkTcW3ErNxRA_sNEFolLBCA"

# Model ids advertised by GET /v1/models.
SUPPORTED_MODELS = ["auto", "gpt-5-mini", "gemini-3-flash", "pro"]
|
|
|
|
|
class TokenCache:
    """In-memory holder for the current upstream token and its remaining quota."""

    def __init__(self) -> None:
        # Start with no token and no credit; both are filled in once an
        # account has been fetched.
        self.token, self.balance = "", 0


# Single process-wide cache instance shared by the API routes and the UI.
cache = TokenCache()
|
|
|
|
|
|
|
|
|
|
|
def fetch_account():
    """Register a new upstream account and cache its token and quota.

    Calls the Identity Toolkit ``accounts:signUp`` endpoint, then asks the
    credits endpoint for the new account's ``premium_quota``. The module-level
    ``cache`` is only updated when both requests succeed.

    Returns:
        The new ID token, or ``None`` when sign-up or the quota lookup failed
        (in which case ``cache`` is left untouched).
    """
    log = logging.getLogger(__name__)
    reg_url = f"https://identitytoolkit.googleapis.com/v1/accounts:signUp?key={API_KEY}"
    bal_url = "https://level2labs-prod--adm-agent-helper-user-get-credits.modal.run/"

    try:
        reg_resp = requests.post(
            reg_url,
            json={"returnSecureToken": True},
            headers={"origin": "https://www.aidocmaker.com"},
            timeout=10,
        ).json()
        token = reg_resp.get("idToken") if isinstance(reg_resp, dict) else None
        if not token:
            log.warning("Account sign-up response did not contain an idToken")
            return None

        bal_resp = requests.post(
            bal_url,
            headers={"Authorization": f"Bearer {token}"},
            timeout=10,
        ).json()
    except (requests.RequestException, ValueError) as exc:
        # Network failures and non-JSON bodies are expected failure modes;
        # report them instead of silently swallowing every exception.
        log.warning("Account registration failed: %s", exc)
        return None

    if not isinstance(bal_resp, dict):
        log.warning("Unexpected credits response type: %s", type(bal_resp).__name__)
        return None

    cache.token = token
    # ``or 0`` also covers an explicit null quota, which would otherwise break
    # the ``cache.balance > 0`` comparison done by callers.
    cache.balance = bal_resp.get("premium_quota") or 0
    return token
|
|
|
|
|
def get_valid_token():
    """Return the cached token while it still has quota, else fetch a new account.

    Returns:
        The cached token when one exists and the cached balance is positive;
        otherwise whatever ``fetch_account()`` returns (a token or ``None``).
    """
    cached_is_usable = cache.token and cache.balance > 0
    return cache.token if cached_is_usable else fetch_account()
|
|
|
|
|
def sync_balance():
    """Refresh ``cache.balance`` from the upstream credits endpoint.

    Returns:
        The refreshed balance. Returns 0 without any request when no token is
        cached. On any request/parse failure the cached balance is reset to 0
        (so the next ``get_valid_token()`` call fetches a new account) and 0
        is returned.
    """
    if not cache.token:
        return 0

    url = "https://level2labs-prod--adm-agent-helper-user-get-credits.modal.run/"
    try:
        data = requests.post(
            url,
            headers={"Authorization": f"Bearer {cache.token}"},
            timeout=10,
        ).json()
    except (requests.RequestException, ValueError) as exc:
        logging.getLogger(__name__).warning("Balance sync failed: %s", exc)
        cache.balance = 0
        return 0

    if not isinstance(data, dict):
        cache.balance = 0
        return 0

    # ``or 0`` keeps the balance numeric even when the field is null.
    cache.balance = data.get("premium_quota") or 0
    return cache.balance
|
|
|
|
|
|
|
|
|
|
|
def parse_upstream_line(line):
    """Parse one line of the upstream response and return its ``data`` field.

    Accepts both SSE-style lines (``data: {...}`` or ``data:{...}``) and bare
    JSON objects.

    Args:
        line: One raw line as ``bytes``/``bytearray`` (as produced by
            ``Response.iter_lines()``) or as ``str``.

    Returns:
        The value stored under the ``"data"`` key of the JSON object (``""``
        when the key is absent), or ``None`` for empty lines, the ``[DONE]``
        sentinel, invalid JSON, and JSON values that are not objects.
    """
    if not line:
        return None

    if isinstance(line, (bytes, bytearray)):
        # Replace undecodable bytes rather than raising, so a single corrupt
        # line cannot abort the whole stream.
        text = line.decode("utf-8", errors="replace")
    else:
        text = str(line)

    text = text.strip()
    if not text:
        return None

    # The SSE field separator is "data:" with an optional following space.
    if text.startswith("data:"):
        text = text[5:].strip()

    if not text or text == "[DONE]":
        return None

    try:
        payload = json.loads(text)
    except ValueError:
        return None

    if not isinstance(payload, dict):
        return None
    return payload.get("data", "")
|
|
|
|
|
|
|
|
|
|
|
async def verify_api(auth: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the bearer token of an API request.

    Args:
        auth: Credentials extracted from the ``Authorization`` header by the
            ``security`` dependency.

    Returns:
        The presented token when it matches ``ACCESS_TOKEN``.

    Raises:
        HTTPException: 401 when the token does not match.
    """
    # Compare as bytes in constant time: a plain ``!=`` leaks how many leading
    # characters matched, and compare_digest rejects non-ASCII ``str`` input.
    supplied = auth.credentials.encode("utf-8")
    expected = ACCESS_TOKEN.encode("utf-8")
    if not secrets.compare_digest(supplied, expected):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid Access Token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return auth.credentials
|
|
|
|
|
@app.get("/v1/models")
async def list_models(t: str = Depends(verify_api)):
    """Return the supported model ids as an OpenAI-style model list."""
    entries = []
    for model_id in SUPPORTED_MODELS:
        entries.append({"id": model_id, "object": "model"})
    return {"object": "list", "data": entries}
|
|
|
|
|
def _extract_prompt(messages):
    """Return the text of the last chat message, or ``""`` if there is none."""
    if not isinstance(messages, list) or not messages:
        return ""
    last = messages[-1]
    content = last.get("content") if isinstance(last, dict) else None
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        # Multi-part content: keep only the textual parts.
        texts = [
            str(part.get("text", ""))
            for part in content
            if isinstance(part, dict) and part.get("type") == "text"
        ]
        return "\n".join(texts)
    return str(content)


@app.post("/v1/chat/completions")
async def chat_completions(request: Request, t: str = Depends(verify_api)):
    """OpenAI-style chat completion endpoint backed by the upstream chat service.

    Only the last entry of ``messages`` is forwarded upstream; every request
    uses a fresh conversation id and message id.

    Args:
        request: Incoming request whose JSON body may contain ``model``,
            ``stream`` and ``messages``.
        t: Validated access token (unused; present to enforce ``verify_api``).

    Returns:
        A ``StreamingResponse`` of ``chat.completion.chunk`` events when
        ``stream`` is truthy, otherwise a ``chat.completion`` dict.

    Raises:
        HTTPException: 400 for a body that is not a JSON object, 500 when no
            provider token could be obtained, 502 when the upstream request
            fails or answers with an HTTP error status.
    """
    log = logging.getLogger(__name__)

    try:
        body = await request.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    model = body.get("model") or "auto"
    stream = body.get("stream", False)
    content = _extract_prompt(body.get("messages"))

    # ``requests`` is blocking; run it in the threadpool so this coroutine
    # does not stall the event loop for every other client.
    jwt = await run_in_threadpool(get_valid_token)
    if not jwt:
        raise HTTPException(status_code=500, detail="Failed to get provider token")

    url = "https://level2labs-prod--adm-agent-send-chat-message.modal.run/"
    # (None, value) tuples make ``requests`` send plain multipart form fields.
    data = {
        "content": (None, content),
        "model": (None, model),
        "conversation_id": (None, f"c-{uuid.uuid4()}"),
        "message_id": (None, f"m-{uuid.uuid4()}"),
        "max_mode": (None, "true"),
    }

    try:
        target_resp = await run_in_threadpool(
            requests.post,
            url,
            files=data,
            headers={"Authorization": f"Bearer {jwt}"},
            stream=True,
            timeout=60,
        )
    except requests.RequestException as exc:
        log.warning("Upstream chat request failed: %s", exc)
        raise HTTPException(status_code=502, detail="Upstream request failed") from exc

    if not target_resp.ok:
        upstream_status = target_resp.status_code
        target_resp.close()
        # Refresh the cached balance as after any other request, so an
        # exhausted or rejected token is replaced on the next call.
        await run_in_threadpool(sync_balance)
        raise HTTPException(status_code=502, detail=f"Upstream returned HTTP {upstream_status}")

    def generate():
        """Yield SSE chunks for the streaming variant, then refresh the balance."""
        cid = f"chatcmpl-{uuid.uuid4()}"

        def sse(delta, finish_reason):
            chunk = {
                "id": cid,
                "object": "chat.completion.chunk",
                "created": int(time.time()),
                "model": model,
                "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
            }
            return f"data: {json.dumps(chunk)}\n\n"

        try:
            for line in target_resp.iter_lines():
                text_chunk = parse_upstream_line(line)
                if text_chunk:
                    yield sse({"content": text_chunk}, None)
        except requests.RequestException as exc:
            log.warning("Upstream stream interrupted: %s", exc)
        finally:
            # Always release the upstream connection, including when the
            # client disconnects mid-stream.
            target_resp.close()

        # Terminal chunk so clients see an explicit finish_reason.
        yield sse({}, "stop")
        yield "data: [DONE]\n\n"
        sync_balance()

    if stream:
        return StreamingResponse(generate(), media_type="text/event-stream")

    def collect():
        """Read the whole upstream body and return the concatenated text."""
        pieces = []
        try:
            for line in target_resp.iter_lines():
                piece = parse_upstream_line(line)
                if piece:
                    pieces.append(piece)
        finally:
            target_resp.close()
        sync_balance()
        return "".join(pieces)

    try:
        full_text = await run_in_threadpool(collect)
    except requests.RequestException as exc:
        log.warning("Upstream response could not be read: %s", exc)
        raise HTTPException(status_code=502, detail="Upstream request failed") from exc

    return {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": full_text},
                "finish_reason": "stop",
            }
        ],
    }
|
|
|
|
|
|
|
|
|
|
|
# Gradio admin panel: a login view that is swapped for the main view once the
# correct ACCESS_TOKEN has been entered.
# NOTE(review): the login only toggles which column is visible; the refresh,
# chat and TTS handlers below do not re-check the token themselves. Confirm
# whether their Gradio endpoints need separate protection.
with gr.Blocks(title="AI Doc Maker 管理面板", theme=gr.themes.Soft()) as demo:

    # ---- Login view (shown first) ----
    with gr.Column(visible=True) as login_view:
        gr.Markdown("## 🔐 访问受限\n请输入系统配置的 `ACCESS_TOKEN` 以解锁界面。")
        pwd = gr.Textbox(label="ACCESS_TOKEN", type="password")
        login_btn = gr.Button("解锁系统", variant="primary")
        login_err = gr.Markdown()

    # ---- Main view (hidden until login succeeds) ----
    with gr.Column(visible=False) as main_view:
        gr.Markdown("# 🚀 AI Doc Maker 后台管理")

        with gr.Row():
            ui_token = gr.Textbox(label="当前 Token (缓存)", interactive=False)
            ui_balance = gr.Textbox(label="剩余额度", interactive=False)
            ui_refresh = gr.Button("🔄 刷新账号")

        with gr.Row():
            with gr.Column():
                ui_input = gr.Textbox(label="输入内容", lines=5, placeholder="写个笑话...")
                with gr.Row():
                    ui_chat_btn = gr.Button("💬 发送对话", variant="primary")
                    ui_tts_btn = gr.Button("🎙️ 生成语音", variant="secondary")

            with gr.Column():
                ui_output = gr.Textbox(label="AI 回复", lines=8, interactive=False)
                ui_audio = gr.Audio(label="语音预览", interactive=False)

    def handle_login(token):
        """Check the entered token and switch from the login view to the main view.

        Returns updates for, in order: login view, main view, token box,
        balance box, and the login error message.
        """
        supplied = (token or "").encode("utf-8")
        # Constant-time comparison so response timing does not reveal how
        # much of the entered token was correct.
        if secrets.compare_digest(supplied, ACCESS_TOKEN.encode("utf-8")):
            jwt = get_valid_token()
            bal = sync_balance()
            return gr.update(visible=False), gr.update(visible=True), jwt or "", str(bal), ""
        return gr.update(visible=True), gr.update(visible=False), "", "", "❌ 密码错误"

    login_btn.click(
        handle_login,
        inputs=pwd,
        outputs=[login_view, main_view, ui_token, ui_balance, login_err],
    )

    def handle_refresh():
        """Register a new upstream account and return its token and balance."""
        jwt = fetch_account()
        bal = sync_balance()
        return jwt, str(bal)

    ui_refresh.click(handle_refresh, outputs=[ui_token, ui_balance])

    def handle_ui_chat(text):
        """Send ``text`` upstream and yield the growing reply for live display."""
        jwt = get_valid_token()
        if not jwt:
            yield "Token 获取失败"
            return

        url = "https://level2labs-prod--adm-agent-send-chat-message.modal.run/"
        data = {
            "content": (None, text),
            "model": (None, "auto"),
            "conversation_id": (None, "ui"),
            "message_id": (None, "ui"),
            "max_mode": (None, "true"),
        }
        full_text = ""
        try:
            # The context manager releases the connection even when the UI
            # stops consuming this generator early.
            with requests.post(
                url,
                files=data,
                headers={"Authorization": f"Bearer {jwt}"},
                stream=True,
                timeout=60,
            ) as resp:
                if not resp.ok:
                    # Surface upstream errors instead of an empty reply, and
                    # refresh the balance as after any other request.
                    sync_balance()
                    yield f"请求出错: HTTP {resp.status_code}"
                    return
                for line in resp.iter_lines():
                    chunk = parse_upstream_line(line)
                    if chunk:
                        full_text += chunk
                        yield full_text
            sync_balance()
        except Exception as e:
            yield f"请求出错: {e}"

    ui_chat_btn.click(handle_ui_chat, inputs=ui_input, outputs=ui_output)

    def handle_ui_tts(text):
        """Create speech audio for ``text`` and return its playback URL, or None."""
        jwt = get_valid_token()
        if not jwt:
            return None

        log = logging.getLogger(__name__)
        headers = {"Authorization": f"Bearer {jwt}"}
        url_create = "https://level2labs-prod--adm-agent-audio-create-audio-with-assistant.modal.run/"
        url_get = "https://level2labs-prod--adm-agent-audio-get-audio-playback-url.modal.run/"
        try:
            # Explicit timeouts: without them a stalled upstream would block
            # this handler indefinitely.
            res = requests.post(
                url_create,
                files={"prompt": (None, text), "voice_id": (None, "clear"), "speed": (None, "1")},
                headers=headers,
                timeout=120,
            ).json()
            name = res.get("name") if isinstance(res, dict) else None
            if not name:
                log.warning("Audio creation response did not contain a name")
                return None
            # ``params`` URL-encodes the name instead of splicing it in raw.
            playback = requests.get(
                url_get,
                params={"name": name},
                headers=headers,
                timeout=30,
            ).json()
        except (requests.RequestException, ValueError) as exc:
            log.warning("Audio generation failed: %s", exc)
            return None

        return playback.get("url") if isinstance(playback, dict) else None

    ui_tts_btn.click(handle_ui_tts, inputs=ui_input, outputs=ui_audio)
|
|
|
|
|
|
|
|
# Mount the Gradio UI at the site root on the existing FastAPI application.
app = gr.mount_gradio_app(app, demo, path="/")


if __name__ == "__main__":
    import uvicorn

    # Pass the application object rather than the "app:app" import string:
    # this works whatever the file is named and avoids importing (and thus
    # executing) this module a second time.
    uvicorn.run(app, host="0.0.0.0", port=7860)