| |
| """ |
| mapper_v2.py — maps normalizer_v2 output to the v0.95 IR (schema-friendly; compatible with the UNO and 七夕/Qixi variants) |
| """ |
| from __future__ import annotations |
| from typing import Any, Dict, List |
| from copy import deepcopy |
| import re |
|
|
# Top-level keys that map_to_v095 verifies are present in the IR it returns.
REQUIRED = [
    "meta",
    "players",
    "cards",
    "zones",
    "visibility",
    "combinations",
    "comparison",
    "actions",
    "phases",
    "turns",
    "ending",
    "scoring",
]

# Fully instantiated zone identifiers; used by _sanitize_mechanics to decide
# whether a mechanic's transfer_path endpoints can be kept as-is.
ZONE_RE = re.compile(
    r"^(hand:[A-Za-z0-9_:-]+|field|main_deck|discard_pile|reserve_zone|public_pool|item_deck:[A-Za-z0-9_:-]+|history_deck)$"
)
|
|
| def _as_dict(x): return x if isinstance(x,dict) else {} |
| def _as_list(x): return x if isinstance(x,list) else ([] if x is None else [x]) |
|
|
| def _scalar(x): |
| if isinstance(x,str): return x |
| if isinstance(x,(int,float)): return str(x) |
| if isinstance(x,dict): |
| if "_" in x and isinstance(x["_"], list) and x["_"]: return _scalar(x["_"][0]) |
| if "name" in x and isinstance(x["name"], str): return x["name"] |
| if len(x)==1: |
| k,v = next(iter(x.items())) |
| if v in (None,{}) and isinstance(k,str): return k |
| if isinstance(x,list) and x: return _scalar(x[0]) |
| return "" |
|
|
| def _uniquify(seq: List[str]) -> List[str]: |
| seen={}; out=[] |
| for s in map(str, seq): |
| if s not in seen: |
| seen[s]=0; out.append(s) |
| else: |
| seen[s]+=1; out.append(f"{s}_{seen[s]}") |
| return out |
|
|
| def _phase_to_schema(s: str) -> str: |
| |
| mp={ |
| "deal":"setup", |
| "setup":"setup", |
| "bid":"bidding", |
| "bidding":"bidding", |
| "double":"doubling", |
| "doubling":"doubling", |
| "initiative":"setup", |
| "grouping":"grouping", |
| "play":"playing", |
| "playing":"playing", |
| "playing_phase":"play", |
| "settle":"settlement", |
| "settlement":"settlement", |
| "settlement_phase":"settlement" |
| } |
| return mp.get(s, s) |
|
|
def _derive_players(nz: Dict[str,Any]) -> Dict[str,Any]:
    """Build the ``players`` section and the list of concrete player ids.

    Player ids come from ``turns.order`` when present, otherwise from the
    role definitions under ``game.roles`` (one id per role seat), otherwise
    the default ``["P0", "P1", "P2"]``. Ids are then handed out to roles in
    declaration order.

    Returns a dict with ``count``, ``roles`` (each with ``name``, ``count``
    and ``players``) and the helper key ``_player_ids``, which
    ``map_to_v095`` pops before emitting the IR.
    """
    def _role_spec(role):
        # One place for the name/count fallbacks so the id derivation and
        # the emitted role entries can never disagree.
        rd = _as_dict(role)
        name = _scalar(rd.get("name") or rd.get("role") or "Player")
        count = max(1, int(rd.get("count", 1) or 1))
        return name, count

    roles = [_role_spec(r) for r in _as_list(_as_dict(nz.get("game")).get("roles"))]

    order = _as_dict(nz.get("turns")).get("order") or []
    ids = _uniquify([_scalar(x) for x in order]) if order else []
    if not ids and roles:
        ids = _uniquify([name for name, count in roles for _ in range(count)])
    if not ids:
        ids = ["P0", "P1", "P2"]

    out_roles = []
    next_id = 0
    for role_name, role_count in roles:
        role_players = []
        for seat in range(role_count):
            if next_id < len(ids):
                role_players.append(ids[next_id])
                next_id += 1
            else:
                # More seats than known ids: synthesize a name from the role.
                role_players.append(f"{role_name}_{seat}" if seat > 0 else role_name)
        out_roles.append({"name": role_name, "count": role_count, "players": role_players})

    return {"count": len(ids), "roles": out_roles, "_player_ids": ids}
