govon-runtime / src /inference /graph /tools /analysis_tools.py
GovOn Deploy
sync: PR#584 RAG removal + ReAct architecture
1635ec4
"""LangGraph ToolNode์šฉ ๋ถ„์„ ๋„๊ตฌ ํŒฉํ† ๋ฆฌ.
๊ธฐ์กด IssueDetectorCapability, StatsLookupCapability,
KeywordAnalyzerCapability, DemographicsLookupCapability์— ์œ„์ž„ํ•˜์—ฌ
StructuredTool ์ธ์Šคํ„ด์Šค๋ฅผ ๋™์  ์ƒ์„ฑํ•œ๋‹ค.
"""
from __future__ import annotations
import json
from typing import Any, Optional
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
# ---------------------------------------------------------------------------
# Pydantic ์Šคํ‚ค๋งˆ โ€” LLM์ด ์ƒ์„ฑํ•˜๋Š” JSON ์ธ์ž
# ---------------------------------------------------------------------------
class IssueDetectorInput(BaseModel):
"""issue_detector ๋„๊ตฌ ์ž…๋ ฅ ์Šคํ‚ค๋งˆ."""
query: str = Field(..., description="์ด์Šˆ ํƒ์ง€ ๋Œ€์ƒ ํ‚ค์›Œ๋“œ ๋˜๋Š” ์งˆ์˜๋ฌธ")
analysis_time: Optional[str] = Field(
None,
description="๋ถ„์„ ์‹œ๊ฐ„๋Œ€ (YYYYMMDDHH ํ˜•์‹, 10์ž๋ฆฌ). ์˜ˆ: '2026040814'",
)
max_result: int = Field(10, description="๋ฐ˜ํ™˜ํ•  ์ตœ๋Œ€ ๊ฒฐ๊ณผ ์ˆ˜", ge=1)
class StatsLookupInput(BaseModel):
"""stats_lookup ๋„๊ตฌ ์ž…๋ ฅ ์Šคํ‚ค๋งˆ."""
query: str = Field(..., description="ํ†ต๊ณ„ ์กฐํšŒ ๋Œ€์ƒ ํ‚ค์›Œ๋“œ")
date_from: Optional[str] = Field(
None, description="์กฐํšŒ ์‹œ์ž‘์ผ (YYYYMMDD ํ˜•์‹). ์˜ˆ: '20260101'"
)
date_to: Optional[str] = Field(None, description="์กฐํšŒ ์ข…๋ฃŒ์ผ (YYYYMMDD ํ˜•์‹). ์˜ˆ: '20260408'")
period: Optional[str] = Field(
None, description="์ง‘๊ณ„ ๊ธฐ๊ฐ„ ๋‹จ์œ„ (DAILY, WEEKLY, MONTHLY, YEARLY)"
)
class KeywordAnalyzerInput(BaseModel):
"""keyword_analyzer ๋„๊ตฌ ์ž…๋ ฅ ์Šคํ‚ค๋งˆ."""
query: str = Field(..., description="ํ‚ค์›Œ๋“œ ๋ถ„์„ ๋Œ€์ƒ ์งˆ์˜๋ฌธ")
date_from: Optional[str] = Field(
None, description="๋ถ„์„ ์‹œ์ž‘์ผ (YYYYMMDD ํ˜•์‹). ์˜ˆ: '20260101'"
)
date_to: Optional[str] = Field(None, description="๋ถ„์„ ์ข…๋ฃŒ์ผ (YYYYMMDD ํ˜•์‹). ์˜ˆ: '20260408'")
result_count: int = Field(20, description="๋ฐ˜ํ™˜ํ•  ํ‚ค์›Œ๋“œ ์ˆ˜", ge=1)
class DemographicsLookupInput(BaseModel):
"""demographics_lookup ๋„๊ตฌ ์ž…๋ ฅ ์Šคํ‚ค๋งˆ."""
query: str = Field(..., description="์ธ๊ตฌํ†ต๊ณ„ ๋ถ„์„ ๋Œ€์ƒ ์งˆ์˜๋ฌธ")
date_from: Optional[str] = Field(
None, description="๋ถ„์„ ์‹œ์ž‘์ผ (YYYYMMDD ํ˜•์‹). ์˜ˆ: '20260101'"
)
date_to: Optional[str] = Field(None, description="๋ถ„์„ ์ข…๋ฃŒ์ผ (YYYYMMDD ํ˜•์‹). ์˜ˆ: '20260408'")
# ---------------------------------------------------------------------------
# ํŒฉํ† ๋ฆฌ
# ---------------------------------------------------------------------------
def build_analysis_tools(
api_lookup_action: Optional[Any] = None,
) -> list:
"""๋ถ„์„ ๊ด€๋ จ StructuredTool ๋ชฉ๋ก์„ ์ƒ์„ฑํ•œ๋‹ค.
Parameters
----------
api_lookup_action : Optional[MinwonAnalysisAction]
๊ณต๊ณต๋ฐ์ดํ„ฐํฌํ„ธ API Action ์ธ์Šคํ„ด์Šค. None์ด๋ฉด ๋นˆ ๊ฒฐ๊ณผ ๋ฐ˜ํ™˜.
Returns
-------
list[StructuredTool]
[issue_detector_tool, stats_lookup_tool, keyword_analyzer_tool, demographics_lookup_tool]
"""
from src.inference.graph.capabilities.demographics_lookup import DemographicsLookupCapability
from src.inference.graph.capabilities.issue_detector import IssueDetectorCapability
from src.inference.graph.capabilities.keyword_analyzer import KeywordAnalyzerCapability
from src.inference.graph.capabilities.stats_lookup import StatsLookupCapability
# -- issue_detector --
_issue_cap = IssueDetectorCapability(action=api_lookup_action)
async def _issue_detector(
query: str,
analysis_time: Optional[str] = None,
max_result: int = 10,
) -> str:
context: dict[str, Any] = {"max_result": max_result}
if analysis_time is not None:
context["analysis_time"] = analysis_time
# search_date๋ฅผ analysis_time์—์„œ ์ž๋™ ์ถ”์ถœ (YYYYMMDDHH -> YYYYMMDD)
if len(analysis_time) >= 8:
context["search_date"] = analysis_time[:8]
try:
result = await _issue_cap.execute(query=query, context=context, session=None)
return json.dumps(result.to_dict(), ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e), "success": False}, ensure_ascii=False)
issue_detector_tool = StructuredTool.from_function(
coroutine=_issue_detector,
name="issue_detector",
description=(
"๋ฏผ์› ๋ฐ์ดํ„ฐ์—์„œ ๋ฐ˜๋ณต๋˜๋Š” ์ด์Šˆ ํŒจํ„ด๊ณผ ํŠธ๋ Œ๋“œ๋ฅผ ํƒ์ง€ํ•ฉ๋‹ˆ๋‹ค. "
"๋ฏผ์› ๊ธ‰์ฆ, ๋ฐ˜๋ณต ๋ถˆ๋งŒ, ์‹ ๊ทœ ์ด์Šˆ๋ฅผ ํŒŒ์•…ํ•  ๋•Œ ์‚ฌ์šฉํ•˜์„ธ์š”. "
"๋ฐ˜ํ™˜๊ฐ’: ํƒ์ง€๋œ ์ด์Šˆ ๋ชฉ๋ก (์ด์Šˆ๋ช…, ๊ฑด์ˆ˜, ์‹ฌ๊ฐ๋„)"
),
args_schema=IssueDetectorInput,
metadata={"requires_approval": False},
)
# -- stats_lookup --
_stats_cap = StatsLookupCapability(action=api_lookup_action)
async def _stats_lookup(
query: str,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
period: Optional[str] = None,
) -> str:
context: dict[str, Any] = {}
if date_from is not None:
context["date_from"] = date_from
if date_to is not None:
context["date_to"] = date_to
if period is not None:
context["period"] = period
try:
result = await _stats_cap.execute(query=query, context=context, session=None)
return json.dumps(result.to_dict(), ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e), "success": False}, ensure_ascii=False)
stats_lookup_tool = StructuredTool.from_function(
coroutine=_stats_lookup,
name="stats_lookup",
description=(
"๋ฏผ์› ์ ‘์ˆ˜ ํ†ต๊ณ„๋ฅผ ๊ธฐ๊ฐ„๋ณ„/์œ ํ˜•๋ณ„๋กœ ์กฐํšŒํ•ฉ๋‹ˆ๋‹ค. "
"๋ฏผ์› ํ˜„ํ™ฉ ํŒŒ์•…, ์ถ”์ด ๋ถ„์„์— ์‚ฌ์šฉํ•˜์„ธ์š”. "
"๋ฐ˜ํ™˜๊ฐ’: ํ†ต๊ณ„ ๋ฐ์ดํ„ฐ (๊ธฐ๊ฐ„, ์ ‘์ˆ˜๊ฑด์ˆ˜, ์œ ํ˜•๋ณ„ ๋ถ„ํฌ)"
),
args_schema=StatsLookupInput,
metadata={"requires_approval": False},
)
# -- keyword_analyzer --
_kw_cap = KeywordAnalyzerCapability(action=api_lookup_action)
async def _keyword_analyzer(
query: str,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
result_count: int = 20,
) -> str:
context: dict[str, Any] = {"result_count": result_count}
if date_from is not None:
context["date_from"] = date_from
if date_to is not None:
context["date_to"] = date_to
# searchword๋ฅผ query์—์„œ ์ž๋™ ์„ค์ • (์—ฐ๊ด€์–ด ๋ถ„์„์šฉ)
context["searchword"] = query
try:
result = await _kw_cap.execute(query=query, context=context, session=None)
return json.dumps(result.to_dict(), ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e), "success": False}, ensure_ascii=False)
keyword_analyzer_tool = StructuredTool.from_function(
coroutine=_keyword_analyzer,
name="keyword_analyzer",
description=(
"๋ฏผ์› ํ…์ŠคํŠธ์—์„œ ํ•ต์‹ฌ ํ‚ค์›Œ๋“œ์™€ ๋นˆ๋„๋ฅผ ๋ถ„์„ํ•ฉ๋‹ˆ๋‹ค. "
"๋ฏผ์› ์ด์Šˆ์˜ ํ•ต์‹ฌ์–ด๋ฅผ ํŒŒ์•…ํ•  ๋•Œ ์‚ฌ์šฉํ•˜์„ธ์š”. "
"๋ฐ˜ํ™˜๊ฐ’: ํ‚ค์›Œ๋“œ ๋ชฉ๋ก (ํ‚ค์›Œ๋“œ, ๋นˆ๋„, ๊ด€๋ จ๋„)"
),
args_schema=KeywordAnalyzerInput,
metadata={"requires_approval": False},
)
# -- demographics_lookup --
_demo_cap = DemographicsLookupCapability(action=api_lookup_action)
async def _demographics_lookup(
query: str,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
) -> str:
context: dict[str, Any] = {"searchword": query}
if date_from is not None:
context["date_from"] = date_from
if date_to is not None:
context["date_to"] = date_to
try:
result = await _demo_cap.execute(query=query, context=context, session=None)
return json.dumps(result.to_dict(), ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e), "success": False}, ensure_ascii=False)
demographics_lookup_tool = StructuredTool.from_function(
coroutine=_demographics_lookup,
name="demographics_lookup",
description=(
"๋ฏผ์›์ธ์˜ ์ธ๊ตฌํ†ต๊ณ„ ์ •๋ณด(์—ฐ๋ น๋Œ€, ์ง€์—ญ ๋“ฑ)๋ฅผ ์กฐํšŒํ•ฉ๋‹ˆ๋‹ค. "
"๋ฏผ์› ๋Œ€์ƒ ๋ถ„์„์— ์‚ฌ์šฉํ•˜์„ธ์š”. "
"๋ฐ˜ํ™˜๊ฐ’: ์ธ๊ตฌํ†ต๊ณ„ ๋ถ„ํฌ ๋ฐ์ดํ„ฐ"
),
args_schema=DemographicsLookupInput,
metadata={"requires_approval": False},
)
return [issue_detector_tool, stats_lookup_tool, keyword_analyzer_tool, demographics_lookup_tool]