# Qurio backend-python — src/services/agent_registry.py
"""
Agent registry built with Agno SDK (Agent + AgentOS).
"""
from __future__ import annotations
import os
import json
from types import SimpleNamespace
from typing import Any
from agno.agent import Agent
from agno.skills import Skills, LocalSkills
# from agno.db.postgres import PostgresDb
# from agno.memory import MemoryManager
from agno.models.google import Gemini
from agno.models.openai import OpenAILike
# from agno.session.summary import SessionSummaryManager
from agno.utils.log import logger
from ..config import get_settings
from ..models.db import DbFilter, DbQueryRequest
from .db_service import get_db_adapter
from .custom_tools import (
DuckDuckGoImageTools,
DuckDuckGoVideoTools,
DuckDuckGoWebSearchTools,
QurioLocalTools,
SerpApiImageTools,
)
from .tool_registry import AGNO_TOOLS, IMAGE_SEARCH_TOOLS, LOCAL_TOOLS, VIDEO_SEARCH_TOOLS, resolve_tool_name
from .user_tools import build_user_tools_toolkit
# ExaTools is optional; when the dependency is missing we fall back to None and
# every Exa-related code path degrades gracefully (see _build_exa_toolkit).
try:
    from agno.tools.exa import ExaTools
except Exception:
    ExaTools = None
# Canonical tool name under which Exa search is exposed to the model.
EXA_SEARCH_TOOL_SET = {"search_exa"}
# Category values forwarded to Exa; anything else is normalized to None
# (see _normalize_exa_category).
EXA_ALLOWED_CATEGORIES = {
    "company",
    "research paper",
    "news",
    "pdf",
    "github",
    "tweet",
    "personal site",
    "linkedin profile",
    "financial report",
}
# Exa request timeout in seconds: env-configurable, clamped to a 15s minimum.
EXA_TIMEOUT_SECONDS = max(
    15,
    int(os.getenv("EXA_TOOLS_TIMEOUT_SECONDS", os.getenv("EXA_MCP_TIMEOUT_SECONDS", "45"))),
)
# Per-provider default model ids; each entry can be overridden with the
# corresponding *_MODEL environment variable.
DEFAULT_MODELS: dict[str, str] = {
    "openai": os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
    "openai_compatibility": os.getenv("OPENAI_COMPAT_MODEL", "gpt-4o-mini"),
    "siliconflow": os.getenv("SILICONFLOW_MODEL", "Qwen/Qwen2.5-7B-Instruct"),
    "glm": os.getenv("GLM_MODEL", "glm-4-flash"),
    "deepseek": os.getenv("DEEPSEEK_MODEL", "deepseek-chat"),
    "volcengine": os.getenv("VOLCENGINE_MODEL", "doubao-seed-1-6-thinking-250615"),
    "modelscope": os.getenv("MODELSCOPE_MODEL", "AI-ModelScope/glm-4-9b-chat"),
    "kimi": os.getenv("KIMI_MODEL", "moonshot-v1-8k"),
    "gemini": os.getenv("GEMINI_MODEL", "gemini-2.0-flash-exp"),
    "nvidia": os.getenv("NVIDIA_MODEL", "deepseek-ai/deepseek-r1"),
    "minimax": os.getenv("MINIMAX_MODEL", "minimax-m2"),
}
# Per-provider API endpoints; overridable via the *_BASE_URL env vars.
# "gemini" has no entry here because it uses the native Gemini client
# rather than an OpenAI-compatible endpoint (see _build_model).
DEFAULT_BASE_URLS: dict[str, str] = {
    "openai": os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"),
    "openai_compatibility": os.getenv("OPENAI_COMPAT_BASE_URL", "https://api.openai.com/v1"),
    "siliconflow": os.getenv("SILICONFLOW_BASE_URL", "https://api.siliconflow.cn/v1"),
    "glm": os.getenv("GLM_BASE_URL", "https://open.bigmodel.cn/api/paas/v4"),
    "deepseek": os.getenv("DEEPSEEK_BASE_URL", "https://api.deepseek.com/v1"),
    "volcengine": os.getenv("VOLCENGINE_BASE_URL", "https://ark.cn-beijing.volces.com/api/v3"),
    "modelscope": os.getenv("MODELSCOPE_BASE_URL", "https://api-inference.modelscope.cn/v1"),
    "kimi": os.getenv("KIMI_BASE_URL", "https://api.moonshot.cn/v1"),
    "nvidia": os.getenv("NVIDIA_BASE_URL", "https://integrate.api.nvidia.com/v1"),
    "minimax": os.getenv("MINIMAX_BASE_URL", "https://api.minimax.io/v1"),
}
# These will be initialized within functions using get_settings() to ensure .env is loaded
# MEMORY_LITE_PROVIDER = ...
# MEMORY_LITE_MODEL = ...
# MEMORY_LITE_BASE_URL = ...
# MEMORY_AGENT_API_KEY = ...
# Global database instance to avoid multiple table definitions in SQLAlchemy:
# _agent_db was removed as we use the DbAdapter pattern instead.
def _build_model(provider: str, api_key: str | None, base_url: str | None, model: str | None):
    """Instantiate an Agno chat model for *provider*.

    Missing model id / base URL fall back to the provider's defaults and,
    ultimately, the OpenAI defaults.  Gemini uses its native client; all other
    providers go through the OpenAI-compatible adapter.
    """
    key = provider or "openai"
    chosen_model = model or DEFAULT_MODELS.get(key) or DEFAULT_MODELS["openai"]
    if key == "gemini":
        # Gemini does not take a base_url; it speaks its own API.
        return Gemini(id=chosen_model, api_key=api_key)
    endpoint = base_url or DEFAULT_BASE_URLS.get(key) or DEFAULT_BASE_URLS["openai"]
    return OpenAILike(id=chosen_model, api_key=api_key, base_url=endpoint)
