Spaces:
Running
Running
File size: 39,147 Bytes
4ef118d 592cb1d 4ef118d 592cb1d 4ef118d 592cb1d 4ef118d 592cb1d 4ef118d 592cb1d 4ef118d 592cb1d 4ef118d 592cb1d 4ef118d 592cb1d 4ef118d 592cb1d 4ef118d 592cb1d 4ef118d 592cb1d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 | """
Agent registry built with Agno SDK (Agent + AgentOS).
"""
from __future__ import annotations
import os
import json
from types import SimpleNamespace
from typing import Any
from agno.agent import Agent
from agno.skills import Skills, LocalSkills
# from agno.db.postgres import PostgresDb
# from agno.memory import MemoryManager
from agno.models.google import Gemini
from agno.models.openai import OpenAILike
# from agno.session.summary import SessionSummaryManager
from agno.utils.log import logger
from ..config import get_settings
from ..models.db import DbFilter, DbQueryRequest
from .db_service import get_db_adapter
from .custom_tools import (
DuckDuckGoImageTools,
DuckDuckGoVideoTools,
DuckDuckGoWebSearchTools,
QurioLocalTools,
SerpApiImageTools,
)
from .tool_registry import AGNO_TOOLS, IMAGE_SEARCH_TOOLS, LOCAL_TOOLS, VIDEO_SEARCH_TOOLS, resolve_tool_name
from .user_tools import build_user_tools_toolkit
try:
from agno.tools.exa import ExaTools
except Exception:
ExaTools = None
EXA_SEARCH_TOOL_SET = {"search_exa"}
EXA_ALLOWED_CATEGORIES = {
"company",
"research paper",
"news",
"pdf",
"github",
"tweet",
"personal site",
"linkedin profile",
"financial report",
}
EXA_TIMEOUT_SECONDS = max(
15,
int(os.getenv("EXA_TOOLS_TIMEOUT_SECONDS", os.getenv("EXA_MCP_TIMEOUT_SECONDS", "45"))),
)
DEFAULT_MODELS: dict[str, str] = {
"openai": os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
"openai_compatibility": os.getenv("OPENAI_COMPAT_MODEL", "gpt-4o-mini"),
"siliconflow": os.getenv("SILICONFLOW_MODEL", "Qwen/Qwen2.5-7B-Instruct"),
"glm": os.getenv("GLM_MODEL", "glm-4-flash"),
"deepseek": os.getenv("DEEPSEEK_MODEL", "deepseek-chat"),
"volcengine": os.getenv("VOLCENGINE_MODEL", "doubao-seed-1-6-thinking-250615"),
"modelscope": os.getenv("MODELSCOPE_MODEL", "AI-ModelScope/glm-4-9b-chat"),
"kimi": os.getenv("KIMI_MODEL", "moonshot-v1-8k"),
"gemini": os.getenv("GEMINI_MODEL", "gemini-2.0-flash-exp"),
"nvidia": os.getenv("NVIDIA_MODEL", "deepseek-ai/deepseek-r1"),
"minimax": os.getenv("MINIMAX_MODEL", "minimax-m2"),
}
DEFAULT_BASE_URLS: dict[str, str] = {
"openai": os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"),
"openai_compatibility": os.getenv("OPENAI_COMPAT_BASE_URL", "https://api.openai.com/v1"),
"siliconflow": os.getenv("SILICONFLOW_BASE_URL", "https://api.siliconflow.cn/v1"),
"glm": os.getenv("GLM_BASE_URL", "https://open.bigmodel.cn/api/paas/v4"),
"deepseek": os.getenv("DEEPSEEK_BASE_URL", "https://api.deepseek.com/v1"),
"volcengine": os.getenv("VOLCENGINE_BASE_URL", "https://ark.cn-beijing.volces.com/api/v3"),
"modelscope": os.getenv("MODELSCOPE_BASE_URL", "https://api-inference.modelscope.cn/v1"),
"kimi": os.getenv("KIMI_BASE_URL", "https://api.moonshot.cn/v1"),
"nvidia": os.getenv("NVIDIA_BASE_URL", "https://integrate.api.nvidia.com/v1"),
"minimax": os.getenv("MINIMAX_BASE_URL", "https://api.minimax.io/v1"),
}
# These will be initialized within functions using get_settings() to ensure .env is loaded
# MEMORY_LITE_PROVIDER = ...
# MEMORY_LITE_MODEL = ...
# MEMORY_LITE_BASE_URL = ...
# MEMORY_AGENT_API_KEY = ...
# Global database instance to avoid multiple table definitions in SQLAlchemy
# Global database instance to avoid multiple table definitions in SQLAlchemy
# _agent_db was removed as we use DbAdapter pattern.
def _build_model(provider: str, api_key: str | None, base_url: str | None, model: str | None):
provider_key = provider or "openai"
model_id = model or DEFAULT_MODELS.get(provider_key) or DEFAULT_MODELS["openai"]
resolved_base = base_url or DEFAULT_BASE_URLS.get(provider_key) or DEFAULT_BASE_URLS["openai"]
if provider_key == "gemini":
return Gemini(id=model_id, api_key=api_key)
return OpenAILike(id=model_id, api_key=api_key, base_url=resolved_base)
def _merge_model_dict_attr(model: Any, attr: str, payload: dict[str, Any]) -> None:
if not payload:
return
current = getattr(model, attr, None)
if current is None:
setattr(model, attr, dict(payload))
elif isinstance(current, dict):
merged = {**current, **payload}
setattr(model, attr, merged)
def _apply_common_params(model: Any, request: Any) -> None:
if request.temperature is not None and hasattr(model, "temperature"):
model.temperature = request.temperature
if request.top_p is not None and hasattr(model, "top_p"):
model.top_p = request.top_p
if request.frequency_penalty is not None and hasattr(model, "frequency_penalty"):
model.frequency_penalty = request.frequency_penalty
if request.presence_penalty is not None and hasattr(model, "presence_penalty"):
model.presence_penalty = request.presence_penalty
if request.top_k is not None:
if hasattr(model, "top_k"):
model.top_k = request.top_k
else:
_merge_model_dict_attr(model, "extra_body", {"top_k": request.top_k})
def _apply_thinking_params(model: Any, provider: str, thinking: dict[str, Any] | bool | None) -> None:
if not thinking:
return
if provider == "gemini":
if isinstance(thinking, dict):
config = thinking.get("thinkingConfig") or thinking.get("thinking_config") or {}
include = config.get("includeThoughts") or config.get("include_thoughts")
budget = config.get("thinkingBudget") or config.get("thinking_budget")
level = config.get("thinkingLevel") or config.get("thinking_level")
if include is not None and hasattr(model, "include_thoughts"):
model.include_thoughts = include
if budget is not None and hasattr(model, "thinking_budget"):
model.thinking_budget = budget
if level is not None and hasattr(model, "thinking_level"):
model.thinking_level = level
elif hasattr(model, "include_thoughts"):
model.include_thoughts = True
return
if provider in {"siliconflow", "modelscope"}:
budget = None
if isinstance(thinking, dict):
budget = thinking.get("budget_tokens") or thinking.get("budgetTokens")
if budget is None:
budget = 1024
model_id_lower = str(getattr(model, "id", "") or "").lower()
is_siliconflow_kimi_thinking = (
provider == "siliconflow"
and "kimi" in model_id_lower
and "thinking" in model_id_lower
)
# SiliconFlow Kimi-thinking models may reject `enable_thinking`.
if is_siliconflow_kimi_thinking:
_merge_model_dict_attr(model, "extra_body", {"thinking_budget": budget})
current_extra = getattr(model, "extra_body", None)
if isinstance(current_extra, dict) and "enable_thinking" in current_extra:
merged = dict(current_extra)
merged.pop("enable_thinking", None)
setattr(model, "extra_body", merged)
else:
_merge_model_dict_attr(
model,
"extra_body",
{"enable_thinking": True, "thinking_budget": budget},
)
# _merge_model_dict_attr(
# model,
# "request_params",
# {"enable_thinking": True, "thinking_budget": budget},
# )
return
if provider == "nvidia":
_merge_model_dict_attr(model, "extra_body", {"chat_template_kwargs": {"thinking": True}})
return
if provider == "minimax":
if isinstance(thinking, dict) and isinstance(thinking.get("extra_body"), dict):
_merge_model_dict_attr(model, "extra_body", thinking.get("extra_body"))
else:
_merge_model_dict_attr(model, "extra_body", {"reasoning_split": True})
return
if provider in {"glm", "deepseek", "volcengine"}:
if isinstance(thinking, dict) and thinking.get("type"):
payload = {"thinking": {"type": thinking.get("type")}}
_merge_model_dict_attr(model, "extra_body", payload)
# _merge_model_dict_attr(model, "request_params", payload)
return
if provider == "kimi":
if isinstance(thinking, dict):
max_tokens = thinking.get("max_tokens")
temperature = thinking.get("temperature")
if max_tokens is not None and hasattr(model, "max_tokens"):
model.max_tokens = max_tokens
if temperature is not None and hasattr(model, "temperature"):
model.temperature = temperature
return
if provider == "openai_compatibility":
if isinstance(thinking, dict):
extra_body = thinking.get("extra_body")
if isinstance(extra_body, dict):
_merge_model_dict_attr(model, "extra_body", extra_body)
return
def _apply_model_settings(model: Any, request: Any) -> None:
_apply_common_params(model, request)
_apply_thinking_params(model, request.provider, request.thinking)
def _collect_enabled_tool_names(request: Any) -> list[str]:
names: list[str] = []
if request.provider != "gemini":
for tool_id in request.tool_ids or []:
names.append(resolve_tool_name(str(tool_id)))
for tool_def in request.tools or []:
if hasattr(tool_def, "model_dump"):
tool_def = tool_def.model_dump()
name = tool_def.get("function", {}).get("name") if isinstance(tool_def, dict) else None
if name:
names.append(resolve_tool_name(name))
for user_tool in request.user_tools or []:
if getattr(user_tool, "name", None):
names.append(str(user_tool.name))
elif isinstance(user_tool, dict) and user_tool.get("name"):
names.append(str(user_tool["name"]))
# Disable interactive forms in expert mode (team mode)
is_expert = getattr(request, "expert_mode", False) or bool(getattr(request, "team_agent_ids", []))
if is_expert:
names = [n for n in names if n != "interactive_form"]
return names
def _has_selected_skills(request: Any) -> bool:
"""Check if any manual/external skills are selected in the request."""
raw_skill_ids = getattr(request, "skill_ids", None)
if isinstance(raw_skill_ids, list):
return any(str(item or "").strip() for item in raw_skill_ids)
if isinstance(raw_skill_ids, str):
return bool(raw_skill_ids.strip())
return False
def _has_skills(request: Any) -> bool:
"""Check if the agent will have any active skills (internal or external)."""
if not getattr(request, "enable_skills", False):
return False
# 1. Check for manual/external skills
if _has_selected_skills(request):
return True
# 2. Check for internal skills.
# Note: agent-memory and skill-creator are handled specifically, but other
# internal skills are loaded by default if enable_skills is True.
if getattr(request, "enable_long_term_memory", False):
return True
internal_skills_dir = os.path.join(os.path.dirname(__file__), '..', '_internal_skills')
if os.path.isdir(internal_skills_dir):
for item in os.listdir(internal_skills_dir):
if item in ("agent-memory", "skill-creator", "academic-research", "deep-research"):
continue
if os.path.isdir(os.path.join(internal_skills_dir, item)):
return True
return False
def _build_tools(request: Any) -> list[Any]:
enabled_names = set(_collect_enabled_tool_names(request))
if (
not enabled_names
and not request.user_tools
and not getattr(request, "enable_skills", False)
):
return []
serpapi_api_key = getattr(request, "serpapi_api_key", None)
local_tool_names = {tool["name"] for tool in LOCAL_TOOLS}
include_local = sorted([name for name in enabled_names if name in local_tool_names])
# Inject skill execution tools if ANY skill (internal or external) is present
if _has_skills(request):
include_local = sorted(
set(include_local) | {"execute_skill_script", "install_skill_dependency"}
)
tools: list[Any] = []
if include_local:
tools.append(
QurioLocalTools(
tavily_api_key=request.tavily_api_key,
include_tools=include_local,
)
)
agno_tool_names = {tool["name"] for tool in AGNO_TOOLS}
include_agno = sorted([name for name in enabled_names if name in agno_tool_names])
# Always include zero-config image/video search tools by default if not explicitly disabled
# SerpApi-based tools are only included if API key is configured
if not getattr(request, "skip_default_tools", False):
# Zero-config tools (DuckDuckGo) - always include
default_image_tools = {"duckduckgo_image_search"}
default_video_tools = {"duckduckgo_video_search"}
# SerpApi-based tools - only include if API key is available
serpapi_image_tools = {"google_image_search", "serpapi_image_search", "bing_image_search"}
serpapi_video_tools = {"search_youtube"}
default_tools = default_image_tools | default_video_tools
if serpapi_api_key:
default_tools = default_tools | serpapi_image_tools | serpapi_video_tools
include_agno = sorted(list(set(include_agno) | default_tools))
if include_agno:
tools.extend(_build_agno_toolkits(request, include_agno))
user_toolkit = build_user_tools_toolkit(
[tool.model_dump() if hasattr(tool, "model_dump") else tool for tool in request.user_tools or []]
)
if user_toolkit:
tools.append(user_toolkit)
mcp_url = os.getenv("MCP_SERVER_URL")
if mcp_url:
try:
from agno.tools.mcp import MCPTools
except Exception:
MCPTools = None
if MCPTools:
tools.append(MCPTools(url=mcp_url, transport=os.getenv("MCP_TRANSPORT", "streamable-http")))
return tools
def _build_agno_toolkits(request: Any, include_agno: list[str]) -> list[Any]:
toolkits: list[Any] = []
include_set = set(include_agno)
serpapi_api_key = getattr(request, "serpapi_api_key", None)
tavily_tools = {"web_search_using_tavily", "web_search_with_tavily", "extract_url_content"}
if include_set.intersection(tavily_tools):
try:
from agno.tools.tavily import TavilyTools
except Exception:
TavilyTools = None
if TavilyTools:
selected = [name for name in include_agno if name in tavily_tools]
toolkits.append(TavilyTools(api_key=request.tavily_api_key, include_tools=selected))
websearch_tools = {"web_search", "search_news"}
if include_set.intersection(websearch_tools):
backend = getattr(request, "search_backend", None) or "auto"
if backend == "exa":
exa_toolkit = _build_exa_toolkit(
getattr(request, "exa_api_key", None),
getattr(request, "exa_search_category", None),
)
if exa_toolkit:
toolkits.append(exa_toolkit)
else:
logger.warning("Exa backend requested but ExaTools is unavailable or API key is missing.")
else:
selected = [name for name in include_agno if name in websearch_tools]
toolkits.append(
DuckDuckGoWebSearchTools(
include_tools=selected,
backend=backend,
)
)
if include_set.intersection(EXA_SEARCH_TOOL_SET) and not (
(getattr(request, "search_backend", None) or "auto") == "exa"
and include_set.intersection(websearch_tools)
):
exa_toolkit = _build_exa_toolkit(
getattr(request, "exa_api_key", None),
getattr(request, "exa_search_category", None),
)
if exa_toolkit:
toolkits.append(exa_toolkit)
else:
logger.warning("Exa tool requested but ExaTools is unavailable or API key is missing.")
arxiv_tools = {"search_arxiv_and_return_articles", "read_arxiv_papers"}
if include_set.intersection(arxiv_tools):
try:
from agno.tools.arxiv import ArxivTools
except Exception:
ArxivTools = None
if ArxivTools:
selected = [name for name in include_agno if name in arxiv_tools]
toolkits.append(ArxivTools(include_tools=selected))
wikipedia_tools = {"search_wikipedia"}
if include_set.intersection(wikipedia_tools):
try:
from agno.tools.wikipedia import WikipediaTools
except Exception:
WikipediaTools = None
if WikipediaTools:
toolkits.append(WikipediaTools(include_tools=["search_wikipedia"]))
yfinance_tools = {
"yfinance_tools",
"get_current_stock_price",
"get_company_info",
"get_stock_fundamentals",
"get_income_statements",
"get_key_financial_ratios",
"get_analyst_recommendations",
"get_company_news",
"get_technical_indicators",
"get_historical_stock_prices",
}
if include_set.intersection(yfinance_tools):
try:
from agno.tools.yfinance import YFinanceTools
except Exception:
YFinanceTools = None
if YFinanceTools:
# Keep YFinance toolkit initialization aligned with Agno's default usage.
# Some SDK versions expose different subsets/names, which can make
# include_tools fail with "tool not present in toolkit".
toolkits.append(YFinanceTools())
image_search_tools = {
"duckduckgo_image_search",
"google_image_search",
"serpapi_image_search",
"bing_image_search",
}
if include_set.intersection(image_search_tools):
# DuckDuckGo Image Search (Custom) - always available, no config needed
if "duckduckgo_image_search" in include_set:
toolkits.append(DuckDuckGoImageTools(include_tools=["duckduckgo_image_search"]))
# SerpApi Image Search (Custom) - only add if API key is configured
serpapi_tools = {
"google_image_search",
"serpapi_image_search",
"bing_image_search",
}
serpapi_include = sorted([name for name in include_set if name in serpapi_tools])
# Only add SerpApi tools if API key is available
if serpapi_include and serpapi_api_key:
toolkits.append(
SerpApiImageTools(
api_key=serpapi_api_key, include_tools=serpapi_include
)
)
video_search_tools = {
"duckduckgo_video_search",
"search_youtube",
}
if include_set.intersection(video_search_tools):
# DuckDuckGo Video Search (Custom) - always available, no config needed
if "duckduckgo_video_search" in include_set:
toolkits.append(DuckDuckGoVideoTools(include_tools=["duckduckgo_video_search"]))
# YouTube Search via SerpApi - only add if API key is configured
if "search_youtube" in include_set and serpapi_api_key:
try:
from agno.tools.serpapi import SerpApiTools as AgnoSerpApiTools
except Exception:
AgnoSerpApiTools = None
if AgnoSerpApiTools:
toolkits.append(
AgnoSerpApiTools(
api_key=serpapi_api_key,
enable_search_google=False,
enable_search_youtube=True,
)
)
return toolkits
def _normalize_exa_category(category: str | None) -> str | None:
normalized = str(category or "").strip().lower()
if not normalized or normalized == "auto":
return None
return normalized if normalized in EXA_ALLOWED_CATEGORIES else None
def _build_exa_toolkit(exa_api_key: str | None, category: str | None = None) -> Any | None:
if ExaTools is None:
return None
trimmed_key = str(exa_api_key or os.getenv("EXA_API_KEY") or "").strip()
if not trimmed_key:
return None
return ExaTools(
api_key=trimmed_key,
enable_search=True,
enable_get_contents=False,
enable_find_similar=False,
enable_answer=False,
timeout=EXA_TIMEOUT_SECONDS,
category=_normalize_exa_category(category),
)
def get_summary_model(request: Any) -> Any | None:
"""
Get the lite model for session summary generation from environment variables.
This is a simplified implementation that uses global configuration.
Future enhancement: Support per-agent lite_model from database.
Returns:
Agno model instance for summary generation, or None if unavailable
"""
settings = get_settings()
try:
# Priority: Request params > Global Settings
# Priority: Request params (summary_*) > Global Settings (summary_*)
lite_provider = getattr(request, "summary_provider", None) or settings.summary_lite_provider
lite_model = getattr(request, "summary_model", None) or settings.summary_lite_model
lite_api_key = getattr(request, "summary_api_key", None) or settings.summary_agent_api_key
lite_base_url = getattr(request, "summary_base_url", None) or settings.summary_lite_base_url
if not lite_model or not lite_api_key:
logger.warning("Lite Model not configured (checked request summary_* params and SUMMARY_LITE_MODEL env var)")
return None
source = "Request-Specific" if getattr(request, "summary_model", None) else "Global-Default"
logger.info(f"[{source}] Selected Lite Model for Session Summary: {lite_provider}/{lite_model}")
# If no base_url provided, use the default for the provider
resolved_base = lite_base_url or DEFAULT_BASE_URLS.get(lite_provider) or DEFAULT_BASE_URLS["openai"]
summary_model = _build_model(lite_provider, lite_api_key, resolved_base, lite_model)
# Disable native structured outputs for summary model to ensure robust parsing with non-OpenAI providers (like GLM)
# This only affects this specific summary_model instance.
if hasattr(summary_model, "supports_native_structured_outputs"):
summary_model.supports_native_structured_outputs = False
return summary_model
except Exception as exc:
logger.warning(f"Failed to build lite_model for session summary: {exc}")
return None
def build_agent(request: Any = None, **kwargs: Any) -> Agent:
# Backward-compatible shim for legacy build_agent(provider=..., api_key=...) calls.
if request is None or kwargs:
provider = request if isinstance(request, str) else kwargs.get("provider")
request = SimpleNamespace(
provider=provider or "openai",
api_key=kwargs.get("api_key"),
base_url=kwargs.get("base_url"),
model=kwargs.get("model"),
tavily_api_key=kwargs.get("tavily_api_key"),
temperature=kwargs.get("temperature"),
top_p=kwargs.get("top_p"),
top_k=kwargs.get("top_k"),
frequency_penalty=kwargs.get("frequency_penalty"),
presence_penalty=kwargs.get("presence_penalty"),
thinking=kwargs.get("thinking"),
tool_ids=kwargs.get("tool_ids"),
tools=kwargs.get("tools"),
user_tools=kwargs.get("user_tools"),
tool_choice=kwargs.get("tool_choice"),
)
model = _build_model(request.provider, request.api_key, request.base_url, request.model)
_apply_model_settings(model, request)
tools = _build_tools(request)
# memory_kwargs = _build_memory_kwargs(request)
tool_choice = request.tool_choice
if tool_choice is None and tools:
tool_choice = "auto"
# 1. Conditional instructions: Multi-form guidance & Image/Video rendering
enabled_names = set(_collect_enabled_tool_names(request))
# Add image and video search tools to enabled_names set since they are forced
image_search_names = {tool["name"] for tool in IMAGE_SEARCH_TOOLS}
video_search_names = {tool["name"] for tool in VIDEO_SEARCH_TOOLS}
enabled_names.update(image_search_names)
enabled_names.update(video_search_names)
instructions_list = []
if "interactive_form" in enabled_names:
instructions_list.append(
"When using the interactive_form tool to collect user information: "
"If the user's initial responses lack critical details needed to fulfill their request, "
"you MUST call interactive_form again to gather the missing specific information. "
"Do not proceed with incomplete information. "
"However, limit to 2-3 forms maximum per conversation to respect user time."
)
if getattr(request, "enable_skills", False) and _has_selected_skills(request):
instructions_list.append(
"When a skill tells you to run a bundled script, do not merely summarize or restate the script. "
"Use execute_skill_script to actually run the file and rely on its stdout/stderr."
)
instructions_list.append(
"When a skill script fails because of a missing Python dependency "
"(for example ModuleNotFoundError or ImportError), do not pretend it succeeded. "
"First explain the missing package briefly, then use interactive_form to ask for explicit approval "
"before installing anything. Do NOT ask for approval in plain text if interactive_form is available. "
"For dependency approval, keep the form minimal: include the package name in the form title or description, "
"and ask only for an approval choice when skill_id and package_name are already known from context. "
"Do NOT ask the user to retype known values like skill_id or package_name. "
"Only after the user approves may you call install_skill_dependency. "
"If the user declines, stop and report that installation was skipped."
)
if "duckduckgo_image_search" in enabled_names or "google_image_search" in enabled_names:
instructions_list.append(
"When explaining concepts that can benefit from visual aids (like Logo, diagrams, or photos), "
"you should use the image search tools to find relevant images. "
"ALWAYS render images in your response using markdown format: . "
"Place images appropriately within your explanation to enhance user understanding."
)
if "duckduckgo_video_search" in enabled_names or "search_youtube" in enabled_names:
instructions_list.append(
"When users ask about tutorials, demonstrations, or topics that benefit from video content, "
"you should use the video search tools to find relevant videos. "
"ALWAYS include video links in your response using markdown format with descriptive text. "
"Provide context about why each video is relevant to the user's query."
)
if getattr(request, "search_backend", None) == "exa" and (
"web_search" in enabled_names or bool(enabled_names.intersection(EXA_SEARCH_TOOL_SET))
):
exa_category = _normalize_exa_category(getattr(request, "exa_search_category", None))
if exa_category:
instructions_list.append(
f"Exa search is available via search_exa and is constrained to the '{exa_category}' category."
)
else:
instructions_list.append(
"Exa search is available via search_exa. Use it for current web information, and choose a category when it helps."
)
instructions = "\n\n".join(instructions_list) if instructions_list else None
# 2. Agent Construction (Stateless / Manual Context)
# We do NOT inject 'db' or 'memory' here.
# Session context (history + summary) is injected manually in stream_chat.py
skills = None
if getattr(request, "enable_skills", False):
skills_dir = os.path.join(os.path.dirname(__file__), '..', '..', '.skills')
internal_skills_dir = os.path.join(os.path.dirname(__file__), '..', '_internal_skills')
requested_skills = getattr(request, "skill_ids", [])
if isinstance(requested_skills, str):
try:
requested_skills = json.loads(requested_skills)
except (json.JSONDecodeError, TypeError):
requested_skills = []
paths = []
# Inject built-in agent-memory skill if long term memory is enabled
if getattr(request, "enable_long_term_memory", False):
am_path = os.path.join(internal_skills_dir, "agent-memory")
if os.path.isdir(am_path):
paths.append(am_path)
# Inject any other internal skills by default (except for specific non-autoloading skills)
if os.path.isdir(internal_skills_dir):
for item in os.listdir(internal_skills_dir):
if item in ("agent-memory", "skill-creator", "academic-research", "deep-research"):
continue
item_path = os.path.join(internal_skills_dir, item)
if os.path.isdir(item_path):
paths.append(item_path)
if requested_skills:
for skill_id in requested_skills:
internal_skill_path = os.path.join(internal_skills_dir, skill_id)
external_skill_path = os.path.join(skills_dir, skill_id)
if os.path.isdir(internal_skill_path):
paths.append(internal_skill_path)
elif os.path.isdir(external_skill_path):
paths.append(external_skill_path)
if paths:
skills = Skills(loaders=[LocalSkills(path) for path in paths])
# Merge personalized prompt with tool-derived instructions
personalized = getattr(request, "personalized_prompt", None)
if personalized:
if instructions:
instructions = f"{personalized}\n\n{instructions}"
else:
instructions = personalized
# Use resolved agent name/description if available (for Team member identification)
agent_id = getattr(request, "agent_id", None) or f"qurio-{request.provider}"
agent_name = getattr(request, "agent_name", None) or f"Qurio {request.provider} Agent"
agent_description = getattr(request, "agent_description", None)
return Agent(
id=agent_id,
name=agent_name,
description=agent_description,
role=agent_description,
model=model,
tools=tools or None,
markdown=True,
tool_choice=tool_choice,
instructions=instructions,
skills=skills,
)
# Mapping of provider to user_settings key for API keys
_PROVIDER_KEY_MAP: dict[str, str] = {
"gemini": "googleApiKey",
"openai": "OpenAICompatibilityKey",
"openai_compatibility": "OpenAICompatibilityKey",
"siliconflow": "SiliconFlowKey",
"glm": "GlmKey",
"deepseek": "DeepSeekKey",
"volcengine": "VolcengineKey",
"modelscope": "ModelScopeKey",
"kimi": "KimiKey",
"nvidia": "NvidiaKey",
"minimax": "MinimaxKey",
}
def _get_provider_credentials(provider: str) -> tuple[str | None, str | None]:
"""
Get API key and base URL for a provider.
Priority: user_settings table -> environment variables.
"""
api_key: str | None = None
base_url = DEFAULT_BASE_URLS.get(provider)
# 1. Try to get API key from user_settings table
db_key = _PROVIDER_KEY_MAP.get(provider)
if db_key:
try:
adapter = get_db_adapter()
if adapter:
req = DbQueryRequest(
action="select",
table="user_settings",
filters=[DbFilter(op="eq", column="key", value=db_key)],
maybe_single=True,
)
result = adapter.execute(req)
if result.data:
data = result.data
if isinstance(data, list) and len(data) > 0:
data = data[0]
if isinstance(data, dict):
api_key = data.get("value")
except Exception as e:
logger.debug(f"Failed to get API key from user_settings for {provider}: {e}")
# 2. Fallback to environment variables
if not api_key:
provider_upper = provider.upper().replace("-", "_")
api_key = os.getenv(f"{provider_upper}_API_KEY")
return api_key, base_url
def resolve_agent_config(agent_id: str, base_request: Any) -> Any:
"""Fetch agent configuration from database and merge with base request secrets."""
import copy
from types import SimpleNamespace
adapter = get_db_adapter()
if not adapter:
return base_request
try:
query_req = DbQueryRequest(
action="select",
table="agents",
filters=[DbFilter(op="eq", column="id", value=agent_id)],
maybe_single=True
)
response = adapter.execute(query_req)
if response.error:
logger.warning(f"[resolve_agent_config] DB error for agent_id={agent_id}: {response.error}")
return base_request
if not response.data:
logger.warning(f"[resolve_agent_config] No agent found with id={agent_id}")
return base_request
# Handle case where data might be a list or a single dict
agent_data = response.data
if isinstance(agent_data, list):
if len(agent_data) == 0:
return base_request
agent_data = agent_data[0]
new_req = copy.deepcopy(base_request)
# Override provider and get corresponding credentials from settings
agent_provider = agent_data.get("provider")
if agent_provider:
new_req.provider = agent_provider
# Get API key and base URL for this provider from environment
api_key, base_url = _get_provider_credentials(agent_provider)
if api_key:
new_req.api_key = api_key
if base_url:
new_req.base_url = base_url
new_req.model = agent_data.get("default_model") or base_request.model
new_req.tool_ids = agent_data.get("tool_ids") or []
new_req.skill_ids = agent_data.get("skill_ids") or []
# Handle instructions (personalized prompt)
# We'll store it in a custom attribute that build_agent can pick up
new_req.personalized_prompt = agent_data.get("prompt")
# Store agent name and description for proper identification in Teams
new_req.agent_id = agent_id # Store the original agent_id
new_req.agent_name = agent_data.get("name")
new_req.agent_emoji = agent_data.get("emoji")
new_req.agent_description = agent_data.get("description")
logger.info(f"[resolve_agent_config] Resolved agent: id={agent_id}, name={new_req.agent_name}, provider={agent_provider}")
# Override generation params if configured
if not agent_data.get("use_global_model_settings"):
if agent_data.get("temperature") is not None:
new_req.temperature = agent_data.get("temperature")
if agent_data.get("top_p") is not None:
new_req.top_p = agent_data.get("top_p")
if agent_data.get("frequency_penalty") is not None:
new_req.frequency_penalty = agent_data.get("frequency_penalty")
if agent_data.get("presence_penalty") is not None:
new_req.presence_penalty = agent_data.get("presence_penalty")
return new_req
except Exception as e:
logger.error(f"Failed to resolve agent config for {agent_id}: {e}")
return base_request
def build_memory_agent(
user_id: str | None = None,
provider: str | None = None,
model: str | None = None,
base_url: str | None = None,
api_key: str | None = None,
) -> Agent:
settings = get_settings()
resolved_provider = provider or settings.memory_lite_provider
resolved_model = model or settings.memory_lite_model
resolved_api_key = api_key or settings.memory_agent_api_key or os.getenv("OPENAI_API_KEY")
resolved_base_url = (
base_url
or DEFAULT_BASE_URLS.get(resolved_provider)
or DEFAULT_BASE_URLS["openai"]
)
memory_request = SimpleNamespace(
provider=resolved_provider,
api_key=resolved_api_key,
base_url=resolved_base_url,
model=resolved_model,
tavily_api_key=os.getenv("TAVILY_API_KEY"),
temperature=None,
top_p=None,
top_k=None,
frequency_penalty=None,
presence_penalty=None,
thinking=None,
tool_ids=[],
tools=None,
user_tools=None,
tool_choice=None,
enable_long_term_memory=True,
database_provider=os.getenv("DATABASE_PROVIDER") or "default",
user_id=user_id,
enable_skills=False, # Helper agent: keep skills disabled
)
return build_agent(memory_request)
def get_agent_for_provider(request: Any) -> Agent:
return build_agent(request)
def build_team(request: Any, members: list[Agent]) -> Any:
"""Build an Agno Team from a list of member agents."""
from agno.team import Team
from agno.team.mode import TeamMode
# Map requested team mode string to TeamMode enum
mode_str = getattr(request, "team_mode", "route")
try:
team_mode = TeamMode(mode_str)
except Exception:
team_mode = TeamMode.route
# Build a leader agent to inherit tools, skills, and personalised prompt
leader_agent = build_agent(request)
instructions = (
"You are the Team Leader coordinating a group of expert agents. "
"Analyze the user's request. Based on the expertise of your team members, "
"delegate sub-tasks or questions to them. Finally, synthesize and summarize their findings into a comprehensive response."
)
# Prepend leader's own instructions if they exist
if leader_agent.instructions:
instructions = f"{leader_agent.instructions}\n\n{instructions}"
team = Team(
name=getattr(request, "agent_name", "Expert Team"),
members=members,
model=leader_agent.model,
tools=leader_agent.tools,
mode=team_mode,
instructions=instructions,
markdown=True,
stream_member_events=True, # Ensure member events are streamed
)
# Set agent_id manually as Team constructor might not support it directly
team.agent_id = getattr(request, "agent_id", None)
return team
|