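# Page residue from the hosting site's file view (not part of the program):
# author "jacopo22295", commit message "Update app.py", commit 0230ec4 (verified).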
import os
import io
import time
import traceback
from typing import Optional, Tuple
import gradio as gr
from PIL import Image
import torch
import torchvision.transforms as T
import torchvision.models as models
try:
from openai import OpenAI
except Exception:
OpenAI = None # se il pacchetto non c'è, non esplodiamo
import spaces # ZeroGPU decorator
# ======================
# Config / Model / Classes
# ======================
# Checkpoint file whose state dict is loaded into the classifier.
MODEL_PATH = "resnet50-corrosion-classifier-v1.pth"

# Class index -> label name. The position of each name in the tuple is its
# class index, so the order below must not be changed.
IDX2LABEL = dict(enumerate((
    "crevice_corrosion",
    "erosion_corrosion",
    "galvanic_corrosion",
    "mic_corrosion",
    "no_corrosion",
    "pitting_corrosion",
    "stress_corrosion",
    "under_insulation_corrosion",
    "uniform_corrosion",
)))

# Zone choices offered to the user; the last entry is the "undecided" option.
ZONES = [
    "Below waterline (hull/AF area)",
    "Waterline / Splash zone",
    "Topsides / Boot-top",
    "Deck / Weather deck",
    "Superstructure / Accommodation",
    "Ballast tanks (immersed)",
    "Cargo holds / Dry bulk",
    "Engine room / Hot surfaces",
    "Pipes / Under insulation (UIC/CUI)",
    "Other / Not sure",
]
# ======================
# Model load (CPU default)
# ======================
def load_model_cpu():
    """Build the ResNet-50 classifier and load its weights onto the CPU.

    The final fully connected layer is replaced so that it produces one
    output per entry of ``IDX2LABEL``; the state dict stored at
    ``MODEL_PATH`` is then loaded into the network.

    Returns:
        The network, switched to evaluation mode.
    """
    net = models.resnet50(weights=None)
    net.fc = torch.nn.Linear(net.fc.in_features, len(IDX2LABEL))
    state_dict = torch.load(MODEL_PATH, map_location="cpu")
    net.load_state_dict(state_dict)
    # ``eval()`` returns the module itself, so this hands back ``net``.
    return net.eval()
# Load the classifier once at import time; CPU predictions reuse this instance.
print("[BOOT] Loading model on CPU…")
model_cpu = load_model_cpu()

# Preprocessing pipeline applied to every input image: resize, centre-crop to
# 224 px, convert to a tensor, then normalise each channel with the mean/std
# values listed below.
transform = T.Compose(
    [
        T.Resize(256),
        T.CenterCrop(224),
        T.ToTensor(),
        T.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)
# ======================
# OpenAI Assistant (optional)
# ======================
# Assistant settings are read from the environment. The assistant stays
# disabled unless an API key is present, the ``openai`` package was imported
# successfully, and the client could be constructed.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
ASSISTANT_ID = os.getenv("PPG_ASSISTANT_ID", "asst_20DNMEENkfBsYupFjPCwfijZ")
VECTOR_STORE_ID = os.getenv("PPG_VECTOR_STORE_ID", "")
APP_FORCE_LANG = os.getenv("APP_FORCE_LANG", "").strip()

client = None
assistant_enabled = False

if OPENAI_API_KEY and OpenAI is not None:
    try:
        client = OpenAI(api_key=OPENAI_API_KEY)
        assistant_enabled = True
        print("[BOOT] OpenAI client initialized.")
    except Exception as exc:
        # A broken client must not stop the app: classification still works.
        print("[BOOT][WARN] OpenAI init failed:", exc)
def _assistant_safe() -> bool:
    """Tell whether the OpenAI assistant can be called.

    Returns:
        True only when the assistant was enabled at start-up, a client
        object exists and a non-empty assistant id is configured.
    """
    if not assistant_enabled or client is None:
        return False
    return bool(ASSISTANT_ID)
# ======================
# Inference utils (CPU/GPU)
# ======================
def predict_on_cpu(img_pil: Image.Image) -> Tuple[str, float]:
    """Classify an image with the module-level CPU model.

    Args:
        img_pil: Input picture; it is converted to RGB before preprocessing.

    Returns:
        ``(label, probability)`` for the highest-scoring class. An index
        missing from ``IDX2LABEL`` is reported as ``"class_<index>"``.
    """
    batch = transform(img_pil.convert("RGB")).unsqueeze(0)
    with torch.no_grad():
        scores = torch.softmax(model_cpu(batch), dim=1)
    class_probs = scores.cpu().numpy()[0]
    best = int(class_probs.argmax())
    label = IDX2LABEL.get(best, f"class_{best}")
    return label, float(class_probs[best])
@spaces.GPU(duration=60)
def predict_on_gpu(img_pil: Image.Image) -> Tuple[str, float]:
    """Classify an image on the CUDA device.

    A fresh copy of the network is created on every call and moved to the
    GPU; the module-level CPU model is left untouched so the CPU fallback
    keeps working.

    Args:
        img_pil: Input picture; it is converted to RGB before preprocessing.

    Returns:
        ``(label, probability)`` for the highest-scoring class. An index
        missing from ``IDX2LABEL`` is reported as ``"class_<index>"``.
    """
    device = "cuda"
    # Reuse the shared loader instead of repeating the architecture/head
    # definition here, so both code paths always build the same network.
    # The loader returns the model in eval mode; ``.to`` keeps that mode.
    net = load_model_cpu().to(device)
    batch = transform(img_pil.convert("RGB")).unsqueeze(0).to(device)
    with torch.no_grad():
        logits = net(batch)
    probs = torch.softmax(logits, dim=1).detach().cpu().numpy()[0]
    idx = int(probs.argmax())
    return IDX2LABEL.get(idx, f"class_{idx}"), float(probs[idx])
def predict_image(image: Image.Image) -> Tuple[str, float]:
    """Classify an image, preferring the GPU and falling back to the CPU.

    The GPU path is attempted only when CUDA reports itself available; any
    exception raised there is logged and the CPU path is used instead.

    Args:
        image: Picture to classify.

    Returns:
        ``(label, probability)`` for the highest-scoring class.
    """
    try:
        gpu_ready = torch.cuda.is_available()
        if gpu_ready:
            return predict_on_gpu(image)
    except Exception as exc:
        print("[GPU][WARN] Falling back to CPU:", exc)
    # Reached when CUDA is unavailable or the GPU attempt failed.
    return predict_on_cpu(image)
# ======================
# Helper: upload the image to OpenAI with a valid file extension
# ======================
def _upload_image_file(image: Image.Image):
    """Upload an image to OpenAI Files under a valid ``.png`` file name.

    The picture is re-encoded as an RGB PNG in memory so that the
    Assistants API accepts it as an ``image_file`` content part.

    Args:
        image: Picture to upload.

    Returns:
        The id of the uploaded file.
    """
    png_buffer = io.BytesIO()
    image.convert("RGB").save(png_buffer, format="PNG")
    png_buffer.seek(0)
    # Essential: the API infers the file type from the name's extension.
    png_buffer.name = "upload.png"
    return client.files.create(file=png_buffer, purpose="assistants").id
# ======================
# Local fallback guidance (offline)
# ======================
def _compose_local_guidance(label: str, zone: str, note: str, conf: float) -> str:
z = zone or "Not specified"
n = note.strip() if note else ""
conf_pct = round(conf * 100, 2)
zone_tips = {
"Below waterline (hull/AF area)": [
"Rimuovere biofouling; high-pressure wash ≥ 250 bar.",
"Ispezionare blister/pitting; spot-blast Sa 2½ dove necessario.",
"Sistema tipico: tie-coat + AF compatibile (SPC o foul-release)."
],
"Waterline / Splash zone": [
"Cicli resistenti a immersione intermittente e impatto.",
"Minimo St 3 / SP 11; meglio Sa 2½ su aree estese.",
"Sigillare bordi; attenzione a UV e spruzzi salini."
],
"Deck / Weather deck": [
"Sgrassaggio e desalting; rimuovere contaminanti.",
"Primer/barriera + poliuretanico; antiscivolo dove richiesto.",
"Controllo DFT e drenaggi per evitare ristagni."
],
"Pipes / Under insulation (UIC/CUI)": [
"Rimuovere isolamento bagnato; ispezione completa dei punti caldi.",
"Cicli dedicati CUI e alta temperatura se serve.",
"Sigillare penetrazioni e clamp; ripristino isolamento a regola d’arte."
],
"Engine room / Hot surfaces": [
"Rigorosa rimozione di olio e fuel; solvent wipe.",
"Cicli high-temp/alluminizzati secondo range operativo.",
"Compatibilità con substrati caldi e cicli termici."
],
"Ballast tanks (immersed)": [
"Lavaggio completo; test sali residui (≤ 50 mg/m²).",
"Cicli certificati per immersione; stripe coat su saldature.",
"Controllo tempi ricopertura e dew point."
],
}
class_tips = {
"no_corrosion": [
"Mantenimento leggero; evitare overcoating inutile.",
"Pulizia + finitura protettiva se richiesto (UV/AF/CUI)."
],
"uniform_corrosion": [
"Rimozione ruggine generalizzata (SP 11 o Sa 2½).",
"Barriera epossidica ad alto spessore; controlli DFT."
],
"pitting_corrosion": [
"Aprire/pulire i pit fino a metallo sano; filler epossidico.",
"Barriera ad alto spessore, stripe coat accurato, poi finitura."
],
"crevice_corrosion": [
"Sigillare giunzioni e fessure; evitare ristagni.",
"Epossidici tolleranti + sigillature elastiche su punti critici."
],
"galvanic_corrosion": [
"Isolare elettricamente accoppiamenti galvanici.",
"Primer barriera dielettrico; controllare anodi/masse."
],
"erosion_corrosion": [
"Ridurre turbolenze; smussare bordi e raccordi.",
"Epossidici spessi o epossi-novolac in alto flusso."
],
"stress_corrosion": [
"Verificare carichi e microfessure; NDT se critico.",
"Sistemi con resilienza; evitare cicli troppo rigidi."
],
"mic_corrosion": [
"Bonifica biologica dove ammesso; rimuovere deposito organico.",
"Cicli a bassa permeabilità; evitare stagnazione/nutrienti."
],
"under_insulation_corrosion": [
"Rimuovere isolamento; asciugare; cicli CUI dedicati.",
"Sigillatura e ripristino corretti per prevenire ingressi futuri."
],
}
z_lines = zone_tips.get(z, [
"Pulizia accurata; rimozione contaminanti.",
"Selezionare ciclo coerente con esposizione (immersione, splash, UV, temperatura).",
"Verificare compatibilità con il vecchio sistema prima di overcoating."
])
lkey = label if label in class_tips else "uniform_corrosion"
c_lines = class_tips.get(lkey, class_tips["uniform_corrosion"])
md = []
md.append("### Local quick guidance (offline)")
md.append(f"**Detected class**: `{label}` ({conf_pct}%). **Zone**: {z}.")
if n:
md.append(f"**User note**: {n}")
md.append("\n**Diagnosis (generic):**")
if label == "no_corrosion":
md.append("- Nessuna evidenza di corrosione. Prevenzione/mantenimento consigliati.")
else:
md.append("- Presunta corrosione coerente con la classe rilevata; verificare estensione a bordo.")
md.append("\n**Surface Preparation (minimum):**")
md += [f"- {line}" for line in z_lines[:2]]
md.append("- Rimuovere ruggine non aderente e contaminanti; test sali se area immersa/splash.")
md.append("\n**Indicative System (generic, non-binding):**")
md += [f"- {line}" for line in c_lines[:2]]
md.append("- Verificare tempi di ricopertura e compatibilità con vecchi strati.")
md.append("\n**Notes:**")
md.append("- Stripe coat su spigoli/saldature; controllare DFT con misure reali.")
md.append("- Adattare il ciclo a specifiche PPG ufficiali e condizioni a bordo.")
md.append("\n**Disclaimer:**")
md.append("> Indicazioni generiche e non sostitutive delle specifiche PPG o di una ispezione tecnica.")
return "\n".join(md)
# ======================
# Assistant calls (image + photo-first instructions)
# ======================
def call_assistant(
    label: str,
    confidence: float,
    zone: str,
    note: str,
    user_question: str,
    image: Optional[Image.Image],
    thread_id: Optional[str] = None,
    max_wait_s: int = 45,
) -> Tuple[str, str]:
    """Send one user turn to the OpenAI assistant and wait for its answer.

    Args:
        label: Class name produced by the classifier.
        confidence: Classifier confidence in the range 0-1.
        zone: Selected zone; an empty value is sent as "Not specified".
        note: Free-text user note; an empty value is sent as "(none)".
        user_question: Question for this turn; an empty value is replaced by
            "Provide initial advisory.".
        image: Optional picture to attach. If its upload fails the turn is
            sent as text only.
        thread_id: Existing thread to continue; a new thread is created when
            it is empty.
        max_wait_s: Maximum number of seconds to poll for run completion.

    Returns:
        ``(reply_text, thread_id)``. Failures are reported through the reply
        text, which then starts with ``"[Assistant disabled]"``,
        ``"[Assistant error]"`` or ``"[Assistant] No reply"``; exceptions
        raised while talking to the API are caught and logged, not
        propagated. ``thread_id`` is ``""`` when no thread is known.
    """
    if not _assistant_safe():
        return ("[Assistant disabled] Model classification shown above.", thread_id or "")
    try:
        # Create the thread on first use.
        if not thread_id:
            if VECTOR_STORE_ID:
                thread = client.beta.threads.create(
                    tool_resources={"file_search": {"vector_store_ids": [VECTOR_STORE_ID]}}
                )
            else:
                thread = client.beta.threads.create()
            thread_id = thread.id

        # Context text sent together with the question.
        core_context = (
            f"Classification: {label} ({round(confidence*100,2)}%).\n"
            f"Zone: {zone or 'Not specified'}.\n"
            f"User note: {note or '(none)'}.\n"
        )
        user_payload = core_context + "\nUser question:\n" + (user_question or "Provide initial advisory.")

        # Message content: a 'text' part plus an optional 'image_file' part.
        content = [{"type": "text", "text": user_payload}]
        if image is not None:
            try:
                file_id = _upload_image_file(image)
                content.append({"type": "image_file", "image_file": {"file_id": file_id}})
            except Exception as e_up:
                print("[Assistant][WARN] Image upload failed, proceeding text-only:", e_up)
        client.beta.threads.messages.create(
            thread_id=thread_id,
            role="user",
            content=content
        )

        # Instructions: the image has priority and a visual comment is mandatory.
        second_lang_clause = (
            f"Then provide the same content in {APP_FORCE_LANG}."
            if APP_FORCE_LANG else
            "Then repeat in the user's language if detectable from note; else in Italian."
        )
        extra_instructions = (
            "Act as a PPG marine coatings technical specialist for ships (marine environments only). "
            "Priority order for sources: (1) the ATTACHED IMAGE(S) in this thread; (2) the attached documents via File Search; "
            "if neither provides sufficient detail, say 'Not in docs'. "
            "You MUST explicitly comment on what you see in the photo (corrosion features, morphology, likely mechanisms) "
            "and you MAY contradict the classifier result if the image evidence disagrees; explain why. "
            "ALWAYS request the zone if missing before prescribing. "
            "Structure your output with headings: Diagnosis; Surface Preparation; System; Notes; Disclaimer. "
            "Provide first in English. " + second_lang_clause
        )
        run = client.beta.threads.runs.create(
            thread_id=thread_id,
            assistant_id=ASSISTANT_ID,
            instructions=extra_instructions,
        )

        # Poll until the run reaches a terminal status or the deadline passes.
        # A monotonic clock keeps the timeout immune to wall-clock changes.
        terminal_statuses = ("completed", "failed", "cancelled", "expired", "incomplete")
        deadline = time.monotonic() + max_wait_s
        while True:
            r = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
            if r.status in terminal_statuses:
                break
            if time.monotonic() > deadline:
                print("[Assistant][WARN] Timeout waiting run.")
                # Best effort: a run that stays active would keep the thread
                # from accepting the next user message.
                try:
                    client.beta.threads.runs.cancel(thread_id=thread_id, run_id=run.id)
                except Exception as e_cancel:
                    print("[Assistant][WARN] Could not cancel timed-out run:", e_cancel)
                break
            time.sleep(0.7)

        reply = None
        if r.status == "completed":
            msgs = client.beta.threads.messages.list(thread_id=thread_id)
            for m in msgs.data:
                if m.role != "assistant":
                    continue
                # Skip answers that belong to an earlier run of this thread.
                if getattr(m, "run_id", None) not in (None, run.id):
                    continue
                for part in m.content:
                    if getattr(part, "type", "") == "text":
                        reply = part.text.value
                        break
                if reply:
                    break
        else:
            # Without a completed run the newest assistant message in the
            # thread is a stale answer from a previous turn; do not return it.
            print("[Assistant][WARN] Run did not complete; status:", r.status)

        if not reply:
            reply = "[Assistant] No reply received."
        return reply, thread_id
    except Exception as e:
        print("[Assistant][ERROR]", e)
        traceback.print_exc()
        return ("[Assistant error] " + str(e) + "\nProceed using model result only.", thread_id or "")
# ======================
# Pipelines (generator)
# ======================
def run_analysis(image, note, zone, chat_history, thread_state):
    """Run classification plus advisory and stream results to the UI.

    This is a generator. Every item it yields is a 3-tuple
    ``(markdown_text, chat_history, thread_state)``.

    Args:
        image: Picture to analyse, or None for a text-only advisory.
        note: Optional free-text note.
        zone: Selected zone; analysis is refused when it is empty or
            "Other / Not sure".
        chat_history: Current list of chat tuples; it is copied, not mutated.
        thread_state: Dict carrying the assistant thread id, if any.

    Yields:
        First a status message, then the final advisory together with the
        extended history and a new state dict. If anything raises, a local
        guidance text (or, failing that, an error message) is yielded with
        the unchanged history.
    """
    prog = gr.Progress()
    try:
        prog(0.03, desc="Checking input")
        if not zone or zone == "Other / Not sure":
            yield "**Please select the area/zone first.**", chat_history, thread_state
            return

        # The analysis goes ahead even without an image (assistant + local fallback).
        yield "**Analyzing...** Please wait.", chat_history, thread_state

        if image is None:
            label, conf = "no_corrosion", 0.0
            prog(0.18, desc="No image, text-only advisory")
        else:
            prog(0.18, desc="Preprocessing")
            time.sleep(0.03)
            prog(0.50, desc="Classifying (ResNet50)")
            label, conf = predict_image(image)

        prog(0.72, desc="Analysis: consulting assistant")
        known_thread = (thread_state or {}).get("thread_id")
        reply, thread_id = call_assistant(
            label=label,
            confidence=conf,
            zone=zone,
            note=note or "",
            user_question="Provide initial advisory.",
            image=image,  # may be None
            thread_id=known_thread,
        )

        header = f"**Model result:** `{label}` — confidence **{round(conf*100,2)}%**\n\n"
        if image is not None:
            header += "_Assistant has also inspected the attached image._\n\n"

        # Fall back to the local text when the assistant errored, is disabled
        # or gave no answer.
        failure_markers = ("[Assistant disabled]", "[Assistant error]", "[Assistant] No reply")
        assistant_bad = not reply or reply.startswith(failure_markers)
        if assistant_bad:
            out_text = header + _compose_local_guidance(label, zone, note or "", conf)
            history_entry = out_text
        else:
            out_text = header + reply
            history_entry = reply

        new_history = chat_history[:] if chat_history else []
        new_history.append(("", history_entry))

        prog(1.0, desc="Done")
        new_state = {
            "thread_id": thread_id,
            "label": label,
            "confidence": conf,
            "zone": zone or "",
        }
        yield out_text, new_history, new_state
    except Exception as exc:
        print("[Pipeline][ERROR]", exc)
        traceback.print_exc()
        err = f"**Error during analysis**:\n```\n{exc}\n```"
        try:
            fallback = _compose_local_guidance("uniform_corrosion", zone or "", note or "", 0.0)
            yield fallback, chat_history, thread_state or {}
        except Exception:
            yield err, chat_history, thread_state or {}
def continue_chat(user_msg, chat_history, thread_state, note, zone):
    """Handle one follow-up chat message.

    Args:
        user_msg: Text typed by the user; blank messages are ignored.
        chat_history: List of ``(user, assistant)`` tuples. It is extended in
            place with the new exchange.
        thread_state: Dict with ``thread_id``, ``label``, ``confidence`` and
            ``zone`` from the last analysis; its ``thread_id`` is updated in
            place when it is a dict.
        note: Optional free-text note forwarded to the assistant.
        zone: Currently selected zone; falls back to the zone stored in
            ``thread_state`` and then to "Not specified".

    Returns:
        ``(chat_history, "")`` — the history to display and an empty string
        that clears the input box.
    """
    if not user_msg or not user_msg.strip():
        return chat_history, ""

    # Only substitute a new list when there is none at all. Replacing an
    # *empty* list (as `chat_history or []` does) would detach the history
    # from the caller's object; the Send handler in this file does not write
    # the history back to `chat_state`, so the first exchange would be lost.
    # NOTE(review): assumes the stored state list is passed by reference — confirm.
    if chat_history is None:
        chat_history = []

    prog = gr.Progress()
    try:
        prog(0.2, desc="Sending")
        state = thread_state or {}
        label = state.get("label") or "unknown"
        conf = state.get("confidence") or 0.0
        current_zone = zone or state.get("zone") or "Not specified"
        thread_id = state.get("thread_id")

        # Remind the assistant that the thread holds an image (no re-upload).
        pref = ""
        if thread_id:
            pref = ("[Context: an image was attached in this thread during the last analysis. "
                    "If relevant, reference your visual observations.]\n")

        prog(0.7, desc="Chat: contacting assistant")
        reply, thread_id = call_assistant(
            label=label,
            confidence=conf,
            zone=current_zone,
            note=note or "",
            user_question=pref + user_msg,
            image=None,  # the thread already has the image
            thread_id=thread_id
        )

        # If the assistant is unavailable, answer with the local guidance.
        failure_markers = ("[Assistant disabled]", "[Assistant error]", "[Assistant] No reply")
        if not reply or reply.startswith(failure_markers):
            reply = _compose_local_guidance(label or "uniform_corrosion", current_zone, note or "", conf)

        chat_history.append((user_msg, reply))
        if isinstance(thread_state, dict):
            thread_state["thread_id"] = thread_id
        prog(1.0, desc="Done")
        return chat_history, ""
    except Exception as e:
        print("[Chat][ERROR]", e)
        traceback.print_exc()
        chat_history.append((user_msg, f"[Error] {e}"))
        return chat_history, ""
# ======================
# UI
# ======================
# Markdown shown at the top of the page.
WELCOME = """
# Corrosion Assistant — Beta
**Welcome!**
This model is trained for educational purpose only. Some classes still weak (crevice, galvanic).
**Disclaimer**: research & experimental only. Validate with official PPG specs.
"""
# Full-screen "busy" overlay (mask, spinner and two Italian status lines).
# It is rendered hidden and toggled by the show/hide helper functions.
LOADER_HTML = """
<div id="overlay-mask" style="
position: fixed; inset: 0; background: rgba(0,0,0,0.55);
display: flex; align-items: center; justify-content: center;
z-index: 9999; backdrop-filter: blur(2px);
">
<div style="background:#111; color:#fff; padding:24px 28px; border-radius:16px;
font-family: ui-sans-serif, system-ui, -apple-system; text-align:center;
box-shadow: 0 10px 30px rgba(0,0,0,0.5); max-width: 360px;">
<div class="spinner" style="
width:48px;height:48px;border:4px solid #444;border-top-color:#fff;border-radius:50%;
margin:0 auto 14px; animation: spin 0.9s linear infinite;"></div>
<div style="font-size:16px; font-weight:700;">Elaborazione in corso…</div>
<div style="opacity:0.9; font-size:12px; margin-top:6px;">Potrebbe richiedere alcuni secondi.</div>
</div>
</div>
<style>@keyframes spin { to { transform: rotate(360deg); } }</style>
"""
def _show_overlay_and_busy():
    """Return the updates that show the overlay and lock the analyze button.

    Returns:
        ``(overlay_update, button_update)``: the overlay becomes visible and
        the button is disabled with a "busy" caption.
    """
    overlay_update = gr.update(visible=True)
    button_update = gr.update(interactive=False, value="🔄 Analyzing…")
    return overlay_update, button_update
def _hide_overlay_and_idle():
    """Return the updates that hide the overlay and re-enable the analyze button.

    Returns:
        ``(overlay_update, button_update)``: the overlay is hidden and the
        button is interactive again with its normal caption.
    """
    overlay_update = gr.update(visible=False)
    button_update = gr.update(interactive=True, value="Analyze image")
    return overlay_update, button_update
# UI definition: inputs and analysis output on top, advisor chat below.
# NOTE(review): the nesting of the layout containers was reconstructed from
# statement order — confirm against the intended layout.
with gr.Blocks(title="Corrosion Assistant", theme=gr.themes.Soft()) as demo:
    gr.Markdown(WELCOME)
    overlay = gr.HTML(LOADER_HTML, visible=False)

    with gr.Row():
        with gr.Column(scale=2):
            img = gr.Image(type="pil", sources=["upload", "webcam"], label="Upload or webcam")
            note = gr.Textbox(label="Notes / Context (optional)")
            zone = gr.Dropdown(choices=ZONES, label="Zone (indicative)", value="Other / Not sure")
            analyze_btn = gr.Button("Analyze image", variant="primary")
        with gr.Column(scale=3):
            out_md = gr.Markdown(label="Analysis")

    gr.Markdown("### Continue the conversation with the PPG Assistant")
    with gr.Row():
        with gr.Column(scale=3):
            chat = gr.Chatbot(height=320, label="Advisor chat", type="tuples")
            chat_in = gr.Textbox(label="Your message")
            send_btn = gr.Button("Send")
            clear_btn = gr.Button("Clear chat")
        with gr.Column(scale=2):
            gr.Markdown(
                "> **Privacy note:** If enabled, the image is sent to OpenAI to allow visual analysis. "
                "Disable API key to skip assistant."
            )

    # Per-session state: chat history and the assistant thread context.
    chat_state = gr.State([])
    thread_state = gr.State({"thread_id": None, "label": None, "confidence": 0.0, "zone": ""})

    # Analyze: show overlay -> run the pipeline -> hide overlay -> refresh chat.
    analyze_btn.click(
        fn=_show_overlay_and_busy,
        inputs=[],
        outputs=[overlay, analyze_btn],
        show_progress=False,
    ).then(
        fn=run_analysis,
        inputs=[img, note, zone, chat_state, thread_state],
        outputs=[out_md, chat_state, thread_state],
        show_progress=True,
    ).then(
        fn=_hide_overlay_and_idle,
        inputs=[],
        outputs=[overlay, analyze_btn],
        show_progress=False,
    ).then(
        lambda h: h,
        inputs=[chat_state],
        outputs=[chat],
        show_progress=False,
    )

    send_btn.click(
        fn=continue_chat,
        inputs=[chat_in, chat_state, thread_state, note, zone],
        outputs=[chat, chat_in],
        show_progress=True,
    )

    # Clearing must also reset the stored history; otherwise the old messages
    # come back with the next send, because the chat is redrawn from
    # `chat_state`. `thread_state` is kept so the analysis context survives.
    clear_btn.click(
        lambda: ([], "", []),
        inputs=[],
        outputs=[chat, chat_in, chat_state],
        show_progress=False,
    )
# NOTE(review): `api_mode` is set as a plain attribute on the Blocks object;
# nothing in this file reads it — confirm that Gradio actually uses it.
demo.api_mode = "enabled"

# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()