| from __future__ import annotations |
|
|
| import base64 |
| import json |
| import mimetypes |
| import os |
| import re |
| import time |
| from collections.abc import Iterator |
| from pathlib import Path |
| from typing import Any |
|
|
| from src.character_registry import build_tavern_system_prompt, get_character |
| from src.persona_skills import select_skill |
| from src.stream_protocol import normalize_event, parse_sse_lines |
| from src.tts_engine import synthesize_sentence |
|
|
|
|
# Base URL of the vLLM deployment used when VC_MODAL_VLLM_URL is not set.
DEFAULT_VLLM_BASE_URL = (
    "https://veronicaulises0--virtual-characters-vllm-gemma-serve.modal.run"
)

# Matches a complete stage directive such as
#   <stage expression="smile" motion="talk" intensity="0.6">
# The attributes must appear in exactly this order and use double quotes.
# Note that the intensity group accepts any run of digits and dots.
STAGE_TAG_RE = re.compile(
    r"<stage"
    r"\s+expression=\"(?P<expression>[a-z_]+)\""
    r"\s+motion=\"(?P<motion>[a-z_]+)\""
    r"\s+intensity=\"(?P<intensity>[0-9.]+)\""
    r"\s*>"
)
|
|
|
|
def stream_reply(
    user_text: str,
    history: list[dict],
    state: dict,
    media_inputs: dict[str, list[dict[str, Any]]] | None = None,
    voice_state: dict[str, Any] | None = None,
) -> Iterator[dict]:
    """Yield reply events for one user turn from the configured backend.

    Backend selection, first match wins:

    1. ``VC_USE_MOCK == "1"``      -> canned mock reply.
    2. ``VC_MODAL_LLM_URL`` set    -> Modal SSE endpoint.
    3. ``VC_MODAL_VLLM_URL`` or the built-in default URL -> vLLM endpoint.
    4. otherwise                   -> canned mock reply.

    Args:
        user_text: The user's message for this turn.
        history: Earlier conversation turns.
        state: Session state (character, last stage, vision note, ...).
        media_inputs: Optional attachments; a missing/empty value is treated
            as ``{"images": []}``.
        voice_state: Optional per-session voice settings.

    Yields:
        Event dictionaries produced by the selected backend.
    """
    attachments = media_inputs or {"images": []}
    read_env = os.environ.get

    if read_env("VC_USE_MOCK") == "1":
        events = _stream_mock_reply(user_text, state, voice_state)
    else:
        modal_endpoint = read_env("VC_MODAL_LLM_URL")
        if modal_endpoint:
            events = _stream_modal_reply(
                modal_endpoint, user_text, history, state, attachments, voice_state
            )
        else:
            vllm_endpoint = read_env("VC_MODAL_VLLM_URL") or DEFAULT_VLLM_BASE_URL
            if vllm_endpoint:
                events = _stream_vllm_reply(
                    vllm_endpoint, user_text, history, state, attachments, voice_state
                )
            else:
                # Only reachable when the default URL constant is empty.
                events = _stream_mock_reply(user_text, state, voice_state)

    yield from events
|
|
|
|
def _stream_modal_reply(
    url: str,
    user_text: str,
    history: list[dict],
    state: dict,
    media_inputs: dict[str, list[dict[str, Any]]],
    voice_state: dict[str, Any] | None,
) -> Iterator[dict]:
    """Relay events from the Modal SSE endpoint at ``url``.

    The request carries the user text, the last 8 history turns, the
    character, the last vision note and any attached images as data URLs.
    Every event parsed from the response is forwarded unchanged. After each
    ``sentence_end`` event, TTS events are appended when voice is enabled.
    Once the stream ends, a settling stage event is emitted if the most
    recent stage calls for one.

    Raises:
        httpx.HTTPStatusError: If the endpoint answers with an error status.
    """
    import httpx

    character = state.get("character") or get_character(state.get("character_id", "star_knight"))
    recent_turns = history[-8:]
    vision_note = state.get("last_vision_note")
    encoded_images = [
        _data_url(image["path"])
        for image in media_inputs.get("images", [])
        if image.get("path")
    ]
    request_body = {
        "text": user_text,
        "history": recent_turns,
        "character": character,
        "vision_note": vision_note,
        "image_urls": encoded_images,
        "max_new_tokens": 180,
    }

    # Start from the stage stored in the session and fold in updates as they arrive.
    current_stage = state.get("stage", {})
    with httpx.stream("POST", url, json=request_body, timeout=180) as response:
        response.raise_for_status()
        for event in parse_sse_lines(response.iter_lines()):
            if event.get("type") == "stage":
                current_stage = {**current_stage, **event}
            yield event
            sentence_finished = event.get("type") == "sentence_end"
            if not (sentence_finished and _voice_enabled(voice_state)):
                continue
            yield from _synthesize_audio_event(
                event.get("text", ""),
                character,
                voice_state or state.get("voice", {}),
            )

    closing_stage = _settled_stage(current_stage)
    if closing_stage:
        yield closing_stage
