Spaces:
Paused
Paused
File size: 9,180 Bytes
1635ec4 206d4bc 1635ec4 206d4bc 1635ec4 206d4bc 1635ec4 206d4bc 1635ec4 206d4bc 1635ec4 206d4bc 1635ec4 206d4bc 1635ec4 ed0a52d 1635ec4 206d4bc 1635ec4 206d4bc 1635ec4 206d4bc 1635ec4 206d4bc 1635ec4 ed0a52d 206d4bc 1635ec4 206d4bc 1635ec4 206d4bc 1635ec4 206d4bc 1635ec4 ed0a52d 1635ec4 206d4bc 1635ec4 206d4bc 1635ec4 206d4bc 1635ec4 206d4bc 1635ec4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 | """LangGraph ToolNode용 분석 도구 팩토리.
기존 IssueDetectorCapability, StatsLookupCapability,
KeywordAnalyzerCapability, DemographicsLookupCapability에 위임하여
StructuredTool 인스턴스를 동적 생성한다.
"""
from __future__ import annotations
import json
from typing import Any, Optional
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
# ---------------------------------------------------------------------------
# Pydantic 스키마 — LLM이 생성하는 JSON 인자
# ---------------------------------------------------------------------------
class IssueDetectorInput(BaseModel):
"""issue_detector tool input schema."""
query: str = Field(..., description="Keywords or query to detect complaint issues")
analysis_time: Optional[str] = Field(
None,
description="Analysis timestamp (YYYYMMDDHH, 10 digits). Example: '2026040814'",
)
max_result: int = Field(10, description="Maximum number of results to return", ge=1)
class StatsLookupInput(BaseModel):
"""stats_lookup tool input schema."""
query: str = Field(..., description="Keywords for statistics lookup")
date_from: Optional[str] = Field(
None, description="Start date (YYYYMMDD format). Example: '20260101'"
)
date_to: Optional[str] = Field(
None, description="End date (YYYYMMDD format). Example: '20260408'"
)
period: Optional[str] = Field(
None, description="Aggregation period (DAILY, WEEKLY, MONTHLY, YEARLY)"
)
class KeywordAnalyzerInput(BaseModel):
"""keyword_analyzer tool input schema."""
query: str = Field(..., description="Query text for keyword frequency analysis")
date_from: Optional[str] = Field(
None, description="Start date (YYYYMMDD format). Example: '20260101'"
)
date_to: Optional[str] = Field(
None, description="End date (YYYYMMDD format). Example: '20260408'"
)
result_count: int = Field(20, description="Number of keywords to return", ge=1)
class DemographicsLookupInput(BaseModel):
"""demographics_lookup tool input schema."""
query: str = Field(..., description="Query for demographic analysis of complaint filers")
date_from: Optional[str] = Field(
None, description="Start date (YYYYMMDD format). Example: '20260101'"
)
date_to: Optional[str] = Field(
None, description="End date (YYYYMMDD format). Example: '20260408'"
)
# ---------------------------------------------------------------------------
# 팩토리
# ---------------------------------------------------------------------------
def build_analysis_tools(
api_lookup_action: Optional[Any] = None,
) -> list:
"""분석 관련 StructuredTool 목록을 생성한다.
Parameters
----------
api_lookup_action : Optional[MinwonAnalysisAction]
공공데이터포털 API Action 인스턴스. None이면 빈 결과 반환.
Returns
-------
list[StructuredTool]
[issue_detector_tool, stats_lookup_tool, keyword_analyzer_tool, demographics_lookup_tool]
"""
from src.inference.graph.capabilities.demographics_lookup import DemographicsLookupCapability
from src.inference.graph.capabilities.issue_detector import IssueDetectorCapability
from src.inference.graph.capabilities.keyword_analyzer import KeywordAnalyzerCapability
from src.inference.graph.capabilities.stats_lookup import StatsLookupCapability
# -- issue_detector --
_issue_cap = IssueDetectorCapability(action=api_lookup_action)
async def _issue_detector(
query: str,
analysis_time: Optional[str] = None,
max_result: int = 10,
) -> str:
context: dict[str, Any] = {"max_result": max_result}
if analysis_time is not None:
context["analysis_time"] = analysis_time
# search_date를 analysis_time에서 자동 추출 (YYYYMMDDHH -> YYYYMMDD)
if len(analysis_time) >= 8:
context["search_date"] = analysis_time[:8]
try:
result = await _issue_cap.execute(query=query, context=context, session=None)
return json.dumps(result.to_dict(), ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e), "success": False}, ensure_ascii=False)
issue_detector_tool = StructuredTool.from_function(
coroutine=_issue_detector,
name="issue_detector",
description=(
"Detect recurring issue patterns and trends in civil complaint data. "
"USE THIS TOOL when the user asks about complaint surges, repeated complaints, "
"emerging issues, or trend analysis. "
"Returns: list of detected issues with name, count, and severity score."
),
args_schema=IssueDetectorInput,
metadata={"requires_approval": False},
)
# -- stats_lookup --
_stats_cap = StatsLookupCapability(action=api_lookup_action)
async def _stats_lookup(
query: str,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
period: Optional[str] = None,
) -> str:
context: dict[str, Any] = {}
if date_from is not None:
context["date_from"] = date_from
if date_to is not None:
context["date_to"] = date_to
if period is not None:
context["period"] = period
try:
result = await _stats_cap.execute(query=query, context=context, session=None)
return json.dumps(result.to_dict(), ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e), "success": False}, ensure_ascii=False)
stats_lookup_tool = StructuredTool.from_function(
coroutine=_stats_lookup,
name="stats_lookup",
description=(
"Query civil complaint filing statistics by period and category. "
"USE THIS TOOL when the user asks about complaint volume, filing counts, "
"category distribution, or time-series trends. "
"Returns: statistical data including period, filing count, and category breakdown."
),
args_schema=StatsLookupInput,
metadata={"requires_approval": False},
)
# -- keyword_analyzer --
_kw_cap = KeywordAnalyzerCapability(action=api_lookup_action)
async def _keyword_analyzer(
query: str,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
result_count: int = 20,
) -> str:
context: dict[str, Any] = {"result_count": result_count}
if date_from is not None:
context["date_from"] = date_from
if date_to is not None:
context["date_to"] = date_to
# searchword를 query에서 자동 설정 (연관어 분석용)
context["searchword"] = query
try:
result = await _kw_cap.execute(query=query, context=context, session=None)
return json.dumps(result.to_dict(), ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e), "success": False}, ensure_ascii=False)
keyword_analyzer_tool = StructuredTool.from_function(
coroutine=_keyword_analyzer,
name="keyword_analyzer",
description=(
"Analyze top keywords and their frequency in civil complaint texts. "
"USE THIS TOOL when the user asks about trending topics, frequently mentioned terms, "
"or wants to understand what citizens are complaining about. "
"Returns: ranked keyword list with frequency and relevance scores."
),
args_schema=KeywordAnalyzerInput,
metadata={"requires_approval": False},
)
# -- demographics_lookup --
_demo_cap = DemographicsLookupCapability(action=api_lookup_action)
async def _demographics_lookup(
query: str,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
) -> str:
context: dict[str, Any] = {"searchword": query}
if date_from is not None:
context["date_from"] = date_from
if date_to is not None:
context["date_to"] = date_to
try:
result = await _demo_cap.execute(query=query, context=context, session=None)
return json.dumps(result.to_dict(), ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e), "success": False}, ensure_ascii=False)
demographics_lookup_tool = StructuredTool.from_function(
coroutine=_demographics_lookup,
name="demographics_lookup",
description=(
"Look up demographic distribution (age group, region, gender) of civil complaint filers. "
"USE THIS TOOL when the user asks about who is filing complaints, "
"regional patterns, or age-based analysis. "
"Returns: demographic distribution data."
),
args_schema=DemographicsLookupInput,
metadata={"requires_approval": False},
)
return [issue_detector_tool, stats_lookup_tool, keyword_analyzer_tool, demographics_lookup_tool]
|