# Hugging Face page residue (not code), preserved as comments:
# ashtii's picture
# Update app.py
# cddf9bb verified
# app.py
import json
import os
import re

import gradio as gr
import joblib
import numpy as np
import pandas as pd
import requests
from difflib import SequenceMatcher, get_close_matches
from scipy.sparse import csr_matrix, hstack
# ---- CONFIG ----
HF_REPO = "ashtii/cosmetic-category-model" # your HF repo with model + vectorizers + optional labels/ingredients
# files are fetched via the raw "resolve/main" URL rather than the hub API
BASE_URL = f"https://huggingface.co/{HF_REPO}/resolve/main/"
# filenames we expect in the repo
MODEL_FNAME = "model.joblib"
LABELS_FNAME = "labels.json" # optional: list of class names in order
# candidate names for the ingredients CSV; the first one that downloads wins
ING_CSV_CANDIDATES = [
    "ingredients.csv",
    "final_ingridients_dataset.csv",
    "final_ingridients_dataset - Sheet1.csv",
    "final ingridients dataset - Sheet1.csv"
]
# vectorizer filenames, in the order their outputs are hstack-ed for the model
VECT_FILES = ["char_vect.joblib","word_vect.joblib","vect_f.joblib","char_vect_cat.joblib","word_vect_cat.joblib"]
# local cache directory for everything downloaded from the repo
WORKDIR = "modelrepo"
os.makedirs(WORKDIR, exist_ok=True)
# ---- helper: download file from HF repo if exists ----
def try_download(fname):
    """Best-effort download of *fname* from the HF repo.

    Fetches BASE_URL + fname and caches it under WORKDIR.  Returns the
    local path on success, or None on any failure (HTTP error, timeout,
    empty body, write error) — callers treat every asset as optional.
    """
    target = os.path.join(WORKDIR, fname)
    try:
        resp = requests.get(BASE_URL + fname, timeout=20)
        if resp.status_code != 200 or not resp.content:
            return None
        with open(target, "wb") as out:
            out.write(resp.content)
    except Exception:
        # best-effort: any network or filesystem problem means "not available"
        return None
    return target
# download model + vectorizers + labels + ingredients if available
print("Downloading model & assets (best-effort)...")
try_download(MODEL_FNAME)
for vf in VECT_FILES:
    try_download(vf)
try_download(LABELS_FNAME)
# try each candidate ingredients-CSV name; keep the first that downloads
ing_path = None
for cand in ING_CSV_CANDIDATES:
    p = try_download(cand)
    if p:
        ing_path = p
        break
# ---- load model ----
if not os.path.exists(os.path.join(WORKDIR, MODEL_FNAME)):
raise RuntimeError(f"Model file not found in repo. Please add {MODEL_FNAME} to {HF_REPO}.")
model = joblib.load(os.path.join(WORKDIR, MODEL_FNAME))
print("Loaded model:", type(model))
# get class labels from model if possible, else from labels.json
CLASS_LABELS = None
try:
if hasattr(model, "classes_"):
CLASS_LABELS = list(map(str, model.classes_.tolist()))
except Exception:
CLASS_LABELS = None
if CLASS_LABELS is None and os.path.exists(os.path.join(WORKDIR, LABELS_FNAME)):
try:
CLASS_LABELS = json.load(open(os.path.join(WORKDIR, LABELS_FNAME), "r"))
except Exception:
CLASS_LABELS = None
# ---- load available vectorizers (order matters) ----
# vectorizers: list of (filename, fitted vectorizer); transform outputs are
# hstack-ed in this exact order, so it must match how the model was trained
vectorizers = []
for name in VECT_FILES:
    p = os.path.join(WORKDIR, name)
    if os.path.exists(p):
        try:
            v = joblib.load(p)
            vectorizers.append((name, v))
            print("Loaded vectorizer:", name, type(v))
        except Exception as e:
            # a broken vectorizer file is skipped; build_feature_matrix copes
            print("Failed load vectorizer", name, e)
