| from __future__ import annotations |
|
|
| from typing import List, Tuple, Optional, Dict, Any |
| import io |
| import os |
| import tempfile |
|
|
| import numpy as np |
|
|
| import mne |
| from scipy.io import loadmat |
|
|
| try: |
| import h5py |
| except Exception: |
| h5py = None |
|
|
|
|
| |
| |
| |
| def pick_set_fdt(files) -> Tuple[Optional[object], Optional[object]]: |
| """ |
| Streamlitの accept_multiple_files=True で受け取ったfilesから .set と .fdt を拾う。 |
| Returns: (set_file, fdt_file) |
| """ |
| set_file = None |
| fdt_file = None |
| for f in files: |
| name = (getattr(f, "name", "") or "").lower() |
| if name.endswith(".set"): |
| set_file = f |
| elif name.endswith(".fdt"): |
| fdt_file = f |
| return set_file, fdt_file |
|
|
|
|
| def same_stem(a_name: str, b_name: str) -> bool: |
| """Check if two filenames have the same stem (basename without extension).""" |
| a_stem = os.path.splitext(os.path.basename(a_name))[0] |
| b_stem = os.path.splitext(os.path.basename(b_name))[0] |
| return a_stem == b_stem |
|
|
|
|
| def extract_electrode_positions_from_hdf5(set_path: str) -> tuple: |
| """ |
| HDF5形式のEEGLABファイルから電極位置を抽出。 |
| |
| Returns: |
| tuple: (pos_2d, pos_3d) |
| pos_2d: (C, 2) 電極の2D座標、取得できない場合はNone |
| pos_3d: (C, 3) 電極の3D座標、取得できない場合はNone |
| """ |
| if h5py is None: |
| return None, None |
| |
| try: |
| with h5py.File(set_path, "r") as f: |
| |
| chanlocs_path = None |
| for path in ["EEG/chanlocs", "chanlocs"]: |
| if path in f: |
| chanlocs_path = path |
| break |
| |
| if chanlocs_path is None: |
| return None, None |
| |
| chanlocs = f[chanlocs_path] |
| |
| |
| xs, ys, zs = [], [], [] |
| |
| |
| if "X" in chanlocs and "Y" in chanlocs and "Z" in chanlocs: |
| x_data = chanlocs["X"][()] |
| y_data = chanlocs["Y"][()] |
| z_data = chanlocs["Z"][()] |
| |
| |
| if x_data.dtype == h5py.ref_dtype: |
| for i in range(len(x_data)): |
| try: |
| x_val = f[x_data[i, 0]][()] |
| y_val = f[y_data[i, 0]][()] |
| z_val = f[z_data[i, 0]][()] |
| |
| |
| x_val = float(x_val.flat[0]) if hasattr(x_val, 'flat') else float(x_val) |
| y_val = float(y_val.flat[0]) if hasattr(y_val, 'flat') else float(y_val) |
| z_val = float(z_val.flat[0]) if hasattr(z_val, 'flat') else float(z_val) |
| |
| xs.append(x_val) |
| ys.append(y_val) |
| zs.append(z_val) |
| except: |
| |
| xs.append(0.0) |
| ys.append(0.0) |
| zs.append(0.0) |
| else: |
| |
| xs = x_data.flatten().astype(float) |
| ys = y_data.flatten().astype(float) |
| zs = z_data.flatten().astype(float) |
| else: |
| return None, None |
| |
| |
| xs = np.array(xs, dtype=float) |
| ys = np.array(ys, dtype=float) |
| zs = np.array(zs, dtype=float) |
| |
| if len(xs) == 0: |
| return None, None |
| |
| |
| valid_mask = ~(np.isnan(xs) | np.isnan(ys) | np.isnan(zs)) |
| if not np.any(valid_mask): |
| return None, None |
| |
| |
| if not np.all(valid_mask): |
| xs[~valid_mask] = np.nanmean(xs) |
| ys[~valid_mask] = np.nanmean(ys) |
| zs[~valid_mask] = np.nanmean(zs) |
| |
| |
| positions_3d = np.column_stack([xs, ys, zs]) |
| |
| |
| dists = np.sqrt(np.sum(positions_3d**2, axis=1)) |
| max_dist_3d = np.max(dists[dists > 0]) if np.any(dists > 0) else 1.0 |
| if max_dist_3d > 0: |
| positions_3d = positions_3d / max_dist_3d |
| |
| |
| pos_2d = positions_3d[:, :2] |
| dists_2d = np.sqrt(np.sum(pos_2d**2, axis=1)) |
| max_dist_2d = np.max(dists_2d[dists_2d > 0]) if np.any(dists_2d > 0) else 1.0 |
| if max_dist_2d > 0: |
| pos_2d = pos_2d / max_dist_2d * 0.85 |
| |
| print(f"HDF5から電極位置を取得: {len(xs)} channels") |
| return pos_2d.astype(np.float32), positions_3d.astype(np.float32) |
| |
| except Exception as e: |
| print(f"HDF5から電極位置の抽出に失敗: {e}") |
| import traceback |
| traceback.print_exc() |
| return None, None |
|
|
|
|
| def extract_electrode_positions_2d(set_path: str): |
| """ |
| EEGLABファイルから電極位置(2D, 3D)を抽出。 |
| |
| Returns: |
| tuple: (pos_2d, pos_3d) |
| pos_2d: (C, 2) 電極の2D座標、取得できない場合はNone |
| pos_3d: (C, 3) 電極の3D座標、取得できない場合はNone |
| """ |
| try: |
| |
| raw = mne.io.read_raw_eeglab(set_path, preload=False, verbose=False) |
| montage = raw.get_montage() |
| |
| if montage is None: |
| return None, None |
| |
| |
| pos_3d_dict = montage.get_positions()['ch_pos'] |
| |
| if not pos_3d_dict: |
| return None, None |
| |
| |
| ch_names = raw.ch_names |
| positions_3d = [] |
| for ch_name in ch_names: |
| if ch_name in pos_3d_dict: |
| positions_3d.append(pos_3d_dict[ch_name]) |
| else: |
| |
| positions_3d.append([0, 0, 0]) |
| |
| positions_3d = np.array(positions_3d) |
| |
| |
| max_dist_3d = np.max(np.sqrt(np.sum(positions_3d**2, axis=1))) |
| if max_dist_3d > 0: |
| positions_3d = positions_3d / max_dist_3d |
| |
| |
| pos_2d = positions_3d[:, :2] |
| |
| |
| max_dist_2d = np.max(np.sqrt(np.sum(pos_2d**2, axis=1))) |
| if max_dist_2d > 0: |
| pos_2d = pos_2d / max_dist_2d * 0.85 |
| |
| return pos_2d.astype(np.float32), positions_3d.astype(np.float32) |
| |
| except Exception as e: |
| print(f"電極位置の抽出に失敗: {e}") |
| return None, None |
|
|
|
|
| def _load_eeglab_hdf5(set_path: str, fdt_path: Optional[str] = None, debug: bool = False): |
| """ |
| Load EEGLAB .set file saved in MATLAB v7.3 (HDF5) format using h5py. |
| Returns: (x_tc, fs) where x_tc is (T, C) |
| """ |
| if h5py is None: |
| raise RuntimeError("EEGLAB .set ファイルが MATLAB v7.3 (HDF5) 形式ですが、h5py がインストールされていません。pip install h5py を実行してください。") |
| |
| with h5py.File(set_path, "r") as f: |
| |
| if debug: |
| print("=== HDF5 file structure ===") |
| def print_structure(name, obj): |
| if isinstance(obj, h5py.Dataset): |
| print(f"Dataset: {name}, shape: {obj.shape}, dtype: {obj.dtype}") |
| elif isinstance(obj, h5py.Group): |
| print(f"Group: {name}") |
| f.visititems(print_structure) |
| print("===========================") |
| |
| |
| fs = None |
| for path in ["EEG/srate", "srate"]: |
| if path in f: |
| srate_data = f[path] |
| if isinstance(srate_data, h5py.Dataset): |
| val = srate_data[()] |
| |
| fs = float(val.flat[0]) if hasattr(val, 'flat') else float(val) |
| break |
| |
| if fs is None: |
| raise ValueError("サンプリングレート (srate) が見つかりません") |
| |
| |
| nbchan = None |
| for path in ["EEG/nbchan", "nbchan"]: |
| if path in f: |
| nbchan_data = f[path] |
| if isinstance(nbchan_data, h5py.Dataset): |
| val = nbchan_data[()] |
| nbchan = int(val.flat[0]) if hasattr(val, 'flat') else int(val) |
| break |
| |
| |
| pnts = None |
| for path in ["EEG/pnts", "pnts"]: |
| if path in f: |
| pnts_data = f[path] |
| if isinstance(pnts_data, h5py.Dataset): |
| val = pnts_data[()] |
| pnts = int(val.flat[0]) if hasattr(val, 'flat') else int(val) |
| break |
| |
| if debug: |
| print(f"nbchan: {nbchan}, pnts: {pnts}, fs: {fs}") |
| |
| |
| data = None |
| data_shape = None |
| |
| if debug: |
| print(f"Checking for data, fdt_path provided: {fdt_path is not None}") |
| if fdt_path: |
| print(f"fdt_path exists: {os.path.exists(fdt_path)}") |
| |
| |
| if "EEG" in f and "data" in f["EEG"]: |
| data_ref = f["EEG"]["data"] |
| if isinstance(data_ref, h5py.Dataset): |
| if debug: |
| print(f"EEG/data dtype: {data_ref.dtype}, shape: {data_ref.shape}, size: {data_ref.size}") |
| |
| if data_ref.dtype == h5py.ref_dtype: |
| |
| if debug: |
| print("EEG/data is reference type - data should be in .fdt file") |
| |
| if fdt_path is not None and os.path.exists(fdt_path): |
| data = _load_fdt_file(fdt_path, nbchan, pnts, debug=debug) |
| else: |
| raise ValueError(".fdt ファイルが必要ですが見つかりません。.set と .fdt の両方をアップロードしてください。") |
| elif data_ref.size > 100: |
| data = data_ref[()] |
| data_shape = data.shape |
| if debug: |
| print(f"EEG/data contains actual data, shape: {data_shape}") |
| else: |
| |
| if debug: |
| print(f"EEG/data is small array (size={data_ref.size}), assuming reference to .fdt") |
| if fdt_path is not None and os.path.exists(fdt_path): |
| data = _load_fdt_file(fdt_path, nbchan, pnts, debug=debug) |
| else: |
| raise ValueError(".fdt ファイルが必要ですが見つかりません。.set と .fdt の両方をアップロードしてください。") |
| |
| |
| if data is None and "data" in f: |
| data_obj = f["data"] |
| if isinstance(data_obj, h5py.Dataset): |
| data = data_obj[()] |
| data_shape = data.shape |
| |
| if data is None: |
| raise ValueError("EEGデータが見つかりません。.fdt ファイルが必要な可能性があります。") |
| |
| if debug: |
| print(f"Data shape: {data.shape if hasattr(data, 'shape') else 'loaded from fdt'}") |
| |
| |
| if data.ndim != 2: |
| raise ValueError(f"予期しないデータ次元: {data.ndim}") |
| |
| dim0, dim1 = data.shape |
| |
| |
| if nbchan is not None: |
| if dim0 == nbchan: |
| |
| x_tc = data.T.astype(np.float32) |
| elif dim1 == nbchan: |
| |
| x_tc = data.astype(np.float32) |
| else: |
| |
| if dim0 < dim1: |
| x_tc = data.T.astype(np.float32) |
| else: |
| x_tc = data.astype(np.float32) |
| else: |
| |
| if dim0 < dim1: |
| x_tc = data.T.astype(np.float32) |
| else: |
| x_tc = data.astype(np.float32) |
| |
| if debug: |
| print(f"Final shape (T, C): {x_tc.shape}") |
| |
| return x_tc, fs |
| """ |
| EEGLABファイルから電極位置(2D)を抽出。 |
| |
| Returns: |
| pos: (C, 2) 電極の2D座標、取得できない場合はNone |
| """ |
| try: |
| |
| raw = mne.io.read_raw_eeglab(set_path, preload=False, verbose=False) |
| montage = raw.get_montage() |
| |
| if montage is None: |
| return None |
| |
| |
| pos_3d = montage.get_positions()['ch_pos'] |
| |
| if not pos_3d: |
| return None |
| |
| |
| ch_names = raw.ch_names |
| positions = [] |
| for ch_name in ch_names: |
| if ch_name in pos_3d: |
| positions.append(pos_3d[ch_name]) |
| else: |
| |
| positions.append([0, 0, 0]) |
| |
| positions = np.array(positions) |
| |
| |
| |
| pos_2d = positions[:, :2] |
| |
| |
| max_dist = np.max(np.sqrt(np.sum(pos_2d**2, axis=1))) |
| if max_dist > 0: |
| pos_2d = pos_2d / max_dist * 0.85 |
| |
| return pos_2d.astype(np.float32) |
| |
| except Exception as e: |
| print(f"電極位置の抽出に失敗: {e}") |
| return None |
| """ |
| Load EEGLAB .set file saved in MATLAB v7.3 (HDF5) format using h5py. |
| Returns: (x_tc, fs) where x_tc is (T, C) |
| """ |
| if h5py is None: |
| raise RuntimeError("EEGLAB .set ファイルが MATLAB v7.3 (HDF5) 形式ですが、h5py がインストールされていません。pip install h5py を実行してください。") |
| |
| with h5py.File(set_path, "r") as f: |
| |
| if debug: |
| print("=== HDF5 file structure ===") |
| def print_structure(name, obj): |
| if isinstance(obj, h5py.Dataset): |
| print(f"Dataset: {name}, shape: {obj.shape}, dtype: {obj.dtype}") |
| elif isinstance(obj, h5py.Group): |
| print(f"Group: {name}") |
| f.visititems(print_structure) |
| print("===========================") |
| |
| |
| fs = None |
| for path in ["EEG/srate", "srate"]: |
| if path in f: |
| srate_data = f[path] |
| if isinstance(srate_data, h5py.Dataset): |
| val = srate_data[()] |
| |
| fs = float(val.flat[0]) if hasattr(val, 'flat') else float(val) |
| break |
| |
| if fs is None: |
| raise ValueError("サンプリングレート (srate) が見つかりません") |
| |
| |
| nbchan = None |
| for path in ["EEG/nbchan", "nbchan"]: |
| if path in f: |
| nbchan_data = f[path] |
| if isinstance(nbchan_data, h5py.Dataset): |
| val = nbchan_data[()] |
| nbchan = int(val.flat[0]) if hasattr(val, 'flat') else int(val) |
| break |
| |
| |
| pnts = None |
| for path in ["EEG/pnts", "pnts"]: |
| if path in f: |
| pnts_data = f[path] |
| if isinstance(pnts_data, h5py.Dataset): |
| val = pnts_data[()] |
| pnts = int(val.flat[0]) if hasattr(val, 'flat') else int(val) |
| break |
| |
| if debug: |
| print(f"nbchan: {nbchan}, pnts: {pnts}, fs: {fs}") |
| |
| |
| data = None |
| data_shape = None |
| |
| if debug: |
| print(f"Checking for data, fdt_path provided: {fdt_path is not None}") |
| if fdt_path: |
| print(f"fdt_path exists: {os.path.exists(fdt_path)}") |
| |
| |
| if "EEG" in f and "data" in f["EEG"]: |
| data_ref = f["EEG"]["data"] |
| if isinstance(data_ref, h5py.Dataset): |
| if debug: |
| print(f"EEG/data dtype: {data_ref.dtype}, shape: {data_ref.shape}, size: {data_ref.size}") |
| |
| if data_ref.dtype == h5py.ref_dtype: |
| |
| if debug: |
| print("EEG/data is reference type - data should be in .fdt file") |
| |
| if fdt_path is not None and os.path.exists(fdt_path): |
| data = _load_fdt_file(fdt_path, nbchan, pnts, debug=debug) |
| else: |
| raise ValueError(".fdt ファイルが必要ですが見つかりません。.set と .fdt の両方をアップロードしてください。") |
| elif data_ref.size > 100: |
| data = data_ref[()] |
| data_shape = data.shape |
| if debug: |
| print(f"EEG/data contains actual data, shape: {data_shape}") |
| else: |
| |
| if debug: |
| print(f"EEG/data is small array (size={data_ref.size}), assuming reference to .fdt") |
| if fdt_path is not None and os.path.exists(fdt_path): |
| data = _load_fdt_file(fdt_path, nbchan, pnts, debug=debug) |
| else: |
| raise ValueError(".fdt ファイルが必要ですが見つかりません。.set と .fdt の両方をアップロードしてください。") |
| |
| |
| if data is None and "data" in f: |
| data_obj = f["data"] |
| if isinstance(data_obj, h5py.Dataset): |
| data = data_obj[()] |
| data_shape = data.shape |
| |
| if data is None: |
| raise ValueError("EEGデータが見つかりません。.fdt ファイルが必要な可能性があります。") |
| |
| if debug: |
| print(f"Data shape: {data.shape if hasattr(data, 'shape') else 'loaded from fdt'}") |
| |
| |
| if data.ndim != 2: |
| raise ValueError(f"予期しないデータ次元: {data.ndim}") |
| |
| dim0, dim1 = data.shape |
| |
| |
| if nbchan is not None: |
| if dim0 == nbchan: |
| |
| x_tc = data.T.astype(np.float32) |
| elif dim1 == nbchan: |
| |
| x_tc = data.astype(np.float32) |
| else: |
| |
| if dim0 < dim1: |
| x_tc = data.T.astype(np.float32) |
| else: |
| x_tc = data.astype(np.float32) |
| else: |
| |
| if dim0 < dim1: |
| x_tc = data.T.astype(np.float32) |
| else: |
| x_tc = data.astype(np.float32) |
| |
| if debug: |
| print(f"Final shape (T, C): {x_tc.shape}") |
| |
| return x_tc, fs |
|
|
|
|
| def _load_fdt_file(fdt_path: str, nbchan: Optional[int], pnts: Optional[int], debug: bool = False) -> np.ndarray: |
| """ |
| Load .fdt file (raw binary float32 data). |
| EEGLAB .fdt files are stored as float32 in (C, T) order. |
| """ |
| if debug: |
| print(f"Loading .fdt file: {fdt_path}") |
| |
| |
| data = np.fromfile(fdt_path, dtype=np.float32) |
| |
| if debug: |
| print(f"Loaded {data.size} float32 values from .fdt") |
| |
| |
| if nbchan is not None and pnts is not None: |
| expected_size = nbchan * pnts |
| if data.size == expected_size: |
| |
| data = data.reshape(nbchan, pnts) |
| if debug: |
| print(f"Reshaped to ({nbchan}, {pnts})") |
| else: |
| if debug: |
| print(f"Warning: expected {expected_size} values but got {data.size}") |
| |
| if data.size % nbchan == 0: |
| data = data.reshape(nbchan, -1) |
| elif data.size % pnts == 0: |
| data = data.reshape(-1, pnts) |
| else: |
| raise ValueError(f"Cannot reshape data of size {data.size} with nbchan={nbchan}, pnts={pnts}") |
| else: |
| raise ValueError("nbchan と pnts の情報が必要です") |
| |
| return data |
|
|
|
|
| def load_eeglab_tc_from_bytes( |
| set_bytes: bytes, |
| set_name: str, |
| fdt_bytes: Optional[bytes] = None, |
| fdt_name: Optional[str] = None, |
| ): |
| """ |
| Load EEGLAB .set (and optional .fdt) from bytes using MNE or h5py. |
| Returns: |
| tuple: (x_tc, fs, electrode_pos_2d, electrode_pos_3d) |
| x_tc: (T, C) float32 |
| fs: sampling rate (Hz) |
| electrode_pos_2d: (C, 2) float32 or None - 電極の2D座標 |
| electrode_pos_3d: (C, 3) float32 or None - 電極の3D座標 |
| |
| Notes: |
| - 多くのEEGLABは .set が .fdt を参照するため、同じディレクトリに同名で置く必要があります。 |
| - .set単体で完結している場合は fdt_* を省略可能にしています。 |
| - MATLAB v7.3 (HDF5) 形式の .set にも対応しています。 |
| """ |
| if fdt_bytes is not None or fdt_name is not None: |
| if fdt_bytes is None or fdt_name is None: |
| raise ValueError("fdt_bytes と fdt_name は両方指定してください。") |
| if not same_stem(set_name, fdt_name): |
| raise ValueError(f".set と .fdt のファイル名(拡張子除く)が一致していません: {set_name} vs {fdt_name}") |
|
|
| with tempfile.TemporaryDirectory() as tmpdir: |
| set_path = os.path.join(tmpdir, os.path.basename(set_name)) |
| with open(set_path, "wb") as f: |
| f.write(set_bytes) |
|
|
| fdt_path = None |
| if fdt_bytes is not None and fdt_name is not None: |
| fdt_path = os.path.join(tmpdir, os.path.basename(fdt_name)) |
| with open(fdt_path, "wb") as f: |
| f.write(fdt_bytes) |
|
|
| |
| try: |
| raw = mne.io.read_raw_eeglab(set_path, preload=True, verbose=False) |
| fs = float(raw.info["sfreq"]) |
| x_tc = raw.get_data().T |
| |
| |
| result = extract_electrode_positions_2d(set_path) |
| if result is not None: |
| electrode_pos_2d, electrode_pos_3d = result |
| else: |
| electrode_pos_2d, electrode_pos_3d = None, None |
| |
| return x_tc.astype(np.float32), fs, electrode_pos_2d, electrode_pos_3d |
|
|
| except Exception as e_raw: |
| |
| try: |
| epochs = mne.io.read_epochs_eeglab(set_path, verbose=False, montage_units="cm") |
| fs = float(epochs.info["sfreq"]) |
| x = epochs.get_data(copy=True) |
|
|
| |
| x_mean = x.mean(axis=0) |
| x_tc = x_mean.T |
| |
| |
| result = extract_electrode_positions_2d(set_path) |
| if result is not None: |
| electrode_pos_2d, electrode_pos_3d = result |
| else: |
| electrode_pos_2d, electrode_pos_3d = None, None |
| |
| return x_tc.astype(np.float32), fs, electrode_pos_2d, electrode_pos_3d |
|
|
| except Exception as e_ep: |
| |
| try: |
| |
| debug = os.environ.get("EEGLAB_DEBUG", "0") == "1" |
| |
| import sys |
| if 'streamlit' in sys.modules: |
| debug = True |
| |
| try: |
| x_tc, fs = _load_eeglab_hdf5(set_path, fdt_path=fdt_path, debug=debug) |
| except Exception as e_hdf5_inner: |
| import traceback |
| print("HDF5読み込みの詳細エラー:") |
| print(traceback.format_exc()) |
| raise e_hdf5_inner |
| |
| |
| electrode_pos_2d, electrode_pos_3d = extract_electrode_positions_from_hdf5(set_path) |
| |
| if debug and electrode_pos_2d is not None: |
| print(f"HDF5から電極位置を取得しました: {electrode_pos_2d.shape}") |
| |
| return x_tc, fs, electrode_pos_2d, electrode_pos_3d |
| |
| except Exception as e_hdf5: |
| import traceback |
| |
| msg = ( |
| "EEGLABの読み込みに失敗しました。\n" |
| f"- read_raw_eeglab error: {e_raw}\n" |
| f"- read_epochs_eeglab error: {e_ep}\n" |
| f"- HDF5読み込み error: {e_hdf5}\n" |
| f"\n詳細トレースバック:\n{traceback.format_exc()}" |
| ) |
| raise RuntimeError(msg) from e_hdf5 |
|
|
|
|
|
|
| |
| |
| |
| def _mat_keys_loadmat(mat_dict: Dict[str, Any]) -> List[str]: |
| return sorted([k for k in mat_dict.keys() if not k.startswith("__")]) |
|
|
|
|
| def _try_get_numeric_arrays_loadmat(mat_dict: Dict[str, Any]) -> Dict[str, np.ndarray]: |
| """ |
| loadmatで読んだdictから、1D/2Dの数値ndarrayだけ抽出して返す。 |
| 3次元配列も含める(エポックデータの可能性)。 |
| """ |
| out: Dict[str, np.ndarray] = {} |
| for k in _mat_keys_loadmat(mat_dict): |
| v = mat_dict[k] |
| if isinstance(v, np.ndarray) and v.size > 0: |
| |
| if np.issubdtype(v.dtype, np.number): |
| if v.ndim in (1, 2): |
| out[k] = v |
| elif v.ndim == 3: |
| |
| |
| out[k + "_mean"] = v.mean(axis=0) |
| out[k + "_concat"] = v.reshape(-1, v.shape[-1]) |
| return out |
|
|
|
|
| def _load_mat_v72(bytes_data: bytes) -> Dict[str, Any]: |
| |
| return loadmat(io.BytesIO(bytes_data), squeeze_me=False, struct_as_record=False) |
|
|
|
|
| def _load_mat_v73_candidates(bytes_data: bytes) -> Dict[str, np.ndarray]: |
| """ |
| v7.3(HDF5)のMATから、数値1D/2D/3D dataset を拾って返す。 |
| keyは HDF5内のパスになります(例: 'group/data')。 |
| |
| 修正: h5pyの新しいバージョンに対応。BytesIOではなく一時ファイルを使用。 |
| """ |
| if h5py is None: |
| raise RuntimeError("MAT v7.3(HDF5) 形式の可能性がありますが、h5py が入っていません。pip install h5py を実行してください。") |
|
|
| out: Dict[str, np.ndarray] = {} |
| |
| |
| with tempfile.NamedTemporaryFile(suffix='.mat', delete=False) as tmp: |
| tmp.write(bytes_data) |
| tmp_path = tmp.name |
| |
| try: |
| with h5py.File(tmp_path, "r") as f: |
|
|
| def visitor(name, obj): |
| if not isinstance(obj, h5py.Dataset): |
| return |
| try: |
| arr = obj[()] |
| except Exception: |
| return |
|
|
| |
| if isinstance(arr, np.ndarray) and arr.size > 0 and np.issubdtype(arr.dtype, np.number): |
| if arr.ndim in (1, 2): |
| out[name] = arr |
| elif arr.ndim == 3: |
| |
| out[name + "_mean"] = arr.mean(axis=0) |
| out[name + "_concat"] = arr.reshape(-1, arr.shape[-1]) |
|
|
| f.visititems(lambda name, obj: visitor(name, obj)) |
| finally: |
| |
| try: |
| os.unlink(tmp_path) |
| except Exception: |
| pass |
|
|
| return out |
|
|
|
|
| def load_mat_candidates(bytes_data: bytes) -> Dict[str, np.ndarray]: |
| """ |
| Return dict: variable_name -> ndarray(1D/2D numeric) |
| Tries v7.2 (scipy.io.loadmat). If it fails, tries v7.3 (h5py). |
| """ |
| try: |
| md = _load_mat_v72(bytes_data) |
| cands = _try_get_numeric_arrays_loadmat(md) |
| return cands |
| except Exception: |
| return _load_mat_v73_candidates(bytes_data) |