"""Status probing and warm-up helpers for the remote model services.

The module reports the state of three services -- the chat LLM, the TTS
endpoint and the Modal image-generation app -- as ``ModelStatus`` records
that can be rendered as text (``statuses_markdown``) or JSON
(``statuses_json``).

``httpx`` and the Modal app module are imported lazily inside the functions
that need them, so importing this module never requires either package.
"""

from __future__ import annotations

import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

DEFAULT_VLLM_BASE_URL = "https://veronicaulises0--virtual-characters-vllm-gemma-serve.modal.run"
DEFAULT_TTS_URL = "https://veronicaulises0--virtual-characters-tts-charactertts-tts.modal.run"
PROJECT_ROOT = Path(__file__).resolve().parents[1]
MODAL_SERVICE_FILES = {
    "tts": PROJECT_ROOT / "modal_apps" / "modal_tts.py",
    "image_generation": PROJECT_ROOT / "modal_apps" / "modal_character_spike.py",
}
IMAGE_GENERATION_WAIT_MESSAGE = "Modal 图像生成服务可能已休眠或正在冷启动,请等待容器启动和模型载入后重试。"

# HTTP status codes that every probe in this module reports as "loading"
# rather than "error".
_RETRYABLE_HTTP_STATUSES = frozenset({408, 425, 429, 500, 502, 503, 504})

# Message shared by every status function that short-circuits in mock mode.
_MOCK_LLM_MESSAGE = "当前使用本地 mock 对话。"

# Message used when a health probe finds the service sleeping or loading.
_COLD_START_MESSAGE = "Modal 模型服务已休眠或正在冷启动,如需体验请等待模型载入。"

# Display values reported for the image-generation service.
_IMAGE_GENERATION_SOURCE = "modal_apps/modal_character_spike.py"
_IMAGE_GENERATION_MODEL = "Qwen/Qwen-Image"


@dataclass
class ModelStatus:
    """Snapshot of one service's availability."""

    kind: str  # service identifier: "llm", "tts" or "image_generation"
    state: str  # one of the keys of STATE_LABELS
    label: str  # display label for ``state``
    url: str | None = None
    model: str | None = None
    latency_s: float | None = None
    message: str = ""

    def as_dict(self) -> dict[str, Any]:
        """Return the status fields as a plain dictionary.

        Only the fields declared on ``ModelStatus`` are included, so
        subclasses that add fields do not leak them into the output.
        """
        return {
            "kind": self.kind,
            "state": self.state,
            "label": self.label,
            "url": self.url,
            "model": self.model,
            "latency_s": self.latency_s,
            "message": self.message,
        }


STATE_LABELS = {
    "ready": "可用",
    "loading": "载入中",
    "sleeping": "已休眠",
    "error": "错误",
    "unconfigured": "未配置",
    "local": "本地服务",
    "mock": "Mock",
    "unknown": "待检测",
}


def _use_mock() -> bool:
    """Return True when ``VC_USE_MOCK`` is set to ``"1"``."""
    return os.environ.get("VC_USE_MOCK") == "1"


def _mock_llm_status(message: str = _MOCK_LLM_MESSAGE) -> ModelStatus:
    """Build the LLM status reported while mock mode is active."""
    return ModelStatus("llm", "mock", STATE_LABELS["mock"], message=message)


def configured_llm_url() -> str | None:
    """Return the LLM URL to use, or ``None`` in mock mode.

    ``VC_MODAL_LLM_URL`` takes precedence over ``VC_MODAL_VLLM_URL``; when
    neither is set the built-in default vLLM URL is returned.
    """
    if _use_mock():
        return None
    return (
        os.environ.get("VC_MODAL_LLM_URL")
        or os.environ.get("VC_MODAL_VLLM_URL")
        or DEFAULT_VLLM_BASE_URL
    )


def configured_tts_url() -> str | None:
    """Return ``VC_MODAL_TTS_URL`` if set, otherwise the default TTS URL."""
    return os.environ.get("VC_MODAL_TTS_URL") or DEFAULT_TTS_URL


def initial_model_statuses() -> list[ModelStatus]:
    """Return placeholder statuses (LLM, TTS, image generation) without any network I/O."""
    if _use_mock():
        llm = _mock_llm_status()
    else:
        llm = ModelStatus(
            "llm",
            "unknown",
            STATE_LABELS["unknown"],
            url=configured_llm_url(),
            message="点击刷新检测 Modal LLM。",
        )
    tts_url = configured_tts_url()
    return [
        llm,
        _initial_endpoint_status("tts", tts_url, "VC_MODAL_TTS_URL"),
        _initial_image_generation_status(),
    ]


