# virtual-characters / src /dialogue_engine.py
# ShadowInk's picture
# Upload complete Space runtime files
# 6bcddd0 verified
# Raw
# History Blame Contribute Delete
# 15.5 kB
from __future__ import annotations
import base64
import json
import mimetypes
import os
import re
import time
from collections.abc import Iterator
from pathlib import Path
from typing import Any
from src.character_registry import build_tavern_system_prompt, get_character
from src.persona_skills import select_skill
from src.stream_protocol import normalize_event, parse_sse_lines
from src.tts_engine import synthesize_sentence
# Base URL used for the vLLM backend when the VC_MODAL_VLLM_URL environment
# variable is unset or empty (see stream_reply).
DEFAULT_VLLM_BASE_URL = "https://veronicaulises0--virtual-characters-vllm-gemma-serve.modal.run"
# Matches one complete opening stage tag of the form
#   <stage expression="..." motion="..." intensity="...">
# The three attributes must appear in exactly this order. expression and
# motion accept lowercase letters and underscores; intensity accepts digits
# and dots (so the captured intensity is a string, not yet a number).
STAGE_TAG_RE = re.compile(
    r"<stage\s+expression=\"(?P<expression>[a-z_]+)\"\s+motion=\"(?P<motion>[a-z_]+)\"\s+intensity=\"(?P<intensity>[0-9.]+)\"\s*>"
)
def stream_reply(
    user_text: str,
    history: list[dict],
    state: dict,
    media_inputs: dict[str, list[dict[str, Any]]] | None = None,
    voice_state: dict[str, Any] | None = None,
) -> Iterator[dict]:
    """Yield reply events for one user turn from the configured backend.

    Backend choice, in priority order:

    1. ``VC_USE_MOCK == "1"``: the canned mock reply.
    2. ``VC_MODAL_LLM_URL`` set: the Modal SSE endpoint.
    3. ``VC_MODAL_VLLM_URL`` or ``DEFAULT_VLLM_BASE_URL``: the vLLM endpoint.
    4. Otherwise the mock reply (only reachable when both the environment
       variable and the default URL constant are empty).

    A missing or empty ``media_inputs`` is replaced by ``{"images": []}``.
    """
    if not media_inputs:
        media_inputs = {"images": []}

    env = os.environ
    if env.get("VC_USE_MOCK") == "1":
        backend = _stream_mock_reply(user_text, state, voice_state)
    else:
        modal_endpoint = env.get("VC_MODAL_LLM_URL")
        if modal_endpoint:
            backend = _stream_modal_reply(
                modal_endpoint, user_text, history, state, media_inputs, voice_state
            )
        else:
            vllm_endpoint = env.get("VC_MODAL_VLLM_URL") or DEFAULT_VLLM_BASE_URL
            if vllm_endpoint:
                backend = _stream_vllm_reply(
                    vllm_endpoint, user_text, history, state, media_inputs, voice_state
                )
            else:
                backend = _stream_mock_reply(user_text, state, voice_state)

    # The backend generators are lazy, so nothing runs until we delegate here.
    yield from backend
def _stream_modal_reply(
    url: str,
    user_text: str,
    history: list[dict],
    state: dict,
    media_inputs: dict[str, list[dict[str, Any]]],
    voice_state: dict[str, Any] | None,
) -> Iterator[dict]:
    """Relay events from the Modal SSE endpoint at ``url``.

    POSTs the user text, the last 8 history items, the character, the last
    vision note and any images (as data URLs), then yields every event that
    ``parse_sse_lines`` produces from the response. After each
    ``sentence_end`` event, audio events are produced when voice is enabled.
    Once the stream ends, a settling stage event is yielded if the last
    stage calls for one.

    HTTP errors are not caught here; ``raise_for_status`` failures and
    transport exceptions propagate to the caller.
    """
    import httpx

    character = state.get("character") or get_character(state.get("character_id", "star_knight"))

    # Keys are inserted in the same order the request body has always used.
    request_body: dict[str, Any] = {
        "text": user_text,
        "history": history[-8:],
        "character": character,
        "vision_note": state.get("last_vision_note"),
    }
    request_body["image_urls"] = [
        _data_url(image["path"])
        for image in media_inputs.get("images", [])
        if image.get("path")
    ]
    request_body["max_new_tokens"] = 180

    current_stage = state.get("stage", {})
    with httpx.stream("POST", url, json=request_body, timeout=180) as response:
        response.raise_for_status()
        for event in parse_sse_lines(response.iter_lines()):
            if event.get("type") == "stage":
                # Later stage events override fields of earlier ones.
                current_stage = {**current_stage, **event}
            yield event
            if event.get("type") != "sentence_end":
                continue
            if _voice_enabled(voice_state):
                yield from _synthesize_audio_event(
                    event.get("text", ""),
                    character,
                    voice_state or state.get("voice", {}),
                )

    closing_stage = _settled_stage(current_stage)
    if closing_stage:
        yield closing_stage