def _merge_model_dict_attr(model: Any, attr: str, payload: dict[str, Any]) -> None:
if not payload:
return
current = getattr(model, attr, None)
if current is None:
setattr(model, attr, dict(payload))
elif isinstance(current, dict):
merged = {**current, **payload}
setattr(model, attr, merged)
def _apply_common_params(model: Any, request: Any) -> None:
if request.temperature is not None and hasattr(model, "temperature"):
model.temperature = request.temperature
if request.top_p is not None and hasattr(model, "top_p"):
model.top_p = request.top_p
if request.frequency_penalty is not None and hasattr(model, "frequency_penalty"):
model.frequency_penalty = request.frequency_penalty
if request.presence_penalty is not None and hasattr(model, "presence_penalty"):
model.presence_penalty = request.presence_penalty
if request.top_k is not None:
if hasattr(model, "top_k"):
model.top_k = request.top_k
else:
_merge_model_dict_attr(model, "extra_body", {"top_k": request.top_k})
def _apply_thinking_params(model: Any, provider: str, thinking: dict[str, Any] | bool | None) -> None:
    """Translate a provider-agnostic "thinking" request onto *model*.

    *thinking* may be None/falsy (no-op), a bare truthy flag, or a dict shaped
    for the given provider.  Each branch either sets native model attributes
    (when the SDK exposes them) or merges a payload into ``extra_body`` so it
    is forwarded to the provider API.  Providers not handled here are ignored.
    """
    if not thinking:
        return
    if provider == "gemini":
        if isinstance(thinking, dict):
            # Accept both camelCase (wire format) and snake_case config keys.
            config = thinking.get("thinkingConfig") or thinking.get("thinking_config") or {}
            include = config.get("includeThoughts") or config.get("include_thoughts")
            budget = config.get("thinkingBudget") or config.get("thinking_budget")
            level = config.get("thinkingLevel") or config.get("thinking_level")
            if include is not None and hasattr(model, "include_thoughts"):
                model.include_thoughts = include
            if budget is not None and hasattr(model, "thinking_budget"):
                model.thinking_budget = budget
            if level is not None and hasattr(model, "thinking_level"):
                model.thinking_level = level
        elif hasattr(model, "include_thoughts"):
            # Bare truthy flag: just ask for thoughts in the response.
            model.include_thoughts = True
        return
    if provider in {"siliconflow", "modelscope"}:
        budget = None
        if isinstance(thinking, dict):
            budget = thinking.get("budget_tokens") or thinking.get("budgetTokens")
        if budget is None:
            budget = 1024  # default thinking budget when none requested
        model_id_lower = str(getattr(model, "id", "") or "").lower()
        is_siliconflow_kimi_thinking = (
            provider == "siliconflow"
            and "kimi" in model_id_lower
            and "thinking" in model_id_lower
        )
        # SiliconFlow Kimi-thinking models may reject `enable_thinking`.
        if is_siliconflow_kimi_thinking:
            _merge_model_dict_attr(model, "extra_body", {"thinking_budget": budget})
            # Strip any previously-set enable_thinking flag for these models.
            current_extra = getattr(model, "extra_body", None)
            if isinstance(current_extra, dict) and "enable_thinking" in current_extra:
                merged = dict(current_extra)
                merged.pop("enable_thinking", None)
                setattr(model, "extra_body", merged)
        else:
            _merge_model_dict_attr(
                model,
                "extra_body",
                {"enable_thinking": True, "thinking_budget": budget},
            )
            # _merge_model_dict_attr(
            #     model,
            #     "request_params",
            #     {"enable_thinking": True, "thinking_budget": budget},
            # )
        return
    if provider == "nvidia":
        _merge_model_dict_attr(model, "extra_body", {"chat_template_kwargs": {"thinking": True}})
        return
    if provider == "minimax":
        # Pass through an explicit extra_body, else request split reasoning.
        if isinstance(thinking, dict) and isinstance(thinking.get("extra_body"), dict):
            _merge_model_dict_attr(model, "extra_body", thinking.get("extra_body"))
        else:
            _merge_model_dict_attr(model, "extra_body", {"reasoning_split": True})
        return
    if provider in {"glm", "deepseek", "volcengine"}:
        # Only an explicit {"type": ...} request is forwarded for these providers.
        if isinstance(thinking, dict) and thinking.get("type"):
            payload = {"thinking": {"type": thinking.get("type")}}
            _merge_model_dict_attr(model, "extra_body", payload)
            # _merge_model_dict_attr(model, "request_params", payload)
        return
    if provider == "kimi":
        # This branch only applies max_tokens/temperature overrides from the
        # thinking payload; no dedicated thinking flag is set here.
        if isinstance(thinking, dict):
            max_tokens = thinking.get("max_tokens")
            temperature = thinking.get("temperature")
            if max_tokens is not None and hasattr(model, "max_tokens"):
                model.max_tokens = max_tokens
            if temperature is not None and hasattr(model, "temperature"):
                model.temperature = temperature
        return
    if provider == "openai_compatibility":
        # Generic OpenAI-compatible endpoints: forward extra_body verbatim.
        if isinstance(thinking, dict):
            extra_body = thinking.get("extra_body")
            if isinstance(extra_body, dict):
                _merge_model_dict_attr(model, "extra_body", extra_body)
        return
