|
|
import logging
import os
import tempfile
import time
import zipfile
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO

import gspread
import matplotlib.pyplot as plt
import pandas as pd
import roboflow
import streamlit as st
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseUpload
from PIL import Image
from shapely.geometry import Polygon
|
|
|
|
|
# Application version string shown in the page caption.
APP_VERSION = "2.4"

# --- Roboflow model ---------------------------------------------------------
# API key, workspace, project and version are all read from Streamlit secrets.
API_KEY = st.secrets["roboflow_api_key"]
rf = roboflow.Roboflow(api_key=API_KEY)
project = rf.workspace(st.secrets["roboflow_workspace"]).project(
    st.secrets["roboflow_project"]
)
model = project.version(st.secrets["roboflow_version"]).model

# Initial inference settings; the confidence is later overwritten by the
# "Model confidence (%)" slider in the advanced settings.
model.confidence, model.overlap = 80, 25

# DPI passed to matplotlib when rendering the overlay and polygon figures.
dpi_value = 300
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Google Drive / Sheets clients -------------------------------------------
# OAuth scopes requested for Drive and Sheets access.
scope = [
    "https://www.googleapis.com/auth/drive",
    "https://www.googleapis.com/auth/spreadsheets",
]

# No access token is passed up front; the refresh token and client
# credentials are taken from Streamlit secrets.
_google_oauth_settings = {
    "token": None,
    "refresh_token": st.secrets["GOOGLE_DRIVE_REFRESH_TOKEN"],
    "token_uri": "https://oauth2.googleapis.com/token",
    "client_id": st.secrets["GOOGLE_DRIVE_CLIENT_ID"],
    "client_secret": st.secrets["GOOGLE_DRIVE_CLIENT_SECRET"],
    "scopes": scope,
}
credentials = Credentials(**_google_oauth_settings)

# Drive v3 client used for folder lookup/creation and image uploads.
drive_service = build("drive", "v3", credentials=credentials)

# First worksheet of the feedback spreadsheet; feedback rows are appended here.
sheets_client = gspread.authorize(credentials)
_feedback_spreadsheet = sheets_client.open_by_url(st.secrets["feedback_sheet_url"])
sheet = _feedback_spreadsheet.sheet1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_polygon_area(points):
    """Return the area enclosed by a polygon described by vertex mappings.

    Args:
        points: Iterable of mappings, each providing ``"x"`` and ``"y"``
            coordinates of one vertex, in drawing order.

    Returns:
        The polygon area in squared coordinate units. A contour with fewer
        than three vertices encloses nothing, so ``0.0`` is returned instead
        of letting the geometry library reject the degenerate input.
    """
    vertices = [(p["x"], p["y"]) for p in points]
    if len(vertices) < 3:
        return 0.0
    return Polygon(vertices).area
|
|
|
|
|
|
|
|
def safe_predict(image_path, retries=3, delay=1.0):
    """Run the segmentation model on an image file, retrying on failure.

    Args:
        image_path: Path of the image file handed to ``model.predict``.
        retries: Maximum number of prediction attempts.
        delay: Seconds to wait between two consecutive attempts.

    Returns:
        Whatever ``model.predict`` returns on the first successful attempt,
        or ``None`` when every attempt raised.
    """
    log = logging.getLogger(__name__)
    for attempt in range(1, retries + 1):
        try:
            return model.predict(image_path)
        except Exception:
            # Deliberately best-effort: any failure is retried, but it is
            # logged so that repeated errors remain diagnosable.
            log.warning(
                "Prediction attempt %d/%d failed for %s",
                attempt,
                retries,
                image_path,
                exc_info=True,
            )
            # Waiting after the final attempt would only delay the caller.
            if attempt < retries:
                time.sleep(delay)
    return None