def llm_loading_status(message: str = "正在启动主模型;首次加载可能需要 1-3 分钟。") -> ModelStatus:
    """Return a "loading" LLM status carrying ``message`` (or the mock status in mock mode)."""
    if _use_mock():
        return _mock_llm_status()
    return ModelStatus(
        "llm",
        "loading",
        STATE_LABELS["loading"],
        url=configured_llm_url(),
        message=message,
    )


def statuses_with_llm_status(llm_status: ModelStatus) -> list[ModelStatus]:
    """Return the initial statuses with the LLM entry replaced by ``llm_status``."""
    statuses = initial_model_statuses()
    return [llm_status, *statuses[1:]]


def check_all_statuses(timeout_s: float = 4.0) -> list[ModelStatus]:
    """Probe all three services and return their statuses (LLM, TTS, image generation).

    In mock mode the LLM is not probed. When ``VC_MODAL_LLM_URL`` is set it is
    probed directly with a GET; otherwise the vLLM model-list endpoints of the
    configured base URL are queried.
    """
    modal_llm_url = os.environ.get("VC_MODAL_LLM_URL")
    if _use_mock():
        llm = _mock_llm_status()
    elif modal_llm_url:
        llm = _check_simple_health("llm", modal_llm_url, timeout_s, health_path=None)
    else:
        llm = _check_vllm(configured_llm_url(), timeout_s)
    return [
        llm,
        _check_tts_endpoint(configured_tts_url(), timeout_s),
        check_image_generation_status(),
    ]


def warm_llm_model(timeout_s: float = 600.0) -> ModelStatus:
    """Send a short request to the LLM to trigger a start and report the outcome.

    ``timeout_s`` is the read timeout of the warm-up request.
    """
    if _use_mock():
        return _mock_llm_status("当前使用本地 mock 对话,不需要启动远端模型。")
    modal_llm_url = os.environ.get("VC_MODAL_LLM_URL")
    if modal_llm_url:
        return _warm_modal_llm(modal_llm_url, timeout_s)
    return _warm_vllm(configured_llm_url(), timeout_s)


def check_image_generation_status() -> ModelStatus:
    """Run the Modal image-generation health function and report the result.

    Returns "unconfigured" when the app file is missing or the import fails,
    "ready"/"loading" depending on the health result's ``ok`` entry, and
    "sleeping" for any other exception.
    """
    path = MODAL_SERVICE_FILES["image_generation"]
    if not path.exists():
        return ModelStatus(
            "image_generation",
            "unconfigured",
            STATE_LABELS["unconfigured"],
            message="未找到 modal_apps/modal_character_spike.py。",
        )
    started = time.perf_counter()
    try:
        from modal_apps.modal_character_spike import app, spike_health

        with app.run():
            result = spike_health.remote()
        elapsed = time.perf_counter() - started
        if result.get("ok"):
            return ModelStatus(
                "image_generation",
                "ready",
                STATE_LABELS["ready"],
                url=_IMAGE_GENERATION_SOURCE,
                model=_IMAGE_GENERATION_MODEL,
                latency_s=elapsed,
                message="Modal 图像生成 health check 通过;生成时仍可能需要等待 GPU 模型载入。",
            )
        return ModelStatus(
            "image_generation",
            "loading",
            STATE_LABELS["loading"],
            url=_IMAGE_GENERATION_SOURCE,
            latency_s=elapsed,
            message=IMAGE_GENERATION_WAIT_MESSAGE,
        )
    except ImportError as exc:
        return ModelStatus(
            "image_generation",
            "unconfigured",
            STATE_LABELS["unconfigured"],
            url=_IMAGE_GENERATION_SOURCE,
            latency_s=time.perf_counter() - started,
            message=f"Modal Python 包或依赖不可用:{exc}",
        )
    except Exception as exc:
        # Deliberately broad: any failure while running the app is surfaced
        # as a "sleeping" status instead of raising.
        return ModelStatus(
            "image_generation",
            "sleeping",
            STATE_LABELS["sleeping"],
            url=_IMAGE_GENERATION_SOURCE,
            latency_s=time.perf_counter() - started,
            message=f"{IMAGE_GENERATION_WAIT_MESSAGE} ({type(exc).__name__}: {exc})",
        )