# ---- load ingredients CSV (if available) ----
# ING_DF: optional lookup table with per-ingredient info (or None)
ING_DF = None
if ing_path and os.path.exists(ing_path):
    try:
        ING_DF = pd.read_csv(ing_path)
        # normalize column names to lower-case trimmed
        ING_DF.columns = [c.strip() for c in ING_DF.columns]
        print("Loaded ingredients CSV:", ing_path, "columns:", ING_DF.columns.tolist())
    except Exception as e:
        print("Failed to load ingredients CSV:", e)
else:
    print("No ingredients CSV found in repo. Upload a CSV named ingredients.csv with columns like Ingredient, Function, Benefits, Harmfulness.")
# ---- helpers for ingredient matching & normalization ----
def normalize_ingredient(s):
    """Lower-case and clean an ingredient name for fuzzy matching.

    Non-string input (None, NaN, numbers) yields "".  Parenthesised
    qualifiers such as "Aqua (Water)" are dropped, any character outside
    [a-z0-9 %/.,-] becomes a space, and whitespace runs are collapsed.
    """
    if not isinstance(s, str):
        return ""
    s = s.lower().strip()
    # remove common parentheses content and extra punctuation
    s = re.sub(r"\([^)]*\)", "", s)
    s = re.sub(r"[^a-z0-9\s%/.,-]", " ", s)
    return " ".join(s.split())
def fuzzy_best_match(name, choices, cutoff=0.6):
    """Return (best_choice, similarity) for *name* over *choices*.

    Similarity is difflib.SequenceMatcher.ratio() in [0, 1].  The best
    candidate is returned even when its score is below *cutoff* — the
    original cutoff branch returned the identical tuple in both cases,
    so the parameter is kept only for backward compatibility and callers
    must inspect the score themselves.  Empty *choices* (or no candidate
    with score > 0) yields (None, 0.0).
    """
    if not choices:
        return None, 0.0
    best, best_score = None, 0.0
    for candidate in choices:
        score = SequenceMatcher(None, name, candidate).ratio()
        if score > best_score:
            best, best_score = candidate, score
    return best, best_score
# get choices from ING_DF
# ING_CHOICES: lower-cased ingredient names used as the fuzzy-match vocabulary
ING_CHOICES = []
if ING_DF is not None and "Ingredient" in ING_DF.columns:
    # use original names
    ING_CHOICES = [str(x).strip().lower() for x in ING_DF["Ingredient"].astype(str).tolist()]
else:
    # if Ingredient column not present, try first column
    if ING_DF is not None and len(ING_DF.columns) > 0:
        col0 = ING_DF.columns[0]
        ING_CHOICES = [str(x).strip().lower() for x in ING_DF[col0].astype(str).tolist()]
# ---- helper to build feature vector consistent with model ----
def build_feature_matrix(texts):
    """
    texts: list[str]
    returns sparse matrix compatible with model (pads/trims to n_features_in_ if needed),
    or None when no vectorizer produced any output
    """
    mats = []
    # transform with every loaded vectorizer, in VECT_FILES order
    for name, v in vectorizers:
        try:
            mats.append(v.transform(texts))
        except Exception as e:
            # if transform fails, try transform on cleaned strings
            try:
                mats.append(v.transform([normalize_ingredient(t) for t in texts]))
            except Exception:
                pass
    if not mats:
        return None
    try:
        X = hstack(mats).tocsr()
    except Exception:
        # some vectorizers may return dense arrays; coerce everything to CSR first
        mats2 = [csr_matrix(m) if not hasattr(m, "tocsr") else m.tocsr() for m in mats]
        X = hstack(mats2).tocsr()
    # pad or trim to model.n_features_in_ if available
    # NOTE(review): padding with zeros / truncating only keeps predict() from
    # crashing when vectorizer set differs from training — column alignment
    # with the trained feature order is not guaranteed; verify before relying on it
    n_expected = getattr(model, "n_features_in_", None)
    if n_expected is not None:
        cur = X.shape[1]
        if cur < n_expected:
            pad = csr_matrix((X.shape[0], n_expected - cur), dtype=X.dtype)
            X = hstack([X, pad]).tocsr()
        elif cur > n_expected:
            X = X[:, :n_expected]
    return X
