# Hugging Face Spaces capture — the Space status banner read "Runtime error"
# at the time this file was exported.
# app.py
import json
import os
from difflib import get_close_matches, SequenceMatcher

import gradio as gr
import joblib
import numpy as np
import pandas as pd
import requests
from scipy.sparse import hstack, csr_matrix
# ---- CONFIG ----
# Hugging Face repo that holds the trained model, its vectorizers, and
# (optionally) labels.json / an ingredients CSV.
HF_REPO = "ashtii/cosmetic-category-model"  # your HF repo with model + vectorizers + optional labels/ingredients
BASE_URL = f"https://huggingface.co/{HF_REPO}/resolve/main/"
# filenames we expect in the repo
MODEL_FNAME = "model.joblib"  # required: trained classifier
LABELS_FNAME = "labels.json"  # optional: list of class names in order
# Candidate names for the ingredient-info CSV; the first one that downloads
# successfully is used (covers several historical export names).
ING_CSV_CANDIDATES = [
    "ingredients.csv",
    "final_ingridients_dataset.csv",
    "final_ingridients_dataset - Sheet1.csv",
    "final ingridients dataset - Sheet1.csv"
]
# Vectorizer files tried in this order; order matters because features are
# horizontally stacked in build_feature_matrix().
VECT_FILES = ["char_vect.joblib","word_vect.joblib","vect_f.joblib","char_vect_cat.joblib","word_vect_cat.joblib"]
WORKDIR = "modelrepo"  # local cache directory for downloaded assets
os.makedirs(WORKDIR, exist_ok=True)
# ---- helper: download file from HF repo if exists ----
def try_download(fname):
    """Best-effort download of *fname* from the HF repo into WORKDIR.

    Returns the local save path on success, or None when the file is
    missing (non-200), the response body is empty, or the request/write
    fails. Never raises: missing assets are expected and optional.
    """
    url = BASE_URL + fname
    save_path = os.path.join(WORKDIR, fname)
    try:
        r = requests.get(url, timeout=20)
        if r.status_code == 200 and r.content:
            with open(save_path, "wb") as fh:
                fh.write(r.content)
            return save_path
    except (requests.RequestException, OSError):
        # Narrowed from a bare `except Exception`: only network and
        # filesystem failures are best-effort; programming errors surface.
        pass
    return None
# download model + vectorizers + labels + ingredients if available
print("Downloading model & assets (best-effort)...")
try_download(MODEL_FNAME)
for vf in VECT_FILES:
    try_download(vf)
try_download(LABELS_FNAME)
# Try each CSV candidate lazily; stop at the first one that downloads.
ing_path = next(
    (path for path in (try_download(cand) for cand in ING_CSV_CANDIDATES) if path),
    None,
)
# ---- load model ----
model_path = os.path.join(WORKDIR, MODEL_FNAME)
if not os.path.exists(model_path):
    raise RuntimeError(f"Model file not found in repo. Please add {MODEL_FNAME} to {HF_REPO}.")
model = joblib.load(model_path)
print("Loaded model:", type(model))

# Class labels: prefer the fitted estimator's classes_, fall back to an
# optional labels.json in the repo; None if neither is available.
CLASS_LABELS = None
try:
    if hasattr(model, "classes_"):
        CLASS_LABELS = list(map(str, model.classes_.tolist()))
except Exception:
    CLASS_LABELS = None
labels_path = os.path.join(WORKDIR, LABELS_FNAME)
if CLASS_LABELS is None and os.path.exists(labels_path):
    try:
        # Fix: original used json.load(open(...)), leaking the file handle.
        with open(labels_path, "r") as fh:
            CLASS_LABELS = json.load(fh)
    except Exception:
        CLASS_LABELS = None
# ---- load available vectorizers (order matters) ----
vectorizers = []
for name in VECT_FILES:
    path = os.path.join(WORKDIR, name)
    if not os.path.exists(path):
        continue
    try:
        vect = joblib.load(path)
    except Exception as err:
        print("Failed load vectorizer", name, err)
    else:
        vectorizers.append((name, vect))
        print("Loaded vectorizer:", name, type(vect))