def _apply_model_settings(model: Any, request: Any) -> None:
    """Apply generic sampling params, then provider-specific thinking options.

    Order matters: thinking handling (e.g. the kimi branch, which may set
    temperature) can override values set by the common params.
    """
    _apply_common_params(model, request)
    _apply_thinking_params(model, request.provider, request.thinking)
def _collect_enabled_tool_names(request: Any) -> list[str]:
    """Resolve the canonical names of every tool enabled by *request*.

    Combines resolved ids from ``tool_ids`` (skipped for the gemini provider),
    OpenAI-style function definitions from ``tools``, and named entries from
    ``user_tools``.  In expert/team mode the ``interactive_form`` tool is
    removed from the result.
    """
    collected: list[str] = []
    if request.provider != "gemini":
        collected.extend(resolve_tool_name(str(tid)) for tid in request.tool_ids or [])
    for definition in request.tools or []:
        if hasattr(definition, "model_dump"):
            definition = definition.model_dump()
        fn_name = definition.get("function", {}).get("name") if isinstance(definition, dict) else None
        if fn_name:
            collected.append(resolve_tool_name(fn_name))
    for entry in request.user_tools or []:
        if getattr(entry, "name", None):
            collected.append(str(entry.name))
        elif isinstance(entry, dict) and entry.get("name"):
            collected.append(str(entry["name"]))
    # Disable interactive forms in expert mode (team mode)
    expert_mode = getattr(request, "expert_mode", False) or bool(getattr(request, "team_agent_ids", []))
    if expert_mode:
        return [name for name in collected if name != "interactive_form"]
    return collected
def _has_selected_skills(request: Any) -> bool:
"""Check if any manual/external skills are selected in the request."""
raw_skill_ids = getattr(request, "skill_ids", None)
if isinstance(raw_skill_ids, list):
return any(str(item or "").strip() for item in raw_skill_ids)
if isinstance(raw_skill_ids, str):
return bool(raw_skill_ids.strip())
return False
def _has_skills(request: Any) -> bool:
    """Check if the agent will have any active skills (internal or external)."""
    if not getattr(request, "enable_skills", False):
        return False
    # Manually selected / external skills count first.
    if _has_selected_skills(request):
        return True
    # Long-term memory implies the built-in agent-memory skill.
    if getattr(request, "enable_long_term_memory", False):
        return True
    # Other internal skills auto-load when enable_skills is True, except these
    # specifically excluded directories, which are handled elsewhere.
    excluded = ("agent-memory", "skill-creator", "academic-research", "deep-research")
    internal_dir = os.path.join(os.path.dirname(__file__), '..', '_internal_skills')
    if os.path.isdir(internal_dir):
        return any(
            entry not in excluded and os.path.isdir(os.path.join(internal_dir, entry))
            for entry in os.listdir(internal_dir)
        )
    return False
def _build_tools(request: Any) -> list[Any]:
    """Assemble the tool list (local, Agno, user-defined, MCP) for *request*.

    Returns an empty list when no tool names, user tools, or skills are
    requested.  Zero-config DuckDuckGo image/video search tools are added by
    default unless ``skip_default_tools`` is set; SerpApi-backed tools are only
    added when the request carries a SerpApi API key.
    """
    enabled_names = set(_collect_enabled_tool_names(request))
    if (
        not enabled_names
        and not request.user_tools
        and not getattr(request, "enable_skills", False)
    ):
        return []
    serpapi_api_key = getattr(request, "serpapi_api_key", None)
    local_tool_names = {tool["name"] for tool in LOCAL_TOOLS}
    include_local = sorted([name for name in enabled_names if name in local_tool_names])
    # Inject skill execution tools if ANY skill (internal or external) is present
    if _has_skills(request):
        include_local = sorted(
            set(include_local) | {"execute_skill_script", "install_skill_dependency"}
        )
    tools: list[Any] = []
    if include_local:
        tools.append(
            QurioLocalTools(
                tavily_api_key=request.tavily_api_key,
                include_tools=include_local,
            )
        )
    agno_tool_names = {tool["name"] for tool in AGNO_TOOLS}
    include_agno = sorted([name for name in enabled_names if name in agno_tool_names])
    # Always include zero-config image/video search tools by default if not explicitly disabled
    # SerpApi-based tools are only included if API key is configured
    if not getattr(request, "skip_default_tools", False):
        # Zero-config tools (DuckDuckGo) - always include
        default_image_tools = {"duckduckgo_image_search"}
        default_video_tools = {"duckduckgo_video_search"}
        # SerpApi-based tools - only include if API key is available
        serpapi_image_tools = {"google_image_search", "serpapi_image_search", "bing_image_search"}
        serpapi_video_tools = {"search_youtube"}
        default_tools = default_image_tools | default_video_tools
        if serpapi_api_key:
            default_tools = default_tools | serpapi_image_tools | serpapi_video_tools
        include_agno = sorted(list(set(include_agno) | default_tools))
    if include_agno:
        tools.extend(_build_agno_toolkits(request, include_agno))
    user_toolkit = build_user_tools_toolkit(
        [tool.model_dump() if hasattr(tool, "model_dump") else tool for tool in request.user_tools or []]
    )
    if user_toolkit:
        tools.append(user_toolkit)
    # MCP tools are optional: attach only when a server URL is configured and
    # the MCP extra is importable.
    mcp_url = os.getenv("MCP_SERVER_URL")
    if mcp_url:
        try:
            from agno.tools.mcp import MCPTools
        except Exception:
            MCPTools = None
        if MCPTools:
            tools.append(MCPTools(url=mcp_url, transport=os.getenv("MCP_TRANSPORT", "streamable-http")))
    return tools