def statuses_markdown(statuses: list[ModelStatus]) -> str:
    """Render ``statuses`` as a single text block, one row per status."""
    # NOTE(review): the template literals below contain only line breaks
    # around the interpolated fields, and ``css_state`` is never referenced.
    # Presumably surrounding markup was lost from this file -- confirm
    # against the deployed version before relying on the exact output.
    rows = []
    for status in statuses:
        css_state = status.state  # noqa: F841 -- see NOTE(review) above
        latency = f" · {status.latency_s:.2f}s" if status.latency_s is not None else ""
        model = f" · `{status.model}`" if status.model else ""
        url = f"{status.url}" if status.url else f"{_empty_url_label(status.state)}"
        rows.append(
            "\n"
            f"{_kind_label(status.kind)}{status.label}{latency}{model}{url}"
            f"{status.message}\n"
        )
    return "\n" + "".join(rows) + "\n"


def statuses_json(statuses: list[ModelStatus]) -> dict[str, Any]:
    """Return ``{"models": [...]}`` with one ``as_dict()`` entry per status."""
    return {"models": [status.as_dict() for status in statuses]}


def _first_model_id(payload: Any) -> str | None:
    """Extract the first model's ``id`` (or ``root``) from a model-list payload.

    Returns ``None`` for any payload that is not shaped like
    ``{"data": [{"id": ...}, ...]}`` instead of raising.
    """
    models = payload.get("data") if isinstance(payload, dict) else None
    if not isinstance(models, list) or not models:
        return None
    first = models[0]
    if not isinstance(first, dict):
        return None
    return str(first.get("id") or first.get("root") or "") or None


def _backend_or_model(payload: Any) -> str | None:
    """Return the ``backend`` or ``model`` entry of a health payload, if any."""
    if not isinstance(payload, dict):
        return None
    return str(payload.get("backend") or payload.get("model") or "") or None


def _check_vllm(base_url: str | None, timeout_s: float) -> ModelStatus:
    """Probe the vLLM model-list endpoints (``/v1/models`` then ``/models``)."""
    if not base_url:
        return ModelStatus("llm", "unconfigured", STATE_LABELS["unconfigured"], message="未设置 vLLM URL。")
    for path in ("/v1/models", "/models"):
        status = _get_json("llm", base_url.rstrip("/") + path, timeout_s)
        if status.state == "ready":
            return ModelStatus(
                "llm",
                "ready",
                STATE_LABELS["ready"],
                url=base_url,
                model=_first_model_id(status.message_json),
                latency_s=status.latency_s,
                message="vLLM 模型列表可访问。",
            )
        if status.state in {"sleeping", "loading"}:
            return ModelStatus(
                "llm",
                status.state,
                STATE_LABELS[status.state],
                url=base_url,
                latency_s=status.latency_s,
                message=_COLD_START_MESSAGE,
            )
    return ModelStatus("llm", "error", STATE_LABELS["error"], url=base_url, message="vLLM 模型状态检测失败。")