# ---- load ingredients CSV (if available) ----
ING_DF = None
if ing_path and os.path.exists(ing_path):
    try:
        ING_DF = pd.read_csv(ing_path)
        # Trim whitespace from header names so column lookups are reliable.
        ING_DF.columns = [col.strip() for col in ING_DF.columns]
        print("Loaded ingredients CSV:", ing_path, "columns:", ING_DF.columns.tolist())
    except Exception as err:
        print("Failed to load ingredients CSV:", err)
else:
    print("No ingredients CSV found in repo. Upload a CSV named ingredients.csv with columns like Ingredient, Function, Benefits, Harmfulness.")
# ---- helpers for ingredient matching & normalization ----
def normalize_ingredient(s):
    """Lower-case and clean an ingredient name for fuzzy matching.

    Non-strings normalize to "". Parenthesized content is dropped,
    unexpected symbols become spaces, and whitespace is collapsed.
    """
    import re
    if not isinstance(s, str):
        return ""
    cleaned = s.lower().strip()
    cleaned = re.sub(r"\([^)]*\)", "", cleaned)           # drop "(...)" content
    cleaned = re.sub(r"[^a-z0-9\s%/.,-]", " ", cleaned)   # strip stray symbols
    return " ".join(cleaned.split())                      # collapse whitespace
def fuzzy_best_match(name, choices, cutoff=0.6):
    """Return (best_match, score) using SequenceMatcher ratio; or (None, 0.0).

    NOTE: the best candidate is returned even when its score is below
    *cutoff* (historical behavior — the original code's cutoff branch and
    fallback branch returned the identical tuple, so the dead conditional
    has been collapsed; callers filter on the returned score themselves).
    """
    if not choices:
        return None, 0.0
    best = None
    best_score = 0.0
    for candidate in choices:
        score = SequenceMatcher(None, name, candidate).ratio()
        if score > best_score:
            best_score = score
            best = candidate
    return best, best_score
# get choices from ING_DF
ING_CHOICES = []
if ING_DF is not None:
    # Prefer a dedicated "Ingredient" column; otherwise fall back to the
    # first column of the sheet.
    if "Ingredient" in ING_DF.columns:
        source_col = "Ingredient"
    elif len(ING_DF.columns) > 0:
        source_col = ING_DF.columns[0]
    else:
        source_col = None
    if source_col is not None:
        ING_CHOICES = [
            str(value).strip().lower()
            for value in ING_DF[source_col].astype(str).tolist()
        ]
# ---- helper to build feature vector consistent with model ----
def build_feature_matrix(texts):
    """Transform *texts* (list[str]) with every loaded vectorizer and
    horizontally stack the results.

    Returns a CSR sparse matrix padded/trimmed to model.n_features_in_
    when that attribute exists, or None when no vectorizer could
    transform the input.
    """
    mats = []
    for _name, vect in vectorizers:
        try:
            mats.append(vect.transform(texts))
        except Exception:
            # Retry on normalized strings before giving up on this vectorizer.
            try:
                mats.append(vect.transform([normalize_ingredient(t) for t in texts]))
            except Exception:
                pass
    if not mats:
        return None
    try:
        X = hstack(mats).tocsr()
    except Exception:
        coerced = [m.tocsr() if hasattr(m, "tocsr") else csr_matrix(m) for m in mats]
        X = hstack(coerced).tocsr()
    # Pad with zero columns or trim so the width matches what the model
    # was fitted on (vectorizer set may differ from training time).
    n_expected = getattr(model, "n_features_in_", None)
    if n_expected is None:
        return X
    n_current = X.shape[1]
    if n_current < n_expected:
        padding = csr_matrix((X.shape[0], n_expected - n_current), dtype=X.dtype)
        X = hstack([X, padding]).tocsr()
    elif n_current > n_expected:
        X = X[:, :n_expected]
    return X
