|
|
|
|
|
|
|
|
import os, joblib, re, json |
|
|
import numpy as np, pandas as pd |
|
|
from difflib import get_close_matches |
|
|
from scipy.sparse import hstack, csr_matrix |
|
|
|
|
|
# Directory containing this module; all artifact/CSV paths below are resolved relative to it.
_here = os.path.dirname(__file__)
|
|
|
|
|
def _load(name, allow_missing=False):
    """Deserialize a joblib artifact stored relative to this module.

    Args:
        name: path of the artifact, relative to the package directory.
        allow_missing: when True, a missing file yields None instead of
            raising.

    Returns:
        The unpickled object, or None when absent and allow_missing is True.

    Raises:
        FileNotFoundError: the file is absent and allow_missing is False.
    """
    artifact_path = os.path.join(_here, name)
    if os.path.exists(artifact_path):
        return joblib.load(artifact_path)
    if allow_missing:
        return None
    raise FileNotFoundError(f"Required artifact not found in package: {artifact_path}")
|
|
|
|
|
|
|
|
# --- Category-model artifacts, loaded once at import time. ---
# The two TF-IDF vectorizers are mandatory (import fails without them).
WORD_VECT = _load(os.path.join("model_artifacts","word_vect.pkl"), allow_missing=False)
CHAR_VECT = _load(os.path.join("model_artifacts","char_vect.pkl"), allow_missing=False)
# Encoder and classifiers are optional: when absent they load as None and
# _predict_category degrades gracefully (ensemble shrinks, or returns None).
LABEL_ENCODER = _load(os.path.join("model_artifacts","label_encoder.pkl"), allow_missing=True)
MODEL_LGB = _load(os.path.join("model_artifacts","lgb_final_holdout.pkl"), allow_missing=True)
MODEL_SVM = _load(os.path.join("model_artifacts","svm_calibrated_holdout.pkl"), allow_missing=True)
# Per-ingredient metadata lookup table and fallback text models (optional).
ING_LOOKUP = _load(os.path.join("ingredient_artifacts","ingredient_lookup.pkl"), allow_missing=True)
TRAINED_MODELS = _load(os.path.join("ingredient_artifacts","trained_models.pkl"), allow_missing=True)
|
|
|
|
|
|
|
|
# Packaged product catalogue (optional). The doubled ".csv.csv" appears to be
# the actual shipped filename — do not "fix" it without renaming the file.
PRODUCTS_CSV_NAME = "final_products_with_category710-MERGED - final_products_with_category.csv.csv"
# None when the CSV is absent or unreadable; predict() then rejects product_index input.
PRODUCTS_DF = None
prod_csv_path = os.path.join(_here, PRODUCTS_CSV_NAME)
if os.path.exists(prod_csv_path):
    try:
        PRODUCTS_DF = pd.read_csv(prod_csv_path)
    except Exception:
        # Best-effort: an unreadable CSV just disables product_index lookups.
        PRODUCTS_DF = None
|
|
|
|
|
|
|
|
# Parenthesised asides, e.g. "(water)".
paren_re = re.compile(r'\([^)]*\)')
# Dosage/measurement tokens such as "50ml", "10mg", "5g" or "2%".
unit_re = re.compile(r'\b(\d+ml|\d+mg|\d+g|\d+%)', flags=re.I)


def normalize_ingredients_for_category(text):
    """Normalize an ingredient list for the category model.

    Lower-cases the text, strips parentheticals and unit tokens, removes
    punctuation noise, and canonicalises ';' to ',' so commas remain the
    only item separator. Whitespace is collapsed to single spaces.
    """
    cleaned = str(text).lower()
    for pattern in (paren_re, unit_re):
        cleaned = pattern.sub(" ", cleaned)
    cleaned = re.sub(r"[^a-z0-9,;\-/%]+", " ", cleaned)
    cleaned = re.sub(r",\s*,", ",", cleaned.replace(";", ","))
    return " ".join(cleaned.split()).strip()
|
|
def normalize_name_for_category(text):
    """Normalize a product name for the category model.

    Lower-cases the name, keeps only alphanumerics, hyphens and spaces,
    and collapses runs of whitespace to a single space.
    """
    name = re.sub(r"[^a-z0-9\-\s]", " ", str(text).lower())
    return " ".join(name.split()).strip()
