File size: 1,984 Bytes
31e2456
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
"""ECG patch tokeniser for single-lead II @ 250 Hz — v1 per E0 audit findings.

Input: [B, 1, T], T = 50 × N (200 ms patches at 250 Hz)

The research plan called for 2D (leads × time) patches over 12-lead @ 500 Hz.
E0 found the HF mirror is 3-lead (II/V/aVR) @ ~250 Hz, with lead II in 93.7%
of segments. We drop records without lead II, use a 1D patch scheme over a
single lead, and defer the multi-lead 2D variant to a future ablation if
lead availability becomes an issue.
"""
from __future__ import annotations

import math

import torch
from torch import nn


class ECGPatchTokeniser(nn.Module):
    """Linear projection of fixed-length ECG patches + 1D sinusoidal PE.

    The input signal is cut into consecutive, non-overlapping patches of
    ``patch_size`` samples. Every patch is mapped to ``d_model`` features by
    one shared linear layer, and a fixed (non-learned) sinusoidal positional
    encoding is added to each resulting token.

    Args:
        patch_size: Number of samples per patch (default 50, i.e. 200 ms at
            250 Hz).
        d_model: Width of each output token. May be even or odd.
        max_patches: Number of positions in the precomputed positional
            table. ``forward`` rejects inputs that produce more patches.
    """

    def __init__(
        self,
        patch_size: int = 50,  # 200 ms at 250 Hz
        d_model: int = 256,
        max_patches: int = 128,
    ) -> None:
        super().__init__()
        self.patch_size = patch_size
        self.d_model = d_model
        self.proj = nn.Linear(patch_size, d_model)
        # The table is fully determined by (max_patches, d_model), so it is
        # registered as a non-persistent buffer: it follows the module across
        # devices/dtypes but is left out of the state_dict.
        self.register_buffer(
            "pos_enc", self._sinusoidal_pe(max_patches, d_model), persistent=False
        )

    @staticmethod
    def _sinusoidal_pe(n_pos: int, d: int) -> torch.Tensor:
        """Build an ``[n_pos, d]`` float32 sinusoidal positional table.

        Even columns hold ``sin(pos * freq_k)`` and odd columns hold
        ``cos(pos * freq_k)`` with ``freq_k = 10000 ** (-2k / d)``.

        Args:
            n_pos: Number of positions (rows).
            d: Encoding width (columns); even or odd.

        Returns:
            Tensor of shape ``[n_pos, d]``.
        """
        pe = torch.zeros(n_pos, d)
        pos = torch.arange(0, n_pos, dtype=torch.float32).unsqueeze(1)
        div = torch.exp(
            torch.arange(0, d, 2, dtype=torch.float32) * -(math.log(10_000.0) / d)
        )
        angles = pos * div  # [n_pos, ceil(d / 2)]
        pe[:, 0::2] = torch.sin(angles)
        # For odd d there is one more even (sin) column than odd (cos) column,
        # so the cos half uses only the first d // 2 frequencies. For even d
        # the slice is a no-op.
        pe[:, 1::2] = torch.cos(angles[:, : d // 2])
        return pe

    def forward(self, ecg: torch.Tensor) -> torch.Tensor:
        """Tokenise a batch of single-lead ECG segments.

        Args:
            ecg: Tensor of shape ``[B, 1, T]`` where ``T`` is a multiple of
                ``patch_size``.

        Returns:
            Tensor of shape ``[B, T // patch_size, d_model]``.

        Raises:
            AssertionError: If the channel dimension is not 1 or ``T`` is not
                divisible by ``patch_size``.
            ValueError: If the input yields more patches than the positional
                table holds (``max_patches``).
        """
        b, c, t = ecg.shape
        assert c == 1, f"single-lead expected, got {c}"
        assert t % self.patch_size == 0, (
            f"ECG length {t} not divisible by patch_size {self.patch_size}"
        )
        n = t // self.patch_size
        # Without this check pos_enc[:n] would silently truncate to the table
        # size and the addition below would fail with an opaque broadcasting
        # error.
        table_len = self.pos_enc.size(0)
        if n > table_len:
            raise ValueError(
                f"ECG yields {n} patches but the positional table holds only "
                f"{table_len}; construct the tokeniser with a larger max_patches"
            )
        # c == 1, so dropping the channel dim and splitting T into
        # (n, patch_size) is a pure view of the input.
        patches = ecg.view(b, n, self.patch_size)
        tokens = self.proj(patches)
        tokens = tokens + self.pos_enc[:n].unsqueeze(0)
        return tokens