# OphthalmoCapture — interface/services/export_service.py
# Author: TheBug95
# Commit 5b7432c: user-interface update and implementation of the LOCS III
# labeling system; fixed transcription delete/restore functionality; added
# alert dialogs when downloading images with incomplete labeling.
"""OphthalmoCapture — Export Service
Generates in-memory ZIP packages for individual images or the full session.
Also produces ML-ready formats (HuggingFace CSV, JSONL).
Everything is built from st.session_state — nothing touches disk.
"""
import io
import csv
import json
import zipfile
import datetime
import streamlit as st
def _sanitize(name: str) -> str:
"""Remove characters not safe for ZIP entry names."""
return "".join(c if c.isalnum() or c in "._- " else "_" for c in name)
def _image_metadata(img: dict) -> dict:
"""Build a JSON-serialisable metadata dict for one image."""
return {
"filename": img["filename"],
"label": img["label"],
"locs_data": img.get("locs_data", {}),
"transcription": img["transcription"],
"transcription_original": img["transcription_original"],
"doctor": img.get("labeled_by", ""),
"timestamp": img["timestamp"].isoformat() if img.get("timestamp") else "",
"has_audio": img["audio_bytes"] is not None,
}
# ── Individual export ────────────────────────────────────────────────────────
def export_single_image(image_id: str) -> tuple[bytes, str]:
    """Package one image's labeling data as an in-memory ZIP.

    The archive contains metadata.json, transcripcion.txt and — when a
    dictation was recorded — audio_dictado.wav, all under a single folder
    named after the sanitized image filename.

    Returns:
        (zip_bytes, suggested_filename)
    """
    record = st.session_state.images[image_id]
    stem = record["filename"].rsplit(".", 1)[0]
    folder = f"etiquetado_{_sanitize(stem)}"

    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(
            f"{folder}/metadata.json",
            json.dumps(_image_metadata(record), ensure_ascii=False, indent=2),
        )
        zf.writestr(f"{folder}/transcripcion.txt", record["transcription"] or "")
        audio = record["audio_bytes"]
        if audio:
            zf.writestr(f"{folder}/audio_dictado.wav", audio)
    return archive.getvalue(), f"{folder}.zip"
# ── Bulk export (full session) ───────────────────────────────────────────────
def export_full_session() -> tuple[bytes, str]:
    """Package every image's labeling data plus a summary CSV as one ZIP.

    Archive layout::

        sesion_<stamp>/resumen.csv        one summary row per image
        sesion_<stamp>/etiquetas.json     full metadata for all images
        sesion_<stamp>/NNN_<name>/        per-image folder: metadata.json,
                                          transcripcion.txt, optional audio

    Returns:
        (zip_bytes, suggested_filename)
    """
    stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H%M")
    root = f"sesion_{stamp}"
    images = st.session_state.images
    order = st.session_state.image_order

    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zf:
        # resumen.csv — quick tabular overview of the whole session.
        csv_buf = io.StringIO()
        writer = csv.writer(csv_buf)
        writer.writerow(["filename", "label", "nuclear_opalescence",
                         "nuclear_color", "cortical_opacity",
                         "has_audio", "has_transcription", "doctor"])
        for img_id in order:
            record = images[img_id]
            locs = record.get("locs_data", {})
            writer.writerow([
                record["filename"],
                record["label"] or "",
                locs.get("nuclear_opalescence", ""),
                locs.get("nuclear_color", ""),
                locs.get("cortical_opacity", ""),
                "yes" if record["audio_bytes"] else "no",
                "yes" if record["transcription"] else "no",
                record.get("labeled_by", ""),
            ])
        zf.writestr(f"{root}/resumen.csv", csv_buf.getvalue())

        # etiquetas.json — every image's metadata in a single document.
        all_meta = [_image_metadata(images[img_id]) for img_id in order]
        zf.writestr(
            f"{root}/etiquetas.json",
            json.dumps(all_meta, ensure_ascii=False, indent=2),
        )

        # One numbered sub-folder per image, in session order.
        for idx, img_id in enumerate(order, start=1):
            record = images[img_id]
            stem = _sanitize(record["filename"].rsplit(".", 1)[0])
            folder = f"{root}/{idx:03d}_{stem}"
            zf.writestr(
                f"{folder}/metadata.json",
                json.dumps(_image_metadata(record), ensure_ascii=False, indent=2),
            )
            zf.writestr(f"{folder}/transcripcion.txt",
                        record["transcription"] or "")
            if record["audio_bytes"]:
                zf.writestr(f"{folder}/audio_dictado.wav", record["audio_bytes"])
    return archive.getvalue(), f"{root}.zip"
