Feature Extraction
Transformers
Safetensors
custom_code
File size: 3,278 Bytes
c260a86
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# Copyright (c) 2026, NVIDIA CORPORATION.  All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto.  Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
import math
from typing import Dict, Optional

import torch
from torch import nn

from einops import rearrange
from timm.models.vision_transformer import Block

from .enable_spectral_reparam import disable_spectral_reparam, enable_spectral_reparam
from .adaptor_mlp import MLP, MLP2
from .adaptor_attn import AttnFDHead


# Registry mapping a checkpoint "version" string to the class used for
# summary heads. Looked up by create_mlp_from_config(..., is_summary=True).
MLP_SUMMARY_FACTORY = {
    'v1': MLP,
    'v2': MLP2,
}

# Registry for feature-description (FD) heads; same versions as above plus
# an attention-based head under 'attn'. Used when is_summary=False.
MLP_FD_FACTORY = {
    'v1': MLP,
    'v2': MLP2,
    'attn': AttnFDHead,
}


def strip_prefix(state: Dict[str, torch.Tensor], prefix: str):
    """Return a new dict containing only the entries of ``state`` whose key
    starts with ``prefix``, with that prefix removed from each kept key.

    Entries whose key does not start with ``prefix`` are dropped. With
    ``prefix == ''`` this is a shallow copy of ``state``.
    """
    cut = len(prefix)
    filtered = {}
    for key, value in state.items():
        if key.startswith(prefix):
            filtered[key[cut:]] = value
    return filtered


def get_mlp_info_from_state(version: str, state: Dict[str, torch.Tensor], prefix: str = '', spectral_weights: bool = False):
    """Infer the architecture of a serialized MLP head from its state dict.

    Args:
        version: One of 'v1', 'v2' or 'attn' — selects the key layout to probe.
        state: Checkpoint tensors; only keys under ``prefix`` are considered.
        prefix: Optional key prefix stripped before inspection.
        spectral_weights: When True, weights were saved under the spectral
            reparametrization name ('parametrizations.weight.original').

    Returns:
        Tuple ``(input_dim, hidden_dim, output_dim, num_inner)`` where
        ``num_inner`` counts the consecutive inner/blocks layers present
        (always 0 for 'attn').

    Raises:
        ValueError: If ``version`` is not a recognized layout.
    """
    # Keep only the sub-dict that belongs to this head.
    state = {k[len(prefix):]: v for k, v in state.items() if k.startswith(prefix)}

    suffix = 'parametrizations.weight.original' if spectral_weights else 'weight'

    def count_layers(pattern: str) -> int:
        # Count consecutive indices 0, 1, ... whose weight key exists
        # (capped at 999, mirroring the original bounded scan).
        n = 0
        while n < 999 and pattern.format(n) in state:
            n += 1
        return n

    if version == 'v1':
        hidden_dim, input_dim = state[f'fc1.{suffix}'].shape
        output_dim = state[f'fc2.{suffix}'].shape[0]
        num_inner = count_layers('inner.{}.0.weight')
    elif version == 'v2':
        hidden_dim, input_dim = state[f'fc1.{suffix}'].shape
        output_dim = state[f'final.2.{suffix}'].shape[0]
        num_inner = count_layers('blocks.{}.0.weight')
    elif version == 'attn':
        hidden_dim, input_dim = state[f'mlp.fc1.{suffix}'].shape
        output_dim = state[f'mlp.final.2.{suffix}'].shape[0]
        num_inner = 0
    else:
        raise ValueError(f'Unsupported MLP version: {version}')

    return input_dim, hidden_dim, output_dim, num_inner


def create_mlp_from_config(version: str, input_dim: int, hidden_dim: int, output_dim: int, num_inner: int, is_summary: bool = True, **kwargs):
    """Instantiate an MLP head from explicit dimensions.

    Args:
        version: Key into the factory registry ('v1', 'v2', or 'attn' for
            FD heads).
        input_dim / hidden_dim / output_dim / num_inner: Architecture sizes,
            typically obtained from :func:`get_mlp_info_from_state`.
        is_summary: Selects MLP_SUMMARY_FACTORY when True, MLP_FD_FACTORY
            otherwise.
        **kwargs: Forwarded to the head's constructor.

    Returns:
        The freshly constructed head module (built with ``from_config=True``).
    """
    if is_summary:
        registry = MLP_SUMMARY_FACTORY
    else:
        registry = MLP_FD_FACTORY

    head_cls = registry[version]
    head: nn.Module = head_cls(input_dim, hidden_dim, output_dim, num_inner, from_config=True, **kwargs)
    return head


def create_mlp_from_state(version: str, state: Dict[str, torch.Tensor], prefix: str = '', spectral_weights: bool = False, is_summary: bool = True, **kwargs):
    """Build an MLP head and load its weights from a state dict.

    The architecture is inferred from the (prefix-stripped) ``state`` via
    :func:`get_mlp_info_from_state`, the module is constructed through
    :func:`create_mlp_from_config`, and the weights are loaded. When
    ``spectral_weights`` is set, spectral reparametrization is enabled just
    long enough to load the parametrized checkpoint, then removed so the
    returned module carries plain weights.

    Returns:
        The loaded head module.
    """
    state = strip_prefix(state, prefix)

    dims = get_mlp_info_from_state(version, state, spectral_weights=spectral_weights)
    head: nn.Module = create_mlp_from_config(version, *dims, is_summary=is_summary, **kwargs)

    if spectral_weights:
        # Checkpoint keys use the parametrized layout; mirror it before loading.
        enable_spectral_reparam(head, init_norm_to_current=False, state_dict_guidance=state)

    head.load_state_dict(state)

    if spectral_weights:
        # Bake the parametrization back into ordinary weight tensors.
        disable_spectral_reparam(head)

    return head