def _stream_vllm_reply(
    base_url: str,
    user_text: str,
    history: list[dict],
    state: dict,
    media_inputs: dict[str, list[dict[str, Any]]] | None,
    voice_state: dict[str, Any] | None,
) -> Iterator[dict]:
    """Stream a reply from a vLLM chat-completions endpoint under ``base_url``.

    Event order: ``skill``, an initial "thinking" ``stage``, ``voice`` and a
    ``debug`` event naming the endpoint; then the events parsed from the
    streamed deltas; then ``sentence_end`` with the full reply, optional
    audio events, an optional settling ``stage`` and finally ``done``.

    The request is attempted up to ``VC_VLLM_RETRIES`` times (default 2, and
    never fewer than once). A failed attempt is retried only while no reply
    text has been emitted yet. When the attempts are exhausted, or text was
    already emitted, a worried ``stage`` and an ``error`` event are yielded
    and the mock reply is streamed instead.

    Environment variables read: ``VC_VLLM_SERVED_MODEL``,
    ``VC_VLLM_MAX_TOKENS``, ``VC_VLLM_TEMPERATURE``,
    ``VC_VLLM_READ_TIMEOUT``, ``VC_VLLM_RETRIES``.

    Raises:
        ValueError: if one of the numeric environment variables cannot be
            parsed.
    """
    import httpx

    media_inputs = media_inputs or {"images": []}
    character = state.get("character") or get_character(state.get("character_id", "star_knight"))
    skill = select_skill(user_text, character)
    # Per-call voice settings override the character's defaults.
    voice = {**character.get("voice", {}), **(voice_state or {})}
    reply_text = ""
    last_stage = {"type": "stage", "expression": "thinking", "motion": "look_at_user", "intensity": 0.55}
    yield normalize_event({"type": "skill", "name": skill})
    yield normalize_event(last_stage)
    yield normalize_event({"type": "voice", "style": voice.get("style", "soft"), "speed": voice.get("speed", 0.96), "energy": voice.get("energy", 0.5)})
    yield {"type": "debug", "message": f"Modal vLLM endpoint: {base_url.rstrip('/')}"}
    payload = {
        "model": os.environ.get("VC_VLLM_SERVED_MODEL", "llm"),
        "messages": _build_openai_messages(user_text, history, character, skill, state.get("last_vision_note"), media_inputs),
        "max_tokens": int(os.environ.get("VC_VLLM_MAX_TOKENS", "220")),
        "temperature": float(os.environ.get("VC_VLLM_TEMPERATURE", "0.75")),
        "stream": True,
        "chat_template_kwargs": {"enable_thinking": False},
    }
    timeout = httpx.Timeout(connect=20, read=float(os.environ.get("VC_VLLM_READ_TIMEOUT", "600")), write=20, pool=20)
    parser_state = {"pending_stage": True, "buffer": "", "started_text": False}
    url = base_url.rstrip("/") + "/v1/chat/completions"
    # Always make at least one attempt. Without the clamp, VC_VLLM_RETRIES=0
    # (or a negative value) gives an empty range: no request is sent and the
    # turn ends silently with nothing but a "done" event.
    max_attempts = max(1, int(os.environ.get("VC_VLLM_RETRIES", "2")))
    for attempt in range(1, max_attempts + 1):
        try:
            with httpx.Client(timeout=timeout, trust_env=False) as client:
                with client.stream("POST", url, json=payload) as response:
                    response.raise_for_status()
                    for raw_line in response.iter_lines():
                        if not raw_line:
                            continue
                        line = raw_line.strip()
                        # Strip the SSE "data:" field prefix when present.
                        if line.startswith("data:"):
                            line = line[5:].strip()
                        # End-of-stream sentinel: stop reading this response.
                        if line == "[DONE]":
                            break
                        try:
                            chunk = json.loads(line)
                        except json.JSONDecodeError:
                            # Report the bad line and keep streaming.
                            yield {"type": "debug", "message": "invalid vLLM stream chunk", "raw": line[:200]}
                            continue
                        for text in _extract_delta_text(chunk):
                            for event in _events_from_vllm_delta(text, parser_state):
                                if event["type"] == "text_delta":
                                    reply_text += event["text"]
                                elif event["type"] == "stage":
                                    last_stage = {**last_stage, **event}
                                yield event
            # The stream finished without raising: leave the retry loop.
            break
        except Exception as exc:
            # Retry only while nothing has been shown to the user, so that a
            # second attempt cannot duplicate already-emitted text.
            if attempt < max_attempts and not reply_text:
                parser_state = {"pending_stage": True, "buffer": "", "started_text": False}
                yield {"type": "debug", "message": f"Modal vLLM retry {attempt}/{max_attempts}: {exc}"}
                time.sleep(2 * attempt)
                continue
            worried = normalize_event({"type": "stage", "expression": "worried", "motion": "gentle_blink", "intensity": 0.8})
            yield worried
            yield {"type": "error", "message": f"Modal vLLM 调用失败:{exc}"}
            # The mock reply emits its own sentence_end/done events.
            yield from _stream_mock_reply(user_text, state, voice_state)
            return
    # Emit whatever is still buffered while waiting for a stage tag.
    for event in _flush_vllm_parser(parser_state):
        if event["type"] == "text_delta":
            reply_text += event["text"]
        elif event["type"] == "stage":
            last_stage = {**last_stage, **event}
        yield event
    clean_reply = reply_text.strip()
    if clean_reply:
        yield normalize_event({"type": "sentence_end", "text": clean_reply})
        if _voice_enabled(voice_state):
            yield from _synthesize_audio_event(clean_reply, character, voice_state or state.get("voice", {}))
    settle = _settled_stage(last_stage)
    if settle:
        yield settle
    yield {"type": "done"}
