# StoryWeaver / app.py
# wanghao76
# fix: adapt app.py for Gradio 6.x API (remove Chatbot type, move theme/css to launch)
# a8c7472
"""
app.py - StoryWeaver Gradio 交互界面
职责:
1. 构建游戏的 Web 前端界面 (Gradio)
2. 串联 NLU 引擎、叙事引擎、状态管理器
3. 管理用户交互流程(文本输入 + 选项点击)
4. 展示游戏状态(HP、背包、任务等)
数据流转:
用户输入 → NLU 引擎(意图识别) → 叙事引擎(两阶段生成)
↕ ↕
Gradio UI ← 状态管理器(校验 + 更新) ← 叙事引擎(文本 + 选项)
"""
import copy
from collections import Counter
import html
import json
import logging
from time import perf_counter
import gradio as gr
from state_manager import GameState
from nlu_engine import NLUEngine
from scene_assets import get_scene_image_path, IMAGE_DIR
from story_engine import StoryEngine
from telemetry import append_turn_log, create_session_metadata
from utils import logger
# Custom CSS for the Gradio UI. Selectors cover the chat area (.story-chat),
# the status panel (.status-panel), option / side-action / backpack buttons,
# and the scene sidebar, card and image (.scene-sidebar, .scene-card,
# .scene-image).
# NOTE(review): presumably handed to Gradio when the app is launched — the
# consumer is outside this view; confirm.
APP_UI_CSS = """
.story-chat {min-height: 500px;}
.status-panel {
font-family: "Microsoft YaHei UI", "Noto Sans SC", sans-serif;
font-size: 0.9em;
line-height: 1.5;
background: transparent !important;
border: none !important;
border-radius: 0 !important;
padding: 10px 12px !important;
box-shadow: none !important;
overflow: visible !important;
}
.status-panel > div,
.status-panel [class*="prose"],
.status-panel .markdown-body,
.status-panel [class*="wrap"] {
background: transparent !important;
border: none !important;
box-shadow: none !important;
padding: 0 !important;
overflow: visible !important;
}
.status-panel * {
word-break: break-word;
overflow-wrap: anywhere;
}
.option-btn {min-height: 50px !important;}
.side-action-btn,
.side-action-btn button {min-height: 50px !important;}
.backpack-btn button {
min-height: 50px !important;
background: #ffffff !important;
color: #0f172a !important;
border: 1px solid #d1d5db !important;
}
.scene-sidebar {gap: 12px;}
.scene-card {
border: 1px solid #e5e7eb !important;
border-radius: 12px !important;
background: #fcfcfd !important;
box-shadow: 0 4px 14px rgba(15, 23, 42, 0.04) !important;
}
.scene-image {
min-height: 260px;
padding: 10px !important;
}
.scene-image > div,
.scene-image img,
.scene-image button,
.scene-image [class*="image"],
.scene-image [class*="wrap"],
.scene-image [class*="frame"],
.scene-image [class*="preview"] {
border: none !important;
box-shadow: none !important;
background: transparent !important;
}
.scene-image img {
width: 100%;
height: 100%;
object-fit: contain !important;
border-radius: 10px;
padding: 4px;
background: #ffffff !important;
}
"""
# ============================================================
# 全局游戏实例(每个会话独立)
# ============================================================
# 使用 Gradio State 管理每个用户的游戏状态
# 这里先定义工厂函数
def create_new_game(player_name: str = "旅人") -> dict:
    """Build a fresh game session bundle.

    A new ``GameState`` is created for ``player_name`` and both an
    ``NLUEngine`` and a ``StoryEngine`` (rule-text polishing enabled) are
    bound to it.

    Returns:
        A dict with the state, the two engines, an empty ``current_options``
        list, ``started=False`` and every key that
        ``create_session_metadata()`` contributes.
    """
    state = GameState(player_name=player_name)
    session: dict = {
        "game_state": state,
        "nlu": NLUEngine(state),
        "story": StoryEngine(state, enable_rule_text_polish=True),
        "current_options": [],
        "started": False,
    }
    # Metadata keys are merged last, exactly like a trailing ``**`` unpack.
    session.update(create_session_metadata())
    return session
def _json_safe(value):
"""Convert nested values into JSON-serializable data for logs."""
if value is None or isinstance(value, (str, int, float, bool)):
return value
if isinstance(value, dict):
return {str(key): _json_safe(val) for key, val in value.items()}
if isinstance(value, (list, tuple, set)):
return [_json_safe(item) for item in value]
if hasattr(value, "model_dump"):
return _json_safe(value.model_dump())
return str(value)
def _build_state_snapshot(gs: GameState) -> dict:
    """Capture a compact view of the game state for evaluation logs.

    The snapshot holds turn/world fields, the player's raw attributes plus
    effective stats and equipment bonuses, the quests whose status is
    ``"active"``, and the current length of the event log.
    """
    effective = gs.get_effective_player_stats()
    bonuses = gs.get_equipment_stat_bonuses()
    surroundings = gs.get_environment_snapshot(limit=3)

    running_quests = [
        {
            "quest_id": quest.quest_id,
            "title": quest.title,
            "status": quest.status,
            "objectives": _json_safe(quest.objectives),
        }
        for quest in gs.world.quests.values()
        if quest.status == "active"
    ]

    hero = gs.player
    scalar_fields = (
        "name", "level", "hp", "max_hp", "mp", "max_mp",
        "attack", "defense", "speed", "luck", "perception",
        "gold", "morale", "sanity", "hunger", "karma",
    )
    player_view = {field: getattr(hero, field) for field in scalar_fields}
    # Containers are copied so later state changes cannot alter the record.
    player_view.update(
        effective_stats=_json_safe(effective),
        equipment_bonuses=_json_safe(bonuses),
        inventory=list(hero.inventory),
        equipment=copy.deepcopy(hero.equipment),
        skills=list(hero.skills),
        status_effects=[effect.name for effect in hero.status_effects],
    )

    world = gs.world
    return {
        "turn": gs.turn,
        "game_mode": gs.game_mode,
        "location": hero.location,
        "scene": world.current_scene,
        "day": world.day_count,
        "time_of_day": world.time_of_day,
        "weather": world.weather,
        "light_level": world.light_level,
        "environment": _json_safe(surroundings),
        "player": player_view,
        "active_quests": running_quests,
        "event_log_size": len(gs.event_log),
    }
def _record_interaction_log(
game_session: dict,
*,
input_source: str,
user_input: str,
intent_result: dict | None,
output_text: str,
latency_ms: float,
nlu_latency_ms: float | None = None,
generation_latency_ms: float | None = None,
final_result: dict | None = None,
selected_option: dict | None = None,
):
"""Append a structured interaction log without affecting gameplay."""
if not game_session or "game_state" not in game_session:
return
final_result = final_result or {}
telemetry = _json_safe(final_result.get("telemetry", {})) or {}
record = {
"input_source": input_source,
"user_input": user_input,
"selected_option": _json_safe(selected_option),
"nlu_result": _json_safe(intent_result),
"latency_ms": round(latency_ms, 2),
"nlu_latency_ms": None if nlu_latency_ms is None else round(nlu_latency_ms, 2),
"generation_latency_ms": None if generation_latency_ms is None else round(generation_latency_ms, 2),
"used_fallback": bool(telemetry.get("used_fallback", False)),
"fallback_reason": telemetry.get("fallback_reason"),
"engine_mode": telemetry.get("engine_mode"),
"state_changes": _json_safe(final_result.get("state_changes", {})),
"change_log": _json_safe(final_result.get("change_log", [])),
"consistency_issues": _json_safe(final_result.get("consistency_issues", [])),
"output_text": output_text,
"story_text": final_result.get("story_text"),
"options": _json_safe(final_result.get("options", game_session.get("current_options", []))),
"post_turn_snapshot": _build_state_snapshot(game_session["game_state"]),
}
try:
append_turn_log(game_session, record)
except Exception as exc:
logger.warning(f"Failed to append interaction log: {exc}")
def _build_option_intent(selected_option: dict) -> dict:
"""Represent button clicks in the same schema as free-text NLU output."""
option_text = selected_option.get("text", "")
return {
"intent": selected_option.get("action_type", "EXPLORE"),
"target": selected_option.get("target"),
"details": option_text,
"raw_input": option_text,
"parser_source": "option_click",
}
def _get_scene_image_value(gs: GameState) -> str | None:
    """Look up the scene image for ``gs``, passing its ``last_interacted_npc`` (or ``None``) as focus NPC."""
    return get_scene_image_path(
        gs,
        focus_npc=getattr(gs, "last_interacted_npc", None),
    )