def _build_agno_toolkits(request: Any, include_agno: list[str]) -> list[Any]:
    """Instantiate the Agno-provided toolkits selected in *include_agno*.

    Third-party toolkits are imported lazily inside try/except so that a
    missing optional dependency simply skips the toolkit instead of crashing.
    SerpApi-backed toolkits are created only when an API key is available on
    the request.
    """
    toolkits: list[Any] = []
    include_set = set(include_agno)
    serpapi_api_key = getattr(request, "serpapi_api_key", None)
    tavily_tools = {"web_search_using_tavily", "web_search_with_tavily", "extract_url_content"}
    if include_set.intersection(tavily_tools):
        try:
            from agno.tools.tavily import TavilyTools
        except Exception:
            TavilyTools = None
        if TavilyTools:
            selected = [name for name in include_agno if name in tavily_tools]
            toolkits.append(TavilyTools(api_key=request.tavily_api_key, include_tools=selected))
    websearch_tools = {"web_search", "search_news"}
    if include_set.intersection(websearch_tools):
        backend = getattr(request, "search_backend", None) or "auto"
        if backend == "exa":
            # Exa replaces the DuckDuckGo web-search toolkit entirely.
            exa_toolkit = _build_exa_toolkit(
                getattr(request, "exa_api_key", None),
                getattr(request, "exa_search_category", None),
            )
            if exa_toolkit:
                toolkits.append(exa_toolkit)
            else:
                logger.warning("Exa backend requested but ExaTools is unavailable or API key is missing.")
        else:
            selected = [name for name in include_agno if name in websearch_tools]
            toolkits.append(
                DuckDuckGoWebSearchTools(
                    include_tools=selected,
                    backend=backend,
                )
            )
    # Standalone search_exa tool — skipped when the Exa toolkit was already
    # added above as the web-search backend.
    if include_set.intersection(EXA_SEARCH_TOOL_SET) and not (
        (getattr(request, "search_backend", None) or "auto") == "exa"
        and include_set.intersection(websearch_tools)
    ):
        exa_toolkit = _build_exa_toolkit(
            getattr(request, "exa_api_key", None),
            getattr(request, "exa_search_category", None),
        )
        if exa_toolkit:
            toolkits.append(exa_toolkit)
        else:
            logger.warning("Exa tool requested but ExaTools is unavailable or API key is missing.")
    arxiv_tools = {"search_arxiv_and_return_articles", "read_arxiv_papers"}
    if include_set.intersection(arxiv_tools):
        try:
            from agno.tools.arxiv import ArxivTools
        except Exception:
            ArxivTools = None
        if ArxivTools:
            selected = [name for name in include_agno if name in arxiv_tools]
            toolkits.append(ArxivTools(include_tools=selected))
    wikipedia_tools = {"search_wikipedia"}
    if include_set.intersection(wikipedia_tools):
        try:
            from agno.tools.wikipedia import WikipediaTools
        except Exception:
            WikipediaTools = None
        if WikipediaTools:
            toolkits.append(WikipediaTools(include_tools=["search_wikipedia"]))
    yfinance_tools = {
        "yfinance_tools",
        "get_current_stock_price",
        "get_company_info",
        "get_stock_fundamentals",
        "get_income_statements",
        "get_key_financial_ratios",
        "get_analyst_recommendations",
        "get_company_news",
        "get_technical_indicators",
        "get_historical_stock_prices",
    }
    if include_set.intersection(yfinance_tools):
        try:
            from agno.tools.yfinance import YFinanceTools
        except Exception:
            YFinanceTools = None
        if YFinanceTools:
            # Keep YFinance toolkit initialization aligned with Agno's default usage.
            # Some SDK versions expose different subsets/names, which can make
            # include_tools fail with "tool not present in toolkit".
            toolkits.append(YFinanceTools())
    image_search_tools = {
        "duckduckgo_image_search",
        "google_image_search",
        "serpapi_image_search",
        "bing_image_search",
    }
    if include_set.intersection(image_search_tools):
        # DuckDuckGo Image Search (Custom) - always available, no config needed
        if "duckduckgo_image_search" in include_set:
            toolkits.append(DuckDuckGoImageTools(include_tools=["duckduckgo_image_search"]))
        # SerpApi Image Search (Custom) - only add if API key is configured
        serpapi_tools = {
            "google_image_search",
            "serpapi_image_search",
            "bing_image_search",
        }
        serpapi_include = sorted([name for name in include_set if name in serpapi_tools])
        # Only add SerpApi tools if API key is available
        if serpapi_include and serpapi_api_key:
            toolkits.append(
                SerpApiImageTools(
                    api_key=serpapi_api_key, include_tools=serpapi_include
                )
            )
    video_search_tools = {
        "duckduckgo_video_search",
        "search_youtube",
    }
    if include_set.intersection(video_search_tools):
        # DuckDuckGo Video Search (Custom) - always available, no config needed
        if "duckduckgo_video_search" in include_set:
            toolkits.append(DuckDuckGoVideoTools(include_tools=["duckduckgo_video_search"]))
        # YouTube Search via SerpApi - only add if API key is configured
        if "search_youtube" in include_set and serpapi_api_key:
            try:
                from agno.tools.serpapi import SerpApiTools as AgnoSerpApiTools
            except Exception:
                AgnoSerpApiTools = None
            if AgnoSerpApiTools:
                toolkits.append(
                    AgnoSerpApiTools(
                        api_key=serpapi_api_key,
                        enable_search_google=False,
                        enable_search_youtube=True,
                    )
                )
    return toolkits
