# Repository page header captured with the file (not part of the program):
#   ingredients-model / inference.py
#   ashtii's picture
#   Upload model artifacts + inference
#   4eaf81d
# inference.py - HF-compatible inference that mirrors analyze_product_by_index output
import os, joblib, re, json
import numpy as np, pandas as pd
from difflib import get_close_matches
from scipy.sparse import hstack, csr_matrix
_here = os.path.dirname(__file__)
def _load(name, allow_missing=False):
p = os.path.join(_here, name)
if not os.path.exists(p):
if allow_missing:
return None
raise FileNotFoundError(f"Required artifact not found in package: {p}")
return joblib.load(p)
# Load the serialized artifacts shipped with this package. The two text
# vectorizers are mandatory (a missing file raises at import time); every
# other artifact is optional and is left as None when its file is absent.
_MODEL_DIR = "model_artifacts"
_INGREDIENT_DIR = "ingredient_artifacts"
WORD_VECT = _load(os.path.join(_MODEL_DIR, "word_vect.pkl"))
CHAR_VECT = _load(os.path.join(_MODEL_DIR, "char_vect.pkl"))
LABEL_ENCODER = _load(os.path.join(_MODEL_DIR, "label_encoder.pkl"), allow_missing=True)
MODEL_LGB = _load(os.path.join(_MODEL_DIR, "lgb_final_holdout.pkl"), allow_missing=True)
MODEL_SVM = _load(os.path.join(_MODEL_DIR, "svm_calibrated_holdout.pkl"), allow_missing=True)
ING_LOOKUP = _load(os.path.join(_INGREDIENT_DIR, "ingredient_lookup.pkl"), allow_missing=True)
TRAINED_MODELS = _load(os.path.join(_INGREDIENT_DIR, "trained_models.pkl"), allow_missing=True)
# Products CSV (optional). When present it lets predict() resolve a product
# by row index; when absent or unreadable PRODUCTS_DF stays None.
PRODUCTS_CSV_NAME = "final_products_with_category710-MERGED - final_products_with_category.csv.csv"
PRODUCTS_DF = None
prod_csv_path = os.path.join(_here, PRODUCTS_CSV_NAME)
if os.path.exists(prod_csv_path):
    try:
        PRODUCTS_DF = pd.read_csv(prod_csv_path)
    except Exception as exc:
        # Loading stays best-effort, but report why the CSV was skipped so a
        # later "Products CSV not in package" error is not a mystery.
        import warnings

        warnings.warn(
            f"Could not read products CSV {prod_csv_path!r}: {exc}",
            RuntimeWarning,
        )
        PRODUCTS_DF = None
# Normalizers and helpers (same as notebook)
# paren_re: a parenthesized group without nested ")".
# unit_re: digits immediately followed by ml, mg, g or % (case-insensitive).
paren_re = re.compile(r'\([^)]*\)')
unit_re = re.compile(r'\b(\d+ml|\d+mg|\d+g|\d+%)', flags=re.I)


def normalize_ingredients_for_category(text):
    """Normalize an ingredient list string for the category classifier.

    The input is stringified and lowercased, parenthesized groups and
    quantity tokens are blanked out, characters outside ``a-z0-9,;-/%`` are
    collapsed to a space, ``;`` becomes ``,``, doubled commas are merged and
    whitespace runs are squeezed to single spaces.

    Args:
        text: Raw ingredient text; any object is accepted and passed
            through ``str()``.

    Returns:
        The normalized string.
    """
    cleaned = unit_re.sub(" ", paren_re.sub(" ", str(text).lower()))
    cleaned = re.sub(r"[^a-z0-9,;\-/%]+", " ", cleaned)
    cleaned = re.sub(r",\s*,", ",", cleaned.replace(";", ","))
    # Joining the whitespace-split tokens also trims both ends.
    return " ".join(cleaned.split())
