3SPoke / app.py
Glainez's picture
Upload folder using huggingface_hub
3cf31a7 verified
import glob
import hmac
import html
import os
import textwrap
import time

import pandas as pd
import requests
import streamlit as st
# Runtime settings; each one can be overridden through an environment variable.
API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000")
CLIENT_ID = os.getenv("CLIENT_ID", "DEMO_CLIENT_001")
# Set APP_PASSWORD in the HF Space secrets.
# NOTE(review): the fallback below is a credential committed to the source;
# confirm every deployment overrides it.
APP_PASSWORD = os.getenv("APP_PASSWORD", "s3poke")
# Modern, professional CSS built around the Inter font.
_PAGE_CSS = """
<style>
@import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&display=swap');
html, body, [class*="css"] {
font-family: 'Inter', sans-serif;
}
.title-container {
padding-bottom: 1rem;
margin-bottom: 2rem;
border-bottom: 1px solid #eaeaea;
}
.prediction-box {
padding: 16px;
margin-bottom: 12px;
border-radius: 6px;
border-left: 4px solid #2563eb;
background-color: #f8fafc;
color: #1e293b;
animation: fadeIn 0.5s ease-out;
box-shadow: 0 1px 3px rgba(0,0,0,0.05);
}
.prediction-title {
font-size: 1.1em;
font-weight: 600;
color: #0f172a;
}
.prediction-score {
float: right;
color: #2563eb;
font-weight: 600;
font-size: 1.1em;
}
.prediction-desc {
font-size: 0.9em;
color: #64748b;
margin-top: 4px;
display: block;
}
@keyframes fadeIn {
0% { opacity: 0; transform: translateY(5px); }
100% { opacity: 1; transform: translateY(0); }
}
</style>
"""

# Page configuration is issued before any other Streamlit output.
st.set_page_config(page_title="Rilevatore di Ingredienti", layout="wide")
# Inject the stylesheet used by the title bar and the prediction cards.
st.markdown(_PAGE_CSS, unsafe_allow_html=True)
# Authentication logic
def check_password():
    """Gate the app behind the shared password.

    Returns:
        True when no password is configured (``APP_PASSWORD`` is empty) or
        when this session has already authenticated; False while the login
        form is being shown.
    """
    if not APP_PASSWORD:
        return True

    if "password_correct" not in st.session_state:
        st.session_state["password_correct"] = False
    if st.session_state["password_correct"]:
        return True

    # Not authenticated yet: render the login form.
    st.markdown('<div class="title-container"><h1>Accesso Rilevatore di Ingredienti</h1></div>', unsafe_allow_html=True)
    st.info("Questo spazio è privato. Inserisci la password di accesso.")
    password = st.text_input("Password", type="password")
    if st.button("Accedi"):
        # Constant-time comparison so response timing does not leak how much
        # of the password matched. Compared as bytes because compare_digest
        # rejects non-ASCII str arguments.
        supplied = password.encode("utf-8")
        expected = APP_PASSWORD.encode("utf-8")
        if hmac.compare_digest(supplied, expected):
            st.session_state["password_correct"] = True
            # Re-run the script so the page is drawn in its authenticated state.
            st.rerun()
        else:
            st.error("Password errata.")
    return False
# Stop the script run here until the visitor has authenticated.
is_authenticated = check_password()
if not is_authenticated:
    st.stop()
# (connect, read) timeouts in seconds for the /configure call, so an
# unreachable or stalled API cannot block the app indefinitely.
_CONFIGURE_TIMEOUT = (10, 120)