|
|
| def _expand_zones(zs: List[Dict[str, Any]], pids: List[str]) -> List[str]: |
| out=[] |
| zone_types_map = { |
| "hand": "hand:{pid}", |
| "field": "field", |
| "discard_pile": "discard_pile", |
| "main_deck": "main_deck", |
| "special_deck": "item_deck:{name}" |
| } |
| for z_def in zs or []: |
| z_type = z_def.get("type") |
| if z_type in zone_types_map: |
| template = zone_types_map[z_type] |
| if z_type == "hand": |
| |
| for pid in pids: |
| out.append(template.format(pid=pid)) |
| elif z_type == "special_deck": |
| |
| name = z_def.get("name", "Unknown") |
| out.append(template.format(name=name)) |
| else: |
| |
| out.append(template) |
| |
| base = ["field","discard_pile","main_deck"] |
| for b in base: |
| if b not in out: out.append(b) |
| |
| seen=set(); uniq=[] |
| for z in out: |
| if z not in seen: |
| seen.add(z); uniq.append(z) |
| return uniq |
|
|
|
|
def _expand_vis_hand_star(vis: Dict[str,Any], pids: List[str]) -> Dict[str,Any]:
    """Replace a ``"hand:*"`` entry in ``by_zone`` with one entry per player.

    Returns a new visibility dict that always has ``defaults`` and
    ``by_zone`` keys. The input dict is not modified, and every player gets
    an independent copy of the wildcard configuration so a later edit to one
    hand cannot leak into the others.
    """
    result = dict(_as_dict(vis))
    result.setdefault("defaults", {})
    by_zone = dict(_as_dict(result.get("by_zone")))
    if "hand:*" in by_zone:
        wildcard_cfg = by_zone.pop("hand:*")
        for pid in pids:
            by_zone[f"hand:{pid}"] = deepcopy(wildcard_cfg)
    result["by_zone"] = by_zone
    return result
|
|
def _ensure_cards(nz: Dict[str,Any]) -> Dict[str,Any]:
    """Return a complete ``cards`` section for the IR.

    * When ``nz["cards"]`` already has ranks and suits, a copy is returned
      with ``suitless_ranks`` (derived from ``jokers``), ``copies_per_deck``
      and ``num_decks`` filled in where missing.
    * Otherwise the section is derived from ``nz["card_relations"]``:
      ``card_values`` become ranks (J/Q/K/A/2 map to 11..15, numeric strings
      become ints) and ``suit_relations.order`` becomes the suits. Jokers
      (16/17) are added when a "2" is present and ``game.deck.type`` is
      ``"Standard54"``.
    * With neither available, a default deck with ranks 3..17 and both
      jokers is returned.

    The input mapping is not modified.
    """
    default_suits = ["Spade", "Heart", "Club", "Diamond"]

    cards = dict(_as_dict(nz.get("cards")))  # copy: never edit the caller's dict
    ranks = _as_list(cards.get("ranks"))
    suits = _as_list(cards.get("suits"))

    if ranks and suits:
        if "jokers" in cards and "suitless_ranks" not in cards:
            jokers = _as_dict(cards.get("jokers"))
            cards["suitless_ranks"] = [
                r for r in (jokers.get("small"), jokers.get("big")) if r is not None
            ]
        cards.setdefault("copies_per_deck", 1)
        cards.setdefault("num_decks", 1)
        return cards

    card_rels = _as_dict(nz.get("card_relations"))
    if not card_rels:
        return {
            "ranks": list(range(3, 18)),
            "suits": default_suits,
            "jokers": {"small": 16, "big": 17},
            "suitless_ranks": [16, 17],
            "copies_per_deck": 1,
            "num_decks": 1,
        }

    symbol_map = {"J": 11, "Q": 12, "K": 13, "A": 14, "2": 15}
    card_vals = _as_list(card_rels.get("card_values"))
    ranks = []
    for val in card_vals:
        if isinstance(val, int):
            ranks.append(val)
            continue
        token = _scalar(val)
        if token in symbol_map:
            ranks.append(symbol_map[token])
            continue
        try:
            # "3".."10" arrive as strings; keep ranks numeric so they sort.
            ranks.append(int(token))
        except ValueError:
            ranks.append(token)

    suit_order = _as_list(_as_dict(card_rels.get("suit_relations")).get("order"))
    suits = [s for s in suit_order if isinstance(s, str)] or default_suits

    jokers = {}
    suitless_ranks = []
    if "2" in card_vals or 15 in ranks:
        deck_info = _as_dict(_as_dict(nz.get("game")).get("deck"))
        if deck_info.get("type") == "Standard54":
            jokers = {"small": 16, "big": 17}
            suitless_ranks = [16, 17]
            for joker_rank in suitless_ranks:
                if joker_rank not in ranks:
                    ranks.append(joker_rank)

    return {
        # Any non-numeric leftovers sort after the ints instead of raising.
        "ranks": sorted(ranks, key=lambda r: (isinstance(r, str), r)),
        "suits": suits,
        "jokers": jokers,
        "suitless_ranks": suitless_ranks,
        "copies_per_deck": 1,
        "num_decks": 1,
    }