|
|
def normalize_text(s):
    """Normalize a single ingredient token for lookup-table matching.

    None normalizes to the empty string. Otherwise: lower-case, drop
    parenthesised parts, keep only alphanumerics/hyphens/spaces, and
    collapse whitespace.
    """
    if s is None:
        return ""
    cleaned = str(s).lower()
    for pattern in (r'\([^)]*\)', r'[^a-z0-9\-\s]'):
        cleaned = re.sub(pattern, ' ', cleaned)
    return re.sub(r'\s+', ' ', cleaned).strip()
|
|
def parse_ingredients(text):
    """Split a raw ingredient string into a list of trimmed item names.

    Parenthesised parts are dropped, and ';' and '/' are treated as item
    separators alongside ','. Empty, None or NaN input yields [].
    """
    if not text or pd.isna(text):
        return []
    stripped = re.sub(r'\s*\([^)]*\)', '', str(text))
    for sep in (';', '/'):
        stripped = stripped.replace(sep, ',')
    return [piece.strip() for piece in stripped.split(',') if piece.strip()]
|
|
def generate_engineered_features(normalized_product_name, normalized_ingredients_text):
    """Build the 4-element hand-crafted feature vector used next to TF-IDF.

    Elements: [comma-separated ingredient count, name word count,
    contains 'aqua', contains 'sorbitol'].

    NOTE(review): the flags are plain substring checks (so "aquamarine"
    also triggers 'aqua') — presumably this mirrors training-time feature
    extraction, so it must not be tightened here.
    """
    counts = [
        len(normalized_ingredients_text.split(",")) if normalized_ingredients_text else 0,
        len(normalized_product_name.split()) if normalized_product_name else 0,
    ]
    flags = [int(token in normalized_ingredients_text) for token in ("aqua", "sorbitol")]
    return np.array(counts + flags)
|
|
|
|
|
# Normalized-ingredient-name -> metadata-record mapping; empty dict when the
# lookup artifact was not shipped with the package.
LOOKUP_DICT = ING_LOOKUP if ING_LOOKUP is not None else {}
# Candidate keys used as the fuzzy-matching search space.
UNIQUE_ING_NORMS = list(LOOKUP_DICT.keys()) if LOOKUP_DICT else []
|
|
|
|
|
def get_best_fuzzy_match(query, choices, lookup_dict, cutoff):
    """Find the lookup entry whose key is closest to *query*.

    Uses difflib similarity with the given *cutoff* threshold.

    Returns:
        {"match_norm": key, "data": lookup_dict[key]} for the best match,
        or None when nothing is similar enough.
    """
    hits = get_close_matches(query, choices, n=1, cutoff=cutoff)
    if not hits:
        return None
    best = hits[0]
    return {"match_norm": best, "data": lookup_dict[best]}
|
|
|
|
|
def map_harm_text(v):
    """Map a free-text harm annotation to a numeric score.

    Returns:
        - float(v) for integer strings "1".."10" (raw 1-10 harm scale;
          callers divide by 10),
        - 1.0 for recognised "harmful"-style words, 0.0 for "safe"-style,
        - float(v) when the value parses as a number,
        - None for NaN or unrecognised values.
    """
    if pd.isna(v):
        return None
    s = str(v).strip().lower()
    # Raw 1-10 scale values pass through unchanged.
    if s in {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}:
        return float(s)
    if s in {"yes", "y", "true", "t", "harmful", "toxic", "unsafe", "dangerous"}:
        return 1.0
    if s in {"no", "n", "false", "f", "safe", "not harmful", "none", "na", "0"}:
        return 0.0
    try:
        return float(s)
    except (TypeError, ValueError):  # narrowed from bare except: don't swallow KeyboardInterrupt etc.
        return None
