""" DesignState 解析与差分(用于"多轮可控迭代")。 约定: - 模型需在回答中输出一个 ```json ...``` 代码块作为 DesignState - 该 JSON 应包含 new_variant_name / mechanics 等关键字段 """ import json import re from datetime import datetime from typing import Any, Dict, List, Optional, Tuple # ==================== 版本历史管理 ==================== class DesignStateHistory: """ 管理 DesignState 的版本历史,支持回滚 每个版本包含: - version: 版本号 - state_obj: DesignState 字典 - state_raw: 原始 JSON 字符串 - timestamp: 时间戳 - summary: 紧凑版摘要 """ def __init__(self, max_versions: int = 20): self.max_versions = max_versions self.versions: List[Dict[str, Any]] = [] self.current_index: int = -1 # 当前版本在 versions 中的索引 def add_version( self, version: int, state_obj: Dict[str, Any], state_raw: str, summary: str = "" ) -> None: """添加新版本""" if not state_obj: return record = { "version": version, "state_obj": state_obj.copy() if state_obj else {}, "state_raw": state_raw, "timestamp": datetime.now().strftime("%H:%M:%S"), "summary": summary or self._generate_summary(state_obj), } # 如果当前不在最新位置,截断后面的版本(类似 git 的分支) if self.current_index >= 0 and self.current_index < len(self.versions) - 1: self.versions = self.versions[:self.current_index + 1] self.versions.append(record) self.current_index = len(self.versions) - 1 # 超出最大版本数时,删除最早的 if len(self.versions) > self.max_versions: self.versions.pop(0) self.current_index = len(self.versions) - 1 def _generate_summary(self, state_obj: Dict[str, Any]) -> str: """生成简短摘要""" name = state_obj.get("new_variant_name", "(未命名)") mechanics = state_obj.get("mechanics", []) mech_count = len(mechanics) if isinstance(mechanics, list) else 0 return f"{name} ({mech_count}个机制)" def get_version(self, version: int) -> Optional[Dict[str, Any]]: """获取指定版本""" for record in self.versions: if record["version"] == version: return record return None def get_version_by_index(self, index: int) -> Optional[Dict[str, Any]]: """通过索引获取版本""" if 0 <= index < len(self.versions): return self.versions[index] return None def rollback_to(self, version: int) -> Optional[Tuple[Dict[str, Any], str]]: """ 回滚到指定版本 返回:(state_obj, state_raw) 或 None """ for i, record in enumerate(self.versions): if record["version"] == version: self.current_index = i return record["state_obj"].copy(), record["state_raw"] return None def get_version_list(self) -> List[Dict[str, Any]]: """获取所有版本列表(用于 UI 显示)""" return [ { "version": r["version"], "timestamp": r["timestamp"], "summary": r["summary"], "is_current": i == self.current_index, } for i, r in enumerate(self.versions) ] def get_version_choices(self) -> List[str]: """生成下拉选项列表""" choices = [] for i, r in enumerate(self.versions): marker = " ← 当前" if i == self.current_index else "" choices.append(f"v{r['version']} [{r['timestamp']}] {r['summary']}{marker}") return choices def get_current_version(self) -> int: """获取当前版本号""" if self.current_index >= 0 and self.current_index < len(self.versions): return self.versions[self.current_index]["version"] return 0 def clear(self) -> None: """清空历史""" self.versions = [] self.current_index = -1 def to_serializable(self) -> Dict[str, Any]: """转为可序列化格式(用于 Gradio State)""" return { "versions": self.versions, "current_index": self.current_index, "max_versions": self.max_versions, } @classmethod def from_serializable(cls, data: Dict[str, Any]) -> "DesignStateHistory": """从序列化格式恢复""" if not data or not isinstance(data, dict): return cls() history = cls(max_versions=data.get("max_versions", 20)) history.versions = data.get("versions", []) history.current_index = data.get("current_index", -1) return history def create_empty_history() -> Dict[str, Any]: """创建空的历史记录(用于初始化 State)""" return DesignStateHistory().to_serializable() def add_to_history( history_data: Dict[str, Any], version: int, state_obj: Dict[str, Any], state_raw: str, summary: str = "" ) -> Dict[str, Any]: """添加版本到历史""" history = DesignStateHistory.from_serializable(history_data) history.add_version(version, state_obj, state_raw, summary) return history.to_serializable() def rollback_history( history_data: Dict[str, Any], version: int ) -> Tuple[Dict[str, Any], Optional[Dict[str, Any]], Optional[str]]: """ 回滚到指定版本 返回:(更新后的 history_data, state_obj, state_raw) """ history = DesignStateHistory.from_serializable(history_data) result = history.rollback_to(version) if result: return history.to_serializable(), result[0], result[1] return history_data, None, None def get_history_choices(history_data: Dict[str, Any]) -> List[str]: """获取版本下拉选项""" history = DesignStateHistory.from_serializable(history_data) return history.get_version_choices() def parse_version_from_choice(choice: str) -> int: """从选项字符串中解析版本号""" if not choice: return 0 # 格式: "v3 [14:23:45] 玩法名 (2个机制)" match = re.match(r"v(\d+)", choice) if match: return int(match.group(1)) return 0 _JSON_FENCE_RE = re.compile(r"```json\s*(\{.*?\})\s*```", re.DOTALL | re.IGNORECASE) _READY_RE = re.compile(r"READY_TO_GENERATE\s*:\s*(true|false)", re.IGNORECASE) def extract_design_state(text: str) -> Tuple[Optional[Dict[str, Any]], str]: """ 返回:(state_dict_or_none, raw_json_or_empty) """ if not text: return None, "" candidates = _JSON_FENCE_RE.findall(text) if not candidates: return None, "" # 选择“最像 DesignState”的 JSON:包含几个关键字段 best_raw = "" best_score = -1 best_obj = None for raw in candidates: raw = (raw or "").strip() try: obj = json.loads(raw) except Exception: continue score = 0 for k in ["new_variant_name", "mechanics", "tileset", "game_variant", "scoring_mode"]: if k in obj: score += 1 if score > best_score: best_score = score best_raw = raw best_obj = obj return best_obj, best_raw def extract_ready_to_generate(text: str) -> Optional[bool]: """ 从文本中提取 READY_TO_GENERATE: true/false 返回 None 表示未提供该标记。 """ if not text: return None m = _READY_RE.search(text) if not m: return None return m.group(1).lower() == "true" def summarize_design_state(state: Dict[str, Any]) -> str: if not state: return "" name = str(state.get("new_variant_name") or "").strip() base = state.get("base_variants") or [] fusion = state.get("fusion_variants") or [] mechanics = state.get("mechanics") or [] return "玩法:{0} | 底座:{1} | 融合:{2} | 机制数:{3}".format( name or "(未命名)", ",".join(base) if isinstance(base, list) else str(base), ",".join(fusion) if isinstance(fusion, list) else str(fusion), len(mechanics) if isinstance(mechanics, list) else 0, ) def diff_keys(prev: Dict[str, Any], cur: Dict[str, Any]) -> List[str]: """ 粗粒度:仅比较顶层 key 的变化(新增/删除/值变更)。 """ if prev is None or cur is None: return [] changed = set() prev_keys = set(prev.keys()) cur_keys = set(cur.keys()) for k in prev_keys.symmetric_difference(cur_keys): changed.add(k) for k in prev_keys.intersection(cur_keys): if prev.get(k) != cur.get(k): changed.add(k) return sorted(changed) def diff_mechanics(prev: Dict[str, Any], cur: Dict[str, Any]) -> List[str]: """ 简单机制差分:按 mechanic.name 对齐,输出变更摘要字符串列表。 """ prev_list = prev.get("mechanics") if isinstance(prev, dict) else None cur_list = cur.get("mechanics") if isinstance(cur, dict) else None if not isinstance(prev_list, list) or not isinstance(cur_list, list): return [] def to_map(lst: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: out = {} for it in lst: if not isinstance(it, dict): continue name = str(it.get("name") or "").strip() if not name: continue out[name] = it return out pm = to_map(prev_list) cm = to_map(cur_list) changes = [] for name in sorted(set(pm.keys()) | set(cm.keys())): if name not in pm: changes.append("新增机制: {0}".format(name)) continue if name not in cm: changes.append("删除机制: {0}".format(name)) continue if pm[name] == cm[name]: continue # 只输出变了哪些字段(不展开值) fields = sorted(set(pm[name].keys()) | set(cm[name].keys())) touched = [f for f in fields if pm[name].get(f) != cm[name].get(f)] changes.append("机制变更: {0} 字段={1}".format(name, ",".join(touched))) return changes def is_change_within_scope(changed_top_keys: List[str], scope: str) -> bool: """ 软约束:判断顶层改动是否落在预设范围内。 """ if not scope or scope == "自由迭代(仍保最小修改)": return True allowed = set() if scope == "仅优化创新机制": allowed.update(["mechanics", "open_questions", "core_constraints"]) elif scope == "仅优化计分与番型": allowed.update(["scoring_mode"]) elif scope == "仅优化流程与阶段": allowed.update(["game_variant"]) elif scope == "仅修复校验问题": # 允许改动更宽一些:常见缺失字段补齐 allowed.update([ "mechanics", "tileset", "core_constraints", "players", "game_variant", "scoring_mode", ]) else: return True # 名称/来源类字段默认不允许在"仅优化"里被改(除非自由迭代) forbidden = {"new_variant_name", "base_variants", "fusion_variants"} for k in changed_top_keys: if k in forbidden: return False if k not in allowed: return False return True # ==================== 差异可视化(新增) ==================== # 字段中文名映射 _FIELD_NAMES = { "new_variant_name": "玩法名称", "base_variants": "底座玩法", "fusion_variants": "融合玩法", "game_variant": "游戏模式", "players": "玩家人数", "scoring_mode": "计分模式", "tileset": "牌组配置", "core_constraints": "核心约束", "mechanics": "机制列表", "open_questions": "待定问题", } def _format_value(val: Any, max_len: int = 40) -> str: """格式化值为简短字符串""" if val is None: return "(空)" if isinstance(val, bool): return "是" if val else "否" if isinstance(val, (int, float)): return str(val) if isinstance(val, str): s = val.strip() return s if len(s) <= max_len else s[:max_len] + "..." if isinstance(val, list): if not val: return "(空列表)" if len(val) <= 3: return ", ".join(str(v) for v in val) return ", ".join(str(v) for v in val[:3]) + f"... 共{len(val)}项" if isinstance(val, dict): keys = list(val.keys()) if len(keys) <= 3: return "{" + ", ".join(keys) + "}" return "{" + ", ".join(keys[:3]) + f"... 共{len(keys)}项" + "}" return str(val)[:max_len] def diff_open_questions(prev: Dict[str, Any], cur: Dict[str, Any]) -> Tuple[List[str], List[str]]: """ 对比 open_questions 的变化 返回:(已解决的问题列表, 新增的问题列表) """ prev_qs = prev.get("open_questions") if isinstance(prev, dict) else None cur_qs = cur.get("open_questions") if isinstance(cur, dict) else None if not isinstance(prev_qs, list): prev_qs = [] if not isinstance(cur_qs, list): cur_qs = [] prev_set = set(str(q) for q in prev_qs) cur_set = set(str(q) for q in cur_qs) resolved = sorted(prev_set - cur_set) added = sorted(cur_set - prev_set) return resolved, added def generate_diff_summary( prev: Optional[Dict[str, Any]], cur: Optional[Dict[str, Any]], prev_version: int = 0, cur_version: int = 0 ) -> str: """ 生成人可读的 DesignState 变更摘要(Markdown 格式) """ if not cur: return "" if not prev: # 首次建立 name = cur.get("new_variant_name", "(未命名)") mechanics = cur.get("mechanics", []) mech_count = len(mechanics) if isinstance(mechanics, list) else 0 return f"**DesignState v{cur_version} 已建立**\n\n玩法:{name}\n机制数:{mech_count}" lines = [] version_str = f"v{prev_version} → v{cur_version}" if prev_version and cur_version else "" lines.append(f"**DesignState 变更摘要** {version_str}") lines.append("") # 1. 顶层字段变更(排除 mechanics 和 open_questions,单独处理) skip_keys = {"mechanics", "open_questions"} changed_fields = [] all_keys = set(prev.keys()) | set(cur.keys()) for key in sorted(all_keys): if key in skip_keys: continue prev_val = prev.get(key) cur_val = cur.get(key) if prev_val != cur_val: field_name = _FIELD_NAMES.get(key, key) if key not in prev: changed_fields.append(f" • **[新增]** {field_name}: {_format_value(cur_val)}") elif key not in cur: changed_fields.append(f" • **[删除]** {field_name}") else: changed_fields.append(f" • {field_name}: `{_format_value(prev_val)}` → `{_format_value(cur_val)}`") if changed_fields: lines.append("📝 **顶层字段变更:**") lines.extend(changed_fields) lines.append("") # 2. 机制变更 mech_changes = diff_mechanics(prev, cur) if mech_changes: lines.append("🔧 **机制变更:**") for change in mech_changes: if change.startswith("新增机制"): lines.append(f" • **[新增]** {change.replace('新增机制: ', '')}") elif change.startswith("删除机制"): lines.append(f" • **[删除]** {change.replace('删除机制: ', '')}") else: # 机制变更: XXX 字段=a,b,c parts = change.replace("机制变更: ", "").split(" 字段=") if len(parts) == 2: lines.append(f" • **[修改]** {parts[0]}: {parts[1]}") else: lines.append(f" • {change}") lines.append("") # 3. 待定问题变化 resolved, added = diff_open_questions(prev, cur) if resolved or added: lines.append("❓ **待定问题变化:**") for q in resolved: lines.append(f" • ~~[已解决]~~ {q[:50]}{'...' if len(q) > 50 else ''}") for q in added: lines.append(f" • **[新增]** {q[:50]}{'...' if len(q) > 50 else ''}") lines.append("") # 如果没有任何变更 if len(lines) <= 2: lines.append("_(无变更)_") return "\n".join(lines).strip() def generate_diff_summary_compact( prev: Optional[Dict[str, Any]], cur: Optional[Dict[str, Any]] ) -> str: """ 生成紧凑版变更摘要(单行,用于状态栏) """ if not cur: return "无变更" if not prev: name = cur.get("new_variant_name", "(未命名)") return f"新建: {name}" changes = [] # 顶层字段变更数 field_changes = diff_keys(prev, cur) skip_keys = {"mechanics", "open_questions"} field_changes = [k for k in field_changes if k not in skip_keys] if field_changes: changes.append(f"{len(field_changes)}个字段") # 机制变更数 mech_changes = diff_mechanics(prev, cur) if mech_changes: changes.append(f"{len(mech_changes)}个机制") # 问题变化 resolved, added = diff_open_questions(prev, cur) if resolved: changes.append(f"解决{len(resolved)}个问题") if added: changes.append(f"新增{len(added)}个问题") if not changes: return "无变更" return "变更: " + ", ".join(changes) # ==================== 多阶段引导式交互 ==================== class InteractionPhase: """交互阶段枚举""" INITIAL = "initial" # 初始阶段:用户提出需求 UNDERSTAND = "understand" # 理解确认:确认对已有玩法的理解 DIVERGE = "diverge" # 方案发散:生成多种机制组合方案 SELECT = "select" # 方案选择:用户选择具体方案 ELABORATE = "elaborate" # 深入展开:对选定方案进行详细设计 ITERATE = "iterate" # 迭代优化:基于反馈优化方案 # 方案提取正则 _PROPOSAL_BLOCK_RE = re.compile( r"(?:###?\s*)?(?:方案|选项|Option)\s*([A-Z\d一二三四五六七八九十]+)[::\s]*(.+?)(?=(?:###?\s*)?(?:方案|选项|Option)\s*[A-Z\d一二三四五六七八九十]+[::\s]|$)", re.DOTALL | re.IGNORECASE ) # 理解确认问题提取 _CLARIFY_QUESTION_RE = re.compile( r"(?:❓|🤔|【确认】|【问题】|\[确认\]|\[问题\]|请确认|请问|是否是指)\s*(.+?\?)", re.DOTALL ) def extract_proposals(text: str) -> List[Dict[str, Any]]: """ 从模型输出中提取多个候选方案 返回: [{"id": "A", "title": "...", "description": "...", "highlights": [...]}] """ if not text: return [] proposals = [] matches = _PROPOSAL_BLOCK_RE.findall(text) for idx, (proposal_id, content) in enumerate(matches): content = content.strip() # 提取标题(第一行或冒号前的部分) lines = content.split("\n") title = lines[0].strip() if lines else f"方案 {proposal_id}" # 清理标题中的 markdown 标记 title = re.sub(r"^[#\-\*]+\s*", "", title) title = re.sub(r"\*+", "", title) # 提取描述 description = "\n".join(lines[1:]).strip() if len(lines) > 1 else "" # 提取亮点/创新点 highlights = [] highlight_patterns = [ r"[★✦⭐🌟]\s*(.+)", r"(?:创新点|亮点|特色)[::]\s*(.+)", r"[-•]\s*(?:创新|特色|核心)[::]\s*(.+)", ] for pattern in highlight_patterns: hl_matches = re.findall(pattern, content) highlights.extend([h.strip() for h in hl_matches]) proposals.append({ "id": proposal_id.strip(), "title": title[:50], # 限制标题长度 "description": description[:200] + ("..." if len(description) > 200 else ""), "highlights": highlights[:3], # 最多3个亮点 "full_content": content, }) return proposals def extract_clarify_questions(text: str) -> List[str]: """ 从模型输出中提取需要用户确认的问题 """ if not text: return [] questions = [] matches = _CLARIFY_QUESTION_RE.findall(text) for q in matches: q = q.strip() if q and len(q) > 5: # 过滤太短的 questions.append(q) # 也尝试提取 open_questions 中的内容 try: ds_obj, _ = extract_design_state(text) if ds_obj and "open_questions" in ds_obj: for q in ds_obj.get("open_questions", []): if isinstance(q, str) and q not in questions: questions.append(q) except Exception: pass return questions[:5] # 最多5个问题 def detect_interaction_phase(text: str, ds_obj: Optional[Dict[str, Any]] = None) -> str: """ 根据模型输出内容检测当前交互阶段 """ if not text: return InteractionPhase.INITIAL text_lower = text.lower() # 检测是否有多个方案供选择 proposals = extract_proposals(text) if len(proposals) >= 2: return InteractionPhase.DIVERGE # 检测是否有需要确认的问题 questions = extract_clarify_questions(text) if questions: return InteractionPhase.UNDERSTAND # 检测是否有完整的 DesignState 且 READY_TO_GENERATE ready = extract_ready_to_generate(text) if ready is True: return InteractionPhase.ELABORATE # 检测是否在迭代中 if ds_obj and ds_obj.get("mechanics"): return InteractionPhase.ITERATE return InteractionPhase.INITIAL def format_proposals_for_display(proposals: List[Dict[str, Any]]) -> str: """ 将方案列表格式化为 Markdown 显示 """ if not proposals: return "" lines = ["## 🎯 请选择一个方案深入展开\n"] for p in proposals: lines.append(f"### 方案 {p['id']}: {p['title']}") if p.get("highlights"): for hl in p["highlights"]: lines.append(f" ✦ {hl}") if p.get("description"): lines.append(f"\n{p['description'][:150]}...") lines.append("") lines.append("\n💡 请在下方选择方案编号,或输入「其他」描述你的想法。") return "\n".join(lines) def generate_phase_prompt_hint(phase: str, ds_obj: Optional[Dict[str, Any]] = None) -> str: """ 根据当前阶段生成提示词补充 """ hints = { InteractionPhase.INITIAL: "", InteractionPhase.UNDERSTAND: ( "\n\n【阶段提示】当前处于「理解确认」阶段。" "请确保你完全理解用户描述的已有玩法,如有不明确之处请提出确认问题。" ), InteractionPhase.DIVERGE: ( "\n\n【阶段提示】当前处于「方案发散」阶段。" "请发散思维,给出 2-4 种不同方向的机制组合方案,每个方案需包含:\n" "1. 方案编号(A/B/C/D)\n" "2. 方案名称\n" "3. 核心创新点(1-2句话)\n" "4. 机制组合简述\n" "不要深入展开,等用户选择后再详细设计。" ), InteractionPhase.SELECT: ( "\n\n【阶段提示】用户已选择方案,请对该方案进行深入展开设计。" ), InteractionPhase.ELABORATE: ( "\n\n【阶段提示】当前处于「深入展开」阶段。" "请输出完整的玩法设计(含 DesignState、mGDL、自检报告)。" ), InteractionPhase.ITERATE: ( "\n\n【阶段提示】当前处于「迭代优化」阶段。" "请基于用户反馈对现有方案进行最小修改。" ), } return hints.get(phase, "") def create_phase_state() -> Dict[str, Any]: """创建阶段状态(用于 Gradio State)""" return { "current_phase": InteractionPhase.INITIAL, "confirmed_understanding": [], # 已确认的理解点 "proposals": [], # 当前可选方案 "selected_proposal": None, # 用户选择的方案 "phase_history": [], # 阶段历史 } def update_phase_state( phase_state: Dict[str, Any], new_phase: str, proposals: Optional[List[Dict[str, Any]]] = None, selected: Optional[str] = None, confirmed: Optional[List[str]] = None, ) -> Dict[str, Any]: """更新阶段状态""" state = phase_state.copy() if phase_state else create_phase_state() # 记录阶段变化历史 if state["current_phase"] != new_phase: state["phase_history"].append({ "from": state["current_phase"], "to": new_phase, "timestamp": datetime.now().strftime("%H:%M:%S"), }) state["current_phase"] = new_phase if proposals is not None: state["proposals"] = proposals if selected is not None: state["selected_proposal"] = selected if confirmed is not None: state["confirmed_understanding"].extend(confirmed) return state def get_phase_display_name(phase: str) -> str: """获取阶段的显示名称""" names = { InteractionPhase.INITIAL: "🚀 初始", InteractionPhase.UNDERSTAND: "🔍 理解确认", InteractionPhase.DIVERGE: "💡 方案发散", InteractionPhase.SELECT: "✅ 方案选择", InteractionPhase.ELABORATE: "📝 深入展开", InteractionPhase.ITERATE: "🔄 迭代优化", } return names.get(phase, phase) # ==================== 细粒度范围控制 ==================== # DesignState 可控字段定义 CONTROLLABLE_FIELDS = { "new_variant_name": {"label": "玩法名称", "category": "基础信息", "lockable": True}, "base_variants": {"label": "底座玩法", "category": "基础信息", "lockable": True}, "fusion_variants": {"label": "融合玩法", "category": "基础信息", "lockable": True}, "game_variant": {"label": "游戏模式", "category": "游戏规则", "lockable": True}, "players": {"label": "玩家人数", "category": "游戏规则", "lockable": True}, "scoring_mode": {"label": "计分模式", "category": "计分系统", "lockable": True}, "tileset": {"label": "牌组配置", "category": "游戏规则", "lockable": True}, "core_constraints": {"label": "核心约束", "category": "游戏规则", "lockable": False}, "mechanics": {"label": "机制列表", "category": "创新机制", "lockable": False}, "open_questions": {"label": "待定问题", "category": "其他", "lockable": False}, } # 字段分类 FIELD_CATEGORIES = { "基础信息": ["new_variant_name", "base_variants", "fusion_variants"], "游戏规则": ["game_variant", "players", "tileset", "core_constraints"], "计分系统": ["scoring_mode"], "创新机制": ["mechanics"], "其他": ["open_questions"], } class ScopeConstraint: """范围约束类型""" SOFT = "soft" # 软约束:检测到越界时提示,但不阻断 HARD = "hard" # 硬约束:检测到越界时阻断并要求重做 def create_scope_config() -> Dict[str, Any]: """创建默认的范围配置""" return { "mode": "preset", # preset(预设模式)或 custom(自定义模式) "preset": "自由迭代(仍保最小修改)", # 预设选项 "constraint_level": ScopeConstraint.SOFT, # 约束级别 "locked_fields": [], # 锁定的顶层字段 "allowed_fields": [], # 允许修改的顶层字段(custom 模式下使用) "locked_mechanics": [], # 锁定的机制名称(不允许修改这些机制) "allowed_mechanics": [], # 只允许修改这些机制(为空表示不限制) } def get_scope_preset_options() -> List[str]: """获取预设范围选项""" return [ "自由迭代(仍保最小修改)", "仅优化创新机制", "仅优化计分与番型", "仅优化流程与阶段", "仅修复校验问题", "锁定核心(仅微调细节)", ] def get_allowed_fields_for_preset(preset: str) -> List[str]: """根据预设获取允许修改的字段""" presets = { "自由迭代(仍保最小修改)": list(CONTROLLABLE_FIELDS.keys()), "仅优化创新机制": ["mechanics", "open_questions", "core_constraints"], "仅优化计分与番型": ["scoring_mode", "open_questions"], "仅优化流程与阶段": ["game_variant", "open_questions"], "仅修复校验问题": ["mechanics", "tileset", "core_constraints", "players", "game_variant", "scoring_mode", "open_questions"], "锁定核心(仅微调细节)": ["open_questions", "core_constraints"], } return presets.get(preset, list(CONTROLLABLE_FIELDS.keys())) def get_forbidden_fields_for_preset(preset: str) -> List[str]: """根据预设获取禁止修改的字段""" if preset == "自由迭代(仍保最小修改)": return [] allowed = set(get_allowed_fields_for_preset(preset)) all_fields = set(CONTROLLABLE_FIELDS.keys()) return list(all_fields - allowed) def validate_scope_compliance( prev: Dict[str, Any], cur: Dict[str, Any], scope_config: Dict[str, Any], ) -> Dict[str, Any]: """ 验证变更是否符合范围约束 返回: { "compliant": bool, # 是否符合约束 "violations": [ # 违规列表 {"field": "xxx", "type": "field_locked|field_forbidden|mechanic_locked", "detail": "..."} ], "warnings": [], # 警告(软约束时) "summary": "..." # 摘要文本 } """ if not prev or not cur: return {"compliant": True, "violations": [], "warnings": [], "summary": "无变更"} result = { "compliant": True, "violations": [], "warnings": [], "summary": "", } mode = scope_config.get("mode", "preset") constraint_level = scope_config.get("constraint_level", ScopeConstraint.SOFT) # 确定允许和禁止的字段 if mode == "preset": preset = scope_config.get("preset", "自由迭代(仍保最小修改)") allowed_fields = set(get_allowed_fields_for_preset(preset)) forbidden_fields = set(get_forbidden_fields_for_preset(preset)) else: allowed_fields = set(scope_config.get("allowed_fields", [])) forbidden_fields = set(scope_config.get("locked_fields", [])) locked_mechanics = set(scope_config.get("locked_mechanics", [])) allowed_mechanics = set(scope_config.get("allowed_mechanics", [])) # 检查顶层字段变更 changed_fields = diff_keys(prev, cur) for field in changed_fields: # 检查是否在禁止列表中 if field in forbidden_fields: violation = { "field": field, "type": "field_forbidden", "detail": f"字段「{CONTROLLABLE_FIELDS.get(field, {}).get('label', field)}」不在允许修改范围内", } result["violations"].append(violation) continue # 检查是否被显式锁定 if field in scope_config.get("locked_fields", []): violation = { "field": field, "type": "field_locked", "detail": f"字段「{CONTROLLABLE_FIELDS.get(field, {}).get('label', field)}」已被锁定", } result["violations"].append(violation) continue # 检查机制级别变更 if "mechanics" in changed_fields: mech_changes = diff_mechanics(prev, cur) for change in mech_changes: # 提取机制名称 mech_name = "" if "新增机制:" in change: mech_name = change.replace("新增机制: ", "").strip() elif "删除机制:" in change: mech_name = change.replace("删除机制: ", "").strip() elif "机制变更:" in change: mech_name = change.split(" 字段=")[0].replace("机制变更: ", "").strip() # 检查机制是否被锁定 if mech_name and mech_name in locked_mechanics: violation = { "field": "mechanics", "type": "mechanic_locked", "detail": f"机制「{mech_name}」已被锁定,不允许修改", } result["violations"].append(violation) # 检查是否只允许修改特定机制 if allowed_mechanics and mech_name and mech_name not in allowed_mechanics: # 只对修改和删除进行限制,新增通常是允许的 if "删除机制:" in change or "机制变更:" in change: violation = { "field": "mechanics", "type": "mechanic_not_allowed", "detail": f"机制「{mech_name}」不在允许修改的机制列表中", } result["violations"].append(violation) # 根据约束级别处理违规 if result["violations"]: if constraint_level == ScopeConstraint.HARD: result["compliant"] = False result["summary"] = f"❌ 发现 {len(result['violations'])} 处范围越界(硬约束),请重新生成" else: result["warnings"] = result["violations"] result["violations"] = [] result["summary"] = f"⚠️ 发现 {len(result['warnings'])} 处范围越界(软约束),建议检查" else: result["summary"] = "✅ 变更符合范围约束" return result def format_scope_violations(validation_result: Dict[str, Any]) -> str: """格式化范围违规为 Markdown""" if validation_result.get("compliant", True) and not validation_result.get("warnings"): return validation_result.get("summary", "") lines = [validation_result.get("summary", "")] items = validation_result.get("violations", []) or validation_result.get("warnings", []) if items: lines.append("") for item in items: field_label = CONTROLLABLE_FIELDS.get(item["field"], {}).get("label", item["field"]) lines.append(f" • **{field_label}**: {item['detail']}") return "\n".join(lines) def get_mechanics_from_state(ds_obj: Dict[str, Any]) -> List[str]: """从 DesignState 中提取机制名称列表""" if not ds_obj: return [] mechanics = ds_obj.get("mechanics", []) if not isinstance(mechanics, list): return [] names = [] for m in mechanics: if isinstance(m, dict) and "name" in m: names.append(m["name"]) return names def generate_scope_prompt_hint(scope_config: Dict[str, Any], ds_obj: Optional[Dict[str, Any]] = None) -> str: """生成范围约束的提示词补充""" mode = scope_config.get("mode", "preset") if mode == "preset": preset = scope_config.get("preset", "自由迭代(仍保最小修改)") if preset == "自由迭代(仍保最小修改)": return "" allowed = get_allowed_fields_for_preset(preset) forbidden = get_forbidden_fields_for_preset(preset) allowed_labels = [CONTROLLABLE_FIELDS.get(f, {}).get("label", f) for f in allowed] forbidden_labels = [CONTROLLABLE_FIELDS.get(f, {}).get("label", f) for f in forbidden] hint = f"\n\n【范围约束】当前模式:{preset}\n" hint += f"允许修改:{', '.join(allowed_labels)}\n" if forbidden_labels: hint += f"禁止修改:{', '.join(forbidden_labels)}\n" return hint # 自定义模式 locked_fields = scope_config.get("locked_fields", []) locked_mechanics = scope_config.get("locked_mechanics", []) if not locked_fields and not locked_mechanics: return "" hint = "\n\n【范围约束】自定义模式\n" if locked_fields: locked_labels = [CONTROLLABLE_FIELDS.get(f, {}).get("label", f) for f in locked_fields] hint += f"锁定字段(禁止修改):{', '.join(locked_labels)}\n" if locked_mechanics: hint += f"锁定机制(禁止修改):{', '.join(locked_mechanics)}\n" return hint