|
|
def _ensure_combinations(nz: Dict[str,Any]) -> Dict[str,Any]:
    """Return the ``combinations`` section with every known combination key.

    Each known combination starts as an empty dict and ``custom`` as an
    empty list. ``game.combinations.straight`` / ``.bomb`` given as an int
    (or a list whose first element is one) set ``min_len`` / ``len``.
    Dict-valued entries of ``nz["combinations"]`` are merged on top, and a
    list-valued ``custom`` entry is carried over.
    """
    cmb = _as_dict(nz.get("combinations"))
    game_cmb = _as_dict(_as_dict(nz.get("game")).get("combinations"))

    out = {
        "single": {}, "pair": {}, "triple": {}, "straight": {},
        "pairs_chain": {}, "airplane": {}, "triple_with_single": {},
        "triple_with_pair": {}, "four_with_twoSingles": {},
        "four_with_twoPairs": {}, "bomb": {}, "rocket": {}, "custom": [],
    }

    for name, length_key in (("straight", "min_len"), ("bomb", "len")):
        if name not in game_cmb:
            continue
        val = game_cmb[name]
        if isinstance(val, list):
            val = val[0] if val else None
        elif not isinstance(val, int):
            val = None
        try:
            out[name] = {length_key: int(val)}
        except (TypeError, ValueError):
            # Empty list or a non-numeric value: leave the combination unconstrained.
            out[name] = {}

    custom = cmb.get("custom")
    out["custom"] = list(custom) if isinstance(custom, list) else []

    for key, target in out.items():
        # "custom" is a list, so only dict-valued slots can be merged into.
        if isinstance(target, dict) and isinstance(cmb.get(key), dict):
            target.update(cmb[key])

    return out