def _do_configure_api():
    """Load the product catalogue from ``csv_config`` and push it to the API.

    The first ``*.csv`` file (in sorted order) inside the ``csv_config``
    folder next to this file is read. Rows whose ``COMPONENT`` is
    "ingredient" or starts with "proteine" (case-insensitive) are sent to
    ``{API_BASE_URL}/configure`` together with ``CLIENT_ID``.

    Returns:
        A dict with a ``status`` key ("success" or "error") and a
        human-readable ``message``. Failures are reported through the dict;
        no exception is raised to the caller.
    """
    config_dir = os.path.join(os.path.dirname(__file__), "csv_config")
    # glob order is filesystem-dependent; sort so the same file is always
    # chosen when several CSVs are present.
    csv_files = sorted(glob.glob(os.path.join(config_dir, "*.csv")))
    if not csv_files:
        return {"status": "error", "message": "Nessun CSV di configurazione trovato "}

    try:
        df = pd.read_csv(csv_files[0])
        required_cols = {"PRODUCT_ID", "DESCRIPTION", "FAMILY", "COMPONENT", "EXTRA DESCRIPTION"}
        if not required_cols.issubset(set(df.columns)):
            return {"status": "error", "message": "Colonne richieste mancanti nel CSV."}

        # Normalise once; missing components become "" so they never match
        # and never inject NaN into the boolean mask.
        component = df["COMPONENT"].fillna("").astype(str).str.lower()
        mask = (component == "ingredient") | component.str.startswith("proteine")
        selected = df[mask]

        extra = selected["EXTRA DESCRIPTION"].fillna("").astype(str)
        products = pd.DataFrame(
            {
                "id": selected["PRODUCT_ID"].astype(str),
                "description": selected["DESCRIPTION"].astype(str),
                "extra_description": selected["COMPONENT"].astype(str) + " - " + extra,
            }
        ).to_dict(orient="records")

        payload = {
            "client_id": CLIENT_ID,
            "context": "Rilevamento degli ingredienti all'interno delle Poke Bowl in Italia.",
            "products": products,
        }
        resp = requests.post(
            f"{API_BASE_URL}/configure",
            json=payload,
            timeout=_CONFIGURE_TIMEOUT,
        )
        if resp.status_code == 200:
            return {"status": "success", "message": "API configurata con successo."}
        return {"status": "error", "message": f"Errore API {resp.status_code}: {resp.text}"}
    except Exception as e:
        # Boundary handler: any read, parse, or network failure is turned
        # into an error status for the UI to display.
        return {"status": "error", "message": f"Impossibile configurare l'API: {e}"}
class _ConfigurationFailed(Exception):
    """Carries a failed configuration status out of the cached function."""

    def __init__(self, status):
        super().__init__(status.get("message", ""))
        self.status = status


@st.cache_resource(show_spinner=False)
def _configure_api_cached():
    """Configure the API once and cache the successful status only."""
    status = _do_configure_api()
    if status.get("status") != "success":
        # Raising keeps the failure out of the cache, so a transient outage
        # is retried on the next run instead of persisting.
        raise _ConfigurationFailed(status)
    return status


def configure_api():
    """Return the API configuration status dict, caching only successes.

    Returns:
        The dict produced by ``_do_configure_api``: served from the cache
        once configuration has succeeded, recomputed on each call while it
        keeps failing.
    """
    try:
        return _configure_api_cached()
    except _ConfigurationFailed as failure:
        return failure.status


# Keep the cache-clearing hook the decorated function used to expose.
configure_api.clear = _configure_api_cached.clear
# Run the API configuration up front, behind a spinner.
with st.spinner("Inizializzazione configurazione API..."):
    config_status = configure_api()

page_title_html = '<div class="title-container"><h1>Rilevatore di Ingredienti</h1></div>'
st.markdown(page_title_html, unsafe_allow_html=True)

# Surface a configuration failure, then continue rendering the page.
init_failed = config_status["status"] == "error"
if init_failed:
    init_error = config_status["message"]
    st.error(f"Errore di Inizializzazione: {init_error}")
    st.info("Controlla i file di configurazione e la connessione API.")

# Section heading followed by the short usage hint.
for intro_text in (
    "### Analisi dell'Immagine",
    "Carica l'immagine di una bowl o seleziona un'immagine di test per identificarne automaticamente gli ingredienti.",
):
    st.markdown(intro_text)
# Look for the test-image folder (demo/test_images or ../tests).
possible_test_dirs = [
    os.path.join(os.path.dirname(__file__), "test_images"),
    os.path.join(os.path.dirname(__file__), "..", "tests"),
]
test_images_dir = next((d for d in possible_test_dirs if os.path.isdir(d)), None)

image_to_analyze_path = None
uploaded_img = None

# Collect the bundled test images; extensions are matched case-insensitively
# so files such as "bowl.JPG" are found too.
_TEST_IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg", ".webp")
test_images = []
if test_images_dir:
    test_images = sorted(
        path
        for path in glob.glob(os.path.join(test_images_dir, "*"))
        if path.lower().endswith(_TEST_IMAGE_SUFFIXES)
    )

# The source picker is only offered when there is at least one test image.
_TEST_IMAGE_MODE = "Seleziona un'immagine di test"
use_test_image = False
if test_images:
    selection_mode = st.radio(
        "Scegli la sorgente dell'immagine:",
        ["Carica la tua", _TEST_IMAGE_MODE],
        horizontal=True,
    )
    use_test_image = selection_mode == _TEST_IMAGE_MODE

