""" Complete Multimodal Multitask Restoring Model (MMRM). Combines context encoder, image encoder, fusion, and decoders. """ import torch import torch.nn as nn from typing import Dict, Tuple from models.context_encoder import ContextEncoder from models.image_encoder import ImageEncoder from models.decoders import TextDecoder, ImageDecoder class MMRM(nn.Module): """ Multimodal Multitask Restoring Model. Architecture: 1. Context Encoder (RoBERTa) extracts textual features 2. Image Encoder (ResNet50) extracts visual features 3. Additive Fusion combines features 4. Text Decoder predicts missing characters 5. Image Decoder generates restored images """ def __init__(self, config, pretrained_roberta_path: str = None): """ Initialize MMRM. Args: config: Configuration object pretrained_roberta_path: Path to fine-tuned RoBERTa checkpoint (Phase 1) """ super().__init__() self.config = config # Context encoder self.context_encoder = ContextEncoder(config) # Load fine-tuned RoBERTa if provided if pretrained_roberta_path: # checkpoint = torch.load(pretrained_roberta_path, map_location='cpu') checkpoint = torch.load(pretrained_roberta_path, weights_only = False) self.context_encoder.load_state_dict(checkpoint['model_state_dict']) print(f"Loaded fine-tuned RoBERTa from {pretrained_roberta_path}") # Image encoder (with zero-initialized final layer) self.image_encoder = ImageEncoder(config, config.resnet_weights) # Text decoder (initialized with RoBERTa LM head) # Get LM head from RoBERTa # Text decoder (initialized with RoBERTa LM head) # Get LM head from RoBERTa # Text decoder (initialized with RoBERTa LM head) # Get LM head from RoBERTa from transformers import AutoModelForMaskedLM, logging as transformers_logging # Suppress warnings about unexpected keys (pooler) and set tie_word_embeddings=False transformers_logging.set_verbosity_error() try: roberta_mlm = AutoModelForMaskedLM.from_pretrained(config.roberta_model, tie_word_embeddings=False) finally: transformers_logging.set_verbosity_warning() # Handle both RoBERTa (lm_head) and BERT (cls.predictions) architectures lm_decoder = None if hasattr(roberta_mlm, "lm_head"): lm_decoder = roberta_mlm.lm_head.decoder # RoBERTa: bias is often in lm_head.bias, not decoder.bias if getattr(lm_decoder, "bias", None) is None: lm_decoder.bias = roberta_mlm.lm_head.bias elif hasattr(roberta_mlm, "cls"): lm_decoder = roberta_mlm.cls.predictions.decoder # BERT: bias might be in cls.predictions.bias if getattr(lm_decoder, "bias", None) is None: lm_decoder.bias = roberta_mlm.cls.predictions.bias self.text_decoder = TextDecoder(config, lm_decoder) # Image decoder (5 transposed conv layers) self.image_decoder = ImageDecoder(config) def forward( self, input_ids: torch.Tensor, attention_mask: torch.Tensor, mask_positions: torch.Tensor, damaged_images: torch.Tensor ) -> Tuple[torch.Tensor, torch.Tensor]: """ Forward pass through MMRM. Args: input_ids: Token IDs [batch_size, seq_len] attention_mask: Attention mask [batch_size, seq_len] mask_positions: Positions of masks [batch_size, num_masks] damaged_images: Damaged images [batch_size, num_masks, 1, 64, 64] Returns: Tuple of (text_logits, restored_images) - text_logits: [batch_size, num_masks, vocab_size] - restored_images: [batch_size, num_masks, 1, 64, 64] """ # 1. Extract textual features at mask positions # x_1 = memory[i] from paper text_features = self.context_encoder.extract_mask_features( input_ids, attention_mask, mask_positions ) # [batch_size, num_masks, hidden_dim] # 2. Extract visual features from damaged images # x_2 = ResNet50(Img) from paper image_features = self.image_encoder(damaged_images) # [batch_size, num_masks, hidden_dim] # 3. Additive fusion # x = x_1 + x_2 from paper fused_features = text_features + image_features # [batch_size, num_masks, hidden_dim] # 4. Text prediction # Y_pred = MLP(x) from paper text_logits = self.text_decoder(fused_features) # [batch_size, num_masks, vocab_size] # 5. Image restoration # Img_res = ConvT(x) from paper restored_images = self.image_decoder(fused_features) # [batch_size, num_masks, 1, 64, 64] return text_logits, restored_images def freeze_context_encoder(self): """Freeze context encoder parameters (for Phase 2).""" self.context_encoder.freeze() def unfreeze_context_encoder(self): """Unfreeze context encoder parameters.""" self.context_encoder.unfreeze() class BaselineImageModel(nn.Module): """ Baseline model: Image-only (ResNet50) for character recognition. Used as 'Img' baseline in the paper. """ def __init__(self, config): """Initialize image-only baseline.""" super().__init__() self.config = config # ResNet50 encoder self.image_encoder = ImageEncoder(config, config.resnet_weights) # Classifier self.classifier = nn.Linear(config.hidden_dim, config.vocab_size) def forward(self, damaged_images: torch.Tensor) -> torch.Tensor: """ Predict characters from images only. Args: damaged_images: [batch_size, num_masks, 1, 64, 64] Returns: Logits [batch_size, num_masks, vocab_size] """ image_features = self.image_encoder(damaged_images) logits = self.classifier(image_features) return logits class BaselineLanguageModel(nn.Module): """ Baseline model: Text-only (RoBERTa) for masked language modeling. Used as 'LM' and 'LM ft' baselines in the paper. """ def __init__(self, config, fine_tuned: bool = False): """ Initialize language model baseline. Args: config: Configuration object fine_tuned: If True, this is the fine-tuned version """ super().__init__() self.config = config self.fine_tuned = fine_tuned # Context encoder self.context_encoder = ContextEncoder(config) # Classifier # Classifier # Classifier from transformers import AutoModelForMaskedLM, logging as transformers_logging # Suppress warnings about unexpected keys (pooler) and set tie_word_embeddings=False transformers_logging.set_verbosity_error() try: roberta_mlm = AutoModelForMaskedLM.from_pretrained(config.roberta_model, tie_word_embeddings=False) finally: transformers_logging.set_verbosity_warning() # Handle both RoBERTa (lm_head) and BERT (cls.predictions) architectures if self.fine_tuned: # Phase 1 Fine-tuning used the simplified TextDecoder (Linear Layer). # We must replicate that structure to load weights correctly. lm_decoder = None if hasattr(roberta_mlm, "lm_head"): lm_decoder = roberta_mlm.lm_head.decoder if getattr(lm_decoder, "bias", None) is None: lm_decoder.bias = roberta_mlm.lm_head.bias elif hasattr(roberta_mlm, "cls"): lm_decoder = roberta_mlm.cls.predictions.decoder if getattr(lm_decoder, "bias", None) is None: lm_decoder.bias = roberta_mlm.cls.predictions.bias self.classifier = TextDecoder(config, lm_decoder) else: # For Baseline (0-shot), we must use the FULL pre-trained LM head # (Dense -> Norm -> Masked Decoder) to get valid predictions. if hasattr(roberta_mlm, "lm_head"): self.classifier = roberta_mlm.lm_head elif hasattr(roberta_mlm, "cls"): self.classifier = roberta_mlm.cls.predictions else: # Fallback using TextDecoder if head not found (should not happen with standard models) lm_decoder = roberta_mlm.lm_head.decoder if hasattr(roberta_mlm, "lm_head") else None self.classifier = TextDecoder(config, lm_decoder) def forward( self, input_ids: torch.Tensor, attention_mask: torch.Tensor, mask_positions: torch.Tensor ) -> torch.Tensor: """ Predict characters from context only. Args: input_ids: Token IDs [batch_size, seq_len] attention_mask: Attention mask [batch_size, seq_len] mask_positions: Positions of masks [batch_size, num_masks] Returns: Logits [batch_size, num_masks, vocab_size] """ text_features = self.context_encoder.extract_mask_features( input_ids, attention_mask, mask_positions ) logits = self.classifier(text_features) return logits