# inference.py - HF-compatible inference that mirrors analyze_product_by_index output
"""Package-local inference entry point.

Loads serialized vectorizers/models from ``model_artifacts`` and
``ingredient_artifacts`` next to this file, then exposes ``predict(inputs)``
which classifies a product's category and scores each ingredient for harm,
mirroring the notebook's ``analyze_product_by_index`` output shape.
"""
import os
import re
import json

import joblib
import numpy as np
import pandas as pd
from difflib import get_close_matches
from scipy.sparse import hstack, csr_matrix

_here = os.path.dirname(__file__)


def _load(name, allow_missing=False):
    """Load a joblib artifact located relative to this file.

    Returns ``None`` when the file is absent and ``allow_missing`` is True;
    otherwise raises ``FileNotFoundError`` for required artifacts.
    """
    p = os.path.join(_here, name)
    if not os.path.exists(p):
        if allow_missing:
            return None
        raise FileNotFoundError(f"Required artifact not found in package: {p}")
    return joblib.load(p)


# load artifacts (best-effort): vectorizers are mandatory, models/lookups optional
WORD_VECT = _load(os.path.join("model_artifacts", "word_vect.pkl"), allow_missing=False)
CHAR_VECT = _load(os.path.join("model_artifacts", "char_vect.pkl"), allow_missing=False)
LABEL_ENCODER = _load(os.path.join("model_artifacts", "label_encoder.pkl"), allow_missing=True)
MODEL_LGB = _load(os.path.join("model_artifacts", "lgb_final_holdout.pkl"), allow_missing=True)
MODEL_SVM = _load(os.path.join("model_artifacts", "svm_calibrated_holdout.pkl"), allow_missing=True)
ING_LOOKUP = _load(os.path.join("ingredient_artifacts", "ingredient_lookup.pkl"), allow_missing=True)
TRAINED_MODELS = _load(os.path.join("ingredient_artifacts", "trained_models.pkl"), allow_missing=True)

# products CSV (optional) — enables lookup by product_index in predict()
PRODUCTS_CSV_NAME = "final_products_with_category710-MERGED - final_products_with_category.csv.csv"
PRODUCTS_DF = None
prod_csv_path = os.path.join(_here, PRODUCTS_CSV_NAME)
if os.path.exists(prod_csv_path):
    try:
        PRODUCTS_DF = pd.read_csv(prod_csv_path)
    except Exception:
        # best-effort: a malformed CSV simply disables product_index lookups
        PRODUCTS_DF = None

# Normalizers and helpers (same as notebook)
paren_re = re.compile(r'\([^)]*\)')
unit_re = re.compile(r'\b(\d+ml|\d+mg|\d+g|\d+%)', flags=re.I)


def normalize_ingredients_for_category(text):
    """Lowercase and strip parentheticals/units from an ingredient list,
    keeping commas as separators (semicolons are folded into commas)."""
    t = str(text).lower()
    t = paren_re.sub(" ", t)
    t = unit_re.sub(" ", t)
    t = re.sub(r"[^a-z0-9,;\-/%]+", " ", t)
    t = t.replace(";", ",")
    t = re.sub(r",\s*,", ",", t)
    t = " ".join(t.split())
    return t.strip()


def normalize_name_for_category(text):
    """Lowercase a product name and keep only alphanumerics, hyphens, spaces."""
    t = str(text).lower()
    t = re.sub(r"[^a-z0-9\-\s]", " ", t)
    t = " ".join(t.split())
    return t.strip()


def normalize_text(s):
    """Generic normalizer used for single ingredient strings (lookup keys)."""
    if s is None:
        return ""
    s = str(s).lower()
    s = re.sub(r'\([^)]*\)', ' ', s)
    s = re.sub(r'[^a-z0-9\-\s]', ' ', s)
    s = re.sub(r'\s+', ' ', s).strip()
    return s


def parse_ingredients(text):
    """Split raw ingredient text into a list of individual ingredient strings.

    Parentheticals are dropped; ';' and '/' are treated as ',' separators.
    """
    if not text or pd.isna(text):
        return []
    t = str(text)
    t = re.sub(r'\s*\([^)]*\)', '', t)
    t = t.replace(';', ',').replace('/', ',')
    return [i.strip() for i in t.split(',') if i.strip()]


def generate_engineered_features(normalized_product_name, normalized_ingredients_text):
    """Return the 4 hand-crafted features appended to the TF-IDF matrix:
    [ingredient_count, name_token_count, has_aqua, has_sorbitol]."""
    ingredient_count = len(normalized_ingredients_text.split(",")) if normalized_ingredients_text else 0
    name_len = len(normalized_product_name.split()) if normalized_product_name else 0
    has_aqua = int("aqua" in normalized_ingredients_text)
    has_sorbitol = int("sorbitol" in normalized_ingredients_text)
    return np.array([ingredient_count, name_len, has_aqua, has_sorbitol])