if use_test_image:
    img_names = [os.path.basename(p) for p in test_images]
    selected_img_name = st.selectbox("Seleziona un'immagine di test:", img_names)
    image_to_analyze_path = os.path.join(test_images_dir, selected_img_name)
else:
    # Always fall back to the uploader, including when the test folder is
    # missing or contains no images, so the user can still supply a picture.
    uploaded_img = st.file_uploader(
        "Carica Immagine",
        type=["png", "jpg", "jpeg", "webp"],
        label_visibility="collapsed",
    )
# (connect, read) timeouts in seconds for /predict, so a stalled API cannot
# leave the spinner running forever.
_PREDICT_TIMEOUT = (10, 120)

# MIME type sent for each supported test-image extension.
_IMAGE_MIME_TYPES = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".webp": "image/webp",
}


def _guess_image_mime(filename):
    """Return the MIME type for *filename* from its extension (PNG if unknown)."""
    suffix = os.path.splitext(filename)[1].lower()
    return _IMAGE_MIME_TYPES.get(suffix, "image/png")


def _post_prediction(name, content, mime):
    """POST one image to the /predict endpoint and return the response."""
    return requests.post(
        f"{API_BASE_URL}/predict",
        files={"image": (name, content, mime)},
        data={"client_id": CLIENT_ID},
        timeout=_PREDICT_TIMEOUT,
    )


def _prediction_box_html(pred):
    """Render one prediction dict as an HTML card.

    The product id and description come from the API response, so they are
    HTML-escaped before being placed in markup rendered with
    ``unsafe_allow_html``. A missing or null score is shown as 0.
    """
    score_pct = float(pred.get("score") or 0) * 100
    desc = pred.get("description") or "Nessun dettaglio aggiuntivo disponibile"
    p_id = pred.get("product_id")
    # Lines start at column 0 so Markdown does not treat them as a code block.
    return (
        '\n<div class="prediction-box">\n'
        f'<span class="prediction-title">{html.escape(str(p_id))}</span>\n'
        f'<span class="prediction-score">{score_pct:.1f}%</span><br>\n'
        f'<span class="prediction-desc">{html.escape(str(desc))}</span>\n'
        '</div>\n'
    )


if uploaded_img is not None or image_to_analyze_path is not None:
    col1, col2 = st.columns([1, 1], gap="large")

    # Left column: preview the image and gather what is sent to the API.
    with col1:
        if uploaded_img is not None:
            st.image(uploaded_img, width='stretch')
            img_name = uploaded_img.name
            img_bytes = uploaded_img.getvalue()
            img_type = uploaded_img.type
        else:
            with open(image_to_analyze_path, "rb") as f:
                img_bytes = f.read()
            st.image(img_bytes, width='stretch')
            img_name = os.path.basename(image_to_analyze_path)
            img_type = _guess_image_mime(img_name)

    # Right column: run the prediction and render the results.
    with col2:
        if st.button("Identifica Ingredienti", type="primary", width='stretch'):
            with st.spinner("Analisi dell'immagine in corso..."):
                try:
                    resp = _post_prediction(img_name, img_bytes, img_type)
                    if resp.status_code == 409:
                        # The API instance has no configuration: push it
                        # again, then retry the prediction once.
                        st.info("Configurazione assente in questa istanza. Sincronizzazione in corso... riprova la predizione in automatico.")
                        conf_resp = _do_configure_api()
                        if conf_resp.get("status") == "success":
                            resp = _post_prediction(img_name, img_bytes, img_type)
                        else:
                            st.error(f"Errore durante l'auto-configurazione: {conf_resp.get('message')}")

                    if resp.status_code == 200:
                        predictions = resp.json().get("predictions", [])
                        if not predictions:
                            st.info("Nessun ingrediente rilevato nell'immagine.")
                        else:
                            st.success("Analisi completata")
                            result_container = st.empty()
                            displayed_markdown = "<h4>Componenti Rilevati</h4>"
                            for pred in predictions:
                                displayed_markdown += _prediction_box_html(pred)
                                result_container.markdown(displayed_markdown, unsafe_allow_html=True)
                                # Short pause so the cards appear one by one.
                                time.sleep(0.08)
                    else:
                        st.error(f"Errore API {resp.status_code}: {resp.text}")
                except Exception as e:
                    # Boundary handler: network, timeout, and response-parsing
                    # failures are all reported in the UI.
                    st.error(f"Impossibile connettersi all'API: {e}")