|
|
|
|
def _stream_vllm_reply(
    base_url: str,
    user_text: str,
    history: list[dict],
    state: dict,
    media_inputs: dict[str, list[dict[str, Any]]] | None,
    voice_state: dict[str, Any] | None,
) -> Iterator[dict]:
    """Stream a reply from an OpenAI-compatible vLLM chat-completions endpoint.

    Event order:

    1. ``skill``, a provisional "thinking" ``stage``, ``voice`` and a
       ``debug`` event naming the endpoint.
    2. ``stage`` / ``text_delta`` events parsed from the model stream.
    3. One ``sentence_end`` carrying the whole reply, optional TTS events,
       an optional settling ``stage`` and finally ``done``.

    A failed attempt is retried (with a ``2 * attempt`` second pause) while
    attempts remain and no reply text has been emitted yet. Otherwise a
    "worried" stage and an ``error`` event are emitted and the canned mock
    reply takes over.

    Environment variables read: ``VC_VLLM_SERVED_MODEL``,
    ``VC_VLLM_MAX_TOKENS``, ``VC_VLLM_TEMPERATURE``,
    ``VC_VLLM_READ_TIMEOUT`` and ``VC_VLLM_RETRIES`` (total number of
    attempts; values below 1 are treated as 1).

    Args:
        base_url: Base URL of the vLLM server; ``/v1/chat/completions`` is appended.
        user_text: The user's message for this turn.
        history: Earlier conversation turns.
        state: Session state (character, last vision note, voice, ...).
        media_inputs: Optional attachments; falsy means no images.
        voice_state: Optional per-session voice settings.

    Yields:
        Event dictionaries as described above.
    """
    import httpx

    media_inputs = media_inputs or {"images": []}
    character = state.get("character") or get_character(state.get("character_id", "star_knight"))
    skill = select_skill(user_text, character)
    voice = {**character.get("voice", {}), **(voice_state or {})}
    reply_text = ""
    last_stage = {"type": "stage", "expression": "thinking", "motion": "look_at_user", "intensity": 0.55}

    def _remember(event: dict) -> None:
        """Accumulate reply text and track the latest stage before forwarding."""
        nonlocal reply_text, last_stage
        if event["type"] == "text_delta":
            reply_text += event["text"]
        elif event["type"] == "stage":
            last_stage = {**last_stage, **event}

    yield normalize_event({"type": "skill", "name": skill})
    yield normalize_event(last_stage)
    yield normalize_event({"type": "voice", "style": voice.get("style", "soft"), "speed": voice.get("speed", 0.96), "energy": voice.get("energy", 0.5)})
    yield {"type": "debug", "message": f"Modal vLLM endpoint: {base_url.rstrip('/')}"}

    payload = {
        "model": os.environ.get("VC_VLLM_SERVED_MODEL", "llm"),
        "messages": _build_openai_messages(user_text, history, character, skill, state.get("last_vision_note"), media_inputs),
        "max_tokens": int(os.environ.get("VC_VLLM_MAX_TOKENS", "220")),
        "temperature": float(os.environ.get("VC_VLLM_TEMPERATURE", "0.75")),
        "stream": True,
        "chat_template_kwargs": {"enable_thinking": False},
    }
    timeout = httpx.Timeout(connect=20, read=float(os.environ.get("VC_VLLM_READ_TIMEOUT", "600")), write=20, pool=20)
    parser_state = {"pending_stage": True, "buffer": "", "started_text": False}
    url = base_url.rstrip("/") + "/v1/chat/completions"

    # Always make at least one request: with a value of 0 or less the loop
    # below would never run and the user would silently get an empty reply.
    max_attempts = max(1, int(os.environ.get("VC_VLLM_RETRIES", "2")))
    for attempt in range(1, max_attempts + 1):
        try:
            with httpx.Client(timeout=timeout, trust_env=False) as client:
                with client.stream("POST", url, json=payload) as response:
                    response.raise_for_status()
                    for raw_line in response.iter_lines():
                        if not raw_line:
                            continue
                        line = raw_line.strip()
                        # SSE comments (keep-alives) and non-data fields carry no JSON payload.
                        if line.startswith((":", "event:", "id:", "retry:")):
                            continue
                        if line.startswith("data:"):
                            line = line[5:].strip()
                        if not line:
                            continue
                        if line == "[DONE]":
                            break
                        try:
                            chunk = json.loads(line)
                        except json.JSONDecodeError:
                            yield {"type": "debug", "message": "invalid vLLM stream chunk", "raw": line[:200]}
                            continue
                        if not isinstance(chunk, dict):
                            # Valid JSON but not a completion object; nothing to extract.
                            yield {"type": "debug", "message": "invalid vLLM stream chunk", "raw": line[:200]}
                            continue

                        for text in _extract_delta_text(chunk):
                            for event in _events_from_vllm_delta(text, parser_state):
                                _remember(event)
                                yield event
            break
        except Exception as exc:
            # Retrying is only safe while the user has not seen any reply text.
            if attempt < max_attempts and not reply_text:
                parser_state = {"pending_stage": True, "buffer": "", "started_text": False}
                yield {"type": "debug", "message": f"Modal vLLM retry {attempt}/{max_attempts}: {exc}"}
                time.sleep(2 * attempt)
                continue
            worried = normalize_event({"type": "stage", "expression": "worried", "motion": "gentle_blink", "intensity": 0.8})
            yield worried
            yield {"type": "error", "message": f"Modal vLLM 调用失败:{exc}"}
            yield from _stream_mock_reply(user_text, state, voice_state)
            return

    # Emit whatever is still buffered while waiting for a stage tag to close.
    for event in _flush_vllm_parser(parser_state):
        _remember(event)
        yield event

    clean_reply = reply_text.strip()
    if clean_reply:
        yield normalize_event({"type": "sentence_end", "text": clean_reply})
        if _voice_enabled(voice_state):
            yield from _synthesize_audio_event(clean_reply, character, voice_state or state.get("voice", {}))
    settle = _settled_stage(last_stage)
    if settle:
        yield settle
    yield {"type": "done"}
