opsecsystems commited on
Commit
e3650ee
Β·
verified Β·
1 Parent(s): dc713f7

Upload openbci_impedance.py

Browse files
Files changed (1) hide show
  1. openbci_impedance.py +694 -0
openbci_impedance.py ADDED
@@ -0,0 +1,694 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ OpenBCI Cyton + Daisy β€” Moniteur d'impedance live des electrodes
4
+ ================================================================
5
+ Connexion via dongle USB, protocole serie officiel OpenBCI.
6
+
7
+ Dependances :
8
+ pip install pyserial scipy matplotlib numpy
9
+
10
+ Usage :
11
+ python openbci_impedance.py # auto-detection du port
12
+ python openbci_impedance.py --port COM3
13
+ python openbci_impedance.py --list-ports
14
+
15
+ Sources protocole :
16
+ https://docs.openbci.com/Cyton/CytonSDK/
17
+ https://docs.openbci.com/Cyton/CytonDataFormat/
18
+ https://openbci.com/forum/index.php?p=/discussion/2436/
19
+ """
20
+
21
+ import sys
22
+ import time
23
+ import threading
24
+ import argparse
25
+ from collections import deque
26
+
27
+ import numpy as np
28
+ import serial
29
+ import serial.tools.list_ports
30
+ from scipy.signal import butter, sosfiltfilt, iirnotch
31
+
32
+ import matplotlib
33
+ matplotlib.use("TkAgg")
34
+ import matplotlib.pyplot as plt
35
+ import matplotlib.patches as mpatches
36
+ from matplotlib.patches import Circle, Arc
37
+ from matplotlib.animation import FuncAnimation
38
+
39
+ # ╔══════════════════════════════════════════════════╗
40
+ # β•‘ CONSTANTES OFFICIELLES β•‘
41
+ # β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
42
+
43
+ BAUD_RATE = 115200
44
+ PACKET_SIZE = 33
45
+ START_BYTE = 0xA0
46
+
47
+ ADS1299_VREF = 4.5
48
+ ADS1299_GAIN = 24.0
49
+ SCALE_UV = (ADS1299_VREF / ADS1299_GAIN) / (2**23 - 1) * 1_000_000
50
+
51
+ # Pleine echelle ADS1299 en Β΅V (seuil railing)
52
+ ADC_MAX_UV = (2**23 - 1) * SCALE_UV # β‰ˆ 187 500 Β΅V
53
+
54
+ LEAD_OFF_FREQ = 31.5 # Hz
55
+ LEAD_OFF_AMPS = 6e-9 # A
56
+ SERIES_RESISTOR = 2200 # Ξ©
57
+
58
+ N_CHANNELS = 16
59
+ SAMPLE_RATE = 125 # Hz effectif par canal (pairing Cyton+Daisy)
60
+ DAISY_CHARS = ['Q','W','E','R','T','Y','U','I']
61
+
62
+ # ╔══════════════════════════════════════════════════╗
63
+ # β•‘ MAPPING ELECTRODES β•‘
64
+ # β•‘ CORRECTION : Y positif = POSTERIOR (arriere) β•‘
65
+ # β•‘ On inverse Y a l'affichage pour placer les β•‘
66
+ # β•‘ electrodes dans la region posterieure du crane. β•‘
67
+ # β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
68
+ # Coordonnees brutes fournies (X, Y)
69
+ ELECTRODE_POSITIONS_RAW = [
70
+ ( 0.15, 0.35), # CH1
71
+ (-0.15, 0.35), # CH2
72
+ ( 0.25, 0.38), # CH3
73
+ (-0.25, 0.38), # CH4
74
+ (-0.35, 0.40), # CH5
75
+ ( 0.35, 0.40), # CH6
76
+ ( 0.15, 0.45), # CH7
77
+ ( 0.00, 0.48), # CH8
78
+ (-0.20, 0.25), # CH9
79
+ (-0.10, 0.25), # CH10
80
+ ( 0.00, 0.25), # CH11
81
+ ( 0.10, 0.25), # CH12
82
+ ( 0.20, 0.25), # CH13
83
+ ( 0.00, 0.35), # CH14
84
+ ( 0.00, 0.00), # CH15 (inactive)
85
+ ( 0.00, 0.00), # CH16 (inactive)
86
+ ]
87
+
88
+ HEAD_CENTER_Y = 0.12 # centre geometrique de la tete dans l'axe plot
89
+
90
+ def _posterior_y(y_raw):
91
+ """
92
+ Reflechit Y autour du centre de la tete pour placer les electrodes
93
+ dans la region posterieure (bas du head plot = arriere du crane).
94
+ y_raw > 0 β†’ affiche sous l'equateur.
95
+ """
96
+ return 2.0 * HEAD_CENTER_Y - y_raw # = 0.24 - y_raw
97
+
98
+ # Positions d'affichage finales
99
+ ELECTRODE_POSITIONS = [
100
+ (x, _posterior_y(y)) if not (x == 0.0 and y == 0.0) else (0.0, 0.0)
101
+ for x, y in ELECTRODE_POSITIONS_RAW
102
+ ]
103
+
104
+ # Seuils d'impedance et couleurs
105
+ IMP_THRESHOLDS = [5_000, 10_000, 25_000]
106
+ IMP_COLORS = ['#00CC44', '#CCCC00', '#FF8800', '#CC2222']
107
+ IMP_LABELS_LEG = ['<5 kΞ© Good', '5-10 kΞ© OK', '10-25 kΞ© Fair', '>25 kΞ© Poor']
108
+ IMP_NO_DATA = '#555577'
109
+ IMP_RAILED = '#220033' # violet fonce = saturation / circuit ouvert
110
+
111
+
112
+ # ╔══════════════════════════════════════════════════╗
113
+ # β•‘ DETECTION DU PORT SERIE β•‘
114
+ # β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
115
+
116
+ def find_openbci_port():
117
+ keywords = ['openbci', 'ftdi', 'usbserial', 'ttyusb', 'ft231', 'ft232']
118
+ ports = serial.tools.list_ports.comports()
119
+ for p in ports:
120
+ combined = ((p.description or '') + (p.hwid or '')).lower()
121
+ if any(k in combined for k in keywords):
122
+ return p.device
123
+ for p in ports:
124
+ if 'usb' in (p.hwid or '').lower():
125
+ return p.device
126
+ return None
127
+
128
+
129
+ # ╔═══════════════════════════════════════════════��══╗
130
+ # β•‘ PARSING DES PAQUETS OPENBCI β•‘
131
+ # β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
132
+
133
+ def parse_24bit_signed(b0, b1, b2):
134
+ val = (b0 << 16) | (b1 << 8) | b2
135
+ if val & 0x800000:
136
+ val -= 0x1000000
137
+ return val
138
+
139
+
140
+ def parse_packet(data: bytes):
141
+ if len(data) < PACKET_SIZE:
142
+ return None
143
+ if data[0] != START_BYTE:
144
+ return None
145
+ if (data[32] & 0xF0) != 0xC0:
146
+ return None
147
+ sample_num = data[1]
148
+ channels_uv = []
149
+ for ch in range(8):
150
+ offset = 2 + ch * 3
151
+ raw = parse_24bit_signed(data[offset], data[offset+1], data[offset+2])
152
+ channels_uv.append(raw * SCALE_UV)
153
+ return sample_num, channels_uv
154
+
155
+
156
+ # ╔══════════════════════════════════════════════════╗
157
+ # β•‘ FILTRES NUMERIQUES β•‘
158
+ # β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
159
+
160
+ def _make_notch_sos(freq_hz: float, q: float = 30.0, fs: float = SAMPLE_RATE):
161
+ """Filtre notch IIR (bande rejetee autour de freq_hz)."""
162
+ w0 = freq_hz / (fs / 2.0)
163
+ if w0 <= 0 or w0 >= 1.0:
164
+ return None
165
+ b, a = iirnotch(w0, q)
166
+ # Convertir en SOS pour stabilite numerique
167
+ from scipy.signal import tf2sos
168
+ return tf2sos(b, a)
169
+
170
+
171
+ def _make_bandpass_sos(low_hz: float, high_hz: float, order: int = 4,
172
+ fs: float = SAMPLE_RATE):
173
+ """Filtre passe-bande Butterworth en SOS."""
174
+ nyq = fs / 2.0
175
+ low = low_hz / nyq
176
+ high = high_hz / nyq
177
+ if low <= 0 or high >= 1.0:
178
+ return None
179
+ return butter(order, [low, high], btype='band', output='sos')
180
+
181
+
182
+ # Pre-calculer les filtres une seule fois
183
+ # Californie = reseau 60 Hz uniquement (pas de 50 Hz)
184
+ _NOTCH_60_SOS = _make_notch_sos(60.0)
185
+ _BANDPASS_SOS = _make_bandpass_sos(28.0, 35.0)
186
+
187
+
188
+ # ╔══════════════════════════════════════════════════╗
189
+ # β•‘ CALCUL D'IMPEDANCE (methode GUI) β•‘
190
+ # β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
191
+
192
+ # Seuil de railing : si >RAIL_FRAC des echantillons depasse RAIL_THRESH_UV
193
+ # on considere le canal sature (circuit ouvert)
194
+ RAIL_THRESH_UV = 0.90 * ADC_MAX_UV # 90 % de la pleine echelle
195
+ RAIL_FRAC = 0.05 # 5 % d'echantillons suffit
196
+
197
+ def compute_impedance(buffer_uv: list):
198
+ """
199
+ Calcule l'impedance en Ohms depuis un buffer de donnees Β΅V.
200
+
201
+ Retourne :
202
+ float β†’ impedance en Ξ©
203
+ 'open' β†’ canal sature (railing) = electrode non connectee
204
+ None β†’ pas assez de donnees
205
+
206
+ Formule GUI OpenBCI :
207
+ impedance = (sqrt(2) * std_uV * 1e-6) / LEAD_OFF_AMPS - SERIES_RESISTOR
208
+
209
+ Pipeline de filtrage :
210
+ 1. Notch 50 Hz (secteur europeen)
211
+ 2. Notch 60 Hz (securite)
212
+ 3. Passe-bande 28-35 Hz (isole le signal lead-off a 31.5 Hz)
213
+ """
214
+ if len(buffer_uv) < SAMPLE_RATE:
215
+ return None
216
+
217
+ arr = np.array(buffer_uv[-SAMPLE_RATE * 2:], dtype=float)
218
+
219
+ # ── Detection du railing ────────────────────────────────────────────
220
+ # Un canal sature (entree flottante clippee) presente des echantillons
221
+ # proches de la pleine echelle +/- ADC. Dans ce cas l'impedance calculee
222
+ # serait faussement basse (std β‰ˆ 0 apres saturation DC).
223
+ railed_frac = np.mean(np.abs(arr) > RAIL_THRESH_UV)
224
+ if railed_frac >= RAIL_FRAC:
225
+ return 'open'
226
+
227
+ # ── Filtrage ────────────────────────────────────────────────────────
228
+ sig = arr.copy()
229
+ if _NOTCH_60_SOS is not None:
230
+ sig = sosfiltfilt(_NOTCH_60_SOS, sig)
231
+ if _BANDPASS_SOS is None:
232
+ return None
233
+ filtered = sosfiltfilt(_BANDPASS_SOS, sig)
234
+
235
+ std_uv = np.std(filtered)
236
+ impedance = (np.sqrt(2.0) * std_uv * 1e-6) / LEAD_OFF_AMPS - SERIES_RESISTOR
237
+ return max(0.0, impedance)
238
+
239
+
240
+ # ╔══════════════════════════════════════════════════╗
241
+ # β•‘ CONTROLEUR DU BOARD CYTON+DAISY β•‘
242
+ # β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
243
+
244
+ class CytonDaisyBoard:
245
+ INIT_TIMEOUT = 15.0
246
+ CMD_DELAY = 0.05
247
+
248
+ def __init__(self, port: str):
249
+ self.port = port
250
+ self.ser = None
251
+ self.running = False
252
+ self._lock = threading.Lock()
253
+
254
+ self._buf_size = SAMPLE_RATE * 4
255
+ self._buffers = [deque(maxlen=self._buf_size) for _ in range(N_CHANNELS)]
256
+ self._last_cyton_data = None
257
+ self._rx_thread = None
258
+
259
+ def connect(self):
260
+ print(f"[Board] Ouverture port {self.port} @ {BAUD_RATE} baud…")
261
+ self.ser = serial.Serial(
262
+ self.port, BAUD_RATE,
263
+ timeout=5,
264
+ rtscts=False,
265
+ dsrdtr=False,
266
+ )
267
+ self.ser.reset_input_buffer()
268
+ self.ser.reset_output_buffer()
269
+ time.sleep(0.5)
270
+
271
+ print("[Board] Envoi 's' pour stopper un stream residuel…")
272
+ self._send(b's')
273
+ time.sleep(1.2)
274
+ self.ser.reset_input_buffer()
275
+
276
+ print("[Board] Envoi 'v' (soft reset)…")
277
+ self._send(b'v')
278
+ time.sleep(0.5)
279
+
280
+ print("[Board] Attente '$$$'…")
281
+ self._wait_for_dollar_signs()
282
+ print("[Board] Board pret.")
283
+
284
+ self._send(b'C') # mode 16 canaux
285
+ time.sleep(0.5)
286
+ self._flush_rx()
287
+
288
+ print("[Board] Activation impedance 16 canaux…")
289
+ self._enable_all_impedances()
290
+ time.sleep(1.0)
291
+
292
+ self._send(b'b') # start stream
293
+ print("[Board] Stream demarre.")
294
+
295
+ self.running = True
296
+ self._rx_thread = threading.Thread(
297
+ target=self._read_loop, name="rx_thread", daemon=True
298
+ )
299
+ self._rx_thread.start()
300
+
301
+ def disconnect(self):
302
+ print("[Board] Deconnexion…")
303
+ self.running = False
304
+ if self.ser and self.ser.is_open:
305
+ try:
306
+ self._send(b's')
307
+ time.sleep(0.3)
308
+ self._disable_all_impedances()
309
+ time.sleep(0.3)
310
+ finally:
311
+ self.ser.close()
312
+ print("[Board] Port ferme.")
313
+
314
+ def _send(self, cmd: bytes):
315
+ if self.ser and self.ser.is_open:
316
+ self.ser.write(cmd)
317
+ self.ser.flush()
318
+
319
+ def _wait_for_dollar_signs(self):
320
+ buf = b''
321
+ start = time.time()
322
+ retry_sent = False
323
+ while time.time() - start < self.INIT_TIMEOUT:
324
+ if self.ser.in_waiting:
325
+ chunk = self.ser.read(self.ser.in_waiting)
326
+ buf += chunk
327
+ try:
328
+ readable = chunk.decode('utf-8', errors='replace')
329
+ if readable.strip():
330
+ print(f"[Board] Recu : {readable.strip()!r}")
331
+ except Exception:
332
+ pass
333
+ if b'$$$' in buf:
334
+ decoded = buf.decode('utf-8', errors='replace').strip()
335
+ print(f"[Board] Message init :\n{decoded}")
336
+ return
337
+ elapsed = time.time() - start
338
+ if elapsed > 4.0 and not retry_sent:
339
+ print("[Board] Pas de reponse apres 4s, re-envoi 'v'…")
340
+ self.ser.reset_input_buffer()
341
+ self._send(b'v')
342
+ retry_sent = True
343
+ time.sleep(0.05)
344
+ raise TimeoutError(
345
+ "Le board n'a pas envoye '$$$'.\n"
346
+ " 1. Board sous tension (LED bleue allumee)\n"
347
+ " 2. Dongle USB branche AVANT le board\n"
348
+ " 3. Interrupteur dongle sur GPIO6 (pas RESET)\n"
349
+ " 4. Aucune autre application sur ce port"
350
+ )
351
+
352
+ def _flush_rx(self, duration=0.5):
353
+ deadline = time.time() + duration
354
+ while time.time() < deadline:
355
+ if self.ser.in_waiting:
356
+ data = self.ser.read(self.ser.in_waiting)
357
+ try:
358
+ d = data.decode('utf-8', errors='replace')
359
+ if d.strip():
360
+ print(f"[Board] Reponse : {d.strip()!r}")
361
+ except Exception:
362
+ pass
363
+ time.sleep(0.05)
364
+
365
+ def _impedance_cmd(self, ch_char: str, pchan: int, nchan: int):
366
+ cmd = f'z{ch_char}{pchan}{nchan}Z'.encode('ascii')
367
+ self._send(cmd)
368
+ time.sleep(self.CMD_DELAY)
369
+
370
+ def _enable_all_impedances(self):
371
+ for ch in range(1, 9):
372
+ self._impedance_cmd(str(ch), pchan=1, nchan=0)
373
+ for char in DAISY_CHARS:
374
+ self._impedance_cmd(char, pchan=1, nchan=0)
375
+ print("[Board] Impedance activee sur 16 canaux.")
376
+
377
+ def _disable_all_impedances(self):
378
+ for ch in range(1, 9):
379
+ self._impedance_cmd(str(ch), pchan=0, nchan=0)
380
+ for char in DAISY_CHARS:
381
+ self._impedance_cmd(char, pchan=0, nchan=0)
382
+
383
+ def _read_loop(self):
384
+ sync_buf = bytearray()
385
+ while self.running:
386
+ try:
387
+ if not self.ser.is_open:
388
+ break
389
+ n = self.ser.in_waiting
390
+ if n == 0:
391
+ time.sleep(0.001)
392
+ continue
393
+ chunk = self.ser.read(n)
394
+ sync_buf.extend(chunk)
395
+ while len(sync_buf) >= PACKET_SIZE:
396
+ idx = -1
397
+ for i in range(len(sync_buf) - PACKET_SIZE + 1):
398
+ if (sync_buf[i] == START_BYTE and
399
+ (sync_buf[i + PACKET_SIZE - 1] & 0xF0) == 0xC0):
400
+ idx = i
401
+ break
402
+ if idx == -1:
403
+ del sync_buf[:max(0, len(sync_buf) - PACKET_SIZE + 1)]
404
+ break
405
+ if idx > 0:
406
+ del sync_buf[:idx]
407
+ continue
408
+ packet = bytes(sync_buf[:PACKET_SIZE])
409
+ del sync_buf[:PACKET_SIZE]
410
+ result = parse_packet(packet)
411
+ if result is not None:
412
+ self._handle_sample(*result)
413
+ except serial.SerialException as e:
414
+ print(f"[Board] Erreur serie : {e}")
415
+ self.running = False
416
+ break
417
+ except Exception as e:
418
+ print(f"[Board] Erreur lecture : {e}")
419
+ time.sleep(0.005)
420
+
421
+ def _handle_sample(self, sample_num: int, ch_data_uv: list):
422
+ is_cyton = (sample_num % 2) == 1
423
+ with self._lock:
424
+ if is_cyton:
425
+ self._last_cyton_data = ch_data_uv[:]
426
+ else:
427
+ if self._last_cyton_data is not None:
428
+ for i in range(8):
429
+ self._buffers[i].append(self._last_cyton_data[i])
430
+ for i in range(8):
431
+ self._buffers[8 + i].append(ch_data_uv[i])
432
+ self._last_cyton_data = None
433
+
434
+ def get_impedances(self) -> list:
435
+ with self._lock:
436
+ snapshot = [list(b) for b in self._buffers]
437
+ return [compute_impedance(buf) for buf in snapshot]
438
+
439
+ def n_samples_collected(self) -> int:
440
+ with self._lock:
441
+ return max((len(b) for b in self._buffers), default=0)
442
+
443
+
444
+ # ╔══════════════════════════════════════════════════╗
445
+ # β•‘ AFFICHAGE MATPLOTLIB (HEAD PLOT) β•‘
446
+ # β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
447
+
448
+ def _impedance_color(val):
449
+ if val is None:
450
+ return IMP_NO_DATA
451
+ if val == 'open':
452
+ return IMP_RAILED
453
+ if val < IMP_THRESHOLDS[0]:
454
+ return IMP_COLORS[0]
455
+ elif val < IMP_THRESHOLDS[1]:
456
+ return IMP_COLORS[1]
457
+ elif val < IMP_THRESHOLDS[2]:
458
+ return IMP_COLORS[2]
459
+ else:
460
+ return IMP_COLORS[3]
461
+
462
+
463
+ def _impedance_text(val):
464
+ if val is None:
465
+ return '---'
466
+ if val == 'open':
467
+ return 'OPEN'
468
+ if val >= 1_000_000:
469
+ return f'{val/1_000_000:.1f} MΞ©'
470
+ elif val >= 1_000:
471
+ return f'{val/1_000:.1f} kΞ©'
472
+ else:
473
+ return f'{val:.0f} Ξ©'
474
+
475
+
476
+ class ImpedanceDisplay:
477
+ ELECTRODE_RADIUS = 0.042
478
+ HEAD_RADIUS = 0.50
479
+ UPDATE_MS = 500
480
+
481
+ def __init__(self, board: CytonDaisyBoard):
482
+ self.board = board
483
+ self._build_figure()
484
+
485
+ def _build_figure(self):
486
+ self.fig = plt.figure(figsize=(10, 10), facecolor='#12121f')
487
+ self.ax = self.fig.add_subplot(111)
488
+ self.ax.set_facecolor('#1a1a2e')
489
+ self.ax.set_xlim(-0.78, 0.78)
490
+ self.ax.set_ylim(-0.60, 0.82)
491
+ self.ax.set_aspect('equal')
492
+ self.ax.axis('off')
493
+
494
+ self.fig.suptitle(
495
+ 'OpenBCI Cyton + Daisy β€” Electrode Impedance (live)',
496
+ color='white', fontsize=13, fontweight='bold', y=0.97
497
+ )
498
+
499
+ self._draw_head()
500
+ self._draw_legend()
501
+ self._create_electrode_artists()
502
+
503
+ self.status_lbl = self.ax.text(
504
+ 0, -0.55,
505
+ 'Collecting data... (minimum 1 s required per channel)',
506
+ ha='center', va='center', fontsize=8.5, color='#aaaacc'
507
+ )
508
+
509
+ self.anim = FuncAnimation(
510
+ self.fig, self._update_frame,
511
+ interval=self.UPDATE_MS, blit=False, cache_frame_data=False
512
+ )
513
+
514
+ def _draw_head(self):
515
+ """
516
+ Tete avec nez vers le HAUT (anterieur) et region posterieure en bas.
517
+ Les electrodes vont apparaitre dans la moitie inferieure (posterieure).
518
+ """
519
+ cx, cy = 0, HEAD_CENTER_Y
520
+ r = self.HEAD_RADIUS
521
+
522
+ head_circle = Circle(
523
+ (cx, cy), r, fill=False,
524
+ linewidth=2.5, edgecolor='#9999bb', zorder=1
525
+ )
526
+ self.ax.add_patch(head_circle)
527
+
528
+ # Nez (anterieur = haut)
529
+ nose_x = [-0.04, 0.0, 0.04]
530
+ nose_y = [cy + r - 0.01, cy + r + 0.06, cy + r - 0.01]
531
+ self.ax.plot(nose_x, nose_y, color='#9999bb', linewidth=2.5, zorder=1)
532
+
533
+ # Oreilles
534
+ ear_l = Arc((-r + cx, cy), 0.10, 0.22, angle=0,
535
+ theta1=90, theta2=270,
536
+ linewidth=2.5, edgecolor='#9999bb', zorder=1)
537
+ ear_r = Arc(( r + cx, cy), 0.10, 0.22, angle=0,
538
+ theta1=270, theta2=90,
539
+ linewidth=2.5, edgecolor='#9999bb', zorder=1)
540
+ self.ax.add_patch(ear_l)
541
+ self.ax.add_patch(ear_r)
542
+
543
+ # Lignes de reference (axe sagittal + coronal)
544
+ self.ax.plot([cx, cx], [cy - r, cy + r], '--',
545
+ color='#2a2a4a', linewidth=1.0, zorder=0)
546
+ self.ax.plot([cx - r, cx + r], [cy, cy], '--',
547
+ color='#2a2a4a', linewidth=1.0, zorder=0)
548
+
549
+ # Etiquettes orientation
550
+ self.ax.text(0, cy + r + 0.09, 'FRONT', ha='center', va='bottom',
551
+ fontsize=7, color='#6666aa')
552
+ self.ax.text(0, cy - r - 0.04, 'BACK', ha='center', va='top',
553
+ fontsize=7, color='#6666aa')
554
+
555
+ def _draw_legend(self):
556
+ legend_y = -0.53
557
+ items = (list(zip(IMP_LABELS_LEG, IMP_COLORS))
558
+ + [('Open circuit / saturated', IMP_RAILED),
559
+ ('No data', IMP_NO_DATA)])
560
+ x_start = -0.72
561
+ x_step = 0.248
562
+ for i, (label, color) in enumerate(items):
563
+ x = x_start + i * x_step
564
+ self.ax.add_patch(Circle((x, legend_y), 0.018, color=color, zorder=4))
565
+ self.ax.text(x + 0.025, legend_y, label,
566
+ va='center', ha='left', fontsize=6.5, color='white')
567
+
568
+ def _create_electrode_artists(self):
569
+ self.circles = []
570
+ self.imp_labels = []
571
+ self.ch_labels = []
572
+
573
+ for i, (x, y) in enumerate(ELECTRODE_POSITIONS):
574
+ ch_num = i + 1
575
+ inactive = (x == 0.0 and y == 0.0)
576
+
577
+ if inactive:
578
+ self.circles.append(None)
579
+ self.imp_labels.append(None)
580
+ self.ch_labels.append(None)
581
+ continue
582
+
583
+ circ = Circle(
584
+ (x, y), self.ELECTRODE_RADIUS,
585
+ facecolor=IMP_NO_DATA, edgecolor='white',
586
+ linewidth=1.2, zorder=3
587
+ )
588
+ self.ax.add_patch(circ)
589
+ self.circles.append(circ)
590
+
591
+ ch_lbl = self.ax.text(
592
+ x, y, f'CH{ch_num}',
593
+ ha='center', va='center',
594
+ fontsize=5.8, color='black', fontweight='bold', zorder=5
595
+ )
596
+ self.ch_labels.append(ch_lbl)
597
+
598
+ imp_lbl = self.ax.text(
599
+ x, y - self.ELECTRODE_RADIUS - 0.025,
600
+ '---',
601
+ ha='center', va='top',
602
+ fontsize=7.5, color='white', fontweight='bold', zorder=5
603
+ )
604
+ self.imp_labels.append(imp_lbl)
605
+
606
+ def _update_frame(self, _frame):
607
+ impedances = self.board.get_impedances()
608
+ n_samples = self.board.n_samples_collected()
609
+ n_active = sum(1 for v in impedances
610
+ if v is not None and v != 'open')
611
+
612
+ for i, imp in enumerate(impedances):
613
+ circ = self.circles[i]
614
+ imp_lbl = self.imp_labels[i]
615
+ if circ is None:
616
+ continue
617
+ circ.set_facecolor(_impedance_color(imp))
618
+ if imp_lbl is not None:
619
+ imp_lbl.set_text(_impedance_text(imp))
620
+ # Couleur du texte : blanc sauf sur OPEN (violet fonce β†’ texte clair)
621
+ imp_lbl.set_color('#ddaaff' if imp == 'open' else 'white')
622
+
623
+ need = SAMPLE_RATE
624
+ pct = min(100, int(n_samples / need * 100))
625
+ n_open = sum(1 for v in impedances if v == 'open')
626
+
627
+ if n_samples < need:
628
+ self.status_lbl.set_text(
629
+ f'Collecting: {n_samples}/{need} samples ({pct}%)'
630
+ )
631
+ else:
632
+ self.status_lbl.set_text(
633
+ f'Live β€” {n_active}/14 channels measured | {n_open} open circuit(s)'
634
+ )
635
+ return []
636
+
637
+ def show(self):
638
+ plt.tight_layout(rect=[0, 0, 1, 0.96])
639
+ plt.show()
640
+
641
+
642
+ # ╔══════════════════════════════════════════════════╗
643
+ # β•‘ POINT D'ENTREE β•‘
644
+ # β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
645
+
646
+ def main():
647
+ parser = argparse.ArgumentParser(
648
+ description="OpenBCI Cyton+Daisy β€” Moniteur d'impedance live"
649
+ )
650
+ parser.add_argument('--port', type=str, default=None,
651
+ help='Port serie (ex: COM3, /dev/ttyUSB0)')
652
+ parser.add_argument('--list-ports', action='store_true',
653
+ help='Lister les ports disponibles et quitter')
654
+ args = parser.parse_args()
655
+
656
+ if args.list_ports:
657
+ print("Ports serie disponibles :")
658
+ for p in serial.tools.list_ports.comports():
659
+ print(f" {p.device:20s} {p.description} [{p.hwid}]")
660
+ sys.exit(0)
661
+
662
+ port = args.port
663
+ if port is None:
664
+ port = find_openbci_port()
665
+ if port is None:
666
+ print("ERREUR : dongle introuvable. Utilisez --port.")
667
+ sys.exit(1)
668
+ print(f"Dongle detecte automatiquement : {port}")
669
+ else:
670
+ print(f"Port specifie : {port}")
671
+
672
+ board = CytonDaisyBoard(port)
673
+ try:
674
+ board.connect()
675
+ except TimeoutError as e:
676
+ print(f"\nERREUR de connexion : {e}")
677
+ sys.exit(1)
678
+ except serial.SerialException as e:
679
+ print(f"\nERREUR serie : {e}")
680
+ sys.exit(1)
681
+
682
+ display = ImpedanceDisplay(board)
683
+ print("\nFenetre ouverte. Fermez-la ou Ctrl+C pour quitter.")
684
+ try:
685
+ display.show()
686
+ except KeyboardInterrupt:
687
+ print("\nInterruption.")
688
+ finally:
689
+ board.disconnect()
690
+ print("Programme termine.")
691
+
692
+
693
+ if __name__ == '__main__':
694
+ main()