zico-agent / src /graphs /nodes.py
github-actions[bot]
Deploy from GitHub Actions: 1ded9e69a5887b83053b5ff8ad8fcab623490f67
020228b
"""
All graph node functions for the StateGraph pipeline.
Each node takes an AgentState dict and returns a partial state update.
"""
from __future__ import annotations
import logging
from datetime import date
from typing import Any, Dict, List, Optional
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from src.agents.config import Config
from src.agents.metadata import metadata
from src.agents.mode_directives import get_generic_directive, get_agent_directive
from src.agents.routing.semantic_router import SemanticRouter
from src.agents.routing.pre_extractor import pre_extract
from src.agents.memory.windowing import prepare_context
from src.agents.validation.preflight import run_preflight
from src.graphs.state import AgentState
from src.graphs.utils import (
build_defi_guidance,
build_metadata,
build_preflight_params,
build_swap_detection_terms,
build_lending_detection_terms,
detect_pending_followups,
extract_response_from_graph,
get_text_content,
is_swap_like_request,
)
# --- Agent imports ---
from src.agents.crypto_data.agent import CryptoDataAgent
from src.agents.database.agent import DatabaseAgent
from src.agents.default.agent import DefaultAgent
from src.agents.swap.agent import SwapAgent
from src.agents.swap.tools import swap_session
from src.agents.swap.prompt import SWAP_AGENT_SYSTEM_PROMPT
from src.agents.dca.agent import DcaAgent
from src.agents.dca.tools import dca_session
from src.agents.dca.prompt import DCA_AGENT_SYSTEM_PROMPT
from src.agents.lending.agent import LendingAgent
from src.agents.lending.tools import lending_session
from src.agents.lending.prompt import LENDING_AGENT_SYSTEM_PROMPT
from src.agents.staking.agent import StakingAgent
from src.agents.staking.tools import staking_session
from src.agents.staking.prompt import STAKING_AGENT_SYSTEM_PROMPT
from src.agents.strategy.agent import StrategyAgent
from src.agents.strategy.tools import strategy_session
from src.agents.strategy.prompt import STRATEGY_AGENT_SYSTEM_PROMPT
from src.agents.liquidity.agent import LiquidityAgent
from src.agents.liquidity.tools import liquidity_session
from src.agents.liquidity.prompt import LIQUIDITY_AGENT_SYSTEM_PROMPT
from src.agents.search.agent import SearchAgent
from src.agents.portfolio.agent import PortfolioAdvisorAgent
from src.agents.portfolio.tools import portfolio_session
from src.agents.portfolio.prompt import PORTFOLIO_ADVISOR_SYSTEM_PROMPT
from src.agents.database.client import is_database_available
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Module-level singletons (initialised once at startup)
# ---------------------------------------------------------------------------
_agents: Dict[str, Any] = {}
_reasoning_agents: Dict[str, Any] = {} # Lazy-built reasoning-tier agents
_semantic_router: Optional[SemanticRouter] = None
_swap_network_terms: set = set()
_swap_token_terms: set = set()
_lending_network_terms: set = set()
_lending_asset_terms: set = set()
# Maps agent keys to their builder classes for lazy reasoning agent creation
_AGENT_BUILDERS: Dict[str, type] = {}
def _get_agent(agent_key: str, mode: str = "fast"):
"""Return the agent runnable for the given key and response mode.
Fast mode returns the startup-initialised singleton.
Reasoning mode lazily creates and caches an agent built with the
reasoning-tier LLM (gemini-3-flash-preview).
"""
if mode != "reasoning":
return _agents.get(agent_key)
if agent_key in _reasoning_agents:
return _reasoning_agents[agent_key]
builder_cls = _AGENT_BUILDERS.get(agent_key)
if not builder_cls:
# No builder registered — fall back to the fast agent
logger.warning("No reasoning builder for %s; using fast agent", agent_key)
return _agents.get(agent_key)
reasoning_llm = Config.get_reasoning_llm(with_cost_tracking=True)
agent_instance = builder_cls(reasoning_llm)
_reasoning_agents[agent_key] = agent_instance.agent
logger.info("Lazily built reasoning agent for %s", agent_key)
return _reasoning_agents[agent_key]
def initialize_agents() -> None:
"""Build all agent instances and the semantic router. Call once at startup."""
global _agents, _semantic_router, _AGENT_BUILDERS
global _swap_network_terms, _swap_token_terms
global _lending_network_terms, _lending_asset_terms
llm = Config.get_fast_llm(with_cost_tracking=True)
embeddings = Config.get_embeddings()
# Semantic router
_semantic_router = SemanticRouter(embeddings)
try:
_semantic_router.warm_up()
except Exception:
logger.warning("SemanticRouter warm-up failed; keyword fallback will be used.")
# Build fast-tier agents (gemini-2.5-flash)
_agents["crypto_agent"] = CryptoDataAgent(llm).agent
_agents["search_agent"] = SearchAgent(llm).agent
_agents["default_agent"] = DefaultAgent(llm).agent
_agents["swap_agent"] = SwapAgent(llm).agent
_agents["dca_agent"] = DcaAgent(llm).agent
_agents["lending_agent"] = LendingAgent(llm).agent
_agents["staking_agent"] = StakingAgent(llm).agent
_agents["strategy_agent"] = StrategyAgent(llm).agent
_agents["liquidity_agent"] = LiquidityAgent(llm).agent
_agents["portfolio_advisor"] = PortfolioAdvisorAgent(llm).agent
if is_database_available():
_agents["database_agent"] = DatabaseAgent(llm)
else:
logger.info("Database not available; database_agent disabled.")
# Register builder classes for lazy reasoning-tier agent creation
_AGENT_BUILDERS = {
"crypto_agent": CryptoDataAgent,
"search_agent": SearchAgent,
"default_agent": DefaultAgent,
"swap_agent": SwapAgent,
"dca_agent": DcaAgent,
"lending_agent": LendingAgent,
"staking_agent": StakingAgent,
"strategy_agent": StrategyAgent,
"liquidity_agent": LiquidityAgent,
"portfolio_advisor": PortfolioAdvisorAgent,
}
# Keyword detection terms
_swap_network_terms, _swap_token_terms = build_swap_detection_terms()
_lending_network_terms, _lending_asset_terms = build_lending_detection_terms()
logger.info("All agents initialised: %s", list(_agents.keys()))
# ---------------------------------------------------------------------------
# Node: entry_node — zero LLM calls
# ---------------------------------------------------------------------------
def entry_node(state: AgentState) -> dict:
"""Windowing, DeFi state lookup, message building. Zero LLM calls."""
messages = state.get("messages", [])
user_id = state.get("user_id")
conversation_id = state.get("conversation_id")
def _normalize_message_content(content: Any) -> Any:
if isinstance(content, str):
cleaned = content.strip()
return cleaned or None
if isinstance(content, list):
cleaned_parts: List[Any] = []
for part in content:
if isinstance(part, str):
stripped = part.strip()
if stripped:
cleaned_parts.append(stripped)
continue
if isinstance(part, dict):
normalized_part = dict(part)
text_value = normalized_part.get("text")
if isinstance(text_value, str):
normalized_part["text"] = text_value.strip()
has_text = bool(str(normalized_part.get("text", "")).strip())
has_data = bool(normalized_part.get("data"))
if has_text or has_data:
cleaned_parts.append(normalized_part)
return cleaned_parts or None
return content if content else None
# Conversation windowing
fast_llm = Config.get_fast_llm(with_cost_tracking=True)
windowed = prepare_context(messages, max_recent=8, summarizer_llm=fast_llm)
# Detect pending followups
awaiting_swap, awaiting_dca, awaiting_liquidity = detect_pending_followups(messages)
# Build LangChain messages
langchain_messages: List[Any] = []
for msg in windowed:
role = msg.get("role")
content = _normalize_message_content(msg.get("content", ""))
if content is None:
continue
if role == "user":
langchain_messages.append(HumanMessage(content=content))
elif role == "system":
langchain_messages.append(SystemMessage(content=content))
elif role == "assistant":
langchain_messages.append(AIMessage(content=content))
today = date.today().strftime("%B %d, %Y")
response_mode = state.get("response_mode", "fast")
mode_directive = get_generic_directive(response_mode)
base_instructions = (
f"Today's date is {today}.\n"
"Always respond in English, regardless of the user's language."
)
if mode_directive:
base_instructions += f"\n\n{mode_directive}"
langchain_messages.insert(
0,
SystemMessage(content=base_instructions),
)
# Existing DeFi states
dca_state = metadata.get_dca_agent(user_id=user_id, conversation_id=conversation_id)
swap_state = metadata.get_swap_agent(user_id=user_id, conversation_id=conversation_id)
lending_state = metadata.get_lending_agent(user_id=user_id, conversation_id=conversation_id)
staking_state = metadata.get_staking_agent(user_id=user_id, conversation_id=conversation_id)
strategy_state = metadata.get_strategy_agent(user_id=user_id, conversation_id=conversation_id)
liquidity_state = metadata.get_liquidity_agent(user_id=user_id, conversation_id=conversation_id)
# Last user message
last_user_msg = ""
for msg in reversed(windowed):
if msg.get("role") == "user":
normalized = _normalize_message_content(msg.get("content") or "")
if isinstance(normalized, str):
last_user_msg = normalized
break
# Active DeFi flow?
has_active_defi = any(
s and s.get("status") in ("collecting", "consulting", "recommendation", "confirmation")
for s in (swap_state, lending_state, staking_state, liquidity_state, dca_state)
)
has_active_defi = has_active_defi or bool(
strategy_state
and strategy_state.get("status") in ("profiling", "discovery", "recommendation", "comparison", "confirmation")
)
# Inject file attachments as multimodal content blocks into the last HumanMessage
file_attachments = state.get("file_attachments")
if file_attachments:
for i in range(len(langchain_messages) - 1, -1, -1):
if isinstance(langchain_messages[i], HumanMessage):
original_content = langchain_messages[i].content
# Ensure original_text is a string (content could already be a list)
if isinstance(original_content, str):
original_text = original_content.strip()
elif isinstance(original_content, list):
original_text = " ".join(
p.get("text", "") if isinstance(p, dict) else str(p)
for p in original_content
).strip()
else:
original_text = str(original_content).strip()
content_blocks: List[Dict[str, Any]] = []
if original_text:
content_blocks.append({"type": "text", "text": original_text})
for att in file_attachments:
if att["type"] == "image":
content_blocks.append({
"type": "media",
"data": att["data"],
"mime_type": att["mime_type"],
})
elif att["type"] == "document":
doc_text = att.get("text") or ""
if doc_text.strip():
content_blocks.append({
"type": "text",
"text": f"\n\n--- Document: {att['filename']} ---\n{doc_text[:30000]}\n--- End ---",
})
else:
content_blocks.append({
"type": "text",
"text": f"\n\n[Document attached: {att['filename']} — text extraction failed or document is empty]",
})
if content_blocks:
langchain_messages[i] = HumanMessage(content=content_blocks)
break
return {
"windowed_messages": windowed,
"langchain_messages": langchain_messages,
"last_user_message": last_user_msg,
"swap_state": swap_state or None,
"lending_state": lending_state or None,
"staking_state": staking_state or None,
"liquidity_state": liquidity_state or None,
"dca_state": dca_state or None,
"strategy_state": strategy_state or None,
"strategy_preferences": (strategy_state or {}).get("overrides") if strategy_state else None,
"awaiting_swap": awaiting_swap,
"awaiting_dca": awaiting_dca,
"awaiting_liquidity": awaiting_liquidity,
"has_active_defi": has_active_defi,
"preflight_errors": [],
"nodes_executed": ["entry_node"],
}
# ---------------------------------------------------------------------------
# Node: semantic_router_node — embedding classification + pre-extraction
# ---------------------------------------------------------------------------
def semantic_router_node(state: AgentState) -> dict:
"""Classify intent via embeddings, pre-extract params, run preflight.
If ``route_intent`` is already populated (e.g. pre-classified from the
audio transcription step), the embedding classification is skipped —
saving ~200 ms. Pre-extraction and preflight still run normally.
"""
last_user_msg = state.get("last_user_message", "")
has_active_defi = state.get("has_active_defi", False)
nodes = list(state.get("nodes_executed", []))
nodes.append("semantic_router_node")
# --- Check for pre-classified intent (audio path) ---
pre_intent = state.get("route_intent")
pre_confidence = state.get("route_confidence", 0.0)
pre_agent = state.get("route_agent")
if pre_intent and pre_confidence > 0:
# Already classified (e.g. audio transcription + classification)
logger.debug(
"SemanticRouter SKIPPED (pre-classified): intent=%s confidence=%.3f agent=%s",
pre_intent, pre_confidence, pre_agent,
)
intent_str = pre_intent
confidence = pre_confidence
agent_name = pre_agent
needs_confirm = confidence < SemanticRouter.HIGH_CONFIDENCE
else:
# Normal path: classify via embeddings
route = None
if last_user_msg and not has_active_defi and _semantic_router:
route = _semantic_router.classify(last_user_msg)
if route:
logger.debug(
"SemanticRouter: intent=%s confidence=%.3f agent=%s",
route.intent.value,
route.confidence,
route.agent_name,
)
intent_str = route.intent.value if route else None
confidence = route.confidence if route else 0.0
agent_name = route.agent_name if route else None
needs_confirm = route.needs_llm_confirmation if route else True
# Pre-extraction + preflight (runs regardless of classification source)
extracted = None
preflight_errors: List[str] = []
pre_hint: Optional[str] = None
if intent_str and confidence >= SemanticRouter.LOW_CONFIDENCE:
if intent_str in ("swap", "lending", "staking", "liquidity", "dca"):
extracted = pre_extract(last_user_msg, intent_str)
# Preflight validation
if extracted and extracted.has_any() and intent_str in ("swap", "lending", "staking"):
preflight_params = build_preflight_params(intent_str, extracted)
preflight_errors = run_preflight(intent_str, preflight_params)
# Parameter hint for downstream agent
if not preflight_errors and extracted and extracted.has_any():
pre_hint = extracted.to_hint()
logger.info(
"routing.semantic intent=%s confidence=%.3f agent=%s has_active_defi=%s preflight_errors=%d",
intent_str or "none",
confidence,
agent_name or "none",
has_active_defi,
len(preflight_errors),
)
return {
"route_intent": intent_str,
"route_confidence": confidence,
"route_agent": agent_name,
"needs_llm_confirmation": needs_confirm,
"preflight_errors": preflight_errors,
"pre_extracted_hint": pre_hint,
"nodes_executed": nodes,
}
# ---------------------------------------------------------------------------
# Node: llm_router_node — 1 LLM call for disambiguation
# ---------------------------------------------------------------------------
_LLM_ROUTER_PROMPT = """You are a routing assistant. Given the user's message, determine which agent should handle it.
Available agents:
- crypto_agent: Cryptocurrency prices, market data, NFT floor prices, DeFi TVL.
- swap_agent: Token swap operations.
- dca_agent: Dollar-cost averaging strategies.
- lending_agent: Lending operations (supply, borrow, repay, withdraw).
- staking_agent: Staking operations (stake ETH, unstake stETH via Lido).
- strategy_agent: Avalanche yield strategy planning and allocation workflows.
- liquidity_agent: Liquidity operations (add/remove liquidity, stake/unstake LP tokens, claim rewards on Aerodrome/Base).
- portfolio_advisor: Portfolio analysis, risk assessment, wallet holdings, rebalancing advice.
- search_agent: Web search for current events and factual lookups.
- database_agent: Database queries and data analysis.
- default_agent: General conversation, education, greetings.
Respond with ONLY the agent name (e.g. "crypto_agent"). Nothing else."""
def llm_router_node(state: AgentState) -> dict:
"""Use a single LLM call to disambiguate low-confidence intents."""
raw_last_msg = state.get("last_user_message")
last_msg = raw_last_msg.strip() if isinstance(raw_last_msg, str) else ""
nodes = list(state.get("nodes_executed", []))
nodes.append("llm_router_node")
llm = Config.get_fast_llm(with_cost_tracking=True)
if not last_msg:
logger.warning("LLM router skipped because last_user_message is empty; defaulting to default_agent.")
return {
"route_agent": "default_agent",
"route_confidence": 0.0,
"needs_llm_confirmation": False,
"nodes_executed": nodes,
}
try:
response = llm.invoke([
SystemMessage(content=_LLM_ROUTER_PROMPT),
HumanMessage(content=last_msg),
])
raw = get_text_content(response) or "default_agent"
chosen = raw.strip().lower().replace(" ", "_")
# Validate
valid_agents = {
"crypto_agent", "swap_agent", "dca_agent", "lending_agent",
"staking_agent", "strategy_agent", "liquidity_agent", "portfolio_advisor",
"search_agent", "database_agent", "default_agent",
}
if chosen not in valid_agents:
chosen = "default_agent"
except Exception:
logger.exception("LLM router failed; defaulting to default_agent.")
chosen = "default_agent"
return {
"route_agent": chosen,
"route_confidence": 1.0,
"needs_llm_confirmation": False,
"nodes_executed": nodes,
}
# ---------------------------------------------------------------------------
# Node: error_node — return preflight errors (0 LLM calls)
# ---------------------------------------------------------------------------
def error_node(state: AgentState) -> dict:
"""Return preflight validation errors directly."""
errors = state.get("preflight_errors", [])
nodes = list(state.get("nodes_executed", []))
nodes.append("error_node")
friendly = "; ".join(errors)
return {
"final_response": f"I can't proceed with that request: {friendly}. Please correct the details and try again.",
"response_agent": "supervisor",
"response_metadata": {},
"raw_agent_messages": [],
"nodes_executed": nodes,
}
# ---------------------------------------------------------------------------
# Agent wrapper nodes
# ---------------------------------------------------------------------------
def _invoke_defi_agent(
agent_key: str,
system_prompt: str,
session_ctx,
state: AgentState,
intent_type: str,
config: RunnableConfig | None = None,
) -> dict:
"""Shared logic for invoking a DeFi agent with session scoping."""
user_id = state.get("user_id")
conversation_id = state.get("conversation_id")
response_mode = state.get("response_mode", "fast")
langchain_messages = list(state.get("langchain_messages", []))
nodes = list(state.get("nodes_executed", []))
nodes.append(f"{agent_key}_node")
agent = _get_agent(agent_key, response_mode)
if not agent:
return {
"final_response": "Agent not available.",
"response_agent": agent_key,
"response_metadata": {},
"raw_agent_messages": [],
"nodes_executed": nodes,
}
# Inject system prompt
scoped_messages = [SystemMessage(content=system_prompt)]
# Inject per-agent mode directive
agent_directive = get_agent_directive(agent_key, response_mode)
if agent_directive:
scoped_messages.append(SystemMessage(content=agent_directive))
# Inject DeFi guidance if in-progress
defi_state = state.get(f"{intent_type}_state")
guidance = build_defi_guidance(intent_type, defi_state)
if guidance:
scoped_messages.append(SystemMessage(content=guidance))
# Inject pre-extracted hint
hint = state.get("pre_extracted_hint")
if hint:
scoped_messages.append(SystemMessage(content=hint))
scoped_messages.extend(langchain_messages)
wallet_address = state.get("wallet_address")
try:
with session_ctx(user_id=user_id, conversation_id=conversation_id):
with portfolio_session(user_id=user_id, conversation_id=conversation_id, wallet_address=wallet_address):
response = agent.invoke({"messages": scoped_messages}, config=config)
except Exception:
logger.exception("Error invoking %s", agent_key)
return {
"final_response": "Sorry, an error occurred while processing your request.",
"response_agent": agent_key,
"response_metadata": {},
"raw_agent_messages": [],
"nodes_executed": nodes,
}
agent_name, text, messages_out = extract_response_from_graph(response)
resolved_agent_name = agent_name if agent_name and agent_name != "supervisor" else agent_key
meta = build_metadata(resolved_agent_name, user_id, conversation_id, messages_out)
return {
"final_response": text,
"response_agent": resolved_agent_name,
"response_metadata": meta,
"raw_agent_messages": messages_out,
"nodes_executed": nodes,
}
def swap_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict:
"""Invoke swap agent with both swap and portfolio session contexts.
The portfolio session gives the swap agent access to ``get_user_portfolio``
so users can check balances mid-swap without leaving the flow.
"""
user_id = state.get("user_id")
conversation_id = state.get("conversation_id")
wallet_address = state.get("wallet_address")
response_mode = state.get("response_mode", "fast")
langchain_messages = list(state.get("langchain_messages", []))
nodes = list(state.get("nodes_executed", []))
nodes.append("swap_agent_node")
agent = _get_agent("swap_agent", response_mode)
if not agent:
return {
"final_response": "Agent not available.",
"response_agent": "swap_agent",
"response_metadata": {},
"raw_agent_messages": [],
"nodes_executed": nodes,
}
# Inject system prompt + per-agent mode directive + DeFi guidance
scoped_messages = [SystemMessage(content=SWAP_AGENT_SYSTEM_PROMPT)]
agent_directive = get_agent_directive("swap_agent", response_mode)
if agent_directive:
scoped_messages.append(SystemMessage(content=agent_directive))
defi_state = state.get("swap_state")
guidance = build_defi_guidance("swap", defi_state)
if guidance:
scoped_messages.append(SystemMessage(content=guidance))
hint = state.get("pre_extracted_hint")
if hint:
scoped_messages.append(SystemMessage(content=hint))
scoped_messages.extend(langchain_messages)
try:
with swap_session(user_id=user_id, conversation_id=conversation_id):
with portfolio_session(user_id=user_id, conversation_id=conversation_id, wallet_address=wallet_address):
response = agent.invoke({"messages": scoped_messages}, config=config)
except Exception:
logger.exception("Error invoking swap_agent")
return {
"final_response": "Sorry, an error occurred while processing your request.",
"response_agent": "swap_agent",
"response_metadata": {},
"raw_agent_messages": [],
"nodes_executed": nodes,
}
agent_name, text, messages_out = extract_response_from_graph(response)
resolved_agent_name = agent_name if agent_name and agent_name != "supervisor" else "swap_agent"
meta = build_metadata(resolved_agent_name, user_id, conversation_id, messages_out)
return {
"final_response": text,
"response_agent": resolved_agent_name,
"response_metadata": meta,
"raw_agent_messages": messages_out,
"nodes_executed": nodes,
}
def lending_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict:
return _invoke_defi_agent("lending_agent", LENDING_AGENT_SYSTEM_PROMPT, lending_session, state, "lending", config)
def staking_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict:
return _invoke_defi_agent("staking_agent", STAKING_AGENT_SYSTEM_PROMPT, staking_session, state, "staking", config)
def liquidity_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict:
return _invoke_defi_agent("liquidity_agent", LIQUIDITY_AGENT_SYSTEM_PROMPT, liquidity_session, state, "liquidity", config)
def dca_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict:
return _invoke_defi_agent("dca_agent", DCA_AGENT_SYSTEM_PROMPT, dca_session, state, "dca", config)
def strategy_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict:
return _invoke_defi_agent("strategy_agent", STRATEGY_AGENT_SYSTEM_PROMPT, strategy_session, state, "strategy", config)
def _invoke_simple_agent(agent_key: str, state: AgentState, config: RunnableConfig | None = None) -> dict:
"""Shared logic for invoking a non-DeFi agent with portfolio session scoping."""
user_id = state.get("user_id")
conversation_id = state.get("conversation_id")
wallet_address = state.get("wallet_address")
response_mode = state.get("response_mode", "fast")
langchain_messages = list(state.get("langchain_messages", []))
nodes = list(state.get("nodes_executed", []))
nodes.append(f"{agent_key}_node")
agent = _get_agent(agent_key, response_mode)
if not agent:
return {
"final_response": "Agent not available.",
"response_agent": agent_key,
"response_metadata": {},
"raw_agent_messages": [],
"nodes_executed": nodes,
}
# Inject per-agent mode directive if available
agent_directive = get_agent_directive(agent_key, response_mode)
if agent_directive:
invoke_messages = [SystemMessage(content=agent_directive)] + langchain_messages
else:
invoke_messages = langchain_messages
def _has_user_content(message: Any) -> bool:
if not isinstance(message, HumanMessage):
return False
content = message.content
if isinstance(content, str):
return bool(content.strip())
if isinstance(content, list):
return any(
(
isinstance(part, str) and bool(part.strip())
) or (
isinstance(part, dict) and (
bool(str(part.get("text", "")).strip())
or bool(part.get("data"))
)
)
for part in content
)
return bool(content)
if not any(_has_user_content(msg) for msg in invoke_messages):
logger.warning(
"Skipping %s invocation because no non-empty HumanMessage is available in state.",
agent_key,
)
return {
"final_response": "I didn't receive a valid user message for this turn. Please send your request again.",
"response_agent": agent_key,
"response_metadata": {},
"raw_agent_messages": [],
"nodes_executed": nodes,
}
try:
with portfolio_session(user_id=user_id, conversation_id=conversation_id, wallet_address=wallet_address):
response = agent.invoke({"messages": invoke_messages}, config=config)
except Exception:
logger.exception("Error invoking %s", agent_key)
return {
"final_response": "Sorry, an error occurred while processing your request.",
"response_agent": agent_key,
"response_metadata": {},
"raw_agent_messages": [],
"nodes_executed": nodes,
}
agent_name, text, messages_out = extract_response_from_graph(response)
resolved_agent_name = agent_name if agent_name and agent_name != "supervisor" else agent_key
meta = build_metadata(resolved_agent_name, user_id, conversation_id, messages_out)
return {
"final_response": text,
"response_agent": resolved_agent_name,
"response_metadata": meta,
"raw_agent_messages": messages_out,
"nodes_executed": nodes,
}
def crypto_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict:
return _invoke_simple_agent("crypto_agent", state, config)
def search_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict:
return _invoke_simple_agent("search_agent", state, config)
def default_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict:
return _invoke_simple_agent("default_agent", state, config)
def portfolio_advisor_node(state: AgentState, config: RunnableConfig | None = None) -> dict:
"""Invoke the portfolio advisor with wallet_address session context."""
user_id = state.get("user_id")
conversation_id = state.get("conversation_id")
wallet_address = state.get("wallet_address")
response_mode = state.get("response_mode", "fast")
langchain_messages = list(state.get("langchain_messages", []))
nodes = list(state.get("nodes_executed", []))
nodes.append("portfolio_advisor_node")
agent = _get_agent("portfolio_advisor", response_mode)
if not agent:
return {
"final_response": "Portfolio advisor is not available.",
"response_agent": "portfolio_advisor",
"response_metadata": {},
"raw_agent_messages": [],
"nodes_executed": nodes,
}
# Inject system prompt + per-agent mode directive
scoped_messages = [SystemMessage(content=PORTFOLIO_ADVISOR_SYSTEM_PROMPT)]
agent_directive = get_agent_directive("portfolio_advisor", response_mode)
if agent_directive:
scoped_messages.append(SystemMessage(content=agent_directive))
scoped_messages.extend(langchain_messages)
if not any(isinstance(msg, HumanMessage) and (
bool(msg.content.strip()) if isinstance(msg.content, str)
else any(
(isinstance(part, str) and bool(part.strip()))
or (isinstance(part, dict) and (bool(str(part.get("text", "")).strip()) or bool(part.get("data"))))
for part in (msg.content or [])
)
) for msg in scoped_messages):
logger.warning("Skipping portfolio_advisor invocation because no non-empty HumanMessage is available in state.")
return {
"final_response": "I didn't receive a valid user message for this turn. Please send your request again.",
"response_agent": "portfolio_advisor",
"response_metadata": {},
"raw_agent_messages": [],
"nodes_executed": nodes,
}
try:
with portfolio_session(user_id=user_id, conversation_id=conversation_id, wallet_address=wallet_address):
response = agent.invoke({"messages": scoped_messages}, config=config)
except Exception:
logger.exception("Error invoking portfolio_advisor")
return {
"final_response": "Sorry, an error occurred while analyzing your portfolio.",
"response_agent": "portfolio_advisor",
"response_metadata": {},
"raw_agent_messages": [],
"nodes_executed": nodes,
}
agent_name, text, messages_out = extract_response_from_graph(response)
resolved_agent_name = agent_name if agent_name and agent_name != "supervisor" else "portfolio_advisor"
meta = build_metadata(resolved_agent_name, user_id, conversation_id, messages_out)
return {
"final_response": text,
"response_agent": resolved_agent_name,
"response_metadata": meta,
"raw_agent_messages": messages_out,
"nodes_executed": nodes,
}
def database_agent_node(state: AgentState, config: RunnableConfig | None = None) -> dict:
return _invoke_simple_agent("database_agent", state, config)