def _normalize_exa_category(category: str | None) -> str | None:
    """Return a trimmed, lowercased Exa category.

    Empty values, "auto", and anything not in EXA_ALLOWED_CATEGORIES
    normalize to None (meaning: let Exa pick automatically).
    """
    cleaned = str(category or "").strip().lower()
    if cleaned in EXA_ALLOWED_CATEGORIES:
        return cleaned
    return None
def _build_exa_toolkit(exa_api_key: str | None, category: str | None = None) -> Any | None:
    """Create a search-only ExaTools toolkit, or None when unavailable.

    Returns None when the optional ExaTools dependency is missing or no API
    key can be found (argument first, then the EXA_API_KEY env var).  Only the
    search capability is enabled; contents/similar/answer stay off.
    """
    if ExaTools is None:
        return None
    api_key = str(exa_api_key or os.getenv("EXA_API_KEY") or "").strip()
    if not api_key:
        return None
    return ExaTools(
        api_key=api_key,
        enable_search=True,
        enable_get_contents=False,
        enable_find_similar=False,
        enable_answer=False,
        timeout=EXA_TIMEOUT_SECONDS,
        category=_normalize_exa_category(category),
    )
def get_summary_model(request: Any) -> Any | None:
    """
    Build the lightweight model used for session-summary generation.

    Configuration priority: request-level ``summary_*`` fields first, then the
    global settings (SUMMARY_LITE_* environment).  Returns None when the model
    id or API key is missing, or when model construction fails.
    """
    settings = get_settings()
    try:
        provider = getattr(request, "summary_provider", None) or settings.summary_lite_provider
        model_id = getattr(request, "summary_model", None) or settings.summary_lite_model
        api_key = getattr(request, "summary_api_key", None) or settings.summary_agent_api_key
        base_url = getattr(request, "summary_base_url", None) or settings.summary_lite_base_url
        if not model_id or not api_key:
            logger.warning("Lite Model not configured (checked request summary_* params and SUMMARY_LITE_MODEL env var)")
            return None
        source = "Request-Specific" if getattr(request, "summary_model", None) else "Global-Default"
        logger.info(f"[{source}] Selected Lite Model for Session Summary: {provider}/{model_id}")
        # Fall back to the provider's default endpoint when none is configured.
        endpoint = base_url or DEFAULT_BASE_URLS.get(provider) or DEFAULT_BASE_URLS["openai"]
        summary_model = _build_model(provider, api_key, endpoint, model_id)
        # Disable native structured outputs so summary parsing stays robust
        # with non-OpenAI providers (like GLM); affects only this instance.
        if hasattr(summary_model, "supports_native_structured_outputs"):
            summary_model.supports_native_structured_outputs = False
        return summary_model
    except Exception as exc:
        logger.warning(f"Failed to build lite_model for session summary: {exc}")
        return None