|
|
|
|
|
def _predict_category(norm_name, norm_ing):
    """Predict the product category from the normalized name and ingredients.

    Builds word- and char-level TF-IDF features plus the engineered feature
    vector, then soft-votes the SVM and LightGBM probabilities when both
    models are available, falling back to whichever one can score.

    Returns:
        (label, probs_as_list), or (None, None) when no model produced
        probabilities.
    """
    combined = norm_name + " | " + norm_ing
    Xw = WORD_VECT.transform([combined])
    Xc = CHAR_VECT.transform([combined])
    eng = csr_matrix(generate_engineered_features(norm_name, norm_ing).reshape(1, -1))
    X_comb = hstack([Xw, Xc, eng]).tocsr()

    probs_svm = None
    probs_lgb = None
    if MODEL_SVM is not None:
        try:
            probs_svm = MODEL_SVM.predict_proba(X_comb)
        except Exception:  # best-effort: a failing model simply drops out of the ensemble
            probs_svm = None
    if MODEL_LGB is not None:
        try:
            probs_lgb = MODEL_LGB.predict_proba(X_comb)
        except Exception:
            probs_lgb = None

    if probs_svm is not None and probs_lgb is not None:
        probs = (probs_svm + probs_lgb) / 2.0  # simple probability averaging
    else:
        probs = probs_svm if probs_svm is not None else probs_lgb
    if probs is None:
        return None, None

    code = int(np.argmax(probs, axis=1)[0])
    # Decode to the human-readable label when the encoder artifact exists.
    label = LABEL_ENCODER.inverse_transform([code])[0] if LABEL_ENCODER is not None else str(code)
    return label, probs.tolist()
|
|
|
|
|
def _fill_entry_from_record(entry, record):
    """Copy function/benefits/harm fields from a lookup record into *entry*.

    The lookup data uses inconsistent column names, so several candidate
    keys are probed for each field (first match wins). Previously the exact
    and fuzzy paths probed slightly different key lists and the 'risk'
    column check was case-sensitive; both are unified here.
    """
    for key in ("Function", "function", "function_name", "Function "):
        if key in record:
            entry['function'] = record.get(key)
            break
    for key in ("Benefits", "benefit", "short explanation", "Short Explanation"):
        if key in record:
            entry['benefits'] = record.get(key)
            break
    # Any column whose name mentions harm/risk is treated as the harm annotation.
    harm_cols = [c for c in record.keys()
                 if 'harm' in str(c).lower() or 'risk' in str(c).lower()]
    if harm_cols:
        mapped = map_harm_text(record.get(harm_cols[0]))
        if mapped is not None:
            entry['harm_score'] = float(mapped) / 10.0  # rescale 1-10 annotations to 0-1
            entry['harm_label'] = mapped


def _fill_entry_from_models(entry, norm):
    """Best-effort: fill *entry* using the trained per-ingredient models."""
    if TRAINED_MODELS and 'function' in TRAINED_MODELS:
        try:
            vect_f, clf_f = TRAINED_MODELS['function']
            entry['function'] = str(clf_f.predict(vect_f.transform([norm]))[0])
            try:
                entry['function_prob'] = float(max(clf_f.predict_proba(vect_f.transform([norm]))[0]))
            except Exception:  # classifier may not support predict_proba
                entry['function_prob'] = None
        except Exception:
            pass
    if TRAINED_MODELS and 'harmful' in TRAINED_MODELS:
        try:
            vect_h, clf_h = TRAINED_MODELS['harmful']
            hp = clf_h.predict_proba(vect_h.transform([norm]))[0]
            # Probability of the positive ("harmful") class when binary.
            entry['harm_pred_prob'] = float(hp[1]) if len(hp) > 1 else float(max(hp))
            entry['harm_score'] = entry['harm_pred_prob']
        except Exception:
            pass


