File size: 9,550 Bytes
3bb804c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
from __future__ import annotations

import datetime
import inspect
from collections.abc import Sequence
from typing import Any, Literal

from edfio._header_field import encode_str


def _repr_from_init(obj: Any) -> str:
    """
    Build a constructor-style representation of `obj`.

    The argument names are taken from the signature of the object's class; each value
    is read from the attribute of the same name on `obj` and rendered with `repr`.
    Any exception raised while reading an attribute propagates to the caller.
    """
    cls = obj.__class__
    init_parameters = inspect.signature(cls).parameters
    arguments = ", ".join(f"{arg}={getattr(obj, arg)!r}" for arg in init_parameters)
    return f"{cls.__name__}({arguments})"


# Uppercase three-letter month abbreviations used in EDF+ date strings, ordered so
# that a name's position plus one is its month number.
_MONTH_NAMES = tuple("JAN FEB MAR APR MAY JUN JUL AUG SEP OCT NOV DEC".split())


def _decode_edfplus_date(date: str) -> datetime.date:
    """
    Parse a hyphen-separated `day-month-year` string into a `datetime.date`.

    The month part is matched case-insensitively against `_MONTH_NAMES`; day and year
    are converted with `int`.

    Raises
    ------
    ValueError
        If the string does not consist of exactly three hyphen-separated parts, if
        the month is not a known abbreviation, or if day/year are not valid.
    """
    day_part, month_part, year_part = date.split("-")
    month_key = month_part.upper()
    if month_key not in _MONTH_NAMES:
        raise ValueError(f"Invalid month: {month_part}, options: {_MONTH_NAMES}")
    # Position in the tuple is zero-based, month numbers start at 1.
    month_number = _MONTH_NAMES.index(month_key) + 1
    return datetime.date(int(year_part), month_number, int(day_part))


def _encode_edfplus_date(date: datetime.date) -> str:
    """
    Format a date as an EDF+ date string of the form `dd-MMM-yyyy`.

    Parameters
    ----------
    date : datetime.date
        The date to encode.

    Returns
    -------
    str
        Zero-padded two-digit day, uppercase three-letter month abbreviation taken
        from `_MONTH_NAMES`, and zero-padded four-digit year, joined by hyphens.
    """
    # Pad the year to four digits so that years before 1000 still yield the
    # fixed-width `yyyy` part; a width of 2 would leave e.g. year 985 as "985".
    return f"{date.day:02}-{_MONTH_NAMES[date.month - 1]}-{date.year:04}"


def _validate_subfields(subfields: dict[str, str]) -> None:
    """
    Ensure that every subfield value is non-empty and contains no space character.

    Parameters
    ----------
    subfields : dict[str, str]
        Mapping from a subfield label (used only in error messages) to its value.

    Raises
    ------
    ValueError
        For the first subfield, in iteration order, whose value is empty or contains
        a space.
    """
    for label, content in subfields.items():
        if not content:
            problem = f"Subfield {label} must not be an empty string"
        elif " " in content:
            problem = f"Subfield {label} contains spaces: {content!r}"
        else:
            continue
        raise ValueError(problem)


class AnonymizedDateError(ValueError):
    """
    Signal that a requested date is anonymized.

    Raised when a startdate or birthdate is accessed whose header subfield holds the
    anonymization placeholder `X`. It derives from `ValueError`, so handlers for
    `ValueError` catch it as well.
    """


