Spaces:
Sleeping
Sleeping
| """OphthalmoCapture — Export Service | |
| Generates in-memory ZIP packages for individual images or the full session. | |
| Also produces ML-ready formats (HuggingFace CSV, JSONL). | |
| Everything is built from st.session_state — nothing touches disk. | |
| """ | |
import io
import csv
import json
import zipfile
import datetime

import streamlit as st
| def _sanitize(name: str) -> str: | |
| """Remove characters not safe for ZIP entry names.""" | |
| return "".join(c if c.isalnum() or c in "._- " else "_" for c in name) | |
| def _image_metadata(img: dict) -> dict: | |
| """Build a JSON-serialisable metadata dict for one image.""" | |
| return { | |
| "filename": img["filename"], | |
| "label": img["label"], | |
| "locs_data": img.get("locs_data", {}), | |
| "transcription": img["transcription"], | |
| "transcription_original": img["transcription_original"], | |
| "doctor": img.get("labeled_by", ""), | |
| "timestamp": img["timestamp"].isoformat() if img.get("timestamp") else "", | |
| "has_audio": img["audio_bytes"] is not None, | |
| } | |
| # ── Individual export ──────────────────────────────────────────────────────── | |
def export_single_image(image_id: str) -> tuple[bytes, str]:
    """Build an in-memory ZIP holding one image's labeling data.

    The archive contains metadata.json, transcripcion.txt and, when a
    dictation was recorded, audio_dictado.wav — all under a single folder
    named after the sanitized image filename.

    Returns (zip_bytes, suggested_filename).
    """
    img = st.session_state.images[image_id]
    stem = _sanitize(img["filename"].rsplit(".", 1)[0])
    folder = f"etiquetado_{stem}"

    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(
            f"{folder}/metadata.json",
            json.dumps(_image_metadata(img), ensure_ascii=False, indent=2),
        )
        zf.writestr(f"{folder}/transcripcion.txt", img["transcription"] or "")
        # Audio is optional — only present when the doctor dictated notes.
        if img["audio_bytes"]:
            zf.writestr(f"{folder}/audio_dictado.wav", img["audio_bytes"])
    return archive.getvalue(), f"{folder}.zip"
| # ── Bulk export (full session) ─────────────────────────────────────────────── | |
def export_full_session() -> tuple[bytes, str]:
    """Package every image's labeling data plus a summary CSV into one ZIP.

    Layout inside the archive (root folder is timestamped):
      sesion_<ts>/resumen.csv          — one row per image, session order
      sesion_<ts>/etiquetas.json       — all metadata records in one document
      sesion_<ts>/NNN_<name>/...       — per-image metadata/transcription/audio

    Returns (zip_bytes, suggested_filename).
    """
    stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H%M")
    root = f"sesion_{stamp}"
    images = st.session_state.images
    order = st.session_state.image_order

    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zf:
        # ── Summary CSV: one row per image in session order ──────────────
        csv_buf = io.StringIO()
        writer = csv.writer(csv_buf)
        writer.writerow(["filename", "label", "nuclear_opalescence",
                         "nuclear_color", "cortical_opacity",
                         "has_audio", "has_transcription", "doctor"])
        for img_id in order:
            img = images[img_id]
            locs = img.get("locs_data", {})
            writer.writerow([
                img["filename"],
                img["label"] or "",
                locs.get("nuclear_opalescence", ""),
                locs.get("nuclear_color", ""),
                locs.get("cortical_opacity", ""),
                "yes" if img["audio_bytes"] else "no",
                "yes" if img["transcription"] else "no",
                img.get("labeled_by", ""),
            ])
        zf.writestr(f"{root}/resumen.csv", csv_buf.getvalue())

        # ── Single JSON document with every image's metadata ─────────────
        combined = [_image_metadata(images[i]) for i in order]
        zf.writestr(
            f"{root}/etiquetas.json",
            json.dumps(combined, ensure_ascii=False, indent=2),
        )

        # ── Per-image folders (zero-padded index keeps session order) ────
        for position, img_id in enumerate(order, start=1):
            img = images[img_id]
            stem = _sanitize(img["filename"].rsplit(".", 1)[0])
            sub = f"{root}/{position:03d}_{stem}"
            zf.writestr(
                f"{sub}/metadata.json",
                json.dumps(_image_metadata(img), ensure_ascii=False, indent=2),
            )
            zf.writestr(f"{sub}/transcripcion.txt", img["transcription"] or "")
            if img["audio_bytes"]:
                zf.writestr(f"{sub}/audio_dictado.wav", img["audio_bytes"])
    return archive.getvalue(), f"{root}.zip"
