# NOTE: the three lines below were scrape artifacts from the hosting page
# (commit a695db3, "first load bug", author arjunguha); kept as comments so
# the module parses.
# arjunguha's picture
# first load bug
# a695db3 unverified
"""
Gradio app to view tasks.jsonl and validated_tasks.jsonl side by side.
Run with:
uv --with gradio python -m buildabench_workshop.view_tasks_gradio tasks.jsonl validated_tasks.jsonl
"""
from __future__ import annotations
import argparse
import difflib
import json
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
def normalize_task(task: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *task* with bookkeeping fields stripped out.

    Top-level metadata keys (``task_id``, ``repo``, ``commit_sha``, ...) are
    removed, and at every nesting level any key whose *name contains* one of
    "matching_files", "log", "tips" or "container" is dropped as well.  Note
    this is a substring match, so e.g. a key named "changelog" is also
    removed.  Lists are traversed element by element; scalars pass through
    unchanged.
    """
    # Exact key names removed at any level of the structure.
    dropped = {
        "matching_files",
        "commit_sha",
        "task_id",
        "repo",
        "log",
        "tips",
        "container",
    }
    # Substrings that disqualify a key wherever it appears.
    noisy_fragments = ("matching_files", "log", "tips", "container")

    def scrub(node: Any) -> Any:
        """Recursively copy *node*, omitting unwanted dictionary keys."""
        if isinstance(node, dict):
            return {
                key: scrub(value)
                for key, value in node.items()
                if key not in dropped
                and not any(fragment in key for fragment in noisy_fragments)
            }
        if isinstance(node, list):
            return [scrub(entry) for entry in node]
        return node

    return scrub(task)
def load_jsonl(filepath: Path) -> List[Dict[str, Any]]:
    """Read *filepath* as JSON Lines and return normalized records.

    Blank lines are skipped; every remaining line is parsed as JSON and
    passed through :func:`normalize_task` to strip bookkeeping fields.
    """
    with open(filepath, "r", encoding="utf-8") as handle:
        return [
            normalize_task(json.loads(stripped))
            for raw in handle
            if (stripped := raw.strip())
        ]
def load_and_join_tasks(tasks_path: Path, validated_path: Path) -> Dict[str, Dict[str, Any]]:
    """Load both JSONL files, normalize each record, and join on ``task_id``.

    Every value of the returned mapping has three keys:

    - ``task``: normalized record from *tasks_path*, or None if absent;
    - ``validated``: normalized record from *validated_path*, or None;
    - ``_repo``: the raw repo string, preserved separately because
      normalization strips the ``repo`` field.
    """

    def iter_records(path: Path):
        # Yield each non-blank line of a JSONL file as a parsed dict.
        with open(path, "r", encoding="utf-8") as handle:
            for raw in handle:
                raw = raw.strip()
                if raw:
                    yield json.loads(raw)

    joined: Dict[str, Dict[str, Any]] = {}

    for record in iter_records(tasks_path):
        joined[record["task_id"]] = {
            "task": normalize_task(record),
            "validated": None,
            "_repo": record["repo"],
        }

    for record in iter_records(validated_path):
        task_id = record["task_id"]
        entry = joined.get(task_id)
        if entry is None:
            # Validated-only task: no counterpart in tasks.jsonl.
            joined[task_id] = {
                "task": None,
                "validated": normalize_task(record),
                "_repo": record["repo"],
            }
        else:
            # Keep an existing _repo; only fill it in when missing/empty.
            if not entry.get("_repo"):
                entry["_repo"] = record["repo"]
            entry["validated"] = normalize_task(record)

    return joined
def build_paths(
    data: Any,
    prefix: str = "",
    max_list_items: int = 5,
    paths: Optional[Set[str]] = None,
) -> Set[str]:
    """Collect the dotted path of every dict key / list index in *data*.

    Dict keys become ``parent.child``; list indexes become ``parent[i]``.
    Lists are truncated to the first *max_list_items* elements.  The *paths*
    set is shared across recursive calls and returned.
    """
    if paths is None:
        paths = set()

    # Gather (path, value) pairs for this node's immediate children.
    if isinstance(data, dict):
        children = [
            (f"{prefix}.{key}" if prefix else key, data[key])
            for key in sorted(data)
        ]
    elif isinstance(data, list):
        children = [
            (f"{prefix}[{i}]" if prefix else f"[{i}]", item)
            for i, item in enumerate(data[:max_list_items])
        ]
    else:
        children = []

    for child_path, child in children:
        paths.add(child_path)
        build_paths(child, child_path, max_list_items=max_list_items, paths=paths)
    return paths
def get_value_by_path(obj: Any, path: str) -> Any:
    """Get a value from nested dict/list using dot notation with [idx].

    Supports paths like ``"tests.diff"`` or ``"files[0].name"``.  Returns
    None when *obj* is None or when any step of the path cannot be resolved
    (missing key, non-container value, or out-of-range index).
    """
    if obj is None:
        return None
    # Fast path: the entire path string is itself a key — handles top-level
    # keys that literally contain dots.
    if isinstance(obj, dict) and path in obj:
        return obj[path]
    parts = path.split(".") if path else []
    current = obj
    for i, part in enumerate(parts):
        # Can only descend through containers.
        if not isinstance(current, (dict, list)):
            return None
        # If a full remaining path exists as a key, return it
        # (nested dicts may also have keys containing dots).
        if isinstance(current, dict) and part not in current:
            remaining = ".".join(parts[i:])
            if remaining in current:
                return current[remaining]
        # Split the dict-key portion off a part like "files[0]"; a bare
        # index segment such as "[0]" yields an empty key and skips the
        # dict lookup entirely.
        key = part.split("[", 1)[0] if "[" in part else part
        if key:
            if not isinstance(current, dict) or key not in current:
                return None
            current = current[key]
        # Handle list indexes like key[0][1]
        indices: List[int] = []
        start = 0
        while True:
            open_idx = part.find("[", start)
            if open_idx == -1:
                break
            close_idx = part.find("]", open_idx)
            if close_idx == -1:
                # Unterminated bracket: stop scanning this part.
                break
            idx_str = part[open_idx + 1 : close_idx]
            if idx_str.isdigit():
                indices.append(int(idx_str))
            start = close_idx + 1
        # Apply each collected index in order; fail soft to None rather
        # than raising on bad types or out-of-range positions.
        for idx in indices:
            if not isinstance(current, list) or idx >= len(current):
                return None
            current = current[idx]
    return current
def format_value(value: Any) -> str:
    """Render *value* for display.

    None becomes the literal string "null"; dicts and lists are rendered as
    pretty-printed JSON; everything else uses ``str``.
    """
    if value is None:
        return "null"
    if not isinstance(value, (dict, list)):
        return str(value)
    return json.dumps(value, indent=2, ensure_ascii=False)
def diff_values(left: Any, right: Any) -> str:
    """Unified diff between the display forms of *left* and *right*.

    Returns "" when both sides are None; identical values also produce ""
    because the underlying diff is empty.  Headers label the sides "task"
    and "validated".
    """
    if left is None and right is None:
        return ""
    delta = difflib.unified_diff(
        format_value(left).splitlines(),
        format_value(right).splitlines(),
        fromfile="task",
        tofile="validated",
        lineterm="",
    )
    return "\n".join(delta)
def build_tree_items(
    data: Any,
    prefix: str = "",
    max_list_items: int = 5,
    items: Optional[List[Tuple[str, str]]] = None,
) -> List[Tuple[str, str]]:
    """Build tree items as (path, display_label) tuples, matching view_tasks.py structure.

    Dict keys appear in sorted order.  The wrapper keys ``"task"`` and
    ``"validated"`` are transparent: their children are listed directly and
    no row is emitted for the wrapper itself.  ``"matching_files"`` lists
    are flattened to a single row.  Other lists show at most
    *max_list_items* entries plus a trailing ``[...]`` summary row when
    truncated.  The *items* list is shared across recursive calls and
    returned.
    """
    if items is None:
        items = []

    def expand_list(value: List[Any], path: str) -> None:
        # One row per list item (up to max_list_items), recursing into
        # containers, plus a "[...]" summary row when the list is truncated.
        for i, item in enumerate(value[:max_list_items]):
            item_path = f"{path}[{i}]"
            items.append((item_path, f"[{i}]"))
            if isinstance(item, (dict, list)):
                build_tree_items(item, item_path, max_list_items=max_list_items, items=items)
        if len(value) > max_list_items:
            items.append((f"{path}[...]", f"[... ({len(value)} total)]"))

    def add_entry(key: str, value: Any, path: str) -> None:
        # Emit a row for the key itself, then rows for its children.
        if key == "matching_files" and isinstance(value, list):
            # matching_files is flattened: show the key but not its items.
            items.append((path, key))
            return
        items.append((path, key))
        if isinstance(value, dict):
            build_tree_items(value, path, max_list_items=max_list_items, items=items)
        elif isinstance(value, list) and value:
            expand_list(value, path)

    if isinstance(data, dict):
        for key in sorted(data.keys()):
            path = f"{prefix}.{key}" if prefix else key
            # 'task' / 'validated' wrappers: show their children directly.
            if key in ("task", "validated"):
                value = data[key]
                if value and isinstance(value, dict):
                    for sub_key in sorted(value.keys()):
                        add_entry(sub_key, value[sub_key], f"{path}.{sub_key}")
                continue
            add_entry(key, data[key], path)
    elif isinstance(data, list):
        expand_list(data, prefix)
    return items
def build_app(joined_data: Dict[str, Dict[str, Any]]):
    """Build and return the Gradio Blocks UI for the joined task data.

    *joined_data* maps task_id -> {"task": ..., "validated": ..., "_repo": ...}
    as produced by load_and_join_tasks.  The UI is a dropdown of tasks plus
    one tab per field path; selecting a task re-renders every tab.
    """
    # Imported lazily so the module can be imported without gradio installed.
    import gradio as gr
    task_ids = sorted(joined_data.keys())
    def get_field_paths(task_id: str) -> List[str]:
        """Get list of field paths to display, in order."""
        entry = joined_data[task_id]
        # Get all paths
        paths: Set[str] = set()
        if entry["task"] is not None:
            paths |= build_paths(entry["task"])
        if entry["validated"] is not None:
            paths |= build_paths(entry["validated"])
        # Filter out unwanted fields
        filtered_paths = []
        reasoning_path = None
        for path in sorted(paths):
            path_parts = path.split(".")
            # Skip log, repo, tips, container, matching_files, commit_message, commit_sha, task_id, patches, subject
            if ("log" in path_parts or "repo" in path_parts or
                    "tips" in path_parts or "container" in path_parts or
                    "matching_files" in path_parts or
                    path == "commit_message" or path == "commit_sha" or path == "task_id" or
                    path == "patches" or path == "subject"):
                continue
            # Track reasoning separately to put it last
            if path == "reasoning" or path.endswith(".reasoning"):
                reasoning_path = path
                continue
            filtered_paths.append(path)
        # Put task_description first
        if "task_description" in filtered_paths:
            filtered_paths.remove("task_description")
            filtered_paths.insert(0, "task_description")
        # Add reasoning last
        if reasoning_path:
            filtered_paths.append(reasoning_path)
        return filtered_paths
    def render_field(task_id: str, path: str) -> str:
        """Render a single field's content as markdown."""
        entry = joined_data[task_id]
        task_value = get_value_by_path(entry["task"], path)
        validated_value = get_value_by_path(entry["validated"], path)
        # Skip if both values are None
        if task_value is None and validated_value is None:
            return ""
        # Determine if this is a code field (diff or patch)
        is_diff = path.endswith(".diff") or "diff" in path.lower()
        is_patch = "patch" in path.lower() and not is_diff
        is_code_field = is_diff or is_patch
        # Check if values are identical
        task_str = format_value(task_value) if task_value is not None else None
        validated_str = format_value(validated_value) if validated_value is not None else None
        values_identical = task_str == validated_str
        lines = []
        # Special handling for task_description - plain text (no code fences)
        if path == "task_description":
            if task_value is not None:
                return task_str
            return ""
        # If values are identical, show only once
        if values_identical and task_value is not None:
            value_str = task_str
            if is_code_field:
                if is_diff:
                    lines.append(f"```diff\n{value_str}\n```")
                else:  # patch
                    lines.append(f"```patch\n{value_str}\n```")
            else:
                lines.append(value_str)
        else:
            # Values differ or one is missing - show both
            if task_value is not None:
                value_str = task_str
                if is_code_field:
                    if is_diff:
                        lines.append(f"```diff\n{value_str}\n```")
                    else:  # patch
                        lines.append(f"```patch\n{value_str}\n```")
                else:
                    lines.append(value_str)
            if validated_value is not None:
                value_str = validated_str
                if is_code_field:
                    if is_diff:
                        lines.append(f"```diff\n{value_str}\n```")
                    else:  # patch
                        lines.append(f"```patch\n{value_str}\n```")
                else:
                    lines.append(value_str)
            # Show diff if both values exist and differ
            if task_value is not None and validated_value is not None:
                diff_text = diff_values(task_value, validated_value)
                if diff_text.strip():
                    lines.append(f"```diff\n{diff_text}\n```")
        return "\n\n".join(lines)
    def extract_repo_from_task_id(task_id: str) -> str:
        """Extract and format repo from task_id.

        Example: "JuliaORNL#JACC.jl.tar/0" -> "JuliaORNL/JACC.jl"
        """
        try:
            # Split by "/" to get the part before the number
            parts = task_id.split("/")
            if len(parts) < 2:
                return task_id
            repo_part = parts[0]  # "JuliaORNL#JACC.jl.tar"
            # Split by # to separate org and repo name
            org_repo = repo_part.split("#")
            if len(org_repo) < 2:
                return task_id
            org = org_repo[0]  # "JuliaORNL"
            repo_with_ext = org_repo[1]  # "JACC.jl.tar"
            # Split by . and take first two parts (JACC.jl)
            repo_parts = repo_with_ext.split(".")
            if len(repo_parts) >= 2:
                repo_name = f"{repo_parts[0]}.{repo_parts[1]}"  # "JACC.jl"
                return f"{org}/{repo_name}"  # "JuliaORNL/JACC.jl"
            # Fallback: just use the repo_with_ext
            return f"{org}/{repo_with_ext}"
        except Exception:
            # If parsing fails, return task_id as-is
            return task_id
    def get_task_display_name(task_id: str) -> str:
        """Get display name for task including formatted repo and subject."""
        entry = joined_data[task_id]
        subject = None
        # Try to get subject from task (tasks.jsonl has "subject" field)
        if entry["task"] is not None and "subject" in entry["task"]:
            subject = entry["task"]["subject"]
        elif entry["validated"] is not None and "subject" in entry["validated"]:
            subject = entry["validated"]["subject"]
        # Extract and format repo from task_id
        repo = extract_repo_from_task_id(task_id)
        if subject:
            # Extract first line if it's multi-line
            subject_line = subject.split("\n")[0].strip()
            return f"{repo} - {subject_line}"
        return repo
    def get_field_description(task_id: str, path: str) -> str:
        """Get description text for a field.

        Returns a fixed blurb for the well-known fields, "" otherwise; the
        task_id parameter is currently unused but kept for a stable handler
        signature.
        """
        entry = joined_data[task_id]
        if path == "task_description":
            return "*This is the prompt to the agent, asking it to implement an existing feature in the repository.*"
        elif path == "src.diff":
            return "*This patch removes the feature from the repository. The goal is to ensure the repo is in a working state.*"
        elif path == "tests.diff":
            return "*This patch adds tests for the feature to the repository. After the agent solves the task, we run these tests to see if it did it correctly*"
        elif path == "reasoning":
            return "*This is the model's reasoning for why this is a good task and how to do it. It's for debugging.*"
        return ""
    # NOTE(review): update_task appears unused — superseded by
    # update_task_with_descriptions below, which also refreshes the
    # description components.  Kept as-is; candidate for removal.
    def update_task(task_id: str):
        """Update UI when task changes. Returns content for all field tabs."""
        if not task_id:
            return [""] * len(field_paths)
        return [render_field(task_id, path) for path in field_paths]
    # Create dropdown choices with display names
    task_choices = [(get_task_display_name(tid), tid) for tid in task_ids]
    # Get field paths from first task (or collect from all tasks)
    field_paths = []
    if task_ids:
        # Get fields from first task
        field_paths = get_field_paths(task_ids[0])
        # Also check other tasks to get all possible fields
        all_paths = set(field_paths)
        for tid in task_ids[1:]:
            all_paths.update(get_field_paths(tid))
        # Sort and maintain order: task_description first, reasoning last
        field_paths = []
        if "task_description" in all_paths:
            field_paths.append("task_description")
        for path in sorted(all_paths):
            if path not in ["task_description", "reasoning"]:
                field_paths.append(path)
        if "reasoning" in all_paths:
            field_paths.append("reasoning")
    with gr.Blocks(title="Task Viewer") as demo:
        gr.Markdown("# Task Viewer")
        # Task dropdown at the top
        task_list = gr.Dropdown(
            label="Task",
            choices=task_choices,
            value=task_ids[0] if task_ids else None,
            interactive=True,
        )
        # Tabs for each field
        if field_paths:
            # Pre-compute initial values for first task to ensure all tabs are properly initialized
            initial_task_id = task_ids[0] if task_ids else None
            initial_field_contents = []
            initial_descriptions = []
            if initial_task_id:
                initial_field_contents = [render_field(initial_task_id, path) for path in field_paths]
                initial_descriptions = [get_field_description(initial_task_id, path) for path in field_paths]
            else:
                initial_field_contents = [""] * len(field_paths)
                initial_descriptions = [""] * len(field_paths)
            # field_tabs is currently unused but names the container.
            with gr.Tabs() as field_tabs:
                field_components = []
                description_components = []
                for i, path in enumerate(field_paths):
                    with gr.Tab(path):
                        # Description component (will be updated dynamically)
                        desc_comp = gr.Markdown(value=initial_descriptions[i] if i < len(initial_descriptions) else "")
                        description_components.append(desc_comp)
                        # Use Markdown for all fields (supports code fences)
                        comp = gr.Markdown(value=initial_field_contents[i] if i < len(initial_field_contents) else "")
                        field_components.append(comp)
        else:
            gr.Markdown("No fields available")
            field_components = []
            description_components = []
        def update_task_with_descriptions(task_id: str):
            """Update UI when task changes. Returns content for all field tabs and descriptions."""
            if not task_id:
                field_contents = [""] * len(field_paths)
                descriptions = [""] * len(field_paths)
            else:
                field_contents = [render_field(task_id, path) for path in field_paths]
                descriptions = [get_field_description(task_id, path) for path in field_paths]
            # Output order must match all_outputs: fields first, then descriptions.
            return field_contents + descriptions
        # Initialize with first task on load
        def on_load():
            if task_ids and field_components:
                return update_task_with_descriptions(task_ids[0])
            return [""] * (len(field_components) + len(description_components))
        if field_components:
            all_outputs = field_components + description_components
            # demo.load re-renders on first page load — presumably the
            # "first load bug" fix: initial Markdown values alone did not
            # populate every tab reliably.  TODO confirm against gradio docs.
            demo.load(on_load, outputs=all_outputs)
            # Update fields and descriptions when task changes
            task_list.change(
                update_task_with_descriptions,
                inputs=[task_list],
                outputs=all_outputs,
            )
    return demo
def main() -> None:
    """CLI entry point: parse arguments, join the two JSONL files, serve the app."""
    parser = argparse.ArgumentParser(description="View tasks.jsonl and validated_tasks.jsonl in Gradio")
    parser.add_argument("--tasks", type=Path, default=Path("tasks.jsonl"), help="Path to tasks.jsonl file")
    parser.add_argument("--validated", type=Path, default=Path("validated_tasks.jsonl"), help="Path to validated_tasks.jsonl file")
    parser.add_argument("--host", type=str, default="0.0.0.0", help="Host for web server")
    parser.add_argument("--port", type=int, default=7860, help="Port for web server")
    args = parser.parse_args()

    viewer = build_app(load_and_join_tasks(args.tasks, args.validated))
    viewer.launch(server_name=args.host, server_port=args.port)


if __name__ == "__main__":
    main()