Spaces:
Runtime error
Runtime error
| # Copyright 2025 the LlamaFactory team. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| import os | |
| from abc import abstractmethod | |
| from dataclasses import dataclass | |
| from typing import TYPE_CHECKING, Any, Optional, Union | |
| from ..extras import logging | |
| from .data_utils import Role | |
| if TYPE_CHECKING: | |
| from datasets import Dataset, IterableDataset | |
| from transformers import Seq2SeqTrainingArguments | |
| from ..hparams import DataArguments | |
| from .mm_plugin import AudioInput, ImageInput, VideoInput | |
| from .parser import DatasetAttr | |
| MediaType = Union[ImageInput, VideoInput, AudioInput] | |
| logger = logging.get_logger(__name__) | |
| class DatasetConverter: | |
| dataset_attr: "DatasetAttr" | |
| data_args: "DataArguments" | |
| def _find_medias(self, medias: Union["MediaType", list["MediaType"], None]) -> Optional[list["MediaType"]]: | |
| r"""Optionally concatenate media path to media dir when loading from local disk.""" | |
| if medias is None: | |
| return None | |
| elif not isinstance(medias, list): | |
| medias = [medias] | |
| elif len(medias) == 0: | |
| return None | |
| else: | |
| medias = medias[:] | |
| if self.dataset_attr.load_from in ["script", "file"] and isinstance(medias[0], str): | |
| for i in range(len(medias)): | |
| if os.path.isfile(os.path.join(self.data_args.media_dir, medias[i])): | |
| medias[i] = os.path.join(self.data_args.media_dir, medias[i]) | |
| else: | |
| logger.warning_rank0_once(f"Media {medias[i]} does not exist in `media_dir`. Use original path.") | |
| return medias | |
| def __call__(self, example: dict[str, Any]) -> dict[str, Any]: | |
| r"""Convert a single example in the dataset to the standard format.""" | |
| ... | |
| class AlpacaDatasetConverter(DatasetConverter): | |
| def __call__(self, example: dict[str, Any]) -> dict[str, Any]: | |
| prompt = [] | |
| if self.dataset_attr.history and isinstance(example[self.dataset_attr.history], list): | |
| for old_prompt, old_response in example[self.dataset_attr.history]: | |
| prompt.append({"role": Role.USER.value, "content": old_prompt}) | |
| prompt.append({"role": Role.ASSISTANT.value, "content": old_response}) | |
| query = [] | |
| if self.dataset_attr.prompt and example[self.dataset_attr.prompt]: | |
| query.append(example[self.dataset_attr.prompt]) | |
| if self.dataset_attr.query and example[self.dataset_attr.query]: | |
| query.append(example[self.dataset_attr.query]) | |
| prompt.append({"role": Role.USER.value, "content": "\n".join(query)}) # "prompt\nquery" | |
| if self.dataset_attr.kto_tag and isinstance(example[self.dataset_attr.kto_tag], bool): # kto example | |
| response = [{"role": Role.ASSISTANT.value, "content": example[self.dataset_attr.response]}] | |
| if example[self.dataset_attr.kto_tag]: | |
| response = response + [{"role": Role.ASSISTANT.value, "content": ""}] | |
| else: | |
| response = [{"role": Role.ASSISTANT.value, "content": ""}] + response | |
| elif ( | |
| self.dataset_attr.ranking | |
| and isinstance(example[self.dataset_attr.chosen], str) | |
| and isinstance(example[self.dataset_attr.rejected], str) | |
| ): # pairwise example | |
| response = [ | |
| {"role": Role.ASSISTANT.value, "content": example[self.dataset_attr.chosen]}, | |
| {"role": Role.ASSISTANT.value, "content": example[self.dataset_attr.rejected]}, | |
| ] | |
| elif self.dataset_attr.response and isinstance(example[self.dataset_attr.response], str): # normal example | |
| response = [{"role": Role.ASSISTANT.value, "content": example[self.dataset_attr.response]}] | |
| else: # unsupervised | |
| response = [] | |
| output = { | |
| "_prompt": prompt, | |
| "_response": response, | |
| "_system": example[self.dataset_attr.system] if self.dataset_attr.system else "", | |
| "_tools": example[self.dataset_attr.tools] if self.dataset_attr.tools else "", | |
| "_images": self._find_medias(example[self.dataset_attr.images]) if self.dataset_attr.images else None, | |
| "_videos": self._find_medias(example[self.dataset_attr.videos]) if self.dataset_attr.videos else None, | |
| "_audios": self._find_medias(example[self.dataset_attr.audios]) if self.dataset_attr.audios else None, | |
| } | |
| return output | |
| class SharegptDatasetConverter(DatasetConverter): | |
| def __call__(self, example: dict[str, Any]) -> dict[str, Any]: | |
| tag_mapping = { | |
| self.dataset_attr.user_tag: Role.USER.value, | |
| self.dataset_attr.assistant_tag: Role.ASSISTANT.value, | |
| self.dataset_attr.observation_tag: Role.OBSERVATION.value, | |
| self.dataset_attr.function_tag: Role.FUNCTION.value, | |
| self.dataset_attr.system_tag: Role.SYSTEM.value, | |
| } | |
| odd_tags = (self.dataset_attr.user_tag, self.dataset_attr.observation_tag) | |
| even_tags = (self.dataset_attr.assistant_tag, self.dataset_attr.function_tag) | |
| accept_tags = (odd_tags, even_tags) | |
| messages = example[self.dataset_attr.messages] | |
| if ( | |
| self.dataset_attr.system_tag | |
| and len(messages) != 0 | |
| and messages[0][self.dataset_attr.role_tag] == self.dataset_attr.system_tag | |
| ): | |
| system = messages[0][self.dataset_attr.content_tag] | |
| messages = messages[1:] | |
| else: | |
| system = example[self.dataset_attr.system] if self.dataset_attr.system else "" | |
| aligned_messages = [] | |
| broken_data = False | |
| for turn_idx, message in enumerate(messages): | |
| if message[self.dataset_attr.role_tag] not in accept_tags[turn_idx % 2]: | |
| logger.warning_rank0(f"Invalid role tag in {messages}.") | |
| broken_data = True | |
| break | |
| aligned_messages.append( | |
| { | |
| "role": tag_mapping[message[self.dataset_attr.role_tag]], | |
| "content": message[self.dataset_attr.content_tag], | |
| } | |
| ) | |
| if (not self.dataset_attr.ranking and len(aligned_messages) % 2 != 0) or ( | |
| self.dataset_attr.ranking and len(aligned_messages) % 2 == 0 | |
| ): | |
| logger.warning_rank0(f"Invalid message count in {messages}.") | |
| broken_data = True | |
| if broken_data: | |
| logger.warning_rank0("Skipping this abnormal example.") | |
| prompt, response = [], [] | |
| elif self.dataset_attr.kto_tag and isinstance(example[self.dataset_attr.kto_tag], bool): # kto example | |
| prompt = aligned_messages[:-1] | |
| response = aligned_messages[-1:] | |
| if example[self.dataset_attr.kto_tag]: | |
| response = response + [{"role": Role.ASSISTANT.value, "content": ""}] | |
| else: | |
| response = [{"role": Role.ASSISTANT.value, "content": ""}] + response | |
| elif ( | |
| self.dataset_attr.ranking | |
| and isinstance(example[self.dataset_attr.chosen], dict) | |
| and isinstance(example[self.dataset_attr.rejected], dict) | |
| ): # pairwise example | |
| chosen = example[self.dataset_attr.chosen] | |
| rejected = example[self.dataset_attr.rejected] | |
| if ( | |
| chosen[self.dataset_attr.role_tag] not in accept_tags[-1] | |
| or rejected[self.dataset_attr.role_tag] not in accept_tags[-1] | |
| ): | |
| logger.warning_rank0(f"Invalid role tag in {[chosen, rejected]}.") | |
| broken_data = True | |
| prompt = aligned_messages | |
| response = [ | |
| { | |
| "role": tag_mapping[chosen[self.dataset_attr.role_tag]], | |
| "content": chosen[self.dataset_attr.content_tag], | |
| }, | |
| { | |
| "role": tag_mapping[rejected[self.dataset_attr.role_tag]], | |
| "content": rejected[self.dataset_attr.content_tag], | |
| }, | |
| ] | |
| else: # normal example | |
| prompt = aligned_messages[:-1] | |
| response = aligned_messages[-1:] | |
| output = { | |
| "_prompt": prompt, | |
| "_response": response, | |
| "_system": system, | |
| "_tools": example[self.dataset_attr.tools] if self.dataset_attr.tools else "", | |
| "_images": self._find_medias(example[self.dataset_attr.images]) if self.dataset_attr.images else None, | |
| "_videos": self._find_medias(example[self.dataset_attr.videos]) if self.dataset_attr.videos else None, | |
| "_audios": self._find_medias(example[self.dataset_attr.audios]) if self.dataset_attr.audios else None, | |
| } | |
| return output | |
| DATASET_CONVERTERS = { | |
| "alpaca": AlpacaDatasetConverter, | |
| "sharegpt": SharegptDatasetConverter, | |
| } | |
| def register_dataset_converter(name: str, dataset_converter: type["DatasetConverter"]) -> None: | |
| r"""Register a new dataset converter.""" | |
| if name in DATASET_CONVERTERS: | |
| raise ValueError(f"Dataset converter {name} already exists.") | |
| DATASET_CONVERTERS[name] = dataset_converter | |
| def get_dataset_converter(name: str, dataset_attr: "DatasetAttr", data_args: "DataArguments") -> "DatasetConverter": | |
| r"""Get a dataset converter.""" | |
| if name not in DATASET_CONVERTERS: | |
| raise ValueError(f"Dataset converter {name} not found.") | |
| return DATASET_CONVERTERS[name](dataset_attr, data_args) | |
| def align_dataset( | |
| dataset: Union["Dataset", "IterableDataset"], | |
| dataset_attr: "DatasetAttr", | |
| data_args: "DataArguments", | |
| training_args: "Seq2SeqTrainingArguments", | |
| ) -> Union["Dataset", "IterableDataset"]: | |
| r"""Align the dataset to a specific format. | |
| Aligned dataset: | |
| _prompt: [{"role": "user", "content": "..."}] * (2T - 1) | |
| _response: [{"role": "assistant", "content": "..."}] * N (N > 1 for ranking dataset) | |
| _system: "..." | |
| _tools: "..." | |
| _images: [] | |
| _videos: [] | |
| _audios: [] | |
| """ | |
| column_names = list(next(iter(dataset)).keys()) | |
| kwargs = {} | |
| if not data_args.streaming: | |
| kwargs = dict( | |
| num_proc=data_args.preprocessing_num_workers, | |
| load_from_cache_file=(not data_args.overwrite_cache) or (training_args.local_process_index != 0), | |
| desc="Converting format of dataset", | |
| ) | |
| dataset_converter = get_dataset_converter(dataset_attr.formatting, dataset_attr, data_args) | |
| return dataset.map( | |
| dataset_converter, | |
| batched=False, | |
| remove_columns=column_names, | |
| **kwargs, | |
| ) | |