def normalize_name_for_category(text):
    """Normalize a product name for the category classifier.

    The input is stringified and lowercased, every character other than
    ``a-z``, ``0-9``, ``-`` and whitespace is replaced by a space, and
    whitespace runs are squeezed to single spaces.

    Args:
        text: Raw product name; any object is accepted and passed through
            ``str()``.

    Returns:
        The normalized name.
    """
    lowered = str(text).lower()
    tokens = re.sub(r"[^a-z0-9\-\s]", " ", lowered).split()
    return " ".join(tokens)
def normalize_text(s):
    """Normalize a single ingredient name for lookup and matching.

    ``None`` maps to an empty string. Otherwise the value is stringified and
    lowercased, parenthesized groups are dropped, characters other than
    ``a-z``, ``0-9``, ``-`` and whitespace become spaces, whitespace runs are
    collapsed and the result is trimmed.

    Args:
        s: Raw ingredient text, or ``None``.

    Returns:
        The normalized string.
    """
    if s is None:
        return ""
    result = str(s).lower()
    # Substitutions are applied in this exact order.
    substitutions = (
        (r'\([^)]*\)', ' '),
        (r'[^a-z0-9\-\s]', ' '),
        (r'\s+', ' '),
    )
    for pattern, replacement in substitutions:
        result = re.sub(pattern, replacement, result)
    return result.strip()
def parse_ingredients(text):
    """Split a raw ingredient string into individual ingredient names.

    Parenthesized groups (with any whitespace before them) are removed, then
    the text is split on ``,``, ``;`` and ``/``. Each piece is trimmed and
    empty pieces are discarded. Original casing is kept.

    Args:
        text: Raw ingredient text; falsy or NA values produce an empty list.

    Returns:
        A list of ingredient strings in their original order.
    """
    if not text or pd.isna(text):
        return []
    without_notes = re.sub(r'\s*\([^)]*\)', '', str(text))
    pieces = without_notes.replace(';', ',').replace('/', ',').split(',')
    return list(filter(None, map(str.strip, pieces)))
def generate_engineered_features(normalized_product_name, normalized_ingredients_text):
    """Build the hand-crafted numeric features for one product.

    The features, in order, are: number of comma-separated ingredient
    segments, number of whitespace-separated words in the name, whether
    "aqua" occurs in the ingredient text, and whether "sorbitol" occurs in it.

    Args:
        normalized_product_name: Normalized product name; ``None`` or empty
            is treated as no name.
        normalized_ingredients_text: Normalized ingredient text; ``None`` or
            empty is treated as no ingredients.

    Returns:
        A 1-D ``numpy`` array ``[ingredient_count, name_len, has_aqua,
        has_sorbitol]``.
    """
    # Coerce missing values once so the substring checks below cannot fail
    # with TypeError on None (the count features already tolerated it).
    name = normalized_product_name or ""
    ingredients = normalized_ingredients_text or ""

    ingredient_count = len(ingredients.split(",")) if ingredients else 0
    name_len = len(name.split())
    has_aqua = int("aqua" in ingredients)
    has_sorbitol = int("sorbitol" in ingredients)
    # NOTE(review): order presumably must match the feature layout used when
    # the classifiers were trained - confirm before reordering.
    return np.array([ingredient_count, name_len, has_aqua, has_sorbitol])
# Ingredient lookup table keyed by normalized ingredient name; falls back to
# an empty dict when the optional lookup artifact was not loaded.
if ING_LOOKUP is None:
    LOOKUP_DICT = {}
else:
    LOOKUP_DICT = ING_LOOKUP
# Candidate keys for fuzzy matching (empty when there is no lookup data).
UNIQUE_ING_NORMS = []
if LOOKUP_DICT:
    UNIQUE_ING_NORMS = list(LOOKUP_DICT.keys())
def get_best_fuzzy_match(query, choices, lookup_dict, cutoff):
    """Find the closest entry to ``query`` among ``choices``.

    Args:
        query: String to match.
        choices: Candidate strings passed to ``difflib.get_close_matches``.
        lookup_dict: Mapping from a candidate string to its record.
        cutoff: Minimum similarity passed to ``get_close_matches``.

    Returns:
        ``{"match_norm": <matched choice>, "data": lookup_dict[<matched
        choice>]}`` for the best match, or ``None`` when nothing reaches the
        cutoff.
    """
    best = get_close_matches(query, choices, n=1, cutoff=cutoff)
    if not best:
        return None
    candidate = best[0]
    return {"match_norm": candidate, "data": lookup_dict[candidate]}
