| |
| """ |
| normalizer_v2.py — 语义标准化(兼容多布局,提取 mechanics 名称/阶段/时机) |
| """ |
| from __future__ import annotations |
| from typing import Any, Dict, List |
| from copy import deepcopy |
|
|
| def _as_list(x): |
| if x is None: return [] |
| return x if isinstance(x, list) else [x] |
|
|
| def _as_dict(x): |
| return x if isinstance(x, dict) else {} |
|
|
| def _scalar(x: Any) -> str: |
| if isinstance(x, str): return x |
| if isinstance(x, (int, float)): return str(x) |
| if isinstance(x, dict): |
| if "_" in x and isinstance(x["_"], list) and x["_"] and isinstance(x["_"][0], (str,int,float)): |
| return _scalar(x["_"][0]) |
| if "name" in x and isinstance(x["name"], str): return x["name"] |
| if len(x)==1: |
| k, v = next(iter(x.items())) |
| if v in (None, {}) and isinstance(k, str): |
| return k |
| if isinstance(x, list) and x: |
| return _scalar(x[0]) |
| return "" |
|
|
| def _is_standard_mechanic_structure(node: Any) -> bool: |
| """ |
| 检查一个节点是否是标准的 {"mechanic": {...}} 结构。 |
| 这是 GDL 中 (mechanic ...) 被 _to_obj 解析后的直接结果。 |
| """ |
| return isinstance(node, dict) and len(node) == 1 and 'mechanic' in node and isinstance(node['mechanic'], dict) |
|
|
def _extract_mechanics_from_sm(sm_node: Any) -> List[Dict[str, Any]]:
    """Extract mechanic definitions from the value of a ``special_mechanics`` key.

    Only the layouts below are recognised; nested structures are deliberately
    not searched, so the internals of a mechanic are never mistaken for
    further mechanics.

    * ``{"mechanic": {name: {...}, ...}}`` - one entry per dict-valued item;
      the key is stored as ``"name"`` on a shallow copy of the definition
      unless the definition already carries a ``"name"``.
    * ``{"mechanic": [item, ...]}`` - every dict item that has a ``"name"``
      key or a ``"_"`` list starting with a string is wrapped as
      ``{"mechanic": item}``.
    * ``[{"mechanic": {...}}, ...]`` - entries already in the standard shape
      are taken as they are.

    Returns a list of ``{"mechanic": {...}}`` dicts (possibly empty).
    """
    found = []
    mapping = _as_dict(sm_node)

    if 'mechanic' in mapping:
        payload = mapping.get("mechanic")
        if isinstance(payload, dict):
            # Keyed layout: the dict key doubles as the mechanic name.
            for key, spec in payload.items():
                if not isinstance(spec, dict):
                    continue
                entry = spec.copy()
                entry.setdefault('name', key)
                found.append({"mechanic": entry})
        elif isinstance(payload, list):
            for candidate in payload:
                if not isinstance(candidate, dict):
                    continue
                positional = candidate.get('_')
                named_positionally = (
                    '_' in candidate
                    and isinstance(positional, list)
                    and positional
                    and isinstance(positional[0], str)
                )
                if 'name' in candidate or named_positionally:
                    found.append({"mechanic": candidate})
    elif isinstance(sm_node, list):
        found.extend(
            entry for entry in sm_node if _is_standard_mechanic_structure(entry)
        )
    elif _is_standard_mechanic_structure(sm_node):
        # NOTE(review): a node of this shape is a dict containing 'mechanic',
        # so the first branch has already handled it; kept for parity.
        found.append(sm_node)

    return found
