Spaces:
Sleeping
Sleeping
"""StateBench Explorer - Interactive inspection of the StateBench benchmark.

This Gradio app allows you to:
1. Browse timelines from parslee/statebench
2. View conversation events and state changes
3. See context built by different memory baselines
4. Inspect ground truth (must mention, must not mention, decision)
"""
import json
from itertools import islice

import gradio as gr
from datasets import load_dataset
# Load the dataset once at import time so every UI callback shares the same
# in-memory copy instead of re-loading it per request.
print("Loading StateBench dataset...")
ds = load_dataset("parslee/statebench")
print(
    f"Loaded: train={len(ds['train'])}, "
    f"validation={len(ds['validation'])}, "
    f"test={len(ds['test'])}"
)
# Baseline names offered in the "Baseline for Context" dropdown; the selected
# name is passed to build_context_with_baseline() unchanged.
BASELINES: list[str] = [
    "transcript_replay",
    "no_memory",
    "rolling_summary",
    "fact_extraction",
    "state_based",
]
# Track name -> one-line description. The keys populate the "Track Filter"
# dropdown and the description is shown next to the track in the metadata pane.
TRACK_INFO: dict[str, str] = {
    "supersession": "Facts invalidated by newer information",
    "commitment_durability": "Commitments survive interruptions",
    "interruption_resumption": "Context survives topic switches",
    "scope_permission": "Role-based access control",
    "environmental_freshness": "Time-sensitive state expiration",
    "hallucination_resistance": "Only assert established state",
    "scope_leak": "Task-local state stays local",
    "causality": "Multi-constraint dependencies",
    "repair_propagation": "Fixes propagate to dependent facts",
    "brutal_realistic": "Real-world complexity scenarios",
    "supersession_detection": "Infer supersession from natural language",
    "authority_hierarchy": "Respect authority levels",
    "enterprise_privacy": "Cross-tenant isolation",
}
def get_timeline_list(split: str, track_filter: str, limit: int = 100) -> list[str]:
    """Return the timeline IDs to offer in the "Timeline ID" dropdown.

    Args:
        split: Name of the split to read from the module-level ``ds``.
        track_filter: Track name to keep. An empty value or ``"All"`` keeps
            every row.
        limit: Maximum number of IDs to return. Defaults to 100 so the
            dropdown stays small; must be non-negative.

    Returns:
        The first ``limit`` matching IDs, in dataset order.
    """
    rows = ds[split]
    keep_all = not track_filter or track_filter == "All"
    matching_ids = (
        row["id"] for row in rows if keep_all or row["track"] == track_filter
    )
    # islice stops iterating as soon as `limit` IDs have been collected, so
    # the rest of the split is never visited once the dropdown is full.
    return list(islice(matching_ids, limit))
def _decode_json_field(value):
    """Decode a JSON-encoded column value; pass already-decoded values through."""
    if isinstance(value, (str, bytes, bytearray)):
        return json.loads(value)
    return value


def parse_timeline(row: dict) -> dict:
    """Convert one dataset row into a timeline dict with decoded JSON fields.

    The ``events``, ``initial_state`` and ``actors`` columns are decoded from
    JSON text. Values that are already decoded (for example a list or dict)
    are kept as they are instead of raising ``TypeError``.

    Args:
        row: Mapping with the keys ``id``, ``track``, ``domain``,
            ``difficulty``, ``detection_mode``, ``events``, ``initial_state``,
            ``actors``, ``user_name`` and ``user_authority``.

    Returns:
        A new dict with the same keys, where the three JSON columns hold the
        decoded Python objects.

    Raises:
        KeyError: If ``row`` lacks one of the expected keys.
        json.JSONDecodeError: If a JSON column contains invalid JSON text.
    """
    return {
        "id": row["id"],
        "track": row["track"],
        "domain": row["domain"],
        "difficulty": row["difficulty"],
        "detection_mode": row["detection_mode"],
        "events": _decode_json_field(row["events"]),
        "initial_state": _decode_json_field(row["initial_state"]),
        "actors": _decode_json_field(row["actors"]),
        "user_name": row["user_name"],
        "user_authority": row["user_authority"],
    }
