| import streamlit as st |
| import pandas as pd |
| import requests |
| import time |
| import textwrap |
| import os |
| import glob |
|
|
| API_BASE_URL = os.environ.get("API_BASE_URL", "http://localhost:8000") |
| CLIENT_ID = os.environ.get("CLIENT_ID", "DEMO_CLIENT_001") |
| APP_PASSWORD = os.environ.get("APP_PASSWORD", "s3poke") |
|
|
| st.set_page_config(page_title="Rilevatore di Ingredienti", layout="wide") |
|
|
| |
| st.markdown(""" |
| <style> |
| @import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&display=swap'); |
| |
| html, body, [class*="css"] { |
| font-family: 'Inter', sans-serif; |
| } |
| |
| .title-container { |
| padding-bottom: 1rem; |
| margin-bottom: 2rem; |
| border-bottom: 1px solid #eaeaea; |
| } |
| |
| .prediction-box { |
| padding: 16px; |
| margin-bottom: 12px; |
| border-radius: 6px; |
| border-left: 4px solid #2563eb; |
| background-color: #f8fafc; |
| color: #1e293b; |
| animation: fadeIn 0.5s ease-out; |
| box-shadow: 0 1px 3px rgba(0,0,0,0.05); |
| } |
| |
| .prediction-title { |
| font-size: 1.1em; |
| font-weight: 600; |
| color: #0f172a; |
| } |
| |
| .prediction-score { |
| float: right; |
| color: #2563eb; |
| font-weight: 600; |
| font-size: 1.1em; |
| } |
| |
| .prediction-desc { |
| font-size: 0.9em; |
| color: #64748b; |
| margin-top: 4px; |
| display: block; |
| } |
| |
| @keyframes fadeIn { |
| 0% { opacity: 0; transform: translateY(5px); } |
| 100% { opacity: 1; transform: translateY(0); } |
| } |
| </style> |
| """, unsafe_allow_html=True) |
|
|
| |
| def check_password(): |
| if not APP_PASSWORD: |
| return True |
| |
| if "password_correct" not in st.session_state: |
| st.session_state["password_correct"] = False |
|
|
| if not st.session_state["password_correct"]: |
| st.markdown('<div class="title-container"><h1>Accesso Rilevatore di Ingredienti</h1></div>', unsafe_allow_html=True) |
| st.info("Questo spazio è privato. Inserisci la password di accesso.") |
| password = st.text_input("Password", type="password") |
| if st.button("Accedi"): |
| if password == APP_PASSWORD: |
| st.session_state["password_correct"] = True |
| st.rerun() |
| else: |
| st.error("Password errata.") |
| return False |
| return True |
|
|
| if not check_password(): |
| st.stop() |
|
|
|
|
| def _do_configure_api(): |
| config_dir = os.path.join(os.path.dirname(__file__), "csv_config") |
| csv_files = glob.glob(os.path.join(config_dir, "*.csv")) |
| |
| if not csv_files: |
| return {"status": "error", "message": "Nessun CSV di configurazione trovato "} |
| |
| try: |
| df = pd.read_csv(csv_files[0]) |
| required_cols = {"PRODUCT_ID", "DESCRIPTION", "FAMILY", "COMPONENT", "EXTRA DESCRIPTION"} |
| if not required_cols.issubset(set(df.columns)): |
| return {"status": "error", "message": "Colonne richieste mancanti nel CSV."} |
| |
| mask = (df["COMPONENT"].str.lower() == "ingredient") | (df["COMPONENT"].str.lower().str.startswith("proteine")) |
| df_filtered = df[mask].copy() |
| |
| df_filtered["id"] = df_filtered["PRODUCT_ID"].astype(str) |
| df_filtered["description"] = df_filtered["DESCRIPTION"].astype(str) |
| |
| df_filtered["FAMILY"] = df_filtered["FAMILY"].fillna("") |
| df_filtered["EXTRA DESCRIPTION"] = df_filtered["EXTRA DESCRIPTION"].fillna("") |
| df_filtered["extra_description"] = df_filtered["COMPONENT"].astype(str) + " - " + df_filtered["EXTRA DESCRIPTION"].astype(str) |
| |
| products = df_filtered[["id", "description", "extra_description"]].to_dict(orient="records") |
| |
| payload = { |
| "client_id": CLIENT_ID, |
| "context": "Rilevamento degli ingredienti all'interno delle Poke Bowl in Italia.", |
| "products": products |
| } |
| |
| resp = requests.post(f"{API_BASE_URL}/configure", json=payload) |
| if resp.status_code == 200: |
| return {"status": "success", "message": "API configurata con successo."} |
| else: |
| return {"status": "error", "message": f"Errore API {resp.status_code}: {resp.text}"} |
| except Exception as e: |
| return {"status": "error", "message": f"Impossibile configurare l'API: {e}"} |
|
|
| @st.cache_resource(show_spinner=False) |
| def configure_api(): |
| return _do_configure_api() |
|
|
| |
| with st.spinner("Inizializzazione configurazione API..."): |
| config_status = configure_api() |
|
|
| st.markdown('<div class="title-container"><h1>Rilevatore di Ingredienti</h1></div>', unsafe_allow_html=True) |
|
|
| if config_status["status"] == "error": |
| st.error(f"Errore di Inizializzazione: {config_status['message']}") |
| st.info("Controlla i file di configurazione e la connessione API.") |
|
|
| st.markdown("### Analisi dell'Immagine") |
| st.markdown("Carica l'immagine di una bowl o seleziona un'immagine di test per identificarne automaticamente gli ingredienti.") |
|
|
| |
| possible_test_dirs = [ |
| os.path.join(os.path.dirname(__file__), "test_images"), |
| os.path.join(os.path.dirname(__file__), "..", "tests") |
| ] |
| test_images_dir = next((d for d in possible_test_dirs if os.path.exists(d)), None) |
|
|
| image_to_analyze_path = None |
| uploaded_img = None |
|
|
| if test_images_dir: |
| test_images = [] |
| for ext in ('*.png', '*.jpg', '*.jpeg', '*.webp'): |
| test_images.extend(glob.glob(os.path.join(test_images_dir, ext))) |
| test_images = sorted(test_images) |
|
|
| if test_images: |
| selection_mode = st.radio("Scegli la sorgente dell'immagine:", ["Carica la tua", "Seleziona un'immagine di test"], horizontal=True) |
| if selection_mode == "Seleziona un'immagine di test": |
| img_names = [os.path.basename(p) for p in test_images] |
| selected_img_name = st.selectbox("Seleziona un'immagine di test:", img_names) |
| image_to_analyze_path = os.path.join(test_images_dir, selected_img_name) |
| else: |
| uploaded_img = st.file_uploader("Carica Immagine", type=["png", "jpg", "jpeg", "webp"], label_visibility="collapsed") |
| else: |
| uploaded_img = st.file_uploader("Carica Immagine", type=["png", "jpg", "jpeg", "webp"], label_visibility="collapsed") |
|
|
| if uploaded_img is not None or image_to_analyze_path is not None: |
| col1, col2 = st.columns([1, 1], gap="large") |
| |
| with col1: |
| if uploaded_img is not None: |
| st.image(uploaded_img, width='stretch') |
| img_name = uploaded_img.name |
| img_bytes = uploaded_img.getvalue() |
| img_type = uploaded_img.type |
| else: |
| with open(image_to_analyze_path, "rb") as f: |
| img_bytes = f.read() |
| st.image(img_bytes, width='stretch') |
| img_name = os.path.basename(image_to_analyze_path) |
| img_type = "image/jpeg" if img_name.lower().endswith(('.jpg', '.jpeg')) else "image/png" |
| |
| with col2: |
| if st.button("Identifica Ingredienti", type="primary", width='stretch'): |
| with st.spinner("Analisi dell'immagine in corso..."): |
| files = {"image": (img_name, img_bytes, img_type)} |
| data = {"client_id": CLIENT_ID} |
| try: |
| resp = requests.post(f"{API_BASE_URL}/predict", files=files, data=data) |
| |
| if resp.status_code == 409: |
| st.info("Configurazione assente in questa istanza. Sincronizzazione in corso... riprova la predizione in automatico.") |
| conf_resp = _do_configure_api() |
| if conf_resp.get("status") == "success": |
| |
| files = {"image": (img_name, img_bytes, img_type)} |
| resp = requests.post(f"{API_BASE_URL}/predict", files=files, data=data) |
| else: |
| st.error(f"Errore durante l'auto-configurazione: {conf_resp.get('message')}") |
| |
| if resp.status_code == 200: |
| predictions = resp.json().get("predictions", []) |
| if not predictions: |
| st.info("Nessun ingrediente rilevato nell'immagine.") |
| else: |
| st.success("Analisi completata") |
| |
| result_container = st.empty() |
| displayed_markdown = "<h4>Componenti Rilevati</h4>" |
| |
| for pred in predictions: |
| score_pct = pred.get("score", 0) * 100 |
| desc = pred.get("description") or "Nessun dettaglio aggiuntivo disponibile" |
| p_id = pred.get("product_id") |
| |
| item_html = textwrap.dedent(f""" |
| <div class="prediction-box"> |
| <span class="prediction-title">{p_id}</span> |
| <span class="prediction-score">{score_pct:.1f}%</span><br> |
| <span class="prediction-desc">{desc}</span> |
| </div> |
| """) |
| displayed_markdown += item_html |
|
|
| result_container.markdown(displayed_markdown, unsafe_allow_html=True) |
| time.sleep(0.08) |
| |
| else: |
| st.error(f"Errore API {resp.status_code}: {resp.text}") |
| except Exception as e: |
| st.error(f"Impossibile connettersi all'API: {e}") |
|
|