|
|
|
|
def _walk_collect_mechs(node, bucket, inside_special_mechanics=False):
    """Recursively gather mechanics from ``special_mechanics`` and ``actions.special``.

    Matches are appended to *bucket* in place as ``{"mechanic": {...}}``
    dicts; nothing is returned.

    For a dict node:

    * when it has a ``special_mechanics`` key, that value is handed to
      ``_extract_mechanics_from_sm``;
    * otherwise, when it has an ``actions`` key and *inside_special_mechanics*
      is false, every dict under ``actions.special`` that has a ``"name"``
      key or a ``"_"`` list starting with a string is collected;
    * every remaining value is then visited, except those stored under
      ``special_mechanics``, ``actions``, ``transfer_path``,
      ``visibility_change`` and ``params``.

    For a list node every element is visited. Other node types are ignored.

    *inside_special_mechanics* is passed down unchanged; it exists to keep the
    walk from looking for mechanics inside the internals of a mechanic.
    """
    if isinstance(node, list):
        for child in node:
            _walk_collect_mechs(child, bucket, inside_special_mechanics)
        return
    if not isinstance(node, dict):
        return

    if "special_mechanics" in node:
        bucket.extend(_extract_mechanics_from_sm(node.get("special_mechanics")))
    elif "actions" in node and not inside_special_mechanics:
        specials = _as_list(_as_dict(node.get("actions")).get("special"))
        for action in specials:
            if not isinstance(action, dict):
                continue
            positional = action.get("_")
            named_positionally = (
                "_" in action
                and isinstance(positional, list)
                and positional
                and isinstance(positional[0], str)
            )
            if "name" in action or named_positionally:
                bucket.append({"mechanic": action})

    # Keys already handled above, plus sub-structures that must not be searched.
    skipped_keys = (
        "special_mechanics", "actions",
        "transfer_path", "visibility_change", "params",
    )
    for key, child in node.items():
        if key in skipped_keys:
            continue
        _walk_collect_mechs(
            child, bucket, inside_special_mechanics=inside_special_mechanics
        )
