File size: 2,200 Bytes
31e2456
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
"""PPG patch tokeniser — the v1 encoding chosen by E1.

Decision: raw 200 ms patches (25 samples @ 125 Hz), linear projection to d.

Rationale: E1 Stage-1 morphology extraction passed (98.6%), but Stage 2 (the
linear-probe AUROC comparison vs raw) requires AF labels that are pending.
The research plan (RESEARCH_DEVELOPMENT.md §2) specifies raw patches for v1
and defers morphology to ablation A1. We follow the spec; the E1 Stage-2
comparison runs as part of A1 once AF labels land.

Input shape:  [B, 1, T]       PPG signal in volts after bandpass 0.5-8 Hz + z-score
Output shape: [B, N, d]       N = T // patch_size tokens
"""
from __future__ import annotations

import math

import torch
from torch import nn


class PPGPatchTokeniser(nn.Module):
    """Linear projection of fixed-length PPG patches + 1D sinusoidal PE."""

    def __init__(
        self,
        patch_size: int = 25,  # 200 ms at 125 Hz
        d_model: int = 256,
        max_patches: int = 128,
    ) -> None:
        super().__init__()
        self.patch_size = patch_size
        self.d_model = d_model
        self.proj = nn.Linear(patch_size, d_model)
        self.register_buffer(
            "pos_enc", self._sinusoidal_pe(max_patches, d_model), persistent=False
        )

    @staticmethod
    def _sinusoidal_pe(n_pos: int, d: int) -> torch.Tensor:
        pe = torch.zeros(n_pos, d)
        pos = torch.arange(0, n_pos, dtype=torch.float32).unsqueeze(1)
        div = torch.exp(
            torch.arange(0, d, 2, dtype=torch.float32) * -(math.log(10_000.0) / d)
        )
        pe[:, 0::2] = torch.sin(pos * div)
        pe[:, 1::2] = torch.cos(pos * div)
        return pe

    def forward(self, ppg: torch.Tensor) -> torch.Tensor:
        # ppg: [B, 1, T]; T must be divisible by patch_size
        b, c, t = ppg.shape
        assert c == 1, f"PPG must be single-channel, got {c}"
        assert t % self.patch_size == 0, (
            f"PPG length {t} not divisible by patch_size {self.patch_size}"
        )
        n = t // self.patch_size
        patches = ppg.view(b, n, self.patch_size)
        tokens = self.proj(patches)
        tokens = tokens + self.pos_enc[:n].unsqueeze(0)
        return tokens