|
|
|
|
|
|
|
|
def resize_image(image, size=(640, 640)):
    """Return ``image`` resized to ``size``.

    Args:
        image: Object exposing ``resize(size)`` (a PIL image in this app).
        size: Target ``(width, height)``; defaults to 640x640, the size
            previously hard-coded here.

    Returns:
        The value returned by ``image.resize``. The aspect ratio is not
        preserved.
    """
    return image.resize(size)
|
|
|
|
|
|
|
|
def upload_to_drive(image_bytes, filename, folder_id):
    """Upload a PNG stream to Google Drive as ``filename`` inside ``folder_id``.

    Args:
        image_bytes: Binary stream holding the PNG data to upload.
        filename: Name given to the created Drive file.
        folder_id: ID of the Drive folder set as the file's parent.
    """
    metadata = {"name": filename, "parents": [folder_id]}
    payload = MediaIoBaseUpload(image_bytes, mimetype="image/png")
    request = drive_service.files().create(
        body=metadata,
        media_body=payload,
        fields="id",
    )
    request.execute()
|
|
|
|
|
|
|
|
def _escape_drive_query_value(value):
    """Escape backslashes and single quotes for a quoted Drive query value."""
    return str(value).replace("\\", "\\\\").replace("'", "\\'")


def find_or_create_folder(folder_name, parent=None):
    """Return the ID of a Drive folder with this name, creating it if absent.

    Args:
        folder_name: Name of the folder to look up or create.
        parent: Optional ID of the parent folder. When given, the lookup is
            restricted to that parent and a newly created folder is placed
            inside it.

    Returns:
        The ID of the first matching non-trashed folder, or of the folder
        that was just created.
    """
    folder_mime = "application/vnd.google-apps.folder"

    # Folder names derive from uploaded file names, so quotes and backslashes
    # must be escaped before being embedded in the single-quoted query.
    escaped_name = _escape_drive_query_value(folder_name)
    query = f"name='{escaped_name}' and mimeType='{folder_mime}' and trashed=false"
    if parent:
        escaped_parent = _escape_drive_query_value(parent)
        query += f" and '{escaped_parent}' in parents"

    results = drive_service.files().list(q=query, spaces="drive", fields="files(id, name)").execute()
    folders = results.get("files", [])
    if folders:
        return folders[0]["id"]

    # Nothing matched: create the folder (under ``parent`` when provided).
    file_metadata = {"name": folder_name, "mimeType": folder_mime}
    if parent:
        file_metadata["parents"] = [parent]
    created = drive_service.files().create(body=file_metadata, fields="id").execute()
    return created.get("id")
|
|
|
|
|
|
|
|
def get_image_bytes(image):
    """Serialize ``image`` as PNG into a new in-memory buffer.

    Args:
        image: Object exposing ``save(fp, format=...)`` (a PIL image here).

    Returns:
        A ``BytesIO`` holding the PNG data, positioned at its start.
    """
    stream = BytesIO()
    image.save(stream, format="PNG")
    # Rewind so that consumers read from the first byte.
    stream.seek(0)
    return stream
|
|
|
|
|
|
|
|
def _no_segmentation_result(safe_name, image):
    """Build the result dict for an image without a usable segmentation."""
    return {
        "Imagem": safe_name,
        "Área Segmentada (px²)": None,
        "Área Segmentada (µm²)": None,
        "SemSegmentacao": True,
        "Exibir": image,
        "Original": get_image_bytes(image),
    }


def _render_figure_png(draw, **savefig_kwargs):
    """Draw on a fresh 6x6 figure and return it as a rewound PNG buffer."""
    buffer = BytesIO()
    fig, ax = plt.subplots(figsize=(6, 6), dpi=dpi_value)
    try:
        draw(ax)
        # Save and close this specific figure: this function runs in worker
        # threads, and pyplot's "current figure" may belong to another thread.
        fig.savefig(buffer, format="png", **savefig_kwargs)
    finally:
        # Always release the figure, even if drawing or saving failed.
        plt.close(fig)
    buffer.seek(0)
    return buffer