class Patient:
    """
    Structured view of the EDF+ local patient identification header field.

    The field is kept as a single string whose whitespace-separated subfields are, in
    order: code, sex, birthdate, name, followed by any additional subfields. The
    string is assembled and validated on construction and split again on each access.
    Subfields must be ASCII (32..126) and may not contain spaces.

    Parameters
    ----------
    code : str, default: `"X"`
        The code by which the patient is known in the hospital administration.
    sex : `{"X", "F", "M"}`, default: `"X"`
        Sex, `F` for female, `M` for male, `X` if anonymized.
    birthdate : datetime.date | None, default: None
        Patient birthdate, stored as `X` if `None`.
    name : str, default: `"X"`
        The patient's name.
    additional : Sequence[str], default: `()`
        Optional additional subfields. Will be stored in the header field separated by
        spaces.

    Raises
    ------
    ValueError
        If `sex` is not one of the allowed values, or if any subfield is empty or
        contains a space.
    """

    def __init__(
        self,
        *,
        code: str = "X",
        sex: Literal["F", "M", "X"] = "X",
        birthdate: datetime.date | None = None,
        name: str = "X",
        additional: Sequence[str] = (),
    ) -> None:
        allowed_sexes = ("F", "M", "X")
        if sex not in allowed_sexes:
            raise ValueError(f"Invalid sex: {sex}, must be one of F, M, X")
        # Keys are only labels for validation error messages; the insertion order
        # defines the order of the subfields in the header string.
        subfields = {
            "code": code,
            "sex": sex,
            "birthdate": "X" if birthdate is None else _encode_edfplus_date(birthdate),
            "name": name,
        }
        for position, extra in enumerate(additional):
            subfields[f"additional[{position}]"] = extra
        _validate_subfields(subfields)
        identification = " ".join(subfields.values())
        # Called for validation only, the encoded result is discarded.
        # NOTE(review): presumably rejects strings that do not fit the 80-character
        # header field -- confirm in `encode_str`.
        encode_str(identification, 80)
        self._local_patient_identification = identification

    def __repr__(self) -> str:
        # Reading the attributes can fail (e.g. an anonymized birthdate raises), in
        # which case the raw header string is shown instead.
        try:
            text = _repr_from_init(self)
        except Exception:
            text = repr(self._local_patient_identification)
        return text

    @classmethod
    def _from_str(cls, string: str) -> Patient:
        """Wrap an existing header string without running `__init__` validation."""
        instance = object.__new__(cls)
        instance._local_patient_identification = string
        return instance

    def _to_str(self) -> str:
        """Return the stored header string."""
        return self._local_patient_identification

    @property
    def code(self) -> str:
        """Hospital administration code of the patient (subfield 0)."""
        return self.get_subfield(0)

    @property
    def sex(self) -> str:
        """Sex (subfield 1): `F` for female, `M` for male, `X` if anonymized."""
        return self.get_subfield(1)

    @property
    def birthdate(self) -> datetime.date:
        """
        Patient birthdate (subfield 2).

        Raises
        ------
        AnonymizedDateError
            If the subfield is `X`.
        """
        raw_birthdate = self.get_subfield(2)
        if raw_birthdate != "X":
            return _decode_edfplus_date(raw_birthdate)
        raise AnonymizedDateError("Patient birthdate is not available ('X').")

    @property
    def name(self) -> str:
        """The patient's name (subfield 3)."""
        return self.get_subfield(3)

    @property
    def additional(self) -> tuple[str, ...]:
        """All subfields following the four standard ones."""
        parts = self._local_patient_identification.split()
        return tuple(parts[4:])

    def get_subfield(self, idx: int) -> str:
        """
        Return one subfield of the local patient identification field.

        Parameters
        ----------
        idx : int
            Position of the requested subfield.

        Returns
        -------
        str
            The subfield at position `idx`, or `"X"` if `idx` is beyond the last
            available subfield.
        """
        parts = self._local_patient_identification.split()
        return parts[idx] if idx < len(parts) else "X"