def _ground_truth_phrases(entries) -> list:
    """Return display text for ground-truth entries.

    Entries may be plain strings or dicts carrying a ``phrase`` key; anything
    else is shown via ``str()``.
    """
    phrases = []
    for entry in entries:
        if isinstance(entry, str):
            phrases.append(entry)
        elif isinstance(entry, dict):
            phrases.append(entry.get("phrase", str(entry)))
        else:
            phrases.append(str(entry))
    return phrases


def format_events(events: list[dict]) -> str:
    """Render timeline events as Markdown.

    Handles four event types (``conversation_turn``, ``state_write``,
    ``supersession`` and ``query``); events of any other type are skipped.
    For queries, the ground truth (expected decision, decision type, phrases
    that must / must not be mentioned) is listed under the prompt.

    Args:
        events: Event dicts as decoded from the timeline's ``events`` column.
            ``None`` is treated as an empty list.

    Returns:
        The Markdown text, or an empty string when nothing is rendered.
    """
    # NOTE(review): the icons in this function were mis-encoded in the source
    # this code was recovered from. The warning sign is unambiguous; the
    # others are best-guess replacements - confirm against the upstream file.
    lines: list[str] = []
    for event in events or []:
        event_type = event.get("type", "unknown")

        if event_type == "conversation_turn":
            # A JSON null speaker would otherwise crash on .title().
            speaker = str(event.get("speaker") or "?")
            text = event.get("text", "")
            icon = "👤" if speaker == "user" else "🤖"
            lines.append(f"**{icon} {speaker.title()}**: {text}\n")

        elif event_type == "state_write":
            lines.append("**📝 State Write**:")
            for write in event.get("writes") or []:
                lines.append(f" - `{write.get('key')}`: {write.get('value')}")
            lines.append("")

        elif event_type == "supersession":
            lines.append("**🔄 Supersession**:")
            for write in event.get("writes") or []:
                lines.append(f" - `{write.get('key')}`: {write.get('value')}")
                supersedes = write.get("supersedes", "")
                if supersedes:
                    lines.append(f" *(supersedes: {supersedes})*")
            lines.append("")

        elif event_type == "query":
            prompt = event.get("prompt", "")
            lines.append(f"**❓ Query**: {prompt}\n")
            ground_truth = event.get("ground_truth") or {}
            if ground_truth:
                decision = ground_truth.get("decision", "N/A")
                decision_type = ground_truth.get("decision_type", "N/A")
                lines.append(f" - **Expected Decision**: `{decision}`")
                lines.append(f" - **Decision Type**: {decision_type}")
                must_mention = ground_truth.get("must_mention") or []
                if must_mention:
                    mentions = _ground_truth_phrases(must_mention)
                    lines.append(f" - **Must Mention**: {mentions}")
                must_not = ground_truth.get("must_not_mention") or []
                if must_not:
                    forbidden = _ground_truth_phrases(must_not)
                    lines.append(f" - **Must NOT Mention** ⚠️: {forbidden}")
                lines.append("")

    return "\n".join(lines)
def format_initial_state(state: dict) -> str:
    """Render a timeline's initial state as Markdown.

    The output has an identity header (user, authority, and department /
    organization when present) followed by one bullet list per non-empty
    section: ``persistent_facts`` and ``working_set``.

    Args:
        state: Decoded ``initial_state`` of a timeline. ``None``, or a section
            whose value is ``None``, is treated as empty.

    Returns:
        The Markdown text.
    """
    state = state or {}
    lines = ["## Initial State\n"]

    # Identity: `or {}` also covers an explicit JSON null, which
    # `.get(key, {})` alone would pass through as None.
    identity = state.get("identity_role") or {}
    lines.append(f"**User**: {identity.get('user_name', 'N/A')}")
    lines.append(f"**Authority**: {identity.get('authority', 'N/A')}")
    for label, key in (("Department", "department"), ("Organization", "organization")):
        if identity.get(key):
            lines.append(f"**{label}**: {identity.get(key)}")
    lines.append("")

    # Both list sections share the same key/value bullet layout.
    for heading, key in (
        ("### Persistent Facts", "persistent_facts"),
        ("### Working Set", "working_set"),
    ):
        entries = state.get(key) or []
        if entries:
            lines.append(heading)
            lines.extend(
                f"- `{entry.get('key')}`: {entry.get('value')}" for entry in entries
            )
            lines.append("")

    return "\n".join(lines)
