insurance-chatbot / app /tools /__init__.py
κΉ€λ―Όκ²½
fix: μ„œλΉ„μŠ€ 쀑 μž¬μΈλ±μ‹± μ‹œ 검색 곡백·이벀트 루프 λΈ”λ‘œν‚ΉΒ·λ‹€μ€‘ 콜백 문제 ν•΄κ²°
2bc2974
"""LangChain 도ꡬ λͺ¨μŒ β€” 동적 ToolRegistry둜 λŸ°νƒ€μž„ 도ꡬ ν•«λ¦¬λ‘œλ“œ 지원.
μƒˆ 도ꡬ μΆ”κ°€ μ‹œ:
1. ν•΄λ‹Ή λͺ¨λ“ˆμ— 도ꡬ ν•¨μˆ˜ μž‘μ„±
2. λͺ¨λ“ˆ ν•˜λ‹¨ TOOLS λ¦¬μŠ€νŠΈμ— μΆ”κ°€
(μƒˆ λͺ¨λ“ˆμ΄λ©΄ _TOOL_MODULES에 λͺ¨λ“ˆ μΆ”κ°€)
λŸ°νƒ€μž„ 등둝:
registry = get_tool_registry()
registry.register(my_new_tool) # 단일 도ꡬ 등둝 + ChromaDB μžλ™ 인덱싱
registry.unregister("my_new_tool") # μ΄λ¦„μœΌλ‘œ ν•΄μ œ
"""
from __future__ import annotations
import logging
import threading
from typing import Callable
from langchain_core.tools import BaseTool
from app.tools import (
product, premium, coverage, underwriting,
compliance, claims, customer_db, rag_tools,
)
logger = logging.getLogger("insurance.tools.registry")
_TOOL_MODULES = [
product, premium, coverage, underwriting,
compliance, claims, customer_db, rag_tools,
]
def _inject_when_not_to_use(tool: BaseTool) -> BaseTool:
"""ToolCard의 when_not_to_useλ₯Ό LLM이 λ³΄λŠ” tool description에 μ£Όμž…ν•œλ‹€.
LLM이 bind_tools()둜 도ꡬ λͺ©λ‘μ„ 받을 λ•Œ description 전체가 μ „λ‹¬λœλ‹€.
ν˜Όλ™ν•˜κΈ° μ‰¬μš΄ μœ μ‚¬ 도ꡬ 쌍(예: premium_estimate vs plan_options)을
λͺ…μ‹œμ μœΌλ‘œ μ•Œλ €μ€ŒμœΌλ‘œμ¨ 잘λͺ»λœ 도ꡬ 선택을 쀄인닀.
when_not_to_useλŠ” ChromaDB μž„λ² λ”© ν…μŠ€νŠΈ(to_embed_text)μ—λŠ” ν¬ν•¨λ˜μ§€ μ•ŠλŠ”λ‹€.
(타 도ꡬ μ–΄νœ˜κ°€ 포함돼 μž„λ² λ”© 벑터λ₯Ό μ˜€μ—Όμ‹œν‚€κΈ° λ•Œλ¬Έ)
"""
from app.tool_search.tool_cards import get_card
card = get_card(tool.name)
if not card or not card.when_not_to_use:
return tool
neg = "\n[μ‚¬μš© κΈˆμ§€ 상황]\n" + "\n".join(f"Β· {w}" for w in card.when_not_to_use)
return tool.model_copy(update={"description": tool.description.rstrip() + neg})
class ToolRegistry:
"""μŠ€λ ˆλ“œ μ•ˆμ „ 동적 도ꡬ λ ˆμ§€μŠ€νŠΈλ¦¬.
μ„œλ²„ μ‹œμž‘ μ‹œ κΈ°μ‘΄ λͺ¨λ“ˆμ—μ„œ 도ꡬλ₯Ό 일괄 λ‘œλ“œν•˜κ³ ,
λŸ°νƒ€μž„μ— register()/unregister()둜 μ„œλ²„ μž¬μ‹œμž‘ 없이 도ꡬλ₯Ό μΆ”κ°€Β·μ œκ±°ν•œλ‹€.
λ³€κ²½ μ‹œ λ“±λ‘λœ 콜백(on_change)을 ν˜ΈμΆœν•˜μ—¬ ChromaDB μž¬μΈλ±μ‹± 등을 νŠΈλ¦¬κ±°ν•œλ‹€.
"""
def __init__(self) -> None:
self._tools: dict[str, BaseTool] = {}
self._lock = threading.Lock()
self._version = 0
self._on_change_callbacks: list[Callable[["ToolRegistry"], None]] = []
# ── 쑰회 ──────────────────────────────────────────────────
def get_all(self) -> tuple[BaseTool, ...]:
with self._lock:
return tuple(self._tools.values())
def get_by_name(self, name: str) -> BaseTool | None:
with self._lock:
return self._tools.get(name)
@property
def version(self) -> int:
return self._version
def __len__(self) -> int:
return len(self._tools)
# ── 등둝 / ν•΄μ œ ──────────────────────────────────────────
def register(self, tool: BaseTool) -> None:
"""단일 도ꡬ 등둝. when_not_to_use μžλ™ μ£Όμž… + λ³€κ²½ 콜백 호좜."""
enriched = _inject_when_not_to_use(tool)
with self._lock:
self._tools[enriched.name] = enriched
self._version += 1
logger.info("Registered tool: %s (v=%d)", enriched.name, self._version)
self._fire_on_change()
def register_many(self, tools: list[BaseTool] | tuple[BaseTool, ...]) -> None:
"""λ‹€μˆ˜ 도ꡬ 일괄 등둝. μ½œλ°±μ€ λ§ˆμ§€λ§‰μ— ν•œ 번만 호좜."""
with self._lock:
for t in tools:
self._tools[t.name] = _inject_when_not_to_use(t)
self._version += 1
logger.info("Registered %d tools (v=%d)", len(tools), self._version)
self._fire_on_change()
def unregister(self, name: str) -> bool:
"""μ΄λ¦„μœΌλ‘œ 도ꡬ ν•΄μ œ. 제거 성곡 μ‹œ True."""
with self._lock:
removed = self._tools.pop(name, None)
if removed:
self._version += 1
if removed:
logger.info("Unregistered tool: %s (v=%d)", name, self._version)
self._fire_on_change()
return removed is not None
# ── λ³€κ²½ 콜백 ────────────────────────────────────────────
def on_change(self, callback: Callable[["ToolRegistry"], None]) -> None:
"""도ꡬ λͺ©λ‘ λ³€κ²½ μ‹œ 호좜될 μ½œλ°±μ„ λ“±λ‘ν•œλ‹€."""
self._on_change_callbacks.append(callback)
def _fire_on_change(self) -> None:
"""μ½œλ°±μ„ λ°±κ·ΈλΌμš΄λ“œ 데λͺ¬ μŠ€λ ˆλ“œμ—μ„œ μ‹€ν–‰.
FastAPI async 이벀트 루프λ₯Ό λΈ”λ‘œν‚Ήν•˜μ§€ μ•ŠμœΌλ©΄μ„œ,
embedder._index_lock이 λ™μ‹œ 싀행을 μ§λ ¬ν™”ν•œλ‹€.
"""
for cb in self._on_change_callbacks:
threading.Thread(
target=self._safe_callback,
args=(cb,),
daemon=True,
name="tool-registry-reindex",
).start()
def _safe_callback(self, cb: Callable[["ToolRegistry"], None]) -> None:
try:
cb(self)
except Exception:
logger.exception("on_change callback failed")
# ── 초기 λ‘œλ“œ ────────────────────────────────────────────
def load_from_modules(self) -> None:
"""_TOOL_MODULESμ—μ„œ 도ꡬλ₯Ό 일괄 μˆ˜μ§‘ν•˜μ—¬ 등둝. μ„œλ²„ μ‹œμž‘ μ‹œ 1회 호좜."""
tools: list[BaseTool] = []
for mod in _TOOL_MODULES:
tools.extend(getattr(mod, "TOOLS", []))
self.register_many(tools)
# ── 싱글톀 ────────────────────────────────────────────────────
_registry: ToolRegistry | None = None
_registry_lock = threading.Lock()
def get_tool_registry() -> ToolRegistry:
"""ToolRegistry 싱글톀을 λ°˜ν™˜ν•œλ‹€."""
global _registry
if _registry is None:
with _registry_lock:
if _registry is None:
_registry = ToolRegistry()
return _registry
def get_all_tools() -> tuple[BaseTool, ...]:
"""ν•˜μœ„ ν˜Έν™˜μš© β€” κΈ°μ‘΄ μ½”λ“œκ°€ get_all_tools()λ₯Ό ν˜ΈμΆœν•˜λŠ” κ³³μ—μ„œ λ™μž‘ μœ μ§€."""
return get_tool_registry().get_all()