|
|
def _ensure_actions(nz: Dict[str,Any], pids: List[str] = None) -> Dict[str,Any]:
    """Build the ``actions`` section of the IR.

    ``actions.play`` may be a list of fragments (merged into one action) or
    a single dict; ``actions.special`` likewise. Bare ``hand`` endpoints in
    transfer paths are rewritten via ``_fix_zone_ids_in_transfer_path``.
    The result always contains ``play``, ``pass``, ``cleanup_trick`` and
    ``other``; transfer paths of special actions are collected under
    ``other["special"]`` when there are any.
    """
    pids = pids or []
    actions = _as_dict(nz.get("actions"))

    def _with_fixed_path(action):
        # Rewrites the action's transfer_path in place when it has one.
        if "transfer_path" in action:
            action["transfer_path"] = _fix_zone_ids_in_transfer_path(
                action["transfer_path"], pids)
        return action

    play_actions = []
    play_def = actions.get("play")
    if isinstance(play_def, list):
        merged_play = _merge_play_actions(play_def, nz)
        if merged_play:
            play_actions.append(_with_fixed_path(merged_play))
    elif isinstance(play_def, dict):
        play_actions.append(_with_fixed_path(_parse_play_action(play_def)))

    special_def = actions.get("special")
    if isinstance(special_def, list):
        special_actions = _merge_special_actions(special_def)
    elif isinstance(special_def, dict):
        special_actions = [_parse_special_action(special_def)]
    else:
        special_actions = []
    for special in special_actions:
        _with_fixed_path(special)

    special_transfers = []
    for special in special_actions:
        if "transfer_path" not in special:
            continue
        transfer = special["transfer_path"].copy()
        if "visibility_change" in special:
            transfer["visibility_change"] = special["visibility_change"]
        special_transfers.append(transfer)

    output = {
        "play": play_actions,
        "pass": {"transfer_path": {"from": "field", "to": "field"}},
        "cleanup_trick": [{"from": "field", "to": "discard_pile", "count": "all"}],
        "other": {},
    }
    if special_transfers:
        output["other"]["special"] = special_transfers
    return output
|
|
def _merge_play_actions(play_defs: List[Dict[str, Any]], gdl_actions: Dict[str, Any] = None) -> Dict[str, Any]:
    """Merge the ``play`` action fragments emitted by the parser into one dict.

    ``play_defs`` is a list of fragments, each carrying one of ``type``,
    ``len``, ``core``, ``wings``, ``transfer_path`` or
    ``visibility_change``. Fragments matching none of these are collected
    under ``raw_definition``. ``gdl_actions`` is accepted but not used here.
    """
    # Placeholder values that mean "unspecified" and must not be copied over.
    placeholder_for = {"len": "int?", "core": "sequence?", "wings": "int?"}
    merged = {}
    leftovers = {}

    # Pass 1: take the first dict-valued "type" and use its first real entry.
    candidates = []
    for fragment in play_defs:
        if isinstance(fragment, dict) and "type" in fragment:
            declared = fragment["type"]
            if isinstance(declared, dict):
                candidates = _extract_one_of_values(declared)
                break
    usable = [t for t in candidates if t not in ("one_of", "int?", "sequence?")]
    if usable:
        merged["type"] = usable[0]

    # Pass 2: fold every fragment into the merged action.
    for fragment in play_defs:
        if not isinstance(fragment, dict):
            continue

        if "type" in fragment and "type" not in merged:
            declared = fragment["type"]
            if isinstance(declared, dict) and "one_of" in declared:
                merged["type"] = _extract_one_of_values(declared)
            else:
                merged["type"] = _scalar(declared)
            continue

        simple_field = next((f for f in ("len", "core", "wings") if f in fragment), None)
        if simple_field is not None:
            value = _extract_simple_value(fragment[simple_field])
            if value != placeholder_for[simple_field]:
                merged[simple_field] = value
            continue

        if "transfer_path" in fragment:
            path = fragment["transfer_path"]
            if isinstance(path, dict):
                merged["transfer_path"] = path
            elif isinstance(path, list):
                merged["transfer_path"] = _parse_transfer_path_list(path)
            continue

        if "visibility_change" in fragment:
            merged["visibility_change"] = _parse_visibility_change(fragment["visibility_change"])
            continue

        # Unrecognised fragment (or a "type" fragment seen after the type
        # was already fixed): keep it for reference.
        leftovers.update(fragment)

    if "type" not in merged:
        merged["type"] = ["single", "pair", "triple", "straight", "bomb", "rocket"]

    if "visibility_change" in merged:
        visibility = merged["visibility_change"]
        if "state" not in visibility:
            visibility["state"] = "visible"

    if leftovers:
        merged["raw_definition"] = leftovers

    return merged
