Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| umans2api: 把 umans.ai 的私有 chat 接口转换为 Anthropic /v1/messages 兼容接口。 | |
| 适配 Claude Code (ANTHROPIC_BASE_URL + ANTHROPIC_AUTH_TOKEN)。 | |
| 特性: | |
| - Claude 型号自动映射到 umans 上游 (claude / 关键词 自动映射到 umans-coder / umans-glm-5.1 / umans-kimi-k2.5) | |
| - 通过 prompt 注入 + JSON 解析模拟 tool_use,兼容 Claude Code 的工具调用协议 | |
| - 同时暴露 OpenAI /v1/chat/completions 兼容端点 | |
| """ | |
| import json | |
| import re | |
| import time | |
| import uuid | |
| import logging | |
| from pathlib import Path | |
| import os | |
| import base64 | |
| import mimetypes | |
| from urllib.parse import unquote | |
| import requests | |
| from flask import Flask, Response, jsonify, request, stream_with_context | |
# ---------- Configuration ----------
# Every value below can be overridden through an environment variable of the
# same name; the literals are the defaults used when the variable is unset.
def _split_csv(raw):
    """Split a comma-separated string, trimming blanks and dropping empty items."""
    return [item.strip() for item in raw.split(",") if item.strip()]


PORT = int(os.environ.get("PORT", "7860"))
HOST = os.environ.get("HOST", "0.0.0.0")
# Client API key; an empty value disables authentication (see check_auth).
API_KEY = os.environ.get("API_KEY", "")
# Admin password; an empty value disables the admin check (see check_admin).
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "")
UPSTREAM_URL = os.environ.get("UPSTREAM_URL", "https://app.umans.ai/api/chat")
DEFAULT_MODEL = os.environ.get("DEFAULT_MODEL", "umans-coder")
# Upstream model names that are passed through unchanged. Whitespace around
# the commas is ignored so that "a, b" does not produce the name " b".
AVAILABLE_MODELS = _split_csv(
    os.environ.get("AVAILABLE_MODELS", "umans-coder,umans-glm-5.1,umans-kimi-k2.5")
)
# Exact client model name -> upstream model name (JSON object).
CLAUDE_MODEL_MAP = json.loads(os.environ.get("CLAUDE_MODEL_MAP", '{"claude-opus-4-6":"umans-coder","claude-opus-4-7":"umans-coder","claude-sonnet-4-6":"umans-glm-5.1","claude-haiku-4-5":"umans-kimi-k2.5"}'))
# Substring of the lower-cased client model name -> upstream model name (JSON object).
CLAUDE_KEYWORD_MAP = json.loads(os.environ.get("CLAUDE_KEYWORD_MAP", '{"opus":"umans-coder","sonnet":"umans-glm-5.1","haiku":"umans-kimi-k2.5","glm":"umans-glm-5.1","kimi":"umans-kimi-k2.5","coder":"umans-coder"}'))
# Use /data when it exists and is writable, otherwise the user's home directory.
DATA_DIR = Path("/data") if Path("/data").exists() and os.access("/data", os.W_OK) else Path.home()
COOKIES_PATH = DATA_DIR / "umans2api" / "cookies.json"
def load_cookies():
    """Load the cookies that are sent with upstream requests.

    Sources are tried in order:
      1. the ``COOKIES_JSON`` environment variable,
      2. the JSON file at ``COOKIES_PATH``.

    A source that cannot be read or parsed, or that does not hold a JSON
    object, is skipped with a warning instead of raising.

    Returns:
        dict: the cookies, or an empty dict when no usable source exists.
    """
    # This function is called while the module is still being imported, before
    # the module-level ``log`` has been assigned, so look the logger up by name.
    logger = logging.getLogger("umans2api")

    env = os.environ.get("COOKIES_JSON", "").strip()
    if env:
        try:
            cookies = json.loads(env)
        except ValueError:  # json.JSONDecodeError is a ValueError
            logger.warning("COOKIES_JSON parse failed")
        else:
            if isinstance(cookies, dict):
                return cookies
            logger.warning("COOKIES_JSON is not a JSON object; ignoring it")

    if COOKIES_PATH.exists():
        try:
            cookies = json.loads(COOKIES_PATH.read_text("utf-8"))
        except (OSError, ValueError) as exc:
            logger.warning("could not load cookies from %s: %s", COOKIES_PATH, exc)
        else:
            if isinstance(cookies, dict):
                return cookies
            logger.warning("%s does not contain a JSON object; ignoring it", COOKIES_PATH)
    return {}
def save_cookies(cookies):
    """Write ``cookies`` to ``COOKIES_PATH`` as indented UTF-8 JSON.

    The parent directory is created first when it does not exist yet.
    """
    target_dir = COOKIES_PATH.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(cookies, indent=2, ensure_ascii=False)
    COOKIES_PATH.write_text(serialized, "utf-8")
# Set up logging before anything else runs at import time, so that ``log``
# exists and is configured by the time load_cookies() is called below.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("umans2api")

# Cookies sent with every upstream request.
COOKIES = load_cookies()

# Desktop Chrome User-Agent used for the upstream chat request headers.
UA = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/147.0.0.0 Safari/537.36"
)

app = Flask(__name__)
| # ---------- 工具函数 ---------- | |
def gen_uuid() -> str:
    """Return a newly generated random UUID (version 4) as a string."""
    fresh = uuid.uuid4()
    return str(fresh)
def check_auth() -> bool:
    """Check the client's API key on the current Flask request.

    The key is accepted either in the ``x-api-key`` header or as
    ``Authorization: Bearer <key>``. When ``API_KEY`` is empty,
    authentication is disabled and every request is accepted.

    Returns:
        True when the request may proceed, False otherwise.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    if not API_KEY:
        return True

    expected = API_KEY.encode("utf-8")

    def _matches(candidate: str) -> bool:
        # Constant-time comparison: a plain ``==`` returns as soon as the
        # first differing character is found, which leaks timing information
        # about the secret. Comparing bytes also supports non-ASCII keys.
        return hmac.compare_digest(candidate.encode("utf-8"), expected)

    header = (
        request.headers.get("x-api-key")
        or request.headers.get("X-Api-Key")
        or ""
    )
    if _matches(header):
        return True

    auth = request.headers.get("Authorization", "")
    prefix = "Bearer "
    return auth.startswith(prefix) and _matches(auth[len(prefix):])
