File size: 9,739 Bytes
87224ba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
"""
Complete Multimodal Multitask Restoring Model (MMRM).
Combines context encoder, image encoder, fusion, and decoders.
"""

import torch
import torch.nn as nn
from typing import Dict, Tuple

from models.context_encoder import ContextEncoder
from models.image_encoder import ImageEncoder
from models.decoders import TextDecoder, ImageDecoder


class MMRM(nn.Module):
    """
    Multimodal Multitask Restoring Model.
    
    Architecture:
    1. Context Encoder (RoBERTa) extracts textual features
    2. Image Encoder (ResNet50) extracts visual features
    3. Additive Fusion combines features
    4. Text Decoder predicts missing characters
    5. Image Decoder generates restored images
    """
    
    def __init__(self, config, pretrained_roberta_path: str = None):
        """
        Initialize MMRM.
        
        Args:
            config: Configuration object. It is handed to every sub-module;
                this constructor itself reads ``config.resnet_weights`` and
                ``config.roberta_model``.
            pretrained_roberta_path: Path to fine-tuned RoBERTa checkpoint (Phase 1).
                The checkpoint must be a mapping with a ``'model_state_dict'``
                entry. When falsy, the context encoder keeps the weights
                ``ContextEncoder(config)`` gave it.
        
        Raises:
            KeyError: If the checkpoint has no ``'model_state_dict'`` entry.
        """
        super().__init__()
        self.config = config
        
        # Context encoder
        self.context_encoder = ContextEncoder(config)
        
        # Load fine-tuned RoBERTa if provided
        if pretrained_roberta_path:
            # map_location="cpu" lets a checkpoint saved on a GPU be read on a
            # host without that device; load_state_dict copies the values into
            # the already-allocated parameters afterwards.
            # NOTE(security): weights_only=False runs the full unpickler, which
            # can execute arbitrary code. Only load checkpoints from trusted
            # sources.
            checkpoint = torch.load(
                pretrained_roberta_path,
                map_location="cpu",
                weights_only=False,
            )
            self.context_encoder.load_state_dict(checkpoint['model_state_dict'])
            print(f"Loaded fine-tuned RoBERTa from {pretrained_roberta_path}")
        
        # Image encoder (with zero-initialized final layer)
        self.image_encoder = ImageEncoder(config, config.resnet_weights)
        
        # Text decoder (initialized with the pre-trained LM head's decoder layer)
        lm_decoder = self._load_pretrained_lm_decoder(config.roberta_model)
        self.text_decoder = TextDecoder(config, lm_decoder)
        
        # Image decoder (5 transposed conv layers)
        self.image_decoder = ImageDecoder(config)
    
    @staticmethod
    def _load_pretrained_lm_decoder(model_name):
        """
        Fetch the decoder layer of a pre-trained masked-LM head.
        
        Args:
            model_name: Model identifier or path passed to
                ``AutoModelForMaskedLM.from_pretrained``.
        
        Returns:
            The head's ``decoder`` module with a ``bias`` attached, or ``None``
            when the loaded model exposes neither ``lm_head`` nor ``cls``.
        """
        # Imported lazily, exactly where the original code imported it.
        from transformers import AutoModelForMaskedLM, logging as transformers_logging
        
        # Silence warnings about unexpected keys (pooler) while loading, then
        # put the verbosity back to whatever the caller had configured instead
        # of forcing it to "warning".
        previous_verbosity = transformers_logging.get_verbosity()
        transformers_logging.set_verbosity_error()
        try:
            roberta_mlm = AutoModelForMaskedLM.from_pretrained(model_name, tie_word_embeddings=False)
        finally:
            transformers_logging.set_verbosity(previous_verbosity)
        
        # Handle both RoBERTa (lm_head) and BERT (cls.predictions) architectures
        if hasattr(roberta_mlm, "lm_head"):
            head = roberta_mlm.lm_head
        elif hasattr(roberta_mlm, "cls"):
            head = roberta_mlm.cls.predictions
        else:
            return None
        
        lm_decoder = head.decoder
        # The bias may live on the head rather than on the decoder layer.
        if getattr(lm_decoder, "bias", None) is None:
            lm_decoder.bias = head.bias
        return lm_decoder
    
    def forward(
        self, 
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor,
        mask_positions: torch.Tensor,
        damaged_images: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Forward pass through MMRM.
        
        Args:
            input_ids: Token IDs [batch_size, seq_len]
            attention_mask: Attention mask [batch_size, seq_len]
            mask_positions: Positions of masks [batch_size, num_masks]
            damaged_images: Damaged images [batch_size, num_masks, 1, 64, 64]
            
        Returns:
            Tuple of (text_logits, restored_images)
            - text_logits: [batch_size, num_masks, vocab_size]
            - restored_images: [batch_size, num_masks, 1, 64, 64]
        """
        # 1. Extract textual features at mask positions
        # x_1 = memory[i] from paper
        text_features = self.context_encoder.extract_mask_features(
            input_ids, attention_mask, mask_positions
        )  # [batch_size, num_masks, hidden_dim]
        
        # 2. Extract visual features from damaged images
        # x_2 = ResNet50(Img) from paper
        image_features = self.image_encoder(damaged_images)  # [batch_size, num_masks, hidden_dim]
        
        # 3. Additive fusion
        # x = x_1 + x_2 from paper
        fused_features = text_features + image_features  # [batch_size, num_masks, hidden_dim]
        
        # 4. Text prediction
        # Y_pred = MLP(x) from paper
        text_logits = self.text_decoder(fused_features)  # [batch_size, num_masks, vocab_size]
        
        # 5. Image restoration
        # Img_res = ConvT(x) from paper
        restored_images = self.image_decoder(fused_features)  # [batch_size, num_masks, 1, 64, 64]
        
        return text_logits, restored_images
    
    def freeze_context_encoder(self):
        """Freeze context encoder parameters (for Phase 2)."""
        self.context_encoder.freeze()
    
    def unfreeze_context_encoder(self):
        """Unfreeze context encoder parameters."""
        self.context_encoder.unfreeze()


class BaselineImageModel(nn.Module):
    """
    Image-only baseline for character recognition.
    
    Corresponds to the 'Img' baseline in the paper: an image encoder
    (ResNet50) followed by a single linear layer producing vocabulary logits.
    """
    
    def __init__(self, config):
        """
        Build the image encoder and the linear classification head.
        
        Args:
            config: Configuration object; ``resnet_weights``, ``hidden_dim``
                and ``vocab_size`` are read here.
        """
        super().__init__()
        self.config = config
        
        # Visual backbone
        encoder = ImageEncoder(config, config.resnet_weights)
        self.image_encoder = encoder
        
        # Linear head: hidden features -> vocabulary logits
        head = nn.Linear(config.hidden_dim, config.vocab_size)
        self.classifier = head
    
    def forward(self, damaged_images: torch.Tensor) -> torch.Tensor:
        """
        Classify characters using visual input alone.
        
        Args:
            damaged_images: [batch_size, num_masks, 1, 64, 64]
            
        Returns:
            Logits [batch_size, num_masks, vocab_size]
        """
        return self.classifier(self.image_encoder(damaged_images))


class BaselineLanguageModel(nn.Module):
    """
    Baseline model: Text-only (RoBERTa) for masked language modeling.
    Used as 'LM' and 'LM ft' baselines in the paper.
    """
    
    def __init__(self, config, fine_tuned: bool = False):
        """
        Initialize language model baseline.
        
        Args:
            config: Configuration object. It is handed to the sub-modules;
                this constructor itself reads ``config.roberta_model``.
            fine_tuned: If True, this is the fine-tuned version and the
                classifier is a ``TextDecoder``; otherwise the full
                pre-trained LM head is used as the classifier.
        """
        super().__init__()
        self.config = config
        self.fine_tuned = fine_tuned
        
        # Context encoder
        self.context_encoder = ContextEncoder(config)
        
        # Classifier: built from the pre-trained masked-LM head
        from transformers import AutoModelForMaskedLM, logging as transformers_logging
        
        # Silence warnings about unexpected keys (pooler) while loading, then
        # put the verbosity back to whatever the caller had configured instead
        # of forcing it to "warning".
        # NOTE(review): at error verbosity any other from_pretrained warning is
        # hidden as well - confirm the checkpoint really ships the decoder
        # weights when tie_word_embeddings=False.
        previous_verbosity = transformers_logging.get_verbosity()
        transformers_logging.set_verbosity_error()
        try:
            roberta_mlm = AutoModelForMaskedLM.from_pretrained(config.roberta_model, tie_word_embeddings=False)
        finally:
            transformers_logging.set_verbosity(previous_verbosity)
        
        # Handle both RoBERTa (lm_head) and BERT (cls.predictions) architectures
        if hasattr(roberta_mlm, "lm_head"):
            head = roberta_mlm.lm_head
        elif hasattr(roberta_mlm, "cls"):
            head = roberta_mlm.cls.predictions
        else:
            head = None
        
        if self.fine_tuned:
            # Phase 1 Fine-tuning used the simplified TextDecoder (Linear Layer).
            # We must replicate that structure to load weights correctly.
            lm_decoder = None
            if head is not None:
                lm_decoder = head.decoder
                # The bias may live on the head rather than on the decoder layer.
                if getattr(lm_decoder, "bias", None) is None:
                    lm_decoder.bias = head.bias
            self.classifier = TextDecoder(config, lm_decoder)
        elif head is not None:
            # For Baseline (0-shot), we must use the FULL pre-trained LM head
            # (Dense -> Norm -> Masked Decoder) to get valid predictions.
            self.classifier = head
        else:
            # Fallback if no head is found (should not happen with standard
            # models): there is no pre-trained decoder to pass on.
            self.classifier = TextDecoder(config, None)
    
    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor,
        mask_positions: torch.Tensor
    ) -> torch.Tensor:
        """
        Predict characters from context only.
        
        Args:
            input_ids: Token IDs [batch_size, seq_len]
            attention_mask: Attention mask [batch_size, seq_len]
            mask_positions: Positions of masks [batch_size, num_masks]
            
        Returns:
            Logits [batch_size, num_masks, vocab_size]
        """
        text_features = self.context_encoder.extract_mask_features(
            input_ids, attention_mask, mask_positions
        )
        logits = self.classifier(text_features)
        return logits