|
|
| def _extract_one_of_values(type_dict: Dict[str, Any]) -> List[str]: |
| """从 one_of 结构中提取值列表""" |
| values = [] |
| |
| def extract_recursive(obj): |
| if isinstance(obj, dict): |
| if "_" in obj: |
| if isinstance(obj["_"], list): |
| values.extend(obj["_"]) |
| for k, v in obj.items(): |
| if k != "_": |
| |
| if isinstance(v, dict): |
| if len(v) == 0: |
| |
| values.append(k) |
| elif "_" in v: |
| |
| values.append(k) |
| extract_recursive(v) |
| else: |
| |
| extract_recursive(v) |
| else: |
| |
| values.append(k) |
| elif isinstance(obj, list): |
| for item in obj: |
| extract_recursive(item) |
| |
| extract_recursive(type_dict) |
| return values |
|
|
def _extract_play_types_from_gdl(gdl_actions: Dict[str, Any]) -> List[str]:
    """Return the values of the ``play`` action's ``one_of`` type list.

    The top-level ``play`` entry is inspected first. If it is missing or has
    no ``one_of`` type, ``game.actions.play`` is tried as a fallback. An
    empty list is returned when neither yields a ``one_of`` type.
    """
    def _one_of_types(play_def):
        # A play definition is either a list of fragments or a single dict.
        fragments = play_def if isinstance(play_def, list) else [play_def]
        for fragment in fragments:
            if isinstance(fragment, dict) and "type" in fragment:
                type_val = fragment["type"]
                if isinstance(type_val, dict) and "one_of" in type_val:
                    return _extract_one_of_values(type_val)
        return None

    top_level = _as_dict(gdl_actions)
    nested = _as_dict(_as_dict(top_level.get("game")).get("actions"))
    for source in (top_level, nested):
        if "play" not in source:
            continue
        found = _one_of_types(source["play"])
        if found is not None:
            return found
    return []
|
|
| def _extract_simple_value(obj: Any) -> Any: |
| """提取简单值,处理嵌套的 {"_": [value]} 结构""" |
| if isinstance(obj, dict): |
| if "_" in obj and isinstance(obj["_"], list) and len(obj["_"]) == 1: |
| return obj["_"][0] |
| elif len(obj) == 1: |
| key, value = next(iter(obj.items())) |
| if isinstance(value, dict) and len(value) == 0: |
| return key |
| return _extract_simple_value(value) |
| return obj |
|
|
def _parse_visibility_change(vc: Dict[str, Any]) -> Dict[str, Any]:
    """Normalise a ``visibility_change`` definition.

    ``to`` is always returned as a list of audiences. ``state`` is kept only
    when it differs from ``"visible"`` and ``on_target`` only when it
    differs from ``"true"``. A missing or non-dict definition yields ``{}``.
    """
    vc = _as_dict(vc)  # tolerate None / scalars instead of raising
    result = {}

    if "to" in vc:
        targets = vc["to"]
        if isinstance(targets, list):
            audiences = []
            for item in targets:
                if not isinstance(item, dict):
                    audiences.append(_scalar(item))
                    continue
                for key, value in item.items():
                    if isinstance(value, dict) and "_" in value:
                        # _as_list keeps a scalar payload as one audience
                        # rather than spreading a string into characters.
                        audiences.extend(_as_list(value["_"]))
                    else:
                        audiences.append(key)
            result["to"] = audiences
        else:
            result["to"] = [_scalar(targets)]

    if "state" in vc:
        state_val = _extract_simple_value(vc["state"])
        if state_val != "visible":
            result["state"] = state_val

    if "on_target" in vc:
        on_target_val = _extract_simple_value(vc["on_target"])
        if on_target_val != "true":
            result["on_target"] = on_target_val

    return result