class Recording:
    """
    Structured view of the EDF+ local recording identification header field.

    The field is kept as a single string whose whitespace-separated subfields are, in
    order: the literal `Startdate`, the startdate, the hospital administration code,
    the investigator/technician code, the equipment code, followed by any additional
    subfields. The string is assembled and validated on construction and split again
    on each access. Subfields must be ASCII (32..126) and may not contain spaces.

    Parameters
    ----------
    startdate : datetime.date | None, default: None
        The recording startdate, stored as `X` if `None`.
    hospital_administration_code : str, default: `"X"`
        The hospital administration code of the investigation, e.g., EEG number or PSG
        number.
    investigator_technician_code : str, default: `"X"`
        A code specifying the responsible investigator or technician.
    equipment_code : str, default: `"X"`
        A code specifying the used equipment.
    additional : Sequence[str], default: `()`
        Optional additional subfields. Will be stored in the header field separated by
        spaces.

    Raises
    ------
    ValueError
        If any subfield is empty or contains a space.
    """

    def __init__(
        self,
        *,
        startdate: datetime.date | None = None,
        hospital_administration_code: str = "X",
        investigator_technician_code: str = "X",
        equipment_code: str = "X",
        additional: Sequence[str] = (),
    ) -> None:
        # Keys are only labels for validation error messages; the insertion order
        # defines the order of the subfields in the header string.
        subfields = {
            "startdate": "X" if startdate is None else _encode_edfplus_date(startdate),
            "hospital_administration_code": hospital_administration_code,
            "investigator_technician_code": investigator_technician_code,
            "equipment_code": equipment_code,
        }
        for position, extra in enumerate(additional):
            subfields[f"additional[{position}]"] = extra
        _validate_subfields(subfields)
        # The field always begins with the literal word "Startdate".
        identification = " ".join(["Startdate", *subfields.values()])
        # Called for validation only, the encoded result is discarded.
        # NOTE(review): presumably rejects strings that do not fit the 80-character
        # header field -- confirm in `encode_str`.
        encode_str(identification, 80)
        self._local_recording_identification = identification

    def __repr__(self) -> str:
        # Reading the attributes can fail (e.g. an anonymized startdate raises), in
        # which case the raw header string is shown instead.
        try:
            text = _repr_from_init(self)
        except Exception:
            text = repr(self._local_recording_identification)
        return text

    @classmethod
    def _from_str(cls, string: str) -> Recording:
        """Wrap an existing header string without running `__init__` validation."""
        instance = object.__new__(cls)
        instance._local_recording_identification = string
        return instance

    def _to_str(self) -> str:
        """Return the stored header string."""
        return self._local_recording_identification

    @property
    def startdate(self) -> datetime.date:
        """
        The recording startdate (subfield 1).

        Raises
        ------
        ValueError
            If the stored field does not begin with `"Startdate "`.
        AnonymizedDateError
            If the startdate subfield is `X`.
        """
        field = self._local_recording_identification
        if not field.startswith("Startdate "):
            raise ValueError(
                f"Local recording identification field {field!r} "
                "does not follow EDF+ standard."
            )
        raw_startdate = self.get_subfield(1)
        if raw_startdate != "X":
            return _decode_edfplus_date(raw_startdate)
        raise AnonymizedDateError("Recording startdate is not available ('X').")

    @property
    def hospital_administration_code(self) -> str:
        """The hospital administration code of the investigation (subfield 2)."""
        return self.get_subfield(2)

    @property
    def investigator_technician_code(self) -> str:
        """Code of the responsible investigator or technician (subfield 3)."""
        return self.get_subfield(3)

    @property
    def equipment_code(self) -> str:
        """Code of the used equipment (subfield 4)."""
        return self.get_subfield(4)

    @property
    def additional(self) -> tuple[str, ...]:
        """All subfields following the five standard ones."""
        parts = self._local_recording_identification.split()
        return tuple(parts[5:])

    def get_subfield(self, idx: int) -> str:
        """
        Return one subfield of the local recording identification field.

        Parameters
        ----------
        idx : int
            Position of the requested subfield. The first subfield (position 0)
            should always be "Startdate" according to the EDF+ specification.

        Returns
        -------
        str
            The subfield at position `idx`, or `"X"` if `idx` is beyond the last
            available subfield.
        """
        parts = self._local_recording_identification.split()
        return parts[idx] if idx < len(parts) else "X"