# agg-trace-visualizer / backend / api / harbor_datasets.py
# Author: Zayne Rea Sprague — commit 499ba81 ("fixes")
import json
import hashlib
from flask import Blueprint, request, jsonify
from datasets import load_dataset
bp = Blueprint("harbor_datasets", __name__, url_prefix="/api/harbor/datasets")
_cache: dict[str, dict] = {}
def _make_id(repo: str, split: str) -> str:
key = f"{repo}:{split}"
return hashlib.md5(key.encode()).hexdigest()[:12]
def _parse_trajectory(traj_json: str) -> dict:
"""Parse ATIF trajectory JSON into structured steps (v1.2 and v1.5)."""
if not traj_json:
return {"steps": [], "agent_info": {}, "final_metrics": {}}
try:
traj = json.loads(traj_json) if isinstance(traj_json, str) else traj_json
except (json.JSONDecodeError, TypeError):
return {"steps": [], "agent_info": {}, "final_metrics": {}}
steps = []
for step in traj.get("steps", []):
parsed = {
"index": len(steps),
"source": step.get("source", "unknown"),
"message": step.get("message", ""),
"timestamp": step.get("timestamp"),
}
if step.get("source") == "agent":
parsed["reasoning"] = step.get("reasoning_content", "")
parsed["tool_calls"] = []
for tc in step.get("tool_calls", []):
args = tc.get("arguments", {})
tool_call = {
"function": tc.get("function_name", ""),
"arguments": args,
}
# v1.2 uses "command", v1.5 uses "cmd"
cmd = args.get("command", "") or args.get("cmd", "")
if cmd:
tool_call["command"] = cmd
parsed["tool_calls"].append(tool_call)
obs = step.get("observation", {})
if obs:
if isinstance(obs, dict) and "results" in obs:
results = obs["results"]
if results:
parsed["observation"] = results[0].get("content", "") if isinstance(results[0], dict) else str(results[0])
else:
parsed["observation"] = ""
elif isinstance(obs, str):
parsed["observation"] = obs
else:
parsed["observation"] = json.dumps(obs)
parsed["metrics"] = step.get("metrics", {})
elif step.get("source") == "system":
pass # message is enough
elif step.get("source") == "user":
pass # message is enough
steps.append(parsed)
return {
"steps": steps,
"agent_info": traj.get("agent", {}),
"final_metrics": traj.get("final_metrics", {}),
}
def _parse_trajectory_raw(traj_raw: str) -> list[dict]:
"""Parse trajectory_raw into chat-style steps.
Handles two formats:
1. Flat list of OpenAI messages
2. Dict with {info, messages, trajectory_format} (mini-swe-agent format)
"""
if not traj_raw:
return []
try:
parsed = json.loads(traj_raw) if isinstance(traj_raw, str) else traj_raw
except (json.JSONDecodeError, TypeError):
return []
# Extract messages list and optional info
info = {}
if isinstance(parsed, dict):
info = parsed.get("info", {})
messages = parsed.get("messages", [])
elif isinstance(parsed, list):
messages = parsed
else:
return []
steps = []
for i, msg in enumerate(messages):
if not isinstance(msg, dict):
continue
role = msg.get("role", "unknown")
content = msg.get("content", "")
step = {
"index": i,
"role": role,
"content": content if isinstance(content, str) else json.dumps(content) if content else "",
}
# Assistant messages may have tool_calls (OpenAI format)
if role == "assistant" and "tool_calls" in msg:
tool_calls = msg["tool_calls"]
step["tool_calls"] = []
for tc in (tool_calls if isinstance(tool_calls, list) else []):
fn = tc.get("function", {})
call = {
"id": tc.get("id", ""),
"function": fn.get("name", ""),
"arguments_raw": fn.get("arguments", ""),
}
try:
args = json.loads(fn.get("arguments", "{}"))
call["arguments"] = args
if "command" in args:
call["command"] = args["command"]
except (json.JSONDecodeError, TypeError):
call["arguments"] = {}
step["tool_calls"].append(call)
# Tool messages have tool_call_id
if role == "tool":
step["tool_call_id"] = msg.get("tool_call_id", "")
steps.append(step)
# Attach info as metadata on first step if available
if steps and info:
steps[0]["_info"] = info
return steps
def _parse_agent_output_jsonl(agent_output: str) -> list[dict]:
"""Parse Codex-style JSONL agent_output into chat-style steps.
Codex emits newline-delimited JSON with item.completed events containing
reasoning, agent_message, and command_execution items. Falls back
gracefully if the format is unrecognised.
"""
if not agent_output:
return []
steps: list[dict] = []
idx = 0
for line in agent_output.strip().split("\n"):
try:
event = json.loads(line)
except (json.JSONDecodeError, TypeError):
continue
if event.get("type") != "item.completed":
continue
item = event.get("item", {})
item_type = item.get("type", "")
if item_type == "reasoning":
steps.append({
"index": idx,
"role": "assistant",
"content": item.get("text", ""),
"_reasoning": True,
})
idx += 1
elif item_type == "agent_message":
steps.append({
"index": idx,
"role": "assistant",
"content": item.get("text", ""),
})
idx += 1
elif item_type == "command_execution":
cmd = item.get("command", "")
call_id = item.get("call_id", item.get("id", ""))
# Assistant step with tool call
steps.append({
"index": idx,
"role": "assistant",
"content": "",
"tool_calls": [{
"id": call_id,
"function": "exec_command",
"arguments_raw": json.dumps({"command": cmd}),
"arguments": {"command": cmd},
"command": cmd,
}],
})
idx += 1
# Tool response step — Codex uses "aggregated_output", others use "output"
output = item.get("output", "") or item.get("aggregated_output", "")
exit_code = item.get("exit_code")
status = item.get("status", "")
response_text = output
if exit_code is not None:
header = f"[exit code: {exit_code}]"
if status:
header = f"[{status} — exit code: {exit_code}]"
response_text = f"{header}\n{output}" if output else header
steps.append({
"index": idx,
"role": "tool",
"content": response_text,
"tool_call_id": call_id,
})
idx += 1
return steps
def _build_instance_summary(row: dict) -> dict:
"""Build a summary for one instance row."""
return {
"instance_id": row.get("instance_id", ""),
"resolved": row.get("resolved", False),
"reward": row.get("reward", 0),
"model": row.get("model", ""),
"agent": row.get("agent", ""),
"duration_seconds": row.get("duration_seconds", 0),
"error": row.get("error", ""),
}
@bp.route("/load", methods=["POST"])
def load_dataset_endpoint():
data = request.get_json()
repo = data.get("repo", "").strip()
if not repo:
return jsonify({"error": "repo is required"}), 400
split = data.get("split", "train")
try:
ds = load_dataset(repo, split=split)
except Exception as e:
return jsonify({"error": f"Failed to load dataset: {e}"}), 400
ds_id = _make_id(repo, split)
# Build instance summaries and index
instances = []
instance_index = {}
for i in range(len(ds)):
row = ds[i]
summary = _build_instance_summary(row)
instances.append(summary)
instance_index[row.get("instance_id", "")] = i
_cache[ds_id] = {
"repo": repo,
"split": split,
"dataset": ds,
"instances": instances,
"instance_index": instance_index,
}
short_name = repo.rsplit("/", 1)[-1] if "/" in repo else repo
return jsonify({
"id": ds_id,
"repo": repo,
"name": short_name,
"split": split,
"instances": instances,
"n_instances": len(instances),
})
@bp.route("/", methods=["GET"])
def list_datasets():
result = []
for ds_id, info in _cache.items():
result.append({
"id": ds_id,
"repo": info["repo"],
"name": info["repo"].rsplit("/", 1)[-1] if "/" in info["repo"] else info["repo"],
"split": info["split"],
"n_instances": len(info["instances"]),
"instances": info["instances"],
})
return jsonify(result)
@bp.route("/<ds_id>/instances", methods=["GET"])
def get_instances(ds_id):
if ds_id not in _cache:
return jsonify({"error": "Dataset not loaded"}), 404
return jsonify(_cache[ds_id]["instances"])
@bp.route("/<ds_id>/instance/<path:instance_id>", methods=["GET"])
def get_instance(ds_id, instance_id):
"""Get full parsed trajectory for one instance."""
if ds_id not in _cache:
return jsonify({"error": "Dataset not loaded"}), 404
info = _cache[ds_id]
if instance_id not in info["instance_index"]:
return jsonify({"error": f"Instance {instance_id} not found"}), 404
idx = info["instance_index"][instance_id]
row = info["dataset"][idx]
# Parse ATIF trajectory
atif = _parse_trajectory(row.get("trajectory", ""))
# Parse raw trajectory (OpenAI messages), fall back to agent_output JSONL
raw_steps = _parse_trajectory_raw(row.get("trajectory_raw", ""))
if not raw_steps and row.get("agent_output"):
raw_steps = _parse_agent_output_jsonl(row["agent_output"])
return jsonify({
"instance_id": instance_id,
"resolved": row.get("resolved", False),
"reward": row.get("reward", 0),
"model": row.get("model", ""),
"agent": row.get("agent", ""),
"duration_seconds": row.get("duration_seconds", 0),
"error": row.get("error", ""),
"atif": atif,
"raw_steps": raw_steps,
"n_atif_steps": len(atif["steps"]),
"n_raw_steps": len(raw_steps),
})
@bp.route("/<ds_id>/instance/<path:instance_id>/raw", methods=["GET"])
def get_instance_raw(ds_id, instance_id):
"""Get raw logs: agent_stdout, setup_stderr, verifier_report."""
if ds_id not in _cache:
return jsonify({"error": "Dataset not loaded"}), 404
info = _cache[ds_id]
if instance_id not in info["instance_index"]:
return jsonify({"error": f"Instance {instance_id} not found"}), 404
idx = info["instance_index"][instance_id]
row = info["dataset"][idx]
return jsonify({
"instance_id": instance_id,
"agent_stdout": row.get("agent_stdout", ""),
"setup_stderr": row.get("setup_stderr", ""),
"verifier_report": row.get("verifier_report", ""),
"verifier_stdout": row.get("verifier_stdout", ""),
})
@bp.route("/<ds_id>", methods=["DELETE"])
def unload_dataset(ds_id):
if ds_id in _cache:
del _cache[ds_id]
return jsonify({"status": "ok"})