|
|
def _parse_play_action(play_def: Dict[str, Any]) -> Dict[str, Any]:
    """Convert a single dict-form ``play`` definition into an action dict.

    ``type`` becomes a list when given as ``{"one_of": ...}`` and a scalar
    string otherwise. ``len``, ``core``, ``wings`` and ``visibility_change``
    are copied unchanged, and a list-form ``transfer_path`` is parsed into a
    dict. All remaining keys are kept under ``raw_definition``.
    """
    handled_keys = ["type", "len", "core", "wings", "transfer_path", "visibility_change"]
    result = {}

    if "type" in play_def:
        declared = play_def["type"]
        is_one_of = isinstance(declared, dict) and "one_of" in declared
        result["type"] = _as_list(declared["one_of"]) if is_one_of else _scalar(declared)

    result.update({k: play_def[k] for k in ("len", "core", "wings") if k in play_def})

    if "transfer_path" in play_def:
        path = play_def["transfer_path"]
        if isinstance(path, dict):
            result["transfer_path"] = path
        elif isinstance(path, list):
            result["transfer_path"] = _parse_transfer_path_list(path)

    if "visibility_change" in play_def:
        result["visibility_change"] = play_def["visibility_change"]

    result["raw_definition"] = {
        key: value for key, value in play_def.items() if key not in handled_keys
    }
    return result
|
|
def _parse_special_action(special_def: Dict[str, Any]) -> Dict[str, Any]:
    """Convert one ``special`` action definition into an action dict.

    ``name`` is reduced to a string, ``params`` is copied unchanged, a
    list-form ``transfer_path`` is parsed into a dict and
    ``visibility_change`` is normalised. All remaining keys are kept under
    ``raw_definition``.
    """
    handled_keys = ["name", "params", "transfer_path", "visibility_change"]
    result = {}

    if "name" in special_def:
        result["name"] = _scalar(special_def["name"])

    if "params" in special_def:
        result["params"] = special_def["params"]

    if "transfer_path" in special_def:
        path = special_def["transfer_path"]
        if isinstance(path, dict):
            result["transfer_path"] = path
        elif isinstance(path, list):
            result["transfer_path"] = _parse_transfer_path_list(path)

    if "visibility_change" in special_def:
        result["visibility_change"] = _parse_visibility_change(special_def["visibility_change"])

    result["raw_definition"] = {
        key: value for key, value in special_def.items() if key not in handled_keys
    }
    return result
