Spaces:
Sleeping
Sleeping
| """OphthalmoCapture — Image Upload Component | |
| Handles file upload, validation, and ingestion into the ephemeral session. | |
| Uses @st.dialog modals to warn about: | |
| - Previously labeled images (from DB) — doctor chooses which to re-label. | |
| - Session duplicates — informational notice. | |
| """ | |
| import streamlit as st | |
| import config | |
| import database as db | |
| from i18n import t | |
| from services import session_manager as sm | |
| from utils import validate_image_bytes | |
| def _reset_uploader(): | |
| """Increment the uploader key counter to clear the file_uploader widget.""" | |
| st.session_state._uploader_counter = st.session_state.get("_uploader_counter", 0) + 1 | |
| # ── Modal: previously labeled images ───────────────────────────────────────── | |
| def _show_relabel_dialog(): | |
| """Modal dialog asking the doctor which previously-labeled images to re-upload.""" | |
| def _dlg(): | |
| pending = st.session_state.get("_pending_upload_review") | |
| if not pending: | |
| st.rerun() | |
| return | |
| prev = pending["previously_labeled"] | |
| non_labeled_count = len(pending["files"]) - len(prev) | |
| st.markdown(t("relabel_dialog_msg", count=len(prev))) | |
| if non_labeled_count > 0: | |
| st.info(t("relabel_new_info", count=non_labeled_count)) | |
| relabel_choices = {} | |
| for fname, records in prev.items(): | |
| latest = records[0] | |
| label_info = latest.get("label", "—") | |
| doctor_info = latest.get("doctorName", "—") | |
| ts_info = str(latest.get("createdAt", ""))[:16] | |
| n_times = len(records) | |
| badge = t("times_badge_plural", n=n_times) if n_times > 1 else t("times_badge", n=n_times) | |
| relabel_choices[fname] = st.checkbox( | |
| f"**{fname}** — _{label_info}_ | {doctor_info} | {ts_info} ({badge})", | |
| value=True, | |
| key=f"_dlg_relabel_{fname}", | |
| ) | |
| st.divider() | |
| col_a, col_b = st.columns(2) | |
| with col_a: | |
| if st.button(t("accept_upload"), type="primary", use_container_width=True): | |
| _process_pending(relabel_choices) | |
| with col_b: | |
| if st.button(t("cancel_labeled"), use_container_width=True): | |
| _cancel_pending() | |
| _dlg() | |
| def _process_pending(relabel_choices: dict[str, bool]): | |
| """Ingest accepted files from the pending review.""" | |
| pending = st.session_state.pop("_pending_upload_review", None) | |
| if not pending: | |
| st.rerun() | |
| return | |
| prev = pending["previously_labeled"] | |
| files_dict = pending["files"] | |
| existing_filenames = { | |
| img["filename"] for img in st.session_state.images.values() | |
| } | |
| if "_processed_uploads" not in st.session_state: | |
| st.session_state._processed_uploads = set() | |
| added = 0 | |
| for fname, raw_bytes in files_dict.items(): | |
| # If it was previously labeled and doctor unchecked it → skip | |
| if fname in prev and not relabel_choices.get(fname, True): | |
| continue | |
| if fname not in existing_filenames: | |
| sm.add_image(fname, raw_bytes) | |
| st.session_state._processed_uploads.add(fname) | |
| st.session_state.session_downloaded = False | |
| added += 1 | |
| _reset_uploader() | |
| if added > 0 and st.session_state.current_image_id is None: | |
| st.session_state.current_image_id = st.session_state.image_order[0] | |
| st.rerun() | |
| def _cancel_pending(): | |
| """Cancel previously-labeled images but still ingest new (non-labeled) ones.""" | |
| pending = st.session_state.pop("_pending_upload_review", None) | |
| if pending: | |
| prev = pending["previously_labeled"] | |
| files_dict = pending["files"] | |
| existing_filenames = { | |
| img["filename"] for img in st.session_state.images.values() | |
| } | |
| if "_processed_uploads" not in st.session_state: | |
| st.session_state._processed_uploads = set() | |
| added = 0 | |
| for fname, raw_bytes in files_dict.items(): | |
| # Skip previously labeled — doctor chose to cancel them | |
| if fname in prev: | |
| continue | |
| if fname not in existing_filenames: | |
| sm.add_image(fname, raw_bytes) | |
| st.session_state._processed_uploads.add(fname) | |
| st.session_state.session_downloaded = False | |
| added += 1 | |
| if added > 0 and st.session_state.current_image_id is None: | |
| st.session_state.current_image_id = st.session_state.image_order[0] | |
| _reset_uploader() | |
| st.rerun() | |
| # ── Modal: session duplicates (informational) ──────────────────────────────── | |
| def _show_duplicates_dialog(): | |
| """Informational modal listing images already present in the current session.""" | |
| def _dlg(): | |
| dup_names = st.session_state.get("_session_duplicates", []) | |
| if not dup_names: | |
| st.rerun() | |
| return | |
| st.markdown( | |
| t("duplicates_dialog_msg") | |
| ) | |
| for fname in dup_names: | |
| st.markdown(f"- `{fname}`") | |
| if st.button(t("accept"), use_container_width=True): | |
| st.session_state.pop("_session_duplicates", None) | |
| st.rerun() | |
| _dlg() | |
| # ── Main uploader ──────────────────────────────────────────────────────────── | |
| def render_uploader(): | |
| """Render the file uploader and process new uploads. | |
| Returns the number of newly added images (0 if none). | |
| """ | |
| counter = st.session_state.get("_uploader_counter", 0) | |
| uploaded_files = st.file_uploader( | |
| t("upload_images"), | |
| type=config.ALLOWED_EXTENSIONS, | |
| accept_multiple_files=True, | |
| help=f"{t('upload_help_formats')}: {', '.join(config.ALLOWED_EXTENSIONS)}. " | |
| f"{t('upload_help_max')} {config.MAX_UPLOAD_SIZE_MB} MB.", | |
| key=f"uploader_{counter}", | |
| ) | |
| # ── Show pending dialogs (survive reruns) ──────────────────────────── | |
| if "_pending_upload_review" in st.session_state: | |
| _show_relabel_dialog() | |
| return 0 | |
| if "_session_duplicates" in st.session_state: | |
| _show_duplicates_dialog() | |
| return 0 | |
| if not uploaded_files: | |
| return 0 | |
| if "_processed_uploads" not in st.session_state: | |
| st.session_state._processed_uploads = set() | |
| existing_filenames = { | |
| img["filename"] for img in st.session_state.images.values() | |
| } | |
| # ── Classify files ─────────────────────────────────────────────────── | |
| new_files = [] | |
| skipped_invalid = 0 | |
| session_duplicates = [] | |
| for uf in uploaded_files: | |
| # Already in the current session | |
| if uf.name in existing_filenames: | |
| if uf.name not in st.session_state._processed_uploads: | |
| session_duplicates.append(uf.name) | |
| st.session_state._processed_uploads.add(uf.name) | |
| continue | |
| # Already ingested via this uploader cycle | |
| if uf.name in st.session_state._processed_uploads: | |
| continue | |
| raw_bytes = uf.getvalue() | |
| if not validate_image_bytes(raw_bytes): | |
| skipped_invalid += 1 | |
| continue | |
| new_files.append((uf.name, raw_bytes)) | |
| # ── Check DB for previously labeled images ─────────────────────────── | |
| if new_files: | |
| new_filenames = [name for name, _ in new_files] | |
| previously_labeled = db.get_previously_labeled_filenames(new_filenames) | |
| if previously_labeled: | |
| # Store all files (new + previously labeled) for review | |
| st.session_state["_pending_upload_review"] = { | |
| "files": {name: raw for name, raw in new_files}, | |
| "previously_labeled": previously_labeled, | |
| } | |
| # Also show session duplicate dialog afterward if needed | |
| if session_duplicates: | |
| st.session_state["_session_duplicates"] = session_duplicates | |
| st.rerun() | |
| return 0 | |
| # ── Ingest files that need no review ───────────────────────────────── | |
| new_count = 0 | |
| for name, raw_bytes in new_files: | |
| if name in existing_filenames: | |
| continue | |
| if name in st.session_state._processed_uploads: | |
| continue | |
| sm.add_image(name, raw_bytes) | |
| existing_filenames.add(name) | |
| st.session_state._processed_uploads.add(name) | |
| st.session_state.session_downloaded = False | |
| new_count += 1 | |
| if skipped_invalid > 0: | |
| st.warning( | |
| f"⚠️ {skipped_invalid} {t('invalid_files')}" | |
| ) | |
| if new_count > 0: | |
| _reset_uploader() | |
| if st.session_state.current_image_id is None: | |
| st.session_state.current_image_id = st.session_state.image_order[0] | |
| # ── Show session duplicate info dialog if any ──────────────────────── | |
| if session_duplicates: | |
| st.session_state["_session_duplicates"] = session_duplicates | |
| st.rerun() | |
| return new_count | |