govon-runtime / src /inference /graph /capabilities /keyword_analyzer.py
umyunsang's picture
fix: LoRA 프롬프트 정렬 + 도구 파라미터 스키마 + API 키 인코딩
c794814 verified
"""keyword_analyzer capability - combines core keywords and related words.

Issue #488: civil-complaint keyword analysis tool.
Combines two APIs (core keywords and related words) into a single
keyword-analysis result.
"""
from __future__ import annotations
import asyncio
from typing import Any, Dict, List, Optional
from loguru import logger
from .base import (
CapabilityBase,
CapabilityMetadata,
EvidenceEnvelope,
EvidenceItem,
LookupResult,
)
from .defaults import get_timeout
class KeywordAnalyzerCapability(CapabilityBase):
"""๋ฏผ์› ํ‚ค์›Œ๋“œ ๋ถ„์„ capability.
ํ•ต์‹ฌํ‚ค์›Œ๋“œ์™€ ์—ฐ๊ด€์–ด๋ฅผ ์กฐํ•ฉํ•˜์—ฌ ํ‚ค์›Œ๋“œ ๋ถ„์„ ๊ฒฐ๊ณผ๋ฅผ ์ œ๊ณตํ•œ๋‹ค.
Parameters
----------
action : Optional[MinwonAnalysisAction]
API ํ˜ธ์ถœ์šฉ Action ์ธ์Šคํ„ด์Šค. None์ด๋ฉด ๋นˆ ๊ฒฐ๊ณผ ๋ฐ˜ํ™˜.
"""
def __init__(self, action: Optional[Any] = None) -> None:
self._action = action
@property
def metadata(self) -> CapabilityMetadata:
return CapabilityMetadata(
name="keyword_analyzer",
description=("ํ•ต์‹ฌํ‚ค์›Œ๋“œ์™€ ์—ฐ๊ด€์–ด๋ฅผ ์กฐํ•ฉํ•˜์—ฌ " "๋ฏผ์› ํ‚ค์›Œ๋“œ ๋ถ„์„ ๊ฒฐ๊ณผ๋ฅผ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค."),
approval_summary="๊ณต๊ณต๋ฐ์ดํ„ฐํฌํ„ธ์—์„œ ๋ฏผ์› ํ‚ค์›Œ๋“œ๋ฅผ ๋ถ„์„ํ•ฉ๋‹ˆ๋‹ค.",
provider="data.go.kr",
timeout_sec=get_timeout("keyword_analyzer"),
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "ํ‚ค์›Œ๋“œ ๋ถ„์„ ๋Œ€์ƒ ์งˆ์˜๋ฌธ",
},
"date_from": {
"type": "string",
"description": "๋ถ„์„ ์‹œ์ž‘์ผ (YYYYMMDD ํ˜•์‹, 8์ž๋ฆฌ ํ•„์ˆ˜). ์˜ˆ: '20260101'",
},
"date_to": {
"type": "string",
"description": "๋ถ„์„ ์ข…๋ฃŒ์ผ (YYYYMMDD ํ˜•์‹, 8์ž๋ฆฌ ํ•„์ˆ˜). ์˜ˆ: '20260408'",
},
"searchword": {
"type": "string",
"description": "์—ฐ๊ด€์–ด ๋ถ„์„ ์‹œ ๊ธฐ์ค€ ํ‚ค์›Œ๋“œ. ์—ฐ๊ด€์–ด ๋ถ„์„์—๋Š” ํ•„์ˆ˜",
},
"result_count": {
"type": "integer",
"description": "๋ฐ˜ํ™˜ํ•  ํ‚ค์›Œ๋“œ ์ˆ˜ (๊ธฐ๋ณธ๊ฐ’ 5)",
"default": 5,
},
},
"required": ["query", "date_from", "date_to"],
},
)
async def execute(
self,
query: str,
context: Dict[str, Any],
session: Any,
) -> LookupResult:
"""ํ•ต์‹ฌํ‚ค์›Œ๋“œ + ์—ฐ๊ด€์–ด API๋ฅผ ๋ณ‘๋ ฌ ํ˜ธ์ถœํ•˜๊ณ  ๊ฒฐ๊ณผ๋ฅผ ์กฐํ•ฉํ•œ๋‹ค."""
provider = self.metadata.provider
if not query or not query.strip():
return LookupResult(
success=False,
query=query,
provider=provider,
error="query๊ฐ€ ๋น„์–ด์žˆ์Šต๋‹ˆ๋‹ค",
empty_reason="validation_error",
evidence=EvidenceEnvelope(status="error", errors=["query๊ฐ€ ๋น„์–ด์žˆ์Šต๋‹ˆ๋‹ค"]),
)
if self._action is None:
logger.debug("[keyword_analyzer] action์ด None - ๋นˆ ๊ฒฐ๊ณผ ๋ฐ˜ํ™˜")
return LookupResult(
success=True,
query=query,
provider=provider,
empty_reason="no_match",
evidence=EvidenceEnvelope(status="empty"),
)
date_from = context.get("date_from", "")
date_to = context.get("date_to", "")
searchword = context.get("searchword", "")
result_count = int(context.get("result_count", 5))
try:
core_kw, related = await asyncio.wait_for(
self._fetch_all(date_from, date_to, searchword, result_count),
timeout=self.metadata.timeout_sec,
)
except asyncio.TimeoutError:
msg = f"API ํ˜ธ์ถœ ํƒ€์ž„์•„์›ƒ ({self.metadata.timeout_sec}์ดˆ ์ดˆ๊ณผ)"
logger.warning(f"[keyword_analyzer] {msg}")
return LookupResult(
success=False,
query=query,
provider=provider,
error=msg,
empty_reason="provider_error",
evidence=EvidenceEnvelope(status="error", errors=[msg]),
)
except Exception as exc:
logger.error(f"[keyword_analyzer] API ํ˜ธ์ถœ ์˜ค๋ฅ˜: {exc}", exc_info=True)
return LookupResult(
success=False,
query=query,
provider=provider,
error=str(exc),
empty_reason="provider_error",
evidence=EvidenceEnvelope(status="error", errors=[str(exc)]),
)
all_results: List[Dict[str, Any]] = []
evidence_items: List[EvidenceItem] = []
errors: List[str] = []
if core_kw is not None:
for item in core_kw:
item["_source_api"] = "core_keyword"
all_results.append(item)
evidence_items.append(
EvidenceItem(
source_type="api",
title=item.get("label", ""),
excerpt=f"ํ•ต์‹ฌํ‚ค์›Œ๋“œ: {item.get('label', '')}, "
f"์ ์ˆ˜={item.get('value', 0)}",
provider_meta={"provider": provider, "api": "core_keyword"},
)
)
else:
errors.append("ํ•ต์‹ฌํ‚ค์›Œ๋“œ API ์‹คํŒจ")
if related is not None:
for item in related:
item["_source_api"] = "related_word"
all_results.append(item)
evidence_items.append(
EvidenceItem(
source_type="api",
title=item.get("label", ""),
excerpt=f"์—ฐ๊ด€์–ด: {item.get('label', '')}, " f"์ ์ˆ˜={item.get('value', 0)}",
provider_meta={"provider": provider, "api": "related_word"},
)
)
else:
if searchword:
errors.append("์—ฐ๊ด€์–ด API ์‹คํŒจ")
if not all_results:
status = "error" if errors else "empty"
return LookupResult(
success=not errors,
query=query,
provider=provider,
empty_reason="no_match" if not errors else "provider_error",
error="; ".join(errors) if errors else None,
evidence=EvidenceEnvelope(items=[], status=status, errors=errors),
)
context_text = self._build_context_text(core_kw, related)
status = "ok" if not errors else "partial"
return LookupResult(
success=True,
query=query,
results=all_results,
context_text=context_text,
provider=provider,
evidence=EvidenceEnvelope(
items=evidence_items,
summary_text=context_text,
status=status,
errors=errors,
),
)
async def _fetch_all(
self,
date_from: str,
date_to: str,
searchword: str,
result_count: int,
) -> tuple:
"""ํ•ต์‹ฌํ‚ค์›Œ๋“œ + ์—ฐ๊ด€์–ด๋ฅผ ๋ณ‘๋ ฌ ํ˜ธ์ถœํ•œ๋‹ค."""
tasks = []
# ํ•ต์‹ฌํ‚ค์›Œ๋“œ๋Š” date_from/date_to๊ฐ€ ์žˆ์œผ๋ฉด ํ•ญ์ƒ ํ˜ธ์ถœ
if date_from and date_to:
tasks.append(
self._safe_call(
self._action.get_core_keywords,
date_from=date_from,
date_to=date_to,
result_count=result_count,
)
)
else:
tasks.append(self._noop())
# ์—ฐ๊ด€์–ด๋Š” searchword๊ฐ€ ์žˆ์„ ๋•Œ๋งŒ ํ˜ธ์ถœ
if date_from and date_to and searchword:
tasks.append(
self._safe_call(
self._action.get_related_words,
date_from=date_from,
date_to=date_to,
searchword=searchword,
result_count=result_count,
)
)
else:
tasks.append(self._noop())
return tuple(await asyncio.gather(*tasks))
@staticmethod
async def _safe_call(fn, **kwargs) -> Optional[List[Dict[str, Any]]]:
"""๊ฐœ๋ณ„ API ํ˜ธ์ถœ์„ ์•ˆ์ „ํ•˜๊ฒŒ ๋ž˜ํ•‘ํ•œ๋‹ค."""
try:
return await fn(**kwargs)
except Exception as exc:
logger.warning(f"[keyword_analyzer] ๊ฐœ๋ณ„ API ์‹คํŒจ: {exc}")
return None
@staticmethod
async def _noop() -> None:
"""๋นˆ ๊ฒฐ๊ณผ๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋Š” no-op ์ฝ”๋ฃจํ‹ด."""
return None
@staticmethod
def _build_context_text(
core_kw: Optional[List],
related: Optional[List],
) -> str:
"""์กฐํ•ฉ ๊ฒฐ๊ณผ์—์„œ ์ž์—ฐ์–ด ์š”์•ฝ์„ ์ƒ์„ฑํ•œ๋‹ค."""
parts: List[str] = []
if core_kw:
items = []
for k in core_kw[:5]:
label = k.get("label", "")
value = k.get("value", 0)
try:
value_f = float(value)
items.append(f"{label}({value_f:,.0f}๊ฑด)")
except (ValueError, TypeError):
items.append(f"{label}({value})")
if items:
parts.append(f"ํ•ต์‹ฌ ํ‚ค์›Œ๋“œ: {', '.join(items)}")
if related:
items = []
for r in related[:5]:
label = r.get("label", "")
value = r.get("value", 0)
try:
value_f = float(value)
items.append(f"{label}({value_f:,.1f}์ )")
except (ValueError, TypeError):
items.append(f"{label}({value})")
if items:
parts.append(f"์—ฐ๊ด€์–ด: {', '.join(items)}")
return ", ".join(parts) if parts else ""