def _warm_vllm(base_url: str | None, timeout_s: float) -> ModelStatus:
    """POST a tiny chat completion to ``base_url`` to warm the vLLM service."""
    if not base_url:
        return ModelStatus("llm", "unconfigured", STATE_LABELS["unconfigured"], message="未设置 vLLM URL。")

    import httpx

    served_model = os.environ.get("VC_VLLM_SERVED_MODEL", "llm")
    url = base_url.rstrip("/") + "/v1/chat/completions"
    payload = {
        "model": served_model,
        "messages": [
            {"role": "system", "content": "你是模型启动检查。"},
            {"role": "user", "content": "只回复:已就绪"},
        ],
        "max_tokens": 4,
        "temperature": 0,
        "stream": False,
        "chat_template_kwargs": {"enable_thinking": False},
    }
    started = time.perf_counter()
    timeout = httpx.Timeout(connect=30, read=timeout_s, write=30, pool=30)
    try:
        response = httpx.post(url, json=payload, timeout=timeout, trust_env=False)
        elapsed = time.perf_counter() - started
        if response.status_code == 200:
            return ModelStatus(
                "llm",
                "ready",
                STATE_LABELS["ready"],
                url=base_url,
                model=os.environ.get("VC_VLLM_SERVED_MODEL", "llm"),
                latency_s=elapsed,
                message="主模型已完成短请求,接下来几分钟内对话会更快。",
            )
        if response.status_code in _RETRYABLE_HTTP_STATUSES:
            return ModelStatus(
                "llm",
                "loading",
                STATE_LABELS["loading"],
                url=base_url,
                latency_s=elapsed,
                message=f"启动请求已到达,但服务仍在冷启动或排队:HTTP {response.status_code}",
            )
        return ModelStatus(
            "llm",
            "error",
            STATE_LABELS["error"],
            url=base_url,
            latency_s=elapsed,
            message=f"启动失败:HTTP {response.status_code}",
        )
    except httpx.TimeoutException:
        return ModelStatus(
            "llm",
            "loading",
            STATE_LABELS["loading"],
            url=base_url,
            latency_s=time.perf_counter() - started,
            message="启动请求超时;Modal 可能仍在拉起容器或加载权重,请稍后刷新状态。",
        )
    except (httpx.ConnectError, httpx.RemoteProtocolError, httpx.ReadError) as exc:
        return ModelStatus(
            "llm",
            "sleeping",
            STATE_LABELS["sleeping"],
            url=base_url,
            latency_s=time.perf_counter() - started,
            message=f"服务暂不可达:{exc}",
        )
    except Exception as exc:
        return ModelStatus(
            "llm",
            "error",
            STATE_LABELS["error"],
            url=base_url,
            latency_s=time.perf_counter() - started,
            message=f"启动失败:{exc}",
        )


def _warm_modal_llm(url: str, timeout_s: float) -> ModelStatus:
    """POST a tiny streaming request to ``url`` and wait for the first non-empty line."""
    import httpx

    payload = {
        "text": "请只回复:已就绪",
        "character": {"display_name": "启动检查"},
        "max_new_tokens": 4,
    }
    started = time.perf_counter()
    timeout = httpx.Timeout(connect=30, read=timeout_s, write=30, pool=30)
    try:
        with httpx.stream("POST", url, json=payload, timeout=timeout, trust_env=False) as response:
            elapsed = time.perf_counter() - started
            if response.status_code == 200:
                # Stop at the first non-empty line; the rest of the stream
                # is not needed to know the model answered.
                for line in response.iter_lines():
                    if line:
                        break
                return ModelStatus(
                    "llm",
                    "ready",
                    STATE_LABELS["ready"],
                    url=url,
                    latency_s=time.perf_counter() - started,
                    message="主模型已完成短请求,接下来几分钟内对话会更快。",
                )
            if response.status_code in _RETRYABLE_HTTP_STATUSES:
                return ModelStatus(
                    "llm",
                    "loading",
                    STATE_LABELS["loading"],
                    url=url,
                    latency_s=elapsed,
                    message=f"服务仍在冷启动或排队:HTTP {response.status_code}",
                )
            return ModelStatus(
                "llm",
                "error",
                STATE_LABELS["error"],
                url=url,
                latency_s=elapsed,
                message=f"启动失败:HTTP {response.status_code}",
            )
    except httpx.TimeoutException:
        return ModelStatus(
            "llm",
            "loading",
            STATE_LABELS["loading"],
            url=url,
            latency_s=time.perf_counter() - started,
            message="启动请求超时;Modal 可能仍在拉起容器或加载权重,请稍后刷新状态。",
        )
    except (httpx.ConnectError, httpx.RemoteProtocolError, httpx.ReadError) as exc:
        return ModelStatus(
            "llm",
            "sleeping",
            STATE_LABELS["sleeping"],
            url=url,
            latency_s=time.perf_counter() - started,
            message=f"服务暂不可达:{exc}",
        )
    except Exception as exc:
        return ModelStatus(
            "llm",
            "error",
            STATE_LABELS["error"],
            url=url,
            latency_s=time.perf_counter() - started,
            message=f"启动失败:{exc}",
        )


