| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | import asyncio |
| | import re |
| | import traceback |
| | from pathlib import Path |
| | from typing import Annotated, Optional |
| |
|
| | import aiofiles |
| |
|
| | try: |
| | import ujson as json |
| | except ImportError: |
| | import json |
| | import typer |
| | from rich.highlighter import ReprHighlighter |
| | from rich.markdown import Markdown |
| | from rich.table import Table |
| | from rich.text import Text |
| | from textual import on |
| | from textual.app import App, ComposeResult |
| | from textual.containers import Horizontal, Vertical, VerticalScroll |
| | from textual.widgets import Input, ProgressBar, Select, SelectionList, Static |
| |
|
# Key written into every loaded sample to record its position within its source file.
INDEX_KEY = "__IDX"
# Suffix used when globbing the data directory for dump files.
FILE_SUFFIX = ".jsonl"
| |
|
| |
|
def check_textual_version():
    """Raise ``ImportError`` unless the installed Textual release is exactly 0.52.1.

    The imports are local so that the check itself is the only place that needs
    ``packaging``.
    """
    import textual
    from packaging.version import Version

    supported = Version("0.52.1")
    installed = Version(textual.__version__)
    if installed == supported:
        return

    raise ImportError(f"Textual version {textual.__version__} is not supported, please pip install textual==0.52.1")
| |
|
| |
|
# Run the version gate at import time so an unsupported Textual fails before any UI code is defined.
check_textual_version()
| |
|
| |
|
async def load_path(p: Path, data: dict, mask_strs: str, idx: int, pbar):
    """Load one JSONL file into ``data[idx]`` and advance the progress bar.

    Every non-blank line is parsed as a JSON object. String values have all
    matches of ``mask_strs`` (a regular expression) replaced by ``*``; every
    other value is re-serialised as indented JSON text so it can be displayed.
    Each sample also receives ``INDEX_KEY`` holding its position in the file.

    Args:
        p: Path of the JSONL file to read (decoded as UTF-8).
        data: Shared mapping that receives ``{"samples": [...]}`` under ``idx``.
        mask_strs: Regular expression to mask; an empty string disables masking.
        idx: Step index under which the samples are stored.
        pbar: Progress object; ``pbar.advance(1)`` is called once the file is loaded.
    """
    # Compile once per file rather than resolving the pattern for every string field.
    mask_pattern = re.compile(mask_strs) if mask_strs else None

    samples = []
    async with aiofiles.open(p, encoding="utf-8") as f:
        async for line in f:
            if not line.strip():
                # Blank separator/trailing lines are not JSON documents; json.loads would raise on them.
                continue
            d = json.loads(line)
            # Only values of existing keys are reassigned, so iterating items() stays valid.
            for k, v in d.items():
                if isinstance(v, str):
                    if mask_pattern is not None:
                        d[k] = mask_pattern.sub("*", v)
                else:
                    d[k] = json.dumps(v, ensure_ascii=False, indent=4)

            d[INDEX_KEY] = len(samples)
            samples.append(d)

    data[idx] = {"samples": samples}

    print(f"path {p} loaded")
    pbar.advance(1)
| |
|
| |
|
async def load_dir(path: Path, data: dict[int, dict], pbar, mask_strs: str = ""):
    """Concurrently load every ``*.jsonl`` file under ``path`` into ``data``.

    Files are ordered by the integer value of their stem, and the position in
    that ordering is the step index used as the key in ``data``.

    Steps whose index is already present in ``data`` are skipped. ``_run``
    loads the first file before starting the app, and loading it again would
    parse the file twice and push the progress bar past its total.

    Args:
        path: Directory containing the dump files.
        data: Shared mapping filled in by ``load_path``.
        pbar: Progress object forwarded to ``load_path``.
        mask_strs: Regular expression forwarded to ``load_path`` for masking.

    Raises:
        ValueError: If a file stem is not an integer.
    """
    paths = sorted(path.glob(f"*{FILE_SUFFIX}"), key=lambda x: int(x.stem))

    tasks = [load_path(p, data, mask_strs, i, pbar) for i, p in enumerate(paths) if i not in data]

    await asyncio.gather(*tasks)