def build_agent(request: Any = None, **kwargs: Any) -> Agent:
    """Construct a fully-configured Agno Agent from a chat request.

    The agent is stateless here: no db/memory is attached; session context
    (history + summary) is injected manually in stream_chat.py.  Legacy
    keyword-style calls (provider=..., api_key=...) are adapted into a
    SimpleNamespace request for backward compatibility.
    """
    # Backward-compatible shim for legacy build_agent(provider=..., api_key=...) calls.
    if request is None or kwargs:
        provider = request if isinstance(request, str) else kwargs.get("provider")
        request = SimpleNamespace(
            provider=provider or "openai",
            api_key=kwargs.get("api_key"),
            base_url=kwargs.get("base_url"),
            model=kwargs.get("model"),
            tavily_api_key=kwargs.get("tavily_api_key"),
            temperature=kwargs.get("temperature"),
            top_p=kwargs.get("top_p"),
            top_k=kwargs.get("top_k"),
            frequency_penalty=kwargs.get("frequency_penalty"),
            presence_penalty=kwargs.get("presence_penalty"),
            thinking=kwargs.get("thinking"),
            tool_ids=kwargs.get("tool_ids"),
            tools=kwargs.get("tools"),
            user_tools=kwargs.get("user_tools"),
            tool_choice=kwargs.get("tool_choice"),
        )
    model = _build_model(request.provider, request.api_key, request.base_url, request.model)
    _apply_model_settings(model, request)
    tools = _build_tools(request)
    # memory_kwargs = _build_memory_kwargs(request)
    tool_choice = request.tool_choice
    if tool_choice is None and tools:
        tool_choice = "auto"
    # 1. Conditional instructions: Multi-form guidance & Image/Video rendering
    enabled_names = set(_collect_enabled_tool_names(request))
    # Add image and video search tools to enabled_names set since they are forced
    image_search_names = {tool["name"] for tool in IMAGE_SEARCH_TOOLS}
    video_search_names = {tool["name"] for tool in VIDEO_SEARCH_TOOLS}
    enabled_names.update(image_search_names)
    enabled_names.update(video_search_names)
    instructions_list = []
    if "interactive_form" in enabled_names:
        instructions_list.append(
            "When using the interactive_form tool to collect user information: "
            "If the user's initial responses lack critical details needed to fulfill their request, "
            "you MUST call interactive_form again to gather the missing specific information. "
            "Do not proceed with incomplete information. "
            "However, limit to 2-3 forms maximum per conversation to respect user time."
        )
    if getattr(request, "enable_skills", False) and _has_selected_skills(request):
        instructions_list.append(
            "When a skill tells you to run a bundled script, do not merely summarize or restate the script. "
            "Use execute_skill_script to actually run the file and rely on its stdout/stderr."
        )
        instructions_list.append(
            "When a skill script fails because of a missing Python dependency "
            "(for example ModuleNotFoundError or ImportError), do not pretend it succeeded. "
            "First explain the missing package briefly, then use interactive_form to ask for explicit approval "
            "before installing anything. Do NOT ask for approval in plain text if interactive_form is available. "
            "For dependency approval, keep the form minimal: include the package name in the form title or description, "
            "and ask only for an approval choice when skill_id and package_name are already known from context. "
            "Do NOT ask the user to retype known values like skill_id or package_name. "
            "Only after the user approves may you call install_skill_dependency. "
            "If the user declines, stop and report that installation was skipped."
        )
    if "duckduckgo_image_search" in enabled_names or "google_image_search" in enabled_names:
        instructions_list.append(
            "When explaining concepts that can benefit from visual aids (like Logo, diagrams, or photos), "
            "you should use the image search tools to find relevant images. "
            "ALWAYS render images in your response using markdown format: ![caption](url). "
            "Place images appropriately within your explanation to enhance user understanding."
        )
    if "duckduckgo_video_search" in enabled_names or "search_youtube" in enabled_names:
        instructions_list.append(
            "When users ask about tutorials, demonstrations, or topics that benefit from video content, "
            "you should use the video search tools to find relevant videos. "
            "ALWAYS include video links in your response using markdown format with descriptive text. "
            "Provide context about why each video is relevant to the user's query."
        )
    if getattr(request, "search_backend", None) == "exa" and (
        "web_search" in enabled_names or bool(enabled_names.intersection(EXA_SEARCH_TOOL_SET))
    ):
        exa_category = _normalize_exa_category(getattr(request, "exa_search_category", None))
        if exa_category:
            instructions_list.append(
                f"Exa search is available via search_exa and is constrained to the '{exa_category}' category."
            )
        else:
            instructions_list.append(
                "Exa search is available via search_exa. Use it for current web information, and choose a category when it helps."
            )
    instructions = "\n\n".join(instructions_list) if instructions_list else None
    # 2. Agent Construction (Stateless / Manual Context)
    # We do NOT inject 'db' or 'memory' here.
    # Session context (history + summary) is injected manually in stream_chat.py
    skills = None
    if getattr(request, "enable_skills", False):
        skills_dir = os.path.join(os.path.dirname(__file__), '..', '..', '.skills')
        internal_skills_dir = os.path.join(os.path.dirname(__file__), '..', '_internal_skills')
        requested_skills = getattr(request, "skill_ids", [])
        if isinstance(requested_skills, str):
            # skill_ids may arrive JSON-encoded; on bad input treat as none.
            try:
                requested_skills = json.loads(requested_skills)
            except (json.JSONDecodeError, TypeError):
                requested_skills = []
        paths = []
        # Inject built-in agent-memory skill if long term memory is enabled
        if getattr(request, "enable_long_term_memory", False):
            am_path = os.path.join(internal_skills_dir, "agent-memory")
            if os.path.isdir(am_path):
                paths.append(am_path)
        # Inject any other internal skills by default (except for specific non-autoloading skills)
        if os.path.isdir(internal_skills_dir):
            for item in os.listdir(internal_skills_dir):
                if item in ("agent-memory", "skill-creator", "academic-research", "deep-research"):
                    continue
                item_path = os.path.join(internal_skills_dir, item)
                if os.path.isdir(item_path):
                    paths.append(item_path)
        if requested_skills:
            # Internal skills win over external skills with the same id.
            for skill_id in requested_skills:
                internal_skill_path = os.path.join(internal_skills_dir, skill_id)
                external_skill_path = os.path.join(skills_dir, skill_id)
                if os.path.isdir(internal_skill_path):
                    paths.append(internal_skill_path)
                elif os.path.isdir(external_skill_path):
                    paths.append(external_skill_path)
        if paths:
            skills = Skills(loaders=[LocalSkills(path) for path in paths])
    # Merge personalized prompt with tool-derived instructions
    personalized = getattr(request, "personalized_prompt", None)
    if personalized:
        if instructions:
            instructions = f"{personalized}\n\n{instructions}"
        else:
            instructions = personalized
    # Use resolved agent name/description if available (for Team member identification)
    agent_id = getattr(request, "agent_id", None) or f"qurio-{request.provider}"
    agent_name = getattr(request, "agent_name", None) or f"Qurio {request.provider} Agent"
    agent_description = getattr(request, "agent_description", None)
    return Agent(
        id=agent_id,
        name=agent_name,
        description=agent_description,
        role=agent_description,
        model=model,
        tools=tools or None,
        markdown=True,
        tool_choice=tool_choice,
        instructions=instructions,
        skills=skills,
    )