| # ── Session summary ────────────────────────────────────────────────────────── | |
def get_session_summary() -> dict:
    """Summarize labeling progress for pre-download validation.

    Returns counts: total, labeled, with_audio, with_transcription,
    unlabeled (= total - labeled).
    """
    imgs = list(st.session_state.images.values())
    labeled = sum(img["label"] is not None for img in imgs)
    return {
        "total": len(imgs),
        "labeled": labeled,
        "with_audio": sum(img["audio_bytes"] is not None for img in imgs),
        # Counts truthy (non-empty) transcriptions only.
        "with_transcription": sum(bool(img["transcription"]) for img in imgs),
        "unlabeled": len(imgs) - labeled,
    }
| # ── ML-ready export formats (Idea F) ──────────────────────────────────────── | |
def export_huggingface_csv() -> tuple[bytes, str]:
    """Export a CSV compatible with HuggingFace datasets.

    Columns: filename, label, label_code, nuclear_opalescence, nuclear_color,
    cortical_opacity, transcription, doctor. Unlabeled images are skipped.

    Returns (csv_bytes, suggested_filename).
    """
    import config  # local import avoids a module-level cycle

    images = st.session_state.images
    order = st.session_state.image_order
    # Map display label -> machine code, from the project's label catalogue.
    code_for = {opt["display"]: opt["code"] for opt in config.LABEL_OPTIONS}

    out = io.StringIO()
    writer = csv.writer(out)
    writer.writerow(["filename", "label", "label_code",
                     "nuclear_opalescence", "nuclear_color", "cortical_opacity",
                     "transcription", "doctor"])
    for img_id in order:
        img = images[img_id]
        if img["label"] is None:
            continue  # only labeled images belong in the dataset
        locs = img.get("locs_data", {})
        writer.writerow([
            img["filename"],
            img["label"],
            code_for.get(img["label"], ""),
            locs.get("nuclear_opalescence", ""),
            locs.get("nuclear_color", ""),
            locs.get("cortical_opacity", ""),
            img["transcription"],
            img.get("labeled_by", ""),
        ])

    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M")
    return out.getvalue().encode("utf-8"), f"dataset_hf_{stamp}.csv"
def export_jsonl() -> tuple[bytes, str]:
    """Export JSONL (one JSON object per line) suitable for LLM fine-tuning.

    Each line carries filename, label, label_code, the three LOCS fields,
    transcription and doctor. Unlabeled images are skipped.

    Returns (jsonl_bytes, suggested_filename).
    """
    import config  # local import avoids a module-level cycle

    images = st.session_state.images
    order = st.session_state.image_order
    # Map display label -> machine code, from the project's label catalogue.
    code_for = {opt["display"]: opt["code"] for opt in config.LABEL_OPTIONS}

    records = []
    for img_id in order:
        img = images[img_id]
        if img["label"] is None:
            continue  # only labeled images belong in the dataset
        locs = img.get("locs_data", {})
        records.append(json.dumps({
            "filename": img["filename"],
            "label": img["label"],
            "label_code": code_for.get(img["label"], ""),
            "nuclear_opalescence": locs.get("nuclear_opalescence"),
            "nuclear_color": locs.get("nuclear_color"),
            "cortical_opacity": locs.get("cortical_opacity"),
            "transcription": img["transcription"],
            "doctor": img.get("labeled_by", ""),
        }, ensure_ascii=False))

    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M")
    return "\n".join(records).encode("utf-8"), f"dataset_{stamp}.jsonl"