import json import os from datetime import datetime from typing import Any, Dict, List, Optional from IPython.display import Markdown, display from agents import CoachAgent, MedicalAssessmentAgent, PlannerAgent from config import set_settings from logging_setup import get_logger, refresh_level from memory import LongTermMemory from state import initialize_empty_memory from tools import QuantitiesFinder, WebSearchTool from utils import APIPoolManager, create_llm from workflow import setup_workflow as setup_workflow_workflow _logger = get_logger("mealgraph") def debug(level: str = "full", scopes: Optional[Dict[str, List[str]]] = None) -> None: """Enable debug mode with the given level and scopes. Args: level: 'full' (default) to show inputs and outputs, or 'output' to show only outputs. scopes: Optional dict like ``{'agents': ['all'], 'tools': ['QuantitiesFinder']}``. If None, defaults to all agents and tools. """ if scopes is None: scopes = {"agents": ["all"], "tools": ["all"]} set_settings(debug_mode=True, debug_level=level, debug_scopes=scopes) refresh_level() def logging(log_dir: Optional[str] = None, persistence_dir: Optional[str] = None) -> None: # noqa: A001 - public name kept for backwards compat """Set directories for log files and LangGraph checkpoint persistence. If ``log_dir`` is provided, agent/tool I/O is dumped there as JSON. If ``persistence_dir`` is provided, LangGraph checkpoints are persisted to disk. If neither is set, logging is disabled and persistence is in-memory. """ updates: Dict[str, Any] = {} if log_dir is not None: os.makedirs(log_dir, exist_ok=True) updates["log_dir"] = log_dir if persistence_dir is not None: os.makedirs(persistence_dir, exist_ok=True) updates["persistence_dir"] = persistence_dir if updates: set_settings(**updates) # Default model configurations (without API keys, as they will be provided # by the user). Five LLM slots — no separate validation_agent any more # since the Validator was folded into the Planner / Coach. # # Targets the Gemini 3.x family via the rolling "*-latest" aliases: # - gemini-pro-latest (deep reasoning: Coach / Medical / Planner) # - gemini-flash-latest (mid-tier; reserved for overrides) # - gemini-flash-lite-latest (cheapest; tools + simulator) DEFAULT_MODEL_CONFIGS = { "main": { "type": "gemini", "model_name": "gemini-pro-latest", "structured_output": True, "thinking_budget": 600, "params": {"max_tokens": 5120, "temperature": 0.3}, }, "agents_llm": { "type": "gemini", "model_name": "gemini-pro-latest", "structured_output": True, "thinking_budget": 600, "params": {"max_tokens": 5120, "temperature": 0.3}, }, "planner_agent": { "type": "gemini", "model_name": "gemini-pro-latest", "structured_output": True, "thinking_budget": 600, "params": {"max_tokens": 5120, "temperature": 0.3}, }, "tools_llm": { "type": "gemini", "model_name": "gemini-flash-lite-latest", "structured_output": False, "thinking_budget": 600, "params": {"max_tokens": 5120, "temperature": 0.3}, }, "user_simulator": { "type": "gemini", "model_name": "gemini-flash-lite-latest", "structured_output": False, "thinking_budget": 300, "params": {"max_tokens": 5120, "temperature": 0.5}, }, } # Global variables to hold the system components LLM_INSTANCES = None TOOLS = None AGENTS = None APP = None def create_llm_instances( api_keys: list[str], model_overrides: Optional[Dict[str, Any]] = None, enable_rate_limiting: bool = True, ): """Create LLM instances using provided API keys list and optional model overrides. Args: api_keys: List of API keys to cycle through. model_overrides: Optional overrides for model configs. enable_rate_limiting: If True, apply rate limits (default). If False, disable rate limiting and just cycle keys. """ global LLM_INSTANCES if not api_keys: raise ValueError("At least one API key must be provided.") if enable_rate_limiting: rate_limits = { "gemini-pro-latest": (5, 100), "gemini-flash-latest": (10, 250), "gemini-flash-lite-latest": (15, 500), } else: rate_limits = None manager = APIPoolManager(api_keys, rate_limits) _logger.info( "APIPoolManager initialized with %s and %d API keys.", "rate limiting enabled" if enable_rate_limiting else "rate limiting disabled", len(api_keys), ) # ``cfg`` (not ``config``) — the latter would shadow the imported # :mod:`config` module inside the loop body. model_configs: Dict[str, Dict[str, Any]] = {} for key in DEFAULT_MODEL_CONFIGS: cfg = DEFAULT_MODEL_CONFIGS[key].copy() if model_overrides and key in model_overrides: override = model_overrides[key] if "model_name" in override: cfg["model_name"] = override["model_name"] if "params" in override: cfg["params"] = {**cfg.get("params", {}), **override["params"]} model_configs[key] = cfg LLM_INSTANCES = { "main": create_llm(model_configs["main"], manager), "agents_llm": create_llm(model_configs["agents_llm"], manager), "planner_agent": create_llm(model_configs["planner_agent"], manager), "tools_llm": create_llm(model_configs["tools_llm"], manager), "user_simulator": create_llm(model_configs["user_simulator"], manager), } def initialize_tools(): """Initialize tools using the LLM instances.""" global TOOLS if LLM_INSTANCES is None: raise RuntimeError("LLM instances must be created before initializing tools.") TOOLS_LLM = LLM_INSTANCES["tools_llm"] TOOLS = { "WebSearchTool": WebSearchTool(TOOLS_LLM), "QuantitiesFinder": QuantitiesFinder(), } def initialize_agents(): """Initialize agents using the LLM instances and tools.""" global AGENTS if LLM_INSTANCES is None or TOOLS is None: raise RuntimeError("LLM instances and tools must be initialized before agents.") MAIN_LLM = LLM_INSTANCES["main"] AGENTS_LLM = LLM_INSTANCES["agents_llm"] PLANNER_LLM = LLM_INSTANCES["planner_agent"] AGENTS = { "CoachAgent": CoachAgent(MAIN_LLM), "MedicalAssessmentAgent": MedicalAssessmentAgent( AGENTS_LLM, TOOLS["WebSearchTool"] ), "PlannerAgent": PlannerAgent( PLANNER_LLM, TOOLS["WebSearchTool"], TOOLS["QuantitiesFinder"] ), } # --------------------------------------------------------------------------- # Long-term memory singleton # --------------------------------------------------------------------------- LONG_TERM_MEMORY: Optional[LongTermMemory] = None def initialize_long_term_memory(db_path: Optional[str] = None) -> LongTermMemory: """Initialise the SQLite-backed three-tier memory. Pass a file path for cross-session persistence, or omit for an in-memory DB (default; tests / ephemeral demos). """ global LONG_TERM_MEMORY LONG_TERM_MEMORY = LongTermMemory(db_path=db_path) _logger.info("Long-term memory initialised at %s", db_path or ":memory:") return LONG_TERM_MEMORY def setup_workflow(): global APP if AGENTS is None or TOOLS is None: raise RuntimeError("Agents and tools must be initialized before setting up workflow.") APP = setup_workflow_workflow(AGENTS["CoachAgent"], AGENTS, TOOLS) class UserSimulator: def __init__(self, llm, user_profile, medical_history): self.llm = llm self.user_data = { "user_profile": user_profile, "medical_history": medical_history, } def get_response(self, assistant_message): user_data_str = json.dumps(self.user_data, indent=2) sim_prompt = f"""You are a user interacting with a nutrition app. Here is your profile and medical history: {user_data_str} The app has asked: {assistant_message} Provide a realistic answer based on your profile and medical history. Your response: """ response = self.llm(sim_prompt)[0] return response def initialize_user_data(): """Collect user information interactively to initialize memory.""" print("Let's collect some information about you to personalize your experience.") print("\nFirst, your demographics and anthropometrics:") name = input("What is your name? ") age = float(input("How old are you? (in years) ")) sex = input("What is your sex? (male/female) ") height = float(input("What is your height? (in cm) ")) weight = float(input("What is your weight? (in kg) ")) print("\nNext, about your lifestyle and goals:") activity_level = input( "What is your activity level? (e.g., sedentary, lightly active, " "moderately active, very active, extra active) " ) goal = input( "What is your primary goal? (e.g., lose weight, maintain weight, gain muscle) " ) job = input("What is your job or daily routine? ") print("\nDietary preferences:") dietary_restrictions = input( "Any dietary restrictions? (e.g., vegetarian, vegan, keto) " ) food_likes = input("Favorite foods? ") food_dislikes = input("Foods you dislike? ") allergies_input = input("Any allergies? (comma-separated) ") allergies_list = ( [a.strip() for a in allergies_input.split(",") if a.strip()] if allergies_input else [] ) print("\nLocation and budget:") country = input("Which country are you in? ") currency = input("Preferred currency? (e.g., USD, EGP) ") print("\nMedical history:") conditions_input = input("Any medical conditions? (comma-separated) ") conditions_list = ( [c.strip() for c in conditions_input.split(",") if c.strip()] if conditions_input else [] ) medications_input = input("Current medications? (comma-separated) ") medications_list = ( [m.strip() for m in medications_input.split(",") if m.strip()] if medications_input else [] ) past_issues_input = input("Past health issues? (comma-separated) ") past_issues_list = ( [p.strip() for p in past_issues_input.split(",") if p.strip()] if past_issues_input else [] ) lab_results = input("Any recent lab results? (e.g., cholesterol levels) ") user_profile = { "name": name, "age": age, "sex": sex, "height": height, "weight": weight, "activity_level": activity_level, "goal": goal, "job": job, "dietary_restrictions": dietary_restrictions, "food_likes": food_likes, "food_dislikes": food_dislikes, "allergies": allergies_list, "country": country, "currency": currency, "last_updated": datetime.now().isoformat(), } medical_history = { "conditions": conditions_list, "medications": medications_list, "past_issues": past_issues_list, "lab_results": lab_results, "last_updated": datetime.now().isoformat(), } memory = initialize_empty_memory() memory["user_profile"] = user_profile memory["medical_history"] = medical_history return memory def run(simulate=False, simulated_users=None): """Run the system in either simulation or interactive mode.""" if APP is None: raise RuntimeError("Workflow must be set up before running the system.") if simulate: print("\n" + "=" * 80) print("STARTING SIMULATION MODE") print("=" * 80) if not simulated_users: raise ValueError("simulated_users must be provided when simulate=True") for user in simulated_users: print(f"\nProcessing user: {user['user_profile']['name']}") user["user_profile"]["last_updated"] = datetime.now().isoformat() user["medical_history"]["last_updated"] = datetime.now().isoformat() memory = { "user_profile": user["user_profile"], "medical_history": user["medical_history"], "flags_and_assessments": {}, "plans": {}, } conversation_history = [] previous_actions = [] user_simulator = UserSimulator( LLM_INSTANCES["user_simulator"], user["user_profile"], user["medical_history"], ) for question in user["questions"]: print(f"\n🙍🏻‍♂️Asking: {question}") state = { "memory": memory, "user_question": question, "conversation_history": conversation_history + [{"role": "user", "content": question}], "current_action": None, "agent_result": None, "num_turns": 0, "max_turns": 10, "previous_actions": previous_actions, "response_steps": [], } while True: final_state = APP.invoke( state, config={ "configurable": { "thread_id": f"user_{user['user_profile']['name']}" } }, ) if final_state["num_turns"] >= final_state["max_turns"]: print("Max turns reached without composing a response.") break last_action = final_state["current_action"]["action"] if last_action == "compose_response": print(f"\n{'='*60}") display(Markdown(f"**Answer:**\n\n{final_state['agent_result']}")) conversation_history = final_state["conversation_history"] memory = final_state["memory"] previous_actions = final_state["previous_actions"] break elif last_action == "ask_user": prompt = final_state["agent_result"] print(f"System asks: {prompt}") response = user_simulator.get_response(prompt) print(f"Simulated user responds: {response}") state = { "memory": memory, "user_question": question, "conversation_history": conversation_history + [{"role": "user", "content": question}], "current_action": None, "agent_result": None, "num_turns": 0, "max_turns": 10, "previous_actions": previous_actions, "response_steps": [], } else: print(f"Unexpected action: {last_action}") break else: print("\n" + "=" * 80) print("STARTING INTERACTIVE MODE") print("=" * 80) memory = initialize_user_data() print("\n" + "=" * 80) print("WELCOME TO THE NUTRITION APP") print("=" * 80) initial_state = { "memory": memory, "user_question": "welcome", "conversation_history": [], "current_action": None, "agent_result": None, "num_turns": 0, "max_turns": 10, "previous_actions": [], "response_steps": [], } final_state = APP.invoke(initial_state, config={"configurable": {"thread_id": "user1"}}) if final_state["agent_result"]: display(Markdown(f"\n🤖 Coach: {final_state['agent_result']}")) memory = final_state["memory"] conversation_history = final_state["conversation_history"] while True: q = input("\n❓ Your question: ") if q.lower() == "exit": break state = { "memory": final_state["memory"], "user_question": q, "conversation_history": final_state["conversation_history"] + [{"role": "user", "content": q}], "current_action": None, "agent_result": None, "num_turns": 0, "max_turns": 10, "previous_actions": final_state["previous_actions"], "response_steps": [], } final_state = APP.invoke(state, config={"configurable": {"thread_id": "user1"}}) if final_state["agent_result"]: print(f"\n{'='*60}") display(Markdown(f"\n🤖 Coach: {final_state['agent_result']}")) memory = final_state["memory"] conversation_history = final_state["conversation_history"]