# meltmind-ai / scripts / audit_knowledge.py
# Haricharan Vallem
# Codex: prepare MeltMind for Hugging Face Spaces
# 58fddaa
# Raw
# History Blame Contribute Delete
# 13.7 kB
import json
import re
from collections import Counter
from pathlib import Path
# ROOT is the parent of the directory that contains this script
# (parents[0] is the script's own directory, parents[1] is one level up).
ROOT = Path(__file__).resolve().parents[1]
# All audited JSON lives under <ROOT>/data; the knowledge files other than the
# public menu are grouped in the "meltmind" subdirectory.
DATA = ROOT.joinpath("data")
MELTMIND = DATA.joinpath("meltmind")
def load(path: Path):
    """Parse the JSON document stored at *path* and return the decoded value.

    The file is decoded as UTF-8 explicitly instead of relying on the
    platform's locale encoding: the audited data is expected to contain
    non-ASCII text (menu prices are validated against the "₹" sign), so a
    non-UTF-8 default encoding could fail or silently mis-decode the file.

    Args:
        path: Location of the JSON file to read.

    Returns:
        The decoded JSON value (whatever ``json.loads`` produces).

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the file is not valid UTF-8.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    return json.loads(path.read_text(encoding="utf-8"))
def slug(value: str) -> str:
    """Turn *value* into a lowercase, underscore-separated identifier.

    The text is lowercased, every run of characters outside ``a-z0-9`` is
    collapsed into a single underscore, and underscores at either end are
    stripped.
    """
    lowered = value.lower()
    collapsed = re.sub(r"[^a-z0-9]+", "_", lowered)
    return collapsed.strip("_")
# Load every knowledge file once, up front. The public menu sits directly in
# data/; all other files live in data/meltmind/.
menu = load(DATA / "menu.json")
products = load(MELTMIND / "product_knowledge.json")["products"]
# ingredients.json carries both the ingredient list and the allergen catalogue.
ingredients_data = load(MELTMIND / "ingredients.json")
ingredients = ingredients_data["ingredients"]
allergens = ingredients_data["allergen_catalogue"]
add_ons = load(MELTMIND / "add_ons.json")["add_ons"]
# The remaining documents are kept whole; later checks index into them by key.
language = load(MELTMIND / "language_dictionary.json")
rules = load(MELTMIND / "recommendation_rules.json")
faqs = load(MELTMIND / "faq.json")["faqs"]
examples = load(MELTMIND / "customer_examples.json")
evaluation = load(MELTMIND / "evaluation_cases.json")
operations = load(MELTMIND / "operations_and_policies.json")
business = load(MELTMIND / "business_profile.json")
# Lookup sets for the cross-reference checks that follow.
menu_names = {entry["name"] for entry in menu}
# Menu entries are identified by the slug of their display name; these slugs
# are later compared against the product IDs.
menu_ids = {slug(entry["name"]) for entry in menu}
product_ids = {profile["product_id"] for profile in products}
product_names = {profile["product_name"] for profile in products}
ingredient_ids = {record["ingredient_id"] for record in ingredients}
allergen_ids = {record["allergen_id"] for record in allergens}
add_on_ids = {record["add_on_id"] for record in add_ons}
# Full ingredient records keyed by ID, for checks that inspect ingredient fields.
ingredients_by_id = {record["ingredient_id"]: record for record in ingredients}
# Findings accumulate here as (level, location, message) tuples, in the order
# they are reported.
issues = []


def issue(level: str, location: str, message: str):
    """Record one audit finding in the module-level ``issues`` list.

    Args:
        level: Severity label of the finding (this file uses "error" and
            "warning").
        location: Where the problem was found, e.g. a file name or an
            ``kind:id`` tag.
        message: Human-readable description of the problem.
    """
    finding = (level, location, message)
    issues.append(finding)
# Public menu checks: price format, tri-state ordering flags, and list fields
# that must not contain blank entries.
allowed_menu_statuses = {"Yes", "No", "Unknown"}
for item in menu:
    name = item["name"]

    # The price must be the rupee sign followed only by digits.
    if re.fullmatch(r"₹\d+", item["price"]) is None:
        issue("error", f"menu:{name}", f"Price must use the format ₹123, found {item['price']!r}.")

    ordering = item["ordering_information"]
    for field in ("currently_available", "bestseller", "premium_item"):
        if ordering[field] in allowed_menu_statuses:
            continue
        issue(
            "error",
            f"menu:{name}",
            f"{field} must be Yes, No, or Unknown; found {ordering[field]!r}.",
        )

    tags = item["tags"]
    taste = item["taste_profile"]
    list_fields = (
        ("tags", tags),
        ("textures", taste["textures"]),
        ("flavours", taste["flavours"]),
        ("best_for", taste["best_for"]),
        ("available_add_ons", ordering["available_add_ons"]),
        ("recommended_pairings", ordering["recommended_pairings"]),
    )
    for field, values in list_fields:
        # A value counts as blank when its string form is empty after stripping.
        if not all(str(value).strip() for value in values):
            issue("error", f"menu:{name}", f"{field} contains a blank value.")
# Uniqueness checks: each listed collection must not repeat a value.
unique_collections = (
    ("menu names", [item["name"] for item in menu]),
    ("product IDs", [item["product_id"] for item in products]),
    ("product names", [item["product_name"] for item in products]),
    ("ingredient IDs", [item["ingredient_id"] for item in ingredients]),
    ("add-on IDs", [item["add_on_id"] for item in add_ons]),
)
for name, values in unique_collections:
    tally = Counter(values)
    for value in tally:
        count = tally[value]
        if count <= 1:
            continue
        issue("error", name, f"Duplicate value: {value!r} appears {count} times.")
# Menu and product knowledge must line up: by slugged ID in both directions,
# and by exact display name from the menu side.
menu_ids_without_product = sorted(menu_ids.difference(product_ids))
for missing in menu_ids_without_product:
    issue("error", "product_knowledge.json", f"Missing product knowledge for menu ID {missing!r}.")

product_ids_without_menu = sorted(product_ids.difference(menu_ids))
for extra in product_ids_without_menu:
    issue("error", "product_knowledge.json", f"Unknown product ID {extra!r}.")

menu_names_without_product = sorted(menu_names.difference(product_names))
for missing in menu_names_without_product:
    issue("error", "product_knowledge.json", f"Missing exact menu product name {missing!r}.")
# The serving range shown on the public menu must agree with the structured
# serves_min / serves_max values in product knowledge.
products_by_name = {entry["product_name"]: entry for entry in products}
for item in menu:
    # Pull every integer out of the free-text serving description.
    serving_numbers = list(map(int, re.findall(r"\d+", item["can_be_served_for"])))
    if not serving_numbers:
        issue("error", f"menu:{item['name']}", "can_be_served_for contains no numeric serving range.")
        continue

    # A single number means the minimum and maximum are the same.
    public_min = serving_numbers[0]
    public_max = public_min if len(serving_numbers) == 1 else serving_numbers[1]

    product = products_by_name.get(item["name"])
    if not product:
        # A menu item without a product profile is reported by another check.
        continue
    serving = product["serving"]
    if serving["serves_min"] == public_min and serving["serves_max"] == public_max:
        continue
    issue(
        "error",
        f"menu:{item['name']}",
        "Public can_be_served_for does not match product knowledge serves_min/serves_max.",
    )
# Per-product audit: ingredient/allergen/add-on/product cross-references,
# allergen consistency rules, and 0-10 score ranges.
for product in products:
    pid = product["product_id"]

    # Every listed ingredient must exist in the ingredient catalogue.
    for iid in product["composition"]["ingredient_ids"]:
        if iid in ingredient_ids:
            continue
        issue("error", f"product:{pid}", f"Unknown ingredient ID {iid!r}.")

    safety = product["dietary_and_safety"]
    if safety["gluten_free"] != "yes":
        issue("error", f"product:{pid}", "MeltRoom products must preserve the confirmed gluten-free status.")

    # An allergen may be listed as removable or as non-removable, never both.
    removable = set(safety["removable_allergen_ids"])
    non_removable = set(safety["non_removable_allergen_ids"])
    overlap = removable & non_removable
    if overlap:
        issue(
            "error",
            f"product:{pid}",
            f"Allergens cannot be both removable and non-removable: {sorted(overlap)!r}.",
        )

    # Whenever milk is present it must also be listed as removable.
    contains_milk = "milk" in safety["allergen_ids"]
    if contains_milk and "milk" not in safety["removable_allergen_ids"]:
        issue("error", f"product:{pid}", "Milk must remain marked removable by substitution.")

    # All four allergen lists may only reference catalogued allergens.
    allergen_fields = (
        "allergen_ids",
        "possible_cross_contact_allergen_ids",
        "removable_allergen_ids",
        "non_removable_allergen_ids",
    )
    for field in allergen_fields:
        for aid in safety[field]:
            if aid in allergen_ids:
                continue
            issue("error", f"product:{pid}", f"{field} references unknown allergen {aid!r}.")

    pairing = product["pairing_and_customization"]
    for field in ("compatible_add_on_ids", "recommended_add_on_ids"):
        for aid in pairing[field]:
            if aid in add_on_ids:
                continue
            issue("error", f"product:{pid}", f"{field} references unknown add-on {aid!r}.")
    for field in ("recommended_product_ids", "not_recommended_with_product_ids"):
        for other in pairing[field]:
            if other in product_ids:
                continue
            issue("error", f"product:{pid}", f"{field} references unknown product {other!r}.")

    comparisons = product["comparison_guidance"]
    comparison_fields = (
        "similar_product_ids",
        "richer_than_product_ids",
        "lighter_than_product_ids",
        "sweeter_than_product_ids",
        "less_sweet_than_product_ids",
    )
    for field in comparison_fields:
        for other in comparisons[field]:
            if other in product_ids:
                continue
            issue("error", f"product:{pid}", f"{field} references unknown product {other!r}.")

    # Scores are optional (None is accepted) but must lie in 0..10 when set.
    recommendation_profile = product["recommendation_profile"]
    for score_field in (
        "novelty_level",
        "indulgence_level",
        "premium_level",
        "comfort_food_score",
    ):
        value = recommendation_profile[score_field]
        if value is None or 0 <= value <= 10:
            continue
        issue("error", f"product:{pid}", f"{score_field} must be between 0 and 10, found {value}.")

    sensory_profile = product["sensory_profile"]
    for taste_field in ("sweetness", "chocolate_intensity", "richness"):
        value = sensory_profile[taste_field]
        if value is None or 0 <= value <= 10:
            continue
        issue("error", f"product:{pid}", f"{taste_field} must be between 0 and 10, found {value}.")

    # A gluten-free claim cannot be verified structurally when the allergen
    # catalogue has no "gluten" entry; reported per product as a warning.
    if safety["gluten_free"] == "yes" and "gluten" not in allergen_ids:
        issue(
            "warning",
            f"product:{pid}",
            "Claims gluten-free, but the allergen catalogue has no gluten entry for structured verification.",
        )
# Ingredients may only reference catalogued allergens and known products.
for ingredient in ingredients:
    iid = ingredient["ingredient_id"]
    for aid in ingredient["allergen_ids"]:
        if aid in allergen_ids:
            continue
        issue("error", f"ingredient:{iid}", f"Unknown allergen ID {aid!r}.")
    for pid in ingredient["used_in_product_ids"]:
        if pid in product_ids:
            continue
        issue("error", f"ingredient:{iid}", f"Unknown product ID {pid!r}.")
# Pinned ingredient facts: both substitution milks must exist, almond milk
# keeps its tree-nut allergen, and the brownie base keeps its dietary labels.
for required_ingredient in ("oat_milk", "almond_milk"):
    if required_ingredient in ingredient_ids:
        continue
    issue("error", "ingredients.json", f"Missing confirmed substitution ingredient {required_ingredient!r}.")

almond_milk = ingredients_by_id.get("almond_milk")
if almond_milk is not None and "tree_nuts" not in almond_milk["allergen_ids"]:
    issue("error", "ingredient:almond_milk", "Almond milk must preserve its tree-nut allergen.")

brownie_base = ingredients_by_id.get("brownie_base")
if brownie_base is not None:
    labels = set(brownie_base["dietary_labels"])
    for required_label in ("gluten_free", "guilt_free"):
        if required_label in labels:
            continue
        issue("error", "ingredient:brownie_base", f"Missing confirmed dietary label {required_label!r}.")
# Add-ons may only reference known products (compatible or incompatible)
# and catalogued allergens.
for add_on in add_ons:
    aid = add_on["add_on_id"]
    referenced_products = add_on["compatible_product_ids"] + add_on["incompatible_product_ids"]
    for pid in referenced_products:
        if pid in product_ids:
            continue
        issue("error", f"add_on:{aid}", f"Unknown product ID {pid!r}.")
    for allergen in add_on["allergen_ids"]:
        if allergen in allergen_ids:
            continue
        issue("error", f"add_on:{aid}", f"Unknown allergen ID {allergen!r}.")
# The language dictionary must hold an alias entry for every product, and
# no alias entry may point at a product that does not exist.
alias_ids = {entry["product_id"] for entry in language["product_aliases"]}
products_without_alias = sorted(product_ids.difference(alias_ids))
for missing in products_without_alias:
    issue("error", "language_dictionary.json", f"Missing alias entry for {missing!r}.")
aliases_without_product = sorted(alias_ids.difference(product_ids))
for extra in aliases_without_product:
    issue("error", "language_dictionary.json", f"Unknown alias product ID {extra!r}.")
# Recommendation rules must define exactly one strategy per menu item name:
# nothing missing, nothing for a name that is not on the menu.
strategy_names = {entry["product_name"] for entry in rules["product_strategy"]}
menu_without_strategy = sorted(menu_names.difference(strategy_names))
for missing in menu_without_strategy:
    issue("error", "recommendation_rules.json", f"Missing product strategy for {missing!r}.")
strategy_without_menu = sorted(strategy_names.difference(menu_names))
for extra in strategy_without_menu:
    issue("error", "recommendation_rules.json", f"Unknown product strategy {extra!r}.")
# FAQ entries may only reference known products, known ingredients, and a
# policy section that actually exists in the operations document.
for faq in faqs:
    faq_id = faq["faq_id"]
    for pid in faq["related_product_ids"]:
        if pid in product_ids:
            continue
        issue("error", f"faq:{faq_id}", f"Unknown related product ID {pid!r}.")
    for iid in faq["related_ingredient_ids"]:
        if iid in ingredient_ids:
            continue
        issue("error", f"faq:{faq_id}", f"Unknown related ingredient ID {iid!r}.")

    policy_section = faq["related_policy_section"]
    if not policy_section:
        continue
    # Resolve the dotted path one key at a time through nested dicts; report
    # once and stop at the first segment that cannot be followed.
    value = operations
    for key in policy_section.split("."):
        if isinstance(value, dict) and key in value:
            value = value[key]
            continue
        issue("error", f"faq:{faq_id}", f"Unknown related policy section {policy_section!r}.")
        break
# Designer examples may only reference known products, add-ons and allergens.
for example in examples["designer_examples"]:
    eid = example["example_id"]
    for field in ("expected_product_ids", "forbidden_product_ids"):
        for pid in example[field]:
            if pid in product_ids:
                continue
            issue("error", f"example:{eid}", f"{field} references unknown product {pid!r}.")
    for aid in example["expected_add_on_ids"]:
        if aid in add_on_ids:
            continue
        issue("error", f"example:{eid}", f"Unknown expected add-on ID {aid!r}.")
    for allergen in example["extracted_preferences"]["allergen_ids"]:
        if allergen in allergen_ids:
            continue
        issue("error", f"example:{eid}", f"Unknown allergen ID {allergen!r}.")
# Evaluation cases: chat tests name products they must use; designer tests
# name required/forbidden products and allergens that must be respected.
for test in evaluation["chat_tests"]:
    tid = test["test_id"]
    for pid in test["must_use_product_ids"]:
        if pid in product_ids:
            continue
        issue("error", f"evaluation:{tid}", f"Unknown required product ID {pid!r}.")

for test in evaluation["designer_tests"]:
    tid = test["test_id"]
    for field in ("required_product_ids", "forbidden_product_ids"):
        for pid in test[field]:
            if pid in product_ids:
                continue
            issue("error", f"evaluation:{tid}", f"{field} references unknown product {pid!r}.")
    for allergen in test["must_respect_allergen_ids"]:
        if allergen in allergen_ids:
            continue
        issue("error", f"evaluation:{tid}", f"Unknown allergen ID {allergen!r}.")
# Pinned operational facts: these values are compared against fixed constants
# so an edit to the JSON that changes them is reported as an error.
ordering_policy = operations["ordering"]
if ordering_policy["large_order_notice_hours"] != 6:
    issue("error", "operations_and_policies.json", "Large-order notice must preserve the confirmed 6-hour conservative planning value.")

delivery_policy = operations["delivery_and_pickup"]
if not delivery_policy["delivery_available"]:
    issue("error", "operations_and_policies.json", "Rapido delivery must remain available.")
if not delivery_policy["pickup_available"]:
    issue("error", "operations_and_policies.json", "KLN Reddy Colony pickup must remain available.")
# The fee policy is free text; only the presence of "40%" is checked.
if "40%" not in delivery_policy["delivery_fee_policy"]:
    issue("error", "operations_and_policies.json", "Delivery-fee policy must preserve MeltRoom's 40% contribution above ₹100.")

# Likewise, only the substring "60 minutes" is required in the guidance text.
consumption_window = operations["food_safety"]["recommended_consumption_window"]
if "60 minutes" not in consumption_window:
    issue("error", "operations_and_policies.json", "Best-taste consumption guidance must preserve the confirmed 60-minute window.")

# Contact details must match the pinned phone number and WhatsApp link exactly.
contact = business["contact"]
if contact["phone"] != "+91 77805 34935":
    issue("error", "business_profile.json", "MeltRoom contact phone does not match the verified number.")
if contact["whatsapp_url"] != "https://wa.me/917780534935":
    issue("error", "business_profile.json", "MeltRoom WhatsApp URL does not match the verified number.")
# Report: one line per finding (level upper-cased and padded to 7 columns),
# a blank separator line, then the summary.
for level, location, message in issues:
    print(f"{level.upper():7} {location}: {message}")
print()

# Tally findings by severity level; absent levels count as 0.
counts = Counter(entry[0] for entry in issues)
summary = (
    f"Audited {len(menu)} menu items, {len(products)} product profiles, "
    f"{len(ingredients)} ingredients, and {len(add_ons)} add-ons."
)
print(summary)
print(f"Found {counts['error']} errors and {counts['warning']} warnings.")

# Exit status 1 when at least one error was recorded, otherwise 0; warnings
# alone do not fail the audit.
exit_status = 1 if counts["error"] else 0
raise SystemExit(exit_status)