def _get_scene_image_update(gs: GameState):
    """Build a Gradio update for the scene image; the component is hidden when no image is available."""
    path = _get_scene_image_value(gs)
    shown = True if path else False
    return gr.update(value=path, visible=shown)
def _build_map_graph_data(gs: GameState) -> dict:
    """Derive map topology from discovered locations and their connections.

    Returns:
        A dict with ``current_location``, the ordered ``nodes`` list, an
        ``adjacency`` mapping restricted to visible nodes, and ``node_state``
        mapping each node to ``"current"``, ``"visited"`` or ``"known"``.
    """
    world = getattr(gs, "world", None)
    registry = getattr(world, "locations", {}) or {}
    known = list(getattr(world, "discovered_locations", []) or [])
    trail = list(getattr(gs, "location_history", []) or [])

    here = str(getattr(gs, "current_location", None) or "").strip()
    if not here:
        player = getattr(gs, "player", None)
        here = str(getattr(player, "location", None) or "未知之地")

    visible: set[str] = set(known) | set(trail)
    visited = set(trail)
    if here:
        visible.add(here)
        visited.add(here)

    # World registration order comes first so the rendered map stays stable;
    # visible names missing from the registry are appended afterwards.
    ordered: list[str] = []
    for name in registry.keys():
        if name in visible:
            ordered.append(name)
    for name in [*known, *trail, here]:
        if name and name in visible and name not in ordered:
            ordered.append(name)

    adjacency: dict[str, list[str]] = {}
    for node in ordered:
        info = registry.get(node)
        if info:
            links = list(getattr(info, "connected_to", []) or [])
            adjacency[node] = [
                other for other in links if other in visible and other != node
            ]
        else:
            adjacency[node] = []

    def _classify(node: str) -> str:
        if node == here:
            return "current"
        return "visited" if node in visited else "known"

    node_state: dict[str, str] = {node: _classify(node) for node in ordered}

    return {
        "current_location": here,
        "nodes": ordered,
        "adjacency": adjacency,
        "node_state": node_state,
    }
def _build_location_hover_text(gs: GameState, location_name: str) -> str:
    """Compose the hover tooltip for a location, listing its NPCs and monsters.

    NPC names come from the location's ``npcs_present`` plus every living NPC
    whose ``location`` equals ``location_name``; empty names are dropped and
    the rest sorted. Unknown locations report no NPCs and no monsters.
    """
    world = getattr(gs, "world", None)
    place = (getattr(world, "locations", {}) or {}).get(location_name)
    if not place:
        return f"{location_name}\nNPC: 无\n怪物: 无"

    roster = getattr(world, "npcs", {}) or {}
    present: set[str] = set(getattr(place, "npcs_present", []) or [])
    present.update(
        getattr(npc, "name", "")
        for npc in roster.values()
        if getattr(npc, "location", None) == location_name and getattr(npc, "is_alive", True)
    )
    present = {name for name in present if name}

    monsters = []
    for raw in list(getattr(place, "enemies", []) or []):
        if str(raw):
            monsters.append(str(raw))

    npc_text = "无"
    if present:
        npc_text = "、".join(sorted(present))
    enemy_text = "无"
    if monsters:
        enemy_text = "、".join(monsters)
    return f"{location_name}\nNPC: {npc_text}\n怪物: {enemy_text}"