def _build_openai_messages(
    user_text: str,
    history: list[dict],
    character: dict,
    skill: str,
    vision_note: str | None,
    media_inputs: dict[str, list[dict[str, Any]]] | None = None,
) -> list[dict[str, Any]]:
    """Assemble the chat ``messages`` list for the completion request.

    The list starts with the system prompt from
    ``build_tavern_system_prompt`` (examples are requested only while the
    history holds fewer than 8 items), continues with the textual
    user/assistant turns among the last 10 history items, and ends with the
    current user turn built by ``_build_user_content``.

    History items are skipped when their role is neither ``user`` nor
    ``assistant``, when their content is a list, or when their text is
    empty after stripping.
    """
    opening = {
        "role": "system",
        "content": build_tavern_system_prompt(
            character,
            skill=skill,
            vision_note=vision_note,
            include_examples=len(history) < 8,
        ),
    }
    messages: list[dict[str, Any]] = [opening]

    for turn in history[-10:]:
        role = turn.get("role")
        content = turn.get("content")
        if role not in {"user", "assistant"} or isinstance(content, list):
            continue
        text = str(content or "").strip()
        if not text:
            continue
        messages.append({"role": role, "content": text})

    current_turn = _build_user_content(user_text, media_inputs or {"images": []})
    messages.append({"role": "user", "content": current_turn})
    return messages
def _build_user_content(user_text: str, media_inputs: dict[str, list[dict[str, Any]]]) -> str | list[dict[str, Any]]:
    """Build the content of the current user message.

    Returns a list of content parts: one ``text`` part holding the stripped
    user text (when it is not blank), followed by one ``image_url`` part per
    image entry that has a truthy ``path``. When no part results at all, the
    original ``user_text`` is returned unchanged as a plain string.
    """
    trimmed = user_text.strip()
    parts: list[dict[str, Any]] = [{"type": "text", "text": trimmed}] if trimmed else []

    image_paths = (entry.get("path") for entry in media_inputs.get("images", []))
    parts.extend(
        {"type": "image_url", "image_url": {"url": _data_url(image_path)}}
        for image_path in image_paths
        if image_path
    )

    # An empty parts list falls back to the raw (possibly blank) string.
    return parts or user_text
def _extract_delta_text(chunk: dict) -> Iterator[str]:
    """Yield the text carried by each choice of one streamed chunk.

    For every entry in ``chunk["choices"]`` the first truthy value among the
    delta's ``content``, ``reasoning`` and ``reasoning_content`` fields is
    yielded as a string. Choices without a delta, or whose fields are all
    empty, yield nothing.
    """
    choices = chunk.get("choices")
    if not choices:
        return
    for entry in choices:
        delta = entry.get("delta")
        if not delta:
            continue
        for field in ("content", "reasoning", "reasoning_content"):
            value = delta.get(field)
            if value:
                yield str(value)
                break
