File size: 8,617 Bytes
e1b27e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import re
import os
from typing import Any, Dict, List, Optional, Tuple, Union
from transformers import PreTrainedTokenizer
from transformers.utils import logging
from pathlib import Path
import json

logger = logging.get_logger(__name__)

from huggingface_hub import hf_hub_download
import json
import os

def load_json(path, repo_id=None):
    """
    Read a UTF-8 encoded JSON file and return its parsed content.

    Args:
        path: Location of the JSON file to open. When `repo_id` is truthy this
            value is instead handed to `hf_hub_download` as its second argument.
        repo_id: Optional repository identifier. When truthy, the file is first
            resolved with `hf_hub_download(repo_id, path)` and the path returned
            by that call is the one that gets opened.

    Returns:
        The object produced by `json.load` for the file's content.
    """
    resolved_path = hf_hub_download(repo_id, path) if repo_id else path
    with open(resolved_path, "r", encoding="utf-8") as handle:
        parsed = json.load(handle)
    return parsed

def load_json_old(path: str) -> Union[Dict, List]:
    """
    Parse a JSON file located relative to this module's directory.

    Args:
        path (str): Path of the JSON file, joined onto the directory that
            contains this source file.

    Returns:
        Union[Dict, List]: The decoded JSON content.
    """
    module_dir = Path(__file__).parent
    with open(module_dir / path, "r", encoding="utf-8") as handle:
        content = json.load(handle)
    return content