LOOKUP_DICT = ING_LOOKUP if ING_LOOKUP is not None else {}
UNIQUE_ING_NORMS = list(LOOKUP_DICT.keys()) if LOOKUP_DICT else []


def get_best_fuzzy_match(query, choices, lookup_dict, cutoff):
    """Return the closest lookup entry for *query* above *cutoff*, or None."""
    matches = get_close_matches(query, choices, n=1, cutoff=cutoff)
    if matches:
        matched_norm = matches[0]
        return {"match_norm": matched_norm, "data": lookup_dict[matched_norm]}
    return None


def map_harm_text(v):
    """Map a free-form harm cell to a numeric score.

    Digits 1-10 map to their float value; yes/no-style words map to 1.0/0.0;
    anything else is parsed as a float, or None if unparseable.
    """
    if pd.isna(v):
        return None
    s = str(v).strip().lower()
    if s in ("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"):
        return float(s)
    if s in ("yes", "y", "true", "t", "harmful", "toxic", "unsafe", "dangerous"):
        return 1.0
    if s in ("no", "n", "false", "f", "safe", "not harmful", "none", "na", "0"):
        return 0.0
    try:
        return float(s)
    except (TypeError, ValueError):  # narrowed from bare except
        return None


def _predict_category(norm_name, norm_ing):
    """Predict the product category from normalized name + ingredients.

    Averages SVM and LightGBM probabilities when both models loaded, falls
    back to whichever is available, and returns (label, probs) or (None, None).
    """
    combined = norm_name + " | " + norm_ing
    Xw = WORD_VECT.transform([combined])
    Xc = CHAR_VECT.transform([combined])
    X_comb = hstack([
        Xw, Xc,
        csr_matrix(generate_engineered_features(norm_name, norm_ing).reshape(1, -1)),
    ]).tocsr()
    probs_svm = None
    probs_lgb = None
    if MODEL_SVM is not None:
        try:
            probs_svm = MODEL_SVM.predict_proba(X_comb)
        except Exception:  # best-effort: a failing model is treated as absent
            probs_svm = None
    if MODEL_LGB is not None:
        try:
            probs_lgb = MODEL_LGB.predict_proba(X_comb)
        except Exception:
            probs_lgb = None
    if probs_svm is not None and probs_lgb is not None:
        probs = (probs_svm + probs_lgb) / 2.0  # simple soft-vote ensemble
    else:
        probs = probs_svm if probs_svm is not None else probs_lgb
    if probs is None:
        return None, None
    code = int(np.argmax(probs, axis=1)[0])
    label = LABEL_ENCODER.inverse_transform([code])[0] if LABEL_ENCODER is not None else str(code)
    return label, probs.tolist()


# Lookup-row keys probed for function/benefit text, in priority order.
# (Superset of both key lists the original exact/fuzzy branches used,
# including the trailing-space and capitalized variants seen in the data.)
_FUNCTION_KEYS = ("Function", "function", "function_name", "Function ")
_BENEFIT_KEYS = ("Benefits", "benefit", "short explanation", "Short Explanation")


def _fill_entry_from_lookup_row(entry, r):
    """Populate function/benefits/harm fields of *entry* from lookup row *r*."""
    for k in _FUNCTION_KEYS:
        if k in r:
            entry['function'] = r.get(k)
            break
    for k in _BENEFIT_KEYS:
        if k in r:
            entry['benefits'] = r.get(k)
            break
    # BUGFIX: 'risk' check was case-sensitive while 'harm' was not
    harm_candidates = [c for c in r.keys()
                       if 'harm' in str(c).lower() or 'risk' in str(c).lower()]
    if harm_candidates:
        mapped = map_harm_text(r.get(harm_candidates[0]))
        if mapped is not None:
            entry['harm_score'] = float(mapped) / 10.0
            entry['harm_label'] = mapped