def map_harm_text(v):
    """Convert a free-form harm indicator into a number.

    The value is stringified, trimmed and lowercased. Affirmative words map
    to ``1.0``, negative words (and ``"0"``) map to ``0.0``, and any other
    text is parsed as a float.

    Args:
        v: Raw cell value (text, number, ``None`` or NaN).

    Returns:
        A finite float, or ``None`` when the value is missing, is not
        recognised, or parses to NaN/infinity.
    """
    if pd.isna(v):
        return None
    s = str(v).strip().lower()
    if s in ("yes", "y", "true", "t", "harmful", "toxic", "unsafe", "dangerous"):
        return 1.0
    if s in ("no", "n", "false", "f", "safe", "not harmful", "none", "na", "0"):
        return 0.0
    try:
        value = float(s)
    except ValueError:
        return None
    # float() accepts "nan"/"inf"; those are not usable scores, so treat
    # them like any other unrecognised text.
    return value if np.isfinite(value) else None
def _safe_predict_proba(model, features):
    """Return ``model.predict_proba(features)``, or ``None`` if unavailable."""
    if model is None:
        return None
    try:
        return model.predict_proba(features)
    except Exception:
        # Best-effort: a failing model is skipped so the other can still be
        # used. Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        return None


def _predict_category(norm_name, norm_ing):
    """Predict the product category from normalized name and ingredients.

    The text ``"<name> | <ingredients>"`` is vectorized with ``WORD_VECT``
    and ``CHAR_VECT`` and stacked with the engineered features. The
    probabilities of ``MODEL_SVM`` and ``MODEL_LGB`` are averaged when both
    are available; otherwise whichever one produced a result is used.

    Args:
        norm_name: Normalized product name.
        norm_ing: Normalized ingredient text.

    Returns:
        ``(label, probs)`` where ``label`` is decoded with ``LABEL_ENCODER``
        (or the class index as a string when no encoder is loaded) and
        ``probs`` is the probability matrix as nested lists. Returns
        ``(None, None)`` when neither model yields probabilities.
    """
    combined = norm_name + " | " + norm_ing
    Xw = WORD_VECT.transform([combined])
    Xc = CHAR_VECT.transform([combined])
    engineered = generate_engineered_features(norm_name, norm_ing).reshape(1, -1)
    X_comb = hstack([Xw, Xc, csr_matrix(engineered)]).tocsr()

    probs_svm = _safe_predict_proba(MODEL_SVM, X_comb)
    probs_lgb = _safe_predict_proba(MODEL_LGB, X_comb)

    if probs_svm is not None and probs_lgb is not None:
        probs = (probs_svm + probs_lgb) / 2.0
    else:
        probs = probs_svm if probs_svm is not None else probs_lgb
    if probs is None:
        return None, None

    code = int(np.argmax(probs, axis=1)[0])
    if LABEL_ENCODER is not None:
        label = LABEL_ENCODER.inverse_transform([code])[0]
    else:
        label = str(code)
    return label, probs.tolist()
# Record keys tried, in order, when copying data out of a lookup record.
_LOOKUP_FUNCTION_KEYS = ("Function", "function", "function_name", "Function ")
_LOOKUP_BENEFIT_KEYS = ("Benefits", "benefit", "short explanation", "Short Explanation")


def _blank_ingredient_entry(raw, norm):
    """Return a result row for one ingredient with all findings unset."""
    return {
        "ingredient_raw": raw,
        "ingredient_norm": norm,
        "source": None, "function": None, "benefits": None, "explanation": None,
        "harm_label": None, "harm_score": None, "harm_pred_prob": None,
    }