class STLTokenizer(PreTrainedTokenizer):
    """
    Greedy longest-match tokenizer over a fixed JSON vocabulary.

    The vocabulary (token -> id) is loaded when the tokenizer is constructed.
    Text is wrapped in BOS/EOS markers, spaces are replaced by a placeholder
    character, and the result is split by repeatedly taking the longest
    vocabulary entry that matches at the current position.
    """

    # Hub repository the vocabulary file is downloaded from.
    VOCAB_REPO_ID = "saracandu/stldec_random_32"

    def __init__(self, vocab_path: str = "vocab.json", unk_token: str = "unk", pad_token: str = "pad",
                 bos_token: str = "/s", eos_token: str = "s", model_max_length: int = 512, **kwargs):
        """
        Initializes the STLTokenizer with a given vocabulary and special tokens.
        Args:
            vocab_path (str): Name of the vocabulary JSON file inside the
                `VOCAB_REPO_ID` repository. Defaults to "vocab.json".
            unk_token (str, optional): The token used for unknown words. Defaults to "unk".
            pad_token (str, optional): The token used for padding. Defaults to "pad".
            bos_token (str, optional): The token used for the beginning of a sequence. Defaults to "/s".
            eos_token (str, optional): The token used for the end of a sequence. Defaults to "s".
            model_max_length (int, optional): Maximum sequence length, used by
                `postpad_sequence`. Defaults to 512.
            **kwargs: Forwarded unchanged to the base class constructor.
        """
        # Honour the caller's file name instead of a hard-coded "vocab.json".
        self.vocab = load_json(vocab_path, repo_id=self.VOCAB_REPO_ID)
        # NOTE(review): these attributes are assigned before the base
        # constructor runs; presumably the base class may already call
        # get_vocab() / convert_tokens_to_ids() during its own initialisation.
        # Confirm before reordering.
        self.unk_token = unk_token
        self.pad_token = pad_token
        self.bos_token = bos_token
        self.eos_token = eos_token
        self.model_max_length = model_max_length
        self.id_to_token = {v: k for k, v in self.vocab.items()}  # Reverse mapping

        super().__init__(
            unk_token=unk_token,
            pad_token=pad_token,
            bos_token=bos_token,
            eos_token=eos_token,
            model_max_length=model_max_length,
            **kwargs
        )

    @property
    def vocab_size(self) -> int:
        """
        Returns the size of the vocabulary.
        Returns:
            int: The number of tokens in the vocabulary.
        """
        return len(self.vocab)

    def _special_token_strings(self) -> set:
        """Return the unk/pad/bos/eos tokens as a set of plain strings."""
        candidates = (self.unk_token, self.pad_token, self.bos_token, self.eos_token)
        return {str(token) for token in candidates if token is not None}

    def prepad_sequence(self, sequence, space_token = ' ', new_space_token = '@', undo = False):
        """
        Replaces spaces in the input sequence with a placeholder token, or reverses that replacement.
        Args:
            sequence (str): The input sequence.
            space_token (str): The character treated as a space. Defaults to ' '.
            new_space_token (str): The placeholder that stands in for a space. Defaults to '@'.
            undo (bool): If True, replace the placeholder with spaces. Defaults to False,
                which replaces spaces with the placeholder.
        Returns:
            str: The sequence with the requested replacement applied.
        """
        if undo:
            return sequence.replace(new_space_token, space_token)
        return sequence.replace(space_token, new_space_token)

    def add_bos_eos(self, sequence: str) -> str:
        """
        Adds the BOS token at the start and the EOS token at the end of the sequence.
        Each marker is separated from the sequence by a single space.
        Args:
            sequence (str): The input sequence.
        Returns:
            str: The sequence wrapped in the BOS and EOS tokens.
        """
        return f'{self.bos_token} {sequence} {self.eos_token}'

    def tokenize(self, text: str, **kwargs) -> List[str]:
        """
        Tokenizes the input text into a list of tokens.
        The text is wrapped in BOS/EOS markers, its spaces are replaced by the
        placeholder character, and then, at each position, the longest
        substring found in the vocabulary is emitted. A position where nothing
        matches yields the unknown token and advances by one character.
        Args:
            text (str): The input text to be tokenized.
            **kwargs: Accepted for compatibility with callers that forward
                extra tokenization options; ignored here.
        Returns:
            List[str]: A list of tokens representing the tokenized text.
        """
        text = self.prepad_sequence(self.add_bos_eos(text))

        # No vocabulary entry can be longer than this, so longer candidates
        # never match and need not be tried.
        longest_entry = max(map(len, self.vocab), default=0)
        text_length = len(text)

        tokens = []
        i = 0
        while i < text_length:
            best_match = None
            # Try matching substrings of decreasing length
            for j in range(min(text_length, i + longest_entry), i, -1):
                subtoken = text[i:j]
                if subtoken in self.vocab:
                    best_match = subtoken
                    break
            if best_match is None:
                tokens.append(self.unk_token)
                i += 1
            else:
                tokens.append(best_match)
                i += len(best_match)
        return tokens

    def convert_tokens_to_ids(self, tokens: Union[str, List[str]]) -> Union[int, List[int]]:
        """
        Converts a token, or a list of tokens, into the corresponding ID(s).
        Tokens missing from the vocabulary map to the ID of the unknown token
        (or None when the unknown token itself is not in the vocabulary).
        Args:
            tokens (Union[str, List[str]]): A single token or a list of tokens.
        Returns:
            Union[int, List[int]]: One ID for a single token, a list of IDs for a list.
            None is returned when `tokens` is None.
        """
        if tokens is None:
            return None
        unk_token_id = self.vocab.get(str(self.unk_token))
        # A bare string is ONE token; iterating it would look up each character.
        if isinstance(tokens, str):
            return self.vocab.get(tokens, unk_token_id)
        return [self.vocab.get(token, unk_token_id) for token in tokens]

    def convert_ids_to_tokens(self, ids: Union[int, List[int]],
                              skip_special_tokens: bool = False) -> Union[str, List[str]]:
        """
        Converts a token ID, or a list of token IDs, into the corresponding token(s).
        IDs that are not in the vocabulary map to the unknown token.
        Args:
            ids (Union[int, List[int]]): A single ID or a list of IDs.
            skip_special_tokens (bool, optional): When True and a list is given, the
                unk/pad/bos/eos tokens are dropped from the result. Defaults to False.
        Returns:
            Union[str, List[str]]: One token for a single ID, a list of tokens for a list.
        """
        if isinstance(ids, int):
            return self.id_to_token.get(ids, self.unk_token)
        tokens = [self.id_to_token.get(i, self.unk_token) for i in ids]
        if skip_special_tokens:
            special_tokens = self._special_token_strings()
            tokens = [token for token in tokens if token not in special_tokens]
        return tokens

    def encode(self, sequence: str) -> List[int]:
        """
        Encodes a string sequence into a list of token IDs.

        This method tokenizes the input sequence using the `tokenize` method,
        and then converts the resulting tokens into their corresponding token IDs
        using the `convert_tokens_to_ids` method.

        Args:
            sequence (str): The input sequence (text) to be encoded.

        Returns:
            List[int]: A list of token IDs corresponding to the input sequence.
        """
        splitted_sequence = self.tokenize(sequence)
        return self.convert_tokens_to_ids(splitted_sequence)

    def postpad_sequence(self, sequence, pad_token_id):
        """
        Pads the sequence in place with `pad_token_id` and returns it.
        The list is extended until it holds `model_max_length - 1` elements;
        sequences already at least that long are returned unchanged.
        NOTE(review): the target is one short of `model_max_length`; presumably
        one slot is reserved for a token added elsewhere. Confirm with callers.
        Args:
            sequence (list): The list of token IDs to pad. It is modified in place.
            pad_token_id: The value appended as padding.
        Returns:
            list: The same list object, after padding.
        """
        num_extra_elements = self.model_max_length - len(sequence) - 1
        if num_extra_elements > 0:
            sequence.extend([pad_token_id] * num_extra_elements)
        return sequence

    def decode(self, token_ids: Union[int, List[int]], skip_special_tokens: bool = False, **kwargs) -> str:
        """
        Decodes a list of token IDs into a string of text.
        The IDs are converted to tokens, the tokens are concatenated, and the
        space placeholder is turned back into real spaces.
        Args:
            token_ids (Union[int, List[int]]): The token ID(s) to be decoded.
            skip_special_tokens (bool, optional): Whether to drop the unk/pad/bos/eos
                tokens before joining. The separator spaces inserted by `add_bos_eos`
                are not removed. Defaults to False.
            **kwargs: Accepted for compatibility with callers that forward extra
                decoding options; ignored here.
        Returns:
            str: The decoded string.
        """
        if isinstance(token_ids, int):
            token_ids = [token_ids]
        tokens = self.convert_ids_to_tokens(token_ids, skip_special_tokens=skip_special_tokens)
        decoded = "".join(tokens)
        return self.prepad_sequence(decoded, undo=True)

    def save_vocabulary(self, save_directory: str, filename_prefix: Optional[str] = None) -> Tuple[str]:
        """
        Saves the tokenizer's vocabulary to a JSON file.
        The file is named `vocab.json`, preceded by `<filename_prefix>-` when a
        prefix is given, and is written inside `save_directory`.
        Args:
            save_directory (str): The directory where the vocabulary file will be saved.
            filename_prefix (Optional[str]): An optional prefix for the filename.
        Returns:
            Tuple[str]: A tuple containing the path to the saved vocabulary file.
        """
        prefix = f"{filename_prefix}-" if filename_prefix else ""
        vocab_file = os.path.join(save_directory, f"{prefix}vocab.json")
        with open(vocab_file, "w", encoding="utf-8") as f:
            json.dump(self.vocab, f, indent=2, ensure_ascii=False)
        return (vocab_file,)

    def get_vocab(self) -> Dict[str, int]:
        """
        Retrieves the vocabulary used by the tokenizer.
        Returns:
            dict: The token -> ID mapping held by this tokenizer (the same
            object, not a copy).
        """
        return self.vocab