def _analyze_rows(items_raw, fuzzy_cutoff=0.85):
    """Build a per-ingredient DataFrame with function/benefit/harm columns.

    Resolution order per ingredient: exact lookup -> fuzzy lookup (difflib,
    cutoff=fuzzy_cutoff) -> model prediction from TRAINED_MODELS. Missing
    harm scores default to 0.0.
    """
    items_norm = [normalize_text(x) for x in items_raw]
    rows = []
    for raw, norm in zip(items_raw, items_norm):
        entry = {
            "ingredient_raw": raw, "ingredient_norm": norm,
            "source": None, "function": None, "benefits": None, "explanation": None,
            "harm_label": None, "harm_score": None, "harm_pred_prob": None,
        }
        if LOOKUP_DICT and norm in LOOKUP_DICT:
            entry['source'] = 'exact'
            _fill_entry_from_lookup_row(entry, LOOKUP_DICT[norm])
        else:
            fuzzy = (get_best_fuzzy_match(norm, UNIQUE_ING_NORMS, LOOKUP_DICT, fuzzy_cutoff)
                     if UNIQUE_ING_NORMS else None)
            if fuzzy:
                cand = fuzzy['match_norm']
                entry['source'] = f"fuzzy->{cand}"
                _fill_entry_from_lookup_row(entry, LOOKUP_DICT[cand])
            else:
                entry['source'] = 'predicted'
                if TRAINED_MODELS and 'function' in TRAINED_MODELS:
                    try:
                        vect_f, clf_f = TRAINED_MODELS['function']
                        code = clf_f.predict(vect_f.transform([norm]))[0]
                        entry['function'] = str(code)
                        try:
                            entry['function_prob'] = float(
                                max(clf_f.predict_proba(vect_f.transform([norm]))[0]))
                        except Exception:  # model may lack predict_proba
                            entry['function_prob'] = None
                    except Exception:
                        pass
                if TRAINED_MODELS and 'harmful' in TRAINED_MODELS:
                    try:
                        vect_h, clf_h = TRAINED_MODELS['harmful']
                        hp = clf_h.predict_proba(vect_h.transform([norm]))[0]
                        # binary classifier: P(harmful) is column 1 when present
                        entry['harm_pred_prob'] = float(hp[1]) if len(hp) > 1 else float(max(hp))
                        entry['harm_score'] = entry['harm_pred_prob']
                    except Exception:
                        pass
        if entry['harm_score'] is None:
            entry['harm_score'] = 0.0
        rows.append(entry)
    return pd.DataFrame(rows)


def predict(inputs: dict) -> dict:
    """Main inference entry point.

    inputs keys:
      - "product_index": int row into the packaged products CSV, OR
      - "product_name" + "ingredient_text": direct strings
      - "fuzzy_cutoff": optional float (default 0.85) for ingredient matching

    Returns a dict with predicted category, per-ingredient rows, and average
    harm, or {"error": ...} for bad product_index usage.
    """
    fuzzy_cutoff = float(inputs.get("fuzzy_cutoff", 0.85))
    prod_index = inputs.get("product_index", None)
    if prod_index is not None:
        if PRODUCTS_DF is None:
            return {"error": "Products CSV not in package; cannot use product_index"}
        try:
            prod_index = int(prod_index)
        except (TypeError, ValueError):
            return {"error": "product_index must be integer"}
        if prod_index < 0 or prod_index >= len(PRODUCTS_DF):
            return {"error": f"product_index out of range 0..{len(PRODUCTS_DF)-1}"}
        row = PRODUCTS_DF.iloc[prod_index]
        # fall back to positional columns when the expected headers are absent
        product_name = (row.get("PRODUCT NAME", "") if "PRODUCT NAME" in row.index
                        else row.iloc[0] if len(row) > 0 else "")
        ingredient_text = (row.get("INGREDIENTS", "") if "INGREDIENTS" in row.index
                           else (row.iloc[1] if len(row) > 1 else ""))
    else:
        product_name = inputs.get("product_name", "")
        ingredient_text = inputs.get("ingredient_text", "")
    norm_name = normalize_name_for_category(product_name)
    norm_ing = normalize_ingredients_for_category(ingredient_text)
    predicted_category_label, category_probs = _predict_category(norm_name, norm_ing)
    items_raw = parse_ingredients(ingredient_text)
    df_rows = _analyze_rows(items_raw, fuzzy_cutoff=fuzzy_cutoff)
    df_rows['harm_score'] = df_rows['harm_score'].fillna(0.0).astype(float)
    avg_harm = float(df_rows['harm_score'].mean()) if len(df_rows) > 0 else 0.0
    rows_json = df_rows.to_dict(orient='records')
    return {
        "product_index": prod_index,
        "product_name": product_name,
        "predicted_category": predicted_category_label,
        "category_probs": category_probs,
        "avg_harm": avg_harm,
        "rows": rows_json,
        "product_ingredient_count": len(rows_json),
    }


if __name__ == "__main__":
    # smoke test: use the packaged CSV when present, else a hard-coded sample
    example = ({"product_index": 0} if PRODUCTS_DF is not None and len(PRODUCTS_DF) > 0
               else {"product_name": "Test", "ingredient_text": "Aqua, Glycerin, Alcohol"})
    print(json.dumps(predict(example), indent=2))