""" All graph node functions for the StateGraph pipeline. Each node takes an AgentState dict and returns a partial state update. """ from __future__ import annotations import logging from datetime import date from typing import Any, Dict, List, Optional from langchain_core.messages import AIMessage, HumanMessage, SystemMessage from langchain_core.runnables import RunnableConfig from src.agents.config import Config from src.agents.metadata import metadata from src.agents.mode_directives import get_generic_directive, get_agent_directive from src.agents.routing.semantic_router import SemanticRouter from src.agents.routing.pre_extractor import pre_extract from src.agents.memory.windowing import prepare_context from src.agents.validation.preflight import run_preflight from src.graphs.state import AgentState from src.graphs.utils import ( build_defi_guidance, build_metadata, build_preflight_params, build_swap_detection_terms, build_lending_detection_terms, detect_pending_followups, extract_response_from_graph, get_text_content, is_swap_like_request, ) # --- Agent imports --- from src.agents.crypto_data.agent import CryptoDataAgent from src.agents.database.agent import DatabaseAgent from src.agents.default.agent import DefaultAgent from src.agents.swap.agent import SwapAgent from src.agents.swap.tools import swap_session from src.agents.swap.prompt import SWAP_AGENT_SYSTEM_PROMPT from src.agents.dca.agent import DcaAgent from src.agents.dca.tools import dca_session from src.agents.dca.prompt import DCA_AGENT_SYSTEM_PROMPT from src.agents.lending.agent import LendingAgent from src.agents.lending.tools import lending_session from src.agents.lending.prompt import LENDING_AGENT_SYSTEM_PROMPT from src.agents.staking.agent import StakingAgent from src.agents.staking.tools import staking_session from src.agents.staking.prompt import STAKING_AGENT_SYSTEM_PROMPT from src.agents.strategy.agent import StrategyAgent from src.agents.strategy.tools import strategy_session from src.agents.strategy.prompt import STRATEGY_AGENT_SYSTEM_PROMPT from src.agents.liquidity.agent import LiquidityAgent from src.agents.liquidity.tools import liquidity_session from src.agents.liquidity.prompt import LIQUIDITY_AGENT_SYSTEM_PROMPT from src.agents.search.agent import SearchAgent from src.agents.portfolio.agent import PortfolioAdvisorAgent from src.agents.portfolio.tools import portfolio_session from src.agents.portfolio.prompt import PORTFOLIO_ADVISOR_SYSTEM_PROMPT from src.agents.database.client import is_database_available logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Module-level singletons (initialised once at startup) # --------------------------------------------------------------------------- _agents: Dict[str, Any] = {} _reasoning_agents: Dict[str, Any] = {} # Lazy-built reasoning-tier agents _semantic_router: Optional[SemanticRouter] = None _swap_network_terms: set = set() _swap_token_terms: set = set() _lending_network_terms: set = set() _lending_asset_terms: set = set() # Maps agent keys to their builder classes for lazy reasoning agent creation _AGENT_BUILDERS: Dict[str, type] = {} def _get_agent(agent_key: str, mode: str = "fast"): """Return the agent runnable for the given key and response mode. Fast mode returns the startup-initialised singleton. Reasoning mode lazily creates and caches an agent built with the reasoning-tier LLM (gemini-3-flash-preview). """ if mode != "reasoning": return _agents.get(agent_key) if agent_key in _reasoning_agents: return _reasoning_agents[agent_key] builder_cls = _AGENT_BUILDERS.get(agent_key) if not builder_cls: # No builder registered — fall back to the fast agent logger.warning("No reasoning builder for %s; using fast agent", agent_key) return _agents.get(agent_key) reasoning_llm = Config.get_reasoning_llm(with_cost_tracking=True) agent_instance = builder_cls(reasoning_llm) _reasoning_agents[agent_key] = agent_instance.agent logger.info("Lazily built reasoning agent for %s", agent_key) return _reasoning_agents[agent_key] def initialize_agents() -> None: """Build all agent instances and the semantic router. Call once at startup.""" global _agents, _semantic_router, _AGENT_BUILDERS global _swap_network_terms, _swap_token_terms global _lending_network_terms, _lending_asset_terms llm = Config.get_fast_llm(with_cost_tracking=True) embeddings = Config.get_embeddings() # Semantic router _semantic_router = SemanticRouter(embeddings) try: _semantic_router.warm_up() except Exception: logger.warning("SemanticRouter warm-up failed; keyword fallback will be used.") # Build fast-tier agents (gemini-2.5-flash) _agents["crypto_agent"] = CryptoDataAgent(llm).agent _agents["search_agent"] = SearchAgent(llm).agent _agents["default_agent"] = DefaultAgent(llm).agent _agents["swap_agent"] = SwapAgent(llm).agent _agents["dca_agent"] = DcaAgent(llm).agent _agents["lending_agent"] = LendingAgent(llm).agent _agents["staking_agent"] = StakingAgent(llm).agent _agents["strategy_agent"] = StrategyAgent(llm).agent _agents["liquidity_agent"] = LiquidityAgent(llm).agent _agents["portfolio_advisor"] = PortfolioAdvisorAgent(llm).agent if is_database_available(): _agents["database_agent"] = DatabaseAgent(llm) else: logger.info("Database not available; database_agent disabled.") # Register builder classes for lazy reasoning-tier agent creation _AGENT_BUILDERS = { "crypto_agent": CryptoDataAgent, "search_agent": SearchAgent, "default_agent": DefaultAgent, "swap_agent": SwapAgent, "dca_agent": DcaAgent, "lending_agent": LendingAgent, "staking_agent": StakingAgent, "strategy_agent": StrategyAgent, "liquidity_agent": LiquidityAgent, "portfolio_advisor": PortfolioAdvisorAgent, } # Keyword detection terms _swap_network_terms, _swap_token_terms = build_swap_detection_terms() _lending_network_terms, _lending_asset_terms = build_lending_detection_terms() logger.info("All agents initialised: %s", list(_agents.keys())) # --------------------------------------------------------------------------- # Node: entry_node — zero LLM calls # --------------------------------------------------------------------------- def entry_node(state: AgentState) -> dict: """Windowing, DeFi state lookup, message building. Zero LLM calls.""" messages = state.get("messages", []) user_id = state.get("user_id") conversation_id = state.get("conversation_id") def _normalize_message_content(content: Any) -> Any: if isinstance(content, str): cleaned = content.strip() return cleaned or None if isinstance(content, list): cleaned_parts: List[Any] = [] for part in content: if isinstance(part, str): stripped = part.strip() if stripped: cleaned_parts.append(stripped) continue if isinstance(part, dict): normalized_part = dict(part) text_value = normalized_part.get("text") if isinstance(text_value, str): normalized_part["text"] = text_value.strip() has_text = bool(str(normalized_part.get("text", "")).strip()) has_data = bool(normalized_part.get("data")) if has_text or has_data: cleaned_parts.append(normalized_part) return cleaned_parts or None return content if content else None # Conversation windowing fast_llm = Config.get_fast_llm(with_cost_tracking=True) windowed = prepare_context(messages, max_recent=8, summarizer_llm=fast_llm) # Detect pending followups awaiting_swap, awaiting_dca, awaiting_liquidity = detect_pending_followups(messages) # Build LangChain messages langchain_messages: List[Any] = [] for msg in windowed: role = msg.get("role") content = _normalize_message_content(msg.get("content", "")) if content is None: continue if role == "user": langchain_messages.append(HumanMessage(content=content)) elif role == "system": langchain_messages.append(SystemMessage(content=content)) elif role == "assistant": langchain_messages.append(AIMessage(content=content)) today = date.today().strftime("%B %d, %Y") response_mode = state.get("response_mode", "fast") mode_directive = get_generic_directive(response_mode) base_instructions = ( f"Today's date is {today}.\n" "Always respond in English, regardless of the user's language." ) if mode_directive: base_instructions += f"\n\n{mode_directive}" langchain_messages.insert( 0, SystemMessage(content=base_instructions), ) # Existing DeFi states dca_state = metadata.get_dca_agent(user_id=user_id, conversation_id=conversation_id) swap_state = metadata.get_swap_agent(user_id=user_id, conversation_id=conversation_id) lending_state = metadata.get_lending_agent(user_id=user_id, conversation_id=conversation_id) staking_state = metadata.get_staking_agent(user_id=user_id, conversation_id=conversation_id) strategy_state = metadata.get_strategy_agent(user_id=user_id, conversation_id=conversation_id) liquidity_state = metadata.get_liquidity_agent(user_id=user_id, conversation_id=conversation_id) # Last user message last_user_msg = "" for msg in reversed(windowed): if msg.get("role") == "user": normalized = _normalize_message_content(msg.get("content") or "") if isinstance(normalized, str): last_user_msg = normalized break # Active DeFi flow? has_active_defi = any( s and s.get("status") in ("collecting", "consulting", "recommendation", "confirmation") for s in (swap_state, lending_state, staking_state, liquidity_state, dca_state) ) has_active_defi = has_active_defi or bool( strategy_state and strategy_state.get("status") in ("profiling", "discovery", "recommendation", "comparison", "confirmation") ) # Inject file attachments as multimodal content blocks into the last HumanMessage file_attachments = state.get("file_attachments") if file_attachments: for i in range(len(langchain_messages) - 1, -1, -1): if isinstance(langchain_messages[i], HumanMessage): original_content = langchain_messages[i].content # Ensure original_text is a string (content could already be a list) if isinstance(original_content, str): original_text = original_content.strip() elif isinstance(original_content, list): original_text = " ".join( p.get("text", "") if isinstance(p, dict) else str(p) for p in original_content ).strip() else: original_text = str(original_content).strip() content_blocks: List[Dict[str, Any]] = [] if original_text: content_blocks.append({"type": "text", "text": original_text}) for att in file_attachments: if att["type"] == "image": content_blocks.append({ "type": "media", "data": att["data"], "mime_type": att["mime_type"], }) elif att["type"] == "document": doc_text = att.get("text") or "" if doc_text.strip(): content_blocks.append({ "type": "text", "text": f"\n\n--- Document: {att['filename']} ---\n{doc_text[:30000]}\n--- End ---", }) else: content_blocks.append({ "type": "text", "text": f"\n\n[Document attached: {att['filename']} — text extraction failed or document is empty]", }) if content_blocks: langchain_messages[i] = HumanMessage(content=content_blocks) break return { "windowed_messages": windowed, "langchain_messages": langchain_messages, "last_user_message": last_user_msg, "swap_state": swap_state or None, "lending_state": lending_state or None, "staking_state": staking_state or None, "liquidity_state": liquidity_state or None, "dca_state": dca_state or None, "strategy_state": strategy_state or None, "strategy_preferences": (strategy_state or {}).get("overrides") if strategy_state else None, "awaiting_swap": awaiting_swap, "awaiting_dca": awaiting_dca, "awaiting_liquidity": awaiting_liquidity, "has_active_defi": has_active_defi, "preflight_errors": [], "nodes_executed": ["entry_node"], } # --------------------------------------------------------------------------- # Node: semantic_router_node — embedding classification + pre-extraction # --------------------------------------------------------------------------- def semantic_router_node(state: AgentState) -> dict: """Classify intent via embeddings, pre-extract params, run preflight. If ``route_intent`` is already populated (e.g. pre-classified from the audio transcription step), the embedding classification is skipped — saving ~200 ms. Pre-extraction and preflight still run normally. """ last_user_msg = state.get("last_user_message", "") has_active_defi = state.get("has_active_defi", False) nodes = list(state.get("nodes_executed", [])) nodes.append("semantic_router_node") # --- Check for pre-classified intent (audio path) --- pre_intent = state.get("route_intent") pre_confidence = state.get("route_confidence", 0.0) pre_agent = state.get("route_agent") if pre_intent and pre_confidence > 0: # Already classified (e.g. audio transcription + classification) logger.debug( "SemanticRouter SKIPPED (pre-classified): intent=%s confidence=%.3f agent=%s", pre_intent, pre_confidence, pre_agent, ) intent_str = pre_intent confidence = pre_confidence agent_name = pre_agent needs_confirm = confidence < SemanticRouter.HIGH_CONFIDENCE else: # Normal path: classify via embeddings route = None if last_user_msg and not has_active_defi and _semantic_router: route = _semantic_router.classify(last_user_msg) if route: logger.debug( "SemanticRouter: intent=%s confidence=%.3f agent=%s", route.intent.value, route.confidence, route.agent_name, ) intent_str = route.intent.value if route else None confidence = route.confidence if route else 0.0 agent_name = route.agent_name if route else None needs_confirm = route.needs_llm_confirmation if route else True # Pre-extraction + preflight (runs regardless of classification source) extracted = None preflight_errors: List[str] = [] pre_hint: Optional[str] = None if intent_str and confidence >= SemanticRouter.LOW_CONFIDENCE: if intent_str in ("swap", "lending", "staking", "liquidity", "dca"): extracted = pre_extract(last_user_msg, intent_str) # Preflight validation if extracted and extracted.has_any() and intent_str in ("swap", "lending", "staking"): preflight_params = build_preflight_params(intent_str, extracted) preflight_errors = run_preflight(intent_str, preflight_params) # Parameter hint for downstream agent if not preflight_errors and extracted and extracted.has_any(): pre_hint = extracted.to_hint() logger.info( "routing.semantic intent=%s confidence=%.3f agent=%s has_active_defi=%s preflight_errors=%d", intent_str or "none", confidence, agent_name or "none", has_active_defi, len(preflight_errors), ) return { "route_intent": intent_str, "route_confidence": confidence, "route_agent": agent_name, "needs_llm_confirmation": needs_confirm, "preflight_errors": preflight_errors, "pre_extracted_hint": pre_hint, "nodes_executed": nodes, } # --------------------------------------------------------------------------- # Node: llm_router_node — 1 LLM call for disambiguation # --------------------------------------------------------------------------- _LLM_ROUTER_PROMPT = """You are a routing assistant. Given the user's message, determine which agent should handle it. Available agents: - crypto_agent: Cryptocurrency prices, market data, NFT floor prices, DeFi TVL. - swap_agent: Token swap operations. - dca_agent: Dollar-cost averaging strategies. - lending_agent: Lending operations (supply, borrow, repay, withdraw). - staking_agent: Staking operations (stake ETH, unstake stETH via Lido). - strategy_agent: Avalanche yield strategy planning and allocation workflows. - liquidity_agent: Liquidity operations (add/remove liquidity, stake/unstake LP tokens, claim rewards on Aerodrome/Base). - portfolio_advisor: Portfolio analysis, risk assessment, wallet holdings, rebalancing advice. - search_agent: Web search for current events and factual lookups. - database_agent: Database queries and data analysis. - default_agent: General conversation, education, greetings. Respond with ONLY the agent name (e.g. "crypto_agent"). Nothing else.""" def llm_router_node(state: AgentState) -> dict: """Use a single LLM call to disambiguate low-confidence intents.""" raw_last_msg = state.get("last_user_message") last_msg = raw_last_msg.strip() if isinstance(raw_last_msg, str) else "" nodes = list(state.get("nodes_executed", [])) nodes.append("llm_router_node") llm = Config.get_fast_llm(with_cost_tracking=True) if not last_msg: logger.warning("LLM router skipped because last_user_message is empty; defaulting to default_agent.") return { "route_agent": "default_agent", "route_confidence": 0.0, "needs_llm_confirmation": False, "nodes_executed": nodes, } try: response = llm.invoke([ SystemMessage(content=_LLM_ROUTER_PROMPT), HumanMessage(content=last_msg), ]) raw = get_text_content(response) or "default_agent" chosen = raw.strip().lower().replace(" ", "_") # Validate valid_agents = { "crypto_agent", "swap_agent", "dca_agent", "lending_agent", "staking_agent", "strategy_agent", "liquidity_agent", "portfolio_advisor", "search_agent", "database_agent", "default_agent", } if chosen not in valid_agents: chosen = "default_agent" except Exception: logger.exception("LLM router failed; defaulting to default_agent.") chosen = "default_agent" return { "route_agent": chosen, "route_confidence": 1.0, "needs_llm_confirmation": False, "nodes_executed": nodes, } # --------------------------------------------------------------------------- # Node: error_node — return preflight errors (0 LLM calls) # --------------------------------------------------------------------------- def error_node(state: AgentState) -> dict: """Return preflight validation errors directly.""" errors = state.get("preflight_errors", []) nodes = list(state.get("nodes_executed", [])) nodes.append("error_node") friendly = "; ".join(errors) return { "final_response": f"I can't proceed with that request: {friendly}. Please correct the details and try again.", "response_agent": "supervisor", "response_metadata": {}, "raw_agent_messages": [], "nodes_executed": nodes, } # --------------------------------------------------------------------------- # Agent wrapper nodes # --------------------------------------------------------------------------- def _invoke_defi_agent( agent_key: str, system_prompt: str, session_ctx, state: AgentState, intent_type: str, config: RunnableConfig | None = None, ) -> dict: """Shared logic for invoking a DeFi agent with session scoping.""" user_id = state.get("user_id") conversation_id = state.get("conversation_id") response_mode = state.get("response_mode", "fast") langchain_messages = list(state.get("langchain_messages", [])) nodes = list(state.get("nodes_executed", [])) nodes.append(f"{agent_key}_node") agent = _get_agent(agent_key, response_mode) if not agent: return { "final_response": "Agent not available.", "response_agent": agent_key, "response_metadata": {}, "raw_agent_messages": [], "nodes_executed": nodes, } # Inject system prompt scoped_messages = [SystemMessage(content=system_prompt)] # Inject per-agent mode directive agent_directive = get_agent_directive(agent_key, response_mode) if agent_directive: scoped_messages.append(SystemMessage(content=agent_directive)) # Inject DeFi guidance if in-progress defi_state = state.get(f"{intent_type}_state") guidance = build_defi_guidance(intent_type, defi_state) if guidance: scoped_messages.append(SystemMessage(content=guidance)) # Inject pre-extracted hint hint = state.get("pre_extracted_hint") if hint: scoped_messages.append(SystemMessage(content=hint)) scoped_messages.extend(langchain_messages) wallet_address = state.get("wallet_address") try: with session_ctx(user_id=user_id, conversation_id=conversation_id): with portfolio_session(user_id=user_id, conversation_id=conversation_id, wallet_address=wallet_address): response = agent.invoke({"messages": scoped_messages}, config=config) except Exception: logger.exception("Error invoking %s", agent_key) return { "final_response": "Sorry, an error occurred while processing your request.", "response_agent": agent_key, "response_metadata": {}, "raw_agent_messages": [], "nodes_executed": nodes, } agent_name, text, messages_out = extract_response_from_graph(response) resolved_agent_name = agent_name if agent_name and agent_name != "supervisor" else agent_key meta = build_metadata(resolved_agent_name, user_id, conversation_id, messages_out) return { "final_response": text, "response_agent": resolved_agent_name, "response_metadata": meta, "raw_agent_messages": messages_out, "nodes_executed": nodes, } def swap_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict: """Invoke swap agent with both swap and portfolio session contexts. The portfolio session gives the swap agent access to ``get_user_portfolio`` so users can check balances mid-swap without leaving the flow. """ user_id = state.get("user_id") conversation_id = state.get("conversation_id") wallet_address = state.get("wallet_address") response_mode = state.get("response_mode", "fast") langchain_messages = list(state.get("langchain_messages", [])) nodes = list(state.get("nodes_executed", [])) nodes.append("swap_agent_node") agent = _get_agent("swap_agent", response_mode) if not agent: return { "final_response": "Agent not available.", "response_agent": "swap_agent", "response_metadata": {}, "raw_agent_messages": [], "nodes_executed": nodes, } # Inject system prompt + per-agent mode directive + DeFi guidance scoped_messages = [SystemMessage(content=SWAP_AGENT_SYSTEM_PROMPT)] agent_directive = get_agent_directive("swap_agent", response_mode) if agent_directive: scoped_messages.append(SystemMessage(content=agent_directive)) defi_state = state.get("swap_state") guidance = build_defi_guidance("swap", defi_state) if guidance: scoped_messages.append(SystemMessage(content=guidance)) hint = state.get("pre_extracted_hint") if hint: scoped_messages.append(SystemMessage(content=hint)) scoped_messages.extend(langchain_messages) try: with swap_session(user_id=user_id, conversation_id=conversation_id): with portfolio_session(user_id=user_id, conversation_id=conversation_id, wallet_address=wallet_address): response = agent.invoke({"messages": scoped_messages}, config=config) except Exception: logger.exception("Error invoking swap_agent") return { "final_response": "Sorry, an error occurred while processing your request.", "response_agent": "swap_agent", "response_metadata": {}, "raw_agent_messages": [], "nodes_executed": nodes, } agent_name, text, messages_out = extract_response_from_graph(response) resolved_agent_name = agent_name if agent_name and agent_name != "supervisor" else "swap_agent" meta = build_metadata(resolved_agent_name, user_id, conversation_id, messages_out) return { "final_response": text, "response_agent": resolved_agent_name, "response_metadata": meta, "raw_agent_messages": messages_out, "nodes_executed": nodes, } def lending_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict: return _invoke_defi_agent("lending_agent", LENDING_AGENT_SYSTEM_PROMPT, lending_session, state, "lending", config) def staking_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict: return _invoke_defi_agent("staking_agent", STAKING_AGENT_SYSTEM_PROMPT, staking_session, state, "staking", config) def liquidity_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict: return _invoke_defi_agent("liquidity_agent", LIQUIDITY_AGENT_SYSTEM_PROMPT, liquidity_session, state, "liquidity", config) def dca_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict: return _invoke_defi_agent("dca_agent", DCA_AGENT_SYSTEM_PROMPT, dca_session, state, "dca", config) def strategy_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict: return _invoke_defi_agent("strategy_agent", STRATEGY_AGENT_SYSTEM_PROMPT, strategy_session, state, "strategy", config) def _invoke_simple_agent(agent_key: str, state: AgentState, config: RunnableConfig | None = None) -> dict: """Shared logic for invoking a non-DeFi agent with portfolio session scoping.""" user_id = state.get("user_id") conversation_id = state.get("conversation_id") wallet_address = state.get("wallet_address") response_mode = state.get("response_mode", "fast") langchain_messages = list(state.get("langchain_messages", [])) nodes = list(state.get("nodes_executed", [])) nodes.append(f"{agent_key}_node") agent = _get_agent(agent_key, response_mode) if not agent: return { "final_response": "Agent not available.", "response_agent": agent_key, "response_metadata": {}, "raw_agent_messages": [], "nodes_executed": nodes, } # Inject per-agent mode directive if available agent_directive = get_agent_directive(agent_key, response_mode) if agent_directive: invoke_messages = [SystemMessage(content=agent_directive)] + langchain_messages else: invoke_messages = langchain_messages def _has_user_content(message: Any) -> bool: if not isinstance(message, HumanMessage): return False content = message.content if isinstance(content, str): return bool(content.strip()) if isinstance(content, list): return any( ( isinstance(part, str) and bool(part.strip()) ) or ( isinstance(part, dict) and ( bool(str(part.get("text", "")).strip()) or bool(part.get("data")) ) ) for part in content ) return bool(content) if not any(_has_user_content(msg) for msg in invoke_messages): logger.warning( "Skipping %s invocation because no non-empty HumanMessage is available in state.", agent_key, ) return { "final_response": "I didn't receive a valid user message for this turn. Please send your request again.", "response_agent": agent_key, "response_metadata": {}, "raw_agent_messages": [], "nodes_executed": nodes, } try: with portfolio_session(user_id=user_id, conversation_id=conversation_id, wallet_address=wallet_address): response = agent.invoke({"messages": invoke_messages}, config=config) except Exception: logger.exception("Error invoking %s", agent_key) return { "final_response": "Sorry, an error occurred while processing your request.", "response_agent": agent_key, "response_metadata": {}, "raw_agent_messages": [], "nodes_executed": nodes, } agent_name, text, messages_out = extract_response_from_graph(response) resolved_agent_name = agent_name if agent_name and agent_name != "supervisor" else agent_key meta = build_metadata(resolved_agent_name, user_id, conversation_id, messages_out) return { "final_response": text, "response_agent": resolved_agent_name, "response_metadata": meta, "raw_agent_messages": messages_out, "nodes_executed": nodes, } def crypto_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict: return _invoke_simple_agent("crypto_agent", state, config) def search_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict: return _invoke_simple_agent("search_agent", state, config) def default_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict: return _invoke_simple_agent("default_agent", state, config) def portfolio_advisor_node(state: AgentState, config: RunnableConfig | None = None) -> dict: """Invoke the portfolio advisor with wallet_address session context.""" user_id = state.get("user_id") conversation_id = state.get("conversation_id") wallet_address = state.get("wallet_address") response_mode = state.get("response_mode", "fast") langchain_messages = list(state.get("langchain_messages", [])) nodes = list(state.get("nodes_executed", [])) nodes.append("portfolio_advisor_node") agent = _get_agent("portfolio_advisor", response_mode) if not agent: return { "final_response": "Portfolio advisor is not available.", "response_agent": "portfolio_advisor", "response_metadata": {}, "raw_agent_messages": [], "nodes_executed": nodes, } # Inject system prompt + per-agent mode directive scoped_messages = [SystemMessage(content=PORTFOLIO_ADVISOR_SYSTEM_PROMPT)] agent_directive = get_agent_directive("portfolio_advisor", response_mode) if agent_directive: scoped_messages.append(SystemMessage(content=agent_directive)) scoped_messages.extend(langchain_messages) if not any(isinstance(msg, HumanMessage) and ( bool(msg.content.strip()) if isinstance(msg.content, str) else any( (isinstance(part, str) and bool(part.strip())) or (isinstance(part, dict) and (bool(str(part.get("text", "")).strip()) or bool(part.get("data")))) for part in (msg.content or []) ) ) for msg in scoped_messages): logger.warning("Skipping portfolio_advisor invocation because no non-empty HumanMessage is available in state.") return { "final_response": "I didn't receive a valid user message for this turn. Please send your request again.", "response_agent": "portfolio_advisor", "response_metadata": {}, "raw_agent_messages": [], "nodes_executed": nodes, } try: with portfolio_session(user_id=user_id, conversation_id=conversation_id, wallet_address=wallet_address): response = agent.invoke({"messages": scoped_messages}, config=config) except Exception: logger.exception("Error invoking portfolio_advisor") return { "final_response": "Sorry, an error occurred while analyzing your portfolio.", "response_agent": "portfolio_advisor", "response_metadata": {}, "raw_agent_messages": [], "nodes_executed": nodes, } agent_name, text, messages_out = extract_response_from_graph(response) resolved_agent_name = agent_name if agent_name and agent_name != "supervisor" else "portfolio_advisor" meta = build_metadata(resolved_agent_name, user_id, conversation_id, messages_out) return { "final_response": text, "response_agent": resolved_agent_name, "response_metadata": meta, "raw_agent_messages": messages_out, "nodes_executed": nodes, } def database_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict: return _invoke_simple_agent("database_agent", state, config)