def check_admin() -> bool:
    """Check the admin password on the current Flask request.

    The password is read from the ``X-Admin-Password`` header or, failing
    that, from the ``password`` query parameter. When ``ADMIN_PASSWORD`` is
    empty the check is disabled and every request is accepted.

    Returns:
        True when the request may proceed, False otherwise.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    if not ADMIN_PASSWORD:
        return True
    supplied = request.headers.get("X-Admin-Password", "") or request.args.get("password", "")
    # Constant-time comparison so response timing does not reveal how much of
    # the password matched; bytes are compared so non-ASCII passwords work.
    return hmac.compare_digest(
        supplied.encode("utf-8"),
        ADMIN_PASSWORD.encode("utf-8"),
    )
def resolve_model(req_model: str) -> str:
    """Translate the client's model name into an upstream model name.

    Resolution order:
      1. empty name -> ``DEFAULT_MODEL``
      2. exact member of ``AVAILABLE_MODELS`` -> passed through unchanged
      3. exact key of ``CLAUDE_MODEL_MAP`` -> mapped value
      4. first ``CLAUDE_KEYWORD_MAP`` keyword contained in the lower-cased
         name -> mapped value
      5. otherwise ``DEFAULT_MODEL``
    """
    if not req_model:
        return DEFAULT_MODEL
    if req_model in AVAILABLE_MODELS:
        return req_model
    try:
        return CLAUDE_MODEL_MAP[req_model]
    except KeyError:
        pass
    lowered = req_model.lower()
    keyword_hits = (
        upstream
        for keyword, upstream in CLAUDE_KEYWORD_MAP.items()
        if keyword in lowered
    )
    return next(keyword_hits, DEFAULT_MODEL)
# ---------- tool_use protocol emulation ----------
# Prompt fragment that describes the <tool_call> text protocol to the upstream
# model. build_tools_prompt() replaces the __TOOLS_JSON__ placeholder with the
# JSON-serialised list of tools registered by the client.
TOOL_SYSTEM_TEMPLATE = """You are connected to a client application through a tool-calling API. The client has registered the tools below and will execute them for you when you request a call. You do not have direct access to the user's environment — only these tools can act on it.
Protocol:
When a tool is the right way to answer, write your reply as exactly one block and nothing else:
<tool_call>
{"name": "<tool_name>", "input": { ... arguments matching the tool's schema ... }}
</tool_call>
The client parses this block, runs the tool, and sends the result back as a tool_result in the next turn. Then you can continue the conversation naturally.
A few notes:
- One tool call per response. Wait for the tool_result before planning the next step.
- Keep the JSON strict (double quotes, no trailing commas, no code fences around it).
- If the question is purely conversational, just reply in plain text — no <tool_call> needed.
- Prefer the registered tools over describing what you would do; the user only sees tool_result output, not narration about tool calls.
Registered tools:
__TOOLS_JSON__
"""
# Matches <tool_call>{...}</tool_call>; the JSON object may optionally be
# wrapped in a ``` or ```json code fence. Group 1 is the JSON text.
TOOL_CALL_RE = re.compile(
    r"<\s*tool_call\s*>\s*(?:```(?:json)?\s*)?(\{.*?\})\s*(?:```\s*)?<\s*/\s*tool_call\s*>",
    re.DOTALL | re.IGNORECASE,
)
# Same shape as TOOL_CALL_RE but with a <tool>...</tool> tag pair.
TOOL_TAG_RE = re.compile(
    r"<\s*tool\s*>\s*(?:```(?:json)?\s*)?(\{.*?\})\s*(?:```\s*)?<\s*/\s*tool\s*>",
    re.DOTALL | re.IGNORECASE,
)
# A "[tool]" marker followed by a JSON object (optionally fenced) that runs to
# the end of the text.
TOOL_BRACKET_RE = re.compile(
    r"\[\s*tool\s*\]\s*(?:```(?:json)?\s*)?(\{[\s\S]*?\})(?:\s*```)?\s*$",
    re.DOTALL | re.IGNORECASE,
)
# Fallback: the model omitted <tool_call>, but the whole text is exactly one
# strict {"name": ..., "input": {...}} JSON object; treat that as a tool call too.
TOOL_CALL_BARE_RE = re.compile(
    r'^\s*(\{\s*"name"\s*:\s*"[^"]+"\s*,\s*"input"\s*:\s*\{.*?\}\s*\})\s*$',
    re.DOTALL,
)
def build_tools_prompt(tools):
    """Render the client's tool definitions into the tool-calling prompt.

    Only ``name``, ``description`` and ``input_schema`` of each tool are kept;
    entries that are not dicts are ignored.

    Returns:
        ``TOOL_SYSTEM_TEMPLATE`` with ``__TOOLS_JSON__`` replaced by the
        serialised tools, or None when there is no usable tool.
    """
    if not tools:
        return None
    described = [
        {
            "name": tool.get("name"),
            "description": tool.get("description", ""),
            "input_schema": tool.get("input_schema", {}),
        }
        for tool in tools
        if isinstance(tool, dict)
    ]
    if not described:
        return None
    tools_json = json.dumps(described, ensure_ascii=False, indent=2)
    return TOOL_SYSTEM_TEMPLATE.replace("__TOOLS_JSON__", tools_json)
def _tool_call_from_json(raw_json):
    """Decode one candidate payload into ``(name, input_dict)``, or None if unusable."""
    try:
        obj = json.loads(raw_json)
    except json.JSONDecodeError:
        return None
    if not isinstance(obj, dict):
        return None
    name = obj.get("name")
    # A call without a non-empty string name cannot be turned into a valid
    # tool_use block, so treat it as "no tool call" rather than forwarding it.
    if not isinstance(name, str) or not name.strip():
        return None
    tool_input = obj.get("input", {})
    if not isinstance(tool_input, dict):
        tool_input = {}
    return name, tool_input


def parse_tool_call(text):
    """Find a tool call in the model's text output.

    The tagged forms (``<tool_call>``, ``<tool>``, ``[tool]``) are tried first,
    in that order; for each form every match is examined until one decodes to
    a usable call. If none does, the whole text is tried as a bare
    ``{"name": ..., "input": {...}}`` object.

    Returns:
        ``(tool_name, input_dict, rest_text)`` or None when no call is found.
        ``rest_text`` is always "": text surrounding a tool block is dropped
        on purpose so stray narration is not shown to the user.
    """
    if not isinstance(text, str) or not text.strip():
        return None

    for pattern in (TOOL_CALL_RE, TOOL_TAG_RE, TOOL_BRACKET_RE):
        for match in pattern.finditer(text):
            call = _tool_call_from_json(match.group(1).strip())
            if call is not None:
                name, tool_input = call
                return name, tool_input, ""

    # Bare-JSON fallback.
    bare = TOOL_CALL_BARE_RE.match(text)
    if bare is not None:
        call = _tool_call_from_json(bare.group(1))
        if call is not None:
            name, tool_input = call
            return name, tool_input, ""
    return None
| # ---------- Anthropic → 纯文本 ---------- | |
def normalize_media_type(media_type: str, default: str = "application/octet-stream") -> str:
    """Trim and lower-case a media type, falling back to ``default`` when it is missing or blank."""
    cleaned = media_type.strip().lower() if media_type else ""
    if cleaned:
        return cleaned
    return default
def guess_filename_from_url(url: str, fallback: str = "upload.bin") -> str:
    """Derive a file name from the last path segment of ``url``.

    Only the URL's path is considered, so the query string, the fragment and
    the host never end up in the name. The segment is percent-decoded.

    Args:
        url: the URL to inspect.
        fallback: returned when no name can be derived (empty path segment,
            unparsable or non-string input).
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlsplit

    try:
        path = urlsplit(url).path
        name = unquote(path.rsplit("/", 1)[-1]).strip()
    except (AttributeError, TypeError, ValueError):
        return fallback
    return name or fallback
def build_file_part(url: str, filename: str, media_type: str) -> dict:
    """Build a ``file`` message part for the upstream payload, with a normalised media type."""
    return dict(
        type="file",
        url=url,
        filename=filename,
        mediaType=normalize_media_type(media_type),
    )
def _single_line(value):
    """Return ``str(value)`` with every line break removed."""
    return "".join(str(value).splitlines())


def _placeholder_field(value):
    """Make ``value`` usable as one '|'-delimited field of an image placeholder."""
    return _single_line(value).replace("|", "_")


def _image_placeholder(blk):
    """Encode an Anthropic ``image`` block as a one-line placeholder.

    Placeholders have the form ``[image_url|media|filename|url]`` or
    ``[image_base64|media|filename|data]`` and are decoded line by line by
    build_upstream_payload, so no field may contain a line break and the
    media/filename fields may not contain ``|``.

    Returns:
        The placeholder; None when the block has no URL / no data (nothing is
        emitted); "[image omitted]" when the source type is not recognised.
    """
    src = blk.get("source", {})
    kind = src.get("type") if isinstance(src, dict) else None

    if kind == "url":
        url = _single_line(src.get("url", "")).strip()
        if not url:
            return None
        media = _placeholder_field(
            normalize_media_type(str(src.get("media_type") or "image/png"), "image/png")
        )
        default_name = f"image{mimetypes.guess_extension(media) or '.png'}"
        filename = _placeholder_field(guess_filename_from_url(url, default_name))
        return f"[image_url|{media}|{filename}|{url}]"

    if kind == "base64":
        media = _placeholder_field(normalize_media_type(str(src.get("media_type") or "")))
        # Remove all whitespace: line-wrapped base64 would otherwise split the
        # placeholder over several lines, and base64.b64decode() discards
        # whitespace anyway.
        data = "".join(str(src.get("data") or "").split())
        if not data:
            return None
        ext = mimetypes.guess_extension(media) or ".bin"
        filename = _placeholder_field(blk.get("filename") or f"upload{ext}")
        return f"[image_base64|{media}|{filename}|{data}]"

    return "[image omitted]"


def _flatten_content_blocks(content):
    """Render a list of Anthropic content blocks as newline-joined text."""
    rendered = []
    for blk in content:
        if not isinstance(blk, dict):
            continue
        kind = blk.get("type")
        if kind == "text":
            rendered.append(str(blk.get("text", "")))
        elif kind == "tool_use":
            # Replay earlier calls in the same <tool_call> syntax the model is asked to produce.
            call_json = json.dumps(
                {"name": blk.get("name"), "input": blk.get("input", {})},
                ensure_ascii=False,
            )
            rendered.append("<tool_call>\n" + call_json + "\n</tool_call>")
        elif kind == "tool_result":
            res = blk.get("content", "")
            if isinstance(res, list):
                res = "\n".join(
                    str(x.get("text", "")) if isinstance(x, dict) else str(x)
                    for x in res
                )
            tool_use_id = blk.get("tool_use_id", "")
            rendered.append(f"<tool_result id=\"{tool_use_id}\">\n{res}\n</tool_result>")
        elif kind == "image":
            placeholder = _image_placeholder(blk)
            if placeholder is not None:
                rendered.append(placeholder)
    return "\n".join(rendered)


def anthropic_messages_to_text(system, messages, extra_system=None):
    """Flatten an Anthropic /v1/messages conversation into one user text.

    The result is made of up to three sections separated by blank lines:
    ``extra_system`` (client integration notes), the caller's system prompt,
    and the message history with each message prefixed by ``[Role]``.
    Image blocks become one-line placeholders that build_upstream_payload
    later turns into upstream file parts.

    Args:
        system: system prompt as a string or a list of ``text`` blocks.
        messages: list of Anthropic messages; entries that are not dicts are
            skipped, and a non-string role is treated as ``user``.
        extra_system: optional extra instructions placed first.

    Returns:
        The flattened prompt, or "hi" when everything is empty.
    """
    parts = []
    if extra_system:
        parts.append(
            "(Client integration notes — please read before responding.)\n\n"
            + extra_system
        )

    sys_parts = []
    if isinstance(system, str) and system.strip():
        sys_parts.append(system.strip())
    elif isinstance(system, list):
        sys_parts.extend(
            str(blk.get("text", ""))
            for blk in system
            if isinstance(blk, dict) and blk.get("type") == "text"
        )
    if sys_parts:
        parts.append(
            "(Caller's system prompt)\n\n" + "\n\n".join(s for s in sys_parts if s)
        )

    role_tags = {
        "user": "User",
        "assistant": "Assistant",
        "system": "System",
        "tool": "Tool",
    }
    history = []
    for m in messages or []:
        if not isinstance(m, dict):
            # Malformed entry: skip it instead of failing the whole request.
            continue
        role = m.get("role", "user")
        if not isinstance(role, str):
            role = "user"
        content = m.get("content", "")
        if isinstance(content, str):
            text = content
        elif isinstance(content, list):
            text = _flatten_content_blocks(content)
        else:
            text = str(content)
        tag = role_tags.get(role, role.capitalize())
        history.append(f"[{tag}]\n{text}")
    if history:
        parts.append("\n\n".join(history))

    return "\n\n".join(parts).strip() or "hi"
def upload_image_data_to_umans(data_b64: str, media_type: str, filename: str = "upload.png", chat_id: str = ""):
    """Upload base64-encoded file data to the umans file-upload endpoint.

    Args:
        data_b64: base64-encoded file content.
        media_type: MIME type sent with the multipart upload.
        filename: file name sent with the multipart upload.
        chat_id: chat id used in the Referer header; a fresh UUID is used
            when it is empty.

    Returns:
        A dict describing the uploaded file: either a normalised
        ``{"url", "filename", "mediaType"}`` dict when the response has a
        top-level string ``url``, or the response's nested ``file`` / ``data``
        object as-is.

    Raises:
        RuntimeError: on a non-OK status, a non-JSON response, or a response
            without a recognisable file object.
    """
    media_type = normalize_media_type(media_type)
    referer_chat = chat_id or gen_uuid()
    request_headers = {
        "User-Agent": "Mozilla/5.0 (Linux; Android 15; PKG110 Build/UKQ1.231108.001) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.7680.177 Mobile Safari/537.36",
        "Accept": "*/*",
        "Origin": "https://app.umans.ai",
        "X-Requested-With": "mark.via.gp",
        "Sec-Fetch-Site": "same-origin",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Dest": "empty",
        "Referer": f"https://app.umans.ai/chat/{referer_chat}",
        "sec-ch-ua-platform": '"Android"',
        "sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Android WebView";v="146"',
        "sec-ch-ua-mobile": "?1",
    }
    payload_bytes = base64.b64decode(data_b64)
    response = requests.post(
        "https://app.umans.ai/api/files/upload",
        headers=request_headers,
        cookies=COOKIES,
        files={"file": (filename, payload_bytes, media_type)},
        timeout=120,
    )
    if not response.ok:
        raise RuntimeError(f"upload failed: {response.status_code} {response.text[:300]}")
    try:
        body = response.json()
    except Exception:
        raise RuntimeError(f"upload non-json response: {response.text[:300]}")

    if isinstance(body, dict):
        if isinstance(body.get("url"), str):
            return {
                "url": body["url"],
                "filename": body.get("filename") or filename,
                "mediaType": body.get("contentType") or media_type,
            }
        # Otherwise accept a nested file description, "file" taking precedence over "data".
        for key in ("file", "data"):
            nested = body.get(key)
            if isinstance(nested, dict):
                return nested
    raise RuntimeError(f"upload response missing file object: {json.dumps(body, ensure_ascii=False)[:500]}")
def build_upstream_payload(model: str, prompt_text: str):
    """Build the JSON payload for the upstream chat endpoint.

    ``prompt_text`` is scanned line by line. Lines that are image placeholders
    (``[image_url|media|filename|url]`` or
    ``[image_base64|media|filename|data]``) become ``file`` parts, with base64
    data uploaded first; every other line is kept as text. A placeholder that
    cannot be processed is replaced by a short failure note in the text.

    Returns:
        ``(payload, chat_id)`` where ``chat_id`` is the freshly generated chat id.
    """
    chat_id = gen_uuid()
    msg_id = gen_uuid()
    url_tag = '[image_url|'
    b64_tag = '[image_base64|'
    file_parts = []
    kept_lines = []
    attached = 0

    for original_line in prompt_text.splitlines():
        stripped = original_line.strip()
        closes = stripped.endswith(']')

        if closes and stripped.startswith(url_tag):
            try:
                media_type, filename, url = stripped[len(url_tag):-1].split('|', 2)
                if url.strip():
                    file_parts.append(
                        build_file_part(
                            url.strip(),
                            filename.strip() or f'image-{attached+1}.bin',
                            media_type,
                        )
                    )
                    attached += 1
                    continue
            except Exception as e:
                kept_lines.append(f'(图片链接解析失败:{e})')
                continue

        if closes and stripped.startswith(b64_tag):
            try:
                media_type, filename, data_b64 = stripped[len(b64_tag):-1].split('|', 2)
                media_type = normalize_media_type(media_type)
                filename = filename.strip() or f'image-{attached+1}{mimetypes.guess_extension(media_type) or ".bin"}'
                uploaded = upload_image_data_to_umans(data_b64, media_type, filename, chat_id=chat_id)
                if not isinstance(uploaded, dict) or not uploaded.get('url'):
                    raise RuntimeError(f'invalid upload response: {uploaded}')
                file_parts.append(
                    build_file_part(
                        uploaded['url'],
                        uploaded.get('filename') or uploaded.get('name') or filename,
                        uploaded.get('mediaType') or uploaded.get('mimeType') or uploaded.get('contentType') or media_type,
                    )
                )
                attached += 1
                continue
            except Exception as e:
                kept_lines.append(f'(图片上传失败:{e})')
                continue

        kept_lines.append(original_line)

    # File parts come first, followed by the remaining text (if any).
    message_parts = list(file_parts)
    text = "\n".join(kept_lines).strip()
    if text:
        message_parts.append({"type": "text", "text": text})
    if not message_parts:
        message_parts.append({"type": "text", "text": ""})

    user_message = {
        "role": "user",
        "parts": message_parts,
        "id": msg_id,
    }
    payload = {
        "selectedChatModel": model,
        "id": chat_id,
        "messages": [user_message],
        "knowledgeBaseId": None,
    }
    return payload, chat_id
def build_upstream_headers(chat_id: str):
    """Return the browser-like HTTP headers sent with the upstream chat request for ``chat_id``."""
    referer = f"https://app.umans.ai/chat/{chat_id}"
    header_items = [
        ("Accept", "*/*"),
        ("Accept-Language", "zh-CN,zh;q=0.9"),
        ("Cache-Control", "no-cache"),
        ("Origin", "https://app.umans.ai"),
        ("Referer", referer),
        ("User-Agent", UA),
        ("Content-Type", "application/json"),
        ("Pragma", "no-cache"),
        ("sec-ch-ua", '"Google Chrome";v="147", "Not.A/Brand";v="8", "Chromium";v="147"'),
        ("sec-ch-ua-mobile", "?0"),
        ("sec-ch-ua-platform", '"macOS"'),
    ]
    return dict(header_items)
def iter_upstream_events(resp):
    """Yield the JSON events of an upstream SSE response, one per ``data:`` line.

    Args:
        resp: an object whose ``iter_lines()`` yields the response lines as
            bytes or str.

    Yields:
        Each decoded JSON object (dict). When the ``[DONE]`` marker is seen,
        ``{"__done__": True}`` is yielded and iteration stops. Lines that are
        not ``data:`` lines are ignored, unparsable payloads are logged and
        skipped, and payloads that are valid JSON but not objects are skipped
        because every consumer accesses events with ``.get``.
    """
    for raw in resp.iter_lines():
        if raw is None:
            continue
        if isinstance(raw, bytes):
            # errors="replace" substitutes undecodable bytes instead of raising.
            line = raw.decode("utf-8", errors="replace").strip()
        else:
            line = raw.strip()
        if not line.startswith("data:"):
            continue
        data = line[len("data:"):].strip()
        if data == "[DONE]":
            yield {"__done__": True}
            return
        try:
            event = json.loads(data)
        except json.JSONDecodeError:
            log.warning("跳过无法解析的 SSE 行: %s", data[:200])
            continue
        if isinstance(event, dict):
            yield event
| # ---------- Anthropic SSE 输出 ---------- | |
def sse(event: str, data: dict) -> str:
    """Format one server-sent event: an ``event:`` line, a ``data:`` line with JSON, and a blank line."""
    payload = json.dumps(data, ensure_ascii=False)
    return "event: {}\ndata: {}\n\n".format(event, payload)
def collect_full_text(upstream_resp):
    """Consume the whole upstream stream and aggregate its content.

    All ``text-delta`` events are concatenated, and the upstream's native
    tool-call events (``tool-input-start`` / ``tool-input-delta`` /
    ``tool-input-available``) are merged per tool-call id.

    Returns:
        ``(full_text, usage_in, usage_out, native_tool_calls)`` where
        ``native_tool_calls`` is a list of
        ``{"id": ..., "name": ..., "input": {...}}`` in first-seen order.
        Token usage is taken from the ``finish`` event and is 0 when absent.
    """
    chunks = []
    usage_in = 0
    usage_out = 0
    tool_acc = {}  # id -> {"name": str, "input_text": str, "input": dict | None}
    tool_order = []

    def _slot_for(ev):
        """Return the accumulator for the event's tool call, creating it on first sight."""
        tid = ev.get("toolCallId") or ev.get("toolCallID") or ev.get("id")
        if not tid:
            return None
        if tid not in tool_acc:
            # Register on any tool event, not only on tool-input-start, so a
            # call whose start event is missing is still collected.
            # NOTE(review): assumes the upstream may emit tool-input-available
            # without a preceding tool-input-start — confirm against the
            # upstream stream protocol.
            tool_acc[tid] = {"name": "", "input_text": "", "input": None}
            tool_order.append(tid)
        slot = tool_acc[tid]
        if not slot["name"]:
            slot["name"] = ev.get("toolName") or ev.get("name") or ""
        return slot

    for ev in iter_upstream_events(upstream_resp):
        if ev.get("__done__"):
            break
        kind = ev.get("type")
        if kind == "text-delta":
            # "or ''" guards against an explicit null delta breaking the join below.
            chunks.append(ev.get("delta") or "")
        elif kind == "tool-input-start":
            _slot_for(ev)
        elif kind == "tool-input-delta":
            slot = _slot_for(ev)
            if slot is not None:
                slot["input_text"] += ev.get("inputTextDelta") or ""
        elif kind == "tool-input-available":
            slot = _slot_for(ev)
            if slot is not None:
                slot["input"] = ev.get("input") or {}
        elif kind == "finish":
            meta = ev.get("messageMetadata", {}) or {}
            usage = meta.get("usage", {}) or {}
            usage_in = int(usage.get("inputTokens", 0) or 0)
            usage_out = int(usage.get("outputTokens", 0) or 0)

    native_tools = []
    for tid in tool_order:
        slot = tool_acc[tid]
        inp = slot["input"]
        if inp is None and slot["input_text"]:
            # Only streamed text was received: parse it, keeping the raw text
            # under "_raw" when it is not valid JSON.
            try:
                inp = json.loads(slot["input_text"])
            except json.JSONDecodeError:
                inp = {"_raw": slot["input_text"]}
        if inp is None:
            inp = {}
        native_tools.append({"id": tid, "name": slot["name"], "input": inp})
    return "".join(chunks), usage_in, usage_out, native_tools
def build_tool_use_blocks(full_text, native_tools=None):
    """Turn the collected upstream output into Anthropic content blocks.

    Native tool calls reported by the upstream take precedence; otherwise the
    text is searched for an emulated tool call.

    Returns:
        ``(stop_reason, content_blocks)`` — ``"tool_use"`` when at least one
        tool call is present, else ``"end_turn"`` with a single text block.
    """
    if native_tools:
        blocks = [{"type": "text", "text": full_text}] if full_text.strip() else []
        blocks.extend(
            {
                "type": "tool_use",
                "id": tool["id"] or ("toolu_" + uuid.uuid4().hex[:24]),
                "name": tool["name"],
                "input": tool["input"] or {},
            }
            for tool in native_tools
        )
        return "tool_use", blocks

    parsed = parse_tool_call(full_text)
    if not parsed:
        return "end_turn", [{"type": "text", "text": full_text}]

    name, tool_input, rest = parsed
    blocks = []
    if rest:
        blocks.append({"type": "text", "text": rest})
    blocks.append(
        {
            "type": "tool_use",
            "id": "toolu_" + uuid.uuid4().hex[:24],
            "name": name,
            "input": tool_input,
        }
    )
    return "tool_use", blocks
def build_openai_tool_calls(full_text, native_tools=None):
    """Convert the collected upstream output into OpenAI chat-completion fields.

    Returns:
        ``(text, tool_calls, finish_reason)`` where ``tool_calls`` is a list
        of OpenAI ``function`` tool-call dicts (arguments JSON-encoded) and
        ``finish_reason`` is ``"tool_calls"`` when that list is non-empty,
        else ``"stop"``.
    """
    _, blocks = build_tool_use_blocks(full_text, native_tools)
    parsed = None if native_tools else parse_tool_call(full_text)
    parsed_name, parsed_input = (parsed[0], parsed[1]) if parsed else (None, None)

    text_parts = []
    tool_calls = []
    for blk in blocks:
        kind = blk.get("type")
        if kind == "text":
            txt = str(blk.get("text", ""))
            if txt.strip():
                text_parts.append(txt)
            continue
        if kind != "tool_use":
            continue
        arguments_obj = blk.get("input") or {}
        # If the block carries no arguments, fall back to those parsed from the text for the same tool.
        use_parsed = (
            not arguments_obj
            and parsed_name
            and blk.get("name") == parsed_name
            and isinstance(parsed_input, dict)
        )
        if use_parsed:
            arguments_obj = parsed_input
        tool_calls.append(
            {
                "id": blk.get("id") or ("call_" + uuid.uuid4().hex[:24]),
                "type": "function",
                "function": {
                    "name": blk.get("name") or "unknown_tool",
                    "arguments": json.dumps(arguments_obj, ensure_ascii=False),
                },
            }
        )

    text = "\n\n".join(text_parts).strip()
    if tool_calls:
        return text, tool_calls, "tool_calls"
    return text, tool_calls, "stop"
def convert_openai_tools_to_anthropic(tools):
    """Convert OpenAI ``function`` tool definitions to the Anthropic tool shape.

    Dict entries that are not OpenAI function tools are passed through
    unchanged; entries that are not dicts are dropped.
    """

    def _convert(item):
        fn = item.get("function")
        if item.get("type") == "function" and isinstance(fn, dict):
            return {
                "name": fn.get("name"),
                "description": fn.get("description", ""),
                "input_schema": fn.get("parameters") or {"type": "object", "properties": {}},
            }
        return item

    return [_convert(item) for item in tools or [] if isinstance(item, dict)]
def inject_openai_tools_prompt(messages, tools):
    """Add the tool-calling instructions to an OpenAI-style message list.

    The instructions are appended to the first message when it is a system
    message, otherwise a new system message is inserted at the front. The
    input list and its messages are not modified.

    Args:
        messages: OpenAI chat messages.
        tools: OpenAI tool definitions.

    Returns:
        ``messages`` itself when there is no usable tool, else a new list
        containing the instructions.
    """
    tool_system = build_tools_prompt(convert_openai_tools_to_anthropic(tools))
    if not tool_system:
        return messages

    injected = list(messages or [])
    first = injected[0] if injected else None
    if isinstance(first, dict) and first.get("role") == "system":
        existing = first.get("content")
        if isinstance(existing, list):
            # Content given as a list of parts: add the instructions as one
            # more text part instead of formatting the list's repr into a string.
            merged = list(existing) + [{"type": "text", "text": tool_system}]
        else:
            # "or ''" keeps a missing/null content from becoming the text "None".
            merged = f"{existing or ''}\n\n{tool_system}".strip()
        injected[0] = {**first, "content": merged}
    else:
        injected.insert(0, {"role": "system", "content": tool_system})
    return injected
def _text_block_start_event(index):
    """SSE event opening an empty text content block at ``index``."""
    return sse(
        "content_block_start",
        {
            "type": "content_block_start",
            "index": index,
            "content_block": {"type": "text", "text": ""},
        },
    )


def _text_delta_event(index, text):
    """SSE event appending ``text`` to the text block at ``index``."""
    return sse(
        "content_block_delta",
        {
            "type": "content_block_delta",
            "index": index,
            "delta": {"type": "text_delta", "text": text},
        },
    )


def _block_stop_event(index):
    """SSE event closing the content block at ``index``."""
    return sse("content_block_stop", {"type": "content_block_stop", "index": index})


def _closing_events(stop_reason, output_tokens):
    """Yield the final ``message_delta`` and ``message_stop`` events."""
    yield sse(
        "message_delta",
        {
            "type": "message_delta",
            "delta": {"stop_reason": stop_reason, "stop_sequence": None},
            "usage": {"output_tokens": output_tokens},
        },
    )
    yield sse("message_stop", {"type": "message_stop"})


def _buffered_tool_events(upstream_resp):
    """Collect the whole upstream answer, then emit it as text / tool_use blocks."""
    full, _usage_in, usage_out, native_tools = collect_full_text(upstream_resp)
    stop_reason, blocks = build_tool_use_blocks(full, native_tools)
    for idx, blk in enumerate(blocks):
        if blk["type"] == "text":
            yield _text_block_start_event(idx)
            if blk["text"]:
                yield _text_delta_event(idx, blk["text"])
            yield _block_stop_event(idx)
        elif blk["type"] == "tool_use":
            yield sse(
                "content_block_start",
                {
                    "type": "content_block_start",
                    "index": idx,
                    "content_block": {
                        "type": "tool_use",
                        "id": blk["id"],
                        "name": blk["name"],
                    },
                },
            )
            # Send the arguments JSON in 120-character slices.
            tool_json = json.dumps(blk["input"], ensure_ascii=False)
            for offset in range(0, len(tool_json), 120):
                yield sse(
                    "content_block_delta",
                    {
                        "type": "content_block_delta",
                        "index": idx,
                        "delta": {
                            "type": "input_json_delta",
                            "partial_json": tool_json[offset:offset + 120],
                        },
                    },
                )
            yield _block_stop_event(idx)
    yield from _closing_events(stop_reason, usage_out or max(1, len(full) // 4))


def _live_text_events(upstream_resp):
    """Relay upstream text deltas to the client as they arrive."""
    index = 0  # index of the open text block, or of the next one to open
    block_open = False
    output_text_len = 0
    usage_out = 0
    try:
        for ev in iter_upstream_events(upstream_resp):
            if ev.get("__done__"):
                break
            kind = ev.get("type")
            if kind == "text-start":
                if not block_open:
                    yield _text_block_start_event(index)
                    block_open = True
            elif kind == "text-delta":
                delta = ev.get("delta", "")
                if not delta:
                    continue
                if not block_open:
                    yield _text_block_start_event(index)
                    block_open = True
                output_text_len += len(delta)
                yield _text_delta_event(index, delta)
            elif kind == "text-end":
                if block_open:
                    yield _block_stop_event(index)
                    block_open = False
                    # A later text segment gets its own block instead of
                    # reopening an index that has already been stopped.
                    index += 1
            elif kind == "finish":
                meta = ev.get("messageMetadata", {}) or {}
                usage = meta.get("usage", {}) or {}
                usage_out = int(usage.get("outputTokens", 0) or 0)
            elif kind == "error":
                err = ev.get("errorText") or ev.get("error") or "upstream error"
                if not block_open:
                    yield _text_block_start_event(index)
                    block_open = True
                yield _text_delta_event(index, f"\n[upstream error] {err}")
    except requests.exceptions.RequestException as e:
        # Upstream connection broke mid-stream: log it and still finish the
        # message properly below. GeneratorExit (client went away) is
        # deliberately NOT caught here: a generator must not yield again
        # after receiving it.
        log.warning("流式中断: %s", e)
    if block_open:
        yield _block_stop_event(index)
    yield from _closing_events("end_turn", usage_out or max(1, output_text_len // 4))


def anthropic_stream(upstream_resp, model_for_output: str, has_tools: bool):
    """Translate the upstream SSE stream into Anthropic streaming events.

    When the request declared tools, the full upstream answer is collected
    first and only then emitted, so that a tool call's JSON is never cut off
    half-way. Without tools, text is relayed as it arrives.

    Args:
        upstream_resp: the streaming upstream response; it is closed when the
            stream ends, fails, or the client disconnects.
        model_for_output: model name reported in ``message_start``.
        has_tools: whether the client request declared tools.

    Yields:
        Formatted SSE strings: ``message_start``, content block events,
        ``message_delta`` and ``message_stop``.
    """
    msg_id = "msg_" + uuid.uuid4().hex[:24]
    try:
        yield sse(
            "message_start",
            {
                "type": "message_start",
                "message": {
                    "id": msg_id,
                    "type": "message",
                    "role": "assistant",
                    "model": model_for_output,
                    "content": [],
                    "stop_reason": None,
                    "stop_sequence": None,
                    "usage": {"input_tokens": 0, "output_tokens": 0},
                },
            },
        )
        if has_tools:
            yield from _buffered_tool_events(upstream_resp)
        else:
            yield from _live_text_events(upstream_resp)
    finally:
        # Release the upstream connection in every case, including when the
        # generator is closed early because the client disconnected.
        close = getattr(upstream_resp, "close", None)
        if callable(close):
            close()
| # ---------- 路由 ---------- | |
def health():
    """Return a JSON status document: service name, ok flag, whether cookies are set, default model."""
    status = {
        "service": "umans2api",
        "ok": True,
        "cookies_set": len(COOKIES) > 0,
        "default_model": DEFAULT_MODEL,
    }
    return jsonify(status)
def list_models():
    """Return the model list: the upstream models followed by the mapped Claude model names.

    Responds with a 401 authentication error when the API key check fails.
    """
    if check_auth():
        created = int(time.time())
        entries = []
        for model_id in [*AVAILABLE_MODELS, *CLAUDE_MODEL_MAP]:
            entries.append(
                {"id": model_id, "object": "model", "created": created, "owned_by": "umans"}
            )
        return jsonify({"data": entries, "object": "list"})

    error_body = {
        "type": "error",
        "error": {"type": "authentication_error", "message": "invalid api key"},
    }
    return (jsonify(error_body), 401)
def messages():
    """Handle an Anthropic-style messages request by proxying it to the upstream.

    The conversation (plus tool instructions when tools are declared) is
    flattened into one prompt, sent upstream, and the answer is returned
    either as an Anthropic SSE stream or as a single Anthropic message.

    Responses:
        401 when the API key check fails, 400 when the body is not a JSON
        object, 502 when the upstream cannot be reached or answers with a
        non-200 status, otherwise the translated answer.
    """
    if not check_auth():
        return (
            jsonify(
                {
                    "type": "error",
                    "error": {"type": "authentication_error", "message": "invalid api key"},
                }
            ),
            401,
        )

    body = request.get_json(force=True, silent=True) or {}
    if not isinstance(body, dict):
        # Valid JSON that is not an object (e.g. a list) would otherwise fail
        # on the first ``body.get`` with an unhandled AttributeError.
        return (
            jsonify(
                {
                    "type": "error",
                    "error": {
                        "type": "invalid_request_error",
                        "message": "request body must be a JSON object",
                    },
                }
            ),
            400,
        )

    req_model = body.get("model", "")
    upstream_model = resolve_model(req_model)
    stream = bool(body.get("stream", False))
    system = body.get("system")
    msgs = body.get("messages", [])
    tools = body.get("tools") or []
    has_tools = bool(tools)

    tool_system = build_tools_prompt(tools)
    prompt_text = anthropic_messages_to_text(system, msgs, extra_system=tool_system)
    payload, chat_id = build_upstream_payload(upstream_model, prompt_text)
    headers = build_upstream_headers(chat_id)
    log.info(
        "请求: client=%s -> upstream=%s, stream=%s, tools=%d, prompt_len=%d",
        req_model, upstream_model, stream, len(tools), len(prompt_text),
    )

    try:
        upstream = requests.post(
            UPSTREAM_URL,
            headers=headers,
            cookies=COOKIES,
            json=payload,
            stream=True,
            timeout=300,
        )
    except requests.exceptions.RequestException as e:
        log.error("上游连接失败: %s", e)
        return (
            jsonify({"type": "error", "error": {"type": "api_error", "message": str(e)}}),
            502,
        )

    if upstream.status_code != 200:
        text = upstream.text[:500]
        upstream.close()  # the body has been read; release the connection
        log.error("上游 HTTP %s: %s", upstream.status_code, text)
        return (
            jsonify(
                {
                    "type": "error",
                    "error": {
                        "type": "api_error",
                        "message": f"upstream {upstream.status_code}: {text}",
                    },
                }
            ),
            502,
        )

    # Streaming
    if stream:
        response = Response(
            stream_with_context(
                anthropic_stream(upstream, req_model or upstream_model, has_tools)
            ),
            mimetype="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",
                "Connection": "keep-alive",
            },
        )
        # Release the upstream connection once the response has been closed,
        # whether the stream completed or the client disconnected.
        response.call_on_close(upstream.close)
        return response

    # Non-streaming
    try:
        full, usage_in, usage_out, native_tools = collect_full_text(upstream)
    finally:
        upstream.close()
    stop_reason = "end_turn"
    content_blocks = [{"type": "text", "text": full}]
    if has_tools or native_tools:
        stop_reason, content_blocks = build_tool_use_blocks(full, native_tools)
    msg_id = "msg_" + uuid.uuid4().hex[:24]
    return jsonify(
        {
            "id": msg_id,
            "type": "message",
            "role": "assistant",
            "model": req_model or upstream_model,
            "content": content_blocks,
            "stop_reason": stop_reason,
            "stop_sequence": None,
            "usage": {
                "input_tokens": usage_in,
                # Estimate roughly 4 characters per token when the upstream reports no usage.
                "output_tokens": usage_out or max(1, len(full) // 4),
            },
        }
    )
| # ---------- OpenAI 兼容 ---------- | |
def chat_completions():
    """Handle an OpenAI-style chat-completions request.

    The incoming ``messages`` are flattened into one role-tagged prompt
    string, posted to ``UPSTREAM_URL`` and the reply is translated back into
    either a ``chat.completion`` JSON object or a stream of
    ``chat.completion.chunk`` SSE frames.

    When ``tools`` are supplied, the upstream reply is read completely before
    anything is emitted, because ``build_openai_tool_calls`` is given the full
    text; the streamed response then consists of one content/tool-call chunk
    followed by the terminating chunk.

    Returns:
        A Flask response, or a ``(response, status)`` tuple for errors
        (400 malformed body, 401 bad API key, 502 upstream failure).
    """
    if not check_auth():
        return (
            jsonify(
                {
                    "error": {
                        "message": "invalid api key",
                        "type": "authentication_error",
                    }
                }
            ),
            401,
        )
    body = request.get_json(force=True, silent=True) or {}
    if not isinstance(body, dict):
        # A JSON array/scalar body would otherwise crash on ``body.get``.
        return (
            jsonify(
                {
                    "error": {
                        "message": "request body must be a JSON object",
                        "type": "invalid_request_error",
                    }
                }
            ),
            400,
        )
    req_model = body.get("model", "")
    upstream_model = resolve_model(req_model)
    stream = bool(body.get("stream", False))
    tools = body.get("tools") or []
    msgs = inject_openai_tools_prompt(body.get("messages", []), tools)

    # Flatten the OpenAI messages into a single text prompt; image parts are
    # encoded as bracketed placeholders inside that text.
    role_tags = {"system": "System", "user": "User", "assistant": "Assistant"}
    parts = []
    for m in msgs:
        role = m.get("role", "user")
        content = _openai_content_to_text(m.get("content", ""))
        parts.append(f"[{role_tags.get(role, role)}]\n{content}")
    prompt_text = "\n\n".join(parts).strip() or "hi"

    payload, chat_id = build_upstream_payload(upstream_model, prompt_text)
    headers = build_upstream_headers(chat_id)
    try:
        upstream = requests.post(
            UPSTREAM_URL,
            headers=headers,
            cookies=COOKIES,
            json=payload,
            stream=True,
            timeout=300,
        )
    except requests.exceptions.RequestException as e:
        return jsonify({"error": {"message": str(e), "type": "upstream_error"}}), 502
    if upstream.status_code != 200:
        try:
            preview = upstream.text[:500]
        finally:
            # stream=True keeps the connection checked out until closed.
            upstream.close()
        return (
            jsonify({"error": {"message": preview, "type": "upstream_error"}}),
            502,
        )

    cmpl_id = "chatcmpl-" + uuid.uuid4().hex[:24]
    created = int(time.time())
    model_name = req_model or upstream_model

    if stream:
        def gen():
            # try/finally so the upstream connection is released when the
            # stream finishes, fails, or the client disconnects mid-stream.
            try:
                if tools:
                    text, _usage_in, _usage_out, native_tools = collect_full_text(upstream)
                    content_text, tool_calls, finish_reason = build_openai_tool_calls(text, native_tools)
                    first_delta = {"role": "assistant"}
                    if content_text:
                        first_delta["content"] = content_text
                    if tool_calls:
                        first_delta["tool_calls"] = tool_calls
                    yield _openai_chunk(cmpl_id, created, model_name, first_delta, None)
                    yield _openai_chunk(cmpl_id, created, model_name, {}, finish_reason)
                    yield "data: [DONE]\n\n"
                    return
                first = True
                for ev in iter_upstream_events(upstream):
                    if ev.get("__done__"):
                        break
                    if ev.get("type") != "text-delta":
                        continue
                    delta = ev.get("delta", "")
                    if not delta:
                        continue
                    # Only the first content chunk carries the role.
                    piece = {"role": "assistant", "content": delta} if first else {"content": delta}
                    first = False
                    yield _openai_chunk(cmpl_id, created, model_name, piece, None)
                yield _openai_chunk(cmpl_id, created, model_name, {}, "stop")
                yield "data: [DONE]\n\n"
            finally:
                upstream.close()

        return Response(
            stream_with_context(gen()),
            mimetype="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )

    try:
        text, usage_in, usage_out, native_tools = collect_full_text(upstream)
    finally:
        upstream.close()
    if tools:
        content_text, tool_calls, finish_reason = build_openai_tool_calls(text, native_tools)
    else:
        content_text, tool_calls, finish_reason = text, [], "stop"
    message = {"role": "assistant", "content": content_text if content_text else None}
    if tool_calls:
        message["tool_calls"] = tool_calls
    # Fall back to a rough length-based estimate when no usage was reported.
    prompt_tokens = usage_in or 0
    completion_tokens = usage_out or max(1, len(text) // 4)
    return jsonify(
        {
            "id": cmpl_id,
            "object": "chat.completion",
            "created": created,
            "model": model_name,
            "choices": [
                {
                    "index": 0,
                    "message": message,
                    "finish_reason": finish_reason,
                }
            ],
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens,
            },
        }
    )


def _openai_chunk(cmpl_id, created, model, delta, finish_reason):
    """Serialize one ``chat.completion.chunk`` as an SSE ``data:`` frame."""
    chunk = {
        "id": cmpl_id,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model,
        "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
    }
    return f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"


def _openai_image_placeholder(part):
    """Turn an ``image_url`` content part into a bracketed placeholder, or None if it has no URL."""
    image_url = part.get("image_url") or {}
    if isinstance(image_url, dict):
        url = str(image_url.get("url", "")).strip()
    else:
        url = str(image_url).strip()
    if not url:
        return None
    media = "image/png"
    if url.startswith("data:") and ";base64," in url:
        # data:<media>;base64,<payload>
        header, data_b64 = url.split(",", 1)
        media = normalize_media_type(header[5:].split(";", 1)[0], "image/png")
        filename = f"upload{mimetypes.guess_extension(media) or '.bin'}"
        return f"[image_base64|{media}|{filename}|{data_b64}]"
    filename = guess_filename_from_url(url, f"image{mimetypes.guess_extension(media) or '.png'}")
    return f"[image_url|{media}|{filename}|{url}]"


def _openai_content_to_text(content):
    """Flatten an OpenAI message ``content`` (None, string, or list of parts) to prompt text."""
    if content is None:
        # e.g. assistant turns with ``content: null``; avoid emitting the
        # literal text "None" into the prompt.
        return ""
    if not isinstance(content, list):
        return content
    buf = []
    for part in content:
        if not isinstance(part, dict):
            buf.append(str(part))
            continue
        if part.get("type") == "image_url":
            placeholder = _openai_image_placeholder(part)
            if placeholder is not None:
                buf.append(placeholder)
        else:
            # "text" parts and any unrecognized part type: use the text field.
            buf.append(str(part.get("text", "")))
    return "\n".join(buf)
| # ---------- 管理页辅助 ---------- | |
def mask_cookie_value(v):
    """Return a display-safe version of a cookie value.

    Non-string or empty input gives ``""``; strings of up to 8 characters are
    replaced entirely by asterisks; longer strings keep only their first and
    last four characters around ``...``.
    """
    if isinstance(v, str) and v:
        if len(v) > 8:
            return f"{v[:4]}...{v[-4:]}"
        return "*" * len(v)
    return ""
def masked_cookies_dict(cookies):
    """Return a new dict with every cookie value passed through mask_cookie_value.

    A falsy ``cookies`` argument (``None`` or an empty mapping) yields ``{}``.
    """
    masked = {}
    for name, value in (cookies or {}).items():
        masked[name] = mask_cookie_value(value)
    return masked
| # ---------- 管理页 ---------- | |
def admin_get_cookies():
    """Return a JSON summary of the stored cookies with their values masked.

    Responds 401 when ``check_admin()`` fails. Otherwise the payload holds the
    masked cookies, how many there are, and whether a session token is stored.
    """
    if check_admin():
        has_session = bool(COOKIES.get("__Secure-authjs.session-token"))
        summary = {
            "cookies": masked_cookies_dict(COOKIES),
            "count": len(COOKIES),
            "saved": has_session,
        }
        return jsonify(summary)
    return jsonify({"error": "unauthorized"}), 401
def admin_set_cookies():
    """Replace the stored cookies with the ``cookies`` object from the JSON body.

    Returns:
        ``{"ok": True, "count": N}`` on success; 401 when ``check_admin()``
        fails; 400 when ``cookies`` is missing, empty, or not a JSON object.
    """
    global COOKIES
    if not check_admin():
        return jsonify({"error": "unauthorized"}), 401
    body = request.get_json(force=True, silent=True) or {}
    # A non-object body has no "cookies" key; treat it the same as missing.
    new_cookies = body.get("cookies", {}) if isinstance(body, dict) else None
    if not new_cookies:
        return jsonify({"error": "cookies is empty"}), 400
    if not isinstance(new_cookies, dict):
        # Other handlers call COOKIES.get(...); storing a list or string here
        # would persist a value that breaks them.
        return jsonify({"error": "cookies must be an object"}), 400
    COOKIES = new_cookies
    save_cookies(COOKIES)
    return jsonify({"ok": True, "count": len(COOKIES)})
def admin_health():
    """Return a small JSON status report; 401 when ``check_admin()`` fails."""
    if check_admin():
        report = {
            "status": "ok",
            "cookies_set": len(COOKIES) > 0,
            "upstream": UPSTREAM_URL,
            "default_model": DEFAULT_MODEL,
        }
        return jsonify(report)
    return jsonify({"error": "unauthorized"}), 401
def admin_clear_cookies():
    """Reset the stored cookies to an empty dict and persist that state.

    Responds 401 when ``check_admin()`` fails.
    """
    global COOKIES
    if check_admin():
        COOKIES = {}
        save_cookies(COOKIES)
        return jsonify({"ok": True, "cleared": True})
    return jsonify({"error": "unauthorized"}), 401
def admin_test_chat():
    """Send a test prompt upstream with the stored cookies and report the result.

    The JSON body may carry a ``prompt`` string; a missing, blank, or
    non-string prompt falls back to a built-in default.

    Returns:
        JSON with ``ok``, ``status_code`` and the first 2000 characters of the
        upstream response; 401 when ``check_admin()`` fails; 400 when no
        session token is stored; 500 with the error text if the request raises.
    """
    if not check_admin():
        return jsonify({"error": "unauthorized"}), 401
    body = request.get_json(force=True, silent=True) or {}
    raw_prompt = body.get("prompt") if isinstance(body, dict) else None
    # Only strings can be stripped; anything else uses the default prompt.
    prompt = raw_prompt.strip() if isinstance(raw_prompt, str) else ""
    if not prompt:
        prompt = "你好,请回复 test ok"
    if not COOKIES.get("__Secure-authjs.session-token"):
        return jsonify({"error": "no session-token saved"}), 400
    try:
        # NOTE(review): chat_completions sends the payload/headers produced by
        # build_upstream_payload()/build_upstream_headers(); this handler posts
        # an OpenAI-style body instead. Confirm the upstream accepts this shape.
        payload = {
            "model": DEFAULT_MODEL,
            "messages": [{"role": "user", "content": prompt}],
            "stream": False,
        }
        headers = {"User-Agent": UA, "Content-Type": "application/json", "Origin": "https://app.umans.ai", "Referer": "https://app.umans.ai/"}
        r = requests.post(UPSTREAM_URL, headers=headers, cookies=COOKIES, json=payload, timeout=60)
        text = r.text[:2000]
        return jsonify({"ok": r.ok, "status_code": r.status_code, "response_preview": text})
    except Exception as e:
        # Deliberately broad: any failure is reported to the admin as JSON.
        return jsonify({"ok": False, "error": str(e)}), 500
def admin_form_save():
    """Store the csrf/session tokens submitted through the admin HTML form.

    The session token is required; without it the page is re-rendered with an
    error status and nothing is saved. Responds 401 when ``check_admin()``
    fails.
    """
    global COOKIES
    if not check_admin():
        return "Unauthorized", 401
    form = request.form
    csrf_token = (form.get("csrf") or "").strip()
    session_token = (form.get("session") or "").strip()
    pw = request.args.get("password", "")
    if session_token:
        COOKIES = {
            "__Host-authjs.csrf-token": csrf_token,
            "__Secure-authjs.callback-url": "https%3A%2F%2Fapp.umans.ai%2F",
            "__Secure-authjs.session-token": session_token,
        }
        save_cookies(COOKIES)
        return admin_render_page(pw, status_msg="✅ 保存成功", status_ok=True)
    return admin_render_page(pw, status_msg="session-token 必填", status_ok=False)
def admin_form_clear():
    """Form-driven variant of clearing the stored cookies; re-renders the admin page.

    Responds 401 when ``check_admin()`` fails.
    """
    global COOKIES
    if check_admin():
        COOKIES = {}
        save_cookies(COOKIES)
        pw = request.args.get("password", "")
        return admin_render_page(pw, status_msg="✅ 已清空", status_ok=True)
    return "Unauthorized", 401
def admin_form_test():
    """Form-driven test chat: post a prompt upstream and show the result on the admin page.

    Responds 401 when ``check_admin()`` fails. When no session token is stored
    the page is re-rendered with an error instead of contacting the upstream.
    """
    if not check_admin():
        return "Unauthorized", 401
    pw = request.args.get("password", "")
    prompt = (request.form.get("prompt") or "你好,请回复 test ok").strip()
    has_session = bool(COOKIES.get("__Secure-authjs.session-token"))
    if not has_session:
        return admin_render_page(pw, status_msg="❌ 未保存 session-token", status_ok=False)
    try:
        payload = {
            "model": DEFAULT_MODEL,
            "messages": [{"role": "user", "content": prompt}],
            "stream": False,
        }
        headers = {
            "User-Agent": UA,
            "Content-Type": "application/json",
            "Origin": "https://app.umans.ai",
            "Referer": "https://app.umans.ai/",
        }
        r = requests.post(UPSTREAM_URL, headers=headers, cookies=COOKIES, json=payload, timeout=60)
        # Only the first 2000 characters of the reply are shown.
        preview = r.text[:2000]
        message = f"测试返回(HTTP {r.status_code}):\n{preview}"
        return admin_render_page(pw, status_msg=message, status_ok=r.ok)
    except Exception as e:
        return admin_render_page(pw, status_msg=f"❌ {e}", status_ok=False)
def admin_form_help():
    """Re-render the admin page with instructions for obtaining the cookies.

    Responds 401 when ``check_admin()`` fails.
    """
    if check_admin():
        msg = "获取方式:登录 app.umans.ai → F12 → Application → Cookies → 复制 __Host-authjs.csrf-token 和 __Secure-authjs.session-token。"
        pw = request.args.get("password", "")
        return admin_render_page(pw, status_msg=msg, status_ok=True)
    return "Unauthorized", 401
def admin_render_page(pw: str, status_msg: str = "", status_ok: bool = True):
    """Render the cookie-management admin page as an HTML string.

    Args:
        pw: Admin password to carry through the page's forms (as a query
            parameter on POST forms and a hidden field on GET forms).
        status_msg: Text for the status box; a default is shown when empty.
        status_ok: Selects the "ok" or "err" styling of the status box.

    Returns:
        The complete HTML document.

    Every dynamic value is HTML-escaped before insertion, because the status
    message can contain upstream response text and the password/cookie values
    are user-supplied. Placeholders are filled in a single pass so that an
    inserted value which happens to contain a placeholder token is not
    substituted again.
    """
    # Function-scope imports so this block does not depend on file-level ones.
    from html import escape
    from urllib.parse import quote

    saved = bool(COOKIES.get("__Secure-authjs.session-token"))
    pw_text = str(pw or "")
    status_cls = "ok" if status_ok else "err"
    # NOTE(review): the page text says only masked cookies are shown, yet the
    # two inputs below are pre-filled with the full stored values. Kept as-is
    # so that re-saving the form does not overwrite tokens; confirm intent.
    template = """<!doctype html>
<html lang="zh-CN"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>Umans2API Admin</title><style>
*{box-sizing:border-box;margin:0;padding:0}body{font-family:-apple-system,sans-serif;background:#0b1020;color:#e8eefc;padding:20px}
.card{background:#141b34;border:1px solid #2a355e;border-radius:14px;padding:18px;margin-bottom:16px;max-width:900px}
h1{font-size:18px;margin-bottom:12px}h2{font-size:15px;margin-bottom:8px;color:#94c7ff}.muted{color:#9fb0dd;font-size:12px;line-height:1.6;margin-bottom:10px}.warn{color:#ffb4b4}
input,textarea{width:100%;border-radius:8px;border:1px solid #3a4b80;background:#0d1430;color:#fff;padding:8px 10px;font-size:13px;margin-bottom:8px}
textarea{min-height:110px;resize:vertical}label{display:block;font-size:12px;color:#94c7ff;margin-bottom:4px}
button{padding:9px 16px;border-radius:8px;border:none;background:#3451ff;color:#fff;font-weight:600;cursor:pointer;font-size:13px;margin-right:6px;margin-bottom:6px}
button.secondary{background:#273053}button.danger{background:#8b1e3f}.badge{display:inline-block;padding:3px 8px;border-radius:999px;background:#273053;color:#cfe0ff;font-size:12px}
.status{padding:8px 12px;border-radius:6px;margin-top:10px;font-size:12px;white-space:pre-wrap;word-break:break-all}.ok{background:#0d3a1a;color:#8dffb2}.err{background:#3a0d0d;color:#ff9d9d}.row{display:flex;gap:8px;flex-wrap:wrap}
.inline{display:inline}
</style></head><body>
<div class="card"><h1>🤖 Umans2API 管理页</h1><p class="muted">Anthropic + OpenAI 兼容 · 适配 Claude Code</p><p class="muted">已保存状态:<span class="badge">__SAVED__</span></p></div>
<div class="card"><h2>Cookie 管理</h2><p class="muted">页面不会显示完整 cookie,只显示掩码。所有按钮都走服务端提交,不依赖前端 JS。</p>
<div class="status __STATUS_CLS__">__STATUS_MSG__</div>
<form method="post" action="/admin/form/save?password=__PW_QUERY__">
<label>__Host-authjs.csrf-token</label><input name="csrf" value="__CSRF__" placeholder="从浏览器 Cookie 复制"/>
<label>__Secure-authjs.session-token <span class="warn">(必填)</span></label><input name="session" value="__SESSION__" placeholder="从浏览器 Cookie 复制"/>
<button type="submit">💾 保存</button></form>
<div class="row">
<form class="inline" method="get" action="/admin/form/help"><input type="hidden" name="password" value="__PW__"><button class="secondary" type="submit">🧩 获取 Cookie 说明</button></form>
<form class="inline" method="get" action="/"><input type="hidden" name="password" value="__PW__"><button class="secondary" type="submit">🔄 查看已保存状态</button></form>
</div>
<textarea readonly>__MASKED__</textarea>
<form method="post" action="/admin/form/test?password=__PW_QUERY__"><label>测试对话</label><input name="prompt" value="你好,请回复 test ok" /><button type="submit">🧪 测试对话</button></form>
<form method="post" action="/admin/form/clear?password=__PW_QUERY__" onsubmit="return confirm('确定清空已保存的 cookie?')"><button class="danger" type="submit">🗑 清空 Cookie</button></form>
<form method="get" action="/"><button class="secondary" type="submit">🚪 退出</button></form>
</div></body></html>"""
    values = {
        "__SAVED__": "已保存" if saved else "未保存",
        "__CSRF__": escape(str(COOKIES.get("__Host-authjs.csrf-token") or ""), quote=True),
        "__SESSION__": escape(str(COOKIES.get("__Secure-authjs.session-token") or ""), quote=True),
        "__MASKED__": escape(json.dumps(masked_cookies_dict(COOKIES), ensure_ascii=False, indent=2)),
        # Inside a URL query string the password must be percent-encoded;
        # inside a hidden input's value attribute it only needs HTML escaping.
        "__PW_QUERY__": escape(quote(pw_text, safe=""), quote=True),
        "__PW__": escape(pw_text, quote=True),
        "__STATUS_MSG__": escape(str(status_msg or "就绪")),
        "__STATUS_CLS__": status_cls,
    }
    placeholder = re.compile("|".join(re.escape(token) for token in values))
    return placeholder.sub(lambda match: values[match.group(0)], template)
def admin_page():
    """Serve the admin UI: a login form, or the management page once authorized.

    Access is granted when no ``ADMIN_PASSWORD`` is configured or when the
    ``password`` query parameter equals it.
    """
    supplied = request.args.get("password", "")
    if ADMIN_PASSWORD and supplied != ADMIN_PASSWORD:
        return """<!doctype html>
<html lang="zh-CN"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>Umans2API</title><style>
*{box-sizing:border-box;margin:0;padding:0}body{font-family:-apple-system,sans-serif;background:#0b1020;color:#e8eefc;padding:20px}.card{background:#141b34;border:1px solid #2a355e;border-radius:14px;padding:18px;margin-bottom:16px;max-width:720px}h1{font-size:18px;margin-bottom:12px}h2{font-size:15px;margin-bottom:8px;color:#94c7ff}.muted{color:#9fb0dd;font-size:12px;line-height:1.6;margin-bottom:10px}.warn{color:#ffb4b4}input{width:100%;border-radius:8px;border:1px solid #3a4b80;background:#0d1430;color:#fff;padding:10px 12px;font-size:14px;margin-bottom:10px}button{padding:10px 16px;border-radius:8px;border:none;background:#3451ff;color:#fff;font-weight:600;cursor:pointer;font-size:14px}
</style></head><body>
<div class="card"><h1>🤖 Umans2API</h1><p class="muted">Anthropic + OpenAI 兼容 · 适配 Claude Code</p><p class="muted warn">先输入管理员密码,再进入 Cookie 管理页面。</p></div>
<div class="card"><h2>管理员登录</h2><form method="get" action="/"><input name="password" type="password" placeholder="输入管理员密码" autofocus /><button type="submit">🔓 进入管理页</button></form></div>
</body></html>"""
    return admin_render_page(supplied)
if __name__ == "__main__":
    # Log the effective configuration, then start the Flask server.
    startup_lines = (
        ("启动 umans2api:http://%s:%d", HOST, PORT),
        ("默认模型: %s", DEFAULT_MODEL),
        ("可用模型: %s", ", ".join(AVAILABLE_MODELS)),
        ("Claude 映射: %s", CLAUDE_MODEL_MAP),
    )
    for fmt, *fmt_args in startup_lines:
        log.info(fmt, *fmt_args)
    app.run(host=HOST, port=PORT, threaded=True)