# ---- main predict + ingredient analysis function ----
def _find_ingredient_row(best_match):
    """Return the first ING_DF row matching *best_match* (a lower-cased
    ingredient name), or None.  Tries an exact case-insensitive hit in the
    Ingredient column first, then a substring search across row values."""
    matches = None
    if "Ingredient" in ING_DF.columns:
        matches = ING_DF[ING_DF["Ingredient"].astype(str).str.strip().str.lower() == best_match]
    if matches is None or len(matches) == 0:
        # fallback: fuzzy "contains" scan over the stringified row values
        matches = ING_DF[ING_DF.apply(lambda row: best_match in str(row.values).lower(), axis=1)]
    return matches.iloc[0] if len(matches) > 0 else None


def analyze_and_predict(raw_text: str):
    """Predict a product category for *raw_text* and analyze each ingredient.

    Returns a JSON-serializable dict:
      {"category": <dict|None>, "ingredients": [<dict>, ...]}
    where "category" holds label/probabilities (or an error dict when the
    model could not run).  Any unexpected exception yields {"error": str}.
    """
    try:
        # 1) category prediction
        texts = [raw_text]
        X = build_feature_matrix(texts)
        category_result = None
        probs = None
        if X is None:
            # no vectorizer output: try feeding raw text straight to the model
            # (works when the model is a Pipeline that vectorizes internally)
            try:
                if hasattr(model, "predict_proba"):
                    probs = model.predict_proba(texts)[0].tolist()
                else:
                    pred = model.predict(texts).tolist()
                    probs = [float(pred[0])]
            except Exception as e:
                category_result = {"error": "Model cannot run (missing vectorizers). " + str(e)}
        else:
            if hasattr(model, "predict_proba"):
                probs = model.predict_proba(X)[0].tolist()
            else:
                # no probabilities available; expose raw predictions list-of-floats style
                probs = [float(x) for x in model.predict(X).tolist()]
        if probs is not None:
            label_idx = int(np.argmax(probs))
            # map index -> class name when labels are known, else use the index
            if CLASS_LABELS and label_idx < len(CLASS_LABELS):
                label_name = CLASS_LABELS[label_idx]
            else:
                label_name = str(label_idx)
            category_result = {
                "label": label_name,
                "label_index": label_idx,
                "probabilities": probs,
                "classes": CLASS_LABELS or [str(i) for i in range(len(probs))]
            }
        # 2) ingredient analysis: split input by commas and newlines
        # basic splitting — you can improve for multi-word separators
        raw_items = [i.strip() for i in raw_text.replace("\n", ",").split(",") if i.strip()]
        analyses = []
        for item in raw_items:
            norm = normalize_ingredient(item)
            best_match, score = fuzzy_best_match(norm, ING_CHOICES, cutoff=0.0)
            row = _find_ingredient_row(best_match) if (best_match and ING_DF is not None) else None
            analysis = {
                "input": item,
                "normalized": norm,
                "matched": best_match,
                "match_score": float(score)
            }
            if row is not None:
                # attach every known CSV field (None for NaN / unreadable cells)
                for col in ING_DF.columns:
                    try:
                        analysis[col] = row[col] if pd.notna(row[col]) else None
                    except Exception:
                        analysis[col] = None
            analyses.append(analysis)
        # final JSON
        return {"category": category_result, "ingredients": analyses}
    except Exception as e:
        # top-level guard so the Gradio endpoint always returns valid JSON
        return {"error": str(e)}
# ---- Gradio interface ----
def api_predict(text):
    """Gradio entry point: delegate the raw textbox string to the analyzer."""
    # Gradio passes raw string; return JSON-like structure
    return analyze_and_predict(text)
title = "Category + Ingredient Analysis"
desc = "Paste product ingredient string (comma separated). Returns predicted category and per-ingredient analysis."
# single textbox in, JSON out; launch() blocks and serves the app
iface = gr.Interface(fn=api_predict,
                     inputs=gr.Textbox(lines=3, placeholder="Aqua, Glycerin, Aloe vera, ..."),
                     outputs="json",
                     title=title, description=desc)
iface.launch()