def _analyze_rows(items_raw, fuzzy_cutoff=0.85):
    """Analyze a list of raw ingredient strings.

    For each ingredient, in order of preference:
      1. exact lookup of the normalized name in LOOKUP_DICT,
      2. fuzzy lookup against all known normalized names,
      3. prediction with the trained per-ingredient models.

    Returns:
        DataFrame with one row per ingredient; 'harm_score' defaults to 0.0
        when nothing could be determined.
    """
    items_norm = [normalize_text(x) for x in items_raw]
    rows = []
    for raw, norm in zip(items_raw, items_norm):
        entry = {
            "ingredient_raw": raw,
            "ingredient_norm": norm,
            "source": None, "function": None, "benefits": None, "explanation": None,
            "harm_label": None, "harm_score": None, "harm_pred_prob": None
        }
        if LOOKUP_DICT and norm in LOOKUP_DICT:
            entry['source'] = 'exact'
            _fill_entry_from_record(entry, LOOKUP_DICT[norm])
        else:
            fuzzy = (get_best_fuzzy_match(norm, UNIQUE_ING_NORMS, LOOKUP_DICT, fuzzy_cutoff)
                     if UNIQUE_ING_NORMS else None)
            if fuzzy:
                cand = fuzzy['match_norm']
                entry['source'] = f"fuzzy->{cand}"
                _fill_entry_from_record(entry, LOOKUP_DICT[cand])
            else:
                entry['source'] = 'predicted'
                _fill_entry_from_models(entry, norm)
        if entry['harm_score'] is None:
            entry['harm_score'] = 0.0
        rows.append(entry)
    return pd.DataFrame(rows)
|
|
|
|
|
def predict(inputs: dict) -> dict:
    """Run the full product analysis pipeline.

    Recognised *inputs* keys:
        product_index: optional int row into the packaged products CSV;
        product_name / ingredient_text: used when product_index is absent;
        fuzzy_cutoff: optional float similarity threshold (default 0.85).

    Returns:
        A JSON-serializable dict with the predicted category, per-ingredient
        rows and average harm score, or {"error": ...} on invalid input.
    """
    fuzzy_cutoff = float(inputs.get("fuzzy_cutoff", 0.85))
    prod_index = inputs.get("product_index", None)
    if prod_index is not None:
        if PRODUCTS_DF is None:
            return {"error":"Products CSV not in package; cannot use product_index"}
        try:
            prod_index = int(prod_index)
        except (TypeError, ValueError):  # narrowed from bare except
            return {"error":"product_index must be integer"}
        if prod_index < 0 or prod_index >= len(PRODUCTS_DF):
            return {"error": f"product_index out of range 0..{len(PRODUCTS_DF)-1}"}
        row = PRODUCTS_DF.iloc[prod_index]
        # Fall back to positional columns when the expected headers are absent.
        if "PRODUCT NAME" in row.index:
            product_name = row.get("PRODUCT NAME", "")
        else:
            product_name = row.iloc[0] if len(row) > 0 else ""
        if "INGREDIENTS" in row.index:
            ingredient_text = row.get("INGREDIENTS", "")
        else:
            ingredient_text = row.iloc[1] if len(row) > 1 else ""
    else:
        product_name = inputs.get("product_name","")
        ingredient_text = inputs.get("ingredient_text","")

    norm_name = normalize_name_for_category(product_name)
    norm_ing = normalize_ingredients_for_category(ingredient_text)
    predicted_category_label, category_probs = _predict_category(norm_name, norm_ing)

    items_raw = parse_ingredients(ingredient_text)
    df_rows = _analyze_rows(items_raw, fuzzy_cutoff=fuzzy_cutoff)
    if len(df_rows) > 0:
        df_rows['harm_score'] = df_rows['harm_score'].fillna(0.0).astype(float)
        avg_harm = float(df_rows['harm_score'].mean())
    else:
        # Bug fix: an empty DataFrame has no 'harm_score' column, so indexing
        # it unconditionally raised KeyError for products with no ingredients.
        avg_harm = 0.0
    rows_json = df_rows.to_dict(orient='records')

    return {
        "product_index": prod_index,
        "product_name": product_name,
        "predicted_category": predicted_category_label,
        "category_probs": category_probs,
        "avg_harm": avg_harm,
        "rows": rows_json,
        "product_ingredient_count": len(rows_json)
    }
|
|
|
|
|
if __name__ == "__main__":
    # Smoke test: prefer the first packaged product, else a synthetic example.
    if PRODUCTS_DF is not None and len(PRODUCTS_DF) > 0:
        example = {"product_index": 0}
    else:
        example = {"product_name": "Test", "ingredient_text": "Aqua, Glycerin, Alcohol"}
    # json is already imported at the top of the module; the previous local
    # re-import was redundant and has been removed.
    print(json.dumps(predict(example), indent=2))
|
|
|