File size: 6,339 Bytes
900890b
 
eec1f12
900890b
 
eec1f12
900890b
eec1f12
900890b
 
eec1f12
a30a1c5
 
 
 
eec1f12
900890b
 
 
954fec8
adfea1b
954fec8
900890b
 
eec1f12
 
 
 
 
 
 
 
 
900890b
954fec8
 
900890b
 
 
eec1f12
900890b
adfea1b
 
 
 
 
 
 
 
 
 
 
 
 
900890b
 
eec1f12
900890b
954fec8
900890b
eec1f12
900890b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4497fc2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eec1f12
954fec8
eec1f12
954fec8
eec1f12
 
4497fc2
954fec8
eec1f12
 
 
 
 
 
 
 
900890b
 
a30a1c5
900890b
 
954fec8
eec1f12
 
 
900890b
a30a1c5
900890b
 
a30a1c5
900890b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eec1f12
900890b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eec1f12
900890b
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import os
import shutil
import zipfile
from pathlib import Path
import gradio as gr
import mne

# Import your pipeline
from ica_xtra import run_preprocessing_pipeline

# Fixed montage file (must be in repo root)
BUILT_IN_GPSC = Path("ghw280_from_egig.gpsc")
if not BUILT_IN_GPSC.is_file():
    raise FileNotFoundError(f"Required montage file not found: {BUILT_IN_GPSC}")

# Output directory
OUTPUT_DIR = Path("outputs")
OUTPUT_DIR.mkdir(exist_ok=True)

# Fixed subject ID — no user input
SUBJECT_ID = "sub"

def preprocess_eeg(
    eeg_file,
    input_format: str = "fif",
    apply_highpass: bool = True,
    apply_lowpass: bool = True,
    apply_notch: bool = True,
    line_freq: float = 60.0,
    pre_ica_mad: float = 3.5,
    post_ica_mad: float = 5.0,
    interpolate_before_ica: bool = False,
    use_artifact_detection: bool = True,
):
    # Clean output dir for this fixed subject
    subject_dir = OUTPUT_DIR / SUBJECT_ID
    if subject_dir.exists():
        shutil.rmtree(subject_dir)
    subject_dir.mkdir(parents=True)
    (subject_dir / "plots").mkdir(exist_ok=True)

    uploaded_file = Path(eeg_file.name)

    if input_format == "mff":
        if not zipfile.is_zipfile(uploaded_file):
            raise ValueError("MFF input must be a ZIP file containing the .mff folder.")
        eeg_path = subject_dir / f"{SUBJECT_ID}.mff"
        with zipfile.ZipFile(uploaded_file, 'r') as zf:
            zf.extractall(eeg_path)
        if not (eeg_path / "info.xml").exists():
            raise ValueError("Invalid MFF: missing 'info.xml' in the ZIP.")
    else:
        eeg_path = subject_dir / uploaded_file.name
        shutil.copy(uploaded_file, eeg_path)

    try:
        # Run pipeline
        run_preprocessing_pipeline(
            subject=SUBJECT_ID,
            input_path=str(eeg_path),
            gpsc_file=str(BUILT_IN_GPSC),
            base_output_path=str(OUTPUT_DIR),
            input_format=input_format,
            apply_highpass=apply_highpass,
            apply_lowpass=apply_lowpass,
            apply_notch=apply_notch,
            line_freq=line_freq,
            pre_ica_mad_threshold=pre_ica_mad,
            post_ica_mad_threshold=post_ica_mad,
            interpolate_before_ica=interpolate_before_ica,
            use_artifact_detection_channels=use_artifact_detection,
            append_subject_to_output=True,
            plot=True,
            log_to_file=True,
            random_state=99,
        )

        # 🔧 FIX: Handle multiple cleaned FIF files
        cleaned_fifs = list(subject_dir.glob("*_eeg_ica_cleaned_raw.fif"))
        if not cleaned_fifs:
            raise FileNotFoundError("No cleaned FIF file found after preprocessing.")

        # Prefer the one with 'post_ica' in the name (final version)
        final_fif = None
        for f in cleaned_fifs:
            if "post_ica" in f.name:
                final_fif = f
                break
        if final_fif is None:
            # If no post_ica file, use the most recently modified (fallback)
            final_fif = max(cleaned_fifs, key=os.path.getmtime)

        # Delete all cleaned FIFs except the final one
        for f in cleaned_fifs:
            if f != final_fif:
                f.unlink()

        # Rename to standard clean name
        expected_name = subject_dir / f"{SUBJECT_ID}_eeg_ica_cleaned_raw.fif"
        if final_fif != expected_name:
            final_fif.rename(expected_name)
            cleaned_fif = expected_name
        else:
            cleaned_fif = final_fif

        # Load and save info summary
        raw = mne.io.read_raw_fif(str(cleaned_fif), preload=False)
        info_summary = subject_dir / f"{SUBJECT_ID}_info_summary.txt"
        with open(info_summary, "w") as f:
            f.write(f"=== EEG Info Summary for {SUBJECT_ID} ===\n")
            f.write(str(raw.info))

        # Create ZIP
        zip_path = OUTPUT_DIR / f"{SUBJECT_ID}_preprocessing_output.zip"
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
            for root, _, files in os.walk(subject_dir):
                for file in files:
                    file_path = Path(root) / file
                    arcname = file_path.relative_to(OUTPUT_DIR)
                    zf.write(file_path, arcname)

        return str(zip_path)

    except Exception as e:
        error_log = subject_dir / "error.log"
        with open(error_log, "w") as f:
            f.write(f"Preprocessing failed:\n{str(e)}")
        zip_path = OUTPUT_DIR / f"{SUBJECT_ID}_error.zip"
        with zipfile.ZipFile(zip_path, "w") as zf:
            zf.write(error_log, error_log.name)
        return str(zip_path)

# Gradio UI
with gr.Blocks(theme=gr.themes.Base(), title="EEG Preprocessing") as demo:
    gr.Markdown("# EEG Preprocessing Pipeline")
    gr.Markdown("Upload an EEG file (.fif or .mff). Uses built-in `ghw280_from_egig.gpsc` montage.")

    with gr.Row():
        with gr.Column():
            eeg_input = gr.File(label="EEG File (.fif or .mff)")
            format_dropdown = gr.Dropdown(choices=["fif", "mff"], value="fif", label="Input Format")

            with gr.Accordion("Advanced Settings", open=False):
                hp = gr.Checkbox(True, label="Apply Highpass (1.0 Hz)")
                lp = gr.Checkbox(True, label="Apply Lowpass (100.0 Hz)")
                notch = gr.Checkbox(True, label="Apply Notch Filter")
                line_freq = gr.Number(60.0, label="Line Frequency (Hz)")
                pre_mad = gr.Slider(2.0, 8.0, value=3.5, label="Pre-ICA MAD Threshold")
                post_mad = gr.Slider(2.0, 8.0, value=5.0, label="Post-ICA MAD Threshold")
                interp_before = gr.Checkbox(False, label="Interpolate Bad Channels Before ICA")
                use_artifact = gr.Checkbox(True, label="Use Artifact Detection Channels")

            run_btn = gr.Button("Run Preprocessing", variant="primary")

        with gr.Column():
            output_file = gr.File(label="Download Preprocessing Results (.zip)")

    run_btn.click(
        fn=preprocess_eeg,
        inputs=[
            eeg_input,
            format_dropdown,
            hp,
            lp,
            notch,
            line_freq,
            pre_mad,
            post_mad,
            interp_before,
            use_artifact,
        ],
        outputs=output_file,
    )

demo.launch()