#!/usr/bin/env python3
"""
LLM Completion Viewer - Streamlit visualization for LLM completion JSON files.

Usage:
    streamlit run llm_completion_viewer.py --server.port 8502 -- --dir /path/to/llm_completions

Default port: 8502
"""

import argparse
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import streamlit as st

# Visual styling per message role: badge label, foreground color, background color.
ROLE_STYLE = {
    "system": {"label": "SYSTEM", "color": "#4B5563", "bg": "#F3F4F6"},
    "user": {"label": "USER", "color": "#1D4ED8", "bg": "#DBEAFE"},
    "assistant": {"label": "ASSISTANT", "color": "#065F46", "bg": "#D1FAE5"},
    "tool": {"label": "TOOL", "color": "#7C2D12", "bg": "#FFEDD5"},
}


def parse_args() -> argparse.Namespace:
    """Parse command-line arguments (everything after Streamlit's ``--`` separator)."""
    parser = argparse.ArgumentParser(description="Streamlit viewer for LLM completion files.")
    parser.add_argument(
        "--dir",
        type=str,
        default="",
        help="Directory containing LLM completion JSON files.",
    )
    return parser.parse_args()


def extract_timestamp_from_filename(filename: str) -> float:
    """Extract timestamp from filename like
    'vertex_ai__gemini-2.5-flash-1771538250.607-8981.json'.

    Returns 0.0 when the filename does not match the expected pattern.
    """
    match = re.search(r'-(\d+\.\d+)-[a-f0-9]+\.json$', filename)
    if match:
        return float(match.group(1))
    return 0.0


def file_sort_key(path: Path) -> Tuple[float, str]:
    """Sort files by timestamp (descending - latest first), then by name."""
    timestamp = extract_timestamp_from_filename(path.name)
    # Negate timestamp for descending order (latest first)
    return (-timestamp, path.name)


def try_load_json(path: Path) -> Optional[Any]:
    """Load JSON from *path*; return None on any read or parse failure.

    The broad except is deliberate: the caller falls back to showing the
    raw file contents when this returns None.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        return None


def extract_text_from_message(message: Dict[str, Any]) -> str:
    """Extract text content from message, handling both string and
    list[dict] content formats."""
    content = message.get("content")
    if isinstance(content, str):
        return content
    text_parts: List[str] = []
    if isinstance(content, list):
        for item in content:
            if isinstance(item, dict) and item.get("type") == "text":
                text = item.get("text")
                if isinstance(text, str) and text:
                    text_parts.append(text)
    return "\n".join(text_parts).strip()


def extract_thinking_blocks(message: Dict[str, Any]) -> List[str]:
    """Extract thinking block texts from the message's ``thinking_blocks`` field."""
    thinking_parts: List[str] = []
    thinking_blocks = message.get("thinking_blocks")
    if isinstance(thinking_blocks, list):
        for block in thinking_blocks:
            if isinstance(block, dict):
                # Gemini/Vertex AI format:
                # {"type": "thinking", "thinking": "...", "signature": "..."}
                thinking = block.get("thinking") or block.get("text") or ""
                if thinking:
                    thinking_parts.append(thinking)
    return thinking_parts


def format_timestamp(timestamp: float) -> str:
    """Format a Unix timestamp to a human-readable local-time string."""
    dt = datetime.fromtimestamp(timestamp)
    return dt.strftime("%Y-%m-%d %H:%M:%S")


