import os import io import time import traceback from typing import Optional, Tuple import gradio as gr from PIL import Image import torch import torchvision.transforms as T import torchvision.models as models try: from openai import OpenAI except Exception: OpenAI = None # se il pacchetto non c'è, non esplodiamo import spaces # ZeroGPU decorator # ====================== # Config / Model / Classes # ====================== MODEL_PATH = "resnet50-corrosion-classifier-v1.pth" IDX2LABEL = { 0: "crevice_corrosion", 1: "erosion_corrosion", 2: "galvanic_corrosion", 3: "mic_corrosion", 4: "no_corrosion", 5: "pitting_corrosion", 6: "stress_corrosion", 7: "under_insulation_corrosion", 8: "uniform_corrosion", } ZONES = [ "Below waterline (hull/AF area)", "Waterline / Splash zone", "Topsides / Boot-top", "Deck / Weather deck", "Superstructure / Accommodation", "Ballast tanks (immersed)", "Cargo holds / Dry bulk", "Engine room / Hot surfaces", "Pipes / Under insulation (UIC/CUI)", "Other / Not sure", ] # ====================== # Model load (CPU default) # ====================== def load_model_cpu(): m = models.resnet50(weights=None) num_ftrs = m.fc.in_features m.fc = torch.nn.Linear(num_ftrs, len(IDX2LABEL)) sd = torch.load(MODEL_PATH, map_location="cpu") m.load_state_dict(sd) m.eval() return m print("[BOOT] Loading model on CPU…") model_cpu = load_model_cpu() transform = T.Compose([ T.Resize(256), T.CenterCrop(224), T.ToTensor(), T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), ]) # ====================== # OpenAI Assistant (optional) # ====================== OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") ASSISTANT_ID = os.environ.get("PPG_ASSISTANT_ID", "asst_20DNMEENkfBsYupFjPCwfijZ") VECTOR_STORE_ID = os.environ.get("PPG_VECTOR_STORE_ID", "") APP_FORCE_LANG = os.environ.get("APP_FORCE_LANG", "").strip() client = None assistant_enabled = False if OPENAI_API_KEY and OpenAI is not None: try: client = OpenAI(api_key=OPENAI_API_KEY) assistant_enabled = True print("[BOOT] OpenAI client initialized.") except Exception as e: print("[BOOT][WARN] OpenAI init failed:", e) def _assistant_safe() -> bool: return bool(assistant_enabled and client is not None and ASSISTANT_ID) # ====================== # Inference utils (CPU/GPU) # ====================== def predict_on_cpu(img_pil: Image.Image) -> Tuple[str, float]: x = transform(img_pil.convert("RGB")).unsqueeze(0) with torch.no_grad(): logits = model_cpu(x) probs = torch.softmax(logits, dim=1).cpu().numpy()[0] idx = int(probs.argmax()) return IDX2LABEL.get(idx, f"class_{idx}"), float(probs[idx]) @spaces.GPU(duration=60) def predict_on_gpu(img_pil: Image.Image) -> Tuple[str, float]: device = "cuda" m = models.resnet50(weights=None) num_ftrs = m.fc.in_features m.fc = torch.nn.Linear(num_ftrs, len(IDX2LABEL)) sd = torch.load(MODEL_PATH, map_location=device) m.load_state_dict(sd) m.eval().to(device) x = transform(img_pil.convert("RGB")).unsqueeze(0).to(device) with torch.no_grad(): logits = m(x) probs = torch.softmax(logits, dim=1).detach().cpu().numpy()[0] idx = int(probs.argmax()) return IDX2LABEL.get(idx, f"class_{idx}"), float(probs[idx]) def predict_image(image: Image.Image) -> Tuple[str, float]: try: if torch.cuda.is_available(): return predict_on_gpu(image) except Exception as e: print("[GPU][WARN] Falling back to CPU:", e) return predict_on_cpu(image) # ====================== # Helper: upload immagine a OpenAI con estensione valida # ====================== def _upload_image_file(image: Image.Image): """ Carica l'immagine su OpenAI Files con un nome file valido (.png) così l'Assistants API accetta 'image_file'. Ritorna file_id. """ buf = io.BytesIO() image.convert("RGB").save(buf, format="PNG") buf.seek(0) setattr(buf, "name", "upload.png") # indispensabile: l'API deduce il tipo dall'estensione uploaded = client.files.create(file=buf, purpose="assistants") return uploaded.id # ====================== # Local fallback guidance (offline) # ====================== def _compose_local_guidance(label: str, zone: str, note: str, conf: float) -> str: z = zone or "Not specified" n = note.strip() if note else "" conf_pct = round(conf * 100, 2) zone_tips = { "Below waterline (hull/AF area)": [ "Rimuovere biofouling; high-pressure wash ≥ 250 bar.", "Ispezionare blister/pitting; spot-blast Sa 2½ dove necessario.", "Sistema tipico: tie-coat + AF compatibile (SPC o foul-release)." ], "Waterline / Splash zone": [ "Cicli resistenti a immersione intermittente e impatto.", "Minimo St 3 / SP 11; meglio Sa 2½ su aree estese.", "Sigillare bordi; attenzione a UV e spruzzi salini." ], "Deck / Weather deck": [ "Sgrassaggio e desalting; rimuovere contaminanti.", "Primer/barriera + poliuretanico; antiscivolo dove richiesto.", "Controllo DFT e drenaggi per evitare ristagni." ], "Pipes / Under insulation (UIC/CUI)": [ "Rimuovere isolamento bagnato; ispezione completa dei punti caldi.", "Cicli dedicati CUI e alta temperatura se serve.", "Sigillare penetrazioni e clamp; ripristino isolamento a regola d’arte." ], "Engine room / Hot surfaces": [ "Rigorosa rimozione di olio e fuel; solvent wipe.", "Cicli high-temp/alluminizzati secondo range operativo.", "Compatibilità con substrati caldi e cicli termici." ], "Ballast tanks (immersed)": [ "Lavaggio completo; test sali residui (≤ 50 mg/m²).", "Cicli certificati per immersione; stripe coat su saldature.", "Controllo tempi ricopertura e dew point." ], } class_tips = { "no_corrosion": [ "Mantenimento leggero; evitare overcoating inutile.", "Pulizia + finitura protettiva se richiesto (UV/AF/CUI)." ], "uniform_corrosion": [ "Rimozione ruggine generalizzata (SP 11 o Sa 2½).", "Barriera epossidica ad alto spessore; controlli DFT." ], "pitting_corrosion": [ "Aprire/pulire i pit fino a metallo sano; filler epossidico.", "Barriera ad alto spessore, stripe coat accurato, poi finitura." ], "crevice_corrosion": [ "Sigillare giunzioni e fessure; evitare ristagni.", "Epossidici tolleranti + sigillature elastiche su punti critici." ], "galvanic_corrosion": [ "Isolare elettricamente accoppiamenti galvanici.", "Primer barriera dielettrico; controllare anodi/masse." ], "erosion_corrosion": [ "Ridurre turbolenze; smussare bordi e raccordi.", "Epossidici spessi o epossi-novolac in alto flusso." ], "stress_corrosion": [ "Verificare carichi e microfessure; NDT se critico.", "Sistemi con resilienza; evitare cicli troppo rigidi." ], "mic_corrosion": [ "Bonifica biologica dove ammesso; rimuovere deposito organico.", "Cicli a bassa permeabilità; evitare stagnazione/nutrienti." ], "under_insulation_corrosion": [ "Rimuovere isolamento; asciugare; cicli CUI dedicati.", "Sigillatura e ripristino corretti per prevenire ingressi futuri." ], } z_lines = zone_tips.get(z, [ "Pulizia accurata; rimozione contaminanti.", "Selezionare ciclo coerente con esposizione (immersione, splash, UV, temperatura).", "Verificare compatibilità con il vecchio sistema prima di overcoating." ]) lkey = label if label in class_tips else "uniform_corrosion" c_lines = class_tips.get(lkey, class_tips["uniform_corrosion"]) md = [] md.append("### Local quick guidance (offline)") md.append(f"**Detected class**: `{label}` ({conf_pct}%). **Zone**: {z}.") if n: md.append(f"**User note**: {n}") md.append("\n**Diagnosis (generic):**") if label == "no_corrosion": md.append("- Nessuna evidenza di corrosione. Prevenzione/mantenimento consigliati.") else: md.append("- Presunta corrosione coerente con la classe rilevata; verificare estensione a bordo.") md.append("\n**Surface Preparation (minimum):**") md += [f"- {line}" for line in z_lines[:2]] md.append("- Rimuovere ruggine non aderente e contaminanti; test sali se area immersa/splash.") md.append("\n**Indicative System (generic, non-binding):**") md += [f"- {line}" for line in c_lines[:2]] md.append("- Verificare tempi di ricopertura e compatibilità con vecchi strati.") md.append("\n**Notes:**") md.append("- Stripe coat su spigoli/saldature; controllare DFT con misure reali.") md.append("- Adattare il ciclo a specifiche PPG ufficiali e condizioni a bordo.") md.append("\n**Disclaimer:**") md.append("> Indicazioni generiche e non sostitutive delle specifiche PPG o di una ispezione tecnica.") return "\n".join(md) # ====================== # Assistant calls (image + istruzioni foto-prioritarie) # ====================== def call_assistant( label: str, confidence: float, zone: str, note: str, user_question: str, image: Optional[Image.Image], thread_id: Optional[str] = None, max_wait_s: int = 45, ) -> Tuple[str, str]: """ Ritorna (reply_text, thread_id). Non lancia eccezioni fuori. """ if not _assistant_safe(): return ("[Assistant disabled] Model classification shown above.", thread_id or "") try: # crea thread se serve if not thread_id: if VECTOR_STORE_ID: thread = client.beta.threads.create( tool_resources={"file_search": {"vector_store_ids": [VECTOR_STORE_ID]}} ) else: thread = client.beta.threads.create() thread_id = thread.id # testo di contesto core_context = ( f"Classification: {label} ({round(confidence*100,2)}%).\n" f"Zone: {zone or 'Not specified'}.\n" f"User note: {note or '(none)'}.\n" ) user_payload = core_context + "\nUser question:\n" + (user_question or "Provide initial advisory.") # content: 'text' + (opzionale) 'image_file' content = [{"type": "text", "text": user_payload}] if image is not None: try: file_id = _upload_image_file(image) content.append({"type": "image_file", "image_file": {"file_id": file_id}}) except Exception as e_up: print("[Assistant][WARN] Image upload failed, proceeding text-only:", e_up) client.beta.threads.messages.create( thread_id=thread_id, role="user", content=content ) # ISTRUZIONI: immagine prioritaria, commento visivo obbligatorio second_lang_clause = ( f"Then provide the same content in {APP_FORCE_LANG}." if APP_FORCE_LANG else "Then repeat in the user's language if detectable from note; else in Italian." ) extra_instructions = ( "Act as a PPG marine coatings technical specialist for ships (marine environments only). " "Priority order for sources: (1) the ATTACHED IMAGE(S) in this thread; (2) the attached documents via File Search; " "if neither provides sufficient detail, say 'Not in docs'. " "You MUST explicitly comment on what you see in the photo (corrosion features, morphology, likely mechanisms) " "and you MAY contradict the classifier result if the image evidence disagrees; explain why. " "ALWAYS request the zone if missing before prescribing. " "Structure your output with headings: Diagnosis; Surface Preparation; System; Notes; Disclaimer. " "Provide first in English. " + second_lang_clause ) run = client.beta.threads.runs.create( thread_id=thread_id, assistant_id=ASSISTANT_ID, instructions=extra_instructions, ) # polling con timeout t0 = time.time() while True: r = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id) if r.status in ["completed", "failed", "cancelled", "expired"]: break if time.time() - t0 > max_wait_s: print("[Assistant][WARN] Timeout waiting run.") break time.sleep(0.7) msgs = client.beta.threads.messages.list(thread_id=thread_id) reply = None for m in msgs.data: if m.role == "assistant": for part in m.content: if getattr(part, "type", "") == "text": reply = part.text.value break if reply: break if not reply: reply = "[Assistant] No reply received." return reply, thread_id except Exception as e: print("[Assistant][ERROR]", e) traceback.print_exc() return ("[Assistant error] " + str(e) + "\nProceed using model result only.", thread_id or "") # ====================== # Pipelines (generator) # ====================== def run_analysis(image, note, zone, chat_history, thread_state): prog = gr.Progress() try: prog(0.03, desc="Checking input") if not zone or zone == "Other / Not sure": yield "**Please select the area/zone first.**", chat_history, thread_state return # Anche senza immagine procedo (fallback locale+assistant) yield "**Analyzing...** Please wait.", chat_history, thread_state label, conf = ("no_corrosion", 0.0) if image is not None: prog(0.18, desc="Preprocessing") time.sleep(0.03) prog(0.50, desc="Classifying (ResNet50)") label, conf = predict_image(image) else: prog(0.18, desc="No image, text-only advisory") prog(0.72, desc="Analysis: consulting assistant") reply, thread_id = call_assistant( label=label, confidence=conf, zone=zone, note=note or "", user_question="Provide initial advisory.", image=image, # può essere None thread_id=(thread_state or {}).get("thread_id") ) header = f"**Model result:** `{label}` — confidence **{round(conf*100,2)}%**\n\n" if image is not None: header += "_Assistant has also inspected the attached image._\n\n" # Se l'assistant ha dato errore/disabled/nessuna risposta, uso fallback locale assistant_bad = ( not reply or reply.startswith("[Assistant disabled]") or reply.startswith("[Assistant error]") or reply.startswith("[Assistant] No reply") ) if assistant_bad: local_md = _compose_local_guidance(label, zone, note or "", conf) out_text = header + local_md else: out_text = header + reply new_history = (chat_history[:] if chat_history else []) new_history.append(("", out_text if assistant_bad else reply)) prog(1.0, desc="Done") yield out_text, new_history, { "thread_id": thread_id, "label": label, "confidence": conf, "zone": zone or "", } except Exception as e: print("[Pipeline][ERROR]", e) traceback.print_exc() err = f"**Error during analysis**:\n```\n{e}\n```" try: fallback = _compose_local_guidance("uniform_corrosion", zone or "", note or "", 0.0) yield fallback, chat_history, thread_state or {} except Exception: yield err, chat_history, thread_state or {} def continue_chat(user_msg, chat_history, thread_state, note, zone): if not user_msg or not user_msg.strip(): return chat_history, "" prog = gr.Progress() try: prog(0.2, desc="Sending") label = (thread_state or {}).get("label") or "unknown" conf = (thread_state or {}).get("confidence") or 0.0 current_zone = zone or (thread_state or {}).get("zone") or "Not specified" thread_id = (thread_state or {}).get("thread_id") # Ricorda all'assistant che c'è un'immagine nel thread (senza ricaricarla) pref = "" if thread_id: pref = ("[Context: an image was attached in this thread during the last analysis. " "If relevant, reference your visual observations.]\n") prog(0.7, desc="Chat: contacting assistant") reply, thread_id = call_assistant( label=label, confidence=conf, zone=current_zone, note=note or "", user_question=pref + user_msg, image=None, # la thread ha già l'immagine thread_id=thread_id ) # Se l'assistant fa cilecca, risposta locale contextual assistant_bad = ( not reply or reply.startswith("[Assistant disabled]") or reply.startswith("[Assistant error]") or reply.startswith("[Assistant] No reply") ) if assistant_bad: reply = _compose_local_guidance(label or "uniform_corrosion", current_zone, note or "", conf) chat_history = chat_history or [] chat_history.append((user_msg, reply)) if isinstance(thread_state, dict): thread_state["thread_id"] = thread_id prog(1.0, desc="Done") return chat_history, "" except Exception as e: print("[Chat][ERROR]", e) traceback.print_exc() chat_history = chat_history or [] chat_history.append((user_msg, f"[Error] {e}")) return chat_history, "" # ====================== # UI # ====================== WELCOME = """ # Corrosion Assistant — Beta **Welcome!** This model is trained for educational purpose only. Some classes still weak (crevice, galvanic). **Disclaimer**: research & experimental only. Validate with official PPG specs. """ LOADER_HTML = """