def process_image(uploaded_file, fov_um=None, pixel_size_um=None):
    """Segment one uploaded image and prepare everything needed to display it.

    Args:
        uploaded_file: File-like object with a ``name`` attribute that PIL
            can open (the app passes Streamlit uploads).
        fov_um: Optional physical width of the image in micrometers. Used to
            derive the pixel size when ``pixel_size_um`` is not positive.
        pixel_size_um: Optional pixel size in micrometers per pixel. Takes
            precedence over ``fov_um`` when positive.

    Returns:
        A dict with the keys ``"Imagem"``, ``"Área Segmentada (px²)"``,
        ``"Área Segmentada (µm²)"``, ``"Original"``, ``"Exibir"`` and
        ``"SemSegmentacao"``. When a contour was found it also contains the
        PNG buffers ``"Segmentada"`` and ``"Poligono"``. Returns ``None`` when
        processing raised an exception (the error is logged).
    """
    try:
        safe_name = uploaded_file.name.replace(" ", "_")
        image = Image.open(uploaded_file).convert("RGB")
        width_px, _ = image.size

        # An explicit pixel size wins; otherwise derive it from the field of
        # view. Non-positive values mean "not provided".
        effective_pixel_size_um = None
        if pixel_size_um is not None and pixel_size_um > 0:
            effective_pixel_size_um = pixel_size_um
        elif fov_um is not None and fov_um > 0:
            effective_pixel_size_um = fov_um / float(width_px)

        # The model is given a file path. A temporary directory is used
        # because a NamedTemporaryFile cannot be reopened by name on Windows
        # while it is still open.
        with tempfile.TemporaryDirectory() as tmp_dir:
            image_path = os.path.join(tmp_dir, "input.png")
            image.save(image_path)
            prediction = safe_predict(image_path)

        if not prediction:
            return _no_segmentation_result(safe_name, image)

        prediction_data = prediction.json()
        if not prediction_data["predictions"]:
            return _no_segmentation_result(safe_name, image)

        # Only the first prediction returned by the model is used.
        points = prediction_data["predictions"][0]["points"]
        if not points:
            return _no_segmentation_result(safe_name, image)

        area_px2 = calculate_polygon_area(points)
        area_um2 = None
        if effective_pixel_size_um is not None:
            area_um2 = area_px2 * (effective_pixel_size_um**2)

        # Repeat the first vertex at the end so the plotted contour closes.
        x = [p["x"] for p in points] + [points[0]["x"]]
        y = [p["y"] for p in points] + [points[0]["y"]]

        original_buffer = get_image_bytes(image)

        def draw_overlay(ax):
            ax.imshow(image)
            ax.plot(x, y, color="red", linewidth=2)
            ax.axis("off")

        def draw_polygon(ax):
            ax.plot(x, y, "r-", linewidth=2)
            ax.scatter(x, y, color="red", s=5)
            ax.set_title("Polygon contour")
            ax.grid(True)

        segmented_buffer = _render_figure_png(draw_overlay, bbox_inches="tight", pad_inches=0)
        polygon_buffer = _render_figure_png(draw_polygon, bbox_inches="tight")

        return {
            "Imagem": safe_name,
            "Área Segmentada (px²)": area_px2,
            "Área Segmentada (µm²)": area_um2,
            "Original": original_buffer,
            "Segmentada": segmented_buffer,
            "Poligono": polygon_buffer,
            "Exibir": image,
            "SemSegmentacao": False,
        }
    except Exception:
        # Callers treat None as "could not be processed"; log the cause so
        # the failure is not completely silent.
        logging.getLogger(__name__).exception(
            "Failed to process image %s", getattr(uploaded_file, "name", "<unnamed>")
        )
        return None