|
|
|
|
def _collect_mechanics(parsed: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Collect and normalise every mechanic found in the parsed tree.

    The tree is walked with ``_walk_collect_mechs``; only results in the
    standard ``{"mechanic": {...}}`` shape are kept.

    Each returned dict has the keys ``name``, ``enabled``, ``description``,
    ``phase``, ``timing``, ``trigger_condition`` and ``raw_definition`` (all
    remaining keys of the source definition).

    * ``name`` comes from ``"name"``, else the first entry of ``"_"``, else
      ``"mechanic@<index>"`` where the index is the position among all
      collected items.
    * ``phase`` defaults to ``"playing_phase"`` and has ``"_phase"`` removed.
    * ``enabled`` is ``False`` only when the definition explicitly says
      ``false`` / ``0`` / ``no`` / ``off`` (case-insensitive); a missing or
      any other value means ``True``.
    """
    collected: List[Any] = []
    _walk_collect_mechs(parsed, collected, inside_special_mechanics=False)

    handled_keys = ("name", "enabled", "description", "phase", "timing", "trigger_condition")
    mechs: List[Dict[str, Any]] = []
    # Enumerate *all* collected items so the fallback name keeps its original index.
    for index, raw in enumerate(collected):
        if not _is_standard_mechanic_structure(raw):
            continue
        content = raw['mechanic']

        positional = content.get("_")
        if isinstance(positional, (list, tuple)):
            first_positional = positional[0] if positional else None
        else:
            # Let _scalar interpret a non-list "_" instead of indexing into it.
            first_positional = positional
        name = _scalar(content.get("name") or first_positional or "")
        if not name:
            name = f"mechanic@{index}"

        phase = _scalar(content.get("phase") or "playing_phase")
        enabled_text = _scalar(content.get("enabled")).strip().lower()

        mechs.append({
            "name": name,
            "enabled": enabled_text not in ("false", "0", "no", "off"),
            "description": _scalar(content.get("description")),
            "phase": phase.replace("_phase", ""),
            "timing": _scalar(content.get("timing") or ""),
            "trigger_condition": _scalar(content.get("trigger_condition")),
            "raw_definition": {
                k: v for k, v in content.items() if k not in handled_keys
            },
        })
    return mechs
|
|
def _collect_mechanics_from_sexpr(parsed: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Collect mechanics from every ``special_mechanics`` node of a parsed tree.

    Dicts and lists are searched recursively. For each ``special_mechanics``
    key whose value looks like ``{"mechanic": {name: {...}, ...}}``, every
    dict-valued entry is normalised with ``_process_mechanic_definition``.
    The value of a ``special_mechanics`` key is never searched any deeper.

    This function works purely on the already-parsed structure, so it does
    not need the S-expression parser.
    """
    mechanics: List[Dict[str, Any]] = []

    def visit(obj: Any) -> None:
        if isinstance(obj, dict):
            for key, value in obj.items():
                if key != 'special_mechanics':
                    visit(value)
                    continue
                if not (isinstance(value, dict) and 'mechanic' in value):
                    continue
                payload = value['mechanic']
                if not isinstance(payload, dict):
                    continue
                for name, definition in payload.items():
                    if isinstance(definition, dict):
                        mechanics.append(_process_mechanic_definition(name, definition))
        elif isinstance(obj, list):
            for item in obj:
                visit(item)

    visit(parsed)
    return mechanics
|
|
| def _collect_mechanics_from_raw_sexpr(gdl_text: str) -> List[Dict[str, Any]]: |
| """ |
| 直接从原始 GDL 文本中收集 mechanics,避免解析器的键合并问题 |
| """ |
| from gdl_parser_v2 import parse_sexpr |
| |
| mechanics = [] |
| |
| |
| lines = gdl_text.split('\n') |
| start_idx = -1 |
| end_idx = -1 |
| paren_count = 0 |
| |
| for i, line in enumerate(lines): |
| if '(special_mechanics' in line: |
| start_idx = i |
| paren_count = line.count('(') - line.count(')') |
| break |
| |
| if start_idx != -1: |
| for i in range(start_idx + 1, len(lines)): |
| paren_count += lines[i].count('(') - lines[i].count(')') |
| if paren_count == 0: |
| end_idx = i |
| break |
| |
| if start_idx != -1 and end_idx != -1: |
| special_mechanics_lines = lines[start_idx:end_idx + 1] |
| special_mechanics_text = '\n'.join(special_mechanics_lines) |
| |
| try: |
| sexpr = parse_sexpr(special_mechanics_text) |
| if len(sexpr) > 0 and isinstance(sexpr[0], list): |
| for item in sexpr[0]: |
| if isinstance(item, list) and len(item) > 0 and item[0] == 'mechanic': |
| |
| name = item[1] if len(item) > 1 else "unknown" |
| |
| |
| definition = {} |
| for i in range(2, len(item)): |
| if isinstance(item[i], list) and len(item[i]) >= 2: |
| key = item[i][0] |
| value = item[i][1] if len(item[i]) == 2 else item[i][1:] |
| definition[key] = value |
| |
| mechanics.append(_process_mechanic_definition(name, definition)) |
| except Exception as e: |
| print(f"Error parsing special_mechanics: {e}") |
| |
| return mechanics |
|
|
def _process_mechanic_definition(name: str, definition: Dict[str, Any]) -> Dict[str, Any]:
    """Normalise a single mechanic definition.

    Args:
        name: Name of the mechanic; stored unchanged.
        definition: Raw key/value definition of the mechanic.

    Returns:
        A dict with the keys ``name``, ``enabled``, ``description``,
        ``phase``, ``timing``, ``trigger_condition`` and ``raw_definition``.
        ``phase`` defaults to ``"playing_phase"`` and has ``"_phase"``
        removed. ``enabled`` is ``False`` only when the definition explicitly
        says ``false`` / ``0`` / ``no`` / ``off`` (case-insensitive); a
        missing or any other value means ``True``. ``raw_definition`` holds
        every key of *definition* that is not one of the normalised fields.
    """
    handled_keys = ("name", "enabled", "description", "phase", "timing", "trigger_condition")
    phase = _scalar(definition.get("phase") or "playing_phase")
    enabled_text = _scalar(definition.get("enabled")).strip().lower()

    return {
        "name": name,
        "enabled": enabled_text not in ("false", "0", "no", "off"),
        "description": _scalar(definition.get("description")),
        "phase": phase.replace("_phase", ""),
        "timing": _scalar(definition.get("timing") or ""),
        "trigger_condition": _scalar(definition.get("trigger_condition")),
        "raw_definition": {
            k: v for k, v in definition.items() if k not in handled_keys
        },
    }
|
|
def _ir_count(value: Any) -> int:
    """Convert *value* to an int of at least 1; non-numeric input counts as 1."""
    try:
        count = int(value)
    except (TypeError, ValueError, OverflowError):
        count = 1
    return max(1, count)


def _ir_flatten_once(items: List[Any]) -> List[Any]:
    """Flatten one level of nested lists, keeping other entries as they are."""
    flat: List[Any] = []
    for entry in items:
        if isinstance(entry, list):
            flat.extend(entry)
        else:
            flat.append(entry)
    return flat


def _ir_roles(game: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Build ``{"name", "count"}`` entries from the ``roles`` section of *game*."""
    roles: List[Dict[str, Any]] = []
    for raw in _ir_flatten_once(_as_list(game.get("roles"))):
        rd = _as_dict(raw)
        role_node = rd.get("role")
        positional = role_node.get("_") if isinstance(role_node, dict) else None
        if isinstance(positional, list) and positional:
            # Positional layout: {"role": {"_": [name, count?]}}.
            count = _ir_count(positional[1]) if len(positional) >= 2 else 1
            roles.append({"name": _scalar(positional[0]) or "Player", "count": count})
            continue
        name = _scalar(rd.get("name") or rd.get("role") or "Player")
        roles.append({"name": name, "count": _ir_count(rd.get("count", 1))})
    return roles


def _ir_turn_order(game: Dict[str, Any]) -> List[str]:
    """Return the turn order of *game* as a list of strings (possibly empty)."""
    flat = _ir_flatten_once(_as_list(game.get("turn_order")))
    if not flat:
        return []
    head = flat[0]
    if isinstance(head, dict) and len(head) == 1:
        # Layout {first: {"_": [rest...]}}: the key is the first element.
        first, rest = next(iter(head.items()))
        sequence = [first]
        if isinstance(rest, dict) and isinstance(rest.get("_"), list):
            sequence.extend(rest["_"])
        return [_scalar(x) for x in sequence]
    return [_scalar(x) for x in flat]


def _ir_phase_names(game: Dict[str, Any]) -> List[str]:
    """Return the phase names listed in the ``phases`` section of *game*."""
    names: List[str] = []
    for group in _as_list(game.get("phases")):
        for entry in _as_list(group):
            text = _scalar(entry)
            if text:
                names.append(text)
            elif isinstance(entry, dict) and len(entry) == 1:
                key = next(iter(entry))
                if isinstance(key, str) and key.endswith("_phase"):
                    names.append(key.replace("_phase", ""))
    return names


def _ir_zones(zones_info: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn the ``zones`` mapping of the setup section into a list of zone dicts."""
    zone_defs: List[Dict[str, Any]] = []
    for zone_type in ("hand", "field", "discard_pile", "main_deck"):
        if zone_type in zones_info:
            zone_defs.append({"type": zone_type, **_as_dict(zones_info.get(zone_type))})

    special = zones_info.get("special_deck")
    if isinstance(special, dict):
        for name, details in special.items():
            if isinstance(details, dict):
                zone_defs.append({"type": "special_deck", "name": name, **details})
            else:
                zone_defs.append({"type": "special_deck", "name": name, "initial_cards": details})
    elif isinstance(special, list):
        for item in special:
            if isinstance(item, dict) and len(item) == 1:
                name, details = next(iter(item.items()))
                zone_defs.append({"type": "special_deck", "name": name, **_as_dict(details)})
    return zone_defs


def _ir_custom_combinations(combinations_info: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return ``{"name", "spec"}`` entries for every ``custom_combination``."""
    customs: List[Dict[str, Any]] = []
    for raw in _as_list(combinations_info.get("custom_combination")):
        cc = _as_dict(raw)
        if "_" in cc and isinstance(cc["_"], list) and cc["_"]:
            name_key, name = "_", _scalar(cc["_"][0])
        elif "name" in cc:
            name_key, name = "name", _scalar(cc.get("name"))
        else:
            continue
        spec = {k: v for k, v in cc.items() if k != name_key}
        customs.append({"name": name, "spec": spec})
    return customs


def normalize_ir(parsed: Dict[str, Any], gdl_text: str = None) -> Dict[str, Any]:
    """Normalise a parsed GDL document into the intermediate representation.

    Args:
        parsed: Parsed document. The game definition is read from its
            ``"game"`` entry when that is present and truthy, otherwise from
            *parsed* itself.
        gdl_text: Optional raw GDL source. When given, special mechanics are
            first extracted from the raw text; the parsed tree is used when
            that yields nothing or fails.

    Returns:
        A dict that always contains ``game`` (``name``, ``players_count``,
        ``roles`` and, when a deck is declared, ``deck``), ``phases`` and
        ``special_mechanics``. ``turns``, ``zones``, ``card_relations``,
        ``setup``, ``combinations`` and ``actions`` are added only when the
        corresponding input is present. ``visibility``, ``invariants``,
        ``scoring`` and ``extensions`` are deep-copied from *parsed* or,
        failing that, from the game definition.
    """
    p = _as_dict(parsed)
    game = _as_dict(p.get("game") or p)

    out: Dict[str, Any] = {"game": {}}
    game_out = out["game"]

    # --- basic game information -------------------------------------------
    game_out["name"] = _scalar(game.get("name") or game.get("_") or "") or "UnnamedGame"
    game_out["players_count"] = game.get("players", 0)
    game_out["roles"] = _ir_roles(game) or [
        {"name": "Player", "count": game.get("players") or 0}
    ]

    # --- turn order and phases --------------------------------------------
    order = _ir_turn_order(game)
    if order:
        out["turns"] = {"order": order}

    out["phases"] = _ir_phase_names(game)

    # --- deck -------------------------------------------------------------
    deck_info = _as_dict(game.get("deck"))
    if deck_info:
        game_out["deck"] = {
            "type": _scalar(deck_info.get("_") or deck_info.get("type") or "Standard54"),
            "shuffling": _scalar(deck_info.get("shuffling")),
            "deal_pattern": _scalar(deck_info.get("deal_pattern")),
        }

    # --- setup: zones, card relations, deal count --------------------------
    setup_info = _as_dict(game.get("setup"))
    if setup_info:
        zones_info = _as_dict(setup_info.get("zones"))
        if zones_info:
            out["zones"] = _ir_zones(zones_info)

        card_relations_info = _as_dict(setup_info.get("card_relations"))
        if card_relations_info:
            out["card_relations"] = card_relations_info

        deal_count = setup_info.get("deal")
        if deal_count is not None:
            out["setup"] = {"deal": deal_count}

    # --- combinations -----------------------------------------------------
    combinations_info = _as_dict(game.get("combinations"))
    if combinations_info:
        out["combinations"] = {"custom": _ir_custom_combinations(combinations_info)}

    # --- special mechanics --------------------------------------------------
    mechanics: List[Dict[str, Any]] = []
    if gdl_text:
        try:
            mechanics = _collect_mechanics_from_raw_sexpr(gdl_text)
        except Exception:
            # Raw-text extraction is best effort (e.g. the parser module may
            # be unavailable); the parsed tree below is the fallback.
            mechanics = []
    if not mechanics:
        mechanics = _collect_mechanics({"game": game})
    out["special_mechanics"] = mechanics

    # --- actions: top level first, else the first setup entry holding them --
    actions = None
    if "actions" in game:
        actions = deepcopy(game["actions"])
    elif "setup" in game:
        for item in _as_list(game["setup"]):
            if isinstance(item, dict) and "actions" in item:
                actions = deepcopy(item["actions"])
                break
    if actions:
        out["actions"] = actions

    # --- sections passed through unchanged ---------------------------------
    for key in ("visibility", "invariants", "scoring", "extensions"):
        if key in p:
            out[key] = deepcopy(p[key])
        elif key in game:
            out[key] = deepcopy(game[key])

    return out
|
|