File size: 3,926 Bytes
680b0d0
8b77271
 
 
 
feea195
a269e07
8b77271
 
 
 
 
 
 
 
 
a269e07
8b77271
 
 
74ca51c
a269e07
8b77271
 
 
 
 
a269e07
 
8b77271
 
a269e07
8b77271
 
a269e07
680b0d0
a269e07
680b0d0
 
 
 
 
a269e07
680b0d0
 
 
 
 
a269e07
8b77271
a269e07
feea195
 
 
 
49db0a7
 
feea195
 
 
 
a269e07
 
 
 
 
 
 
 
 
 
 
 
 
 
8b77271
 
a269e07
 
8b77271
 
 
 
 
 
 
 
a269e07
feea195
 
 
 
8b77271
49db0a7
 
a269e07
 
8b77271
 
feea195
8b77271
 
a269e07
feea195
a269e07
49db0a7
a269e07
8b77271
 
 
a269e07
49db0a7
8b77271
a269e07
8b77271
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import torch
from torch import nn
import numpy as np


def calc_coeff(iter_num: int, high: float, low: float, alpha: float, max_iter: float) -> np.float64:
    """
    Compute the adversarial-training weight coefficient for the current iteration.

    The coefficient follows a sigmoid-shaped schedule: it equals ``low`` at
    iteration 0 and saturates toward ``high`` as training proceeds, which
    gradually strengthens the gradient reversal.

    Args:
        iter_num(_int_): current iteration count
        high(_float_): value the coefficient approaches for large iteration counts
        low(_float_): value of the coefficient at iteration 0
        alpha(_float_): decay rate controlling how fast the schedule saturates
        max_iter(_float_): maximum iteration count (time scale of the schedule)
    Returns:
        (_np.float64_): weight coefficient in [low, high)
    """
    span = high - low
    exponent = -alpha * iter_num / max_iter
    scaled = 2.0 * span / (1.0 + np.exp(exponent))
    return np.float64(scaled - span + low)


def grl_hook(coeff: float):
    """
    Build a backward hook implementing a GRL (Gradient Reverse Layer).

    Args:
        coeff(_float_): scaling factor applied to the reversed gradient
    Returns:
        (_function_): callable that takes a gradient tensor and returns a
            negated copy of it scaled by ``coeff``
    """

    def _reverse(grad):
        # Clone so the incoming gradient tensor itself is left untouched.
        return grad.clone() * (-coeff)

    return _reverse


def Entropy(input_prob: torch.Tensor) -> torch.Tensor:
    """
    Compute the per-sample entropy of a batch of probability distributions.

    Args:
        input_prob(_torch.Tensor_): probability distributions, shape (batch_size, num_classes)
    Returns:
        entropy(_torch.Tensor_): entropy values, one per sample (length batch_size)
    """
    epsilon = 1e-5  # avoid log(0) on zero-probability classes
    entropy = -input_prob * torch.log(input_prob + epsilon)
    entropy = torch.sum(entropy, dim=1)
    return entropy


class AdversarialNet(nn.Module):
    """
    Domain-adversarial network used to reduce the discrepancy between domains.

    A gradient-reversal hook is registered on the input during training so
    that gradients flowing back into the upstream feature extractor are
    negated (and scaled by a coefficient), maximizing the domain-confusion
    loss.

    Args:
        in_feature(_int_): dimensionality of the input features
        hidden_size(_int_): dimensionality of the hidden layers
        grl_option(_str_): "Cons" for a constant gradient-reversal weight,
            "Step" for a weight updated dynamically with the iteration count
        grl_lambda(_float_): coefficient applied to the reversed gradient
            when grl_option == "Cons"
        high(_float_): asymptotic coefficient value at large iteration counts ("Step" mode)
        low(_float_): coefficient value at iteration 0 ("Step" mode)
        alpha(_float_): decay rate of the coefficient schedule ("Step" mode)
        max_iter(_float_): maximum number of iterations, time scale of the schedule
    """

    def __init__(
        self,
        in_feature: int,
        hidden_size: int,
        grl_option: str = "Step",
        grl_lambda: float = 1.0,
        high: float = 1.0,
        low: float = 0.0,
        alpha: float = 10.0,
        max_iter: float = 10000.0,
    ):
        super().__init__()  # initialize nn.Module internals
        self.ad_layer1 = nn.Sequential(
            nn.Linear(in_feature, hidden_size),
            nn.ReLU(inplace=True),  # NOTE: in-place ReLU to save memory
            nn.Dropout(),
        )
        self.ad_layer2 = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(inplace=True),
            nn.Dropout(),
        )
        self.ad_layer3 = nn.Linear(hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

        # Gradient-reversal schedule / mode parameters.
        self.high = high
        self.low = low
        self.alpha = alpha
        self.max_iter = max_iter
        self.grl_option = grl_option
        self.grl_lambda = grl_lambda
        self.iter_num = 0  # current training iteration count
        self.__in_features = 1  # output feature dimension (name-mangled "private")

    def forward(self, x):
        """
        Run the domain discriminator on a batch of features.

        Args:
            x(_torch.Tensor_): input features, shape (batch_size, in_feature)
        Returns:
            (_torch.Tensor_): per-sample domain probabilities in (0, 1),
                shape (batch_size, 1)
        Raises:
            ValueError: if ``grl_option`` is neither "Cons" nor "Step"
        """
        # Advance the iteration counter only in training mode so the
        # "Step" schedule progresses with training.
        if self.training:
            self.iter_num += 1

        # Gradient-reversal coefficient: constant or dynamically scheduled.
        if self.grl_option == "Cons":
            coeff = self.grl_lambda
        elif self.grl_option == "Step":
            coeff = calc_coeff(self.iter_num, self.high, self.low, self.alpha, self.max_iter)
        else:
            raise ValueError(f"unsupported grl_option: {self.grl_option!r} (expected 'Cons' or 'Step')")

        # x * 1.0 creates a fresh autograd node so the hook below does not
        # attach to the caller's tensor.
        x = x * 1.0
        if x.requires_grad:
            # Reverse (and scale by coeff) the gradient flowing back past this
            # point, maximizing the domain-adversarial loss upstream.
            # Guarded: register_hook raises on tensors without grad, which
            # would break inference under torch.no_grad() / eval mode.
            x.register_hook(grl_hook(coeff))
        x = self.ad_layer1(x)
        x = self.ad_layer2(x)
        y = self.ad_layer3(x)
        y = self.sigmoid(y)
        return y

    def output_num(self):
        """Return the output feature dimension (always 1)."""
        return self.__in_features