# NOTE(review): removed stray Hugging Face Space status text ("Spaces: Sleeping")
# that was scraped in with the source — it is not part of the module.
# inference.py - clean, HF-Space compatible
import json
import os
import re
from difflib import get_close_matches

import joblib
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix, hstack
# -----------------------
# Load artifacts
# -----------------------
# Directory containing this file; all artifact paths below are resolved
# relative to it so the module works regardless of the process CWD.
HERE = os.path.dirname(__file__)
def load_artifact(path, allow_missing=False):
    """Load a joblib artifact from *path* (relative to this file).

    Returns the unpickled object, or None when the file is absent and
    allow_missing is True. Raises FileNotFoundError for a missing
    required artifact.
    """
    artifact_path = os.path.join(HERE, path)
    if os.path.exists(artifact_path):
        return joblib.load(artifact_path)
    if allow_missing:
        return None
    raise FileNotFoundError(f"Missing artifact: {artifact_path}")
# Category models
# Required vectorizers: category prediction cannot work without them.
WORD_VECT = load_artifact("model_artifacts/word_vect.pkl")
CHAR_VECT = load_artifact("model_artifacts/char_vect.pkl")
# Optional pieces: prediction degrades gracefully when any of these are missing.
LABEL_ENCODER = load_artifact("model_artifacts/label_encoder.pkl", allow_missing=True)
MODEL_SVM = load_artifact("model_artifacts/svm_calibrated_holdout.pkl", allow_missing=True)
MODEL_LGB = load_artifact("model_artifacts/lgb_final_holdout.pkl", allow_missing=True)
# Ingredient models
ING_LOOKUP = load_artifact("ingredient_artifacts/ingredient_lookup.pkl", allow_missing=True)
TRAINED_MODELS = load_artifact("ingredient_artifacts/trained_models.pkl", allow_missing=True)
# Normalized-ingredient-name -> info-record mapping; empty dict when absent.
LOOKUP_DICT = ING_LOOKUP or {}
# Candidate keys for fuzzy matching (see best_fuzzy below).
UNIQUE_ING_NORMS = list(LOOKUP_DICT.keys())
# Optional product CSV: when present next to this file, predict() can look
# products up by index instead of requiring name/ingredients in the request.
CSV_NAME = "final_products_with_category710-MERGED - final_products_with_category.csv.csv"
PRODUCTS_DF = None
p = os.path.join(HERE, CSV_NAME)
if os.path.exists(p):
    try:
        PRODUCTS_DF = pd.read_csv(p)
    except Exception:
        # Was a bare `except:`, which would also swallow SystemExit /
        # KeyboardInterrupt. A malformed CSV is best-effort: fall back to None.
        PRODUCTS_DF = None
# -----------------------
# Normalization
# -----------------------
# Precompiled patterns shared by the normalizers below.
paren_re = re.compile(r"\([^)]*\)")  # parenthesized asides, e.g. "(plant derived)"
unit_re = re.compile(r"\b(\d+ml|\d+mg|\d+g|\d+%)", flags=re.I)  # quantity/unit tokens
def normalize_name(text):
    """Normalize a product name: lowercase, keep only [a-z0-9 -],
    and collapse runs of whitespace to single spaces."""
    lowered = str(text).lower()
    cleaned = re.sub(r"[^a-z0-9\s\-]", " ", lowered)
    return " ".join(cleaned.split())
def normalize_ingredients_text(text):
    """Normalize a raw ingredient list for vectorization.

    Lowercases, strips parentheticals and quantity/unit tokens, keeps
    only [a-z0-9,;-/%], unifies ';' into ',', collapses empty list
    slots (",,") and surplus whitespace.
    """
    s = unit_re.sub(" ", paren_re.sub(" ", str(text).lower()))
    s = re.sub(r"[^a-z0-9,;\-/%]+", " ", s)
    s = re.sub(r",\s*,", ",", s.replace(";", ","))
    return " ".join(s.split())
def normalize_token(s):
    """Normalize a single ingredient token: lowercase, drop
    parentheticals, keep only [a-z0-9 -], collapse whitespace.
    Falsy input yields ""."""
    if not s:
        return ""
    token = paren_re.sub(" ", str(s).lower())
    token = re.sub(r"[^a-z0-9\s\-]", " ", token)
    return " ".join(token.split())