def _truncate_map_label(name: str, max_len: int = 8) -> str:
text = str(name or "")
return text if len(text) <= max_len else f"{text[:max_len]}..."
def _build_fixed_branch_layout(nodes: list[str], adjacency: dict[str, list[str]]) -> dict[str, tuple[int, int]]:
"""固定起点的分层布局:保持总体顺序稳定,同时展示分支。"""
if not nodes:
return {}
node_set = set(nodes)
layers: dict[str, int] = {}
def _bfs(seed: str, base_layer: int) -> None:
if seed not in node_set or seed in layers:
return
queue: list[str] = [seed]
layers[seed] = base_layer
cursor = 0
while cursor < len(queue):
node = queue[cursor]
cursor += 1
next_layer = layers[node] + 1
for nxt in adjacency.get(node, []):
if nxt in node_set and nxt not in layers:
layers[nxt] = next_layer
queue.append(nxt)
# 第一出现地点作为固定起点,避免因当前位置变化而重排。
_bfs(nodes[0], 0)
for name in nodes:
if name not in layers:
base = (max(layers.values()) + 1) if layers else 0
_bfs(name, base)
level_nodes: dict[int, list[str]] = {}
for name in nodes:
level = layers.get(name, 0)
level_nodes.setdefault(level, []).append(name)
positions: dict[str, tuple[int, int]] = {}
for col_idx, level in enumerate(sorted(level_nodes.keys())):
for row_idx, name in enumerate(level_nodes[level]):
positions[name] = (col_idx, row_idx)
return positions
def _render_text_map(gs: GameState | None) -> str:
    """Render the location relationship graph as a left-to-right diagram.

    Args:
        gs: Current game state, or ``None`` before a game has started.

    Returns:
        A short plain-text placeholder when there is no state, no node or no
        layout position; otherwise an HTML fragment: a collapsible
        ``<details>`` element wrapping an inline SVG in which every location
        is a rounded box (with a hover ``<title>`` listing NPCs and monsters)
        and every connection is a polyline.
    """
    if gs is None:
        return "地图关系图\n(未开始)"
    graph = _build_map_graph_data(gs)
    nodes = graph["nodes"]
    adjacency = graph["adjacency"]
    node_state = graph["node_state"]
    current_location = graph["current_location"]
    if not nodes:
        current = current_location or "未知之地"
        return f"地图关系图\n当前位置:{current}"
    positions = _build_fixed_branch_layout(nodes, adjacency)
    if not positions:
        return "地图关系图\n(暂无可显示节点)"

    # Geometry (SVG user units): box size, grid pitch and outer margins.
    node_width = 110
    node_height = 34
    col_gap = 140
    row_gap = 50
    x_margin = 16
    y_margin = 16
    max_col = max(col for col, _ in positions.values())
    max_row = max(row for _, row in positions.values())
    canvas_width = x_margin * 2 + max_col * col_gap + node_width
    canvas_height = y_margin * 2 + max_row * row_gap + node_height
    # Enforce a minimum canvas height.
    canvas_height = max(canvas_height, 74)

    # Pixel boxes (x, y, w, h) and centre points for every positioned node.
    node_boxes: dict[str, tuple[int, int, int, int]] = {}
    centers: dict[str, tuple[int, int]] = {}
    for name, (col, row) in positions.items():
        x = x_margin + col * col_gap
        y = y_margin + row * row_gap
        node_boxes[name] = (x, y, node_width, node_height)
        centers[name] = (x + node_width // 2, y + node_height // 2)

    # Collapse directed adjacency into unordered pairs so each connection is
    # drawn once; the pair is stored in sorted-name order.
    edge_pairs: set[tuple[str, str]] = set()
    for source, neighbors in adjacency.items():
        for target in neighbors:
            if source in positions and target in positions and source != target:
                edge_pairs.add(tuple(sorted((source, target))))
    edge_svg: list[str] = []

    def _segment_hits_box_horizontal(y: float, x_start: float, x_end: float, box: tuple[int, int, int, int]) -> bool:
        # True when a horizontal segment at height y crosses the interior of
        # box (a 1-unit inset keeps edge-touching segments from counting).
        bx, by, bw, bh = box
        left = min(x_start, x_end)
        right = max(x_start, x_end)
        return (by + 1) <= y <= (by + bh - 1) and not (right <= bx + 1 or left >= bx + bw - 1)

    def _segment_hits_box_vertical(x: float, y_start: float, y_end: float, box: tuple[int, int, int, int]) -> bool:
        # Vertical counterpart of the check above.
        bx, by, bw, bh = box
        top = min(y_start, y_end)
        bottom = max(y_start, y_end)
        return (bx + 1) <= x <= (bx + bw - 1) and not (bottom <= by + 1 or top >= by + bh - 1)

    for source, target in sorted(edge_pairs):
        sx, sy, sw, sh = node_boxes[source]
        tx, ty, tw, th = node_boxes[target]
        # Default route: leave the source's right edge, step vertically at the
        # horizontal midpoint, enter the target's left edge.
        source_exit_x = sx + sw
        source_exit_y = sy + sh / 2
        target_entry_x = tx
        target_entry_y = ty + th / 2
        mid_x = (source_exit_x + target_entry_x) / 2
        needs_detour = False
        for name, box in node_boxes.items():
            if name in {source, target}:
                continue
            if (
                _segment_hits_box_horizontal(source_exit_y, source_exit_x, mid_x, box)
                or _segment_hits_box_vertical(mid_x, source_exit_y, target_entry_y, box)
                or _segment_hits_box_horizontal(target_entry_y, mid_x, target_entry_x, box)
            ):
                needs_detour = True
                break
        if not needs_detour:
            points = (
                f"{source_exit_x},{source_exit_y} "
                f"{mid_x},{source_exit_y} "
                f"{mid_x},{target_entry_y} "
                f"{target_entry_x},{target_entry_y}"
            )
        else:
            # Local detour: route above the boxes only when necessary, which
            # limits clutter while keeping the line off other nodes.
            route_y = max(4, min(source_exit_y, target_entry_y) - node_height / 2 - 10)
            route_left_x = source_exit_x + 6
            route_right_x = target_entry_x - 6
            points = (
                f"{source_exit_x},{source_exit_y} "
                f"{route_left_x},{source_exit_y} "
                f"{route_left_x},{route_y} "
                f"{route_right_x},{route_y} "
                f"{route_right_x},{target_entry_y} "
                f"{target_entry_x},{target_entry_y}"
            )
        edge_svg.append(
            f"<polyline points='{points}' "
            "fill='none' stroke='#94a3b8' stroke-width='1.7' "
            "stroke-linecap='round' stroke-linejoin='round' />"
        )

    node_svg: list[str] = []
    for name in nodes:
        col, row = positions[name]
        x = x_margin + col * col_gap
        y = y_margin + row * row_gap
        state = node_state.get(name, "known")
        # Both the label and the tooltip are HTML-escaped before embedding.
        escaped_name = html.escape(_truncate_map_label(name))
        hover_text = html.escape(_build_location_hover_text(gs, name))
        # Colour scheme per node state: current / visited / known.
        if state == "current":
            fill = "#fff7ed"
            stroke = "#f97316"
            text_color = "#9a3412"
            stroke_width = 1.8
            display_name = escaped_name
        elif state == "visited":
            fill = "#f1f5f9"
            stroke = "#64748b"
            text_color = "#334155"
            stroke_width = 1.4
            display_name = escaped_name
        else:
            fill = "#ffffff"
            stroke = "#cbd5e1"
            text_color = "#334155"
            stroke_width = 1.2
            display_name = escaped_name
        # Only the current node gets the CSS class driving the pulse animation.
        rect_class_attr = " class='map-current-node'" if state == "current" else ""
        node_svg.append(
            "<g cursor='help'>"
            f"<title>{hover_text}</title>"
            f"<rect x='{x}' y='{y}' width='{node_width}' height='{node_height}' "
            f"rx='8' ry='8' fill='{fill}' stroke='{stroke}' stroke-width='{stroke_width}'{rect_class_attr} />"
            f"<text x='{x + node_width / 2}' y='{y + node_height / 2 + 5}' "
            "text-anchor='middle' font-size='11.5' font-family='Microsoft YaHei UI, Noto Sans SC, sans-serif' "
            f"fill='{text_color}'>{display_name}</text>"
            "</g>"
        )

    # Edges are emitted before nodes, so node boxes are painted on top.
    svg = (
        f"<svg width='{canvas_width}' height='{canvas_height}' viewBox='0 0 {canvas_width} {canvas_height}' "
        "xmlns='http://www.w3.org/2000/svg'>"
        "<style>"
        "@keyframes mapNodePulse{"
        "0%{fill:#fff7ed;stroke:#f97316;stroke-width:1.8;opacity:0.9;}"
        "50%{fill:#fdba74;stroke:#c2410c;stroke-width:1.8;opacity:1;}"
        "100%{fill:#fff7ed;stroke:#f97316;stroke-width:1.8;opacity:0.9;}"
        "}"
        ".map-current-node{animation:mapNodePulse 1.2s ease-in-out infinite;}"
        "</style>"
        + "".join(edge_svg)
        + "".join(node_svg)
        + "</svg>"
    )
    # Wrap the SVG in a collapsible block with horizontal scrolling.
    return (
        "<div style='font-size:0.9em;'>"
        "<details>"
        "<summary style='cursor:pointer;font-weight:700;'>展开地图关系图</summary>"
        "<div style='font-size:0.8em;color:#475569;margin:8px 0 6px 0;'>"
        "鼠标悬停于地点格可查看NPC与怪物。"
        "</div>"
        "<div style='overflow-x:auto;padding-bottom:2px;'>"
        + svg
        + "</div>"
        "</details>"
        "</div>"
    )
def restart_game() -> tuple:
    """Reset the UI to its pre-game state.

    Returns:
        A tuple of: empty chat history, the world-info panel for "no game",
        the initial status text, the not-started map text, a hidden/cleared
        scene image update, the placeholder option-button updates, an empty
        session dict, a cleared and disabled text-input update, and an update
        resetting the character name to its default.
    """
    placeholder_buttons = _get_loading_button_updates()
    outputs = [
        [],
        _format_world_info_panel(None),
        "## 等待开始...\n\n请输入角色名称并点击「开始冒险」",
        "地图关系图\n(未开始)",
        gr.update(value=None, visible=False),
    ]
    outputs.extend(placeholder_buttons)
    outputs += [
        {},
        gr.update(value="", interactive=False),
        gr.update(value="旅人"),
    ]
    return tuple(outputs)
# ============================================================
# 核心交互函数
# ============================================================
def start_game(player_name: str, game_session: dict):
    """Start a new adventure and stream the opening narration.

    This is a generator: each yield is one UI frame
    ``(chat_history, world_info, status, map, scene_image_update,
    *option_button_updates, game_session, text_input_update)``. The text
    input stays disabled until the final frame.

    Args:
        player_name: Character name; blank input falls back to the default.
        game_session: Ignored on entry — a brand-new session replaces it.
    """
    if not player_name.strip():
        player_name = "旅人"

    game_session = create_new_game(player_name)
    game_session["started"] = True
    state = game_session["game_state"]

    chat_history = [{"role": "assistant", "content": "⏳ 正在生成开场..."}]
    world_info_text = _format_world_info_panel(state)
    status_text = _format_status_panel(state)
    loading = _get_loading_button_updates()

    def _frame(buttons, input_enabled):
        """Assemble one output tuple from the current local values."""
        return (
            chat_history,
            world_info_text,
            status_text,
            _render_text_map(state),
            _get_scene_image_update(state),
            *buttons,
            game_session,
            gr.update(interactive=input_enabled),
        )

    # First frame: loading message, option buttons in their loading state.
    yield _frame(loading, False)

    # Stream the opening. Options are taken from the "final" event only,
    # never parsed out of intermediate chunks.
    turn_started = perf_counter()
    story_text = ""
    final_result = None
    for update in game_session["story"].generate_opening_stream():
        kind = update["type"]
        if kind == "final":
            final_result = update
        elif kind == "story_chunk":
            story_text = update["text"]
            chat_history[-1]["content"] = story_text
            yield _frame(loading, False)
    generation_latency_ms = (perf_counter() - turn_started) * 1000

    raw_options = []
    if final_result:
        story_text = final_result.get("story_text", story_text)
        raw_options = final_result.get("options", [])
    options = _finalize_session_options(raw_options)
    game_session["current_options"] = options

    if not final_result:
        # The stream ended without a final event: synthesize one for the log.
        final_result = {
            "story_text": story_text,
            "options": options,
            "state_changes": {},
            "change_log": [],
            "consistency_issues": [],
            "telemetry": {
                "engine_mode": "opening_app",
                "used_fallback": True,
                "fallback_reason": "missing_final_event",
            },
        }

    full_message = story_text
    chat_history[-1]["content"] = full_message
    world_info_text = _format_world_info_panel(state)
    status_text = _format_status_panel(state)
    btn_updates = _get_button_updates(options)
    _record_interaction_log(
        game_session,
        input_source="system_opening",
        user_input="",
        intent_result=None,
        output_text=full_message,
        latency_ms=generation_latency_ms,
        generation_latency_ms=generation_latency_ms,
        final_result=final_result,
    )
    # Last frame: full text, real options, text input re-enabled.
    yield _frame(btn_updates, True)
def process_user_input(user_input: str, chat_history: list, game_session: dict):
    """Handle one free-text player turn and stream UI updates.

    Flow:
        1. Parse the text into an intent with the NLU engine.
        2. Pre-validate the intent against the game state; a rejected action
           is answered immediately, without calling the story engine.
        3. Stream the story engine output, yielding one UI frame per chunk.
        4. Apply the final result (or an app-level fallback when the stream
           produced no final event), log the turn and yield the last frame.

    Whenever a frame changes which option buttons are shown, the same list is
    stored in ``game_session["current_options"]`` so a later button click
    resolves to the option the player actually sees.

    Args:
        user_input: Text typed by the player; ``None``/blank is a no-op.
        chat_history: Chat messages so far (``None`` is treated as empty).
        game_session: Session dict created by ``create_new_game``.

    Yields:
        ``(chat_history, world_info, status, map, scene_image_update,
        *option_button_updates, game_session)``.
    """
    chat_history = chat_history or []

    if not game_session or not game_session.get("started"):
        chat_history.append({"role": "assistant", "content": "请先点击「开始冒险」按钮!"})
        loading = _get_loading_button_updates()
        yield (
            chat_history,
            _format_world_info_panel(None),
            "",
            "",
            gr.update(value=None, visible=False),
            *loading,
            game_session,
        )
        return

    gs: GameState = game_session["game_state"]

    def _frame(buttons) -> tuple:
        """Assemble one output tuple from the current game state."""
        return (
            chat_history,
            _format_world_info_panel(gs),
            _format_status_panel(gs),
            _render_text_map(gs),
            _get_scene_image_update(gs),
            *buttons,
            game_session,
        )

    if not (user_input or "").strip():
        # Blank input: just redraw the current state.
        yield _frame(_get_button_updates(game_session.get("current_options", [])))
        return

    nlu: NLUEngine = game_session["nlu"]
    story: StoryEngine = game_session["story"]
    turn_started = perf_counter()

    # Game already over: offer only a restart.
    if gs.is_game_over():
        chat_history.append({"role": "user", "content": user_input})
        chat_history.append({"role": "assistant", "content": "游戏已结束。请点击「重新开始」按钮开始新的冒险。"})
        restart_options = [{"id": 1, "text": "重新开始", "action_type": "RESTART"}]
        # Keep the session in sync with the displayed button; otherwise a
        # click on it would execute the stale option at index 0.
        game_session["current_options"] = restart_options
        yield _frame(_get_button_updates(restart_options))
        return

    # 1. NLU parsing.
    nlu_started = perf_counter()
    intent = nlu.parse_intent(user_input)
    nlu_latency_ms = (perf_counter() - nlu_started) * 1000

    # 1.5 Pre-validation: reject inconsistent actions right away (no story
    # generation is performed for a rejected action).
    is_valid, rejection_msg = gs.pre_validate_action(intent)
    if not is_valid:
        chat_history.append({"role": "user", "content": user_input})
        options = _finalize_session_options(game_session.get("current_options", []))
        # The finalized list may contain fallback entries; store what is shown.
        game_session["current_options"] = options
        rejection_content = (
            f"⚠️ **行动被驳回**:{rejection_msg}\n\n"
            "请重新选择行动,或输入其他指令。"
        )
        chat_history.append({"role": "assistant", "content": rejection_content})
        rejection_result = {
            "story_text": rejection_content,
            "options": options,
            "state_changes": {},
            "change_log": [],
            "consistency_issues": [],
            "telemetry": {
                "engine_mode": "pre_validation",
                "used_fallback": False,
                "fallback_reason": None,
            },
        }
        _record_interaction_log(
            game_session,
            input_source="text_input",
            user_input=user_input,
            intent_result=intent,
            output_text=rejection_content,
            latency_ms=(perf_counter() - turn_started) * 1000,
            nlu_latency_ms=nlu_latency_ms,
            generation_latency_ms=0.0,
            final_result=rejection_result,
        )
        yield _frame(_get_button_updates(options))
        return

    # 2. Append the user message plus a placeholder assistant message that
    # the stream fills in.
    chat_history.append({"role": "user", "content": user_input})
    chat_history.append({"role": "assistant", "content": "⏳ 正在生成..."})
    # Option buttons switch to their loading state while streaming.
    loading = _get_loading_button_updates(
        max(len(game_session.get("current_options", [])), MIN_OPTION_BUTTONS)
    )
    yield _frame(loading)

    # 3. Stream the story.
    generation_started = perf_counter()
    final_result = None
    streamed_text = ""
    for update in story.generate_story_stream(intent):
        if update["type"] == "story_chunk":
            streamed_text = update["text"]
            chat_history[-1]["content"] = streamed_text
            yield _frame(loading)
        elif update["type"] == "final":
            final_result = update
    generation_latency_ms = (perf_counter() - generation_started) * 1000

    # 4. Final frame: full text + state changes + options.
    if final_result:
        options = _finalize_session_options(final_result.get("options", []))
        # A final event without story text falls back to what was streamed.
        story_text = final_result.get("story_text", streamed_text)
    else:
        # The stream ended without a final event: use an app-level fallback.
        logger.warning("流式生成未产生 final 事件,使用兜底文本")
        story_text = "你环顾四周,思考着接下来该做什么..."
        options = _finalize_session_options([])
        final_result = {
            "story_text": story_text,
            "options": options,
            "state_changes": {},
            "change_log": [],
            "consistency_issues": [],
            "telemetry": {
                "engine_mode": "app_fallback",
                "used_fallback": True,
                "fallback_reason": "missing_final_event",
            },
        }
    game_session["current_options"] = options

    change_log = final_result.get("change_log", [])
    log_text = ""
    if change_log:
        log_text = "\n".join(f" {c}" for c in change_log)
        log_text = f"\n\n**状态变化:**\n{log_text}"
    issues = final_result.get("consistency_issues", [])
    issues_text = ""
    if issues:
        issues_text = "\n".join(f" {i}" for i in issues)
        issues_text = f"\n\n**一致性提示:**\n{issues_text}"
    full_message = f"{story_text}{log_text}{issues_text}"
    chat_history[-1]["content"] = full_message

    btn_updates = _get_button_updates(options)
    _record_interaction_log(
        game_session,
        input_source="text_input",
        user_input=user_input,
        intent_result=intent,
        output_text=full_message,
        latency_ms=(perf_counter() - turn_started) * 1000,
        nlu_latency_ms=nlu_latency_ms,
        generation_latency_ms=generation_latency_ms,
        final_result=final_result,
    )
    yield _frame(btn_updates)
def process_option_click(option_idx: int, chat_history: list, game_session: dict):
    """Handle a click on one of the option buttons and stream UI updates.

    Special ``action_type`` values are handled before the story engine is
    involved: ``BACKPACK_EXIT`` restores the options saved when the backpack
    was opened, ``RESTART`` creates a new session and streams a new opening,
    and ``QUIT`` shows a farewell with a single restart button. Every other
    option is streamed through ``process_option_selection_stream``.

    Whenever a frame changes which option buttons are shown, the same list is
    stored in ``game_session["current_options"]`` so the next click resolves
    to the option the player actually sees.

    Args:
        option_idx: Zero-based index into ``current_options`` (0-5). An
            out-of-range index only redraws the current state.
        chat_history: Chat messages so far (``None`` is treated as empty).
        game_session: Session dict created by ``create_new_game``.

    Yields:
        ``(chat_history, world_info, status, map, scene_image_update,
        *option_button_updates, game_session)``.
    """
    chat_history = chat_history or []

    if not game_session or not game_session.get("started"):
        chat_history.append({"role": "assistant", "content": "请先点击「开始冒险」按钮!"})
        loading = _get_loading_button_updates()
        yield (
            chat_history,
            _format_world_info_panel(None),
            "",
            "",
            gr.update(value=None, visible=False),
            *loading,
            game_session,
        )
        return

    gs: GameState = game_session["game_state"]

    def _frame(buttons) -> tuple:
        """Assemble one output tuple from the *current* locals.

        ``chat_history``, ``game_session`` and ``gs`` are rebound by the
        restart branch; the closure always sees the latest bindings.
        """
        return (
            chat_history,
            _format_world_info_panel(gs),
            _format_status_panel(gs),
            _render_text_map(gs),
            _get_scene_image_update(gs),
            *buttons,
            game_session,
        )

    options = game_session.get("current_options", [])
    # Negative indices are rejected too; they would otherwise silently pick
    # an option from the end of the list.
    if not 0 <= option_idx < len(options):
        yield _frame(_get_button_updates(options))
        return

    selected_option = options[option_idx]
    story: StoryEngine = game_session["story"]
    option_intent = _build_option_intent(selected_option)
    turn_started = perf_counter()
    action_type = selected_option.get("action_type")

    # Special option: leave the backpack menu (no story generation).
    if action_type == "BACKPACK_EXIT":
        chat_history.append({"role": "user", "content": f"选择: {selected_option['text']}"})
        chat_history.append({"role": "assistant", "content": "你合上背包,把注意力重新放回当前局势。"})
        restored_options = game_session.pop("backpack_return_options", None)
        if not isinstance(restored_options, list):
            restored_options = _finalize_session_options([])
        game_session["current_options"] = restored_options
        yield _frame(_get_button_updates(restored_options))
        return

    # Special option: restart with a freshly streamed opening.
    if action_type == "RESTART":
        game_session = create_new_game(gs.player.name)
        game_session["started"] = True
        gs = game_session["game_state"]
        chat_history = [{"role": "assistant", "content": "⏳ 正在重新生成开场..."}]
        loading = _get_loading_button_updates()
        yield _frame(loading)

        story_text = ""
        restart_final = None
        for update in game_session["story"].generate_opening_stream():
            if update["type"] == "story_chunk":
                story_text = update["text"]
                chat_history[-1]["content"] = story_text
                yield _frame(loading)
            elif update["type"] == "final":
                restart_final = update

        # Options are taken from the final event only.
        if restart_final:
            story_text = restart_final.get("story_text", story_text)
            restart_options = restart_final.get("options", [])
        else:
            restart_options = []
        restart_options = _finalize_session_options(restart_options)
        game_session["current_options"] = restart_options
        chat_history[-1]["content"] = story_text
        yield _frame(_get_button_updates(restart_options))
        return

    # Special option: quit.
    if action_type == "QUIT":
        chat_history.append({"role": "user", "content": f"选择: {selected_option['text']}"})
        chat_history.append({"role": "assistant", "content": "感谢游玩 StoryWeaver!\n你的冒险到此结束,但故事永远不会真正终结...\n\n点击「开始冒险」可以重新开始。"})
        quit_options = [{"id": 1, "text": "重新开始", "action_type": "RESTART"}]
        # Keep the session in sync with the displayed button; otherwise a
        # click on it would execute the stale option at index 0.
        game_session["current_options"] = quit_options
        yield _frame(_get_button_updates(quit_options))
        return

    from_backpack_menu = str(selected_option.get("menu", "")) == "backpack"

    # Regular option: stream the story.
    chat_history.append({"role": "user", "content": f"选择: {selected_option['text']}"})
    chat_history.append({"role": "assistant", "content": "⏳ 正在生成..."})
    # Option buttons switch to their loading state while streaming.
    loading = _get_loading_button_updates(max(len(options), MIN_OPTION_BUTTONS))
    yield _frame(loading)

    generation_started = perf_counter()
    final_result = None
    streamed_text = ""
    for update in story.process_option_selection_stream(selected_option):
        if update["type"] == "story_chunk":
            streamed_text = update["text"]
            chat_history[-1]["content"] = streamed_text
            yield _frame(loading)
        elif update["type"] == "final":
            final_result = update
    generation_latency_ms = (perf_counter() - generation_started) * 1000

    if final_result:
        # After using/equipping from the backpack menu, stay in that menu.
        if from_backpack_menu:
            options = _build_backpack_options(gs)
        else:
            options = _finalize_session_options(final_result.get("options", []))
        # A final event without story text falls back to what was streamed.
        story_text = final_result.get("story_text", streamed_text)
    else:
        # The stream ended without a final event: use an app-level fallback.
        logger.warning("[选项点击] 流式生成未产生 final 事件,使用兜底文本")
        if from_backpack_menu:
            story_text = "你整理了一下背包,却一时没想好先使用哪件物品。"
            options = _build_backpack_options(gs)
        else:
            story_text = "你环顾四周,思考着接下来该做什么..."
            options = _finalize_session_options([])
        final_result = {
            "story_text": story_text,
            "options": options,
            "state_changes": {},
            "change_log": [],
            "consistency_issues": [],
            "telemetry": {
                "engine_mode": "app_fallback",
                "used_fallback": True,
                "fallback_reason": "missing_final_event",
            },
        }
    game_session["current_options"] = options

    change_log = final_result.get("change_log", [])
    log_text = ""
    if change_log:
        log_text = "\n".join(f" {c}" for c in change_log)
        log_text = f"\n\n**状态变化:**\n{log_text}"
    full_message = f"{story_text}{log_text}"
    chat_history[-1]["content"] = full_message

    btn_updates = _get_button_updates(options)
    _record_interaction_log(
        game_session,
        input_source="option_click",
        user_input=selected_option.get("text", ""),
        intent_result=option_intent,
        output_text=full_message,
        latency_ms=(perf_counter() - turn_started) * 1000,
        generation_latency_ms=generation_latency_ms,
        final_result=final_result,
        selected_option=selected_option,
    )
    yield _frame(btn_updates)
# ============================================================
# UI 辅助函数
# ============================================================
MIN_OPTION_BUTTONS = 3
MAX_OPTION_BUTTONS = 6
# 兜底默认选项(当解析出的选项为空时使用)
_FALLBACK_BUTTON_OPTIONS = [
{"id": 1, "text": "查看周围", "action_type": "EXPLORE"},
{"id": 2, "text": "等待一会", "action_type": "REST"},
{"id": 3, "text": "检查状态", "action_type": "EXPLORE"},
]
def _normalize_options(
    options: list[dict],
    *,
    minimum: int = 0,
    maximum: int = MAX_OPTION_BUTTONS,
) -> list[dict]:
    """
    Normalize an option list for display.

    - Non-list input is treated as empty; non-dict entries are dropped.
    - At most ``maximum`` options are kept.
    - Fallback entries are appended only while fewer than ``minimum`` exist.
    - Every returned option has its ``id`` rewritten (in place) to its
      1-based position.
    """
    source = options if isinstance(options, list) else []
    picked = [entry for entry in source if isinstance(entry, dict)][:maximum]

    # Top up from the canned fallbacks, skipping any text already on offer.
    for template in _FALLBACK_BUTTON_OPTIONS:
        if len(picked) >= minimum:
            break
        already_offered = False
        for existing in picked:
            if existing.get("text") == template["text"]:
                already_offered = True
                break
        if not already_offered:
            # Copy so the module-level fallback dicts are never renumbered.
            picked.append(dict(template))

    # Still short of the minimum: pad with a generic entry.
    while len(picked) < minimum:
        next_id = len(picked) + 1
        picked.append({"id": next_id, "text": "继续探索", "action_type": "EXPLORE"})

    trimmed = picked[:maximum]
    for position, entry in enumerate(trimmed, start=1):
        entry["id"] = position
    return trimmed
def _finalize_session_options(options: list[dict]) -> list[dict]:
    """Normalize session options; an empty/falsy list is padded to MIN_OPTION_BUTTONS."""
    if options:
        return _normalize_options(options, minimum=0)
    return _normalize_options(options, minimum=MIN_OPTION_BUTTONS)
def _format_options(options: list[dict]) -> str:
"""将选项列表格式化为可读的文本(纯文字,绝不显示 JSON)"""
if not options:
return ""
lines = ["---", "**你的选择:**"]
for i, opt in enumerate(options):
# 安全提取:兼容 dict 和异常情况
if isinstance(opt, dict):
idx = opt.get("id", i + 1)
text = opt.get("text", "未知选项")
else:
idx = i + 1
text = str(opt)
lines.append(f" **[{idx}]** {text}")
return "\n".join(lines)
def _is_backpack_menu_active(options: list[dict]) -> bool:
return any(
isinstance(opt, dict)
and str(opt.get("action_type", "")).upper() == "BACKPACK_EXIT"
and str(opt.get("menu", "")) == "backpack"
for opt in (options or [])
)
def _format_item_function(item_info) -> str:
if item_info is None:
return "功能未知"
if item_info.use_effect:
return f"效果:{item_info.use_effect}"
if item_info.stat_bonus:
bonus_text = ",".join(
f"{stat}{'+' if int(value) >= 0 else ''}{int(value)}"
for stat, value in item_info.stat_bonus.items()
)
return f"装备加成:{bonus_text}"
if item_info.lore_text:
return f"线索:{item_info.lore_text}"
return "暂无可用效果"
def _build_backpack_options(gs: GameState) -> list[dict]:
    """
    Build the backpack menu options for the player's current inventory.

    Consumable items come first ("use"), then equippable items ("equip"),
    capped so that the trailing "exit backpack" entry always fits within
    MAX_OPTION_BUTTONS. An empty inventory yields only the exit entry.
    """
    held_items = list(gs.player.inventory)
    if not held_items:
        return [
            {"id": 1, "text": "退出背包", "action_type": "BACKPACK_EXIT", "menu": "backpack"},
        ]

    equippable_types = {"weapon", "armor", "accessory", "helmet", "boots"}
    use_entries: list[dict] = []
    equip_entries: list[dict] = []

    # dict.fromkeys de-duplicates while keeping first-seen order.
    for name in dict.fromkeys(held_items):
        info = gs.world.item_registry.get(name)
        if not info:
            continue
        if gs.is_item_consumable(name):
            use_entries.append(
                {
                    "text": f"使用{name}",
                    "action_type": "USE_ITEM",
                    "target": name,
                    "menu": "backpack",
                }
            )
        elif info.item_type in equippable_types:
            equip_entries.append(
                {
                    "text": f"装备{name}",
                    "action_type": "EQUIP",
                    "target": name,
                    "menu": "backpack",
                }
            )

    # Reserve the last slot for the exit entry.
    actions = [*use_entries, *equip_entries][: MAX_OPTION_BUTTONS - 1]
    actions.append(
        {"text": "退出背包", "action_type": "BACKPACK_EXIT", "menu": "backpack"}
    )
    return _normalize_options(actions, minimum=0, maximum=MAX_OPTION_BUTTONS)
def _format_backpack_story(gs: GameState) -> str:
    """
    Describe the contents of the player's backpack as Markdown text.

    Each distinct item is listed once, in first-seen order, with a quantity
    prefix when more than one is held, its registry description (or a
    placeholder when the item is not registered) and a parenthesized summary
    of its function.

    Args:
        gs: Current game state; reads ``gs.player.inventory`` and
            ``gs.world.item_registry``.

    Returns:
        The narrative text shown when the backpack is opened.
    """
    inventory = list(gs.player.inventory)
    if not inventory:
        return "你打开背包,里面空空如也。"

    # Counter is a dict subclass, so iteration follows first-seen order.
    inventory_counter = Counter(inventory)
    lines = ["你打开背包,快速检查随身物资:"]
    for item_name, quantity in inventory_counter.items():
        item_info = gs.world.item_registry.get(item_name)
        quantity_text = f"x{quantity} " if quantity > 1 else ""
        description = item_info.description if item_info else "暂无描述"
        function_text = _format_item_function(item_info)
        # The function summary is wrapped in a matched pair of parentheses.
        lines.append(
            f"- {quantity_text}**{item_name}**:{description}({function_text})"
        )
    lines.append("\n你可以直接在下方选择“使用/装备”对应物品,或退出背包。")
    return "\n".join(lines)
def open_backpack(chat_history: list, game_session: dict):
    """
    Handle the persistent "open backpack" button.

    Before the game has started, only a reminder is appended to the chat and
    placeholder button updates are returned. Otherwise the current options
    are remembered (unless the backpack menu is already showing), replaced by
    the backpack menu, and the interaction is logged.

    Returns:
        A tuple of (chat history, world info, status panel, map text,
        scene image update, one update per option button, game session).
    """
    history = chat_history or []

    session_started = bool(game_session) and bool(game_session.get("started"))
    if not session_started:
        history.append({"role": "assistant", "content": "请先点击「开始冒险」按钮!"})
        placeholder_updates = _get_loading_button_updates()
        return (
            history,
            _format_world_info_panel(None),
            "",
            "",
            gr.update(value=None, visible=False),
            *placeholder_updates,
            game_session,
        )

    gs: GameState = game_session["game_state"]

    # Remember what to restore on exit, but never overwrite it with the
    # backpack menu itself when the button is pressed twice.
    previous_options = game_session.get("current_options", [])
    if not _is_backpack_menu_active(previous_options):
        game_session["backpack_return_options"] = copy.deepcopy(previous_options)

    story_text = _format_backpack_story(gs)
    menu_options = _build_backpack_options(gs)
    game_session["current_options"] = menu_options

    history.extend(
        [
            {"role": "user", "content": "打开背包"},
            {"role": "assistant", "content": story_text},
        ]
    )

    menu_result = {
        "story_text": story_text,
        "options": menu_options,
        "state_changes": {},
        "change_log": [],
        "consistency_issues": [],
        "telemetry": {
            "engine_mode": "backpack_menu",
            "used_fallback": False,
            "fallback_reason": None,
        },
    }
    _record_interaction_log(
        game_session,
        input_source="backpack_button",
        user_input="打开背包",
        intent_result={"intent": "OPEN_BACKPACK", "target": None},
        output_text=story_text,
        latency_ms=0.0,
        generation_latency_ms=0.0,
        final_result=menu_result,
    )

    button_updates = _get_button_updates(menu_options)
    return (
        history,
        _format_world_info_panel(gs),
        _format_status_panel(gs),
        _render_text_map(gs),
        _get_scene_image_update(gs),
        *button_updates,
        game_session,
    )
def _get_loading_button_updates(visible_count: int = MIN_OPTION_BUTTONS) -> list:
    """Return disabled "..." placeholder updates for every option slot.

    The first ``visible_count`` slots (clamped to 0..MAX_OPTION_BUTTONS) are
    visible; the rest are hidden.
    """
    shown = max(0, min(int(visible_count or 0), MAX_OPTION_BUTTONS))
    return [
        gr.update(value="...", visible=slot < shown, interactive=False)
        for slot in range(MAX_OPTION_BUTTONS)
    ]
def _get_button_updates(options: list[dict]) -> list:
    """Turn an option list into button updates; always returns MAX_OPTION_BUTTONS entries."""
    normalized = _normalize_options(options, minimum=0)
    # Pad with None so every slot has an entry; unused slots get hidden.
    padded = normalized + [None] * (MAX_OPTION_BUTTONS - len(normalized))

    updates = []
    for entry in padded[:MAX_OPTION_BUTTONS]:
        if isinstance(entry, dict):
            label, shown = entry.get("text", "..."), True
        else:
            label, shown = "...", False
        updates.append(gr.update(value=label, visible=shown, interactive=shown))
    return updates
def _format_status_panel(gs: GameState) -> str:
    """
    Format the status panel as HTML (two-column grid layout to reduce scrolling).

    Sections rendered: vitals progress bars, inventory, combat stats with
    equipment bonuses, equipment slots, resources, status effects, active
    quests, and the current scene/environment summary.

    Args:
        gs: Current game state; player, world and the snapshot helpers on
            ``gs`` are read, nothing is modified here.

    Returns:
        An HTML string intended for the status Markdown component.
    """
    p = gs.player
    w = gs.world
    effective_stats = gs.get_effective_player_stats()
    equipment_bonuses = gs.get_equipment_stat_bonuses()
    env_snapshot = gs.get_environment_snapshot(limit=3)
    survival_snapshot = gs.get_survival_state_snapshot()
    scene_summary = gs.get_scene_summary().replace("\n", "<br>")
    clock_display = gs.get_clock_display()

    # Vitals progress bars
    hp_bar = _progress_bar(p.hp, p.max_hp, "HP")
    mp_bar = _progress_bar(p.mp, p.max_mp, "MP")
    stamina_bar = _progress_bar(p.stamina, p.max_stamina, "体力")
    hunger_bar = _progress_bar(p.hunger, 100, "饱食")
    sanity_bar = _progress_bar(p.sanity, 100, "理智")
    morale_bar = _progress_bar(p.morale, 100, "士气")

    # Equipment
    slot_names = {
        "weapon": "武器", "armor": "护甲", "accessory": "饰品",
        "helmet": "头盔", "boots": "靴子",
    }
    equip_lines = []
    for slot, item in p.equipment.items():
        equip_lines.append(f"{slot_names.get(slot, slot)}: {item or '无'}")
    equip_text = "<br>".join(equip_lines)

    def render_stat(stat_key: str, label: str) -> str:
        """Render one combat stat, annotating any equipment bonus or penalty."""
        base_value = int(getattr(p, stat_key))
        bonus_value = int(equipment_bonuses.get(stat_key, 0))
        effective_value = int(effective_stats.get(stat_key, base_value))
        if bonus_value > 0:
            return f"{label}: {effective_value} <span style='color:#4a6;'>(+{bonus_value} 装备)</span>"
        if bonus_value < 0:
            return f"{label}: {effective_value} <span style='color:#b44;'>({bonus_value} 装备)</span>"
        return f"{label}: {base_value}"

    def badge(text: str, bg: str, fg: str = "#1f2937") -> str:
        """Render a small rounded pill with the given background/foreground colors."""
        return (
            f"<span style='display:inline-block;margin:0 6px 6px 0;padding:3px 10px;"
            f"border-radius:999px;background:{bg};color:{fg};font-size:0.8em;"
            f"font-weight:600;'>{text}</span>"
        )

    # Status effects: "name(N回合)" with a matched pair of parentheses.
    if p.status_effects:
        effect_lines = "<br>".join(
            f"{e.name}({e.duration}回合)" for e in p.status_effects
        )
    else:
        effect_lines = "无"

    # Inventory
    if p.inventory:
        inventory_text = "<br>".join(p.inventory)
    else:
        inventory_text = "空"

    weather_colors = {
        "晴朗": "#fef3c7",
        "多云": "#e5e7eb",
        "小雨": "#dbeafe",
        "浓雾": "#e0e7ff",
        "暴风雨": "#c7d2fe",
        "大雪": "#f3f4f6",
    }
    light_colors = {
        "明亮": "#fde68a",
        "柔和": "#fcd34d",
        "昏暗": "#cbd5e1",
        "幽暗": "#94a3b8",
        "漆黑": "#334155",
    }

    # Danger badge color escalates at levels 4 and 7.
    danger_level = int(env_snapshot.get("danger_level", 0))
    if danger_level >= 7:
        danger_badge = badge(f"危险 {danger_level}/10", "#fecaca", "#7f1d1d")
    elif danger_level >= 4:
        danger_badge = badge(f"危险 {danger_level}/10", "#fed7aa", "#9a3412")
    else:
        danger_badge = badge(f"危险 {danger_level}/10", "#dcfce7", "#166534")

    env_badges = "".join(
        [
            badge(f"天气 {w.weather}", weather_colors.get(w.weather, "#e5e7eb")),
            badge(
                f"光照 {w.light_level}",
                light_colors.get(w.light_level, "#e5e7eb"),
                # Light text on the two darkest light-level backgrounds.
                "#0f172a" if w.light_level not in {"幽暗", "漆黑"} else "#f8fafc",
            ),
            danger_badge,
            badge(f"场景 {env_snapshot.get('location_type', 'unknown')}", "#ede9fe", "#4c1d95"),
        ]
    )

    recent_env_events = env_snapshot.get("recent_events", [])
    if recent_env_events:
        latest_event = recent_env_events[-1]
        latest_event_html = (
            f"<div style='padding:8px 10px;border-radius:10px;background:#f8fafc;"
            f"border:1px solid #dbeafe;margin-bottom:6px;'>"
            f"<b>{latest_event.get('title', '环境事件')}</b>"
            f"<br><span style='font-size:0.82em;color:#475569;'>{latest_event.get('description', '')}</span>"
            f"</div>"
        )
        # Up to three most recent events, newest first.
        recent_event_lines = "<br>".join(
            f"- {event.get('title', '环境事件')}"
            for event in reversed(recent_env_events[-3:])
        )
    else:
        latest_event_html = (
            "<div style='padding:8px 10px;border-radius:10px;background:#f8fafc;"
            "border:1px dashed #cbd5e1;color:#64748b;'>本回合暂无显式环境事件</div>"
        )
        recent_event_lines = "无"

    # Active quests (full detail: description, objectives, rewards, giver)
    active_quests = [q for q in w.quests.values() if q.status == "active"]
    if active_quests:
        quest_blocks = []
        for q in active_quests:
            done = sum(1 for v in q.objectives.values() if v)
            total = len(q.objectives)
            tag = "主线" if q.quest_type == "main" else "支线" if q.quest_type == "side" else "🟡 " + q.quest_type
            # Objective checklist
            obj_lines = "".join(
                f"<br>&nbsp;&nbsp;{'✅' if v else '⬜'} {k}"
                for k, v in q.objectives.items()
            )
            # Reward summary
            reward_parts = []
            if q.rewards.gold:
                reward_parts.append(f"{q.rewards.gold}💰")
            if q.rewards.experience:
                reward_parts.append(f"{q.rewards.experience}经验")
            if q.rewards.items:
                reward_parts.append("、".join(q.rewards.items))
            if q.rewards.unlock_skill:
                reward_parts.append(f"技能:{q.rewards.unlock_skill}")
            if q.rewards.title:
                reward_parts.append(f"称号:{q.rewards.title}")
            reward_str = " | ".join(reward_parts) if reward_parts else "无"
            block = (
                f"<details open><summary><b>{tag} {q.title}</b>({done}/{total})</summary>"
                f"<span style='font-size:0.9em;color:#666;'>来源: {q.giver_npc or '未知'}</span><br>"
                f"<span style='font-size:0.9em;'>{q.description}</span>"
                f"{obj_lines}"
                f"<br><span style='font-size:0.9em;color:#2f7a4a;'>奖励: {reward_str}</span>"
                f"</details>"
            )
            quest_blocks.append(block)
        quest_text = "".join(quest_blocks)
    else:
        quest_text = "无活跃任务"

    # Two-column HTML grid. Template lines are kept flush-left on purpose so
    # the emitted HTML carries no leading indentation.
    status = f"""<div style="font-size:0.9em;">
<h3 style="margin:0 0 4px 0;text-align:center;">{p.name}{p.title}</h3>
<p style="text-align:center;margin:2px 0 6px 0;">等级 {p.level} | 经验 {p.experience}/{p.exp_to_next_level}</p>
<div style="display:grid;grid-template-columns:1fr 1fr;gap:6px 12px;">
<div>
<h4 style="margin:4px 0 2px 0;">🩸 生命与状态</h4>
<span style="font-size:0.85em;">
{hp_bar}<br>
{mp_bar}<br>
{stamina_bar}<br>
{hunger_bar}<br>
{sanity_bar}<br>
{morale_bar}
</span>
</div>
<div>
<h4 style="margin:4px 0 2px 0;">🎒 背包</h4>
<span style="font-size:0.85em;">
{inventory_text}
</span>
</div>
<div>
<h4 style="margin:4px 0 2px 0;">⚔️ 战斗属性</h4>
<span style="font-size:0.85em;">
{render_stat("attack", "攻击")}<br>
{render_stat("defense", "防御")}<br>
{render_stat("speed", "速度")}<br>
{render_stat("luck", "幸运")}<br>
{render_stat("perception", "感知")}
</span>
</div>
<div>
<h4 style="margin:4px 0 2px 0;">🛡️ 装备</h4>
<span style="font-size:0.85em;">
{equip_text}
</span>
</div>
<div>
<h4 style="margin:4px 0 2px 0;">💰 资源</h4>
<span style="font-size:0.85em;">
金币: {p.gold}<br>
善恶值: {p.karma}
</span>
</div>
<div>
<h4 style="margin:4px 0 2px 0;">✨ 状态效果</h4>
<span style="font-size:0.85em;">
{effect_lines}
</span>
</div>
<div style="grid-column: 1 / -1;">
<h4 style="margin:4px 0 2px 0;">📜 任务</h4>
<span style="font-size:0.9em;line-height:1.55;">
{quest_text}
</span>
</div>
<div style="grid-column: 1 / -1;">
<h4 style="margin:4px 0 2px 0;">🧭 当前场景信息</h4>
<div style="font-size:0.85em;line-height:1.5;padding:8px 10px;border-radius:12px;background:#fff7ed;border:1px solid #fed7aa;">
{env_badges}
<div style="margin:6px 0 8px 0;color:#475569;">
时间 {clock_display} | 场景 {w.current_scene} | 状态系数 {survival_snapshot.get('combined_multiplier', 1.0)}
</div>
{latest_event_html}
<div style="margin-top:8px;">{scene_summary}</div>
<div style="margin-top:6px;color:#475569;">最近环境事件: {recent_event_lines}</div>
</div>
</div>
</div>
</div>"""
    return status
def _format_world_info_panel(gs: GameState | None) -> str:
"""格式化世界信息面板(放在故事框上方)。"""
if gs is None:
return "🌍 **世界信息**:未开始冒险"
w = gs.world
if hasattr(gs, "get_clock_display"):
clock_display = gs.get_clock_display()
else:
minute_of_day = int(getattr(w, "time_progress_units", 0)) * 10 % (24 * 60)
clock_display = f"{minute_of_day // 60:02d}:{minute_of_day % 60:02d}"
current_scene = getattr(w, "current_scene", "未知地点")
day_count = getattr(w, "day_count", 1)
time_of_day = getattr(w, "time_of_day", "未知时段")
weather = getattr(w, "weather", "未知")
light_level = getattr(w, "light_level", "未知")
season = getattr(w, "season", "未知")
return (
"🌍 **世界信息**:"
f"位置 {current_scene} | "
f"第{day_count}{time_of_day}{clock_display}) | "
f"天气 {weather} | 光照 {light_level} | "
f"季节 {season} | 回合 {getattr(gs, 'turn', 0)}"
)
def _progress_bar(current: int, maximum: int, label: str, length: int = 10) -> str:
"""生成 HTML 进度条(数值单独一行,低于 40% 高亮红色)"""
ratio = current / maximum if maximum > 0 else 0
filled = int(ratio * length)
empty = length - filled
bar = "█" * filled + "░" * empty
value_color = "#b91c1c" if ratio < 0.4 else "#0f172a"
return (
f"{label}: <span style='font-family:monospace;'>{bar}</span>"
f"<br><span style='color:{value_color};font-weight:600;'>{current}/{maximum}</span>"
)
# ============================================================
# Gradio 界面构建
# ============================================================
def build_app() -> gr.Blocks:
    """Build the Gradio Blocks UI and wire all event handlers.

    Returns:
        The assembled ``gr.Blocks`` app (not yet launched).
    """
    with gr.Blocks(
        title="StoryWeaver - 交互式叙事系统",
    ) as app:
        # NOTE(review): css is also passed to launch() in the entry point;
        # this attribute assignment looks like a compatibility shim — confirm.
        app.css = APP_UI_CSS
        gr.Markdown(
            """
# StoryWeaver — 交互式叙事系统
*基于 AI 的动态分支剧情 RPG 体验*
"""
        )
        # Per-session game state (Gradio State)
        game_session = gr.State(value={})
        with gr.Row():
            # ==================
            # Left: chat area
            # ==================
            with gr.Column(scale=10):
                # Player name input + start/restart buttons
                with gr.Row():
                    player_name_input = gr.Textbox(
                        label="角色名称",
                        placeholder="输入你的角色名称(默认: 旅人)",
                        value="旅人",
                        scale=3,
                    )
                    start_btn = gr.Button(
                        "开始冒险",
                        variant="primary",
                        scale=2,
                    )
                    restart_btn = gr.Button(
                        "重启冒险",
                        variant="stop",
                        scale=2,
                    )
                world_info_panel = gr.Markdown(
                    value=_format_world_info_panel(None),
                )
                # Chat window
                chatbot = gr.Chatbot(
                    label="故事",
                    height=480,
                )
                location_map_panel = gr.Markdown(
                    elem_classes=["scene-card", "status-panel"],
                    value="地图关系图\n(未开始)",
                    label="地图",
                )
                # Option buttons (up to 6, laid out in two rows)
                with gr.Column():
                    with gr.Row():
                        option_btn_1 = gr.Button(
                            "...",
                            visible=True,
                            interactive=False,
                            elem_classes=["option-btn"],
                        )
                        option_btn_2 = gr.Button(
                            "...",
                            visible=True,
                            interactive=False,
                            elem_classes=["option-btn"],
                        )
                        option_btn_3 = gr.Button(
                            "...",
                            visible=True,
                            interactive=False,
                            elem_classes=["option-btn"],
                        )
                    with gr.Row():
                        option_btn_4 = gr.Button(
                            "...",
                            visible=False,
                            interactive=False,
                            elem_classes=["option-btn"],
                        )
                        option_btn_5 = gr.Button(
                            "...",
                            visible=False,
                            interactive=False,
                            elem_classes=["option-btn"],
                        )
                        option_btn_6 = gr.Button(
                            "...",
                            visible=False,
                            interactive=False,
                            elem_classes=["option-btn"],
                        )
                # Slot order here must match the order of the button updates
                # produced by the handlers.
                option_buttons = [
                    option_btn_1,
                    option_btn_2,
                    option_btn_3,
                    option_btn_4,
                    option_btn_5,
                    option_btn_6,
                ]
                # Free-text input
                with gr.Row():
                    user_input = gr.Textbox(
                        label="自由输入(也可以直接点击上方选项)",
                        placeholder="输入你想做的事情,例如:和村长说话、攻击哥布林、搜索这个区域...",
                        scale=5,
                        interactive=False,
                    )
                    with gr.Column(scale=1):
                        send_btn = gr.Button("发送", variant="primary", elem_classes=["side-action-btn"])
                        open_backpack_btn = gr.Button(
                            "打开背包",
                            variant="secondary",
                            elem_classes=["side-action-btn", "backpack-btn"],
                        )
            # ==================
            # Right: status panel
            # ==================
            with gr.Column(scale=2, min_width=320, elem_classes=["scene-sidebar"]):
                scene_image = gr.Image(
                    value=None,
                    type="filepath",
                    label="场景画面",
                    show_label=False,
                    container=False,
                    interactive=False,
                    height=260,
                    visible=False,
                    elem_classes=["scene-card", "scene-image"],
                )
                status_panel = gr.Markdown(
                    elem_classes=["scene-card", "status-panel"],
                    value="## 等待开始...\n\n请输入角色名称并点击「开始冒险」",
                    label="角色状态",
                )
        # ============================================================
        # Event bindings
        # ============================================================
        # Start game
        start_btn.click(
            fn=start_game,
            inputs=[player_name_input, game_session],
            outputs=[
                chatbot, world_info_panel, status_panel, location_map_panel, scene_image,
                *option_buttons,
                game_session, user_input,
            ],
        )
        # Restart adventure
        restart_btn.click(
            fn=restart_game,
            inputs=[],
            outputs=[
                chatbot, world_info_panel, status_panel, location_map_panel, scene_image,
                *option_buttons,
                game_session, user_input, player_name_input,
            ],
        )
        # Send free-text input, then clear the textbox
        send_btn.click(
            fn=process_user_input,
            inputs=[user_input, chatbot, game_session],
            outputs=[
                chatbot, world_info_panel, status_panel, location_map_panel, scene_image,
                *option_buttons,
                game_session,
            ],
        ).then(
            fn=lambda: "",
            outputs=[user_input],
        )
        # Open backpack (always-available button)
        open_backpack_btn.click(
            fn=open_backpack,
            inputs=[chatbot, game_session],
            outputs=[
                chatbot, world_info_panel, status_panel, location_map_panel, scene_image,
                *option_buttons,
                game_session,
            ],
        )
        # Submit on Enter, then clear the textbox
        user_input.submit(
            fn=process_user_input,
            inputs=[user_input, chatbot, game_session],
            outputs=[
                chatbot, world_info_panel, status_panel, location_map_panel, scene_image,
                *option_buttons,
                game_session,
            ],
        ).then(
            fn=lambda: "",
            outputs=[user_input],
        )

        # Option button clicks: wrapped in a generator function using
        # `yield from` so Gradio recognizes the handler as streaming output.
        # The factory binds `index` per button instead of sharing the loop
        # variable across handlers.
        def _make_option_click_handler(index: int):
            def _handler(ch, gs):
                yield from process_option_click(index, ch, gs)
            return _handler

        for index, option_button in enumerate(option_buttons):
            option_button.click(
                fn=_make_option_click_handler(index),
                inputs=[chatbot, game_session],
                outputs=[
                    chatbot, world_info_panel, status_panel, location_map_panel, scene_image,
                    *option_buttons,
                    game_session,
                ],
            )
    return app
# ============================================================
# 启动入口
# ============================================================
if __name__ == "__main__":
    logger.info("启动 StoryWeaver 交互式叙事系统...")
    app = build_app()
    # Theme and CSS are supplied at launch time (per the file's commit note
    # on the Gradio 6.x API); allowed_paths exposes the scene image directory.
    launch_kwargs = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "show_error": True,
        "theme": gr.themes.Soft(primary_hue="emerald", secondary_hue="blue"),
        "css": APP_UI_CSS,
        "allowed_paths": [str(IMAGE_DIR)],
    }
    app.launch(**launch_kwargs)