File size: 6,543 Bytes
0a6452f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
"""
压力计算器
Stress Calculator based on PAD changes

压力通过 PAD 状态变化动态计算,而不是作为模型输出。

理论依据:
- Stress_Change ∝ α·(-ΔP) + β·(ΔA) + γ·(-ΔD)
- 不开心(-P)、紧张(+A)、失去掌控(-D)都会增加压力
"""

import torch
import numpy as np
from typing import Union, Tuple
from loguru import logger


class StressCalculator:
    """
    压力计算器

    根据 PAD 状态变化量计算压力变化量
    """

    def __init__(
        self,
        alpha: float = 1.0,   # Pleasure 权重(不开心增加压力)
        beta: float = 0.8,    # Arousal 权重(紧张增加压力)
        gamma: float = 0.6     # Dominance 权重(失去掌控增加压力)
    ):
        """
        初始化压力计算器

        Args:
            alpha: Pleasure 维度的权重系数
            beta: Arousal 维度的权重系数
            gamma: Dominance 维度的权重系数
        """
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma

        logger.info(f"压力计算器初始化: α={alpha}, β={beta}, γ={gamma}")

    def compute_stress_change(
        self,
        delta_pad: Union[torch.Tensor, np.ndarray]
    ) -> Union[torch.Tensor, np.ndarray]:
        """
        根据 PAD 变化量计算压力变化量

        公式:Stress_Change = α·(-ΔP) + β·(ΔA) + γ·(-ΔD)

        Args:
            delta_pad: PAD 变化量,形状为 (batch_size, 3) 或 (3,)
                       顺序为 [ΔP, ΔA, ΔD]

        Returns:
            压力变化量,形状为 (batch_size, 1) 或 (1,)
        """
        is_tensor = isinstance(delta_pad, torch.Tensor)

        if is_tensor:
            # PyTorch 张量计算
            if delta_pad.dim() == 1:
                delta_pad = delta_pad.unsqueeze(0)

            delta_p, delta_a, delta_d = delta_pad[:, 0], delta_pad[:, 1], delta_pad[:, 2]

            # 压力计算公式
            stress_change = self.alpha * (-delta_p) + self.beta * (delta_a) + self.gamma * (-delta_d)

            return stress_change.unsqueeze(1)  # (batch_size, 1)
        else:
            # NumPy 数组计算
            if delta_pad.ndim == 1:
                delta_pad = delta_pad.reshape(1, -1)

            delta_p, delta_a, delta_d = delta_pad[:, 0], delta_pad[:, 1], delta_pad[:, 2]

            # 压力计算公式
            stress_change = self.alpha * (-delta_p) + self.beta * (delta_a) + self.gamma * (-delta_d)

            return stress_change.reshape(-1, 1)  # (batch_size, 1)

    def compute_stress_from_base_stress(
        self,
        delta_pad: Union[torch.Tensor, np.ndarray],
        base_stress: Union[torch.Tensor, np.ndarray]
    ) -> Union[torch.Tensor, np.ndarray]:
        """
        从基准压力和变化量计算新压力

        Args:
            delta_pad: PAD 变化量,形状为 (batch_size, 3)
            base_stress: 基准压力,形状为 (batch_size, 1) 或 (1,)

        Returns:
            新压力,形状与 base_stress 相同
        """
        stress_change = self.compute_stress_change(delta_pad)

        if isinstance(stress_change, torch.Tensor):
            if isinstance(base_stress, torch.Tensor):
                if base_stress.dim() == 1:
                    base_stress = base_stress.unsqueeze(1)
            return base_stress + stress_change
        else:
            if isinstance(base_stress, np.ndarray):
                if base_stress.ndim == 1:
                    base_stress = base_stress.reshape(-1, 1)
            return base_stress + stress_change

    def get_formula_description(self) -> str:
        """返回压力计算公式描述"""
        return (
            f"Stress_Change = {self.alpha}·(-ΔP) + {self.beta}·(ΔA) + {self.gamma}·(-ΔD)\n"
            f"解释:不开心(-ΔP)、紧张(+ΔA)、失去掌控(-ΔD)都会增加压力"
        )


# 创建默认的压力计算器实例
_default_calculator = None


def get_default_calculator() -> StressCalculator:
    """获取默认的压力计算器(单例模式)"""
    global _default_calculator
    if _default_calculator is None:
        _default_calculator = StressCalculator(
            alpha=1.0,
            beta=0.8,
            gamma=0.6
        )
    return _default_calculator


def compute_stress_from_pad_change(
    delta_pad: Union[torch.Tensor, np.ndarray],
    alpha: float = 1.0,
    beta: float = 0.8,
    gamma: float = 0.6
) -> Union[torch.Tensor, np.ndarray]:
    """
    便捷函数:从 PAD 变化量计算压力变化量

    Args:
        delta_pad: PAD 变化量,形状为 (batch_size, 3)
        alpha: Pleasure 权重
        beta: Arousal 权重
        gamma: Dominance 权重

    Returns:
        压力变化量,形状为 (batch_size, 1)
    """
    calculator = StressCalculator(alpha, beta, gamma)
    return calculator.compute_stress_change(delta_pad)


def compute_stress(
    current_pad: Union[torch.Tensor, np.ndarray],
    base_pad: Union[torch.Tensor, np.ndarray],
    alpha: float = 1.0,
    beta: float = 0.8,
    gamma: float = 0.6
) -> Union[torch.Tensor, np.ndarray]:
    """
    便捷函数:从当前 PAD 和基准 PAD 计算压力

    Args:
        current_pad: 当前 PAD,形状为 (batch_size, 3)
        base_pad: 基准 PAD,形状为 (batch_size, 3)
        alpha: Pleasure 权重
        beta: Arousal 权重
        gamma: Dominance 权重

    Returns:
        压力值,形状为 (batch_size, 1)
    """
    delta_pad = current_pad - base_pad
    return compute_stress_from_pad_change(delta_pad, alpha, beta, gamma)


if __name__ == "__main__":
    # 测试代码
    print("压力计算器测试")
    print("=" * 60)

    # 创建测试数据
    delta_pad = np.array([
        [0.1, 0.2, -0.1],   # 开心、稍微紧张、失去一些掌控 → 压力应该减少
        [-0.2, 0.3, -0.2],  # 不开心、紧张、失去掌控 → 压力应该增加
        [0.0, 0.0, 0.0],    # 没有变化 → 压力不变
    ])

    calculator = StressCalculator(alpha=1.0, beta=0.8, gamma=0.6)

    print("公式:", calculator.get_formula_description())
    print("\n测试数据:")
    print(delta_pad)

    stress_change = calculator.compute_stress_change(delta_pad)
    print("\n计算出的压力变化量:")
    print(stress_change)

    print("\n解释:")
    for i, (dp, da, dd, ds) in enumerate(delta_pad):
        print(f"  样本{i+1}: ΔP={dp:.2f}, ΔA={da:.2f}, ΔD={dd:.2f} → ΔStress={ds[0]:.4f}")