| |
|
| |
|
class Highlighter(ReprHighlighter):
    """``ReprHighlighter`` extended with two extra ``tag_name`` patterns.

    The first pattern matches single bracket/punctuation characters, the second
    matches the text enclosed by ``<|`` and ``|>``.
    """

    highlights = [
        *ReprHighlighter.highlights,
        r"(?P<tag_name>[][\<\>{}()\|()【】\[\]=`])",
        r"\<\|(?P<tag_name>[\w\W]*?)\|\>",
    ]
| |
|
| |
|
def center_word_with_equals_exactly(word: str, total_length: int, char: str = "=") -> str:
    """Center ``word`` in a banner of exactly ``total_length`` characters.

    The result has the form ``"<fill> word <fill>"``: the word is separated from
    the fill by one space on each side, and when the fill cannot be split evenly
    the extra character goes on the right.

    Args:
        word: Text to center.
        total_length: Desired width of the returned string.
        char: Fill character; the width guarantee assumes a single character.

    Returns:
        The banner, or ``word`` unchanged when it (plus its two separator
        spaces) does not fit into ``total_length``.
    """
    # Reserve two columns for the separator spaces so the banner is not 2 characters too wide.
    padding = total_length - len(word) - 2
    if padding < 0:
        return word

    left_pad = padding // 2
    right_pad = padding - left_pad
    return f"{char * left_pad} {word} {char * right_pad}"
| |
|
| |
|
def highlight_keyword(content: str, keyword: Optional[str]):
    """Return ``content`` as a ``Text`` with every occurrence of ``keyword`` styled.

    With an empty or missing ``keyword`` the content is wrapped in a plain
    ``Text``. Otherwise the content is split on the keyword and rebuilt, with the
    keyword re-inserted between the pieces using a coloured background.
    """
    if not keyword:
        return Text(content)

    head, *tail = content.split(keyword)
    highlighted = Text()
    highlighted.append(head, style=None)
    for segment in tail:
        # Each remaining segment was preceded by one keyword occurrence in the source text.
        highlighted.append(keyword, style="on #8f51b5")
        highlighted.append(segment, style=None)
    return highlighted