def messages_summary(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Create a summary table (one row dict per message) for the overview dataframe."""
    rows: List[Dict[str, Any]] = []
    for idx, msg in enumerate(messages):
        role = msg.get("role", "unknown")
        text = extract_text_from_message(msg)
        preview = text[:120] + ("..." if len(text) > 120 else "")
        tool_calls = msg.get("tool_calls")
        tool_call_count = len(tool_calls) if isinstance(tool_calls, list) else 0
        thinking_blocks = extract_thinking_blocks(msg)
        thinking_chars = sum(len(t) for t in thinking_blocks)
        rows.append(
            {
                "idx": idx,
                "role": role,
                "thinking_blocks": len(thinking_blocks),
                "thinking_chars": thinking_chars,
                "tool_calls": tool_call_count,
                "chars": len(text),
                "preview": preview,
            }
        )
    return rows


def render_completion_data(data: Dict[str, Any]):
    """Render the LLM completion data: metadata metrics, an overview table,
    and a per-message timeline with thinking blocks and tool calls."""
    # Display metadata
    st.subheader("Completion Metadata")
    metadata_cols = st.columns(3)
    with metadata_cols[0]:
        context_window = data.get("context_window", "N/A")
        st.metric(
            "Context Window",
            f"{context_window:,}" if isinstance(context_window, int) else context_window,
        )
    with metadata_cols[1]:
        messages = data.get("messages", [])
        st.metric("Total Messages", len(messages))
    with metadata_cols[2]:
        total_chars = sum(len(extract_text_from_message(msg)) for msg in messages)
        st.metric("Total Characters", f"{total_chars:,}")

    # Display additional metadata fields if present
    other_metadata = {k: v for k, v in data.items() if k not in ["messages", "context_window"]}
    if other_metadata:
        with st.expander("📋 Additional Metadata", expanded=False):
            st.json(other_metadata)

    # Display messages
    messages = data.get("messages", [])
    if not messages:
        st.warning("No messages found in this completion.")
        return

    st.subheader("Messages Overview")
    rows = messages_summary(messages)
    st.dataframe(rows, use_container_width=True)

    st.subheader("Full Message Timeline")
    show_raw = st.checkbox("Show raw dict under each message", value=False)
    for idx, msg in enumerate(messages):
        role = str(msg.get("role", "unknown"))
        style = ROLE_STYLE.get(role, {"label": role.upper(), "color": "#111827", "bg": "#F9FAFB"})
        text = extract_text_from_message(msg)
        tool_calls = msg.get("tool_calls")
        tool_call_list = tool_calls if isinstance(tool_calls, list) else []
        tool_call_count = len(tool_call_list)
        thinking_blocks = extract_thinking_blocks(msg)

        title = f"{style['label']} #{idx}"
        if thinking_blocks:
            title += f" | 🧠 thinking×{len(thinking_blocks)}"
        if tool_call_count > 0:
            title += f" | 🔧 tool_calls×{tool_call_count}"

        # NOTE(review): the original HTML fragments were lost in a formatting
        # mangle; this role-colored header div is a reconstruction — confirm
        # against the intended markup.
        st.markdown(
            (
                f"<div style='background:{style['bg']};color:{style['color']};"
                f"padding:6px 10px;border-radius:6px;font-weight:600;'>"
                f"{title}</div>"
            ),
            unsafe_allow_html=True,
        )
        show_msg = st.toggle(f"Show message #{idx}", value=(idx < 3), key=f"show_msg_{idx}")
        if show_msg:
            # Render thinking blocks
            if thinking_blocks:
                for tb_idx, thinking_text in enumerate(thinking_blocks):
                    with st.expander(
                        f"🧠 Thinking block {tb_idx + 1} ({len(thinking_text):,} chars)",
                        expanded=False,
                    ):
                        # NOTE(review): reconstructed markup (original lost).
                        st.markdown(
                            (
                                "<div style='white-space:pre-wrap;color:#6B7280;font-style:italic;'>"
                                f"{thinking_text}</div>"
                            ),
                            unsafe_allow_html=True,
                        )
            # Render text content
            if text:
                # NOTE(review): reconstructed markup (original lost).
                st.markdown(
                    (
                        f"<div style='white-space:pre-wrap;background:{style['bg']};"
                        f"padding:8px;border-radius:6px;'>"
                        f"{text}</div>"
                    ),
                    unsafe_allow_html=True,
                )
            elif not thinking_blocks and not tool_call_list:
                # Make the empty case visible instead of rendering a blank caption.
                st.caption("(empty message)")
            # Render tool calls
            if tool_call_list:
                for tc_idx, tc in enumerate(tool_call_list):
                    if isinstance(tc, dict):
                        # Guard: "function" may be present but not a dict.
                        fn = tc.get("function")
                        fn = fn if isinstance(fn, dict) else {}
                        tc_name = fn.get("name") or tc.get("name") or f"tool_{tc_idx}"
                    else:
                        tc_name = str(tc)
                    with st.expander(f"🔧 Tool call {tc_idx + 1}: `{tc_name}`", expanded=False):
                        st.json(tc)
            if show_raw:
                st.json(msg)


def main():
    """Entry point: pick a completion file from the sidebar and render it."""
    args = parse_args()
    st.set_page_config(page_title="LLM Completion Viewer", layout="wide")
    st.title("🤖 LLM Completion Viewer")

    default_dir = args.dir or ""
    run_dir_input = st.sidebar.text_input("Completions directory", value=default_dir)
    run_dir = Path(run_dir_input).expanduser() if run_dir_input else None

    if not run_dir_input:
        st.info("Pass `--dir` or set the directory in the sidebar.")
        st.markdown("""
**Usage:**
```bash
streamlit run llm_completion_viewer.py --server.port 8502 -- --dir /path/to/llm_completions
```
""")
        return
    if not run_dir or not run_dir.exists() or not run_dir.is_dir():
        st.error(f"Directory not found: {run_dir_input}")
        return

    # Find all JSON files and sort by timestamp (latest first)
    json_files = [p for p in run_dir.iterdir() if p.is_file() and p.suffix == '.json']
    if not json_files:
        st.warning("No JSON files found in this directory.")
        return

    # Sort files by timestamp (latest first)
    sorted_files = sorted(json_files, key=file_sort_key)
    st.sidebar.markdown(f"**Found {len(sorted_files)} completion files**")

    # Create file selection with timestamp info
    file_options = []
    for f in sorted_files:
        timestamp = extract_timestamp_from_filename(f.name)
        if timestamp > 0:
            time_str = format_timestamp(timestamp)
            file_options.append(f"{f.name} ({time_str})")
        else:
            file_options.append(f.name)

    selected_idx = st.sidebar.selectbox(
        "Select completion file",
        options=range(len(file_options)),
        format_func=lambda i: file_options[i],
        index=0,
    )
    selected_path = sorted_files[selected_idx]

    # Display file info
    st.caption(f"**Selected:** `{selected_path.name}`")
    file_size = selected_path.stat().st_size
    st.caption(f"**Size:** {file_size:,} bytes ({file_size / 1024:.1f} KB)")
    timestamp = extract_timestamp_from_filename(selected_path.name)
    if timestamp > 0:
        st.caption(f"**Timestamp:** {format_timestamp(timestamp)}")

    # Load and display the completion data
    data = try_load_json(selected_path)
    if data is None:
        st.error("Failed to parse JSON file.")
        raw = selected_path.read_text(encoding="utf-8", errors="replace")
        st.code(raw, language="json")
        return
    if not isinstance(data, dict):
        st.error("Expected JSON object (dict) at root level.")
        st.json(data)
        return

    render_completion_data(data)


if __name__ == "__main__":
    main()