# File size: 10,449 Bytes
# 9e31d55
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
"""
HuggingFace Integration for EMG Model and MorPiece Tokenizer
This file makes your custom model and tokenizer compatible with HuggingFace and lm_eval
"""

import json
import os
from typing import List, Optional, Union, Dict, Any
import torch
import torch.nn as nn
from transformers import (
    PreTrainedModel, 
    PretrainedConfig, 
    PreTrainedTokenizer,
    AutoConfig,
    AutoModel,
    AutoTokenizer,
    AutoModelForCausalLM,
    GenerationMixin,  # Add this import
)
from transformers.modeling_outputs import CausalLMOutputWithPast

# Import your existing classes
from model_eMG_simplified import EMGLanguageModel, EMGConfig, OptimizedEMG, OptimizedEMGCell
from tokenizer_MorPiece import MorPiece


class MorPieceTokenizer(PreTrainedTokenizer):
    """
    HuggingFace compatible wrapper for MorPiece tokenizer

    The wrapper owns a ``MorPiece`` instance (``self.morpiece``) and a
    token -> id dictionary (``self.vocab``) obtained from
    ``MorPiece.get_vocab()``.  Encoding is delegated to ``MorPiece.encode``;
    decoding maps ids back to tokens through ``self.vocab``.
    """

    def __init__(self,
                 vocab_file=None,
                 model_file=None,
                 unk_token="<unk>",
                 pad_token="<pad>",
                 bos_token="<s>",
                 eos_token="</s>",
                 **kwargs):
        """
        Build the tokenizer, optionally loading MorPiece data from disk.

        Args:
            vocab_file: Directory or JSON file holding the MorPiece data.
                Takes precedence over ``model_file`` when both are given.
            model_file: Alternative location, used when ``vocab_file`` is
                empty.
            unk_token, pad_token, bos_token, eos_token: Special tokens
                forwarded to ``PreTrainedTokenizer.__init__``.
            **kwargs: Forwarded unchanged to ``PreTrainedTokenizer.__init__``.
        """
        # Initialize the MorPiece tokenizer
        self.morpiece = MorPiece()

        # Load from file if provided
        if vocab_file or model_file:
            model_path = vocab_file or model_file
            if os.path.isdir(model_path):
                self.morpiece.from_pretrained(model_path)
            else:
                # Load from JSON file.  JSON is UTF-8 by specification, so do
                # not let the platform locale pick the codec.
                with open(model_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                # A file without a 'roots' key is treated as the roots
                # structure itself.
                self.morpiece.roots = data.get('roots', data)
                if 'vocab' in data:
                    self.morpiece.vocab_to_id = data['vocab']
                else:
                    self.morpiece.build_vocab_lookup()

        # The vocabulary must exist before the base-class constructor runs,
        # which is why it is assigned ahead of super().__init__().
        self.vocab = self.morpiece.get_vocab()

        # Set special tokens
        super().__init__(
            unk_token=unk_token,
            pad_token=pad_token,
            bos_token=bos_token,
            eos_token=eos_token,
            **kwargs
        )

    @property
    def vocab_size(self):
        """Number of entries in ``self.vocab``."""
        return len(self.vocab)

    def get_vocab(self):
        """Return a shallow copy of the token -> id dictionary."""
        return self.vocab.copy()

    def _tokenize(self, text: str) -> List[str]:
        """Tokenize text into tokens"""
        # For HuggingFace compatibility, we need to return string tokens.
        # NOTE(review): assumes MorPiece.decode returns a list of token
        # strings rather than a joined string - confirm against MorPiece.
        token_ids = self.morpiece.encode(text)
        tokens = self.morpiece.decode(token_ids)
        return tokens

    def _convert_token_to_id(self, token: str) -> int:
        """Convert token to ID, falling back to the unk token's id, then 0."""
        return self.vocab.get(token, self.vocab.get(self.unk_token, 0))

    def _id_to_token_map(self):
        """Return the id -> token table, rebuilding it if ``self.vocab`` changed."""
        source = self.vocab
        cache = getattr(self, "_reverse_vocab_cache", None)
        # Rebuild when the vocab object was replaced or its size changed.
        if cache is None or cache[0] is not source or cache[1] != len(source):
            table = {}
            for token, idx in source.items():
                # setdefault keeps the FIRST token for a duplicated id, which
                # is what a front-to-back scan of the vocab would return.
                table.setdefault(idx, token)
            cache = (source, len(source), table)
            self._reverse_vocab_cache = cache
        return cache[2]

    def _convert_id_to_token(self, index: int) -> str:
        """
        Convert ID to token.

        Uses a cached reverse table instead of scanning the whole vocabulary
        for every id.  Returns ``self.unk_token`` for unknown ids.
        """
        table = self._id_to_token_map()
        try:
            return table[index]
        except (KeyError, TypeError):
            # Not stored under this exact key, or the key is unhashable.
            pass
        # 0-dim tensors compare equal to ints but do not hash like them, so
        # retry with the plain integer value.
        try:
            return table.get(int(index), self.unk_token)
        except (TypeError, ValueError):
            return self.unk_token

    def convert_tokens_to_string(self, tokens: List[str]) -> str:
        """Convert tokens back to string"""
        # Tokens are concatenated without a separator.
        text = "".join(tokens)
        # Clean up special tokens for display
        for special_token in [self.pad_token, self.bos_token, self.eos_token]:
            if special_token:
                text = text.replace(special_token, "")
        return text.strip()

    def encode(self, text: str, add_special_tokens: bool = True, **kwargs) -> List[int]:
        """
        Encode text to token IDs.

        When ``add_special_tokens`` is true the bos/eos token strings are
        spliced into the text (space separated) before it is handed to
        ``MorPiece.encode``.  Extra ``kwargs`` are accepted but ignored.
        """
        if add_special_tokens and self.bos_token:
            text = f"{self.bos_token} {text}"
        if add_special_tokens and self.eos_token:
            text = f"{text} {self.eos_token}"

        return self.morpiece.encode(text)

    def decode(self, token_ids: List[int], skip_special_tokens: bool = True, **kwargs) -> str:
        """
        Decode token IDs to text.

        Args:
            token_ids: Iterable of ids.  Objects exposing ``tolist()`` (for
                example tensors or arrays) and a single ``int`` are accepted
                as well.
            skip_special_tokens: Drop pad/bos/eos/unk tokens from the output.
            **kwargs: Accepted for API compatibility; ignored.
        """
        if hasattr(token_ids, "tolist"):
            token_ids = token_ids.tolist()
        if isinstance(token_ids, int):
            token_ids = [token_ids]

        # Resolve the special tokens once instead of once per decoded id.
        skipped = ()
        if skip_special_tokens:
            skipped = (self.pad_token, self.bos_token, self.eos_token, self.unk_token)

        tokens = []
        for token_id in token_ids:
            token = self._convert_id_to_token(token_id)
            if token in skipped:
                continue
            tokens.append(token)
        return self.convert_tokens_to_string(tokens)

    def save_pretrained(self, save_directory: str, **kwargs):
        """
        Save tokenizer.

        Writes the MorPiece data to ``tokenizer.json`` (via ``MorPiece.save``)
        and the special tokens to ``tokenizer_config.json`` inside
        ``save_directory``, creating the directory if needed.

        Returns:
            tuple: ``(config_file, tokenizer_file)`` paths that were written.
        """
        os.makedirs(save_directory, exist_ok=True)

        # Save MorPiece data
        tokenizer_file = os.path.join(save_directory, "tokenizer.json")
        self.morpiece.save(tokenizer_file)

        # Save tokenizer config
        config = {
            "tokenizer_class": "MorPieceTokenizer",
            "unk_token": self.unk_token,
            "pad_token": self.pad_token,
            "bos_token": self.bos_token,
            "eos_token": self.eos_token,
        }

        config_file = os.path.join(save_directory, "tokenizer_config.json")
        with open(config_file, 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2)

        return (config_file, tokenizer_file)

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path: str, **kwargs):
        """
        Load tokenizer from pretrained.

        If ``pretrained_model_name_or_path`` is a directory containing the
        ``tokenizer_config.json`` written by ``save_pretrained``, the special
        tokens stored there are restored.  Explicit keyword arguments always
        win over the saved values.
        """
        config_file = os.path.join(pretrained_model_name_or_path, "tokenizer_config.json")
        if os.path.isfile(config_file):
            with open(config_file, 'r', encoding='utf-8') as f:
                saved = json.load(f)
            if isinstance(saved, dict):
                for name in ("unk_token", "pad_token", "bos_token", "eos_token"):
                    value = saved.get(name)
                    # Only plain strings are restored; null or structured
                    # entries fall back to the constructor defaults.
                    if isinstance(value, str):
                        kwargs.setdefault(name, value)
        return cls(vocab_file=pretrained_model_name_or_path, **kwargs)