def _fill_entry_from_lookup(entry, record):
    """Copy function, benefits and harm data from a lookup record into entry."""
    for key in _LOOKUP_FUNCTION_KEYS:
        if key in record:
            entry['function'] = record.get(key)
            break
    for key in _LOOKUP_BENEFIT_KEYS:
        if key in record:
            entry['benefits'] = record.get(key)
            break
    # NOTE(review): 'harm' is matched case-insensitively but 'risk' is not;
    # kept as-is because changing it could alter which column is selected.
    harm_candidates = [c for c in record.keys() if 'harm' in str(c).lower() or 'risk' in str(c)]
    if harm_candidates:
        mapped = map_harm_text(record.get(harm_candidates[0]))
        if mapped is not None:
            entry['harm_score'] = float(mapped) / 10.0
            entry['harm_label'] = mapped


def _fill_entry_from_models(entry, norm):
    """Fill function and harm fields from TRAINED_MODELS, best-effort."""
    if TRAINED_MODELS and 'function' in TRAINED_MODELS:
        try:
            vect_f, clf_f = TRAINED_MODELS['function']
            features = vect_f.transform([norm])
            entry['function'] = str(clf_f.predict(features)[0])
            try:
                entry['function_prob'] = float(max(clf_f.predict_proba(features)[0]))
            except Exception:
                entry['function_prob'] = None
        except Exception:
            # Best-effort: leave the function fields unset on failure.
            pass
    if TRAINED_MODELS and 'harmful' in TRAINED_MODELS:
        try:
            vect_h, clf_h = TRAINED_MODELS['harmful']
            hp = clf_h.predict_proba(vect_h.transform([norm]))[0]
            entry['harm_pred_prob'] = float(hp[1]) if len(hp) > 1 else float(max(hp))
            entry['harm_score'] = entry['harm_pred_prob']
        except Exception:
            # Best-effort: harm_score falls back to 0.0 in the caller.
            pass


def _analyze_rows(items_raw, fuzzy_cutoff=0.85):
    """Build a per-ingredient analysis table.

    Each ingredient is normalized and resolved in this order: exact match in
    ``LOOKUP_DICT``, fuzzy match against ``UNIQUE_ING_NORMS``, then the
    optional classifiers in ``TRAINED_MODELS``. The ``source`` column records
    which path was taken (``"exact"``, ``"fuzzy-><key>"`` or
    ``"predicted"``). A ``harm_score`` that is still unset becomes ``0.0``.

    Args:
        items_raw: Iterable of raw ingredient strings.
        fuzzy_cutoff: Minimum similarity for a fuzzy match.

    Returns:
        A ``pandas.DataFrame`` with one row per ingredient. For empty input
        the frame has zero rows but still carries the standard columns.
    """
    rows = []
    for raw in items_raw:
        norm = normalize_text(raw)
        entry = _blank_ingredient_entry(raw, norm)
        if LOOKUP_DICT and norm in LOOKUP_DICT:
            entry['source'] = 'exact'
            _fill_entry_from_lookup(entry, LOOKUP_DICT[norm])
        else:
            fuzzy = (
                get_best_fuzzy_match(norm, UNIQUE_ING_NORMS, LOOKUP_DICT, fuzzy_cutoff)
                if UNIQUE_ING_NORMS else None
            )
            if fuzzy:
                cand = fuzzy['match_norm']
                entry['source'] = f"fuzzy->{cand}"
                _fill_entry_from_lookup(entry, LOOKUP_DICT[cand])
            else:
                entry['source'] = 'predicted'
                _fill_entry_from_models(entry, norm)
        if entry['harm_score'] is None:
            entry['harm_score'] = 0.0
        rows.append(entry)
    if not rows:
        # Keep the standard columns so callers can index e.g. 'harm_score'
        # even when there were no ingredients.
        return pd.DataFrame(columns=list(_blank_ingredient_entry(None, None)))
    return pd.DataFrame(rows)
def _text_or_empty(value):
    """Return ``value`` as a string, mapping None/NaN to an empty string."""
    if value is None or (isinstance(value, float) and value != value):
        return ""
    return str(value)


def _nan_to_none(value):
    """Replace a float NaN with None so the result is valid JSON."""
    if isinstance(value, float) and value != value:
        return None
    return value