def _check_simple_health(kind: str, url: str | None, timeout_s: float, health_path: str | None) -> ModelStatus:
    """GET the health target(s) derived from ``url`` and map the result to a status.

    Targets are tried in order; the first one whose probe is not "error"
    decides the outcome.
    """
    if not url:
        local = _local_service_status(kind)
        if local:
            return local
        env_name = {"tts": "VC_MODAL_TTS_URL"}.get(kind, "URL")
        return ModelStatus(kind, "unconfigured", STATE_LABELS["unconfigured"], message=f"未设置 {env_name}。")

    status = _HttpProbeResult(kind, "error", STATE_LABELS["error"], url=url, message="未执行检测")
    for target in _health_targets(url, health_path):
        status = _get_json(kind, target, timeout_s)
        if status.state != "error":
            break

    if status.state == "ready":
        return ModelStatus(
            kind,
            "ready",
            STATE_LABELS["ready"],
            url=url,
            model=_backend_or_model(status.message_json),
            latency_s=status.latency_s,
            message="服务健康检查通过。",
        )
    if status.state in {"sleeping", "loading"}:
        return ModelStatus(
            kind,
            status.state,
            STATE_LABELS[status.state],
            url=url,
            latency_s=status.latency_s,
            message=_COLD_START_MESSAGE,
        )
    return ModelStatus(
        kind,
        "error",
        STATE_LABELS["error"],
        url=url,
        latency_s=status.latency_s,
        message=status.message,
    )


def _check_tts_endpoint(url: str | None, timeout_s: float) -> ModelStatus:
    """POST a ``probe_only`` request to the TTS endpoint and report its state.

    The effective timeout is never lower than 15 seconds.
    """
    if not url:
        local = _local_service_status("tts")
        if local:
            return local
        return ModelStatus("tts", "unconfigured", STATE_LABELS["unconfigured"], message="未设置 VC_MODAL_TTS_URL。")

    import httpx

    target = _tts_endpoint_url(url)
    timeout_s = max(timeout_s, 15.0)
    started = time.perf_counter()
    try:
        response = httpx.post(target, json={"probe_only": True}, timeout=timeout_s, trust_env=False)
        elapsed = time.perf_counter() - started
        if response.status_code == 200:
            # A reachable endpoint is "ready" even if its body is not a JSON
            # object; only the optional model name depends on the payload.
            try:
                data = response.json()
            except ValueError:
                data = {}
            return ModelStatus(
                "tts",
                "ready",
                STATE_LABELS["ready"],
                url=url,
                model=_backend_or_model(data),
                latency_s=elapsed,
                message="TTS endpoint 可访问;首次合成仍可能需要等待模型载入。",
            )
        if response.status_code in _RETRYABLE_HTTP_STATUSES:
            return ModelStatus(
                "tts",
                "loading",
                STATE_LABELS["loading"],
                url=url,
                latency_s=elapsed,
                message=f"Modal TTS 服务已触达,但可能正在冷启动:HTTP {response.status_code}",
            )
        return ModelStatus(
            "tts",
            "error",
            STATE_LABELS["error"],
            url=url,
            latency_s=elapsed,
            message=f"HTTP {response.status_code}",
        )
    except (httpx.TimeoutException, httpx.ConnectError, httpx.RemoteProtocolError, httpx.ReadError) as exc:
        return ModelStatus(
            "tts",
            "sleeping",
            STATE_LABELS["sleeping"],
            url=url,
            latency_s=time.perf_counter() - started,
            message=f"Modal TTS 服务可能已休眠或正在冷启动:{exc}",
        )
    except Exception as exc:
        return ModelStatus(
            "tts",
            "error",
            STATE_LABELS["error"],
            url=url,
            latency_s=time.perf_counter() - started,
            message=f"TTS 状态检测失败:{exc}",
        )


@dataclass
class _HttpProbeResult(ModelStatus):
    """A ``ModelStatus`` that also carries the decoded JSON body of the probe."""

    message_json: Any = None


def _get_json(kind: str, url: str, timeout_s: float) -> _HttpProbeResult:
    """GET ``url`` and classify the outcome as ready/loading/sleeping/error.

    On HTTP 200 the decoded JSON body is stored in ``message_json``; a body
    that is not valid JSON is stored as ``{}``.
    """
    import httpx

    started = time.perf_counter()
    try:
        response = httpx.get(url, timeout=timeout_s, trust_env=False)
        elapsed = time.perf_counter() - started
        if response.status_code == 200:
            result = _HttpProbeResult(
                kind, "ready", STATE_LABELS["ready"], url=url, latency_s=elapsed, message="OK"
            )
            try:
                result.message_json = response.json()
            except ValueError:
                result.message_json = {}
            return result
        state = "loading" if response.status_code in _RETRYABLE_HTTP_STATUSES else "error"
        return _HttpProbeResult(
            kind,
            state,
            STATE_LABELS[state],
            url=url,
            latency_s=elapsed,
            message=f"HTTP {response.status_code}",
        )
    except (httpx.TimeoutException, httpx.ConnectError, httpx.RemoteProtocolError, httpx.ReadError) as exc:
        elapsed = time.perf_counter() - started
        return _HttpProbeResult(
            kind, "sleeping", STATE_LABELS["sleeping"], url=url, latency_s=elapsed, message=str(exc)
        )
    except Exception as exc:
        elapsed = time.perf_counter() - started
        return _HttpProbeResult(
            kind, "error", STATE_LABELS["error"], url=url, latency_s=elapsed, message=str(exc)
        )