# Mapping of provider -> user_settings key holding that provider's API key
# (consumed by _get_provider_credentials). Note: "openai" maps to the same
# stored key as "openai_compatibility".
_PROVIDER_KEY_MAP: dict[str, str] = {
    "gemini": "googleApiKey",
    "openai": "OpenAICompatibilityKey",
    "openai_compatibility": "OpenAICompatibilityKey",
    "siliconflow": "SiliconFlowKey",
    "glm": "GlmKey",
    "deepseek": "DeepSeekKey",
    "volcengine": "VolcengineKey",
    "modelscope": "ModelScopeKey",
    "kimi": "KimiKey",
    "nvidia": "NvidiaKey",
    "minimax": "MinimaxKey",
}
def _get_provider_credentials(provider: str) -> tuple[str | None, str | None]:
    """
    Get API key and base URL for a provider.
    Priority: user_settings table -> environment variables.
    """
    base_url = DEFAULT_BASE_URLS.get(provider)
    api_key: str | None = None
    # 1. Prefer the key stored in the user_settings table, when mapped.
    setting_key = _PROVIDER_KEY_MAP.get(provider)
    if setting_key:
        try:
            adapter = get_db_adapter()
            if adapter:
                result = adapter.execute(
                    DbQueryRequest(
                        action="select",
                        table="user_settings",
                        filters=[DbFilter(op="eq", column="key", value=setting_key)],
                        maybe_single=True,
                    )
                )
                row = result.data
                if row:
                    # Adapters may return a single row or a one-element list.
                    if isinstance(row, list) and len(row) > 0:
                        row = row[0]
                    if isinstance(row, dict):
                        api_key = row.get("value")
        except Exception as e:
            logger.debug(f"Failed to get API key from user_settings for {provider}: {e}")
    # 2. Fall back to the <PROVIDER>_API_KEY environment variable.
    if not api_key:
        env_name = provider.upper().replace("-", "_")
        api_key = os.getenv(f"{env_name}_API_KEY")
    return api_key, base_url