def _events_from_vllm_delta(text: str, parser_state: dict) -> Iterator[dict]:
    """Turn one streamed text delta into stage/text events.

    ``parser_state`` is mutated in place and uses three keys:
    ``pending_stage`` (still waiting for a leading ``<stage ...>`` tag),
    ``buffer`` (text collected while waiting) and ``started_text`` (visible
    text has been emitted).

    While a stage tag is pending, text is buffered until one of these holds:

    * the buffer cannot be the start of a ``<stage`` tag, or it exceeds 180
      characters without a closing ``>``: a default stage event is yielded,
      followed by the buffered text;
    * a ``>`` arrives: the tag is handed to ``_parse_stage_tag`` and the text
      after it is yielded.

    Once no tag is pending, deltas pass straight through as ``text_delta``
    events, with leading whitespace removed until the first visible text.
    """
    if not parser_state.get("pending_stage"):
        chunk = text if parser_state.get("started_text") else text.lstrip()
        if chunk:
            parser_state["started_text"] = True
            yield normalize_event({"type": "text_delta", "text": chunk})
        return

    parser_state["buffer"] += text
    candidate = parser_state["buffer"].lstrip()
    # True for a full "<stage..." prefix and for any partial prefix of it
    # (including the empty string), so a tag split across deltas still counts.
    tag_like = candidate.startswith("<stage") or "<stage".startswith(candidate)
    tag_closed = ">" in candidate

    if tag_like and not tag_closed and len(candidate) <= 180:
        # Possibly an unfinished tag: keep buffering.
        return

    parser_state["pending_stage"] = False

    if tag_like and tag_closed:
        head, _, tail = candidate.partition(">")
        parser_state["buffer"] = ""
        yield _parse_stage_tag(head + ">")
        remainder = tail.lstrip()
        if remainder:
            parser_state["started_text"] = True
            yield normalize_event({"type": "text_delta", "text": remainder})
        return

    # No usable tag: fall back to the default stage and release the buffer.
    yield normalize_event({"type": "stage", "expression": "smile", "motion": "talk", "intensity": 0.55})
    if candidate:
        parser_state["started_text"] = True
        yield normalize_event({"type": "text_delta", "text": candidate})
    parser_state["buffer"] = ""
def _flush_vllm_parser(parser_state: dict) -> Iterator[dict]:
    """Release text still buffered when the stream ends.

    Does nothing unless a stage tag is still pending and the buffer is
    non-empty. Otherwise it yields the default stage event, then the
    buffered text (if any remains after removing leading whitespace), and
    clears the buffer.
    """
    if not parser_state.get("pending_stage"):
        return
    if not parser_state.get("buffer"):
        return

    parser_state["pending_stage"] = False
    yield normalize_event({"type": "stage", "expression": "smile", "motion": "talk", "intensity": 0.55})

    leftover = parser_state["buffer"].lstrip()
    if leftover:
        parser_state["started_text"] = True
        yield normalize_event({"type": "text_delta", "text": leftover})
    parser_state["buffer"] = ""
def _parse_stage_tag(tag: str) -> dict:
    """Convert a ``<stage ...>`` tag into a normalized stage event.

    The stripped tag must match ``STAGE_TAG_RE`` in full. On a match, the
    captured ``expression``, ``motion`` and ``intensity`` strings are passed
    to ``normalize_event`` as they were captured (intensity is still a
    string at that point). Any other input produces the default
    smile/talk stage.
    """
    parsed = STAGE_TAG_RE.fullmatch(tag.strip())
    if parsed:
        # groupdict() yields expression, motion, intensity in pattern order.
        return normalize_event({"type": "stage", **parsed.groupdict()})
    return normalize_event({"type": "stage", "expression": "smile", "motion": "talk", "intensity": 0.55})