def _raw_conversation_fallback(events: list[dict]) -> str:
    """Return a notice plus the raw conversation turns as a Markdown list."""
    lines = [
        "*Context building requires statebench package (not available in this Space)*\n",
        "**Raw conversation:**\n",
    ]
    lines.extend(
        f"- **{event.get('speaker', '?')}**: {event.get('text', '')}"
        for event in events
        if event.get("type") == "conversation_turn"
    )
    return "\n".join(lines)


def build_context_with_baseline(
    events: list[dict], baseline_name: str, token_budget: int = 8000
) -> str:
    """Build the context a memory baseline would produce for a timeline.

    Events are fed to the baseline in order. The context is built at the
    FIRST ``query`` event encountered, using that query's prompt; events
    after it are not processed.

    Args:
        events: Decoded timeline events.
        baseline_name: Baseline identifier passed to statebench's
            ``get_baseline``.
        token_budget: Token budget passed to ``get_baseline``.

    Returns:
        The built context text. Instead of raising, this returns:
        ``"No query found in timeline"`` when no query event exists; a notice
        followed by the raw conversation when ``statebench`` cannot be
        imported; or ``"Error building context: ..."`` for any other failure.
    """
    try:
        # Imported lazily so the app still starts where statebench is absent.
        from statebench.baselines import get_baseline
        from statebench.schema.timeline import (
            ConversationTurn,
            StateWrite,
            Supersession,
        )

        baseline = get_baseline(baseline_name, token_budget=token_budget)
        baseline.reset()

        if not any(event.get("type") == "query" for event in events):
            return "No query found in timeline"

        # Event type -> schema model used to validate it before processing.
        event_models = {
            "conversation_turn": ConversationTurn,
            "state_write": StateWrite,
            "supersession": Supersession,
        }
        for event in events:
            event_type = event.get("type")
            if event_type == "query":
                # Build context at the query point and stop.
                return baseline.build_context(event.get("prompt", "")).context
            model = event_models.get(event_type)
            if model is not None:
                baseline.process_event(model.model_validate(event))
        return "Could not build context"
    except ImportError:
        # Fallback: show the raw conversation when statebench is unavailable.
        return _raw_conversation_fallback(events)
    except Exception as exc:
        # UI boundary: report the failure in the pane rather than crash the callback.
        return f"Error building context: {exc}"
def inspect_timeline(split: str, timeline_id: str, baseline: str) -> tuple[str, str, str, str]:
    """Build the four Markdown panes shown for one timeline.

    Args:
        split: Name of the split to search in the module-level ``ds``.
        timeline_id: ID of the timeline to display; empty or ``None`` means
            nothing is selected yet.
        baseline: Baseline name used to build the context pane.

    Returns:
        ``(metadata, events, initial_state, context)`` as Markdown strings.
        When nothing is selected or the ID is not found, the first element
        carries a message and the other three are empty strings.
    """
    if not timeline_id:
        return "Select a timeline", "", "", ""

    # Linear search for the first row with a matching ID.
    row = next((dict(r) for r in ds[split] if r["id"] == timeline_id), None)
    if row is None:
        return f"Timeline {timeline_id} not found", "", "", ""

    timeline = parse_timeline(row)

    track_desc = TRACK_INFO.get(timeline["track"], "")
    metadata = (
        "\n".join(
            [
                f"## {timeline['id']}",
                f"**Track**: {timeline['track']} - *{track_desc}*",
                f"**Domain**: {timeline['domain']}",
                f"**Difficulty**: {timeline['difficulty']}",
                f"**Detection Mode**: {timeline['detection_mode']}",
                f"**User**: {timeline['user_name']} ({timeline['user_authority']})",
            ]
        )
        + "\n"
    )

    events_md = format_events(timeline["events"])
    state_md = format_initial_state(timeline["initial_state"])

    # The context is shown verbatim inside a fenced block.
    context = build_context_with_baseline(timeline["events"], baseline)
    context_md = f"## Context ({baseline})\n\n```\n{context}\n```"

    return metadata, events_md, state_md, context_md
