Spaces:
Restarting on A100
Restarting on A100
| """Result rendering for GovOn CLI. | |
| Uses `rich` when available; falls back to plain print() otherwise. | |
| """ | |
| from __future__ import annotations | |
| from threading import Lock | |
| from typing import Any | |
| from src.cli.terminal import ( | |
| get_narrow_terminal_warning, | |
| get_panel_width, | |
| get_terminal_columns, | |
| is_layout_supported, | |
| ) | |
| try: | |
| from rich.console import Console, Group | |
| from rich.markdown import Markdown | |
| from rich.panel import Panel | |
| from rich.status import Status | |
| from rich.table import Table | |
| from rich.text import Text | |
| _console = Console() | |
| _RICH_AVAILABLE = True | |
| except ImportError: # pragma: no cover | |
| _console = None # type: ignore[assignment] | |
| _RICH_AVAILABLE = False | |
| _HAS_WARNED_NARROW_TERMINAL = False | |
| _NARROW_WARNING_LOCK = Lock() | |
| # --------------------------------------------------------------------------- | |
| # Node status message mapping | |
| # --------------------------------------------------------------------------- | |
| NODE_STATUS_MESSAGES: dict[str, str] = { | |
| "session_load": "세션 로드 중…", | |
| "agent": "에이전트 추론 중…", | |
| "approval_wait": "승인 대기 중…", | |
| "tools": "도구 실행 중…", | |
| "persist": "저장 중…", | |
| } | |
| MARKDOWN_CODE_THEME = "monokai" | |
| STRUCTURED_TOOL_ORDER = ("stats_lookup", "keyword_analyzer", "demographics_lookup") | |
| STRUCTURED_TOOL_TITLES = { | |
| "stats_lookup": "민원 통계", | |
| "keyword_analyzer": "키워드 분석", | |
| "demographics_lookup": "인구통계", | |
| } | |
| STRUCTURED_API_TITLES = { | |
| "doc_count": "채널별 접수 건수", | |
| "trend": "추이", | |
| "statistics": "기간별 통계", | |
| "org_ranking": "기관 순위", | |
| "region_ranking": "지역 순위", | |
| "core_keyword": "핵심 키워드", | |
| "related_word": "연관어", | |
| "gender": "성별 분포", | |
| "age": "연령 분포", | |
| "population": "인구 대비 비율", | |
| } | |
| TABLE_COLUMN_PRIORITY = ( | |
| "keyword", | |
| "topic", | |
| "label", | |
| "term", | |
| "hits", | |
| "value", | |
| "ratio", | |
| "prebRatio", | |
| "prevRatio", | |
| "population", | |
| "pttn", | |
| "dfpt", | |
| "saeol", | |
| ) | |
| TABLE_COLUMN_LABELS = { | |
| "keyword": "키워드", | |
| "topic": "항목", | |
| "label": "항목", | |
| "term": "항목", | |
| "hits": "건수", | |
| "value": "값", | |
| "ratio": "비율", | |
| "prebRatio": "전일 대비", | |
| "prevRatio": "전기 대비", | |
| "population": "인구", | |
| "pttn": "국민신문고", | |
| "dfpt": "민원24", | |
| "saeol": "새올", | |
| "source_type": "출처", | |
| "title": "제목", | |
| "page": "페이지", | |
| "score": "점수", | |
| "link_or_path": "경로/링크", | |
| } | |
| TABLE_HIDDEN_KEYS = {"_source_api"} | |
| EVIDENCE_SOURCE_LABELS = { | |
| "rag": "로컬 문서", | |
| "api": "외부 API", | |
| "llm_generated": "LLM 생성", | |
| } | |
| def get_node_message(node_name: str) -> str: | |
| """Return a human-readable status message for a given node name.""" | |
| return NODE_STATUS_MESSAGES.get(node_name, f"{node_name} 처리 중…") | |
| # --------------------------------------------------------------------------- | |
| # Spinner context manager | |
| # --------------------------------------------------------------------------- | |
| class StreamingStatusDisplay: | |
| """Context manager that shows a spinner and updates the message per node. | |
| Wraps rich.status.Status when rich is available; falls back to plain print(). | |
| """ | |
| def __init__(self, initial_message: str = "처리 중…") -> None: | |
| self._initial_message = initial_message | |
| self._status: Status | None = None # type: ignore[name-defined] | |
| self._use_rich = False | |
| def __enter__(self) -> "StreamingStatusDisplay": | |
| self._use_rich, _ = _resolve_render_mode() | |
| if self._use_rich: | |
| self._status = _console.status(self._initial_message, spinner="dots") | |
| self._status.__enter__() | |
| else: | |
| print(f"→ {self._initial_message}", flush=True) | |
| return self | |
| def update(self, message: str) -> None: | |
| """Update the displayed status message.""" | |
| if self._use_rich and self._status is not None: | |
| self._status.update(message) | |
| else: | |
| print(f"→ {message}", flush=True) | |
| def __exit__(self, exc_type, exc_val, exc_tb) -> None: | |
| if self._use_rich and self._status is not None: | |
| self._status.__exit__(exc_type, exc_val, exc_tb) | |
| self._status = None | |
| def _warn_narrow_terminal_once(columns: int) -> None: | |
| """Emit the narrow-terminal fallback warning once per narrow-state entry.""" | |
| global _HAS_WARNED_NARROW_TERMINAL | |
| with _NARROW_WARNING_LOCK: | |
| if _HAS_WARNED_NARROW_TERMINAL: | |
| return | |
| _HAS_WARNED_NARROW_TERMINAL = True | |
| print(get_narrow_terminal_warning(columns), flush=True) | |
| def _reset_narrow_warning() -> None: | |
| """Reset narrow-terminal warning state for tests and wide-terminal recovery.""" | |
| global _HAS_WARNED_NARROW_TERMINAL | |
| with _NARROW_WARNING_LOCK: | |
| _HAS_WARNED_NARROW_TERMINAL = False | |
| def _resolve_render_mode() -> tuple[bool, int]: | |
| """Return (use_rich, terminal_columns) for the current render call.""" | |
| columns = get_terminal_columns() | |
| if not is_layout_supported(columns): | |
| _warn_narrow_terminal_once(columns) | |
| return False, columns | |
| _reset_narrow_warning() | |
| return _RICH_AVAILABLE, columns | |
| def _plain_rule(columns: int) -> str: | |
| """Return a separator that fits within the current terminal.""" | |
| return "─" * max(columns - 2, 12) | |
| def _format_table_value(key: str, value: Any) -> str: | |
| """Format a structured value for rich/plain table rendering.""" | |
| if value in ("", None): | |
| return "-" | |
| if key == "source_type": | |
| return EVIDENCE_SOURCE_LABELS.get(str(value), str(value)) | |
| if key == "page": | |
| return f"p.{value}" | |
| if key == "score": | |
| try: | |
| return f"{float(value):.2f}" | |
| except (TypeError, ValueError): | |
| return str(value) | |
| if key in {"hits", "population", "pttn", "dfpt", "saeol"}: | |
| try: | |
| return f"{int(float(value)):,}" | |
| except (TypeError, ValueError): | |
| return str(value) | |
| if key == "value": | |
| try: | |
| value_f = float(value) | |
| return f"{value_f:,.1f}" if value_f % 1 else f"{value_f:,.0f}" | |
| except (TypeError, ValueError): | |
| return str(value) | |
| if key in {"ratio", "prebRatio", "prevRatio"}: | |
| text = str(value) | |
| return text if text.endswith("%") else f"{text}%" | |
| return str(value) | |
| def _select_table_columns(rows: list[dict], columns: int) -> list[str]: | |
| """Select visible table columns based on row shape and terminal width.""" | |
| visible_keys: list[str] = [] | |
| seen: set[str] = set() | |
| for key in TABLE_COLUMN_PRIORITY: | |
| if any(row.get(key) not in ("", None) for row in rows): | |
| visible_keys.append(key) | |
| seen.add(key) | |
| for row in rows: | |
| for key in row: | |
| if key in TABLE_HIDDEN_KEYS or key in seen: | |
| continue | |
| if row.get(key) not in ("", None): | |
| visible_keys.append(key) | |
| seen.add(key) | |
| max_columns = 5 if columns >= 120 else 4 if columns >= 80 else 2 | |
| return visible_keys[:max_columns] | |
| def _build_rich_table(rows: list[dict], columns: int, *, column_keys: list[str] | None = None): | |
| """Build a Rich table from structured rows.""" | |
| selected_keys = column_keys or _select_table_columns(rows, columns) | |
| if not selected_keys: | |
| return None | |
| table = Table(expand=True) | |
| for key in selected_keys: | |
| table.add_column( | |
| TABLE_COLUMN_LABELS.get(key, key), | |
| overflow="fold", | |
| no_wrap=key in {"source_type", "page", "score"}, | |
| ) | |
| for row in rows: | |
| table.add_row(*(_format_table_value(key, row.get(key)) for key in selected_keys)) | |
| return table | |
| def _render_plain_table( | |
| title: str, | |
| rows: list[dict], | |
| columns: int, | |
| *, | |
| column_keys: list[str] | None = None, | |
| ) -> str: | |
| """Render structured rows as a tab-delimited plain-text table.""" | |
| selected_keys = column_keys or _select_table_columns(rows, columns) | |
| if not selected_keys: | |
| return "" | |
| lines = [title, "\t".join(TABLE_COLUMN_LABELS.get(key, key) for key in selected_keys)] | |
| for row in rows: | |
| lines.append("\t".join(_format_table_value(key, row.get(key)) for key in selected_keys)) | |
| return "\n".join(lines) | |
| def _iter_structured_result_sections(tool_results: dict[str, Any]) -> list[tuple[str, list[dict]]]: | |
| """Extract table-ready structured result sections from tool results.""" | |
| sections: list[tuple[str, list[dict]]] = [] | |
| for tool_name in STRUCTURED_TOOL_ORDER: | |
| payload = tool_results.get(tool_name) | |
| if not isinstance(payload, dict): | |
| continue | |
| results = payload.get("results") | |
| if not isinstance(results, list) or not results: | |
| continue | |
| grouped_rows: dict[str, list[dict]] = {} | |
| for row in results: | |
| if not isinstance(row, dict): | |
| continue | |
| grouped_rows.setdefault(str(row.get("_source_api") or "results"), []).append(row) | |
| for source_api, rows in grouped_rows.items(): | |
| source_title = STRUCTURED_API_TITLES.get(source_api) | |
| tool_title = STRUCTURED_TOOL_TITLES.get(tool_name, tool_name) | |
| title = f"{tool_title} · {source_title}" if source_title else tool_title | |
| sections.append((title, rows)) | |
| return sections | |
| def _build_evidence_table_rows(evidence_items: list[dict]) -> list[dict]: | |
| """Normalize evidence items into a table-oriented row schema.""" | |
| rows: list[dict] = [] | |
| for item in evidence_items: | |
| rows.append( | |
| { | |
| "source_type": item.get("source_type"), | |
| "title": item.get("title") or item.get("excerpt", ""), | |
| "page": item.get("page"), | |
| "score": item.get("score"), | |
| "link_or_path": item.get("link_or_path"), | |
| } | |
| ) | |
| return rows | |
| def _select_evidence_columns(columns: int) -> list[str]: | |
| """Return evidence table columns based on terminal width.""" | |
| if columns >= 120: | |
| return ["source_type", "title", "page", "score", "link_or_path"] | |
| if columns >= 80: | |
| return ["source_type", "title", "score"] | |
| return ["source_type", "title"] | |
| def render_evidence_section(evidence_items: list) -> str: | |
| """EvidenceItem dict 리스트를 출처 섹션 텍스트로 변환한다. | |
| source_type별로 그룹화하여 표시한다: | |
| [로컬 문서] — rag 출처 (file_path, page, score 포함) | |
| [외부 API] — api 출처 (URL 포함) | |
| [LLM 생성] — llm_generated 출처 | |
| Parameters | |
| ---------- | |
| evidence_items : list | |
| EvidenceItem.to_dict() 형태의 dict 리스트. | |
| Returns | |
| ------- | |
| str | |
| 출처 섹션 텍스트. items가 없으면 빈 문자열. | |
| """ | |
| if not evidence_items: | |
| return "" | |
| # source_type별 그룹화 | |
| rag_items = [i for i in evidence_items if i.get("source_type") == "rag"] | |
| api_items = [i for i in evidence_items if i.get("source_type") == "api"] | |
| llm_items = [i for i in evidence_items if i.get("source_type") == "llm_generated"] | |
| lines: list[str] = ["── 참조 근거 ──"] | |
| idx = 1 | |
| if rag_items: | |
| lines.append("[로컬 문서]") | |
| for item in rag_items: | |
| title = item.get("title") or item.get("link_or_path", "") | |
| page = item.get("page") | |
| score = item.get("score", 0.0) | |
| page_str = f" (p.{page})" if page is not None else "" | |
| score_str = f" [{score:.2f}]" if score else "" | |
| lines.append(f" {idx}. {title}{page_str}{score_str}") | |
| idx += 1 | |
| if api_items: | |
| lines.append("[외부 API]") | |
| for item in api_items: | |
| title = item.get("title", "") | |
| link = item.get("link_or_path", "") | |
| link_str = f" — {link}" if link else "" | |
| lines.append(f" {idx}. {title}{link_str}") | |
| idx += 1 | |
| if llm_items: | |
| lines.append("[LLM 생성]") | |
| for item in llm_items: | |
| title = item.get("title", "") | |
| excerpt = item.get("excerpt", "")[:80] | |
| lines.append(f" {idx}. {title}: {excerpt}" if title else f" {idx}. {excerpt}") | |
| idx += 1 | |
| return "\n".join(lines) if len(lines) > 1 else "" | |
| def _build_citations_text(citations: list[str]) -> Text: | |
| """Return a styled fallback citations block for rich rendering.""" | |
| content = Text("\n출처\n", style="bold") | |
| for idx, src in enumerate(citations, 1): | |
| content.append(f" {idx}. {src}\n", style="dim") | |
| return content | |
| def _build_rich_result_content( | |
| text_body: str, | |
| evidence_items: list, | |
| citations: list, | |
| tool_results: dict[str, Any], | |
| columns: int, | |
| ) -> Text | Markdown | Group: | |
| """Build the rich renderable used inside the result panel.""" | |
| renderables = [] | |
| if text_body: | |
| renderables.append(Markdown(text_body, code_theme=MARKDOWN_CODE_THEME)) | |
| for title, rows in _iter_structured_result_sections(tool_results): | |
| table = _build_rich_table(rows, columns) | |
| if table is None: | |
| continue | |
| renderables.append(Text("")) | |
| renderables.append(Text(title, style="bold cyan")) | |
| renderables.append(table) | |
| if evidence_items: | |
| evidence_rows = _build_evidence_table_rows(evidence_items) | |
| evidence_table = _build_rich_table( | |
| evidence_rows, | |
| columns, | |
| column_keys=_select_evidence_columns(columns), | |
| ) | |
| if evidence_table is not None: | |
| renderables.append(Text("")) | |
| renderables.append(Text("참조 근거", style="bold")) | |
| renderables.append(evidence_table) | |
| elif citations: | |
| renderables.append(_build_citations_text(citations)) | |
| if not renderables: | |
| return Text("") | |
| if len(renderables) == 1: | |
| return renderables[0] | |
| return Group(*renderables) | |
| def render_result(result: dict) -> None: | |
| """Render the final agent response to the terminal. | |
| Expected keys (at least one required): | |
| - result["text"] or result["response"]: main answer text | |
| - result["evidence_items"]: EvidenceItem dict 리스트 (structured, 우선) | |
| - result["citations"] or result["sources"]: list of source strings (fallback) | |
| - result["tool_results"]: stats/keyword/demographics structured result dict | |
| """ | |
| text_body: str = result.get("text") or result.get("response") or "" | |
| evidence_items: list = result.get("evidence_items") or [] | |
| citations: list = result.get("citations") or result.get("sources") or [] | |
| tool_results: dict[str, Any] = result.get("tool_results") or {} | |
| use_rich, columns = _resolve_render_mode() | |
| if use_rich: | |
| content = _build_rich_result_content( | |
| text_body, | |
| evidence_items, | |
| citations, | |
| tool_results, | |
| columns, | |
| ) | |
| _console.print( | |
| Panel( | |
| content, | |
| title="[bold green]GovOn[/bold green]", | |
| border_style="green", | |
| width=get_panel_width(columns), | |
| ) | |
| ) | |
| else: | |
| rule = _plain_rule(columns) | |
| print(f"\n{rule}") | |
| print("GovOn") | |
| print(text_body) | |
| for title, rows in _iter_structured_result_sections(tool_results): | |
| table_text = _render_plain_table(title, rows, columns) | |
| if table_text: | |
| print(f"\n{table_text}") | |
| if evidence_items: | |
| evidence_table = _render_plain_table( | |
| "참조 근거", | |
| _build_evidence_table_rows(evidence_items), | |
| columns, | |
| column_keys=_select_evidence_columns(columns), | |
| ) | |
| if evidence_table: | |
| print(f"\n{evidence_table}") | |
| elif citations: | |
| print("\n출처") | |
| for idx, src in enumerate(citations, 1): | |
| print(f" {idx}. {src}") | |
| print(f"{rule}\n") | |
| def render_status(message: str) -> None: | |
| """Render a transient status / progress message.""" | |
| use_rich, _ = _resolve_render_mode() | |
| if use_rich: | |
| _console.print(f"[dim]→ {message}[/dim]") | |
| else: | |
| print(f"→ {message}") | |
| def render_error(message: str) -> None: | |
| """Render an error message in red.""" | |
| use_rich, _ = _resolve_render_mode() | |
| if use_rich: | |
| _console.print(f"[bold red]오류:[/bold red] {message}") | |
| else: | |
| print(f"오류: {message}") | |
| def render_session_info(session_id: str) -> None: | |
| """Render session resume hint at shell exit.""" | |
| hint = f"[session: {session_id}] govon --session {session_id} 로 재개 가능" | |
| use_rich, _ = _resolve_render_mode() | |
| if use_rich: | |
| _console.print(f"[dim]{hint}[/dim]") | |
| else: | |
| print(hint) | |