Update app.py
Browse files
app.py
CHANGED
|
@@ -18,7 +18,7 @@ OUTPUT_DIR = Path("outputs")
|
|
| 18 |
OUTPUT_DIR.mkdir(exist_ok=True)
|
| 19 |
|
| 20 |
# Fixed subject ID — no user input
|
| 21 |
-
SUBJECT_ID = "sub
|
| 22 |
|
| 23 |
def preprocess_eeg(
|
| 24 |
eeg_file,
|
|
@@ -39,9 +39,23 @@ def preprocess_eeg(
|
|
| 39 |
subject_dir.mkdir(parents=True)
|
| 40 |
(subject_dir / "plots").mkdir(exist_ok=True)
|
| 41 |
|
| 42 |
-
|
| 43 |
-
|
| 44 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 45 |
|
| 46 |
try:
|
| 47 |
# Run pipeline
|
|
@@ -73,7 +87,7 @@ def preprocess_eeg(
|
|
| 73 |
f.write(f"=== EEG Info Summary for {SUBJECT_ID} ===\n")
|
| 74 |
f.write(str(raw.info))
|
| 75 |
|
| 76 |
-
# Create ZIP
|
| 77 |
zip_path = OUTPUT_DIR / f"{SUBJECT_ID}_preprocessing_output.zip"
|
| 78 |
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
|
| 79 |
for root, _, files in os.walk(subject_dir):
|
|
|
|
| 18 |
OUTPUT_DIR.mkdir(exist_ok=True)
|
| 19 |
|
| 20 |
# Fixed subject ID — no user input
|
| 21 |
+
SUBJECT_ID = "sub"
|
| 22 |
|
| 23 |
def preprocess_eeg(
|
| 24 |
eeg_file,
|
|
|
|
| 39 |
subject_dir.mkdir(parents=True)
|
| 40 |
(subject_dir / "plots").mkdir(exist_ok=True)
|
| 41 |
|
| 42 |
+
uploaded_file = Path(eeg_file.name)
|
| 43 |
+
|
| 44 |
+
if input_format == "mff":
|
| 45 |
+
# Validate: must be a ZIP file
|
| 46 |
+
if not zipfile.is_zipfile(uploaded_file):
|
| 47 |
+
raise ValueError("MFF input must be a ZIP file containing the .mff folder.")
|
| 48 |
+
# Extract directly into subject_dir as "sub.mff"
|
| 49 |
+
eeg_path = subject_dir / f"{SUBJECT_ID}.mff"
|
| 50 |
+
with zipfile.ZipFile(uploaded_file, 'r') as zf:
|
| 51 |
+
zf.extractall(eeg_path)
|
| 52 |
+
# Optional: validate
|
| 53 |
+
if not (eeg_path / "info.xml").exists():
|
| 54 |
+
raise ValueError("Invalid MFF: missing 'info.xml' in the ZIP.")
|
| 55 |
+
else:
|
| 56 |
+
# For FIF: copy the file into subject_dir with its original name
|
| 57 |
+
eeg_path = subject_dir / uploaded_file.name
|
| 58 |
+
shutil.copy(uploaded_file, eeg_path)
|
| 59 |
|
| 60 |
try:
|
| 61 |
# Run pipeline
|
|
|
|
| 87 |
f.write(f"=== EEG Info Summary for {SUBJECT_ID} ===\n")
|
| 88 |
f.write(str(raw.info))
|
| 89 |
|
| 90 |
+
# Create ZIP of entire subject_dir
|
| 91 |
zip_path = OUTPUT_DIR / f"{SUBJECT_ID}_preprocessing_output.zip"
|
| 92 |
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
|
| 93 |
for root, _, files in os.walk(subject_dir):
|