def parse_ing(text):
    """Split a raw ingredient string into a list of trimmed entries.

    Parenthesized asides are removed; ';' and '/' act as separators,
    same as ','. Falsy input yields [].
    """
    if not text:
        return []
    cleaned = re.sub(r"\([^)]*\)", "", str(text))
    for sep in (";", "/"):
        cleaned = cleaned.replace(sep, ",")
    pieces = (part.strip() for part in cleaned.split(","))
    return [part for part in pieces if part]
def engineered_features(name, ing):
    """Return 4 hand-crafted numeric features for a product.

    [comma-separated ingredient count, name word count,
     "aqua" present, "sorbitol" present] — the presence flags are plain
    substring tests on the raw ingredient string.
    """
    ingredient_count = len(ing.split(",")) if ing else 0
    name_word_count = len(name.split()) if name else 0
    has_aqua = int("aqua" in ing)
    has_sorbitol = int("sorbitol" in ing)
    return np.array([ingredient_count, name_word_count, has_aqua, has_sorbitol])
# -----------------------
# Fuzzy matching
# -----------------------
def best_fuzzy(norm):
    """Fuzzy-match *norm* against the known ingredient keys.

    Returns a (matched_key, record) pair for the closest key scoring
    at least 0.85, or None when the lookup is empty or nothing is
    close enough.
    """
    if not LOOKUP_DICT:
        return None
    hits = get_close_matches(norm, UNIQUE_ING_NORMS, n=1, cutoff=0.85)
    if not hits:
        return None
    best = hits[0]
    return best, LOOKUP_DICT[best]
# -----------------------
# Harm mapping
# -----------------------
def map_harm(v):
    """Map a raw harm/risk field to a float score, or None if unusable.

    Accepts numeric strings in [0, 10] (integers "1".."10" directly,
    other numerics via float()), plus common yes/no style words:
    harmful words map to 1.0, safe words to 0.0. Empty/None input and
    values outside 0..10 map to None.
    """
    if v is None or str(v).strip() == "":
        return None
    s = str(v).strip().lower()
    if s.isdigit() and 1 <= int(s) <= 10:
        return float(s)
    if s in ["yes", "true", "harmful", "unsafe", "dangerous", "toxic"]:
        return 1.0
    if s in ["no", "false", "safe", "none", "0"]:
        return 0.0
    try:
        f = float(s)
    except ValueError:
        # Was a bare `except:`; float(str) can only raise ValueError here.
        return None
    return f if 0 <= f <= 10 else None
# -----------------------
# Category prediction
# -----------------------
def predict_category(n_name, n_ing):
    """Predict the product category from normalized name and ingredients.

    Builds a combined word+char TF-IDF matrix plus the engineered
    features, then averages predict_proba from whichever models
    (SVM / LightGBM) are available. Returns the decoded label string,
    or None when no model produced probabilities.
    """
    combined = n_name + " | " + n_ing
    Xw = WORD_VECT.transform([combined])
    Xc = CHAR_VECT.transform([combined])
    feats = engineered_features(n_name, n_ing).reshape(1, -1)
    X = hstack([Xw, Xc, csr_matrix(feats)]).tocsr()

    ps = None
    # Best-effort ensemble: each model contributes only if it predicts cleanly.
    # Identity checks replace truthiness tests — arbitrary estimator objects
    # may define __bool__/__len__ unpredictably.
    if MODEL_SVM is not None:
        try:
            ps = MODEL_SVM.predict_proba(X)
        except Exception:  # narrowed from bare except; keep best-effort intent
            pass
    if MODEL_LGB is not None:
        try:
            pl = MODEL_LGB.predict_proba(X)
            ps = (ps + pl) / 2 if ps is not None else pl
        except Exception:  # narrowed from bare except
            pass
    if ps is None:
        return None

    code = int(np.argmax(ps, axis=1)[0])
    if LABEL_ENCODER is not None:
        return LABEL_ENCODER.inverse_transform([code])[0]
    return str(code)
