| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | import math |
| | from typing import Optional |
| |
|
| | import torch.nn as nn |
| | from torch import Tensor |
| | from transformers import PreTrainedModel |
| | from transformers.activations import ACT2FN |
| | from transformers.modeling_outputs import BaseModelOutputWithNoAttention, BaseModelOutputWithPoolingAndNoAttention |
| |
|
| | from .configuration_resnet import ResNet10Config |
| |
|
| |
|
| | class MaxPool2dJax(nn.Module): |
| | """Mimics JAX's MaxPool with padding='SAME' for exact parity.""" |
| |
|
| | def __init__(self, kernel_size, stride=2): |
| | super().__init__() |
| |
|
| | |
| | self.kernel_size = kernel_size if isinstance(kernel_size, tuple) else (kernel_size, kernel_size) |
| | self.stride = stride if isinstance(stride, tuple) else (stride, stride) |
| |
|
| | self.maxpool = nn.MaxPool2d( |
| | kernel_size=self.kernel_size, |
| | stride=self.stride, |
| | padding=0, |
| | ) |
| |
|
| | def _compute_padding(self, input_height, input_width): |
| | """Calculate asymmetric padding to match JAX's 'SAME' behavior.""" |
| |
|
| | |
| | pad_h = max( |
| | 0, (math.ceil(input_height / self.stride[0]) - 1) * self.stride[0] + self.kernel_size[0] - input_height |
| | ) |
| | pad_w = max( |
| | 0, (math.ceil(input_width / self.stride[1]) - 1) * self.stride[1] + self.kernel_size[1] - input_width |
| | ) |
| |
|
| | |
| | pad_top = pad_h // 2 |
| | pad_bottom = pad_h - pad_top |
| | pad_left = pad_w // 2 |
| | pad_right = pad_w - pad_left |
| |
|
| | return (pad_left, pad_right, pad_top, pad_bottom) |
| |
|
| | def forward(self, x): |
| | """Apply asymmetric padding before convolution.""" |
| | _, _, h, w = x.shape |
| |
|
| | |
| | pad_left, pad_right, pad_top, pad_bottom = self._compute_padding(h, w) |
| | x = nn.functional.pad( |
| | x, (pad_left, pad_right, pad_top, pad_bottom), value=-float("inf") |
| | ) |
| |
|
| | return nn.MaxPool2d(kernel_size=3, stride=2, padding=0)(x) |
| |
|
| |
|
| | class Conv2dJax(nn.Module): |
| | """Mimics JAX's Conv2D with padding='SAME' for exact parity.""" |
| |
|
| | def __init__(self, in_channels, out_channels, kernel_size, stride=1, bias=False): |
| | super().__init__() |
| |
|
| | |
| | self.kernel_size = kernel_size if isinstance(kernel_size, tuple) else (kernel_size, kernel_size) |
| | self.stride = stride if isinstance(stride, tuple) else (stride, stride) |
| |
|
| | self.conv = nn.Conv2d( |
| | in_channels, |
| | out_channels, |
| | kernel_size=self.kernel_size, |
| | stride=self.stride, |
| | padding=0, |
| | bias=bias, |
| | ) |
| |
|
| | def _compute_padding(self, input_height, input_width): |
| | """Calculate asym |
| | metric padding to match JAX's 'SAME' behavior.""" |
| |
|
| | |
| | pad_h = max( |
| | 0, (math.ceil(input_height / self.stride[0]) - 1) * self.stride[0] + self.kernel_size[0] - input_height |
| | ) |
| | pad_w = max( |
| | 0, (math.ceil(input_width / self.stride[1]) - 1) * self.stride[1] + self.kernel_size[1] - input_width |
| | ) |
| |
|
| | |
| | pad_top = pad_h // 2 |
| | pad_bottom = pad_h - pad_top |
| | pad_left = pad_w // 2 |
| | pad_right = pad_w - pad_left |
| |
|
| | return (pad_left, pad_right, pad_top, pad_bottom) |
| |
|
| | def forward(self, x): |
| | """Apply asymmetric padding before convolution.""" |
| | _, _, h, w = x.shape |
| |
|
| | |
| | pad_left, pad_right, pad_top, pad_bottom = self._compute_padding(h, w) |
| | x = nn.functional.pad(x, (pad_left, pad_right, pad_top, pad_bottom)) |
| |
|
| | return self.conv(x) |
| |
|
| |
|
| | class BasicBlock(nn.Module): |
| | def __init__(self, in_channels, out_channels, activation, stride=1, norm_groups=4): |
| | super().__init__() |
| |
|
| | self.conv1 = Conv2dJax( |
| | in_channels, |
| | out_channels, |
| | kernel_size=3, |
| | stride=stride, |
| | bias=False, |
| | ) |
| | self.norm1 = nn.GroupNorm(num_groups=norm_groups, num_channels=out_channels) |
| | self.act1 = ACT2FN[activation] |
| | self.act2 = ACT2FN[activation] |
| | self.conv2 = Conv2dJax(out_channels, out_channels, kernel_size=3, stride=1, bias=False) |
| | self.norm2 = nn.GroupNorm(num_groups=norm_groups, num_channels=out_channels) |
| |
|
| | self.shortcut = None |
| | if in_channels != out_channels: |
| | self.shortcut = nn.Sequential( |
| | Conv2dJax(in_channels, out_channels, kernel_size=1, stride=stride, bias=False), |
| | nn.GroupNorm(num_groups=norm_groups, num_channels=out_channels), |
| | ) |
| |
|
| | def forward(self, x): |
| | identity = x |
| |
|
| | out = self.conv1(x) |
| | out = self.norm1(out) |
| | out = self.act1(out) |
| |
|
| | out = self.conv2(out) |
| | out = self.norm2(out) |
| |
|
| | if self.shortcut is not None: |
| | identity = self.shortcut(identity) |
| |
|
| | out += identity |
| | return self.act2(out) |
| |
|
| |
|
| | class Encoder(nn.Module): |
| | def __init__(self, config: ResNet10Config): |
| | super().__init__() |
| | self.config = config |
| | self.stages = nn.ModuleList([]) |
| |
|
| | for i, size in enumerate(self.config.hidden_sizes): |
| | if i == 0: |
| | self.stages.append( |
| | BasicBlock( |
| | self.config.embedding_size, |
| | size, |
| | activation=self.config.hidden_act, |
| | ) |
| | ) |
| | else: |
| | self.stages.append( |
| | BasicBlock( |
| | self.config.hidden_sizes[i - 1], |
| | size, |
| | activation=self.config.hidden_act, |
| | stride=2, |
| | ) |
| | ) |
| |
|
| | def forward(self, hidden_state: Tensor, output_hidden_states: bool = False) -> BaseModelOutputWithNoAttention: |
| | hidden_states = () if output_hidden_states else None |
| |
|
| | for stage in self.stages: |
| | if output_hidden_states: |
| | hidden_states = hidden_states + (hidden_state,) |
| |
|
| | hidden_state = stage(hidden_state) |
| |
|
| | if output_hidden_states: |
| | hidden_states = hidden_states + (hidden_state,) |
| |
|
| | return BaseModelOutputWithNoAttention( |
| | last_hidden_state=hidden_state, |
| | hidden_states=hidden_states, |
| | ) |
| |
|
| |
|
| | class ResNet10(PreTrainedModel): |
| | config_class = ResNet10Config |
| |
|
| | def __init__(self, config): |
| | super().__init__(config) |
| |
|
| | self.embedder = nn.Sequential( |
| | nn.Conv2d( |
| | self.config.num_channels, |
| | self.config.embedding_size, |
| | kernel_size=7, |
| | stride=2, |
| | padding=3, |
| | bias=False, |
| | ), |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | nn.GroupNorm(num_groups=4, eps=1e-5, num_channels=self.config.embedding_size), |
| | ACT2FN[self.config.hidden_act], |
| | MaxPool2dJax(kernel_size=3, stride=2), |
| | ) |
| |
|
| | self.encoder = Encoder(self.config) |
| | self.pooler = nn.AdaptiveAvgPool2d(output_size=1) |
| |
|
| | def _init_pooler(self): |
| | if self.config.pooler == "avg": |
| | self.pooler = nn.AdaptiveAvgPool2d(output_size=1) |
| | elif self.config.pooler == "max": |
| | self.pooler = nn.MaxPool2d(kernel_size=3, stride=2) |
| | elif self.config.pooler == "spatial_learned_embeddings": |
| | raise ValueError("Invalid pooler, it exist in the hil serl version, but weights are missing") |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | else: |
| | raise ValueError(f"Invalid pooler: {self.config.pooler}") |
| |
|
| | def forward(self, x: Tensor, output_hidden_states: Optional[bool] = None) -> BaseModelOutputWithNoAttention: |
| | output_hidden_states = ( |
| | output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states |
| | ) |
| | embedding_output = self.embedder(x) |
| | encoder_outputs = self.encoder(embedding_output, output_hidden_states=output_hidden_states) |
| |
|
| | pooler_output = self.pooler(encoder_outputs.last_hidden_state) |
| |
|
| | return BaseModelOutputWithPoolingAndNoAttention( |
| | last_hidden_state=encoder_outputs.last_hidden_state, |
| | hidden_states=encoder_outputs.hidden_states, |
| | pooler_output=pooler_output, |
| | ) |
| |
|
| | def print_model_hash(self): |
| | print("Model parameters hashes:") |
| | for name, param in self.named_parameters(): |
| | print(name, param.sum()) |
| |
|