"""Model classes for MoCo models compatible with transformers""" import sys import os from pathlib import Path import torch import torch.nn as nn from transformers import PreTrainedModel from transformers.modeling_outputs import ImageClassifierOutputWithNoAttention from safetensors.torch import load_file # Embed ResNet code directly to avoid import issues when transformers caches modules def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1): """3x3 convolution with padding""" return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=dilation, groups=groups, bias=False, dilation=dilation) def conv1x1(in_planes, out_planes, stride=1): """1x1 convolution""" return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False) class BasicBlock(nn.Module): expansion = 1 def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1, base_width=64, dilation=1, norm_layer=None): super(BasicBlock, self).__init__() if norm_layer is None: norm_layer = nn.BatchNorm2d if groups != 1 or base_width != 64: raise ValueError('BasicBlock only supports groups=1 and base_width=64') if dilation > 1: raise NotImplementedError("Dilation > 1 not supported in BasicBlock") self.conv1 = conv3x3(inplanes, planes, stride) self.bn1 = norm_layer(planes) self.relu = nn.ReLU(inplace=True) self.conv2 = conv3x3(planes, planes) self.bn2 = norm_layer(planes) self.downsample = downsample self.stride = stride def forward(self, x): identity = x out = self.conv1(x) out = self.bn1(out) out = self.relu(out) out = self.conv2(out) out = self.bn2(out) if self.downsample is not None: identity = self.downsample(x) out += identity out = self.relu(out) return out class Bottleneck(nn.Module): expansion = 4 def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1, base_width=64, dilation=1, norm_layer=None): super(Bottleneck, self).__init__() if norm_layer is None: norm_layer = nn.BatchNorm2d width = int(planes * (base_width / 64.)) * groups self.conv1 = conv1x1(inplanes, width) self.bn1 = norm_layer(width) self.conv2 = conv3x3(width, width, stride, groups, dilation) self.bn2 = norm_layer(width) self.conv3 = conv1x1(width, planes * self.expansion) self.bn3 = norm_layer(planes * self.expansion) self.relu = nn.ReLU(inplace=True) self.downsample = downsample self.stride = stride def forward(self, x): identity = x out = self.conv1(x) out = self.bn1(out) out = self.relu(out) out = self.conv2(out) out = self.bn2(out) out = self.relu(out) out = self.conv3(out) out = self.bn3(out) if self.downsample is not None: identity = self.downsample(x) out += identity out = self.relu(out) return out class ResNet(nn.Module): def __init__(self, block, layers, num_classes=51, zero_init_residual=False, groups=1, width_per_group=64, replace_stride_with_dilation=None, norm_layer=None): super(ResNet, self).__init__() if norm_layer is None: norm_layer = nn.BatchNorm2d self._norm_layer = norm_layer self.inplanes = 64 self.dilation = 1 if replace_stride_with_dilation is None: replace_stride_with_dilation = [False, False, False] if len(replace_stride_with_dilation) != 3: raise ValueError("replace_stride_with_dilation should be None " "or a 3-element tuple, got {}".format(replace_stride_with_dilation)) self.groups = groups self.base_width = width_per_group self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False) self.bn1 = norm_layer(self.inplanes) self.relu = nn.ReLU(inplace=True) self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1) self.layer1 = self._make_layer(block, 64, layers[0]) self.layer2 = self._make_layer(block, 128, layers[1], stride=2, dilate=replace_stride_with_dilation[0]) self.layer3 = self._make_layer(block, 256, layers[2], stride=2, dilate=replace_stride_with_dilation[1]) self.layer4 = self._make_layer(block, 512, layers[3], stride=2, dilate=replace_stride_with_dilation[2]) self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) self.fc = nn.Linear(512 * block.expansion, num_classes) for m in self.modules(): if isinstance(m, nn.Conv2d): nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)): nn.init.constant_(m.weight, 1) nn.init.constant_(m.bias, 0) if zero_init_residual: for m in self.modules(): if isinstance(m, Bottleneck): nn.init.constant_(m.bn3.weight, 0) elif isinstance(m, BasicBlock): nn.init.constant_(m.bn2.weight, 0) def _make_layer(self, block, planes, blocks, stride=1, dilate=False): norm_layer = self._norm_layer downsample = None previous_dilation = self.dilation if dilate: self.dilation *= stride stride = 1 if stride != 1 or self.inplanes != planes * block.expansion: downsample = nn.Sequential( conv1x1(self.inplanes, planes * block.expansion, stride), norm_layer(planes * block.expansion), ) layers = [] layers.append(block(self.inplanes, planes, stride, downsample, self.groups, self.base_width, previous_dilation, norm_layer)) self.inplanes = planes * block.expansion for _ in range(1, blocks): layers.append(block(self.inplanes, planes, groups=self.groups, base_width=self.base_width, dilation=self.dilation, norm_layer=norm_layer)) return nn.Sequential(*layers) def forward(self, x): x = self.conv1(x) x = self.bn1(x) x = self.relu(x) x = self.maxpool(x) x = self.layer1(x) x = self.layer2(x) x = self.layer3(x) x = self.layer4(x) x = self.avgpool(x) x = torch.flatten(x, 1) x = self.fc(x) return x # Import configuration try: from configuration_moco import MoCoResNetConfig except ImportError: # Fallback: import from same directory import importlib.util config_path = Path(__file__).parent / "configuration_moco.py" spec = importlib.util.spec_from_file_location("configuration_moco", config_path) config_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(config_module) MoCoResNetConfig = config_module.MoCoResNetConfig class MoCoResNetForImageClassification(PreTrainedModel): """MoCo ResNet model for image classification or feature extraction""" config_class = MoCoResNetConfig def __init__(self, config): super().__init__(config) # Build ResNet model from config if config.block == "Bottleneck": block = Bottleneck elif config.block == "BasicBlock": block = BasicBlock else: raise ValueError(f"Unsupported block type: {config.block}") # Create ResNet backbone # For MoCo models, we typically want feature extraction (no classification head) # But we need to initialize with some num_classes, then replace fc if needed self.model = ResNet( block=block, layers=config.layers, num_classes=2048 # Standard ResNet-50 feature dimension ) # Replace classification head based on num_labels if config.num_labels == 0: # Feature extraction mode: replace fc with identity self.model.fc = nn.Identity() else: # Classification mode: replace fc with new classifier self.model.fc = nn.Linear(512 * block.expansion, config.num_labels) def forward(self, pixel_values=None, labels=None, return_dict=None, **kwargs): """ Args: pixel_values: Input images (B, C, H, W) labels: Optional labels for loss computation (only if num_labels > 0) return_dict: Whether to return a ModelOutput instead of a plain tuple """ return_dict = return_dict if return_dict is not None else self.config.use_return_dict if pixel_values is None: raise ValueError("pixel_values must be provided") # Forward through ResNet features = self.model(pixel_values) # If num_labels > 0, features are logits; otherwise they're feature vectors if self.config.num_labels > 0: logits = features loss = None if labels is not None: loss_fct = nn.CrossEntropyLoss() loss = loss_fct(logits.view(-1, self.config.num_labels), labels.view(-1)) if not return_dict: output = (logits,) return (loss,) + output if loss is not None else output return ImageClassifierOutputWithNoAttention( loss=loss, logits=logits, hidden_states=None, ) else: # Feature extraction mode if not return_dict: return (features,) return {"features": features} @classmethod def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs): """Load model from pretrained checkpoint""" config = kwargs.pop("config", None) if config is None: config = MoCoResNetConfig.from_pretrained(pretrained_model_name_or_path) model = cls(config) # Load weights from safetensors model_path = Path(pretrained_model_name_or_path) safetensors_path = model_path / "model.safetensors" if safetensors_path.exists(): state_dict = load_file(str(safetensors_path)) # Remove 'model.' prefix if present state_dict_clean = {} for k, v in state_dict.items(): if k.startswith("model."): state_dict_clean[k[6:]] = v else: state_dict_clean[k] = v model.model.load_state_dict(state_dict_clean, strict=False) else: raise FileNotFoundError(f"Model weights not found at {safetensors_path}") return model