Spaces:
Sleeping
Sleeping
| """ | |
| DesignState 解析与差分(用于"多轮可控迭代")。 | |
| 约定: | |
| - 模型需在回答中输出一个 ```json ...``` 代码块作为 DesignState | |
| - 该 JSON 应包含 new_variant_name / mechanics 等关键字段 | |
| """ | |
| import json | |
| import re | |
| from datetime import datetime | |
| from typing import Any, Dict, List, Optional, Tuple | |
| # ==================== 版本历史管理 ==================== | |
| class DesignStateHistory: | |
| """ | |
| 管理 DesignState 的版本历史,支持回滚 | |
| 每个版本包含: | |
| - version: 版本号 | |
| - state_obj: DesignState 字典 | |
| - state_raw: 原始 JSON 字符串 | |
| - timestamp: 时间戳 | |
| - summary: 紧凑版摘要 | |
| """ | |
| def __init__(self, max_versions: int = 20): | |
| self.max_versions = max_versions | |
| self.versions: List[Dict[str, Any]] = [] | |
| self.current_index: int = -1 # 当前版本在 versions 中的索引 | |
| def add_version( | |
| self, | |
| version: int, | |
| state_obj: Dict[str, Any], | |
| state_raw: str, | |
| summary: str = "" | |
| ) -> None: | |
| """添加新版本""" | |
| if not state_obj: | |
| return | |
| record = { | |
| "version": version, | |
| "state_obj": state_obj.copy() if state_obj else {}, | |
| "state_raw": state_raw, | |
| "timestamp": datetime.now().strftime("%H:%M:%S"), | |
| "summary": summary or self._generate_summary(state_obj), | |
| } | |
| # 如果当前不在最新位置,截断后面的版本(类似 git 的分支) | |
| if self.current_index >= 0 and self.current_index < len(self.versions) - 1: | |
| self.versions = self.versions[:self.current_index + 1] | |
| self.versions.append(record) | |
| self.current_index = len(self.versions) - 1 | |
| # 超出最大版本数时,删除最早的 | |
| if len(self.versions) > self.max_versions: | |
| self.versions.pop(0) | |
| self.current_index = len(self.versions) - 1 | |
| def _generate_summary(self, state_obj: Dict[str, Any]) -> str: | |
| """生成简短摘要""" | |
| name = state_obj.get("new_variant_name", "(未命名)") | |
| mechanics = state_obj.get("mechanics", []) | |
| mech_count = len(mechanics) if isinstance(mechanics, list) else 0 | |
| return f"{name} ({mech_count}个机制)" | |
| def get_version(self, version: int) -> Optional[Dict[str, Any]]: | |
| """获取指定版本""" | |
| for record in self.versions: | |
| if record["version"] == version: | |
| return record | |
| return None | |
| def get_version_by_index(self, index: int) -> Optional[Dict[str, Any]]: | |
| """通过索引获取版本""" | |
| if 0 <= index < len(self.versions): | |
| return self.versions[index] | |
| return None | |
| def rollback_to(self, version: int) -> Optional[Tuple[Dict[str, Any], str]]: | |
| """ | |
| 回滚到指定版本 | |
| 返回:(state_obj, state_raw) 或 None | |
| """ | |
| for i, record in enumerate(self.versions): | |
| if record["version"] == version: | |
| self.current_index = i | |
| return record["state_obj"].copy(), record["state_raw"] | |
| return None | |
| def get_version_list(self) -> List[Dict[str, Any]]: | |
| """获取所有版本列表(用于 UI 显示)""" | |
| return [ | |
| { | |
| "version": r["version"], | |
| "timestamp": r["timestamp"], | |
| "summary": r["summary"], | |
| "is_current": i == self.current_index, | |
| } | |
| for i, r in enumerate(self.versions) | |
| ] | |
| def get_version_choices(self) -> List[str]: | |
| """生成下拉选项列表""" | |
| choices = [] | |
| for i, r in enumerate(self.versions): | |
| marker = " ← 当前" if i == self.current_index else "" | |
| choices.append(f"v{r['version']} [{r['timestamp']}] {r['summary']}{marker}") | |
| return choices | |
| def get_current_version(self) -> int: | |
| """获取当前版本号""" | |
| if self.current_index >= 0 and self.current_index < len(self.versions): | |
| return self.versions[self.current_index]["version"] | |
| return 0 | |
| def clear(self) -> None: | |
| """清空历史""" | |
| self.versions = [] | |
| self.current_index = -1 | |
| def to_serializable(self) -> Dict[str, Any]: | |
| """转为可序列化格式(用于 Gradio State)""" | |
| return { | |
| "versions": self.versions, | |
| "current_index": self.current_index, | |
| "max_versions": self.max_versions, | |
| } | |
| def from_serializable(cls, data: Dict[str, Any]) -> "DesignStateHistory": | |
| """从序列化格式恢复""" | |
| if not data or not isinstance(data, dict): | |
| return cls() | |
| history = cls(max_versions=data.get("max_versions", 20)) | |
| history.versions = data.get("versions", []) | |
| history.current_index = data.get("current_index", -1) | |
| return history | |
| def create_empty_history() -> Dict[str, Any]: | |
| """创建空的历史记录(用于初始化 State)""" | |
| return DesignStateHistory().to_serializable() | |
| def add_to_history( | |
| history_data: Dict[str, Any], | |
| version: int, | |
| state_obj: Dict[str, Any], | |
| state_raw: str, | |
| summary: str = "" | |
| ) -> Dict[str, Any]: | |
| """添加版本到历史""" | |
| history = DesignStateHistory.from_serializable(history_data) | |
| history.add_version(version, state_obj, state_raw, summary) | |
| return history.to_serializable() | |
| def rollback_history( | |
| history_data: Dict[str, Any], | |
| version: int | |
| ) -> Tuple[Dict[str, Any], Optional[Dict[str, Any]], Optional[str]]: | |
| """ | |
| 回滚到指定版本 | |
| 返回:(更新后的 history_data, state_obj, state_raw) | |
| """ | |
| history = DesignStateHistory.from_serializable(history_data) | |
| result = history.rollback_to(version) | |
| if result: | |
| return history.to_serializable(), result[0], result[1] | |
| return history_data, None, None | |
| def get_history_choices(history_data: Dict[str, Any]) -> List[str]: | |
| """获取版本下拉选项""" | |
| history = DesignStateHistory.from_serializable(history_data) | |
| return history.get_version_choices() | |
| def parse_version_from_choice(choice: str) -> int: | |
| """从选项字符串中解析版本号""" | |
| if not choice: | |
| return 0 | |
| # 格式: "v3 [14:23:45] 玩法名 (2个机制)" | |
| match = re.match(r"v(\d+)", choice) | |
| if match: | |
| return int(match.group(1)) | |
| return 0 | |
| _JSON_FENCE_RE = re.compile(r"```json\s*(\{.*?\})\s*```", re.DOTALL | re.IGNORECASE) | |
| _READY_RE = re.compile(r"READY_TO_GENERATE\s*:\s*(true|false)", re.IGNORECASE) | |
| def extract_design_state(text: str) -> Tuple[Optional[Dict[str, Any]], str]: | |
| """ | |
| 返回:(state_dict_or_none, raw_json_or_empty) | |
| """ | |
| if not text: | |
| return None, "" | |
| candidates = _JSON_FENCE_RE.findall(text) | |
| if not candidates: | |
| return None, "" | |
| # 选择“最像 DesignState”的 JSON:包含几个关键字段 | |
| best_raw = "" | |
| best_score = -1 | |
| best_obj = None | |
| for raw in candidates: | |
| raw = (raw or "").strip() | |
| try: | |
| obj = json.loads(raw) | |
| except Exception: | |
| continue | |
| score = 0 | |
| for k in ["new_variant_name", "mechanics", "tileset", "game_variant", "scoring_mode"]: | |
| if k in obj: | |
| score += 1 | |
| if score > best_score: | |
| best_score = score | |
| best_raw = raw | |
| best_obj = obj | |
| return best_obj, best_raw | |
| def extract_ready_to_generate(text: str) -> Optional[bool]: | |
| """ | |
| 从文本中提取 READY_TO_GENERATE: true/false | |
| 返回 None 表示未提供该标记。 | |
| """ | |
| if not text: | |
| return None | |
| m = _READY_RE.search(text) | |
| if not m: | |
| return None | |
| return m.group(1).lower() == "true" | |
| def summarize_design_state(state: Dict[str, Any]) -> str: | |
| if not state: | |
| return "" | |
| name = str(state.get("new_variant_name") or "").strip() | |
| base = state.get("base_variants") or [] | |
| fusion = state.get("fusion_variants") or [] | |
| mechanics = state.get("mechanics") or [] | |
| return "玩法:{0} | 底座:{1} | 融合:{2} | 机制数:{3}".format( | |
| name or "(未命名)", | |
| ",".join(base) if isinstance(base, list) else str(base), | |
| ",".join(fusion) if isinstance(fusion, list) else str(fusion), | |
| len(mechanics) if isinstance(mechanics, list) else 0, | |
| ) | |
| def diff_keys(prev: Dict[str, Any], cur: Dict[str, Any]) -> List[str]: | |
| """ | |
| 粗粒度:仅比较顶层 key 的变化(新增/删除/值变更)。 | |
| """ | |
| if prev is None or cur is None: | |
| return [] | |
| changed = set() | |
| prev_keys = set(prev.keys()) | |
| cur_keys = set(cur.keys()) | |
| for k in prev_keys.symmetric_difference(cur_keys): | |
| changed.add(k) | |
| for k in prev_keys.intersection(cur_keys): | |
| if prev.get(k) != cur.get(k): | |
| changed.add(k) | |
| return sorted(changed) | |
| def diff_mechanics(prev: Dict[str, Any], cur: Dict[str, Any]) -> List[str]: | |
| """ | |
| 简单机制差分:按 mechanic.name 对齐,输出变更摘要字符串列表。 | |
| """ | |
| prev_list = prev.get("mechanics") if isinstance(prev, dict) else None | |
| cur_list = cur.get("mechanics") if isinstance(cur, dict) else None | |
| if not isinstance(prev_list, list) or not isinstance(cur_list, list): | |
| return [] | |
| def to_map(lst: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: | |
| out = {} | |
| for it in lst: | |
| if not isinstance(it, dict): | |
| continue | |
| name = str(it.get("name") or "").strip() | |
| if not name: | |
| continue | |
| out[name] = it | |
| return out | |
| pm = to_map(prev_list) | |
| cm = to_map(cur_list) | |
| changes = [] | |
| for name in sorted(set(pm.keys()) | set(cm.keys())): | |
| if name not in pm: | |
| changes.append("新增机制: {0}".format(name)) | |
| continue | |
| if name not in cm: | |
| changes.append("删除机制: {0}".format(name)) | |
| continue | |
| if pm[name] == cm[name]: | |
| continue | |
| # 只输出变了哪些字段(不展开值) | |
| fields = sorted(set(pm[name].keys()) | set(cm[name].keys())) | |
| touched = [f for f in fields if pm[name].get(f) != cm[name].get(f)] | |
| changes.append("机制变更: {0} 字段={1}".format(name, ",".join(touched))) | |
| return changes | |
| def is_change_within_scope(changed_top_keys: List[str], scope: str) -> bool: | |
| """ | |
| 软约束:判断顶层改动是否落在预设范围内。 | |
| """ | |
| if not scope or scope == "自由迭代(仍保最小修改)": | |
| return True | |
| allowed = set() | |
| if scope == "仅优化创新机制": | |
| allowed.update(["mechanics", "open_questions", "core_constraints"]) | |
| elif scope == "仅优化计分与番型": | |
| allowed.update(["scoring_mode"]) | |
| elif scope == "仅优化流程与阶段": | |
| allowed.update(["game_variant"]) | |
| elif scope == "仅修复校验问题": | |
| # 允许改动更宽一些:常见缺失字段补齐 | |
| allowed.update([ | |
| "mechanics", | |
| "tileset", | |
| "core_constraints", | |
| "players", | |
| "game_variant", | |
| "scoring_mode", | |
| ]) | |
| else: | |
| return True | |
| # 名称/来源类字段默认不允许在"仅优化"里被改(除非自由迭代) | |
| forbidden = {"new_variant_name", "base_variants", "fusion_variants"} | |
| for k in changed_top_keys: | |
| if k in forbidden: | |
| return False | |
| if k not in allowed: | |
| return False | |
| return True | |
| # ==================== 差异可视化(新增) ==================== | |
| # 字段中文名映射 | |
| _FIELD_NAMES = { | |
| "new_variant_name": "玩法名称", | |
| "base_variants": "底座玩法", | |
| "fusion_variants": "融合玩法", | |
| "game_variant": "游戏模式", | |
| "players": "玩家人数", | |
| "scoring_mode": "计分模式", | |
| "tileset": "牌组配置", | |
| "core_constraints": "核心约束", | |
| "mechanics": "机制列表", | |
| "open_questions": "待定问题", | |
| } | |
| def _format_value(val: Any, max_len: int = 40) -> str: | |
| """格式化值为简短字符串""" | |
| if val is None: | |
| return "(空)" | |
| if isinstance(val, bool): | |
| return "是" if val else "否" | |
| if isinstance(val, (int, float)): | |
| return str(val) | |
| if isinstance(val, str): | |
| s = val.strip() | |
| return s if len(s) <= max_len else s[:max_len] + "..." | |
| if isinstance(val, list): | |
| if not val: | |
| return "(空列表)" | |
| if len(val) <= 3: | |
| return ", ".join(str(v) for v in val) | |
| return ", ".join(str(v) for v in val[:3]) + f"... 共{len(val)}项" | |
| if isinstance(val, dict): | |
| keys = list(val.keys()) | |
| if len(keys) <= 3: | |
| return "{" + ", ".join(keys) + "}" | |
| return "{" + ", ".join(keys[:3]) + f"... 共{len(keys)}项" + "}" | |
| return str(val)[:max_len] | |
| def diff_open_questions(prev: Dict[str, Any], cur: Dict[str, Any]) -> Tuple[List[str], List[str]]: | |
| """ | |
| 对比 open_questions 的变化 | |
| 返回:(已解决的问题列表, 新增的问题列表) | |
| """ | |
| prev_qs = prev.get("open_questions") if isinstance(prev, dict) else None | |
| cur_qs = cur.get("open_questions") if isinstance(cur, dict) else None | |
| if not isinstance(prev_qs, list): | |
| prev_qs = [] | |
| if not isinstance(cur_qs, list): | |
| cur_qs = [] | |
| prev_set = set(str(q) for q in prev_qs) | |
| cur_set = set(str(q) for q in cur_qs) | |
| resolved = sorted(prev_set - cur_set) | |
| added = sorted(cur_set - prev_set) | |
| return resolved, added | |
| def generate_diff_summary( | |
| prev: Optional[Dict[str, Any]], | |
| cur: Optional[Dict[str, Any]], | |
| prev_version: int = 0, | |
| cur_version: int = 0 | |
| ) -> str: | |
| """ | |
| 生成人可读的 DesignState 变更摘要(Markdown 格式) | |
| """ | |
| if not cur: | |
| return "" | |
| if not prev: | |
| # 首次建立 | |
| name = cur.get("new_variant_name", "(未命名)") | |
| mechanics = cur.get("mechanics", []) | |
| mech_count = len(mechanics) if isinstance(mechanics, list) else 0 | |
| return f"**DesignState v{cur_version} 已建立**\n\n玩法:{name}\n机制数:{mech_count}" | |
| lines = [] | |
| version_str = f"v{prev_version} → v{cur_version}" if prev_version and cur_version else "" | |
| lines.append(f"**DesignState 变更摘要** {version_str}") | |
| lines.append("") | |
| # 1. 顶层字段变更(排除 mechanics 和 open_questions,单独处理) | |
| skip_keys = {"mechanics", "open_questions"} | |
| changed_fields = [] | |
| all_keys = set(prev.keys()) | set(cur.keys()) | |
| for key in sorted(all_keys): | |
| if key in skip_keys: | |
| continue | |
| prev_val = prev.get(key) | |
| cur_val = cur.get(key) | |
| if prev_val != cur_val: | |
| field_name = _FIELD_NAMES.get(key, key) | |
| if key not in prev: | |
| changed_fields.append(f" • **[新增]** {field_name}: {_format_value(cur_val)}") | |
| elif key not in cur: | |
| changed_fields.append(f" • **[删除]** {field_name}") | |
| else: | |
| changed_fields.append(f" • {field_name}: `{_format_value(prev_val)}` → `{_format_value(cur_val)}`") | |
| if changed_fields: | |
| lines.append("📝 **顶层字段变更:**") | |
| lines.extend(changed_fields) | |
| lines.append("") | |
| # 2. 机制变更 | |
| mech_changes = diff_mechanics(prev, cur) | |
| if mech_changes: | |
| lines.append("🔧 **机制变更:**") | |
| for change in mech_changes: | |
| if change.startswith("新增机制"): | |
| lines.append(f" • **[新增]** {change.replace('新增机制: ', '')}") | |
| elif change.startswith("删除机制"): | |
| lines.append(f" • **[删除]** {change.replace('删除机制: ', '')}") | |
| else: | |
| # 机制变更: XXX 字段=a,b,c | |
| parts = change.replace("机制变更: ", "").split(" 字段=") | |
| if len(parts) == 2: | |
| lines.append(f" • **[修改]** {parts[0]}: {parts[1]}") | |
| else: | |
| lines.append(f" • {change}") | |
| lines.append("") | |
| # 3. 待定问题变化 | |
| resolved, added = diff_open_questions(prev, cur) | |
| if resolved or added: | |
| lines.append("❓ **待定问题变化:**") | |
| for q in resolved: | |
| lines.append(f" • ~~[已解决]~~ {q[:50]}{'...' if len(q) > 50 else ''}") | |
| for q in added: | |
| lines.append(f" • **[新增]** {q[:50]}{'...' if len(q) > 50 else ''}") | |
| lines.append("") | |
| # 如果没有任何变更 | |
| if len(lines) <= 2: | |
| lines.append("_(无变更)_") | |
| return "\n".join(lines).strip() | |
| def generate_diff_summary_compact( | |
| prev: Optional[Dict[str, Any]], | |
| cur: Optional[Dict[str, Any]] | |
| ) -> str: | |
| """ | |
| 生成紧凑版变更摘要(单行,用于状态栏) | |
| """ | |
| if not cur: | |
| return "无变更" | |
| if not prev: | |
| name = cur.get("new_variant_name", "(未命名)") | |
| return f"新建: {name}" | |
| changes = [] | |
| # 顶层字段变更数 | |
| field_changes = diff_keys(prev, cur) | |
| skip_keys = {"mechanics", "open_questions"} | |
| field_changes = [k for k in field_changes if k not in skip_keys] | |
| if field_changes: | |
| changes.append(f"{len(field_changes)}个字段") | |
| # 机制变更数 | |
| mech_changes = diff_mechanics(prev, cur) | |
| if mech_changes: | |
| changes.append(f"{len(mech_changes)}个机制") | |
| # 问题变化 | |
| resolved, added = diff_open_questions(prev, cur) | |
| if resolved: | |
| changes.append(f"解决{len(resolved)}个问题") | |
| if added: | |
| changes.append(f"新增{len(added)}个问题") | |
| if not changes: | |
| return "无变更" | |
| return "变更: " + ", ".join(changes) | |
| # ==================== 多阶段引导式交互 ==================== | |
| class InteractionPhase: | |
| """交互阶段枚举""" | |
| INITIAL = "initial" # 初始阶段:用户提出需求 | |
| UNDERSTAND = "understand" # 理解确认:确认对已有玩法的理解 | |
| DIVERGE = "diverge" # 方案发散:生成多种机制组合方案 | |
| SELECT = "select" # 方案选择:用户选择具体方案 | |
| ELABORATE = "elaborate" # 深入展开:对选定方案进行详细设计 | |
| ITERATE = "iterate" # 迭代优化:基于反馈优化方案 | |
| # 方案提取正则 | |
| _PROPOSAL_BLOCK_RE = re.compile( | |
| r"(?:###?\s*)?(?:方案|选项|Option)\s*([A-Z\d一二三四五六七八九十]+)[::\s]*(.+?)(?=(?:###?\s*)?(?:方案|选项|Option)\s*[A-Z\d一二三四五六七八九十]+[::\s]|$)", | |
| re.DOTALL | re.IGNORECASE | |
| ) | |
| # 理解确认问题提取 | |
| _CLARIFY_QUESTION_RE = re.compile( | |
| r"(?:❓|🤔|【确认】|【问题】|\[确认\]|\[问题\]|请确认|请问|是否是指)\s*(.+?\?)", | |
| re.DOTALL | |
| ) | |
| def extract_proposals(text: str) -> List[Dict[str, Any]]: | |
| """ | |
| 从模型输出中提取多个候选方案 | |
| 返回: [{"id": "A", "title": "...", "description": "...", "highlights": [...]}] | |
| """ | |
| if not text: | |
| return [] | |
| proposals = [] | |
| matches = _PROPOSAL_BLOCK_RE.findall(text) | |
| for idx, (proposal_id, content) in enumerate(matches): | |
| content = content.strip() | |
| # 提取标题(第一行或冒号前的部分) | |
| lines = content.split("\n") | |
| title = lines[0].strip() if lines else f"方案 {proposal_id}" | |
| # 清理标题中的 markdown 标记 | |
| title = re.sub(r"^[#\-\*]+\s*", "", title) | |
| title = re.sub(r"\*+", "", title) | |
| # 提取描述 | |
| description = "\n".join(lines[1:]).strip() if len(lines) > 1 else "" | |
| # 提取亮点/创新点 | |
| highlights = [] | |
| highlight_patterns = [ | |
| r"[★✦⭐🌟]\s*(.+)", | |
| r"(?:创新点|亮点|特色)[::]\s*(.+)", | |
| r"[-•]\s*(?:创新|特色|核心)[::]\s*(.+)", | |
| ] | |
| for pattern in highlight_patterns: | |
| hl_matches = re.findall(pattern, content) | |
| highlights.extend([h.strip() for h in hl_matches]) | |
| proposals.append({ | |
| "id": proposal_id.strip(), | |
| "title": title[:50], # 限制标题长度 | |
| "description": description[:200] + ("..." if len(description) > 200 else ""), | |
| "highlights": highlights[:3], # 最多3个亮点 | |
| "full_content": content, | |
| }) | |
| return proposals | |
| def extract_clarify_questions(text: str) -> List[str]: | |
| """ | |
| 从模型输出中提取需要用户确认的问题 | |
| """ | |
| if not text: | |
| return [] | |
| questions = [] | |
| matches = _CLARIFY_QUESTION_RE.findall(text) | |
| for q in matches: | |
| q = q.strip() | |
| if q and len(q) > 5: # 过滤太短的 | |
| questions.append(q) | |
| # 也尝试提取 open_questions 中的内容 | |
| try: | |
| ds_obj, _ = extract_design_state(text) | |
| if ds_obj and "open_questions" in ds_obj: | |
| for q in ds_obj.get("open_questions", []): | |
| if isinstance(q, str) and q not in questions: | |
| questions.append(q) | |
| except Exception: | |
| pass | |
| return questions[:5] # 最多5个问题 | |
| def detect_interaction_phase(text: str, ds_obj: Optional[Dict[str, Any]] = None) -> str: | |
| """ | |
| 根据模型输出内容检测当前交互阶段 | |
| """ | |
| if not text: | |
| return InteractionPhase.INITIAL | |
| text_lower = text.lower() | |
| # 检测是否有多个方案供选择 | |
| proposals = extract_proposals(text) | |
| if len(proposals) >= 2: | |
| return InteractionPhase.DIVERGE | |
| # 检测是否有需要确认的问题 | |
| questions = extract_clarify_questions(text) | |
| if questions: | |
| return InteractionPhase.UNDERSTAND | |
| # 检测是否有完整的 DesignState 且 READY_TO_GENERATE | |
| ready = extract_ready_to_generate(text) | |
| if ready is True: | |
| return InteractionPhase.ELABORATE | |
| # 检测是否在迭代中 | |
| if ds_obj and ds_obj.get("mechanics"): | |
| return InteractionPhase.ITERATE | |
| return InteractionPhase.INITIAL | |
| def format_proposals_for_display(proposals: List[Dict[str, Any]]) -> str: | |
| """ | |
| 将方案列表格式化为 Markdown 显示 | |
| """ | |
| if not proposals: | |
| return "" | |
| lines = ["## 🎯 请选择一个方案深入展开\n"] | |
| for p in proposals: | |
| lines.append(f"### 方案 {p['id']}: {p['title']}") | |
| if p.get("highlights"): | |
| for hl in p["highlights"]: | |
| lines.append(f" ✦ {hl}") | |
| if p.get("description"): | |
| lines.append(f"\n{p['description'][:150]}...") | |
| lines.append("") | |
| lines.append("\n💡 请在下方选择方案编号,或输入「其他」描述你的想法。") | |
| return "\n".join(lines) | |
| def generate_phase_prompt_hint(phase: str, ds_obj: Optional[Dict[str, Any]] = None) -> str: | |
| """ | |
| 根据当前阶段生成提示词补充 | |
| """ | |
| hints = { | |
| InteractionPhase.INITIAL: "", | |
| InteractionPhase.UNDERSTAND: ( | |
| "\n\n【阶段提示】当前处于「理解确认」阶段。" | |
| "请确保你完全理解用户描述的已有玩法,如有不明确之处请提出确认问题。" | |
| ), | |
| InteractionPhase.DIVERGE: ( | |
| "\n\n【阶段提示】当前处于「方案发散」阶段。" | |
| "请发散思维,给出 2-4 种不同方向的机制组合方案,每个方案需包含:\n" | |
| "1. 方案编号(A/B/C/D)\n" | |
| "2. 方案名称\n" | |
| "3. 核心创新点(1-2句话)\n" | |
| "4. 机制组合简述\n" | |
| "不要深入展开,等用户选择后再详细设计。" | |
| ), | |
| InteractionPhase.SELECT: ( | |
| "\n\n【阶段提示】用户已选择方案,请对该方案进行深入展开设计。" | |
| ), | |
| InteractionPhase.ELABORATE: ( | |
| "\n\n【阶段提示】当前处于「深入展开」阶段。" | |
| "请输出完整的玩法设计(含 DesignState、mGDL、自检报告)。" | |
| ), | |
| InteractionPhase.ITERATE: ( | |
| "\n\n【阶段提示】当前处于「迭代优化」阶段。" | |
| "请基于用户反馈对现有方案进行最小修改。" | |
| ), | |
| } | |
| return hints.get(phase, "") | |
| def create_phase_state() -> Dict[str, Any]: | |
| """创建阶段状态(用于 Gradio State)""" | |
| return { | |
| "current_phase": InteractionPhase.INITIAL, | |
| "confirmed_understanding": [], # 已确认的理解点 | |
| "proposals": [], # 当前可选方案 | |
| "selected_proposal": None, # 用户选择的方案 | |
| "phase_history": [], # 阶段历史 | |
| } | |
| def update_phase_state( | |
| phase_state: Dict[str, Any], | |
| new_phase: str, | |
| proposals: Optional[List[Dict[str, Any]]] = None, | |
| selected: Optional[str] = None, | |
| confirmed: Optional[List[str]] = None, | |
| ) -> Dict[str, Any]: | |
| """更新阶段状态""" | |
| state = phase_state.copy() if phase_state else create_phase_state() | |
| # 记录阶段变化历史 | |
| if state["current_phase"] != new_phase: | |
| state["phase_history"].append({ | |
| "from": state["current_phase"], | |
| "to": new_phase, | |
| "timestamp": datetime.now().strftime("%H:%M:%S"), | |
| }) | |
| state["current_phase"] = new_phase | |
| if proposals is not None: | |
| state["proposals"] = proposals | |
| if selected is not None: | |
| state["selected_proposal"] = selected | |
| if confirmed is not None: | |
| state["confirmed_understanding"].extend(confirmed) | |
| return state | |
| def get_phase_display_name(phase: str) -> str: | |
| """获取阶段的显示名称""" | |
| names = { | |
| InteractionPhase.INITIAL: "🚀 初始", | |
| InteractionPhase.UNDERSTAND: "🔍 理解确认", | |
| InteractionPhase.DIVERGE: "💡 方案发散", | |
| InteractionPhase.SELECT: "✅ 方案选择", | |
| InteractionPhase.ELABORATE: "📝 深入展开", | |
| InteractionPhase.ITERATE: "🔄 迭代优化", | |
| } | |
| return names.get(phase, phase) | |
| # ==================== 细粒度范围控制 ==================== | |
| # DesignState 可控字段定义 | |
| CONTROLLABLE_FIELDS = { | |
| "new_variant_name": {"label": "玩法名称", "category": "基础信息", "lockable": True}, | |
| "base_variants": {"label": "底座玩法", "category": "基础信息", "lockable": True}, | |
| "fusion_variants": {"label": "融合玩法", "category": "基础信息", "lockable": True}, | |
| "game_variant": {"label": "游戏模式", "category": "游戏规则", "lockable": True}, | |
| "players": {"label": "玩家人数", "category": "游戏规则", "lockable": True}, | |
| "scoring_mode": {"label": "计分模式", "category": "计分系统", "lockable": True}, | |
| "tileset": {"label": "牌组配置", "category": "游戏规则", "lockable": True}, | |
| "core_constraints": {"label": "核心约束", "category": "游戏规则", "lockable": False}, | |
| "mechanics": {"label": "机制列表", "category": "创新机制", "lockable": False}, | |
| "open_questions": {"label": "待定问题", "category": "其他", "lockable": False}, | |
| } | |
| # 字段分类 | |
| FIELD_CATEGORIES = { | |
| "基础信息": ["new_variant_name", "base_variants", "fusion_variants"], | |
| "游戏规则": ["game_variant", "players", "tileset", "core_constraints"], | |
| "计分系统": ["scoring_mode"], | |
| "创新机制": ["mechanics"], | |
| "其他": ["open_questions"], | |
| } | |
| class ScopeConstraint: | |
| """范围约束类型""" | |
| SOFT = "soft" # 软约束:检测到越界时提示,但不阻断 | |
| HARD = "hard" # 硬约束:检测到越界时阻断并要求重做 | |
| def create_scope_config() -> Dict[str, Any]: | |
| """创建默认的范围配置""" | |
| return { | |
| "mode": "preset", # preset(预设模式)或 custom(自定义模式) | |
| "preset": "自由迭代(仍保最小修改)", # 预设选项 | |
| "constraint_level": ScopeConstraint.SOFT, # 约束级别 | |
| "locked_fields": [], # 锁定的顶层字段 | |
| "allowed_fields": [], # 允许修改的顶层字段(custom 模式下使用) | |
| "locked_mechanics": [], # 锁定的机制名称(不允许修改这些机制) | |
| "allowed_mechanics": [], # 只允许修改这些机制(为空表示不限制) | |
| } | |
| def get_scope_preset_options() -> List[str]: | |
| """获取预设范围选项""" | |
| return [ | |
| "自由迭代(仍保最小修改)", | |
| "仅优化创新机制", | |
| "仅优化计分与番型", | |
| "仅优化流程与阶段", | |
| "仅修复校验问题", | |
| "锁定核心(仅微调细节)", | |
| ] | |
| def get_allowed_fields_for_preset(preset: str) -> List[str]: | |
| """根据预设获取允许修改的字段""" | |
| presets = { | |
| "自由迭代(仍保最小修改)": list(CONTROLLABLE_FIELDS.keys()), | |
| "仅优化创新机制": ["mechanics", "open_questions", "core_constraints"], | |
| "仅优化计分与番型": ["scoring_mode", "open_questions"], | |
| "仅优化流程与阶段": ["game_variant", "open_questions"], | |
| "仅修复校验问题": ["mechanics", "tileset", "core_constraints", "players", "game_variant", "scoring_mode", "open_questions"], | |
| "锁定核心(仅微调细节)": ["open_questions", "core_constraints"], | |
| } | |
| return presets.get(preset, list(CONTROLLABLE_FIELDS.keys())) | |
| def get_forbidden_fields_for_preset(preset: str) -> List[str]: | |
| """根据预设获取禁止修改的字段""" | |
| if preset == "自由迭代(仍保最小修改)": | |
| return [] | |
| allowed = set(get_allowed_fields_for_preset(preset)) | |
| all_fields = set(CONTROLLABLE_FIELDS.keys()) | |
| return list(all_fields - allowed) | |
| def validate_scope_compliance( | |
| prev: Dict[str, Any], | |
| cur: Dict[str, Any], | |
| scope_config: Dict[str, Any], | |
| ) -> Dict[str, Any]: | |
| """ | |
| 验证变更是否符合范围约束 | |
| 返回: | |
| { | |
| "compliant": bool, # 是否符合约束 | |
| "violations": [ # 违规列表 | |
| {"field": "xxx", "type": "field_locked|field_forbidden|mechanic_locked", "detail": "..."} | |
| ], | |
| "warnings": [], # 警告(软约束时) | |
| "summary": "..." # 摘要文本 | |
| } | |
| """ | |
| if not prev or not cur: | |
| return {"compliant": True, "violations": [], "warnings": [], "summary": "无变更"} | |
| result = { | |
| "compliant": True, | |
| "violations": [], | |
| "warnings": [], | |
| "summary": "", | |
| } | |
| mode = scope_config.get("mode", "preset") | |
| constraint_level = scope_config.get("constraint_level", ScopeConstraint.SOFT) | |
| # 确定允许和禁止的字段 | |
| if mode == "preset": | |
| preset = scope_config.get("preset", "自由迭代(仍保最小修改)") | |
| allowed_fields = set(get_allowed_fields_for_preset(preset)) | |
| forbidden_fields = set(get_forbidden_fields_for_preset(preset)) | |
| else: | |
| allowed_fields = set(scope_config.get("allowed_fields", [])) | |
| forbidden_fields = set(scope_config.get("locked_fields", [])) | |
| locked_mechanics = set(scope_config.get("locked_mechanics", [])) | |
| allowed_mechanics = set(scope_config.get("allowed_mechanics", [])) | |
| # 检查顶层字段变更 | |
| changed_fields = diff_keys(prev, cur) | |
| for field in changed_fields: | |
| # 检查是否在禁止列表中 | |
| if field in forbidden_fields: | |
| violation = { | |
| "field": field, | |
| "type": "field_forbidden", | |
| "detail": f"字段「{CONTROLLABLE_FIELDS.get(field, {}).get('label', field)}」不在允许修改范围内", | |
| } | |
| result["violations"].append(violation) | |
| continue | |
| # 检查是否被显式锁定 | |
| if field in scope_config.get("locked_fields", []): | |
| violation = { | |
| "field": field, | |
| "type": "field_locked", | |
| "detail": f"字段「{CONTROLLABLE_FIELDS.get(field, {}).get('label', field)}」已被锁定", | |
| } | |
| result["violations"].append(violation) | |
| continue | |
| # 检查机制级别变更 | |
| if "mechanics" in changed_fields: | |
| mech_changes = diff_mechanics(prev, cur) | |
| for change in mech_changes: | |
| # 提取机制名称 | |
| mech_name = "" | |
| if "新增机制:" in change: | |
| mech_name = change.replace("新增机制: ", "").strip() | |
| elif "删除机制:" in change: | |
| mech_name = change.replace("删除机制: ", "").strip() | |
| elif "机制变更:" in change: | |
| mech_name = change.split(" 字段=")[0].replace("机制变更: ", "").strip() | |
| # 检查机制是否被锁定 | |
| if mech_name and mech_name in locked_mechanics: | |
| violation = { | |
| "field": "mechanics", | |
| "type": "mechanic_locked", | |
| "detail": f"机制「{mech_name}」已被锁定,不允许修改", | |
| } | |
| result["violations"].append(violation) | |
| # 检查是否只允许修改特定机制 | |
| if allowed_mechanics and mech_name and mech_name not in allowed_mechanics: | |
| # 只对修改和删除进行限制,新增通常是允许的 | |
| if "删除机制:" in change or "机制变更:" in change: | |
| violation = { | |
| "field": "mechanics", | |
| "type": "mechanic_not_allowed", | |
| "detail": f"机制「{mech_name}」不在允许修改的机制列表中", | |
| } | |
| result["violations"].append(violation) | |
| # 根据约束级别处理违规 | |
| if result["violations"]: | |
| if constraint_level == ScopeConstraint.HARD: | |
| result["compliant"] = False | |
| result["summary"] = f"❌ 发现 {len(result['violations'])} 处范围越界(硬约束),请重新生成" | |
| else: | |
| result["warnings"] = result["violations"] | |
| result["violations"] = [] | |
| result["summary"] = f"⚠️ 发现 {len(result['warnings'])} 处范围越界(软约束),建议检查" | |
| else: | |
| result["summary"] = "✅ 变更符合范围约束" | |
| return result | |
| def format_scope_violations(validation_result: Dict[str, Any]) -> str: | |
| """格式化范围违规为 Markdown""" | |
| if validation_result.get("compliant", True) and not validation_result.get("warnings"): | |
| return validation_result.get("summary", "") | |
| lines = [validation_result.get("summary", "")] | |
| items = validation_result.get("violations", []) or validation_result.get("warnings", []) | |
| if items: | |
| lines.append("") | |
| for item in items: | |
| field_label = CONTROLLABLE_FIELDS.get(item["field"], {}).get("label", item["field"]) | |
| lines.append(f" • **{field_label}**: {item['detail']}") | |
| return "\n".join(lines) | |
| def get_mechanics_from_state(ds_obj: Dict[str, Any]) -> List[str]: | |
| """从 DesignState 中提取机制名称列表""" | |
| if not ds_obj: | |
| return [] | |
| mechanics = ds_obj.get("mechanics", []) | |
| if not isinstance(mechanics, list): | |
| return [] | |
| names = [] | |
| for m in mechanics: | |
| if isinstance(m, dict) and "name" in m: | |
| names.append(m["name"]) | |
| return names | |
| def generate_scope_prompt_hint(scope_config: Dict[str, Any], ds_obj: Optional[Dict[str, Any]] = None) -> str: | |
| """生成范围约束的提示词补充""" | |
| mode = scope_config.get("mode", "preset") | |
| if mode == "preset": | |
| preset = scope_config.get("preset", "自由迭代(仍保最小修改)") | |
| if preset == "自由迭代(仍保最小修改)": | |
| return "" | |
| allowed = get_allowed_fields_for_preset(preset) | |
| forbidden = get_forbidden_fields_for_preset(preset) | |
| allowed_labels = [CONTROLLABLE_FIELDS.get(f, {}).get("label", f) for f in allowed] | |
| forbidden_labels = [CONTROLLABLE_FIELDS.get(f, {}).get("label", f) for f in forbidden] | |
| hint = f"\n\n【范围约束】当前模式:{preset}\n" | |
| hint += f"允许修改:{', '.join(allowed_labels)}\n" | |
| if forbidden_labels: | |
| hint += f"禁止修改:{', '.join(forbidden_labels)}\n" | |
| return hint | |
| # 自定义模式 | |
| locked_fields = scope_config.get("locked_fields", []) | |
| locked_mechanics = scope_config.get("locked_mechanics", []) | |
| if not locked_fields and not locked_mechanics: | |
| return "" | |
| hint = "\n\n【范围约束】自定义模式\n" | |
| if locked_fields: | |
| locked_labels = [CONTROLLABLE_FIELDS.get(f, {}).get("label", f) for f in locked_fields] | |
| hint += f"锁定字段(禁止修改):{', '.join(locked_labels)}\n" | |
| if locked_mechanics: | |
| hint += f"锁定机制(禁止修改):{', '.join(locked_mechanics)}\n" | |
| return hint | |