# ── Session summary ──────────────────────────────────────────────────────────
def get_session_summary() -> dict:
    """Return session counters used for pre-download validation dialogs.

    Keys: total, labeled, with_audio, with_transcription, unlabeled.
    """
    records = list(st.session_state.images.values())
    labeled = sum(1 for r in records if r["label"] is not None)
    return {
        "total": len(records),
        "labeled": labeled,
        "with_audio": sum(1 for r in records if r["audio_bytes"] is not None),
        "with_transcription": sum(1 for r in records if r["transcription"]),
        "unlabeled": len(records) - labeled,
    }
# ── ML-ready export formats (Idea F) ────────────────────────────────────────
def export_huggingface_csv() -> tuple[bytes, str]:
    """Export labeled images as a CSV compatible with HuggingFace datasets.

    Images without a label are skipped. Columns: filename, label,
    label_code, the three LOCS III grades, transcription and doctor.

    Returns:
        (csv_bytes, suggested_filename)
    """
    import config  # local import keeps module load free of config dependency

    images = st.session_state.images
    order = st.session_state.image_order
    code_for = {opt["display"]: opt["code"] for opt in config.LABEL_OPTIONS}

    out = io.StringIO()
    writer = csv.writer(out)
    writer.writerow(["filename", "label", "label_code",
                     "nuclear_opalescence", "nuclear_color", "cortical_opacity",
                     "transcription", "doctor"])
    for img_id in order:
        record = images[img_id]
        label = record["label"]
        if label is None:
            continue
        locs = record.get("locs_data", {})
        writer.writerow([
            record["filename"],
            label,
            code_for.get(label, ""),
            locs.get("nuclear_opalescence", ""),
            locs.get("nuclear_color", ""),
            locs.get("cortical_opacity", ""),
            record["transcription"],
            record.get("labeled_by", ""),
        ])
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M")
    return out.getvalue().encode("utf-8"), f"dataset_hf_{stamp}.csv"
def export_jsonl() -> tuple[bytes, str]:
    """Export labeled images as JSONL (one object per line) for fine-tuning.

    Images without a label are skipped. Each line carries filename, label,
    label_code, the three LOCS III grades (null when ungraded),
    transcription and doctor.

    Returns:
        (jsonl_bytes, suggested_filename)
    """
    import config  # local import keeps module load free of config dependency

    images = st.session_state.images
    order = st.session_state.image_order
    code_for = {opt["display"]: opt["code"] for opt in config.LABEL_OPTIONS}

    lines = []
    for img_id in order:
        record = images[img_id]
        label = record["label"]
        if label is None:
            continue
        locs = record.get("locs_data", {})
        lines.append(json.dumps({
            "filename": record["filename"],
            "label": label,
            "label_code": code_for.get(label, ""),
            "nuclear_opalescence": locs.get("nuclear_opalescence"),
            "nuclear_color": locs.get("nuclear_color"),
            "cortical_opacity": locs.get("cortical_opacity"),
            "transcription": record["transcription"],
            "doctor": record.get("labeled_by", ""),
        }, ensure_ascii=False))
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M")
    return "\n".join(lines).encode("utf-8"), f"dataset_{stamp}.jsonl"