def _stream_mock_reply(user_text: str, state: dict, voice_state: dict[str, Any] | None = None) -> Iterator[dict]:
    """Yield a canned reply without contacting any model.

    Event order: ``skill``, a "thinking" ``stage``, ``voice``, the
    skill-specific ``stage``, the reply as ``text_delta`` pieces,
    ``sentence_end``, optional audio events, an optional settling ``stage``
    and finally ``done``. The reply text and stage depend on the skill
    chosen by ``select_skill``; unknown skills get a generic reply that
    opens with the character's first catchphrase.
    """
    character = state.get("character") or get_character(state.get("character_id", "star_knight"))
    skill = select_skill(user_text, character)
    voice = {**character.get("voice", {}), **(voice_state or {})}

    yield normalize_event({"type": "skill", "name": skill})
    yield normalize_event({"type": "stage", "expression": "thinking", "motion": "look_at_user", "intensity": 0.45})
    yield normalize_event({"type": "voice", "style": voice.get("style", "soft"), "speed": voice.get("speed", 0.96), "energy": voice.get("energy", 0.5)})

    # skill -> (reply, expression, motion, intensity)
    canned = {
        "emotional_support": ("我在听。你不用马上把自己整理好,先把这口气慢慢呼出来。", "worried", "gentle_blink", 0.7),
        "battle_focus": ("收到。先确认边界,再判断风险。你站在我身后就好。", "thinking", "focus", 0.8),
        "playful_reframe": ("这听起来像一个支线任务。目标很小,但奖励可能意外地不错。", "smile", "soft_sway", 0.7),
    }
    if skill in canned:
        reply, expression, motion, intensity = canned[skill]
    else:
        # The catchphrase is only looked up for the generic reply.
        catchphrase = (character.get("dialogue_style", {}).get("catchphrases") or ["我在。"])[0]
        reply = f"{catchphrase} 你刚才说的我记下了,我们可以从最容易的一步开始。"
        expression, motion, intensity = "smile", "gentle_blink", 0.5

    last_stage = normalize_event({"type": "stage", "expression": expression, "motion": motion, "intensity": intensity})
    yield last_stage

    for piece in _chunk_text(reply):
        yield normalize_event({"type": "text_delta", "text": piece})
    yield normalize_event({"type": "sentence_end", "text": reply})

    if _voice_enabled(voice_state):
        yield from _synthesize_audio_event(reply, character, voice_state or state.get("voice", {}))

    closing_stage = _settled_stage(last_stage)
    if closing_stage:
        yield closing_stage
    yield {"type": "done"}
def _chunk_text(text: str, size: int = 4) -> Iterator[str]:
    """Yield consecutive slices of ``text`` that are ``size`` characters long.

    The final slice may be shorter. An empty ``text`` yields nothing.
    """
    offsets = range(0, len(text), size)
    yield from (text[start : start + size] for start in offsets)
def _settled_stage(stage: dict[str, Any] | None) -> dict[str, Any] | None:
    """Return a calmer follow-up stage after a ``talk`` motion.

    Only a stage whose motion is ``"talk"`` gets a follow-up: the same
    expression (default ``"smile"``) with motion ``"gentle_blink"`` and the
    intensity capped at 0.6 (default 0.55 before capping). For a missing or
    empty stage, or any other motion, the result is ``None``.
    """
    if stage and stage.get("motion") == "talk":
        expression = stage.get("expression", "smile")
        capped = min(float(stage.get("intensity", 0.55)), 0.6)
        return normalize_event(
            {
                "type": "stage",
                "expression": expression,
                "motion": "gentle_blink",
                "intensity": capped,
            }
        )
    return None
def _voice_enabled(voice_state: dict[str, Any] | None) -> bool:
    """Report whether speech synthesis should run.

    Voice counts as enabled when ``voice_state`` is missing or empty, or
    when its ``enabled`` entry is absent or truthy.
    """
    return True if not voice_state else bool(voice_state.get("enabled", True))
def _synthesize_audio_event(text: str, character: dict, voice_state: dict[str, Any]) -> Iterator[dict]:
    """Synthesize ``text`` and yield the resulting audio event, if any.

    Any exception raised by ``synthesize_sentence`` is reported as a single
    ``error`` event instead of propagating. A falsy synthesis result yields
    nothing.
    """
    try:
        audio_path = synthesize_sentence(text, character, voice_state)
    except Exception as exc:
        # Best effort: a TTS failure must not abort the reply stream.
        yield {"type": "error", "message": f"TTS 调用失败:{exc}"}
    else:
        if audio_path:
            yield {"type": "audio", "path": audio_path}
def _data_url(path_value: str | Path) -> str:
    """Return the file at ``path_value`` encoded as a base64 ``data:`` URL.

    The MIME type is guessed from the file name and falls back to
    ``application/octet-stream`` when no guess is available.
    """
    path = Path(path_value)
    guessed, _encoding = mimetypes.guess_type(path.name)
    mime_type = guessed or "application/octet-stream"
    encoded = _base64_file(path)
    return "data:" + mime_type + ";base64," + encoded
def _base64_file(path_value: str | Path) -> str:
    """Read the file at ``path_value`` and return its bytes as base64 text."""
    raw = Path(path_value).read_bytes()
    encoded = base64.b64encode(raw)
    return encoded.decode("ascii")