|
|
|
|
def _build_openai_messages(
    user_text: str,
    history: list[dict],
    character: dict,
    skill: str,
    vision_note: str | None,
    media_inputs: dict[str, list[dict[str, Any]]] | None = None,
) -> list[dict[str, Any]]:
    """Assemble the chat-completions ``messages`` list for one turn.

    The list holds the system prompt, then up to the last 10 history turns,
    then the current user message. History turns are kept only when their
    role is ``user`` or ``assistant`` and their content is not a list and is
    non-blank after stripping. Prompt examples are requested only while the
    history has fewer than 8 entries.
    """
    chat_roles = {"user", "assistant"}

    def _replayable_turns() -> Iterator[dict[str, Any]]:
        for turn in history[-10:]:
            speaker, body = turn.get("role"), turn.get("content")
            if speaker not in chat_roles:
                continue
            if isinstance(body, list):
                # List-valued content is not replayed.
                continue
            body_text = str(body or "").strip()
            if body_text:
                yield {"role": speaker, "content": body_text}

    wants_examples = len(history) < 8
    system_message = {
        "role": "system",
        "content": build_tavern_system_prompt(
            character,
            skill=skill,
            vision_note=vision_note,
            include_examples=wants_examples,
        ),
    }
    return [
        system_message,
        *_replayable_turns(),
        {"role": "user", "content": _build_user_content(user_text, media_inputs or {"images": []})},
    ]
|
|
|
|
| def _build_user_content(user_text: str, media_inputs: dict[str, list[dict[str, Any]]]) -> str | list[dict[str, Any]]: |
| content: list[dict[str, Any]] = [] |
| if user_text.strip(): |
| content.append({"type": "text", "text": user_text.strip()}) |
| for item in media_inputs.get("images", []): |
| path = item.get("path") |
| if path: |
| content.append({"type": "image_url", "image_url": {"url": _data_url(path)}}) |
| if not content: |
| return user_text |
| return content |
|
|
|
|
def _extract_delta_text(chunk: dict) -> Iterator[str]:
    """Yield the text carried by each choice of a streamed completion chunk.

    For every choice, the first truthy value among the delta's ``content``,
    ``reasoning`` and ``reasoning_content`` fields is yielded as a string.
    Choices without any such value yield nothing.
    """
    candidate_fields = ("content", "reasoning", "reasoning_content")
    for choice in chunk.get("choices") or ():
        delta = choice.get("delta") or {}
        picked = next((value for value in map(delta.get, candidate_fields) if value), None)
        if picked:
            yield str(picked)