def resolve_agent_config(agent_id: str, base_request: Any) -> Any:
    """Fetch agent configuration from database and merge with base request secrets.

    Returns a deep copy of *base_request* overridden with the stored agent's
    provider, credentials, model, tools, skills, prompt, identity fields, and
    (unless use_global_model_settings is set) generation parameters.  On any
    DB error or missing row, *base_request* is returned unchanged.
    """
    import copy
    from types import SimpleNamespace
    adapter = get_db_adapter()
    if not adapter:
        return base_request
    try:
        query_req = DbQueryRequest(
            action="select",
            table="agents",
            filters=[DbFilter(op="eq", column="id", value=agent_id)],
            maybe_single=True
        )
        response = adapter.execute(query_req)
        if response.error:
            logger.warning(f"[resolve_agent_config] DB error for agent_id={agent_id}: {response.error}")
            return base_request
        if not response.data:
            logger.warning(f"[resolve_agent_config] No agent found with id={agent_id}")
            return base_request
        # Handle case where data might be a list or a single dict
        agent_data = response.data
        if isinstance(agent_data, list):
            if len(agent_data) == 0:
                return base_request
            agent_data = agent_data[0]
        new_req = copy.deepcopy(base_request)
        # Override provider and get corresponding credentials from settings
        agent_provider = agent_data.get("provider")
        if agent_provider:
            new_req.provider = agent_provider
            # Get API key and base URL for this provider from environment
            api_key, base_url = _get_provider_credentials(agent_provider)
            if api_key:
                new_req.api_key = api_key
            if base_url:
                new_req.base_url = base_url
        new_req.model = agent_data.get("default_model") or base_request.model
        new_req.tool_ids = agent_data.get("tool_ids") or []
        new_req.skill_ids = agent_data.get("skill_ids") or []
        # Handle instructions (personalized prompt)
        # We'll store it in a custom attribute that build_agent can pick up
        new_req.personalized_prompt = agent_data.get("prompt")
        # Store agent name and description for proper identification in Teams
        new_req.agent_id = agent_id  # Store the original agent_id
        new_req.agent_name = agent_data.get("name")
        new_req.agent_emoji = agent_data.get("emoji")
        new_req.agent_description = agent_data.get("description")
        logger.info(f"[resolve_agent_config] Resolved agent: id={agent_id}, name={new_req.agent_name}, provider={agent_provider}")
        # Override generation params if configured
        if not agent_data.get("use_global_model_settings"):
            if agent_data.get("temperature") is not None:
                new_req.temperature = agent_data.get("temperature")
            if agent_data.get("top_p") is not None:
                new_req.top_p = agent_data.get("top_p")
            if agent_data.get("frequency_penalty") is not None:
                new_req.frequency_penalty = agent_data.get("frequency_penalty")
            if agent_data.get("presence_penalty") is not None:
                new_req.presence_penalty = agent_data.get("presence_penalty")
        return new_req
    except Exception as e:
        logger.error(f"Failed to resolve agent config for {agent_id}: {e}")
        return base_request
def build_memory_agent(
    user_id: str | None = None,
    provider: str | None = None,
    model: str | None = None,
    base_url: str | None = None,
    api_key: str | None = None,
) -> Agent:
    """
    Build a lightweight helper agent dedicated to long-term-memory management.

    Credentials fall back to the memory-lite settings and finally to the
    OPENAI_API_KEY env var; skills and generation tuning are disabled.
    """
    settings = get_settings()
    chosen_provider = provider or settings.memory_lite_provider
    chosen_model = model or settings.memory_lite_model
    chosen_key = api_key or settings.memory_agent_api_key or os.getenv("OPENAI_API_KEY")
    chosen_base = (
        base_url
        or DEFAULT_BASE_URLS.get(chosen_provider)
        or DEFAULT_BASE_URLS["openai"]
    )
    request = SimpleNamespace(
        provider=chosen_provider,
        api_key=chosen_key,
        base_url=chosen_base,
        model=chosen_model,
        tavily_api_key=os.getenv("TAVILY_API_KEY"),
        temperature=None,
        top_p=None,
        top_k=None,
        frequency_penalty=None,
        presence_penalty=None,
        thinking=None,
        tool_ids=[],
        tools=None,
        user_tools=None,
        tool_choice=None,
        enable_long_term_memory=True,
        database_provider=os.getenv("DATABASE_PROVIDER") or "default",
        user_id=user_id,
        enable_skills=False,  # helper agent: keep skills disabled
    )
    return build_agent(request)
def get_agent_for_provider(request: Any) -> Agent:
    """Build an agent for *request* (thin public wrapper around build_agent)."""
    return build_agent(request)
def build_team(request: Any, members: list[Agent]) -> Any:
    """Build an Agno Team from a list of member agents.

    A leader agent is built from *request* so the team inherits its model,
    tools, skills, and personalised prompt.  The requested team mode string is
    mapped to a TeamMode enum, falling back to "route" on invalid values.
    """
    from agno.team import Team
    from agno.team.mode import TeamMode
    # Map requested team mode string to TeamMode enum
    mode_str = getattr(request, "team_mode", "route")
    try:
        team_mode = TeamMode(mode_str)
    except Exception:
        team_mode = TeamMode.route
    # Build a leader agent to inherit tools, skills, and personalised prompt
    leader_agent = build_agent(request)
    instructions = (
        "You are the Team Leader coordinating a group of expert agents. "
        "Analyze the user's request. Based on the expertise of your team members, "
        "delegate sub-tasks or questions to them. Finally, synthesize and summarize their findings into a comprehensive response."
    )
    # Prepend leader's own instructions if they exist
    if leader_agent.instructions:
        instructions = f"{leader_agent.instructions}\n\n{instructions}"
    # Fix: getattr(request, "agent_name", "Expert Team") returned None when the
    # attribute existed but was unset (resolve_agent_config stores name=None for
    # unnamed agents).  Use the same `getattr(...) or default` pattern as
    # build_agent so the team always has a usable name.
    team_name = getattr(request, "agent_name", None) or "Expert Team"
    team = Team(
        name=team_name,
        members=members,
        model=leader_agent.model,
        tools=leader_agent.tools,
        mode=team_mode,
        instructions=instructions,
        markdown=True,
        stream_member_events=True,  # Ensure member events are streamed
    )
    # Set agent_id manually as Team constructor might not support it directly
    team.agent_id = getattr(request, "agent_id", None)
    return team