Fix loader: correct CADAD signature, bundle sentence-segmentation collate + concept vocab, decode_greedy path verified end-to-end
23c824a verified | """Sentence segmentation + collate for chest2err inference. | |
| Stripped-down version of the in-tree training collate: keeps only what's needed | |
| to run greedy decoding on one (ref, cand) pair. | |
| """ | |
| from __future__ import annotations | |
| import re | |
| from typing import Any, Dict, List, Tuple | |
| import torch | |
# Regexes used by segment_report to cut a report into sentence-like pieces.

# A bracketed span such as "[...]" containing no "]"; segment_report replaces
# every match with a newline so the surrounding text lands on separate lines.
_SECTION_HEADER_RE = re.compile(r"\[[^\]]+\]")
# A newline, optional whitespace, one bullet character (-, * or •) and the
# whitespace after it; replaced with a bare newline to drop the bullet marker.
_BULLET_RE = re.compile(r"\n\s*[-*•]\s+")
# Whitespace that directly follows a period and directly precedes an ASCII
# uppercase letter; used as the split point between sentences within a line.
_SENT_BOUNDARY_RE = re.compile(r"(?<=\.)\s+(?=[A-Z])")
def segment_report(text: str, min_len: int = 3) -> List[str]:
    """Split a report into an ordered list of sentence-like segments.

    Bracketed headers and bullet markers are replaced with newlines, each
    resulting line is stripped and split at sentence boundaries (whitespace
    after a period and before an uppercase letter), and pieces shorter than
    ``min_len`` characters are discarded.

    Args:
        text: Raw report text. Empty or whitespace-only input yields ``[]``.
        min_len: Minimum stripped length for a line to be examined and for a
            sentence to be kept.

    Returns:
        The surviving segments, stripped, in their original order.
    """
    if not (text and text.strip()):
        return []

    # Headers first, then bullets: both become plain line breaks.
    normalized = _BULLET_RE.sub("\n", _SECTION_HEADER_RE.sub("\n", text))

    segments: List[str] = []
    for raw_line in normalized.split("\n"):
        stripped_line = raw_line.strip()
        if len(stripped_line) < min_len:
            # Too short to hold a usable sentence; skip the whole line.
            continue
        pieces = (piece.strip() for piece in _SENT_BOUNDARY_RE.split(stripped_line))
        segments.extend(piece for piece in pieces if len(piece) >= min_len)
    return segments
def encode_pair_for_decoder(
    tokenizer,
    ref_text: str,
    cand_text: str,
    max_length: int = 1280,
    ref_marker: str = "[REF]",
    cand_marker: str = "[PRED]",
) -> Dict[str, Any]:
    """Tokenize one (reference, candidate) pair into a single token sequence.

    The sequence layout is: reference marker, reference segments, candidate
    marker, candidate segments. Both texts are segmented with
    ``segment_report`` and every segment is tokenized on its own (with a
    trailing space) so its token span can be recorded.

    If the total token count exceeds ``max_length``, whole segments are
    dropped from the end of the candidate first; only once no candidate
    segments remain are segments dropped from the end of the reference.
    The marker tokens themselves are never removed.

    Args:
        tokenizer: Object exposing ``encode(text, add_special_tokens=False)``
            that returns a sequence of token ids (presumably a Hugging Face
            tokenizer; confirm against the caller).
        ref_text: Reference report text.
        cand_text: Candidate report text.
        max_length: Token budget for the combined sequence.
        ref_marker: Marker text placed before the reference segments.
        cand_marker: Marker text placed before the candidate segments.

    Returns:
        A dict with ``input_ids`` (flat token-id list), ``ref_seg_ranges`` and
        ``cand_seg_ranges`` (half-open ``(start, end)`` index pairs into
        ``input_ids``, one per kept segment), and ``ref_segs`` / ``cand_segs``
        (the kept segment strings, aligned with the ranges).
    """

    def _tokenize(piece: str):
        return tokenizer.encode(piece, add_special_tokens=False)

    ref_segs = segment_report(ref_text)
    cand_segs = segment_report(cand_text)

    ref_marker_ids = _tokenize(ref_marker + " ")
    cand_marker_ids = _tokenize(" " + cand_marker + " ")
    ref_seg_token_ids = [_tokenize(seg + " ") for seg in ref_segs]
    cand_seg_token_ids = [_tokenize(seg + " ") for seg in cand_segs]

    # Track the running token count instead of re-summing on every pass.
    total_tokens = (
        len(ref_marker_ids)
        + len(cand_marker_ids)
        + sum(len(ids) for ids in ref_seg_token_ids)
        + sum(len(ids) for ids in cand_seg_token_ids)
    )
    # Candidate segments are sacrificed first, from the end.
    while total_tokens > max_length and cand_seg_token_ids:
        total_tokens -= len(cand_seg_token_ids.pop())
    # Reference segments are trimmed only if that was not enough.
    while total_tokens > max_length and ref_seg_token_ids:
        total_tokens -= len(ref_seg_token_ids.pop())

    # Keep the segment strings aligned with the surviving token lists.
    ref_segs = ref_segs[:len(ref_seg_token_ids)]
    cand_segs = cand_segs[:len(cand_seg_token_ids)]

    input_ids: List[int] = list(ref_marker_ids)

    def _append_segments(seg_token_ids) -> List[Tuple[int, int]]:
        # Append each segment's tokens and record its [start, end) span.
        ranges: List[Tuple[int, int]] = []
        for seg_ids in seg_token_ids:
            start = len(input_ids)
            input_ids.extend(seg_ids)
            ranges.append((start, len(input_ids)))
        return ranges

    ref_ranges = _append_segments(ref_seg_token_ids)
    input_ids.extend(cand_marker_ids)
    cand_ranges = _append_segments(cand_seg_token_ids)

    return {
        "input_ids": input_ids,
        "ref_seg_ranges": ref_ranges,
        "cand_seg_ranges": cand_ranges,
        "ref_segs": ref_segs,
        "cand_segs": cand_segs,
    }