| |
|
| |
|
# Markdown help text listing the key bindings; JsonLineViewer.compose renders it in the side panel.
# NOTE(review): the two bullets under `s` look like nested items whose indentation
# may have been lost in this copy — confirm against the original file.
help_doc = """
⌨️ keybinds:

- `f/esc`: find/cancel
- `tab/←/→`: change focus
- `j/k`: page down/up
- `g/G`: scroll home/end
- `n/N`: next sample/step
- `p/P`: previous sample/step
- `s`: switch display mode
- plain text
- rich table

"""
| |
|
| |
|
class JsonLineViewer(App):
    """Textual application for browsing samples loaded from JSONL dump files.

    ``data`` maps a step index to ``{"samples": [...]}``. The mapping may still be
    filled in while the app is running; a step that is not present yet is shown
    as a loading message instead of its content.
    """

    BINDINGS = [
        ("left", "focus_previous", "Focus Previous"),
        ("right", "focus_next", "Focus Next"),
        ("s", "swith_render", "switch render"),
        # control
        ("n", "next_sample", "Next Sample"),
        ("N", "next_step", "Next Step"),
        ("p", "previous_sample", "Previous Sample"),
        ("P", "previous_step", "Previous Step"),
        # search
        ("f", "toggle_search", "find"),
        ("enter", "next_search", "find next"),
        ("escape", "cancel_search", "cancel find"),
        # scroll
        ("j", "page_down", "page down"),
        ("k", "page_up", "page up"),
        ("g", "page_home", "page home"),
        ("G", "page_end", "page end"),
    ]

    CSS = """

    Select:focus > SelectCurrent {
        border: tall #8f51b5;
    }
    Select.-expanded > SelectCurrent {
        border: tall #8f51b5;
    }
    #select-container {
        width: 15%;
        height: 100%;
        align: center top;
    }
    #search-container {
        height: 10%;
        align: center top;
    }
    #search-box {
        width: 50%;
    }
    #reqid-box {
        width: 50%;
    }
    """

    def __init__(self, step_num: int, data: dict[int, dict], pbar):
        """Store the shared data and derive the initial field filter.

        Args:
            step_num: Total number of steps (files), including ones not loaded yet.
            data: Mapping of step index to ``{"samples": [...]}``.
            pbar: Progress widget that is yielded into the layout by ``compose``.
        """
        super().__init__()
        self.step_num = step_num

        self.data = data
        self.render_table = False
        self.selected_step_index = 0
        self.selected_sample_index = 0
        self.pbar = pbar

        self.matches = []
        self.current_match_index = 0

        self.highlighter = Highlighter()

        # Tolerate an empty mapping or an empty first step instead of raising IndexError.
        first_step = next(iter(data.values()), {})
        first_samples = first_step.get("samples", [])
        first_fields = list(first_samples[0]) if first_samples else []

        # (label, value, initially selected) triples for the field SelectionList.
        self.filter_fields = [(f, f, True) for f in first_fields]

        # Fields already offered in the filter; used to detect fields that
        # first appear in later samples.
        self._field_set: set[str] = set(first_fields)
        self.sample_num = len(first_samples)

    def compose(self) -> ComposeResult:
        """Build the layout: search row on top, selector column and content view below."""
        with Horizontal(id="search-container"):
            yield Input(placeholder="find something...", id="search-box")
            yield Input(placeholder="request id...", id="reqid-box")
            with Vertical(id="search-container2"):
                yield self.pbar
                yield Static("", id="search-status")

        with Horizontal():
            with Vertical(id="select-container"):
                yield Static("\n")
                yield Static(
                    renderable=Markdown(
                        help_doc,
                    ),
                    markup=False,
                )
                yield Static("\n")
                yield Select(
                    id="step-select",
                    value=0,
                    prompt="select step",
                    options=[("step: 1", 0)],
                    allow_blank=False,
                )
                yield Select(
                    id="sample-select",
                    value=0,
                    prompt="select sample",
                    options=[("sample: 1", 0)],
                    allow_blank=False,
                )
                yield Select(
                    id="sample-sort",
                    value=0,
                    prompt="排序",
                    options=[
                        ("sort", 0),
                        ("score asc", 1),
                        ("score desc", 2),
                    ],
                    allow_blank=False,
                )

                yield SelectionList[int](("Select ALL", 1, True), id="fields-select-all")
                with VerticalScroll(id="scroll-view2"):
                    yield SelectionList[str](*self.filter_fields, id="fields-select")
            with VerticalScroll(id="scroll-view"):
                yield Static(id="content", markup=False)

    async def on_mount(self) -> None:
        """Cache widget references and render the initial sample."""
        self.step_select = self.query_one("#step-select", Select)
        self.sample_select = self.query_one("#sample-select", Select)
        self.sample_sort = self.query_one("#sample-sort", Select)
        self.content_display = self.query_one("#content", Static)
        self.search_box = self.query_one("#search-box", Input)
        self.reqid_box = self.query_one("#reqid-box", Input)
        self.scroll_view = self.query_one("#scroll-view", VerticalScroll)
        self.search_status = self.query_one("#search-status", Static)
        self.fields_select = self.query_one("#fields-select", SelectionList)
        self.fields_select.border_title = "field filter"

        if self.data:
            self.step_select.set_options([(f"step: {i + 1}", i) for i in range(self.step_num)])
            self.sample_select.set_options([(f"sample: {i + 1}", i) for i in range(self.sample_num)])
            self.step_select.focus()
            await self.update_content()

    def update_result_options(self, offset: int = 0, sort_desc: Optional[bool] = None):
        """Refresh the sample selector for the current step and pick a sample.

        Args:
            offset: Sample index to select when no sort is requested.
            sort_desc: ``None`` keeps file order; ``False``/``True`` sort the samples
                by ``score`` (falling back to ``score_1``, then ``0``) ascending or
                descending and select the first one.

        NOTE: the option list is only rebuilt when ``sample_num`` is ``None`` or a
        sort is requested; a plain step change keeps the previous options.
        """
        options = []
        # ``data`` is filled out of order by the loader, so test membership
        # rather than comparing against the number of loaded steps.
        if isinstance(self.selected_step_index, int) and self.selected_step_index in self.data:
            if self.sample_num is None or sort_desc is not None:
                samples = self.data[self.selected_step_index].get("samples", [])
                if not samples:
                    self.selected_sample_index = offset
                    return
                if sort_desc is not None:
                    samples = sorted(
                        samples,
                        key=lambda x: x.get("score", x.get("score_1", 0)),
                        reverse=sort_desc,
                    )

                options = [(f"sample: {r[INDEX_KEY] + 1}", r[INDEX_KEY]) for r in samples]
                self.sample_select.set_options(options)
                self.sample_num = len(samples)

            if sort_desc is not None and options:
                self.selected_sample_index = options[0][1]
            else:
                self.selected_sample_index = offset

    async def update_content(self, search_keyword: Optional[str] = None):
        """Render the selected sample, optionally highlighting ``search_keyword``.

        Only the fields ticked in the field filter are shown. A ``KeyError``
        (typically a step that is not loaded yet) is rendered as a loading
        message; any other exception is rendered as its traceback.
        """
        content = ""
        try:
            samples = self.data[self.selected_step_index].get("samples", [])
            content_dict_full = samples[self.selected_sample_index]

            # Make sure fields that first appear in this sample can be toggled.
            self._update_fields_select(content_dict_full.keys())

            # Read the selection once; the widget builds a fresh list on every access.
            selected_fields = set(self.fields_select.selected)
            content_dict = {k: v for k, v in content_dict_full.items() if k in selected_fields}
            if self.render_table:
                content = Table("key", "value", show_lines=True)
                for k, v in content_dict.items():
                    content.add_row(
                        k,
                        self.highlighter(highlight_keyword(f"{v}", search_keyword)),
                    )
            else:
                text = Text()
                for k, v in content_dict.items():
                    s = center_word_with_equals_exactly(k, 64) + f"\n{v}\n"
                    text.append(highlight_keyword(s, search_keyword))
                content = self.highlighter(text)
        except KeyError:
            content = f"Loading data asynchronously, progress: {len(self.data)}/{self.step_num} step"

        except Exception:
            content = self.highlighter(traceback.format_exc())

        self.content_display.update(content)

    # ---------------------------------------------------------------------
    # Request-ID jump logic
    # ---------------------------------------------------------------------

    @on(Input.Submitted, "#reqid-box")
    async def on_reqid_submitted(self, event: Input.Submitted) -> None:
        """Jump to the sample that has a matching `request_id`."""

        req_id_raw = event.value.strip()
        # Hyphens are ignored on both sides of the comparison.
        req_id = req_id_raw.replace("-", "")
        if not req_id:
            return

        # The scan contains no await, so the loader cannot mutate ``data`` mid-iteration.
        match = next(
            (
                (step_idx, sample)
                for step_idx, step_data in self.data.items()
                for sample in step_data.get("samples", [])
                if str(sample.get("request_id", "")).replace("-", "") == req_id
            ),
            None,
        )

        if match is None:
            self.search_status.update(Text(f"request_id '{req_id_raw}' not found", style="bold red"))
            return

        step_idx, sample = match

        # Update the selected step.
        self.selected_step_index = step_idx
        self.step_select.value = step_idx

        # Update the selected sample.
        self.update_result_options(offset=sample[INDEX_KEY])
        self.selected_sample_index = sample[INDEX_KEY]
        self.sample_select.value = sample[INDEX_KEY]

        await self._clear_search()
        await self.update_content()

    # ---------------------------------------------------------------------
    # Helper: add new fields to SelectionList on-the-fly
    # ---------------------------------------------------------------------

    def _update_fields_select(self, keys):
        """Add any unseen *keys* to the field-selection widget so they can be toggled.

        The viewer is often launched with only the first step loaded. Later steps may
        introduce new columns (e.g. `request_id`). This helper ensures those fields
        become visible without requiring a restart.
        """
        # Nothing to update before on_mount has looked the widget up.
        if not hasattr(self, "fields_select"):
            return

        for k in keys:
            if k in self._field_set:
                continue
            self._field_set.add(k)
            # SelectionList.add_option takes one item; a (prompt, value, selected)
            # tuple adds the field already ticked.
            self.fields_select.add_option((k, k, True))

    @on(Select.Changed, "#step-select")
    async def step_changed(self, event):
        """Switch to the chosen step and re-render."""
        self.selected_step_index = event.value
        self.update_result_options()
        await self.update_content()

    @on(Select.Changed, "#sample-select")
    async def sample_changed(self, event):
        """Switch to the chosen sample, dropping any active search."""
        self.selected_sample_index = event.value
        await self._clear_search()
        await self.update_content()

    @on(Select.Changed, "#sample-sort")
    async def sort_changed(self, event):
        """Apply the chosen ordering: 0 = file order, 1 = score ascending, otherwise descending."""
        v = event.value
        if v == 0:
            sort_desc = None
        elif v == 1:
            sort_desc = False
        else:
            sort_desc = True
        self.update_result_options(sort_desc=sort_desc)
        await self.update_content()

    @on(SelectionList.SelectedChanged, "#fields-select")
    async def fields_changed(self, event):
        """Re-render when the set of visible fields changes."""
        await self.update_content()

    @on(SelectionList.SelectedChanged, "#fields-select-all")
    async def fields_all_changed(self, event):
        """Select or deselect every field according to the "Select ALL" toggle."""
        s = self.query_one("#fields-select-all", SelectionList)
        if s.selected:
            self.fields_select.select_all()
        else:
            self.fields_select.deselect_all()

    def action_focus_previous(self):
        """Move focus to the previous widget."""
        self.screen.focus_previous()

    def action_focus_next(self):
        """Move focus to the next widget."""
        self.screen.focus_next()

    async def action_next_step(self) -> None:
        """Advance to the next step, wrapping to the first one."""
        self.selected_step_index += 1
        if self.selected_step_index >= self.step_num:
            self.selected_step_index = 0
        self.step_select.value = self.selected_step_index
        self.update_result_options()
        await self.update_content()

    async def action_next_sample(self) -> None:
        """Advance to the next sample, wrapping to the first one."""
        self.selected_sample_index += 1
        if not self.sample_num or self.selected_sample_index >= self.sample_num:
            self.selected_sample_index = 0
        self.sample_select.value = self.selected_sample_index
        await self._clear_search()
        await self.update_content()

    async def action_previous_step(self) -> None:
        """Go back one step, wrapping to the last one."""
        self.selected_step_index -= 1
        if self.selected_step_index < 0:
            self.selected_step_index = self.step_num - 1
        self.step_select.value = self.selected_step_index
        self.update_result_options()
        await self.update_content()

    async def action_previous_sample(self) -> None:
        """Go back one sample, wrapping to the last one."""
        self.selected_sample_index -= 1
        if self.selected_sample_index < 0:
            self.selected_sample_index = self.sample_num - 1
        self.sample_select.value = self.selected_sample_index
        await self._clear_search()
        await self.update_content()

    async def action_swith_render(self):
        """Toggle between plain-text and table rendering."""
        self.render_table = not self.render_table
        await self.update_content()

    def action_toggle_search(self) -> None:
        """Focus the search box."""
        self.search_box.focus()

    async def action_cancel_search(self) -> None:
        """Clear the search box, the match list and the highlights."""
        self.search_box.value = ""
        await self._clear_search()
        await self.update_content()

    async def _clear_search(self):
        """Forget all matches and blank the status line."""
        self.matches = []
        self.search_status.update("")
        self.current_match_index = 0

    @on(Input.Submitted, "#search-box")
    async def on_search_submitted(self, event: Input.Submitted) -> None:
        """Highlight the submitted keyword and record the wrapped lines containing it.

        Match positions are only computed for the plain-text rendering; in table
        mode the keyword is highlighted but no match list is built.
        """
        self.matches = []
        self.current_match_index = 0
        if not event.value:
            return

        await self.update_content(event.value)
        renderable = self.content_display.render()
        if not isinstance(renderable, Text):
            # Table (or any other non-text) renderable: no line positions to scroll to.
            return

        console = self.content_display._console
        # Wrap at the view width so line numbers correspond to rows on screen.
        lines = renderable.wrap(console, self.scroll_view.container_size.width)
        self.matches = [
            {"line": line_idx, "word": event.value} for line_idx, line in enumerate(lines) if event.value in line
        ]
        self.scroll_view.focus()
        await self.action_next_search()

    async def action_next_search(self) -> None:
        """Scroll to the current match, report its position, then advance the cursor."""
        if not self.matches or self.current_match_index >= len(self.matches):
            return

        shown_index = self.current_match_index
        target_line = self.matches[shown_index]["line"]
        self.scroll_view.scroll_to(x=0, y=target_line * 1, animate=False)
        # Build the status from the match just shown, before moving the cursor,
        # so the first match reads "1/N" rather than "2/N".
        self.search_status.update(
            Text(
                f"Find :{shown_index + 1}/{len(self.matches)}",
                style="bold on #8f51b5",
            )
        )
        self.current_match_index = (shown_index + 1) % len(self.matches)

    def action_page_up(self):
        """Scroll the content view up by one page."""
        self.scroll_view.scroll_page_up(animate=False)

    def action_page_down(self):
        """Scroll the content view down by one page."""
        self.scroll_view.scroll_page_down(animate=False)

    def action_page_home(self):
        """Scroll the content view to the top."""
        self.scroll_view.scroll_home(animate=False)

    def action_page_end(self):
        """Scroll the content view to the bottom."""
        self.scroll_view.scroll_end(animate=False)
