govon-runtime / src /cli /renderer.py
GovOn Deploy
sync: PR#584 RAG removal + ReAct architecture
1635ec4
"""Result rendering for GovOn CLI.
Uses `rich` when available; falls back to plain print() otherwise.
"""
from __future__ import annotations
from threading import Lock
from typing import Any
from src.cli.terminal import (
get_narrow_terminal_warning,
get_panel_width,
get_terminal_columns,
is_layout_supported,
)
try:
from rich.console import Console, Group
from rich.markdown import Markdown
from rich.panel import Panel
from rich.status import Status
from rich.table import Table
from rich.text import Text
_console = Console()
_RICH_AVAILABLE = True
except ImportError: # pragma: no cover
_console = None # type: ignore[assignment]
_RICH_AVAILABLE = False
_HAS_WARNED_NARROW_TERMINAL = False
_NARROW_WARNING_LOCK = Lock()
# ---------------------------------------------------------------------------
# Node status message mapping
# ---------------------------------------------------------------------------
NODE_STATUS_MESSAGES: dict[str, str] = {
"session_load": "세션 로드 중…",
"agent": "에이전트 추론 중…",
"approval_wait": "승인 대기 중…",
"tools": "도구 실행 중…",
"persist": "저장 중…",
}
MARKDOWN_CODE_THEME = "monokai"
STRUCTURED_TOOL_ORDER = ("stats_lookup", "keyword_analyzer", "demographics_lookup")
STRUCTURED_TOOL_TITLES = {
"stats_lookup": "민원 통계",
"keyword_analyzer": "키워드 분석",
"demographics_lookup": "인구통계",
}
STRUCTURED_API_TITLES = {
"doc_count": "채널별 접수 건수",
"trend": "추이",
"statistics": "기간별 통계",
"org_ranking": "기관 순위",
"region_ranking": "지역 순위",
"core_keyword": "핵심 키워드",
"related_word": "연관어",
"gender": "성별 분포",
"age": "연령 분포",
"population": "인구 대비 비율",
}
TABLE_COLUMN_PRIORITY = (
"keyword",
"topic",
"label",
"term",
"hits",
"value",
"ratio",
"prebRatio",
"prevRatio",
"population",
"pttn",
"dfpt",
"saeol",
)
TABLE_COLUMN_LABELS = {
"keyword": "키워드",
"topic": "항목",
"label": "항목",
"term": "항목",
"hits": "건수",
"value": "값",
"ratio": "비율",
"prebRatio": "전일 대비",
"prevRatio": "전기 대비",
"population": "인구",
"pttn": "국민신문고",
"dfpt": "민원24",
"saeol": "새올",
"source_type": "출처",
"title": "제목",
"page": "페이지",
"score": "점수",
"link_or_path": "경로/링크",
}
TABLE_HIDDEN_KEYS = {"_source_api"}
EVIDENCE_SOURCE_LABELS = {
"rag": "로컬 문서",
"api": "외부 API",
"llm_generated": "LLM 생성",
}
def get_node_message(node_name: str) -> str:
"""Return a human-readable status message for a given node name."""
return NODE_STATUS_MESSAGES.get(node_name, f"{node_name} 처리 중…")
# ---------------------------------------------------------------------------
# Spinner context manager
# ---------------------------------------------------------------------------
class StreamingStatusDisplay:
"""Context manager that shows a spinner and updates the message per node.
Wraps rich.status.Status when rich is available; falls back to plain print().
"""
def __init__(self, initial_message: str = "처리 중…") -> None:
self._initial_message = initial_message
self._status: Status | None = None # type: ignore[name-defined]
self._use_rich = False
def __enter__(self) -> "StreamingStatusDisplay":
self._use_rich, _ = _resolve_render_mode()
if self._use_rich:
self._status = _console.status(self._initial_message, spinner="dots")
self._status.__enter__()
else:
print(f"→ {self._initial_message}", flush=True)
return self
def update(self, message: str) -> None:
"""Update the displayed status message."""
if self._use_rich and self._status is not None:
self._status.update(message)
else:
print(f"→ {message}", flush=True)
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
if self._use_rich and self._status is not None:
self._status.__exit__(exc_type, exc_val, exc_tb)
self._status = None
def _warn_narrow_terminal_once(columns: int) -> None:
"""Emit the narrow-terminal fallback warning once per narrow-state entry."""
global _HAS_WARNED_NARROW_TERMINAL
with _NARROW_WARNING_LOCK:
if _HAS_WARNED_NARROW_TERMINAL:
return
_HAS_WARNED_NARROW_TERMINAL = True
print(get_narrow_terminal_warning(columns), flush=True)
def _reset_narrow_warning() -> None:
"""Reset narrow-terminal warning state for tests and wide-terminal recovery."""
global _HAS_WARNED_NARROW_TERMINAL
with _NARROW_WARNING_LOCK:
_HAS_WARNED_NARROW_TERMINAL = False
def _resolve_render_mode() -> tuple[bool, int]:
"""Return (use_rich, terminal_columns) for the current render call."""
columns = get_terminal_columns()
if not is_layout_supported(columns):
_warn_narrow_terminal_once(columns)
return False, columns
_reset_narrow_warning()
return _RICH_AVAILABLE, columns
def _plain_rule(columns: int) -> str:
"""Return a separator that fits within the current terminal."""
return "─" * max(columns - 2, 12)
def _format_table_value(key: str, value: Any) -> str:
"""Format a structured value for rich/plain table rendering."""
if value in ("", None):
return "-"
if key == "source_type":
return EVIDENCE_SOURCE_LABELS.get(str(value), str(value))
if key == "page":
return f"p.{value}"
if key == "score":
try:
return f"{float(value):.2f}"
except (TypeError, ValueError):
return str(value)
if key in {"hits", "population", "pttn", "dfpt", "saeol"}:
try:
return f"{int(float(value)):,}"
except (TypeError, ValueError):
return str(value)
if key == "value":
try:
value_f = float(value)
return f"{value_f:,.1f}" if value_f % 1 else f"{value_f:,.0f}"
except (TypeError, ValueError):
return str(value)
if key in {"ratio", "prebRatio", "prevRatio"}:
text = str(value)
return text if text.endswith("%") else f"{text}%"
return str(value)
def _select_table_columns(rows: list[dict], columns: int) -> list[str]:
"""Select visible table columns based on row shape and terminal width."""
visible_keys: list[str] = []
seen: set[str] = set()
for key in TABLE_COLUMN_PRIORITY:
if any(row.get(key) not in ("", None) for row in rows):
visible_keys.append(key)
seen.add(key)
for row in rows:
for key in row:
if key in TABLE_HIDDEN_KEYS or key in seen:
continue
if row.get(key) not in ("", None):
visible_keys.append(key)
seen.add(key)
max_columns = 5 if columns >= 120 else 4 if columns >= 80 else 2
return visible_keys[:max_columns]
def _build_rich_table(rows: list[dict], columns: int, *, column_keys: list[str] | None = None):
"""Build a Rich table from structured rows."""
selected_keys = column_keys or _select_table_columns(rows, columns)
if not selected_keys:
return None
table = Table(expand=True)
for key in selected_keys:
table.add_column(
TABLE_COLUMN_LABELS.get(key, key),
overflow="fold",
no_wrap=key in {"source_type", "page", "score"},
)
for row in rows:
table.add_row(*(_format_table_value(key, row.get(key)) for key in selected_keys))
return table
def _render_plain_table(
title: str,
rows: list[dict],
columns: int,
*,
column_keys: list[str] | None = None,
) -> str:
"""Render structured rows as a tab-delimited plain-text table."""
selected_keys = column_keys or _select_table_columns(rows, columns)
if not selected_keys:
return ""
lines = [title, "\t".join(TABLE_COLUMN_LABELS.get(key, key) for key in selected_keys)]
for row in rows:
lines.append("\t".join(_format_table_value(key, row.get(key)) for key in selected_keys))
return "\n".join(lines)
def _iter_structured_result_sections(tool_results: dict[str, Any]) -> list[tuple[str, list[dict]]]:
"""Extract table-ready structured result sections from tool results."""
sections: list[tuple[str, list[dict]]] = []
for tool_name in STRUCTURED_TOOL_ORDER:
payload = tool_results.get(tool_name)
if not isinstance(payload, dict):
continue
results = payload.get("results")
if not isinstance(results, list) or not results:
continue
grouped_rows: dict[str, list[dict]] = {}
for row in results:
if not isinstance(row, dict):
continue
grouped_rows.setdefault(str(row.get("_source_api") or "results"), []).append(row)
for source_api, rows in grouped_rows.items():
source_title = STRUCTURED_API_TITLES.get(source_api)
tool_title = STRUCTURED_TOOL_TITLES.get(tool_name, tool_name)
title = f"{tool_title} · {source_title}" if source_title else tool_title
sections.append((title, rows))
return sections
def _build_evidence_table_rows(evidence_items: list[dict]) -> list[dict]:
"""Normalize evidence items into a table-oriented row schema."""
rows: list[dict] = []
for item in evidence_items:
rows.append(
{
"source_type": item.get("source_type"),
"title": item.get("title") or item.get("excerpt", ""),
"page": item.get("page"),
"score": item.get("score"),
"link_or_path": item.get("link_or_path"),
}
)
return rows
def _select_evidence_columns(columns: int) -> list[str]:
"""Return evidence table columns based on terminal width."""
if columns >= 120:
return ["source_type", "title", "page", "score", "link_or_path"]
if columns >= 80:
return ["source_type", "title", "score"]
return ["source_type", "title"]
def render_evidence_section(evidence_items: list) -> str:
"""EvidenceItem dict 리스트를 출처 섹션 텍스트로 변환한다.
source_type별로 그룹화하여 표시한다:
[로컬 문서] — rag 출처 (file_path, page, score 포함)
[외부 API] — api 출처 (URL 포함)
[LLM 생성] — llm_generated 출처
Parameters
----------
evidence_items : list
EvidenceItem.to_dict() 형태의 dict 리스트.
Returns
-------
str
출처 섹션 텍스트. items가 없으면 빈 문자열.
"""
if not evidence_items:
return ""
# source_type별 그룹화
rag_items = [i for i in evidence_items if i.get("source_type") == "rag"]
api_items = [i for i in evidence_items if i.get("source_type") == "api"]
llm_items = [i for i in evidence_items if i.get("source_type") == "llm_generated"]
lines: list[str] = ["── 참조 근거 ──"]
idx = 1
if rag_items:
lines.append("[로컬 문서]")
for item in rag_items:
title = item.get("title") or item.get("link_or_path", "")
page = item.get("page")
score = item.get("score", 0.0)
page_str = f" (p.{page})" if page is not None else ""
score_str = f" [{score:.2f}]" if score else ""
lines.append(f" {idx}. {title}{page_str}{score_str}")
idx += 1
if api_items:
lines.append("[외부 API]")
for item in api_items:
title = item.get("title", "")
link = item.get("link_or_path", "")
link_str = f" — {link}" if link else ""
lines.append(f" {idx}. {title}{link_str}")
idx += 1
if llm_items:
lines.append("[LLM 생성]")
for item in llm_items:
title = item.get("title", "")
excerpt = item.get("excerpt", "")[:80]
lines.append(f" {idx}. {title}: {excerpt}" if title else f" {idx}. {excerpt}")
idx += 1
return "\n".join(lines) if len(lines) > 1 else ""
def _build_citations_text(citations: list[str]) -> Text:
"""Return a styled fallback citations block for rich rendering."""
content = Text("\n출처\n", style="bold")
for idx, src in enumerate(citations, 1):
content.append(f" {idx}. {src}\n", style="dim")
return content
def _build_rich_result_content(
text_body: str,
evidence_items: list,
citations: list,
tool_results: dict[str, Any],
columns: int,
) -> Text | Markdown | Group:
"""Build the rich renderable used inside the result panel."""
renderables = []
if text_body:
renderables.append(Markdown(text_body, code_theme=MARKDOWN_CODE_THEME))
for title, rows in _iter_structured_result_sections(tool_results):
table = _build_rich_table(rows, columns)
if table is None:
continue
renderables.append(Text(""))
renderables.append(Text(title, style="bold cyan"))
renderables.append(table)
if evidence_items:
evidence_rows = _build_evidence_table_rows(evidence_items)
evidence_table = _build_rich_table(
evidence_rows,
columns,
column_keys=_select_evidence_columns(columns),
)
if evidence_table is not None:
renderables.append(Text(""))
renderables.append(Text("참조 근거", style="bold"))
renderables.append(evidence_table)
elif citations:
renderables.append(_build_citations_text(citations))
if not renderables:
return Text("")
if len(renderables) == 1:
return renderables[0]
return Group(*renderables)
def render_result(result: dict) -> None:
"""Render the final agent response to the terminal.
Expected keys (at least one required):
- result["text"] or result["response"]: main answer text
- result["evidence_items"]: EvidenceItem dict 리스트 (structured, 우선)
- result["citations"] or result["sources"]: list of source strings (fallback)
- result["tool_results"]: stats/keyword/demographics structured result dict
"""
text_body: str = result.get("text") or result.get("response") or ""
evidence_items: list = result.get("evidence_items") or []
citations: list = result.get("citations") or result.get("sources") or []
tool_results: dict[str, Any] = result.get("tool_results") or {}
use_rich, columns = _resolve_render_mode()
if use_rich:
content = _build_rich_result_content(
text_body,
evidence_items,
citations,
tool_results,
columns,
)
_console.print(
Panel(
content,
title="[bold green]GovOn[/bold green]",
border_style="green",
width=get_panel_width(columns),
)
)
else:
rule = _plain_rule(columns)
print(f"\n{rule}")
print("GovOn")
print(text_body)
for title, rows in _iter_structured_result_sections(tool_results):
table_text = _render_plain_table(title, rows, columns)
if table_text:
print(f"\n{table_text}")
if evidence_items:
evidence_table = _render_plain_table(
"참조 근거",
_build_evidence_table_rows(evidence_items),
columns,
column_keys=_select_evidence_columns(columns),
)
if evidence_table:
print(f"\n{evidence_table}")
elif citations:
print("\n출처")
for idx, src in enumerate(citations, 1):
print(f" {idx}. {src}")
print(f"{rule}\n")
def render_status(message: str) -> None:
"""Render a transient status / progress message."""
use_rich, _ = _resolve_render_mode()
if use_rich:
_console.print(f"[dim]→ {message}[/dim]")
else:
print(f"→ {message}")
def render_error(message: str) -> None:
"""Render an error message in red."""
use_rich, _ = _resolve_render_mode()
if use_rich:
_console.print(f"[bold red]오류:[/bold red] {message}")
else:
print(f"오류: {message}")
def render_session_info(session_id: str) -> None:
"""Render session resume hint at shell exit."""
hint = f"[session: {session_id}] govon --session {session_id} 로 재개 가능"
use_rich, _ = _resolve_render_mode()
if use_rich:
_console.print(f"[dim]{hint}[/dim]")
else:
print(hint)