def collate_decoder_batch(items: List[Dict[str, Any]], pad_token_id: int = 0) -> Dict[str, torch.Tensor]:
    """Pad a list of encoded pairs into batch tensors.

    Each item must provide ``input_ids`` (a list of token ids) and
    ``ref_seg_ranges`` / ``cand_seg_ranges`` (lists of half-open
    ``(start, end)`` index pairs into ``input_ids``).

    Args:
        items: Encoded examples to batch; must be non-empty.
        pad_token_id: Value written into ``input_ids`` past each item's length.

    Returns:
        A dict of tensors, with ``B`` items, ``T`` the longest token sequence
        and ``Sr`` / ``Sc`` the largest reference / candidate segment counts
        (each at least 1):

        * ``input_ids``: ``(B, T)`` long, right-padded with ``pad_token_id``.
        * ``attention_mask``: ``(B, T)`` long, 1 on real tokens, 0 on padding.
        * ``ref_seg_token_mask``: ``(B, Sr, T)`` bool, True on the tokens of
          each reference segment; rows for missing segments stay all False.
        * ``cand_seg_token_mask``: ``(B, Sc, T)`` bool, same for candidates.

    Raises:
        ValueError: If ``items`` is empty.
    """
    if not items:
        # Without this guard the max() calls below fail with an opaque
        # "empty sequence" ValueError; keep the type, clarify the message.
        raise ValueError("collate_decoder_batch requires at least one item")

    batch_size = len(items)
    max_tokens = max(len(it["input_ids"]) for it in items)
    # Reserve at least one segment slot so the mask tensors never end up
    # with a zero-sized segment axis.
    max_ref_segs = max(1, max(len(it["ref_seg_ranges"]) for it in items))
    max_cand_segs = max(1, max(len(it["cand_seg_ranges"]) for it in items))

    input_ids = torch.full((batch_size, max_tokens), pad_token_id, dtype=torch.long)
    attention_mask = torch.zeros((batch_size, max_tokens), dtype=torch.long)
    ref_seg_token_mask = torch.zeros((batch_size, max_ref_segs, max_tokens), dtype=torch.bool)
    cand_seg_token_mask = torch.zeros((batch_size, max_cand_segs, max_tokens), dtype=torch.bool)

    for row, item in enumerate(items):
        token_ids = item["input_ids"]
        length = len(token_ids)
        input_ids[row, :length] = torch.tensor(token_ids, dtype=torch.long)
        attention_mask[row, :length] = 1
        for seg_idx, (start, end) in enumerate(item["ref_seg_ranges"]):
            ref_seg_token_mask[row, seg_idx, start:end] = True
        for seg_idx, (start, end) in enumerate(item["cand_seg_ranges"]):
            cand_seg_token_mask[row, seg_idx, start:end] = True

    return {
        "input_ids": input_ids,
        "attention_mask": attention_mask,
        "ref_seg_token_mask": ref_seg_token_mask,
        "cand_seg_token_mask": cand_seg_token_mask,
    }