# -----------------------
# Ingredient info extractor (clean output)
# -----------------------
def extract_info(raw, norm):
    """Build a per-ingredient info dict for the normalized name *norm*.

    Resolves *norm* against LOOKUP_DICT (exact match first, fuzzy match
    second) and copies function/benefits/explanation/harm fields out of
    the matched record. *raw* (the original token) is currently unused.

    NOTE(review): indentation was reconstructed from a whitespace-mangled
    source; the scope of the `if r:` body is the best-guess reading —
    confirm against the original file.
    """
    entry = {
        "ingredient_norm": norm,
        "function": None,
        "benefits": None,
        "explanation": None,
        "harm_score": None
    }
    # Resolve a lookup record: exact key hit first, fuzzy fallback second.
    r = None
    if norm in LOOKUP_DICT:
        r = LOOKUP_DICT[norm]
    else:
        fuzzy = best_fuzzy(norm)
        if fuzzy:
            _, r = fuzzy
    if r:
        # Records come from heterogeneous sources, so probe several key spellings.
        for k in ["Function","function","Role","Use"]:
            if k in r:
                entry["function"] = r[k]
                break
        for k in ["Benefits","benefit","Purpose"]:
            if k in r:
                entry["benefits"] = r[k]
                break
        # Explanation resolution order: exact key candidates, then any key
        # whose name contains "expla"/"desc"/"short", then the benefits text.
        explanation = None
        for k in ["Short Explanation","short explanation","Explanation","Desc","Description"]:
            if k in r and str(r[k]).strip():
                explanation = str(r[k]).strip()
                break
        if not explanation:
            for cand in r.keys():
                if any(x in cand.lower() for x in ["expla","desc","short"]):
                    explanation = str(r[cand]).strip()
                    break
        if not explanation:
            explanation = entry["benefits"]
        entry["explanation"] = explanation
        # Harm score: first record key mentioning "harm" or "risk",
        # mapped onto the 0-10 scale by map_harm.
        harm_key = None
        for hk in r.keys():
            if "harm" in hk.lower() or "risk" in hk.lower():
                harm_key = hk
                break
        if harm_key:
            mapped = map_harm(r[harm_key])
            if mapped is not None:
                entry["harm_score"] = mapped
    # Unmatched or unscored ingredients default to 0.0 (treated as safe).
    if entry["harm_score"] is None:
        entry["harm_score"] = 0.0
    return entry
# -----------------------
# Main predict
# -----------------------
def predict(inputs: dict) -> dict:
    """Run the full pipeline for one product and return a JSON-able dict.

    inputs may carry either:
      - "product_index": int index into the optional PRODUCTS_DF CSV, or
      - "product_name" / "ingredient_text": free-form strings.
    Returns {"error": ...} for a bad index, otherwise the prediction
    payload (category, per-ingredient rows, average harm score).
    """
    idx = inputs.get("product_index")
    if idx is not None and PRODUCTS_DF is not None:
        try:
            idx = int(idx)
        except (TypeError, ValueError):
            # Narrowed from a bare except: only conversion failures
            # should produce this error payload.
            return {"error": "product_index must be integer"}
        if idx < 0 or idx >= len(PRODUCTS_DF):
            return {"error": f"index out of range 0..{len(PRODUCTS_DF)-1}"}
        row = PRODUCTS_DF.iloc[idx]
        product_name = row.get("PRODUCT NAME", "")
        ingredient_text = row.get("INGREDIENTS", "")
    else:
        # NOTE: when the CSV is unavailable, a supplied product_index is
        # ignored for lookup but still echoed back uncoerced (original behavior).
        product_name = inputs.get("product_name", "")
        ingredient_text = inputs.get("ingredient_text", "")

    n_name = normalize_name(product_name)
    n_ing = normalize_ingredients_text(ingredient_text)
    category_label = predict_category(n_name, n_ing)

    items = parse_ing(ingredient_text)
    rows = [extract_info(item, normalize_token(item)) for item in items]
    avg_harm = float(np.mean([r["harm_score"] for r in rows])) if rows else 0.0

    return {
        "product_index": idx,
        "product_name": product_name,
        "predicted_category": category_label,
        "avg_harm": avg_harm,
        "product_ingredient_count": len(rows),
        "rows": rows
    }
# Debug run
if __name__ == "__main__":
    # Smoke test: run the full pipeline on a tiny example and pretty-print it.
    example = {"product_name": "Test", "ingredient_text": "Aqua, Glycerin"}
    print(json.dumps(predict(example), indent=2))