"""SFT utility functions for parsing and masking.""" from typing import Dict, Any, List, Tuple from taoTrain.config import TrainingConfig def parse_sft_record(record: Dict[str, Any], config: TrainingConfig) -> Tuple[List[Tuple[str, str]], bool]: """ Parse JSONL record into list of (user, assistant) turns. Supports two formats: 1. Single-turn: {"input": "...", "output": "..."} 2. Multi-turn: {"turns": [{"user": "...", "assistant": "..."}, ...]} Args: record: JSONL record (dict) config: Training configuration Returns: (turns_list, is_multi_turn) where: - turns_list: List of (user_text, assistant_text) tuples - is_multi_turn: Whether this is a multi-turn record """ # Check for multi-turn format if "turns" in record: turns = [] for turn in record["turns"]: if isinstance(turn, dict) and "user" in turn and "assistant" in turn: turns.append((turn["user"], turn["assistant"])) if turns: return turns, True # Check for single-turn format with input/output fields if "input" in record and "output" in record: return [(record["input"], record["output"])], False # Fallback: check for instruction/response fields (from config) dataset_config = config.dataset instruction_col = dataset_config.instruction_column or "instruction" response_col = dataset_config.response_column or "response" if instruction_col in record and response_col in record: return [(record[instruction_col], record[response_col])], False # Fallback: assume pre-formatted "text" field (old format) if "text" in record: return [(record["text"], "")], False return [], False def build_sft_sequence_tokens( turns: List[Tuple[str, str]], tokenizer, user_token: str = "", assistant_token: str = "", max_seq_length: int = 1024, ) -> Tuple[List[int], List[int], List[int]]: """ Build token sequence for SFT with role tokens and generate masking info. Sequence format: [user_token_id] user_tokens [assistant_token_id] assistant_tokens ... [eos_token_id] Mask values: - 0 (ignore): user input regions and role tokens → loss=-100 - 1 (train): assistant output regions → compute loss Args: turns: List of (user_text, assistant_text) tuples tokenizer: Tokenizer instance user_token: Role token for user (e.g., "") assistant_token: Role token for assistant (e.g., "") max_seq_length: Maximum sequence length Returns: (input_ids, attention_mask, mask) where: - input_ids: Token IDs for the full sequence - attention_mask: Attention mask (1 for real tokens, 0 for padding) - mask: Loss mask (0=ignore, 1=train loss) """ input_ids = [] mask = [] # Get token IDs for special tokens user_token_ids = tokenizer(user_token, add_special_tokens=False)["input_ids"] assistant_token_ids = tokenizer(assistant_token, add_special_tokens=False)["input_ids"] # Process each turn for user_text, assistant_text in turns: # User role marker input_ids.extend(user_token_ids) mask.extend([0] * len(user_token_ids)) # Mask role token # User message tokens user_tokens = tokenizer(user_text, add_special_tokens=False)["input_ids"] input_ids.extend(user_tokens) mask.extend([0] * len(user_tokens)) # Mask user input # Assistant role marker input_ids.extend(assistant_token_ids) mask.extend([0] * len(assistant_token_ids)) # Mask role token # Assistant message tokens assistant_tokens = tokenizer(assistant_text, add_special_tokens=False)["input_ids"] input_ids.extend(assistant_tokens) mask.extend([1] * len(assistant_tokens)) # Train on assistant output # Add EOS token if exists if hasattr(tokenizer, 'eos_token_id') and tokenizer.eos_token_id is not None: input_ids.append(tokenizer.eos_token_id) mask.append(0) # Mask EOS token # Truncate if too long if len(input_ids) > max_seq_length: input_ids = input_ids[:max_seq_length] mask = mask[:max_seq_length] # Pad to max_seq_length padding_len = max_seq_length - len(input_ids) if padding_len > 0: input_ids.extend([tokenizer.pad_token_id or 0] * padding_len) mask.extend([0] * padding_len) # Mask padding tokens # Create attention mask (1 for real tokens, 0 for padding) attention_mask = [1 if i < len(input_ids) - padding_len else 0 for i in range(len(input_ids))] return input_ids, attention_mask, mask def apply_response_masking(input_ids: List[int], mask: List[int]) -> List[int]: """ Apply response-only loss masking by converting mask values to label format. Args: input_ids: Token IDs mask: Mask array (0=ignore, 1=train) Returns: labels: Where mask=0 tokens have label=-100 (ignore in loss), mask=1 tokens have label=input_id """ labels = input_ids.copy() for i, m in enumerate(mask): if m == 0: labels[i] = -100 # CrossEntropyLoss will ignore this token return labels def build_response_only_next_token_labels(input_ids: List[int], mask: List[int]) -> List[int]: """ Build next-token labels for SFT response-only training. Position i predicts token i+1, so the loss mask must be applied to the target token, not the current input token. This trains the first assistant token from the assistant role marker and avoids training on masked EOS/padding targets. """ if len(input_ids) != len(mask): raise ValueError(f"input_ids and mask must have the same length: {len(input_ids)} != {len(mask)}") labels = apply_response_masking(input_ids, mask) return labels[1:] + [-100]