class EMGForCausalLM(EMGLanguageModel, GenerationMixin):
    """
    EMG language model exposed through the HuggingFace causal-LM interface.

    Combines ``EMGLanguageModel`` with ``GenerationMixin`` and returns
    ``CausalLMOutputWithPast`` objects from ``forward`` so that tools such as
    lm_eval can drive the model.
    """

    def __init__(self, config):
        # Both bases are initialised explicitly, model first, mixin second.
        EMGLanguageModel.__init__(self, config)
        GenerationMixin.__init__(self)
        self.config = config

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        past_key_values: Optional[tuple] = None,
        use_cache: Optional[bool] = None,
        **kwargs
    ) -> CausalLMOutputWithPast:
        """
        Run embedding -> EMG stack -> output projection.

        Args:
            input_ids: Token ids fed to ``self.embedding``.
            attention_mask: Accepted for API compatibility; not used here.
            labels: Optional targets.  When given, a next-token
                cross-entropy loss is computed (positions equal to -100 are
                ignored).
            past_key_values: Passed to ``self.emg`` as its second argument.
            use_cache: When truthy, the second value returned by
                ``self.emg`` is placed in ``past_key_values`` of the result.
            **kwargs: Ignored.

        Returns:
            CausalLMOutputWithPast with ``loss``, ``logits``,
            ``past_key_values`` and ``hidden_states`` (the EMG output).
        """
        token_vectors = self.embedding(input_ids)
        sequence_output, recurrent_state = self.emg(token_vectors, past_key_values)
        logits = self.output_projection(sequence_output)

        if labels is None:
            loss = None
        else:
            # Position t predicts token t + 1: drop the last prediction and
            # the first label, then flatten for the loss.
            predictions = logits[..., :-1, :].contiguous()
            targets = labels[..., 1:].contiguous()
            criterion = nn.CrossEntropyLoss(ignore_index=-100)
            loss = criterion(
                predictions.view(-1, predictions.size(-1)),
                targets.view(-1),
            )

        if use_cache:
            cache = recurrent_state
        else:
            cache = None

        return CausalLMOutputWithPast(
            loss=loss,
            logits=logits,
            past_key_values=cache,
            hidden_states=sequence_output,
        )

    def prepare_inputs_for_generation(
        self,
        input_ids: torch.Tensor,
        past_key_values=None,
        attention_mask=None,
        **kwargs
    ):
        """Bundle the arguments ``forward`` receives at each generation step."""
        return dict(
            input_ids=input_ids,
            past_key_values=past_key_values,
            attention_mask=attention_mask,
        )

    def _reorder_cache(self, past_key_values, beam_idx):
        """Select the entries of every cached tensor along dim 0 by ``beam_idx``."""
        if past_key_values is None:
            return None

        def pick(state):
            return state.index_select(0, beam_idx)

        # Each cache entry is either a tuple of tensors or a single tensor.
        return tuple(
            tuple(pick(part) for part in entry) if isinstance(entry, tuple) else pick(entry)
            for entry in past_key_values
        )