def _kind_label(kind: str) -> str:
    """Return the display name for ``kind`` (upper-cased ``kind`` if unknown)."""
    return {"llm": "LLM", "tts": "TTS", "image_generation": "Image Generation"}.get(kind, kind.upper())


def _empty_url_label(state: str) -> str:
    """Return the placeholder shown in place of a URL for the given ``state``."""
    if state == "mock":
        return "本地模拟"
    if state == "local":
        return "本地服务定义"
    if state == "unconfigured":
        return "未绑定 endpoint"
    return "待检测"


def _initial_endpoint_status(kind: str, url: str | None, env_name: str) -> ModelStatus:
    """Return the not-yet-probed status for an endpoint-based service."""
    if url:
        return ModelStatus(
            kind,
            "unknown",
            STATE_LABELS["unknown"],
            url=url,
            message=f"点击刷新检测 {_kind_label(kind)}。",
        )
    local = _local_service_status(kind)
    if local:
        return local
    return ModelStatus(kind, "unconfigured", STATE_LABELS["unconfigured"], message=f"未设置 {env_name}。")


def _initial_image_generation_status() -> ModelStatus:
    """Return the not-yet-probed status for the image-generation service."""
    local = _local_service_status("image_generation")
    if local:
        # ``local`` is a fresh object, so adjusting it in place is safe.
        local.state = "unknown"
        local.label = STATE_LABELS["unknown"]
        local.model = _IMAGE_GENERATION_MODEL
        local.message = "点击刷新检测 Modal 图像生成服务。"
        return local
    return ModelStatus(
        "image_generation",
        "unconfigured",
        STATE_LABELS["unconfigured"],
        message="未找到 Modal 图像生成服务定义。",
    )


def _local_service_status(kind: str) -> ModelStatus | None:
    """Return a "local" status if the service file for ``kind`` exists, else ``None``."""
    path = MODAL_SERVICE_FILES.get(kind)
    if not path or not path.exists():
        return None
    rel_path = path.relative_to(PROJECT_ROOT).as_posix()
    env_name = {"tts": "VC_MODAL_TTS_URL", "image_generation": "Modal app"}.get(kind, "URL")
    return ModelStatus(
        kind,
        "local",
        STATE_LABELS["local"],
        url=rel_path,
        message=f"{rel_path} 已存在;部署后设置 {env_name},或从 Modal 输出复制 endpoint URL。",
    )


def _health_targets(url: str, health_path: str | None) -> list[str]:
    """Return the de-duplicated, ordered list of URLs to probe for ``url``.

    Without ``health_path`` the URL itself is the only target. Otherwise a
    trailing ``tts`` or ``persona_events`` segment is stripped to obtain the
    service base, ``/health`` additionally gets a ``/health_http`` fallback,
    and the un-stripped URL is tried last.
    """
    if not health_path:
        return [url]
    base = url.rstrip("/")
    tail = base.rsplit("/", 1)[-1]
    service_base = base.rsplit("/", 1)[0] if tail in {"tts", "persona_events"} else base
    targets = [service_base + health_path]
    if health_path == "/health":
        targets.append(service_base + "/health_http")
    if service_base != base:
        targets.append(base + health_path)
    # dict.fromkeys drops duplicates while keeping first-seen order.
    return list(dict.fromkeys(targets))


def _tts_endpoint_url(url: str) -> str:
    """Return the URL to POST TTS requests to.

    A URL with no path, or whose last path segment is already ``tts``, is
    returned without its trailing slash; otherwise ``/tts`` is appended.
    """
    base = url.rstrip("/")
    parsed = urlparse(base)
    if not parsed.path or parsed.path == "/":
        return base
    if parsed.path.rstrip("/").rsplit("/", 1)[-1] == "tts":
        return base
    return base + "/tts"