|
|
|
|
|
|
|
|
def save_feedback(result, avaliacao, observacao):
    """Record a user's rating and, for non-"Great" ratings, archive the images.

    A row ``[image name, rating, observation]`` is always appended to the
    feedback sheet. For the ratings "Acceptable", "Bad" and "No segmentation"
    the 640x640-resized original is uploaded to a per-image subfolder of the
    "Feedback Segmentacoes" Drive folder; the overlay and polygon images are
    uploaded as well when the rating is not "No segmentation" and both are
    present in ``result``.

    Args:
        result: Result dict produced by ``process_image``.
        avaliacao: Rating selected by the user.
        observacao: Free-text observation entered by the user.
    """
    image_name = result["Imagem"]
    sheet.append_row([image_name, avaliacao, observacao])

    suffix_by_rating = {
        "Acceptable": "aceitavel",
        "Bad": "ruim",
        "No segmentation": "sem_segmentacao",
    }
    sufixo = suffix_by_rating.get(avaliacao)
    if sufixo is None:
        # Any other rating (e.g. "Great") is recorded in the sheet only.
        return

    # Drop only a trailing ".png"; a plain replace() would also remove the
    # substring from the middle of the name.
    folder_name = image_name[: -len(".png")] if image_name.endswith(".png") else image_name
    parent_folder = find_or_create_folder("Feedback Segmentacoes")
    subfolder = find_or_create_folder(folder_name, parent_folder)

    resized_original = resize_image(result["Exibir"])
    upload_to_drive(get_image_bytes(resized_original), f"original_{sufixo}.png", subfolder)

    if avaliacao == "No segmentation":
        return
    segmented = result.get("Segmentada")
    polygon = result.get("Poligono")
    if not (segmented and polygon):
        return

    # Decode both rendered PNG buffers before uploading either of them.
    resized_images = [
        (nome, resize_image(Image.open(BytesIO(buffer.getvalue()))))
        for nome, buffer in (("segmentada", segmented), ("poligono", polygon))
    ]
    for nome, img_obj in resized_images:
        upload_to_drive(get_image_bytes(img_obj), f"{nome}_{sufixo}.png", subfolder)
|
|
|
|
|
|
|
|
def render_metrics(result):
    """Display the segmented area of ``result`` as a short markdown list.

    Args:
        result: Result dict holding ``"Área Segmentada (px²)"`` and
            ``"Área Segmentada (µm²)"``; either value may be ``None``, in
            which case its bullet is omitted.
    """
    measurements = (
        (result["Área Segmentada (px²)"], "px²"),
        (result["Área Segmentada (µm²)"], "µm²"),
    )

    st.markdown("**Segmented area**")
    for value, unit in measurements:
        if value is None:
            continue
        st.markdown(f"- {value:.2f} {unit}")