# Register the custom classes with transformers
def register_emg_model():
    """Hook the EMG config, models and MorPiece tokenizer into the Auto* classes."""
    # (auto class, registration key, registered class), applied in this order:
    # config first, then the models, then the tokenizer.
    registrations = (
        (AutoConfig, "emg", EMGConfig),
        (AutoModel, EMGConfig, EMGLanguageModel),
        (AutoModelForCausalLM, EMGConfig, EMGForCausalLM),
        (AutoTokenizer, EMGConfig, MorPieceTokenizer),
    )
    for auto_class, key, target in registrations:
        auto_class.register(key, target)

    print("EMG model and MorPiece tokenizer registered with transformers!")


def load_emg_model_and_tokenizer(model_path: str):
    """
    Load the EMG causal-LM and its MorPiece tokenizer from one directory.

    The Auto* registrations are performed first, then config, model and
    tokenizer are loaded from ``model_path``.  If the config carries no
    ``pad_token_id``, the tokenizer's pad id is copied onto both the config
    and ``model.config``.

    Args:
        model_path: Path to the saved model directory

    Returns:
        tuple: (model, tokenizer)
    """
    register_emg_model()

    config = EMGConfig.from_pretrained(model_path)
    model = EMGForCausalLM.from_pretrained(model_path, config=config)
    tokenizer = MorPieceTokenizer.from_pretrained(model_path)

    # A missing attribute and an explicit None are treated the same way.
    if getattr(config, 'pad_token_id', None) is None:
        config.pad_token_id = tokenizer.pad_token_id
        model.config.pad_token_id = tokenizer.pad_token_id

    return model, tokenizer


def test_model_and_tokenizer(model_path: str):
    """
    Smoke-test a saved model: encode/decode a sentence and run one forward pass.

    Prints each intermediate result and returns ``(model, tokenizer)``.
    """
    model, tokenizer = load_emg_model_and_tokenizer(model_path)

    # Tokenizer round trip
    sample = "Hello world, this is a test."
    print(f"Original text: {sample}")

    ids = tokenizer.encode(sample)
    print(f"Encoded: {ids}")

    round_trip = tokenizer.decode(ids, skip_special_tokens=True)
    print(f"Decoded: {round_trip}")

    # Single forward pass on a batch of one, without gradient tracking
    batch = torch.tensor([ids])
    with torch.no_grad():
        result = model(batch)
    print(f"Model output shape: {result.logits.shape}")
    print(f"Model output type: {type(result)}")

    print("Model and tokenizer are working correctly!")
    return model, tokenizer


if __name__ == "__main__":
    # Script entry point: point model_path at a saved model directory.
    model_path = "path/to/your/saved/model"  # Replace with your model path

    # Make the custom classes known to transformers before loading anything.
    register_emg_model()

    # Run the smoke test; any failure is reported instead of raising.
    try:
        model, tokenizer = test_model_and_tokenizer(model_path)
        print("✅ Model and tokenizer loaded successfully!")
    except Exception as exc:
        print(f"❌ Error loading model: {exc}")