def update_timeline_list(split: str, track: str):
    """Refresh the timeline dropdown after the split or track filter changes.

    Args:
        split: Currently selected dataset split.
        track: Currently selected track filter.

    Returns:
        A ``gr.Dropdown`` whose choices are the matching timeline IDs, with
        the first ID pre-selected (or no selection when nothing matches).
    """
    ids = get_timeline_list(split, track)
    selected = ids[0] if ids else None
    return gr.Dropdown(choices=ids, value=selected)
# Build the Gradio interface.
# NOTE(review): indentation was lost in the source this was recovered from, so
# the nesting below (three filters in one row; timeline picker and button
# underneath) is a reconstruction - confirm against the deployed Space.
# NOTE(review): icons in the title and button were mis-encoded; the magnifying
# glass is a best-guess replacement.
with gr.Blocks(title="StateBench Explorer") as demo:
    gr.Markdown(
        "\n"
        "# 🔍 StateBench Explorer\n"
        "Interactive inspection of the [StateBench](https://huggingface.co/datasets/parslee/statebench) benchmark\n"
        "for LLM state correctness.\n"
        "**Select a timeline** to view its events, ground truth, and context built by different memory baselines.\n"
    )

    with gr.Row():
        split_dropdown = gr.Dropdown(
            choices=["test", "validation", "train"],
            value="test",
            label="Split",
        )
        track_dropdown = gr.Dropdown(
            choices=["All"] + list(TRACK_INFO.keys()),
            value="All",
            label="Track Filter",
        )
        baseline_dropdown = gr.Dropdown(
            choices=BASELINES,
            value="transcript_replay",
            label="Baseline for Context",
        )

    # Compute the initial ID list once; it is needed for both the choices and
    # the default value, and each call scans the dataset.
    initial_ids = get_timeline_list("test", "All")
    timeline_dropdown = gr.Dropdown(
        choices=initial_ids,
        label="Timeline ID",
        value=initial_ids[0] if initial_ids else None,
    )
    inspect_btn = gr.Button("🔍 Inspect Timeline", variant="primary")

    with gr.Row():
        with gr.Column(scale=1):
            metadata_output = gr.Markdown(label="Metadata")
            state_output = gr.Markdown(label="Initial State")
        with gr.Column(scale=2):
            events_output = gr.Markdown(label="Events")
            context_output = gr.Markdown(label="Built Context")

    # Event handlers: changing either filter refreshes the timeline choices.
    for filter_dropdown in (split_dropdown, track_dropdown):
        filter_dropdown.change(
            fn=update_timeline_list,
            inputs=[split_dropdown, track_dropdown],
            outputs=[timeline_dropdown],
        )

    inspect_inputs = [split_dropdown, timeline_dropdown, baseline_dropdown]
    inspect_outputs = [metadata_output, events_output, state_output, context_output]
    inspect_btn.click(
        fn=inspect_timeline,
        inputs=inspect_inputs,
        outputs=inspect_outputs,
    )
    # Auto-inspect on timeline change
    timeline_dropdown.change(
        fn=inspect_timeline,
        inputs=inspect_inputs,
        outputs=inspect_outputs,
    )

    # NOTE(review): "|" separators between these links may have been lost with
    # the table markup that wrapped the source lines - confirm.
    gr.Markdown(
        "\n"
        "---\n"
        "**Resources**: [Dataset](https://huggingface.co/datasets/parslee/statebench)\n"
        "[GitHub](https://github.com/Parslee-ai/statebench)\n"
        "[Paper](https://github.com/Parslee-ai/statebench/blob/main/paper.pdf)\n"
    )


if __name__ == "__main__":
    demo.launch()