| """Tool implementations used by the agents. | |
| Two tools, both safe by construction: | |
| * :class:`QuantitiesFinder` — PuLP linear-program solver. Given a list of | |
| candidate foods (with per-100g macros and an ``estimated_g`` anchor) and | |
| the daily targets, it returns gram quantities that minimise the weighted | |
| deviation from both the daily macro targets and the per-item anchors. | |
| Optional ``meal_constraints`` express per-meal macro caps / floors. | |
| * :class:`WebSearchTool` — single-pass wrapper around Gemini's built-in | |
| ``google_search`` grounding. Gemini decides the queries, runs them, | |
| synthesises a cited answer, and returns ``grounding_metadata`` in one | |
| round-trip. Citations are appended to the answer string so downstream | |
| code can persist them with the plan. | |
| No LLM-generated code path exists anywhere in this module: clinical math | |
| runs through :mod:`nutrition_formulas` (called directly by the agents) and | |
| ``QuantitiesFinder`` is a pure LP. There is no ``eval``, no subprocess, | |
| and no filesystem or network access beyond the Gemini SDK. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| from datetime import datetime | |
| from typing import Any, Dict, List | |
| from pulp import ( | |
| LpMinimize, | |
| LpProblem, | |
| LpStatus, | |
| LpVariable, | |
| PULP_CBC_CMD, | |
| lpSum, | |
| value, | |
| ) | |
| from logging_setup import get_logger | |
| from utils import save_to_json | |
# Module-level loggers, one per tool, so each tool's log output is attributable.
_qf_logger = get_logger("tools.quantities_finder")
_web_logger = get_logger("tools.web_search")
| # --------------------------------------------------------------------------- | |
| # QuantitiesFinder (PuLP LP solver) | |
| # --------------------------------------------------------------------------- | |
class QuantitiesFinder:
    """Linear-program solver that turns an LLM-drafted plan into precise grams.

    Input schema (tool_task must be a JSON string)::

        {
            "foods": [
                {
                    "name": str,
                    "calories": float,        # per 100g
                    "protein": float,         # per 100g
                    "fat": float,             # per 100g
                    "carbohydrates": float,   # per 100g
                    "estimated_g": float,     # LLM's anchor weight
                    # optional:
                    "min_g": float,
                    "max_g": float,
                    "meal_group": str,
                    "estimate_weight": float,
                },
                ...
            ],
            "targets": {
                "calories": float,
                "protein": float,
                "fat": float,
                "carbohydrates": float,
            },
            "meal_constraints": [   # optional
                {"group_name": str, "max_<nut>": float, "min_<nut>": float},
                ...
            ]
        }

    Default per-food bounds (when ``min_g``/``max_g`` are not supplied)::

        min_g = max(20, estimated_g * 0.3)
        max_g = min(400, estimated_g * 2.5)

    These stop the LP from suggesting 1 g of butter or 900 g of broccoli
    to chase a macro target. The estimate-anchor weight defaults to 0.3
    (was 0.1 in earlier revisions) so the LP must have a strong reason to
    drift away from the LLM's drafted serving sizes — small deviations are
    penalised harder, which keeps the output realistic.
    """

    # Priority 1: hit daily totals; Priority 2: stay close to per-item
    # estimates. The default estimate weight is intentionally non-trivial
    # so the LP cannot wander far from the LLM's draft.
    W_NUTRITION = 1.0
    W_ESTIMATE_DEFAULT = 0.3
    MIN_BOUND_FLOOR = 20.0
    MAX_BOUND_CAP = 400.0
    MIN_BOUND_RATIO = 0.3
    MAX_BOUND_RATIO = 2.5

    def __init__(self) -> None:
        # Stateless tool; kept so it can be constructed like the other tools.
        pass

    @staticmethod
    def _round(v: Any) -> float:
        """Round *v* to two decimals, mapping ``None`` to ``0.0``."""
        if v is None:
            return 0.0
        return round(float(v), 2)

    @staticmethod
    def _round_structure(obj: Any) -> Any:
        """Recursively round every int/float inside nested dicts and lists."""
        if isinstance(obj, dict):
            return {k: QuantitiesFinder._round_structure(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [QuantitiesFinder._round_structure(v) for v in obj]
        if isinstance(obj, (int, float)):
            return QuantitiesFinder._round(obj)
        return obj

    @classmethod
    def _default_bounds(cls, est_g: float) -> tuple[float, float]:
        """Default ``(min_g, max_g)`` derived from ``estimated_g``."""
        if est_g <= 0:
            return cls.MIN_BOUND_FLOOR, cls.MAX_BOUND_CAP
        min_g = max(cls.MIN_BOUND_FLOOR, est_g * cls.MIN_BOUND_RATIO)
        max_g = min(cls.MAX_BOUND_CAP, est_g * cls.MAX_BOUND_RATIO)
        if min_g > max_g:
            # Degenerate case: bounds collide. Fall back to the est anchor.
            min_g, max_g = max(0.0, est_g - 1), est_g + 1
        return min_g, max_g

    def handle_task(self, task: str) -> str:
        """Parse *task* JSON, solve the LP, and return a JSON result string.

        Returns ``{"quantities": {...}, "achieved": {...}}`` on success, or
        ``{"error": "..."}`` on any failure (malformed input, infeasible
        problem, solver error) so the calling agent never sees a raised
        exception.
        """
        _qf_logger.info("\n📊 QUANTITIES FINDER TOOL STARTED")
        try:
            data = json.loads(task)
            foods = data["foods"]
            targets = data["targets"]

            # 1. Validation
            required_nutrients = ["calories", "protein", "fat", "carbohydrates"]
            if not foods:
                raise ValueError("'foods' must be a non-empty list.")
            for food in foods:
                if not all(
                    key in food for key in ["name"] + required_nutrients + ["estimated_g"]
                ):
                    raise ValueError(
                        "Each food must have name, calories, protein, fat, "
                        "carbohydrates, and estimated_g."
                    )
            names = [food["name"] for food in foods]
            if len(names) != len(set(names)):
                # Duplicate names would silently share one LP variable and
                # one estimate anchor, corrupting the solution.
                raise ValueError("Food names must be unique.")
            if not all(key in targets for key in required_nutrients):
                raise ValueError(
                    "Targets must include calories, protein, fat, carbohydrates."
                )

            prob = LpProblem("Nutrient_Optimization", LpMinimize)

            # 2. Variables (with realistic default bounds)
            g: Dict[str, LpVariable] = {}
            for food in foods:
                est = float(food["estimated_g"])
                default_min, default_max = self._default_bounds(est)
                min_g = float(food.get("min_g", default_min))
                max_g = float(food.get("max_g", default_max))
                g[food["name"]] = LpVariable(
                    f"g_{food['name']}",
                    lowBound=min_g,
                    upBound=max_g,
                )

            # 3. Nutrition deviations.
            #    ``g[name] * (per100 / 100)`` keeps the LpVariable on the LEFT of
            #    the multiplication so PuLP returns an LpAffineExpression. The
            #    earlier ``g[name] / 100 * per100`` form trips Python's operator
            #    precedence: ``g / 100`` raises ``LpVariable / int`` which is
            #    rejected by PuLP at expression-build time.
            totals = {
                nut: lpSum(g[f["name"]] * (float(f[nut]) / 100.0) for f in foods)
                for nut in required_nutrients
            }
            d_pos = {nut: LpVariable(f"d_pos_{nut}", lowBound=0) for nut in required_nutrients}
            d_neg = {nut: LpVariable(f"d_neg_{nut}", lowBound=0) for nut in required_nutrients}
            for nut in required_nutrients:
                prob += totals[nut] - float(targets[nut]) <= d_pos[nut]
                prob += float(targets[nut]) - totals[nut] <= d_neg[nut]

            # 3.5 Optional meal-level constraints (hard caps/floors per meal group)
            for constraint in data.get("meal_constraints", []) or []:
                group_name = constraint.get("group_name")
                if not group_name:
                    continue
                group_foods = [f for f in foods if f.get("meal_group") == group_name]
                if not group_foods:
                    _qf_logger.warning("No foods found for meal_group '%s'", group_name)
                    continue
                for nut in required_nutrients:
                    meal_total = lpSum(
                        g[f["name"]] * (float(f[nut]) / 100.0) for f in group_foods
                    )
                    if (max_val := constraint.get(f"max_{nut}")) is not None:
                        prob += (meal_total <= max_val, f"Meal_{group_name}_max_{nut}")
                    if (min_val := constraint.get(f"min_{nut}")) is not None:
                        prob += (meal_total >= min_val, f"Meal_{group_name}_min_{nut}")

            # 4. Estimate deviations (per-item soft anchor)
            dev_est_pos = {f["name"]: LpVariable(f"dev_est_pos_{f['name']}", lowBound=0) for f in foods}
            dev_est_neg = {f["name"]: LpVariable(f"dev_est_neg_{f['name']}", lowBound=0) for f in foods}
            for food in foods:
                name = food["name"]
                est = float(food["estimated_g"])
                prob += g[name] - est <= dev_est_pos[name]
                prob += est - g[name] <= dev_est_neg[name]

            # 5. Objective: normalised macro deviation + weighted anchor deviation.
            #    Both terms divide by max(target/estimate, 1.0) to avoid a
            #    division by zero and to keep the two terms on comparable scales.
            nutrition_objective = lpSum(
                (d_pos[nut] + d_neg[nut]) / max(float(targets[nut]), 1.0)
                for nut in required_nutrients
            )
            estimate_objective = lpSum(
                float(f.get("estimate_weight", self.W_ESTIMATE_DEFAULT))
                * (dev_est_pos[f["name"]] + dev_est_neg[f["name"]])
                / max(float(f["estimated_g"]), 1.0)
                for f in foods
                if float(f["estimated_g"]) > 0
            )
            prob += (self.W_NUTRITION * nutrition_objective) + estimate_objective

            # 6. Solve (CBC, silenced)
            prob.solve(PULP_CBC_CMD(msg=0))
            if LpStatus[prob.status] != "Optimal":
                raise ValueError(
                    "No optimal solution found (problem may be infeasible). "
                    "Check your targets and constraints."
                )

            quantities = {name: value(g[name]) for name in g}
            achieved = {nut: value(totals[nut]) for nut in required_nutrients}
            result = QuantitiesFinder._round_structure(
                {"quantities": quantities, "achieved": achieved}
            )

            _qf_logger.info("Solution Status: %s", LpStatus[prob.status])
            _qf_logger.info("Quantities (g): %s", json.dumps(result["quantities"], indent=2))
            _qf_logger.info(
                "Achieved Nutrition (around): %s",
                json.dumps(result["achieved"], indent=2),
            )
            _qf_logger.info(
                "Target Nutrition: %s",
                json.dumps(QuantitiesFinder._round_structure(targets), indent=2),
            )
            _qf_logger.info("\n📊 QUANTITIES FINDER COMPLETED")
            return json.dumps(result)
        except Exception as e:  # noqa: BLE001 — deliberate: tool returns error JSON
            _qf_logger.error("QuantitiesFinder Error: %s", str(e))
            return json.dumps({"error": str(e)})
| # --------------------------------------------------------------------------- | |
| # WebSearchTool (Gemini google_search grounding) | |
| # --------------------------------------------------------------------------- | |
class WebSearchTool:
    """Single-pass grounded web search.

    Backed by Gemini's built-in ``google_search`` tool: one round-trip in
    which Gemini decides which queries to run, searches Google for them,
    synthesises a cited answer, and returns ``groundingMetadata``. No
    third-party search provider, no separate query-generation or synthesis
    pass — the model owns the whole loop.

    The injected ``llm_instance`` must expose
    :meth:`utils.GeminiLLM.call_grounded`. In tests, the ``MockLLM`` fixture
    can stub the same surface.
    """

    _SYSTEM_INSTRUCTION = (
        "You are a nutrition / clinical research assistant. Answer the "
        "question below using up-to-date sources you can find via Google "
        "Search. Prefer authoritative domains (WHO, USDA / FDC, EFSA, NICE, "
        "ADA, NIH, MedlinePlus, peer-reviewed journals, government health "
        "agencies). Return a concise, factual answer; cite source URLs "
        "inline. If the question asks for nutrition facts, give per-100g "
        "values for calories, protein, fat, and carbohydrates when available."
    )

    def __init__(self, llm_instance: Any) -> None:
        self.llm = llm_instance

    def handle_task(self, research_task: str) -> str:
        """Run one grounded search round-trip and return a cited answer.

        *research_task* may be a free-form question or a legacy JSON
        envelope (see :meth:`_extract_question`). The answer plus a
        deduplicated "Sources:" list is returned; the full exchange is
        persisted via :func:`utils.save_to_json` for auditability.
        """
        _web_logger.info("\n🌐 WEB SEARCH TOOL STARTED")
        question = self._extract_question(research_task)
        prompt = f"{self._SYSTEM_INSTRUCTION}\n\nQuestion: {question}\n\nAnswer:"
        if not hasattr(self.llm, "call_grounded"):
            # Fail soft with a descriptive string so the agent loop can surface it.
            msg = (
                "WebSearchTool requires a GeminiLLM with call_grounded(); "
                f"got {type(self.llm).__name__}."
            )
            _web_logger.error(msg)
            return msg
        text, citations, queries = self.llm.call_grounded(prompt)
        answer = self._append_sources(text, citations)
        timestamp = datetime.now().isoformat()
        save_to_json(
            {
                "input": research_task,
                "question": question,
                "queries_run": queries,
                "answer": answer,
                "citations": citations,
                "timestamp": timestamp,
            },
            f"web_search_tool_{timestamp}.json",
            subdirectory="WebSearchTool",
        )
        _web_logger.info(
            "🌐 WEB SEARCH TOOL completed (%d citations, queries=%s)",
            len(citations),
            queries,
        )
        return answer

    # ------------------------------------------------------------------
    @staticmethod
    def _extract_question(task: str) -> str:
        """Accept legacy ``{"queries": [...]}`` JSON or a free-form string."""
        try:
            data = json.loads(task)
        except (json.JSONDecodeError, TypeError):
            return task
        if isinstance(data, dict):
            if isinstance(data.get("queries"), list) and data["queries"]:
                return " | ".join(str(q) for q in data["queries"])
            if isinstance(data.get("query"), str):
                return data["query"]
            if isinstance(data.get("question"), str):
                return data["question"]
        return task

    @staticmethod
    def _append_sources(text: str, citations: List[Dict[str, str]]) -> str:
        """Append a deduplicated markdown "Sources:" list to *text*."""
        if not citations:
            return text
        seen: set[str] = set()
        lines: List[str] = []
        for c in citations:
            uri = c.get("uri", "")
            if not uri or uri in seen:
                continue
            seen.add(uri)
            title = c.get("title") or uri
            lines.append(f"- [{title}]({uri})")
        if not lines:
            return text
        return f"{text}\n\nSources:\n" + "\n".join(lines)
# Explicit public API: only the two tool classes are exported.
__all__ = ["QuantitiesFinder", "WebSearchTool"]