def predict(inputs: dict) -> dict:
    """Analyze one product and return a JSON-serializable report.

    Args:
        inputs: Request dictionary. Recognised keys:
            ``product_index`` - row index into ``PRODUCTS_DF`` (takes
            precedence over the text keys when not ``None``);
            ``product_name`` and ``ingredient_text`` - free-text input;
            ``fuzzy_cutoff`` - similarity threshold in ``[0, 1]``
            (default ``0.85``).

    Returns:
        On success a dict with ``product_index``, ``product_name``,
        ``predicted_category``, ``category_probs``, ``avg_harm``, ``rows``
        (one dict per ingredient) and ``product_ingredient_count``. On
        invalid input a dict with a single ``error`` message.
    """
    try:
        fuzzy_cutoff = float(inputs.get("fuzzy_cutoff", 0.85))
    except (TypeError, ValueError):
        return {"error": "fuzzy_cutoff must be a number"}
    # difflib.get_close_matches rejects cutoffs outside [0, 1]; report that
    # the same way as other bad inputs instead of raising mid-analysis.
    if not 0.0 <= fuzzy_cutoff <= 1.0:
        return {"error": "fuzzy_cutoff must be between 0 and 1"}

    prod_index = inputs.get("product_index", None)
    if prod_index is not None:
        if PRODUCTS_DF is None:
            return {"error":"Products CSV not in package; cannot use product_index"}
        try:
            prod_index = int(prod_index)
        except (TypeError, ValueError, OverflowError):
            return {"error":"product_index must be integer"}
        if prod_index < 0 or prod_index >= len(PRODUCTS_DF):
            return {"error": f"product_index out of range 0..{len(PRODUCTS_DF)-1}"}
        row = PRODUCTS_DF.iloc[prod_index]
        # Prefer the named columns; otherwise fall back to column position.
        if "PRODUCT NAME" in row.index:
            product_name = row.get("PRODUCT NAME", "")
        else:
            product_name = row.iloc[0] if len(row) > 0 else ""
        if "INGREDIENTS" in row.index:
            ingredient_text = row.get("INGREDIENTS", "")
        else:
            ingredient_text = row.iloc[1] if len(row) > 1 else ""
    else:
        product_name = inputs.get("product_name", "")
        ingredient_text = inputs.get("ingredient_text", "")

    # Missing values (None, or NaN from an empty CSV cell) would otherwise be
    # stringified to "none"/"nan" and fed to the classifier as real text.
    product_name = _text_or_empty(product_name)
    ingredient_text = _text_or_empty(ingredient_text)

    norm_name = normalize_name_for_category(product_name)
    norm_ing = normalize_ingredients_for_category(ingredient_text)
    predicted_category_label, category_probs = _predict_category(norm_name, norm_ing)

    items_raw = parse_ingredients(ingredient_text)
    df_rows = _analyze_rows(items_raw, fuzzy_cutoff=fuzzy_cutoff)
    if len(df_rows) > 0:
        df_rows['harm_score'] = df_rows['harm_score'].fillna(0.0).astype(float)
        avg_harm = float(df_rows['harm_score'].mean())
        # Columns mixing floats and None come back from pandas as NaN; map
        # them back to None so the payload serializes as strict JSON.
        rows_json = [
            {key: _nan_to_none(value) for key, value in record.items()}
            for record in df_rows.to_dict(orient='records')
        ]
    else:
        # No ingredients: the frame may have no 'harm_score' column at all.
        avg_harm = 0.0
        rows_json = []

    return {
        "product_index": prod_index,
        "product_name": product_name,
        "predicted_category": predicted_category_label,
        "category_probs": category_probs,
        "avg_harm": avg_harm,
        "rows": rows_json,
        "product_ingredient_count": len(rows_json),
    }
# Smoke test: analyze the first CSV product when the products table is
# available, otherwise a small hard-coded example, and print the report.
if __name__ == "__main__":
    import json
    if PRODUCTS_DF is not None and len(PRODUCTS_DF) > 0:
        example = {"product_index": 0}
    else:
        example = {"product_name":"Test","ingredient_text":"Aqua, Glycerin, Alcohol"}
    report = predict(example)
    print(json.dumps(report, indent=2))