import streamlit as st import roboflow import pandas as pd import matplotlib.pyplot as plt import zipfile import tempfile from shapely.geometry import Polygon from PIL import Image from io import BytesIO from concurrent.futures import ThreadPoolExecutor from google.oauth2.credentials import Credentials from googleapiclient.discovery import build from googleapiclient.http import MediaIoBaseUpload import gspread import time APP_VERSION = "2.4" # ========================= # Roboflow init # ========================= API_KEY = st.secrets["roboflow_api_key"] rf = roboflow.Roboflow(api_key=API_KEY) project = rf.workspace(st.secrets["roboflow_workspace"]).project(st.secrets["roboflow_project"]) model = project.version(st.secrets["roboflow_version"]).model model.confidence = 80 model.overlap = 25 dpi_value = 300 # ========================= # Google Drive + Sheets (OAuth2) # ========================= scope = ["https://www.googleapis.com/auth/drive", "https://www.googleapis.com/auth/spreadsheets"] credentials = Credentials( token=None, refresh_token=st.secrets["GOOGLE_DRIVE_REFRESH_TOKEN"], token_uri="https://oauth2.googleapis.com/token", client_id=st.secrets["GOOGLE_DRIVE_CLIENT_ID"], client_secret=st.secrets["GOOGLE_DRIVE_CLIENT_SECRET"], scopes=scope, ) drive_service = build("drive", "v3", credentials=credentials) sheets_client = gspread.authorize(credentials) sheet = sheets_client.open_by_url(st.secrets["feedback_sheet_url"]).sheet1 # ========================= # Helpers # ========================= def calculate_polygon_area(points): polygon = Polygon([(p["x"], p["y"]) for p in points]) return polygon.area def safe_predict(image_path): for _ in range(3): try: return model.predict(image_path) except Exception: time.sleep(1) return None def resize_image(image): return image.resize((640, 640)) def upload_to_drive(image_bytes, filename, folder_id): media = MediaIoBaseUpload(image_bytes, mimetype="image/png") drive_service.files().create( body={"name": filename, "parents": [folder_id]}, media_body=media, fields="id", ).execute() def find_or_create_folder(folder_name, parent=None): query = f"name='{folder_name}' and mimeType='application/vnd.google-apps.folder' and trashed=false" if parent: query += f" and '{parent}' in parents" results = drive_service.files().list(q=query, spaces="drive", fields="files(id, name)").execute() folders = results.get("files", []) if folders: return folders[0]["id"] file_metadata = {"name": folder_name, "mimeType": "application/vnd.google-apps.folder"} if parent: file_metadata["parents"] = [parent] file = drive_service.files().create(body=file_metadata, fields="id").execute() return file.get("id") def get_image_bytes(image): buf = BytesIO() image.save(buf, format="PNG") buf.seek(0) return buf def process_image(uploaded_file, fov_um=None, pixel_size_um=None): try: safe_name = uploaded_file.name.replace(" ", "_") image = Image.open(uploaded_file).convert("RGB") width_px, _ = image.size effective_pixel_size_um = None if pixel_size_um is not None and pixel_size_um > 0: effective_pixel_size_um = pixel_size_um elif fov_um is not None and fov_um > 0: effective_pixel_size_um = fov_um / float(width_px) with tempfile.NamedTemporaryFile(suffix=".png", delete=True) as temp_file: image.save(temp_file.name) prediction = safe_predict(temp_file.name) if not prediction: return { "Imagem": safe_name, "Área Segmentada (px²)": None, "Área Segmentada (µm²)": None, "SemSegmentacao": True, "Exibir": image, "Original": get_image_bytes(image), } prediction_data = prediction.json() if not prediction_data["predictions"]: return { "Imagem": safe_name, "Área Segmentada (px²)": None, "Área Segmentada (µm²)": None, "SemSegmentacao": True, "Exibir": image, "Original": get_image_bytes(image), } points = prediction_data["predictions"][0]["points"] area_px2 = calculate_polygon_area(points) area_um2 = None if effective_pixel_size_um is not None: area_um2 = area_px2 * (effective_pixel_size_um**2) x = [p["x"] for p in points] + [points[0]["x"]] y = [p["y"] for p in points] + [points[0]["y"]] original_buffer = get_image_bytes(image) segmented_buffer = BytesIO() fig, ax = plt.subplots(figsize=(6, 6), dpi=dpi_value) ax.imshow(image) ax.plot(x, y, color="red", linewidth=2) ax.axis("off") plt.savefig(segmented_buffer, format="png", bbox_inches="tight", pad_inches=0) plt.close() polygon_buffer = BytesIO() fig2, ax2 = plt.subplots(figsize=(6, 6), dpi=dpi_value) ax2.plot(x, y, "r-", linewidth=2) ax2.scatter(x, y, color="red", s=5) ax2.set_title("Polygon contour") ax2.grid(True) plt.savefig(polygon_buffer, format="png", bbox_inches="tight") plt.close() return { "Imagem": safe_name, "Área Segmentada (px²)": area_px2, "Área Segmentada (µm²)": area_um2, "Original": original_buffer, "Segmentada": segmented_buffer, "Poligono": polygon_buffer, "Exibir": image, "SemSegmentacao": False, } except Exception: return None def save_feedback(result, avaliacao, observacao): image_name = result["Imagem"] # 1) Sheet sheet.append_row([image_name, avaliacao, observacao]) # 2) Drive curation if avaliacao in ["Acceptable", "Bad", "No segmentation"]: sufixo = "aceitavel" if avaliacao == "Acceptable" else "ruim" if avaliacao == "Bad" else "sem_segmentacao" parent_folder = find_or_create_folder("Feedback Segmentacoes") subfolder = find_or_create_folder(image_name.replace(".png", ""), parent_folder) resized_original = resize_image(result["Exibir"]) buf = BytesIO() resized_original.save(buf, format="PNG") buf.seek(0) upload_to_drive(buf, f"original_{sufixo}.png", subfolder) if avaliacao != "No segmentation" and result.get("Segmentada") and result.get("Poligono"): resized_segmented = resize_image(Image.open(BytesIO(result["Segmentada"].getvalue()))) resized_polygon = resize_image(Image.open(BytesIO(result["Poligono"].getvalue()))) for img_obj, nome in zip([resized_segmented, resized_polygon], ["segmentada", "poligono"]): buf = BytesIO() img_obj.save(buf, format="PNG") buf.seek(0) upload_to_drive(buf, f"{nome}_{sufixo}.png", subfolder) def render_metrics(result): area_px2 = result["Área Segmentada (px²)"] area_um2 = result["Área Segmentada (µm²)"] st.markdown("**Segmented area**") if area_px2 is not None: st.markdown(f"- {area_px2:.2f} px²") if area_um2 is not None: st.markdown(f"- {area_um2:.2f} µm²") def render_feedback_block(result, prefix_key=""): st.markdown("#### Segmentation quality feedback") st.caption("User evaluation used for future model refinement.") avaliacao = st.radio( "Segmentation quality assessment:", ["Great", "Acceptable", "Bad", "No segmentation"], horizontal=True, key=f"{prefix_key}radio_{result['Imagem']}", ) observacao = st.text_area( "Observations (optional):", key=f"{prefix_key}obs_{result['Imagem']}", ) if st.button("Save feedback", key=f"{prefix_key}btn_{result['Imagem']}"): save_feedback(result, avaliacao, observacao) st.success("Feedback saved successfully.") # ========================= # Layout / UI # ========================= st.set_page_config(page_title="Scratch Assay Segmentation", layout="wide") st.title("Scratch Assay Segmentation Tool") st.caption(f"Version {APP_VERSION} · Deep learning–based wound closure segmentation") st.markdown("---") # Upload block st.markdown("### Input") upload_option = st.radio("Choose upload type:", ["Single image", "Image folder"], horizontal=True) # Advanced settings (collapsed by default) with st.expander("⚙️ Advanced Settings", expanded=False): model.confidence = st.slider("Model confidence (%)", 20, 100, 80) st.markdown( "### Physical calibration (optional)\n" "Provide the physical scale for conversion from pixel area to physical units (µm²). " "If left empty, results will be reported only in pixels²." ) c1, c2 = st.columns(2) fov_um = c1.number_input( "Field of view width (µm)", min_value=0.0, value=0.0, step=1.0, help="Physical width of the image field, in micrometers.", ) pixel_size_um = c2.number_input( "Pixel size (µm / pixel)", min_value=0.0, value=0.0, step=0.01, help="If provided, this overrides the FOV-based calibration.", ) results = [] with st.sidebar: st.markdown("## Info") with st.expander("About / Citation", expanded=False): st.markdown( """ This tool was developed by the **Medical Physics Laboratory** of the Department of **Biophysics and Pharmacology – IBB, UNESP**. **FAPESP Process:** 2024/01849-4. **Coordination:** Prof. Allan Alves. **Development:** Nycolas Mariotto. """ ) # ========================= # Single image # ========================= if upload_option == "Single image": uploaded_file = st.file_uploader("Upload an image", type=["png", "jpg", "jpeg", "tiff"]) if uploaded_file: st.markdown("---") st.markdown("### Result") result = process_image(uploaded_file, fov_um=fov_um, pixel_size_um=pixel_size_um) if result: results.append(result) st.markdown(f"#### {result['Imagem']}") if result["SemSegmentacao"]: col = st.columns(1)[0] with col: st.image(result["Exibir"], caption="Original", use_container_width=True) st.warning("No segmentation was detected for this image.") else: col1, col2, col3 = st.columns(3) with col1: st.image(result["Exibir"], caption="Original", use_container_width=True) with col2: st.image(result["Segmentada"], caption="Segmentation", use_container_width=True) with col3: st.image(result["Poligono"], caption="Polygon", use_container_width=True) render_metrics(result) st.markdown("### Export") st.download_button( "Download segmented overlay (PNG)", data=result["Segmentada"], file_name=f"segmented_{result['Imagem']}.png", mime="image/png", ) st.markdown("---") render_feedback_block(result, prefix_key="single_") # ========================= # Folder # ========================= elif upload_option == "Image folder": uploaded_files = st.file_uploader( "Upload multiple images", type=["png", "jpg", "jpeg", "tiff"], accept_multiple_files=True, ) if uploaded_files: st.markdown("---") st.markdown("### Processing") def process_wrapper(f): return process_image(f, fov_um=fov_um, pixel_size_um=pixel_size_um) with ThreadPoolExecutor(max_workers=4) as executor: processed = list(executor.map(process_wrapper, uploaded_files)) falhas = [f.name for f, r in zip(uploaded_files, processed) if r and r.get("SemSegmentacao")] if falhas: st.warning( f"{len(falhas)} image(s) with no segmentation detected:\n\n- " + "\n- ".join(falhas) ) zip_images_buffer = BytesIO() with zipfile.ZipFile(zip_images_buffer, "w") as zip_file: for idx, result in enumerate(processed, start=1): if not result: continue results.append(result) st.markdown("---") st.markdown(f"### Result {idx} · {result['Imagem']}") if result["SemSegmentacao"]: st.image(result["Exibir"], caption="Original", use_container_width=True) st.warning("No segmentation was detected for this image.") else: col1, col2, col3 = st.columns(3) with col1: st.image(result["Exibir"], caption="Original", use_container_width=True) with col2: st.image(result["Segmentada"], caption="Segmentation", use_container_width=True) with col3: st.image(result["Poligono"], caption="Polygon", use_container_width=True) render_metrics(result) # Build ZIP zip_file.writestr(f"segmentada_{result['Imagem']}.png", result["Segmentada"].getvalue()) zip_file.writestr(f"poligono_{result['Imagem']}.png", result["Poligono"].getvalue()) render_feedback_block(result, prefix_key="folder_") zip_images_buffer.seek(0) # Summary table + exports if results: st.markdown("---") st.markdown("### Quantitative results") df = pd.DataFrame( [ { "Image": r["Imagem"], "Segmented Area (px²)": ( r["Área Segmentada (px²)"] if (not r["SemSegmentacao"] and r["Área Segmentada (px²)"] is not None) else "No Segmentation" ), "Segmented Area (µm²)": ( f"{r['Área Segmentada (µm²)']:.2f}" if (not r["SemSegmentacao"] and r["Área Segmentada (µm²)"] is not None) else "" ), } for r in results ] ) st.dataframe(df, use_container_width=True) excel_buffer = BytesIO() df.to_excel(excel_buffer, index=False) excel_buffer.seek(0) st.markdown("### Export results") c1, c2 = st.columns(2) with c1: st.download_button( "Download table (Excel)", data=excel_buffer, file_name="segmentation_results.xlsx", mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", use_container_width=True, ) with c2: st.download_button( "Download segmented images (ZIP)", data=zip_images_buffer, file_name="segmented_images.zip", mime="application/zip", use_container_width=True, )