|
|
|
|
def _events_from_vllm_delta(text: str, parser_state: dict) -> Iterator[dict]:
    """Turn one streamed text fragment into stage / text_delta events.

    ``parser_state`` is mutated in place and uses three keys:
    ``pending_stage`` (still waiting for a leading ``<stage ...>`` tag),
    ``buffer`` (text held back while waiting) and ``started_text`` (some
    reply text has already been emitted).

    While a stage tag is pending, fragments are buffered until one of:

    * the buffer cannot be the start of ``<stage`` -> default stage + buffered text;
    * more than 180 characters arrive without ``>`` -> default stage + buffered text;
    * a ``>`` arrives -> the parsed stage tag + whatever text follows it.

    Afterwards fragments pass straight through as ``text_delta`` events,
    with leading whitespace dropped until the first text has been emitted.
    """
    if not parser_state.get("pending_stage"):
        fragment = text if parser_state.get("started_text") else text.lstrip()
        if fragment:
            parser_state["started_text"] = True
            yield normalize_event({"type": "text_delta", "text": fragment})
        return

    parser_state["buffer"] += text
    candidate = parser_state["buffer"].lstrip()

    def _abandon_tag() -> Iterator[dict]:
        # Stop waiting for a tag: emit the default stage and release the buffer as text.
        parser_state["pending_stage"] = False
        yield normalize_event({"type": "stage", "expression": "smile", "motion": "talk", "intensity": 0.55})
        if candidate:
            parser_state["started_text"] = True
            yield normalize_event({"type": "text_delta", "text": candidate})
        parser_state["buffer"] = ""

    # True while the buffer is a prefix of "<stage" or already begins with it.
    may_be_tag = candidate.startswith("<stage") or "<stage".startswith(candidate)
    if not may_be_tag:
        yield from _abandon_tag()
        return

    tag_head, closer, remainder = candidate.partition(">")
    if not closer:
        # Tag not closed yet: keep waiting unless it has grown implausibly long.
        if len(candidate) > 180:
            yield from _abandon_tag()
        return

    parser_state["pending_stage"] = False
    parser_state["buffer"] = ""
    yield _parse_stage_tag(tag_head + closer)

    trailing = remainder.lstrip()
    if trailing:
        parser_state["started_text"] = True
        yield normalize_event({"type": "text_delta", "text": trailing})
|
|
|
|
def _flush_vllm_parser(parser_state: dict) -> Iterator[dict]:
    """Release text still buffered while a stage tag was pending.

    Does nothing unless ``pending_stage`` is set and ``buffer`` is non-empty.
    Otherwise it emits the default stage event, then the buffered text
    (leading whitespace removed) as a ``text_delta`` if any remains, and
    clears the buffer.
    """
    if not parser_state.get("pending_stage") or not parser_state.get("buffer"):
        return

    parser_state["pending_stage"] = False
    yield normalize_event({"type": "stage", "expression": "smile", "motion": "talk", "intensity": 0.55})

    leftover = parser_state["buffer"].lstrip()
    if leftover:
        parser_state["started_text"] = True
        yield normalize_event({"type": "text_delta", "text": leftover})
    parser_state["buffer"] = ""
|
|
|
|
def _parse_stage_tag(tag: str) -> dict:
    """Convert a ``<stage ...>`` tag into a normalized stage event.

    Args:
        tag: The tag text, including the closing ``>``; surrounding
            whitespace is ignored.

    Returns:
        A normalized stage event. A tag that does not match
        ``STAGE_TAG_RE`` yields the default stage (smile / talk / 0.55).
        A tag whose intensity is not a valid number keeps its expression
        and motion but uses the default intensity of 0.55.
    """
    default_intensity = 0.55
    match = STAGE_TAG_RE.fullmatch(tag.strip())
    if not match:
        return normalize_event(
            {"type": "stage", "expression": "smile", "motion": "talk", "intensity": default_intensity}
        )

    # The pattern accepts any run of digits and dots (e.g. "." or "1.2.3"),
    # so convert here and pass a float like every other stage event does.
    try:
        intensity = float(match.group("intensity"))
    except ValueError:
        intensity = default_intensity

    return normalize_event(
        {
            "type": "stage",
            "expression": match.group("expression"),
            "motion": match.group("motion"),
            "intensity": intensity,
        }
    )