# ---- main predict + ingredient analysis function ----
def _predict_category(texts):
    """Run the category model on *texts* (list with one raw string).

    Returns a result dict with label/probabilities, an {"error": ...}
    dict when the model cannot run, or None when no probabilities could
    be produced.
    """
    category_result = None
    probs = None
    X = build_feature_matrix(texts)
    if X is None:
        # No vectorizers loaded — try feeding raw text straight to the model
        # (works if the model is a full text pipeline).
        try:
            if hasattr(model, "predict_proba"):
                probs = model.predict_proba(texts)[0].tolist()
            else:
                probs = [float(model.predict(texts).tolist()[0])]
        except Exception as e:
            category_result = {"error": "Model cannot run (missing vectorizers). " + str(e)}
    else:
        if hasattr(model, "predict_proba"):
            probs = model.predict_proba(X)[0].tolist()
        else:
            # No probabilities available; present predictions list-of-probs-like.
            probs = [float(x) for x in model.predict(X).tolist()]
    if probs is not None:
        label_idx = int(np.argmax(probs))
        if CLASS_LABELS and label_idx < len(CLASS_LABELS):
            label_name = CLASS_LABELS[label_idx]
        else:
            label_name = str(label_idx)
        category_result = {
            "label": label_name,
            "label_index": label_idx,
            "probabilities": probs,
            "classes": CLASS_LABELS or [str(i) for i in range(len(probs))]
        }
    return category_result


def _lookup_ingredient_row(best_match):
    """Return the first ING_DF row matching *best_match* (lower-cased name).

    Prefers an exact match on the Ingredient column, then falls back to a
    substring scan over each row's values. Returns None when nothing matches.
    """
    if not best_match or ING_DF is None:
        return None
    if "Ingredient" in ING_DF.columns:
        matches = ING_DF[ING_DF["Ingredient"].astype(str).str.strip().str.lower() == best_match]
        if len(matches) == 0:
            matches = ING_DF[ING_DF.apply(lambda r: best_match in str(r.values).lower(), axis=1)]
    else:
        matches = ING_DF[ING_DF.apply(lambda r: best_match in str(r.values).lower(), axis=1)]
    return matches.iloc[0] if len(matches) > 0 else None


def analyze_and_predict(raw_text: str):
    """Predict the product category for *raw_text* and analyze each
    comma/newline-separated ingredient against the ingredient CSV.

    Returns {"category": ..., "ingredients": [...]} or {"error": str} on
    any unexpected failure.

    Fix vs. original: removed a dead per-ingredient row-wise DataFrame
    scan whose result ("mask") was never used — pure wasted O(rows) work.
    """
    try:
        # 1) category prediction
        category_result = _predict_category([raw_text])
        # 2) ingredient analysis: split input by commas and newlines
        raw_items = [i.strip() for i in raw_text.replace("\n", ",").split(",") if i.strip()]
        analyses = []
        for item in raw_items:
            norm = normalize_ingredient(item)
            best_match, score = fuzzy_best_match(norm, ING_CHOICES, cutoff=0.0)
            row = _lookup_ingredient_row(best_match)
            analysis = {
                "input": item,
                "normalized": norm,
                "matched": best_match,
                "match_score": float(score)
            }
            if row is not None:
                # Copy known CSV fields; NaN becomes None for clean JSON.
                for col in ING_DF.columns:
                    try:
                        analysis[col] = row[col] if pd.notna(row[col]) else None
                    except Exception:
                        analysis[col] = None
            analyses.append(analysis)
        # final JSON
        return {"category": category_result, "ingredients": analyses}
    except Exception as e:
        return {"error": str(e)}
# ---- Gradio interface ----
def api_predict(text):
    """Gradio entry point: raw ingredient string in, JSON-serializable dict out."""
    return analyze_and_predict(text)


title = "Category + Ingredient Analysis"
desc = "Paste product ingredient string (comma separated). Returns predicted category and per-ingredient analysis."

iface = gr.Interface(
    fn=api_predict,
    inputs=gr.Textbox(lines=3, placeholder="Aqua, Glycerin, Aloe vera, ..."),
    outputs="json",
    title=title,
    description=desc,
)
iface.launch()