# -*- coding: utf-8 -*- """ mapper_v2.py — 将 normalizer_v2 输出映射为 v0.95 IR(schema 友好;兼容 UNO/七夕) """ from __future__ import annotations from typing import Any, Dict, List from copy import deepcopy import re REQUIRED = ["meta","players","cards","zones","visibility","combinations","comparison","actions","phases","turns","ending","scoring"] ZONE_RE = re.compile(r"^(hand:[A-Za-z0-9_:-]+|field|main_deck|discard_pile|reserve_zone|public_pool|item_deck:[A-Za-z0-9_:-]+|history_deck)$") def _as_dict(x): return x if isinstance(x,dict) else {} def _as_list(x): return x if isinstance(x,list) else ([] if x is None else [x]) def _scalar(x): if isinstance(x,str): return x if isinstance(x,(int,float)): return str(x) if isinstance(x,dict): if "_" in x and isinstance(x["_"], list) and x["_"]: return _scalar(x["_"][0]) if "name" in x and isinstance(x["name"], str): return x["name"] if len(x)==1: k,v = next(iter(x.items())) if v in (None,{}) and isinstance(k,str): return k if isinstance(x,list) and x: return _scalar(x[0]) return "" def _uniquify(seq: List[str]) -> List[str]: seen={}; out=[] for s in map(str, seq): if s not in seen: seen[s]=0; out.append(s) else: seen[s]+=1; out.append(f"{s}_{seen[s]}") return out def _phase_to_schema(s: str) -> str: # Mapping GDL phase names to schema phase names mp={ "deal":"setup", # GDL 'deal' often maps to setup for initialization tasks, but can also be a separate 'deal' phase "setup":"setup", "bid":"bidding", "bidding":"bidding", "double":"doubling", "doubling":"doubling", "initiative":"setup", # Map GDL's initiative_phase to schema's setup "grouping":"grouping", "play":"playing", "playing":"playing", "playing_phase":"play", # Map GDL's playing_phase to schema's play "settle":"settlement", "settlement":"settlement", "settlement_phase":"settlement" } return mp.get(s, s) def _derive_players(nz: Dict[str,Any]) -> Dict[str,Any]: roles=_as_list(_as_dict(nz.get("game")).get("roles")) order=_as_dict(nz.get("turns")).get("order") or [] ids = _uniquify([_scalar(x) for x in order]) if order else [] if not ids and roles: tmp=[] for r in roles: rd=_as_dict(r) name=_scalar(rd.get("name") or rd.get("role") or "Player") cnt=int(rd.get("count",1) or 1) tmp += [name]*max(1,cnt) ids=_uniquify(tmp) if not ids: ids=["P0","P1","P2"] # --- Modification: Create player instances based on roles and counts, and store in roles.players --- # Schema v0.95 expects count and roles list with name, count, and optionally players. out_roles = [] current_id_idx = 0 for r in roles: rd = _as_dict(r) role_name = _scalar(rd.get("name") or "Player") role_count = int(rd.get("count", 1) or 1) # Generate unique player IDs for this role role_players = [] for i in range(role_count): if current_id_idx < len(ids): player_id = ids[current_id_idx] role_players.append(player_id) current_id_idx += 1 else: # Fallback if ids list is shorter than expected player_id = f"{role_name}_{i}" if i > 0 else role_name role_players.append(player_id) # Add the role definition (name, count, players) to the output roles list out_roles.append({"name": role_name, "count": role_count, "players": role_players}) # The actual player IDs are used for turns.order and zone generation return {"count": len(ids), "roles": out_roles, "_player_ids": ids} # Removed _role_definitions as not needed for schema def _expand_zones(zs: List[Dict[str, Any]], pids: List[str]) -> List[str]: out=[] zone_types_map = { "hand": "hand:{pid}", "field": "field", "discard_pile": "discard_pile", "main_deck": "main_deck", "special_deck": "item_deck:{name}" # Map special_deck to item_deck: } for z_def in zs or []: z_type = z_def.get("type") if z_type in zone_types_map: template = zone_types_map[z_type] if z_type == "hand": # Expand hand for each player ID for pid in pids: out.append(template.format(pid=pid)) elif z_type == "special_deck": # Use the name from the definition name = z_def.get("name", "Unknown") out.append(template.format(name=name)) else: # Use the template directly for other types out.append(template) # Add common base zones if not already present base = ["field","discard_pile","main_deck"] for b in base: if b not in out: out.append(b) # Remove duplicates while preserving order seen=set(); uniq=[] for z in out: if z not in seen: seen.add(z); uniq.append(z) return uniq def _expand_vis_hand_star(vis: Dict[str,Any], pids: List[str]) -> Dict[str,Any]: vis=_as_dict(vis); vis.setdefault("defaults", {}); by=_as_dict(vis.get("by_zone")) if "hand:*" in by: cfg=by.pop("hand:*") for pid in pids: by[f"hand:{pid}"]=cfg vis["by_zone"]=by; return vis def _ensure_cards(nz: Dict[str,Any]) -> Dict[str,Any]: cards=_as_dict(nz.get("cards")) ranks=_as_list(cards.get("ranks")); suits=_as_list(cards.get("suits")) if not ranks or not suits: # --- Modification: Derive from card_relations if available --- card_rels = _as_dict(nz.get("card_relations")) if card_rels: # Derive from card_values card_vals = _as_list(card_rels.get("card_values")) # Map common symbols to integers symbol_map = { "J": 11, "Q": 12, "K": 13, "A": 14, "2": 15 } ranks = [] for val in card_vals: if isinstance(val, int): ranks.append(val) elif isinstance(val, str) and val in symbol_map: ranks.append(symbol_map[val]) else: ranks.append(_scalar(val)) # Fallback # Derive from suit_relations suit_rels = _as_dict(card_rels.get("suit_relations")) suit_order = _as_list(suit_rels.get("order")) suits = [s for s in suit_order if isinstance(s, str)] or ["Spade","Heart","Club","Diamond"] # Determine jokers and suitless_ranks jokers = {} suitless_ranks = [] # Standard54 implies 2 jokers if "2" in card_vals or 15 in ranks: # Assuming '2' is always high and joker-like # Standard54 has small joker 16, big joker 17 # In our mapping, if 2 is last, it might be 15. Let's check. # Actually, Standard54 usually has fixed values for jokers regardless of card_values order. # Let's assume standard mapping for Standard54 deck type. deck_info = _as_dict(_as_dict(nz.get("game")).get("deck")) if deck_info.get("type") == "Standard54": jokers = {"small": 16, "big": 17} suitless_ranks = [16, 17] # Add jokers to ranks if not already present if 16 not in ranks: ranks.append(16) if 17 not in ranks: ranks.append(17) # --- Modification: Ensure suitless_ranks, copies_per_deck, num_decks are included --- cards = { "ranks": sorted(ranks), "suits": suits, "jokers": jokers, "suitless_ranks": suitless_ranks, "copies_per_deck": 1, # Assuming 1 copy for Standard54 "num_decks": 1 # Assuming 1 deck } else: # Fallback to standard 54 ranks=list(range(3,18)) # 3..A(14), 小王16, 大王17 suits=["Spade","Heart","Club","Diamond"] cards={"ranks":ranks, "suits":suits, "jokers":{"small":16,"big":17}} else: # If ranks and suits were already provided, ensure the missing fields are added if possible # This path might be taken if the initial cards dict had some info but not all. # We can infer suitless_ranks from jokers, and assume defaults for copies/num_decks if not present. if "jokers" in cards and "suitless_ranks" not in cards: jokers = cards.get("jokers", {}) small_j = jokers.get("small") big_j = jokers.get("big") suitless_ranks = [r for r in [small_j, big_j] if r is not None] cards["suitless_ranks"] = suitless_ranks if "copies_per_deck" not in cards: cards["copies_per_deck"] = 1 if "num_decks" not in cards: cards["num_decks"] = 1 return cards def _ensure_combinations(nz: Dict[str,Any]) -> Dict[str,Any]: cmb=_as_dict(nz.get("combinations")) out={"single":{}, "pair":{}, "triple":{}, "straight":{}, "pairs_chain":{}, "airplane":{}, "triple_with_single":{}, "triple_with_pair":{}, "four_with_twoSingles":{}, "four_with_twoPairs":{}, "bomb":{}, "rocket":{}, "custom":[]} # --- Modification: Handle combinations from GDL --- # Get raw combinations from normalizer raw_combinations = _as_dict(nz.get("combinations", {})) # Handle straight and bomb with parameters if "straight" in nz.get("game", {}).get("combinations", {}): # Look for the argument, e.g., (straight 5) gdl_combinations = _as_dict(nz.get("game", {}).get("combinations", {})) if isinstance(gdl_combinations.get("straight"), (int, list)): val = gdl_combinations.get("straight") if isinstance(val, list) and val: val = val[0] # Get first argument if list out["straight"] = {"min_len": int(val)} else: out["straight"] = {} # Fallback if no arg found in GDL if "bomb" in nz.get("game", {}).get("combinations", {}): gdl_combinations = _as_dict(nz.get("game", {}).get("combinations", {})) if isinstance(gdl_combinations.get("bomb"), (int, list)): val = gdl_combinations.get("bomb") if isinstance(val, list) and val: val = val[0] out["bomb"] = {"len": int(val)} else: out["bomb"] = {} # Fallback if no arg found in GDL # Handle custom combinations from normalizer if "custom" in raw_combinations and isinstance(raw_combinations["custom"], list): out["custom"] = raw_combinations["custom"] else: # Fallback if normalizer didn't capture them correctly out["custom"] = [] # Merge with any explicitly set values in the input cmb dict (e.g., from extensions) for k in list(out.keys()): if k in cmb and isinstance(cmb[k], dict): out[k].update(cmb[k]) # Update with input values if present return out def _ensure_actions(nz: Dict[str,Any], pids: List[str] = None) -> Dict[str,Any]: actions=_as_dict(nz.get("actions")) pids = pids or [] # 解析 play 动作 play_actions = [] if "play" in actions: play_def = actions["play"] if isinstance(play_def, list): # 解析器把 play 分解成了多个字典,需要合并成一个完整的 play action play_action = _merge_play_actions(play_def, nz) if play_action: # 修复transfer_path中的zone ID if "transfer_path" in play_action: play_action["transfer_path"] = _fix_zone_ids_in_transfer_path( play_action["transfer_path"], pids) play_actions.append(play_action) elif isinstance(play_def, dict): # 单个 play 定义 play_action = _parse_play_action(play_def) if "transfer_path" in play_action: play_action["transfer_path"] = _fix_zone_ids_in_transfer_path( play_action["transfer_path"], pids) play_actions.append(play_action) # 解析 special 动作 special_actions = [] if "special" in actions: special_def = actions["special"] if isinstance(special_def, list): special_actions = _merge_special_actions(special_def) elif isinstance(special_def, dict): special_actions.append(_parse_special_action(special_def)) # 修复special动作中的transfer_path for action in special_actions: if "transfer_path" in action: action["transfer_path"] = _fix_zone_ids_in_transfer_path( action["transfer_path"], pids) # 构建输出 output = { "play": play_actions, "pass": {"transfer_path": {"from": "field", "to": "field"}}, "cleanup_trick": [{"from": "field", "to": "discard_pile", "count": "all"}], "other": {} } # 如果有 special 动作,添加到 other 中 if special_actions: # 将special动作转换为Transfer格式 special_transfers = [] for action in special_actions: if "transfer_path" in action: transfer = action["transfer_path"].copy() if "visibility_change" in action: transfer["visibility_change"] = action["visibility_change"] special_transfers.append(transfer) if special_transfers: output["other"]["special"] = special_transfers return output def _merge_play_actions(play_defs: List[Dict[str, Any]], gdl_actions: Dict[str, Any] = None) -> Dict[str, Any]: """合并被解析器分解的 play 动作定义""" merged = {} raw_def = {} # 首先尝试从play_defs中提取牌型列表 type_list = [] for play_def in play_defs: if isinstance(play_def, dict) and "type" in play_def: type_val = play_def["type"] if isinstance(type_val, dict): type_list = _extract_one_of_values(type_val) break if type_list: # 过滤掉"one_of"和其他无效值 valid_types = [t for t in type_list if t not in ["one_of", "int?", "sequence?"]] if valid_types: # Schema期望type是单个字符串,不是数组,所以取第一个有效类型 merged["type"] = valid_types[0] for play_def in play_defs: if isinstance(play_def, dict): # 检查是否是特定字段 if "type" in play_def and "type" not in merged: type_val = play_def["type"] if isinstance(type_val, dict): if "one_of" in type_val: # 处理 one_of 结构,提取所有牌型 type_list = _extract_one_of_values(type_val) merged["type"] = type_list else: merged["type"] = _scalar(type_val) else: merged["type"] = _scalar(type_val) elif "len" in play_def: # 处理 len 字段,提取简单值 len_val = _extract_simple_value(play_def["len"]) if len_val != "int?": merged["len"] = len_val elif "core" in play_def: # 处理 core 字段,提取简单值 core_val = _extract_simple_value(play_def["core"]) if core_val != "sequence?": merged["core"] = core_val elif "wings" in play_def: # 处理 wings 字段,提取简单值 wings_val = _extract_simple_value(play_def["wings"]) if wings_val != "int?": merged["wings"] = wings_val elif "transfer_path" in play_def: tp = play_def["transfer_path"] if isinstance(tp, dict): merged["transfer_path"] = tp elif isinstance(tp, list): merged["transfer_path"] = _parse_transfer_path_list(tp) elif "visibility_change" in play_def: vc = play_def["visibility_change"] merged["visibility_change"] = _parse_visibility_change(vc) else: # 其他字段保存到 raw_definition raw_def.update(play_def) # 如果没有提取到type,设置默认值 if "type" not in merged: merged["type"] = ["single", "pair", "triple", "straight", "bomb", "rocket"] # 确保visibility_change有state字段 if "visibility_change" in merged: vc = merged["visibility_change"] if "state" not in vc: vc["state"] = "visible" # 保存原始定义 if raw_def: merged["raw_definition"] = raw_def return merged def _extract_one_of_values(type_dict: Dict[str, Any]) -> List[str]: """从 one_of 结构中提取值列表""" values = [] def extract_recursive(obj): if isinstance(obj, dict): if "_" in obj: if isinstance(obj["_"], list): values.extend(obj["_"]) for k, v in obj.items(): if k != "_": # 如果key不是"_",那么key本身就是一个值 if isinstance(v, dict): if len(v) == 0: # 空字典,key就是值 values.append(k) elif "_" in v: # 有"_"字段,key是值,然后递归处理"_" values.append(k) extract_recursive(v) else: # 没有"_"字段,递归处理 extract_recursive(v) else: # v不是字典,k是值 values.append(k) elif isinstance(obj, list): for item in obj: extract_recursive(item) extract_recursive(type_dict) return values def _extract_play_types_from_gdl(gdl_actions: Dict[str, Any]) -> List[str]: """从GDL actions中提取play动作的牌型列表""" if "play" not in gdl_actions: return [] play_def = gdl_actions["play"] if isinstance(play_def, list): # 查找type字段 for item in play_def: if isinstance(item, dict) and "type" in item: type_val = item["type"] if isinstance(type_val, dict) and "one_of" in type_val: return _extract_one_of_values(type_val) elif isinstance(play_def, dict) and "type" in play_def: type_val = play_def["type"] if isinstance(type_val, dict) and "one_of" in type_val: return _extract_one_of_values(type_val) # 如果从actions中没找到,尝试从game.actions中查找 game_actions = _as_dict(_as_dict(gdl_actions.get("game", {})).get("actions", {})) if "play" in game_actions: play_def = game_actions["play"] if isinstance(play_def, list): for item in play_def: if isinstance(item, dict) and "type" in item: type_val = item["type"] if isinstance(type_val, dict) and "one_of" in type_val: return _extract_one_of_values(type_val) elif isinstance(play_def, dict) and "type" in play_def: type_val = play_def["type"] if isinstance(type_val, dict) and "one_of" in type_val: return _extract_one_of_values(type_val) return [] def _extract_simple_value(obj: Any) -> Any: """提取简单值,处理嵌套的 {"_": [value]} 结构""" if isinstance(obj, dict): if "_" in obj and isinstance(obj["_"], list) and len(obj["_"]) == 1: return obj["_"][0] elif len(obj) == 1: key, value = next(iter(obj.items())) if isinstance(value, dict) and len(value) == 0: return key return _extract_simple_value(value) return obj def _parse_visibility_change(vc: Dict[str, Any]) -> Dict[str, Any]: """解析visibility_change字段""" result = {} # 解析 to 字段 if "to" in vc: to_val = vc["to"] if isinstance(to_val, list): # 提取所有受众 audiences = [] for item in to_val: if isinstance(item, dict): # 处理 {"owner": {"_": ["teammates", "enemies"]}} 结构 for key, value in item.items(): if isinstance(value, dict) and "_" in value: audiences.extend(value["_"]) else: audiences.append(key) else: audiences.append(_scalar(item)) result["to"] = audiences else: result["to"] = [_scalar(to_val)] # 解析 state 字段 if "state" in vc: state_val = _extract_simple_value(vc["state"]) if state_val != "visible": result["state"] = state_val # 解析 on_target 字段 if "on_target" in vc: on_target_val = _extract_simple_value(vc["on_target"]) if on_target_val != "true": result["on_target"] = on_target_val return result def _parse_play_action(play_def: Dict[str, Any]) -> Dict[str, Any]: """解析单个 play 动作定义""" result = {} # 解析 type if "type" in play_def: type_val = play_def["type"] if isinstance(type_val, dict) and "one_of" in type_val: result["type"] = _as_list(type_val["one_of"]) else: result["type"] = _scalar(type_val) # 解析其他字段 for key in ["len", "core", "wings"]: if key in play_def: result[key] = play_def[key] # 解析 transfer_path if "transfer_path" in play_def: tp = play_def["transfer_path"] if isinstance(tp, dict): result["transfer_path"] = tp elif isinstance(tp, list): # 处理 from: hand to: field 格式 result["transfer_path"] = _parse_transfer_path_list(tp) # 解析 visibility_change if "visibility_change" in play_def: result["visibility_change"] = play_def["visibility_change"] # 保存原始定义用于复杂情况 result["raw_definition"] = {k: v for k, v in play_def.items() if k not in ["type", "len", "core", "wings", "transfer_path", "visibility_change"]} return result def _parse_special_action(special_def: Dict[str, Any]) -> Dict[str, Any]: """解析 special 动作定义""" result = {} # 解析 name if "name" in special_def: result["name"] = _scalar(special_def["name"]) # 解析 params if "params" in special_def: result["params"] = special_def["params"] # 解析 transfer_path if "transfer_path" in special_def: tp = special_def["transfer_path"] if isinstance(tp, dict): result["transfer_path"] = tp elif isinstance(tp, list): result["transfer_path"] = _parse_transfer_path_list(tp) # 解析 visibility_change if "visibility_change" in special_def: result["visibility_change"] = _parse_visibility_change(special_def["visibility_change"]) # 保存原始定义 result["raw_definition"] = {k: v for k, v in special_def.items() if k not in ["name", "params", "transfer_path", "visibility_change"]} return result def _merge_special_actions(special_defs: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """合并被解析器分解的 special 动作定义""" if not special_defs: return [] # 按name分组,合并相同name的动作 grouped = {} for special_def in special_defs: if isinstance(special_def, dict): name = _scalar(special_def.get("name", "")) if name: if name not in grouped: grouped[name] = {} # 合并字段 for key, value in special_def.items(): if key != "name": grouped[name][key] = value else: # 没有name的,可能是参数或其他字段,需要与前面的动作合并 # 这里简化处理,直接作为独立动作 pass # 转换为列表 result = [] for name, fields in grouped.items(): action = {"name": name} action.update(fields) result.append(_parse_special_action(action)) return result def _parse_transfer_path_list(tp_list: List[Any]) -> Dict[str, str]: """解析 from: hand to: field 格式的 transfer_path""" result = {} i = 0 while i < len(tp_list) - 1: if isinstance(tp_list[i], str) and tp_list[i].endswith(":"): key = tp_list[i][:-1] # 去掉冒号 result[key] = _scalar(tp_list[i + 1]) i += 2 else: i += 1 return result def _fix_zone_ids_in_transfer_path(tp: Dict[str, str], pids: List[str]) -> Dict[str, str]: """修复transfer_path中的zone ID,将hand转换为具体的hand:PlayerID""" if not isinstance(tp, dict): return tp result = tp.copy() # 如果from是"hand",需要转换为具体的hand:PlayerID if result.get("from") == "hand" and pids: # 使用第一个玩家ID作为默认值 result["from"] = f"hand:{pids[0]}" # 如果to是"hand",也需要转换 if result.get("to") == "hand" and pids: result["to"] = f"hand:{pids[0]}" return result def _sanitize_mechanics(mechs: List[Dict[str,Any]]): for m in mechs or []: if not isinstance(m,dict): continue # phase/timing/trigger_condition 缺省 m["phase"]=_phase_to_schema(_scalar(m.get("phase") or "playing")) m.setdefault("timing", "during_action") m.setdefault("enabled", True) if str(m.get("timing") or "").strip() not in ("pre_action","post_action","during_action"): m["timing"] = "during_action" m.setdefault("trigger_condition", "always") raw_def = m.get("raw_definition", {}).copy() # Work on a copy to avoid modifying original input # --- Modification: Handle transfer_path for instantiation and Schema compliance --- original_tp = raw_def.get("transfer_path") if original_tp and isinstance(original_tp, dict): from_val = original_tp.get("from") to_val = original_tp.get("to") # Check if 'from' or 'to' are templates (e.g., 'hand', 'hand:*') or invalid zone names # If they are, keep the original transfer_path in raw_definition and do NOT create a top-level transfer_path def _is_template(z): return isinstance(z, str) and z in ("hand", "hand:*") def _is_valid_zone(z): return isinstance(z, str) and bool(ZONE_RE.match(z)) # If from or to is a template, or if from/to are not valid zones according to the schema's ZoneID pattern, # then the transfer_path cannot be placed at the top level. if _is_template(from_val) or _is_template(to_val) or not _is_valid_zone(from_val) or not _is_valid_zone(to_val): # The raw_def already contains the original transfer_path, which is correct. # Do not create a top-level transfer_path. # Ensure raw_definition exists and contains the original # --- Modification: Add note for instantiation --- m["raw_definition"] = {"transfer_path_note": {"zone_style": "needs_instantiation"}} # Remove transfer_path from top level if it was accidentally added (shouldn't happen here) m.pop("transfer_path", None) else: # If 'from' and 'to' are concrete and valid zones, move transfer_path to top level m["transfer_path"] = {"from": from_val, "to": to_val} # Remove transfer_path from raw_definition if it's valid and concrete raw_def.pop("transfer_path", None) if raw_def: # Only keep raw_definition if it has other keys m["raw_definition"] = raw_def else: m["raw_definition"] = {} else: # If no original transfer_path, or it's not a dict, just keep raw_def as is m["raw_definition"] = raw_def # 清掉 None 值,避免 schema 报错 for k in ("min_players","max_players","usage_limit","visibility_change"): if m.get(k, "__absent__") is None: m.pop(k, None) def _ensure_scoring(ir: Dict[str,Any]) -> Dict[str,Any]: sc=_as_dict(ir.get("scoring")) base=sc.get("base") if not isinstance(base,int) or base<0: sc["base"]=1 return sc def map_to_v095(nz: Dict[str,Any], gdl_text: str = None) -> Dict[str,Any]: # --- Modification: Dynamically set description based on game name --- game_name = _scalar(_as_dict(nz.get("game")).get("name") or "UnnamedGame") if "UNO" in game_name: description = "由标准54张牌叠加UNO特殊牌机制的变体。" elif "奇袭" in game_name: description = "在传统斗地主基础上加入了【援手】和【换位】两个新机制,增加了农民间的协作和角色变换的可能性" else: description = f"由标准54张牌叠加特殊牌机制的变体。" # Generic fallback # meta meta={"name": game_name, "version":"v0.95", "origin": "Variant", "seeded": True, "description": description} # players players=_derive_players(nz); pids=players.pop("_player_ids", []) # Removed role_defs as not needed # cards cards=_ensure_cards(nz) # zones - Modified to handle zone definitions from setup zone_defs = _as_list(nz.get("zones", [])) zones=_expand_zones(zone_defs, pids) # visibility visibility=_expand_vis_hand_star(_as_dict(nz.get("visibility")), pids) # combinations combinations=_ensure_combinations(nz) # comparison comparison=_as_dict(nz.get("comparison")) or {"same_type": True, "same_len": True, "bomb_beats_all": True, "rocket_top": True, "tiebreaker": "none"} # actions actions=_ensure_actions(nz, pids) # phases - Modified to handle GDL phase names and map to schema ph=_as_list(nz.get("phases")) allowed={"setup","deal","bid","double","initiative","play","settle","grouping"} # Apply mapping logic using _phase_to_schema cleaned=[] for x in ph: s=str(x).strip() mapped_s = _phase_to_schema(s) if mapped_s in allowed and mapped_s not in cleaned: cleaned.append(mapped_s) # Ensure 'setup' and 'settle' are present, with 'setup' first and 'settle' last if "setup" not in cleaned: cleaned=["setup"]+[t for t in cleaned if t!="setup"] if "settle" not in cleaned: cleaned=[t for t in cleaned if t!="settle"]+["settle"] # Add 'deal' phase if not present, typically after 'setup' if "deal" not in cleaned: setup_idx = -1 for i, p in enumerate(cleaned): if p == "setup": setup_idx = i break if setup_idx != -1: cleaned.insert(setup_idx + 1, "deal") # Insert 'deal' after 'setup' else: cleaned = ["deal"] + cleaned # If 'setup' is missing, add 'deal' at the beginning # Add 'play' phase if not present, typically after 'deal' if "play" not in cleaned: deal_idx = -1 for i, p in enumerate(cleaned): if p == "deal": deal_idx = i break if deal_idx != -1: cleaned.insert(deal_idx + 1, "play") # Insert 'play' after 'deal' else: # If 'deal' is also missing, try 'setup' setup_idx = -1 for i, p in enumerate(cleaned): if p == "setup": setup_idx = i break if setup_idx != -1: cleaned.insert(setup_idx + 1, "play") # Insert 'play' after 'setup' else: cleaned = ["play"] + cleaned # If neither 'setup' nor 'deal' is present, add 'play' at the beginning # turns turns=_as_dict(nz.get("turns")) or {} # Use the uniquely generated player IDs from _derive_players for order # If turns.order is empty or not provided, use pids order=turns.get("order") or pids # If turns.order was provided but is the original ["Landlord", "Peasant", "Peasant", "Peasant"], # it should be replaced by the unique pids generated in _derive_players. # The pids are ["Landlord", "Peasant", "Peasant_1", "Peasant_2"] which is the desired outcome. # So, we prioritize the pids (which are unique) over the potentially non-unique turns.order. # However, if turns.order is explicitly set *after* uniquification by normalizer, we should respect it. # The safest way is to always use pids if they are available and represent the actual unique player turn order. # Let's assume pids represents the final desired turn order. # Use the unique player IDs generated by _derive_players turns["order"]=pids if not turns.get("leader") and pids: turns["leader"]=pids[0] # Use first unique ID as leader turns.setdefault("trick_end_on", "two_pass") # next_leader ext=_as_dict(nz.get("extensions")); te=turns.get("trick_end_on","two_pass") if te=="custom": t_ext=_as_dict(ext.get("trick_end_on")) next_leader = "last_non_pass" if t_ext.get("mode")=="consecutive_passes" else "fixed" else: next_leader = "last_non_pass" turns.setdefault("next_leader", next_leader) # ending & scoring ending=_as_dict(nz.get("ending")) or {"when":"any_hand_empty"} scoring=_ensure_scoring(nz) # mechanics mechs=_as_list(nz.get("special_mechanics")); _sanitize_mechanics(mechs) # extensions - Add card mappings extensions=_as_dict(nz.get("extensions")) extensions["cards_mapping"] = { "rank_name_to_int": { "J": 11, "Q": 12, "K": 13, "A": 14, "2": 15 }, "suit_name_to_carrier": { "Spade": "Spade", "Heart": "Heart", "Club": "Club", "Diamond": "Diamond" } } ir = { "meta": meta, "players": players, "cards": cards, "zones": zones, "visibility": visibility, "combinations": combinations, "comparison": comparison, "actions": actions, "phases": cleaned, "turns": turns, "ending": ending, "scoring": scoring, "special_mechanics": mechs, "extensions": extensions } missing=[k for k in REQUIRED if k not in ir] if missing: raise ValueError(f"Missing required keys after mapping: {missing}") return ir