|
|
|
|
def _stream_mock_reply(user_text: str, state: dict, voice_state: dict[str, Any] | None = None) -> Iterator[dict]:
    """Yield a canned reply without contacting any model.

    The reply text and its stage are chosen by the skill selected for
    ``user_text``. Unrecognised skills get a generic line that opens with
    the character's first catchphrase. Event order: ``skill``, a "thinking"
    ``stage``, ``voice``, the reply's ``stage``, ``text_delta`` chunks,
    ``sentence_end``, optional TTS events, an optional settling ``stage``
    and ``done``.
    """
    character = state.get("character") or get_character(state.get("character_id", "star_knight"))
    skill = select_skill(user_text, character)
    voice = {**character.get("voice", {}), **(voice_state or {})}

    yield normalize_event({"type": "skill", "name": skill})
    yield normalize_event({"type": "stage", "expression": "thinking", "motion": "look_at_user", "intensity": 0.45})
    yield normalize_event({"type": "voice", "style": voice.get("style", "soft"), "speed": voice.get("speed", 0.96), "energy": voice.get("energy", 0.5)})

    # skill -> (reply text, expression, motion, intensity)
    scripted = {
        "emotional_support": ("我在听。你不用马上把自己整理好,先把这口气慢慢呼出来。", "worried", "gentle_blink", 0.7),
        "battle_focus": ("收到。先确认边界,再判断风险。你站在我身后就好。", "thinking", "focus", 0.8),
        "playful_reframe": ("这听起来像一个支线任务。目标很小,但奖励可能意外地不错。", "smile", "soft_sway", 0.7),
    }
    script = scripted.get(skill)
    if script is None:
        opener = (character.get("dialogue_style", {}).get("catchphrases") or ["我在。"])[0]
        script = (f"{opener} 你刚才说的我记下了,我们可以从最容易的一步开始。", "smile", "gentle_blink", 0.5)
    reply, expression, motion, intensity = script

    last_stage = normalize_event({"type": "stage", "expression": expression, "motion": motion, "intensity": intensity})
    yield last_stage

    for piece in _chunk_text(reply):
        yield normalize_event({"type": "text_delta", "text": piece})
    yield normalize_event({"type": "sentence_end", "text": reply})

    if _voice_enabled(voice_state):
        yield from _synthesize_audio_event(reply, character, voice_state or state.get("voice", {}))

    closing_stage = _settled_stage(last_stage)
    if closing_stage:
        yield closing_stage
    yield {"type": "done"}
|
|
|
|
def _chunk_text(text: str, size: int = 4) -> Iterator[str]:
    """Yield consecutive slices of ``text``, each at most ``size`` characters long."""
    starts = range(0, len(text), size)
    yield from (text[start : start + size] for start in starts)
|
|
|
|
def _settled_stage(stage: dict[str, Any] | None) -> dict[str, Any] | None:
    """Return the follow-up stage event for a stage whose motion is ``talk``.

    The follow-up keeps the expression (default ``smile``), switches the
    motion to ``gentle_blink`` and caps the intensity at 0.6. ``None`` is
    returned for a missing/empty stage or for any other motion.
    """
    if not stage or stage.get("motion") != "talk":
        return None

    expression = stage.get("expression", "smile")
    capped_intensity = min(float(stage.get("intensity", 0.55)), 0.6)
    return normalize_event(
        {
            "type": "stage",
            "expression": expression,
            "motion": "gentle_blink",
            "intensity": capped_intensity,
        }
    )
|
|
|
|
| def _voice_enabled(voice_state: dict[str, Any] | None) -> bool: |
| if not voice_state: |
| return True |
| return bool(voice_state.get("enabled", True)) |
|
|
|
|
def _synthesize_audio_event(text: str, character: dict, voice_state: dict[str, Any]) -> Iterator[dict]:
    """Synthesize ``text`` and yield the resulting event, if any.

    Yields an ``audio`` event holding the synthesizer's result when that
    result is truthy, an ``error`` event when synthesis raises, and nothing
    when synthesis returns a falsy value.
    """
    try:
        clip = synthesize_sentence(text, character, voice_state)
    except Exception as exc:
        # TTS is best-effort: report the failure instead of aborting the reply.
        yield {"type": "error", "message": f"TTS 调用失败:{exc}"}
    else:
        if clip:
            yield {"type": "audio", "path": clip}
|
|
|
|
def _data_url(path_value: str | Path) -> str:
    """Return the file at ``path_value`` encoded as a base64 ``data:`` URL.

    The MIME type is guessed from the file name and falls back to
    ``application/octet-stream`` when it cannot be determined.
    """
    source = Path(path_value)
    guessed_type, _encoding = mimetypes.guess_type(source.name)
    media_type = guessed_type or "application/octet-stream"
    encoded = _base64_file(source)
    return f"data:{media_type};base64,{encoded}"
|
|
|
|
| def _base64_file(path_value: str | Path) -> str: |
| return base64.b64encode(Path(path_value).read_bytes()).decode("ascii") |
|
|