|
|
|
|
|
|
|
|
def render_feedback_block(result, prefix_key=""):
    """Render the feedback form for one result and save it on request.

    Args:
        result: Result dict produced by ``process_image``; its ``"Imagem"``
            value is part of every widget key.
        prefix_key: String prepended to the widget keys so that the same
            image can be rendered in different contexts without key clashes.
    """
    st.markdown("#### Segmentation quality feedback")
    st.caption("User evaluation used for future model refinement.")

    image_key = result["Imagem"]
    avaliacao = st.radio(
        "Segmentation quality assessment:",
        ["Great", "Acceptable", "Bad", "No segmentation"],
        horizontal=True,
        key=f"{prefix_key}radio_{image_key}",
    )
    observacao = st.text_area(
        "Observations (optional):",
        key=f"{prefix_key}obs_{image_key}",
    )

    if st.button("Save feedback", key=f"{prefix_key}btn_{image_key}"):
        try:
            save_feedback(result, avaliacao, observacao)
        except Exception:
            # Saving talks to Google Sheets/Drive; report a failure in the UI
            # instead of aborting the script run with a traceback.
            logging.getLogger(__name__).exception("Could not save feedback for %s", image_key)
            st.error("Feedback could not be saved. Please try again.")
        else:
            st.success("Feedback saved successfully.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Page header --------------------------------------------------------------
st.set_page_config(page_title="Scratch Assay Segmentation", layout="wide")

st.title("Scratch Assay Segmentation Tool")
st.caption(f"Version {APP_VERSION} · Deep learning–based wound closure segmentation")

st.markdown("---")

# --- Input mode ---------------------------------------------------------------
st.markdown("### Input")
_upload_modes = ["Single image", "Image folder"]
upload_option = st.radio("Choose upload type:", _upload_modes, horizontal=True)

# --- Advanced settings: model confidence and optional physical calibration ----
_calibration_help = (
    "### Physical calibration (optional)\n"
    "Provide the physical scale for conversion from pixel area to physical units (µm²). "
    "If left empty, results will be reported only in pixels²."
)

with st.expander("⚙️ Advanced Settings", expanded=False):
    # The slider value replaces the confidence configured on the model object.
    model.confidence = st.slider("Model confidence (%)", min_value=20, max_value=100, value=80)
    st.markdown(_calibration_help)

    fov_col, pixel_col = st.columns(2)
    # Both inputs default to 0.0; process_image only uses positive values.
    fov_um = fov_col.number_input(
        "Field of view width (µm)",
        min_value=0.0,
        value=0.0,
        step=1.0,
        help="Physical width of the image field, in micrometers.",
    )
    pixel_size_um = pixel_col.number_input(
        "Pixel size (µm / pixel)",
        min_value=0.0,
        value=0.0,
        step=0.01,
        help="If provided, this overrides the FOV-based calibration.",
    )

# Results of the current run; filled by the upload handling further down.
results = []

# --- Sidebar ------------------------------------------------------------------
with st.sidebar:
    st.markdown("## Info")
    with st.expander("About / Citation", expanded=False):
        st.markdown(
            """
            This tool was developed by the **Medical Physics Laboratory** of the Department of **Biophysics and Pharmacology – IBB, UNESP**.
            **FAPESP Process:** 2024/01849-4.
            **Coordination:** Prof. Allan Alves.
            **Development:** Nycolas Mariotto.
            """
        )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _show_segmentation_columns(result):
    """Show the original, overlay and polygon images side by side."""
    panels = (
        ("Exibir", "Original"),
        ("Segmentada", "Segmentation"),
        ("Poligono", "Polygon"),
    )
    for column, (key, caption) in zip(st.columns(3), panels):
        with column:
            st.image(result[key], caption=caption, use_container_width=True)


# --- Single image -------------------------------------------------------------
if upload_option == "Single image":
    uploaded_file = st.file_uploader("Upload an image", type=["png", "jpg", "jpeg", "tiff"])
    if uploaded_file:
        st.markdown("---")
        st.markdown("### Result")

        result = process_image(uploaded_file, fov_um=fov_um, pixel_size_um=pixel_size_um)
        if result:
            results.append(result)
            st.markdown(f"#### {result['Imagem']}")

            if result["SemSegmentacao"]:
                col = st.columns(1)[0]
                with col:
                    st.image(result["Exibir"], caption="Original", use_container_width=True)
                    st.warning("No segmentation was detected for this image.")
            else:
                _show_segmentation_columns(result)
                render_metrics(result)

                # The overlay exists only for segmented results, so the export
                # button must stay inside this branch.
                st.markdown("### Export")
                st.download_button(
                    "Download segmented overlay (PNG)",
                    data=result["Segmentada"],
                    file_name=f"segmented_{result['Imagem']}.png",
                    mime="image/png",
                )

            st.markdown("---")
            render_feedback_block(result, prefix_key="single_")
        else:
            # process_image returns None when it raised; tell the user rather
            # than showing nothing at all.
            st.error("The uploaded image could not be processed.")

# --- Multiple images ----------------------------------------------------------
elif upload_option == "Image folder":
    uploaded_files = st.file_uploader(
        "Upload multiple images",
        type=["png", "jpg", "jpeg", "tiff"],
        accept_multiple_files=True,
    )

    if uploaded_files:
        st.markdown("---")
        st.markdown("### Processing")

        def process_wrapper(f):
            return process_image(f, fov_um=fov_um, pixel_size_um=pixel_size_um)

        # executor.map keeps the results in the same order as uploaded_files.
        with ThreadPoolExecutor(max_workers=4) as executor:
            processed = list(executor.map(process_wrapper, uploaded_files))

        # Images whose processing failed outright (None) would otherwise
        # vanish from the page without any explanation.
        erros = [f.name for f, r in zip(uploaded_files, processed) if not r]
        if erros:
            st.error(
                f"{len(erros)} image(s) could not be processed:\n\n- " + "\n- ".join(erros)
            )

        falhas = [f.name for f, r in zip(uploaded_files, processed) if r and r.get("SemSegmentacao")]
        if falhas:
            st.warning(
                f"{len(falhas)} image(s) with no segmentation detected:\n\n- " + "\n- ".join(falhas)
            )

        zip_images_buffer = BytesIO()
        with zipfile.ZipFile(zip_images_buffer, "w") as zip_file:
            for idx, result in enumerate(processed, start=1):
                if not result:
                    continue

                results.append(result)
                st.markdown("---")
                st.markdown(f"### Result {idx} · {result['Imagem']}")

                if result["SemSegmentacao"]:
                    st.image(result["Exibir"], caption="Original", use_container_width=True)
                    st.warning("No segmentation was detected for this image.")
                else:
                    _show_segmentation_columns(result)
                    render_metrics(result)

                    # Only segmented results carry the rendered PNG buffers.
                    zip_file.writestr(f"segmentada_{result['Imagem']}.png", result["Segmentada"].getvalue())
                    zip_file.writestr(f"poligono_{result['Imagem']}.png", result["Poligono"].getvalue())

                # Include the position in the key prefix so that two uploads
                # with the same file name do not produce duplicate widget keys.
                render_feedback_block(result, prefix_key=f"folder_{idx}_")

        zip_images_buffer.seek(0)
|
|
|
|
|
|
|
|
# --- Quantitative results table and exports -----------------------------------
if results:
    st.markdown("---")
    st.markdown("### Quantitative results")

    # One row per processed image. Rows without a segmentation show
    # "No Segmentation" in the pixel-area column and an empty µm² cell.
    df = pd.DataFrame(
        [
            {
                "Image": r["Imagem"],
                "Segmented Area (px²)": (
                    r["Área Segmentada (px²)"]
                    if (not r["SemSegmentacao"] and r["Área Segmentada (px²)"] is not None)
                    else "No Segmentation"
                ),
                "Segmented Area (µm²)": (
                    f"{r['Área Segmentada (µm²)']:.2f}"
                    if (not r["SemSegmentacao"] and r["Área Segmentada (µm²)"] is not None)
                    else ""
                ),
            }
            for r in results
        ]
    )

    st.dataframe(df, use_container_width=True)

    excel_buffer = BytesIO()
    df.to_excel(excel_buffer, index=False)
    excel_buffer.seek(0)

    st.markdown("### Export results")
    c1, c2 = st.columns(2)
    with c1:
        st.download_button(
            "Download table (Excel)",
            data=excel_buffer,
            file_name="segmentation_results.xlsx",
            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            use_container_width=True,
        )

    # The ZIP archive is only built in "Image folder" mode; in single-image
    # mode zip_images_buffer is never defined, so referencing it would raise
    # NameError.
    if upload_option == "Image folder":
        with c2:
            st.download_button(
                "Download segmented images (ZIP)",
                data=zip_images_buffer,
                file_name="segmented_images.zip",
                mime="application/zip",
                use_container_width=True,
            )