|
|
def _merge_special_actions(special_defs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Merge ``special`` action fragments that share a name, then parse each.

    Fragments are grouped by their ``name``; within a group later fragments
    overwrite earlier ones key by key. Non-dict entries and fragments
    without a usable name are ignored.

    NOTE(review): name-less fragments are dropped rather than attached to a
    preceding named fragment -- confirm that is the intended behaviour.
    """
    if not special_defs:
        return []

    fields_by_name = {}
    for fragment in special_defs:
        if not isinstance(fragment, dict):
            continue
        action_name = _scalar(fragment.get("name", ""))
        if not action_name:
            continue
        bucket = fields_by_name.setdefault(action_name, {})
        bucket.update((key, value) for key, value in fragment.items() if key != "name")

    return [
        _parse_special_action({"name": action_name, **fields})
        for action_name, fields in fields_by_name.items()
    ]
|
|
def _parse_transfer_path_list(tp_list: List[Any]) -> Dict[str, str]:
    """Parse a flat ``["from:", "hand", "to:", "field"]`` list into a dict.

    Every string token ending in ``":"`` is a key (colon stripped) and the
    token after it is its value, reduced with ``_scalar``. Tokens that are
    not labels are skipped.
    """
    parsed = {}
    last_label_index = len(tp_list) - 2  # a label needs a value after it
    position = 0
    while position <= last_label_index:
        token = tp_list[position]
        if not (isinstance(token, str) and token.endswith(":")):
            position += 1
            continue
        parsed[token[:-1]] = _scalar(tp_list[position + 1])
        position += 2
    return parsed
|
|
| def _fix_zone_ids_in_transfer_path(tp: Dict[str, str], pids: List[str]) -> Dict[str, str]: |
| """修复transfer_path中的zone ID,将hand转换为具体的hand:PlayerID""" |
| if not isinstance(tp, dict): |
| return tp |
| |
| result = tp.copy() |
| |
| |
| if result.get("from") == "hand" and pids: |
| |
| result["from"] = f"hand:{pids[0]}" |
| |
| |
| if result.get("to") == "hand" and pids: |
| result["to"] = f"hand:{pids[0]}" |
| |
| return result |
|
|
def _sanitize_mechanics(mechs: List[Dict[str,Any]]):
    """Normalise special-mechanic entries in place.

    For every dict entry: ``phase`` is mapped through ``_phase_to_schema``
    (default ``"playing"``); ``timing``, ``enabled`` and
    ``trigger_condition`` get defaults, and an unrecognised ``timing`` is
    reset to ``"during_action"``. A dict ``transfer_path`` found in
    ``raw_definition`` is promoted to the mechanic when both endpoints are
    concrete zone ids; otherwise ``raw_definition`` is replaced by a
    ``needs_instantiation`` note and any ``transfer_path`` on the mechanic
    is removed. Optional keys holding ``None`` are dropped. Non-dict entries
    are skipped. Returns ``None``.
    """
    valid_timings = ("pre_action", "post_action", "during_action")
    nullable_keys = ("min_players", "max_players", "usage_limit", "visibility_change")

    def _is_template(zone):
        # Zone placeholders that still need a concrete player id.
        return isinstance(zone, str) and zone in ("hand", "hand:*")

    def _is_valid_zone(zone):
        return isinstance(zone, str) and bool(ZONE_RE.match(zone))

    for m in mechs or []:
        if not isinstance(m, dict):
            continue

        m["phase"] = _phase_to_schema(_scalar(m.get("phase") or "playing"))
        m.setdefault("timing", "during_action")
        m.setdefault("enabled", True)
        if str(m.get("timing") or "").strip() not in valid_timings:
            m["timing"] = "during_action"
        m.setdefault("trigger_condition", "always")

        # Copy so the original raw definition object is never edited, and
        # tolerate a raw_definition that is None or not a dict.
        raw_def = dict(_as_dict(m.get("raw_definition")))

        original_tp = raw_def.get("transfer_path")
        if original_tp and isinstance(original_tp, dict):
            from_val = original_tp.get("from")
            to_val = original_tp.get("to")
            needs_instantiation = (
                _is_template(from_val)
                or _is_template(to_val)
                or not _is_valid_zone(from_val)
                or not _is_valid_zone(to_val)
            )
            if needs_instantiation:
                m["raw_definition"] = {"transfer_path_note": {"zone_style": "needs_instantiation"}}
                m.pop("transfer_path", None)
            else:
                m["transfer_path"] = {"from": from_val, "to": to_val}
                raw_def.pop("transfer_path", None)
                m["raw_definition"] = raw_def
        else:
            m["raw_definition"] = raw_def

        for key in nullable_keys:
            if key in m and m[key] is None:
                del m[key]
|
|
def _ensure_scoring(ir: Dict[str,Any]) -> Dict[str,Any]:
    """Return the ``scoring`` section with a valid ``base`` value.

    ``base`` is replaced by ``1`` unless it is a non-negative integer
    (booleans do not count as integers here). The caller's scoring dict is
    copied rather than edited in place.
    """
    sc = dict(_as_dict(ir.get("scoring")))
    base = sc.get("base")
    if isinstance(base, bool) or not isinstance(base, int) or base < 0:
        sc["base"] = 1
    return sc
|
|
def map_to_v095(nz: Dict[str,Any], gdl_text: str = None) -> Dict[str,Any]:
    """Map normalizer output ``nz`` to a v0.95 IR dict.

    Args:
        nz: The normalizer output. It is deep-copied first, so the caller's
            structure is never modified.
        gdl_text: Accepted for interface compatibility; not used here.

    Returns:
        A dict with every key listed in ``REQUIRED`` plus
        ``special_mechanics`` and ``extensions``.

    Raises:
        ValueError: If a required key is missing from the assembled IR.
    """
    # Several steps below edit nested dicts/lists (turns, extensions,
    # mechanics); work on a private copy so the input stays untouched.
    nz = deepcopy(nz)

    # --- meta ---
    game_name = _scalar(_as_dict(nz.get("game")).get("name") or "UnnamedGame")
    if "UNO" in game_name:
        description = "由标准54张牌叠加UNO特殊牌机制的变体。"
    elif "奇袭" in game_name:
        description = "在传统斗地主基础上加入了【援手】和【换位】两个新机制,增加了农民间的协作和角色变换的可能性"
    else:
        description = "由标准54张牌叠加特殊牌机制的变体。"
    meta = {"name": game_name, "version": "v0.95", "origin": "Variant", "seeded": True, "description": description}

    # --- players / cards / zones / visibility ---
    players = _derive_players(nz)
    pids = players.pop("_player_ids", [])
    cards = _ensure_cards(nz)
    zones = _expand_zones(_as_list(nz.get("zones", [])), pids)
    visibility = _expand_vis_hand_star(_as_dict(nz.get("visibility")), pids)

    # --- combinations / comparison / actions ---
    combinations = _ensure_combinations(nz)
    comparison = _as_dict(nz.get("comparison")) or {
        "same_type": True, "same_len": True, "bomb_beats_all": True,
        "rocket_top": True, "tiebreaker": "none",
    }
    actions = _ensure_actions(nz, pids)

    # --- phases ---
    # NOTE(review): names are passed through _phase_to_schema before being
    # checked against `allowed`, so raw names such as "bid", "play" or
    # "settle" (mapped to "bidding"/"playing"/"settlement") are dropped here
    # and only "setup", "grouping" and "play" can survive. Confirm that this
    # filtering is intended.
    allowed = {"setup", "deal", "bid", "double", "initiative", "play", "settle", "grouping"}
    cleaned = []
    for raw_phase in _as_list(nz.get("phases")):
        mapped = _phase_to_schema(str(raw_phase).strip())
        if mapped in allowed and mapped not in cleaned:
            cleaned.append(mapped)

    # Guarantee the mandatory phases: setup first, settle last, deal right
    # after setup and play right after deal.
    if "setup" not in cleaned:
        cleaned.insert(0, "setup")
    if "settle" not in cleaned:
        cleaned.append("settle")
    if "deal" not in cleaned:
        cleaned.insert(cleaned.index("setup") + 1, "deal")
    if "play" not in cleaned:
        cleaned.insert(cleaned.index("deal") + 1, "play")

    # --- turns ---
    extensions = _as_dict(nz.get("extensions"))
    turns = _as_dict(nz.get("turns"))
    turns["order"] = pids  # the order always uses the derived, unique ids
    if not turns.get("leader") and pids:
        turns["leader"] = pids[0]
    turns.setdefault("trick_end_on", "two_pass")

    if turns.get("trick_end_on", "two_pass") == "custom":
        custom_end = _as_dict(extensions.get("trick_end_on"))
        next_leader = "last_non_pass" if custom_end.get("mode") == "consecutive_passes" else "fixed"
    else:
        next_leader = "last_non_pass"
    turns.setdefault("next_leader", next_leader)

    # --- ending / scoring / mechanics ---
    ending = _as_dict(nz.get("ending")) or {"when": "any_hand_empty"}
    scoring = _ensure_scoring(nz)
    mechs = _as_list(nz.get("special_mechanics"))
    _sanitize_mechanics(mechs)

    # --- extensions ---
    extensions["cards_mapping"] = {
        "rank_name_to_int": {"J": 11, "Q": 12, "K": 13, "A": 14, "2": 15},
        "suit_name_to_carrier": {
            "Spade": "Spade",
            "Heart": "Heart",
            "Club": "Club",
            "Diamond": "Diamond",
        },
    }

    ir = {
        "meta": meta,
        "players": players,
        "cards": cards,
        "zones": zones,
        "visibility": visibility,
        "combinations": combinations,
        "comparison": comparison,
        "actions": actions,
        "phases": cleaned,
        "turns": turns,
        "ending": ending,
        "scoring": scoring,
        "special_mechanics": mechs,
        "extensions": extensions,
    }

    missing = [k for k in REQUIRED if k not in ir]
    if missing:
        raise ValueError(f"Missing required keys after mapping: {missing}")
    return ir
|
|