Spaces:
Sleeping
Sleeping
| import json | |
| import re | |
| from collections import Counter | |
| from pathlib import Path | |
# ROOT is the parent of the directory that contains this script; the audited
# JSON lives under ROOT/data, with the MeltMind documents in a subfolder.
ROOT = Path(__file__).resolve().parents[1]
DATA = ROOT.joinpath("data")
MELTMIND = DATA.joinpath("meltmind")
def load(path: Path):
    """Parse the JSON document stored at *path* and return the decoded value.

    The file is decoded explicitly as UTF-8. The audited data contains
    non-ASCII text (for example the rupee sign in prices), and relying on
    the platform's default encoding would mis-decode or fail on systems
    whose locale encoding is not UTF-8.

    Args:
        path: Location of the JSON file to read.

    Returns:
        Whatever ``json.loads`` produces for the file's content.

    Raises:
        OSError: If the file cannot be read.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    return json.loads(path.read_text(encoding="utf-8"))
def slug(value: str) -> str:
    """Turn a display name into a lowercase, underscore-separated identifier.

    The text is lowercased, every run of characters outside ``a-z`` and
    ``0-9`` becomes a single underscore, and underscores at either end are
    dropped.
    """
    lowered = value.lower()
    underscored = re.sub(r"[^a-z0-9]+", "_", lowered)
    return underscored.strip("_")
# Load every JSON document the audit cross-checks. The menu sits directly in
# DATA; all other documents live in the MELTMIND folder. Some files wrap
# their records in a top-level key, which is unwrapped here.
menu = load(DATA / "menu.json")

products = load(MELTMIND / "product_knowledge.json")["products"]

# ingredients.json carries both the ingredient records and the allergen
# catalogue, so the whole document is kept and then split.
ingredients_data = load(MELTMIND / "ingredients.json")
ingredients = ingredients_data["ingredients"]
allergens = ingredients_data["allergen_catalogue"]

add_ons = load(MELTMIND / "add_ons.json")["add_ons"]
language = load(MELTMIND / "language_dictionary.json")
rules = load(MELTMIND / "recommendation_rules.json")
faqs = load(MELTMIND / "faq.json")["faqs"]
examples = load(MELTMIND / "customer_examples.json")
evaluation = load(MELTMIND / "evaluation_cases.json")
operations = load(MELTMIND / "operations_and_policies.json")
business = load(MELTMIND / "business_profile.json")
# Lookup sets for the cross-reference checks: an ID or name mentioned in one
# file is valid only if it is a member of the matching set below.
menu_names = {entry["name"] for entry in menu}
# Menu items have no explicit ID, so one is derived from the display name.
menu_ids = {slug(entry["name"]) for entry in menu}
product_ids = {entry["product_id"] for entry in products}
product_names = {entry["product_name"] for entry in products}
ingredient_ids = {entry["ingredient_id"] for entry in ingredients}
allergen_ids = {entry["allergen_id"] for entry in allergens}
add_on_ids = {entry["add_on_id"] for entry in add_ons}

# Full ingredient records keyed by ID, for checks that inspect one ingredient.
ingredients_by_id = {entry["ingredient_id"]: entry for entry in ingredients}
# Findings collected during the audit, as (level, location, message) tuples
# in the order they were recorded.
issues = []


def issue(level: str, location: str, message: str) -> None:
    """Record one audit finding.

    Args:
        level: Severity label of the finding (the audit uses "error" and
            "warning").
        location: Where the problem was found, e.g. a file name or a
            prefixed record identifier.
        message: Human-readable description of the problem.
    """
    finding = (level, location, message)
    issues.append(finding)
# The status fields in ordering_information must hold one of these strings.
allowed_menu_statuses = {"Yes", "No", "Unknown"}

# Field-level checks on each public menu entry.
for item in menu:
    name = item["name"]

    # Prices are a rupee sign followed by digits only.
    if re.fullmatch(r"₹\d+", item["price"]) is None:
        issue("error", f"menu:{name}", f"Price must use the format ₹123, found {item['price']!r}.")

    ordering = item["ordering_information"]
    status_fields = ("currently_available", "bestseller", "premium_item")
    for field in status_fields:
        if ordering[field] in allowed_menu_statuses:
            continue
        issue(
            "error",
            f"menu:{name}",
            f"{field} must be Yes, No, or Unknown; found {ordering[field]!r}.",
        )

    # List-valued fields may be empty, but no entry may be blank once
    # converted to text and stripped of whitespace.
    taste = item["taste_profile"] if "tags" in item else item["tags"]
    list_fields = {
        "tags": item["tags"],
        "textures": taste["textures"],
        "flavours": taste["flavours"],
        "best_for": taste["best_for"],
        "available_add_ons": ordering["available_add_ons"],
        "recommended_pairings": ordering["recommended_pairings"],
    }
    for field, values in list_fields.items():
        if not all(str(value).strip() for value in values):
            issue("error", f"menu:{name}", f"{field} contains a blank value.")
# Names and IDs that must be unique within their own file. Each repeated
# value is reported once, together with how often it occurs.
unique_columns = (
    ("menu names", [item["name"] for item in menu]),
    ("product IDs", [item["product_id"] for item in products]),
    ("product names", [item["product_name"] for item in products]),
    ("ingredient IDs", [item["ingredient_id"] for item in ingredients]),
    ("add-on IDs", [item["add_on_id"] for item in add_ons]),
)
for label, entries in unique_columns:
    tally = Counter(entries)
    for value, count in tally.items():
        if count <= 1:
            continue
        issue("error", label, f"Duplicate value: {value!r} appears {count} times.")
# The menu and the product knowledge base must describe the same products.
# IDs are compared in both directions; exact names only from the menu side.
menu_ids_without_profile = sorted(menu_ids - product_ids)
for missing in menu_ids_without_profile:
    issue("error", "product_knowledge.json", f"Missing product knowledge for menu ID {missing!r}.")

profile_ids_without_menu = sorted(product_ids - menu_ids)
for extra in profile_ids_without_menu:
    issue("error", "product_knowledge.json", f"Unknown product ID {extra!r}.")

menu_names_without_profile = sorted(menu_names - product_names)
for missing in menu_names_without_profile:
    issue("error", "product_knowledge.json", f"Missing exact menu product name {missing!r}.")
# Product profiles keyed by their exact product name.
products_by_name = {entry["product_name"]: entry for entry in products}

# The serving range shown on the menu is free text; the numbers in it must
# agree with the structured serves_min/serves_max in the product profile.
for item in menu:
    serving_numbers = list(map(int, re.findall(r"\d+", item["can_be_served_for"])))
    if serving_numbers:
        public_min = serving_numbers[0]
        # The second number, when present, is the upper bound; a single
        # number means the minimum and maximum are the same.
        public_max = serving_numbers[:2][-1]
        product = products_by_name.get(item["name"])
        if product:
            range_matches = (
                product["serving"]["serves_min"] == public_min
                and product["serving"]["serves_max"] == public_max
            )
            if not range_matches:
                issue(
                    "error",
                    f"menu:{item['name']}",
                    "Public can_be_served_for does not match product knowledge serves_min/serves_max.",
                )
    else:
        issue("error", f"menu:{item['name']}", "can_be_served_for contains no numeric serving range.")
# Per-product checks: every cross-reference must resolve, the safety data
# must be internally consistent, and numeric scores must be within range.
for product in products:
    pid = product["product_id"]
    where = f"product:{pid}"

    # Composition may only reference known ingredients.
    for iid in product["composition"]["ingredient_ids"]:
        if iid in ingredient_ids:
            continue
        issue("error", where, f"Unknown ingredient ID {iid!r}.")

    safety = product["dietary_and_safety"]
    if safety["gluten_free"] != "yes":
        issue("error", where, "MeltRoom products must preserve the confirmed gluten-free status.")

    # An allergen cannot be listed as both removable and non-removable.
    removable = set(safety["removable_allergen_ids"])
    non_removable = set(safety["non_removable_allergen_ids"])
    overlap = removable & non_removable
    if overlap:
        issue(
            "error",
            where,
            f"Allergens cannot be both removable and non-removable: {sorted(overlap)!r}.",
        )

    # A product that lists milk must also list it as removable.
    if "milk" in safety["allergen_ids"] and "milk" not in safety["removable_allergen_ids"]:
        issue("error", where, "Milk must remain marked removable by substitution.")

    allergen_fields = (
        "allergen_ids",
        "possible_cross_contact_allergen_ids",
        "removable_allergen_ids",
        "non_removable_allergen_ids",
    )
    for field in allergen_fields:
        for aid in safety[field]:
            if aid in allergen_ids:
                continue
            issue("error", where, f"{field} references unknown allergen {aid!r}.")

    pairing = product["pairing_and_customization"]
    for field in ("compatible_add_on_ids", "recommended_add_on_ids"):
        for aid in pairing[field]:
            if aid in add_on_ids:
                continue
            issue("error", where, f"{field} references unknown add-on {aid!r}.")
    for field in ("recommended_product_ids", "not_recommended_with_product_ids"):
        for other in pairing[field]:
            if other in product_ids:
                continue
            issue("error", where, f"{field} references unknown product {other!r}.")

    comparisons = product["comparison_guidance"]
    comparison_fields = (
        "similar_product_ids",
        "richer_than_product_ids",
        "lighter_than_product_ids",
        "sweeter_than_product_ids",
        "less_sweet_than_product_ids",
    )
    for field in comparison_fields:
        for other in comparisons[field]:
            if other in product_ids:
                continue
            issue("error", where, f"{field} references unknown product {other!r}.")

    # Scores may be missing (None); when present they must lie in 0..10.
    score_fields = (
        "novelty_level",
        "indulgence_level",
        "premium_level",
        "comfort_food_score",
    )
    for score_field in score_fields:
        value = product["recommendation_profile"][score_field]
        if value is None:
            continue
        if not 0 <= value <= 10:
            issue("error", where, f"{score_field} must be between 0 and 10, found {value}.")
    for taste_field in ("sweetness", "chocolate_intensity", "richness"):
        value = product["sensory_profile"][taste_field]
        if value is None:
            continue
        if not 0 <= value <= 10:
            issue("error", where, f"{taste_field} must be between 0 and 10, found {value}.")

    # A gluten-free claim is flagged as a warning when the allergen
    # catalogue has no "gluten" entry.
    claims_gluten_free = safety["gluten_free"] == "yes"
    if claims_gluten_free and "gluten" not in allergen_ids:
        issue(
            "warning",
            where,
            "Claims gluten-free, but the allergen catalogue has no gluten entry for structured verification.",
        )
# Every ingredient may only reference known allergens and known products.
for ingredient in ingredients:
    iid = ingredient["ingredient_id"]
    where = f"ingredient:{iid}"
    for aid in ingredient["allergen_ids"]:
        if aid in allergen_ids:
            continue
        issue("error", where, f"Unknown allergen ID {aid!r}.")
    for pid in ingredient["used_in_product_ids"]:
        if pid in product_ids:
            continue
        issue("error", where, f"Unknown product ID {pid!r}.")

# Both substitution ingredients have to be present in the catalogue.
for required_ingredient in ("oat_milk", "almond_milk"):
    if required_ingredient in ingredient_ids:
        continue
    issue("error", "ingredients.json", f"Missing confirmed substitution ingredient {required_ingredient!r}.")

# Almond milk, when present, must carry the tree-nut allergen.
if "almond_milk" in ingredients_by_id:
    almond_milk = ingredients_by_id["almond_milk"]
    if "tree_nuts" not in almond_milk["allergen_ids"]:
        issue("error", "ingredient:almond_milk", "Almond milk must preserve its tree-nut allergen.")

# The brownie base, when present, must keep both dietary labels.
if "brownie_base" in ingredients_by_id:
    labels = set(ingredients_by_id["brownie_base"]["dietary_labels"])
    for required_label in ("gluten_free", "guilt_free"):
        if required_label in labels:
            continue
        issue("error", "ingredient:brownie_base", f"Missing confirmed dietary label {required_label!r}.")
# Add-ons may only reference known products (in either compatibility list)
# and known allergens.
for add_on in add_ons:
    aid = add_on["add_on_id"]
    where = f"add_on:{aid}"
    compatible = add_on["compatible_product_ids"]
    incompatible = add_on["incompatible_product_ids"]
    for pid in compatible + incompatible:
        if pid in product_ids:
            continue
        issue("error", where, f"Unknown product ID {pid!r}.")
    for allergen in add_on["allergen_ids"]:
        if allergen in allergen_ids:
            continue
        issue("error", where, f"Unknown allergen ID {allergen!r}.")
# The language dictionary needs exactly one alias entry set per product ID:
# none missing, none pointing at an unknown product.
alias_ids = {entry["product_id"] for entry in language["product_aliases"]}
products_without_alias = sorted(product_ids - alias_ids)
for missing in products_without_alias:
    issue("error", "language_dictionary.json", f"Missing alias entry for {missing!r}.")
aliases_without_product = sorted(alias_ids - product_ids)
for extra in aliases_without_product:
    issue("error", "language_dictionary.json", f"Unknown alias product ID {extra!r}.")

# Recommendation strategies are keyed by product name and must match the
# menu's names in both directions.
strategy_names = {entry["product_name"] for entry in rules["product_strategy"]}
menu_without_strategy = sorted(menu_names - strategy_names)
for missing in menu_without_strategy:
    issue("error", "recommendation_rules.json", f"Missing product strategy for {missing!r}.")
strategy_without_menu = sorted(strategy_names - menu_names)
for extra in strategy_without_menu:
    issue("error", "recommendation_rules.json", f"Unknown product strategy {extra!r}.")
# FAQ entries may reference products, ingredients, and a dotted path into
# the operations/policies document; each reference must resolve.
for faq in faqs:
    faq_id = faq["faq_id"]
    where = f"faq:{faq_id}"
    for pid in faq["related_product_ids"]:
        if pid in product_ids:
            continue
        issue("error", where, f"Unknown related product ID {pid!r}.")
    for iid in faq["related_ingredient_ids"]:
        if iid in ingredient_ids:
            continue
        issue("error", where, f"Unknown related ingredient ID {iid!r}.")

    policy_section = faq["related_policy_section"]
    if not policy_section:
        continue
    # Walk the path one key at a time, starting at the document root; the
    # walk stops at the first key that cannot be followed.
    value = operations
    for key in policy_section.split("."):
        reachable = isinstance(value, dict) and key in value
        if not reachable:
            issue("error", where, f"Unknown related policy section {policy_section!r}.")
            break
        value = value[key]
# Designer examples may only reference known products, add-ons, and
# allergens.
for example in examples["designer_examples"]:
    eid = example["example_id"]
    where = f"example:{eid}"
    for field in ("expected_product_ids", "forbidden_product_ids"):
        for pid in example[field]:
            if pid in product_ids:
                continue
            issue("error", where, f"{field} references unknown product {pid!r}.")
    for aid in example["expected_add_on_ids"]:
        if aid in add_on_ids:
            continue
        issue("error", where, f"Unknown expected add-on ID {aid!r}.")
    for allergen in example["extracted_preferences"]["allergen_ids"]:
        if allergen in allergen_ids:
            continue
        issue("error", where, f"Unknown allergen ID {allergen!r}.")
# Chat evaluation cases: every product a test requires must exist.
for test in evaluation["chat_tests"]:
    tid = test["test_id"]
    where = f"evaluation:{tid}"
    for pid in test["must_use_product_ids"]:
        if pid in product_ids:
            continue
        issue("error", where, f"Unknown required product ID {pid!r}.")

# Designer evaluation cases: product and allergen references must exist.
for test in evaluation["designer_tests"]:
    tid = test["test_id"]
    where = f"evaluation:{tid}"
    for field in ("required_product_ids", "forbidden_product_ids"):
        for pid in test[field]:
            if pid in product_ids:
                continue
            issue("error", where, f"{field} references unknown product {pid!r}.")
    for allergen in test["must_respect_allergen_ids"]:
        if allergen in allergen_ids:
            continue
        issue("error", where, f"Unknown allergen ID {allergen!r}.")
# Fixed values the policies document must keep; any drift from them is
# reported as an error.
notice_hours = operations["ordering"]["large_order_notice_hours"]
if notice_hours != 6:
    issue("error", "operations_and_policies.json", "Large-order notice must preserve the confirmed 6-hour conservative planning value.")

delivery = operations["delivery_and_pickup"]
if not delivery["delivery_available"]:
    issue("error", "operations_and_policies.json", "Rapido delivery must remain available.")
if not delivery["pickup_available"]:
    issue("error", "operations_and_policies.json", "KLN Reddy Colony pickup must remain available.")
# The fee policy is free text, so only the presence of "40%" is checked.
if "40%" not in delivery["delivery_fee_policy"]:
    issue("error", "operations_and_policies.json", "Delivery-fee policy must preserve MeltRoom's 40% contribution above ₹100.")

# Likewise free text: the window must mention "60 minutes".
consumption_window = operations["food_safety"]["recommended_consumption_window"]
if "60 minutes" not in consumption_window:
    issue("error", "operations_and_policies.json", "Best-taste consumption guidance must preserve the confirmed 60-minute window.")

# Contact details in the business profile must match these exact values.
contact = business["contact"]
if contact["phone"] != "+91 77805 34935":
    issue("error", "business_profile.json", "MeltRoom contact phone does not match the verified number.")
if business["contact"]["whatsapp_url"] != "https://wa.me/917780534935":
    issue("error", "business_profile.json", "MeltRoom WhatsApp URL does not match the verified number.")
# Report: one line per finding in recording order, with the level
# upper-cased and padded to seven characters.
for level, location, message in issues:
    print(f"{level.upper():7} {location}: {message}")
# Blank line between the findings and the summary.
print()

# Tally findings by level; Counter yields 0 for a level that never occurred.
counts = Counter(entry[0] for entry in issues)
summary = (
    f"Audited {len(menu)} menu items, {len(products)} product profiles, "
    f"{len(ingredients)} ingredients, and {len(add_ons)} add-ons."
)
print(summary)
print(f"Found {counts['error']} errors and {counts['warning']} warnings.")

# Exit status 1 when at least one error was recorded, otherwise 0; warnings
# alone do not fail the audit.
exit_code = 1 if counts["error"] else 0
raise SystemExit(exit_code)