| |
|
| |
|
async def _run(path: Path, mask_str: str):
    """Load the first dump file, then run the viewer while the remaining files load.

    Args:
        path: Directory containing ``*.jsonl`` files whose stems are integers.
        mask_str: Regular expression whose matches are masked in string fields.

    Raises:
        AssertionError: If ``path`` does not exist.
        ValueError: If no dump file is found, or a file stem is not an integer.
    """
    assert path.exists(), f"{path} not exist"

    paths = sorted(path.glob(f"*{FILE_SUFFIX}"), key=lambda x: int(x.stem))

    if not paths:
        # The original message had a stray "f" in front of the interpolated path.
        raise ValueError(f"no available reward dump files under {path}")

    print(f"get jsonl file nums: {len(paths)}")

    pbar = ProgressBar(total=len(paths), name="data load progress")
    data = {}
    # The viewer derives its field list from the first step, so that step is loaded before the app is built.
    await load_path(paths[0], data, mask_str, 0, pbar)
    # Named ``viewer`` so the module-level Typer ``app`` is not shadowed.
    viewer = JsonLineViewer(step_num=len(paths), data=data, pbar=pbar)
    await asyncio.gather(load_dir(path, data, pbar, mask_str), viewer.run_async())
| |
|
| |
|
# Typer application object; CLI commands are registered on it and it is invoked by the main guard.
app = typer.Typer()
| |
|
| |
|
@app.command(help="launch TUI APP")
def run(
    rollout_data_dir: Path,
    mask_str: Annotated[str, typer.Option(help="string that will be masked to *")] = r"<\|image_pad\|>|<\|imgpad\|>",
):
    """Start the viewer for the dump files in ``rollout_data_dir``.

    Args:
        rollout_data_dir: Directory containing the ``*.jsonl`` dump files.
        mask_str: Regular expression whose matches are replaced by ``*``.
    """
    # asyncio.run creates and closes its own loop; asyncio.get_event_loop() is
    # deprecated when no loop is running.
    asyncio.run(_run(rollout_data_dir, mask_str))
| |
|
| |
|
# Script entry point: dispatch to the Typer application when executed directly.
if __name__ == "__main__":
    app()
| |
|