"""Flask blueprint for browsing Harbor evaluation datasets from the HF Hub."""

import hashlib
import json

from flask import Blueprint, jsonify, request

from datasets import load_dataset

bp = Blueprint("harbor_datasets", __name__, url_prefix="/api/harbor/datasets")

# In-process cache of loaded datasets, keyed by the short id from _make_id().
# NOTE(review): unbounded and per-worker — fine for a single-process dev tool.
_cache: dict[str, dict] = {}


def _make_id(repo: str, split: str) -> str:
    """Return a short, stable identifier for a (repo, split) pair.

    MD5 is used purely as a cheap stable hash for cache keys, not for
    security.
    """
    key = f"{repo}:{split}"
    return hashlib.md5(key.encode()).hexdigest()[:12]


def _parse_trajectory(traj_json: str) -> dict:
    """Parse ATIF trajectory JSON into structured steps (v1.2 and v1.5).

    Args:
        traj_json: JSON string (or already-decoded dict) of an ATIF
            trajectory.

    Returns:
        Dict with "steps" (list of per-step dicts), "agent_info", and
        "final_metrics". All three are empty when the input is missing,
        unparseable, or not a JSON object.
    """
    if not traj_json:
        return {"steps": [], "agent_info": {}, "final_metrics": {}}
    try:
        traj = json.loads(traj_json) if isinstance(traj_json, str) else traj_json
    except (json.JSONDecodeError, TypeError):
        return {"steps": [], "agent_info": {}, "final_metrics": {}}
    # Fix: a syntactically valid JSON payload may still be a list/scalar;
    # without this guard traj.get(...) below raises AttributeError instead
    # of returning the documented empty fallback.
    if not isinstance(traj, dict):
        return {"steps": [], "agent_info": {}, "final_metrics": {}}

    steps = []
    for step in traj.get("steps", []):
        parsed = {
            "index": len(steps),
            "source": step.get("source", "unknown"),
            "message": step.get("message", ""),
            "timestamp": step.get("timestamp"),
        }
        if step.get("source") == "agent":
            parsed["reasoning"] = step.get("reasoning_content", "")
            parsed["tool_calls"] = []
            for tc in step.get("tool_calls", []):
                args = tc.get("arguments", {})
                tool_call = {
                    "function": tc.get("function_name", ""),
                    "arguments": args,
                }
                # v1.2 uses "command", v1.5 uses "cmd"
                cmd = args.get("command", "") or args.get("cmd", "")
                if cmd:
                    tool_call["command"] = cmd
                parsed["tool_calls"].append(tool_call)
            obs = step.get("observation", {})
            if obs:
                if isinstance(obs, dict) and "results" in obs:
                    # v1.5-style observation: take the first result's content.
                    results = obs["results"]
                    if results:
                        first = results[0]
                        parsed["observation"] = (
                            first.get("content", "")
                            if isinstance(first, dict)
                            else str(first)
                        )
                    else:
                        parsed["observation"] = ""
                elif isinstance(obs, str):
                    parsed["observation"] = obs
                else:
                    parsed["observation"] = json.dumps(obs)
            parsed["metrics"] = step.get("metrics", {})
        # "system" and "user" steps carry only their message — nothing extra.
        steps.append(parsed)

    return {
        "steps": steps,
        "agent_info": traj.get("agent", {}),
        "final_metrics": traj.get("final_metrics", {}),
    }
def _parse_trajectory_raw(traj_raw: str) -> list[dict]:
    """Parse trajectory_raw into chat-style steps.

    Handles two formats:
      1. Flat list of OpenAI messages
      2. Dict with {info, messages, trajectory_format} (mini-swe-agent format)

    Returns an empty list for missing/unparseable input.
    """
    if not traj_raw:
        return []
    try:
        parsed = json.loads(traj_raw) if isinstance(traj_raw, str) else traj_raw
    except (json.JSONDecodeError, TypeError):
        return []

    # Extract messages list and optional info
    info = {}
    if isinstance(parsed, dict):
        info = parsed.get("info", {})
        messages = parsed.get("messages", [])
    elif isinstance(parsed, list):
        messages = parsed
    else:
        return []

    steps = []
    for i, msg in enumerate(messages):
        if not isinstance(msg, dict):
            continue
        role = msg.get("role", "unknown")
        content = msg.get("content", "")
        step = {
            "index": i,
            "role": role,
            # Non-string content (e.g. content-part lists) is serialised.
            "content": (
                content
                if isinstance(content, str)
                else json.dumps(content) if content else ""
            ),
        }
        # Assistant messages may have tool_calls (OpenAI format)
        if role == "assistant" and "tool_calls" in msg:
            tool_calls = msg["tool_calls"]
            step["tool_calls"] = []
            for tc in (tool_calls if isinstance(tool_calls, list) else []):
                fn = tc.get("function", {})
                call = {
                    "id": tc.get("id", ""),
                    "function": fn.get("name", ""),
                    "arguments_raw": fn.get("arguments", ""),
                }
                try:
                    args = json.loads(fn.get("arguments", "{}"))
                    call["arguments"] = args
                    if "command" in args:
                        call["command"] = args["command"]
                except (json.JSONDecodeError, TypeError):
                    call["arguments"] = {}
                step["tool_calls"].append(call)
        # Tool messages have tool_call_id
        if role == "tool":
            step["tool_call_id"] = msg.get("tool_call_id", "")
        steps.append(step)

    # Attach info as metadata on first step if available
    if steps and info:
        steps[0]["_info"] = info
    return steps


def _parse_agent_output_jsonl(agent_output: str) -> list[dict]:
    """Parse Codex-style JSONL agent_output into chat-style steps.

    Codex emits newline-delimited JSON with item.completed events containing
    reasoning, agent_message, and command_execution items.

    Falls back gracefully (skipping unparseable lines) if the format is
    unrecognised.
    """
    if not agent_output:
        return []
    steps: list[dict] = []
    idx = 0
    for line in agent_output.strip().split("\n"):
        try:
            event = json.loads(line)
        except (json.JSONDecodeError, TypeError):
            continue
        if event.get("type") != "item.completed":
            continue
        item = event.get("item", {})
        item_type = item.get("type", "")
        if item_type == "reasoning":
            steps.append({
                "index": idx,
                "role": "assistant",
                "content": item.get("text", ""),
                "_reasoning": True,
            })
            idx += 1
        elif item_type == "agent_message":
            steps.append({
                "index": idx,
                "role": "assistant",
                "content": item.get("text", ""),
            })
            idx += 1
        elif item_type == "command_execution":
            cmd = item.get("command", "")
            call_id = item.get("call_id", item.get("id", ""))
            # Assistant step with tool call
            steps.append({
                "index": idx,
                "role": "assistant",
                "content": "",
                "tool_calls": [{
                    "id": call_id,
                    "function": "exec_command",
                    "arguments_raw": json.dumps({"command": cmd}),
                    "arguments": {"command": cmd},
                    "command": cmd,
                }],
            })
            idx += 1
            # Tool response step — Codex uses "aggregated_output", others use "output"
            output = item.get("output", "") or item.get("aggregated_output", "")
            exit_code = item.get("exit_code")
            status = item.get("status", "")
            response_text = output
            if exit_code is not None:
                header = f"[exit code: {exit_code}]"
                if status:
                    header = f"[{status} — exit code: {exit_code}]"
                response_text = f"{header}\n{output}" if output else header
            steps.append({
                "index": idx,
                "role": "tool",
                "content": response_text,
                "tool_call_id": call_id,
            })
            idx += 1
    return steps


def _build_instance_summary(row: dict) -> dict:
    """Build a lightweight summary dict for one instance row."""
    return {
        "instance_id": row.get("instance_id", ""),
        "resolved": row.get("resolved", False),
        "reward": row.get("reward", 0),
        "model": row.get("model", ""),
        "agent": row.get("agent", ""),
        "duration_seconds": row.get("duration_seconds", 0),
        "error": row.get("error", ""),
    }


@bp.route("/load", methods=["POST"])
def load_dataset_endpoint():
    """Load a HF dataset split, cache it, and return instance summaries.

    Body: {"repo": "<hub repo>", "split": "train"} (split optional).
    """
    # Fix: tolerate a missing/invalid JSON body instead of crashing on
    # None.get() — we return our own 400 below when "repo" is absent.
    data = request.get_json(silent=True) or {}
    repo = data.get("repo", "").strip()
    if not repo:
        return jsonify({"error": "repo is required"}), 400
    split = data.get("split", "train")
    try:
        ds = load_dataset(repo, split=split)
    except Exception as e:
        # Surface the loader error to the client rather than a 500.
        return jsonify({"error": f"Failed to load dataset: {e}"}), 400

    ds_id = _make_id(repo, split)

    # Build instance summaries and an instance_id -> row-index map.
    instances = []
    instance_index = {}
    for i in range(len(ds)):
        row = ds[i]
        instances.append(_build_instance_summary(row))
        instance_index[row.get("instance_id", "")] = i

    _cache[ds_id] = {
        "repo": repo,
        "split": split,
        "dataset": ds,
        "instances": instances,
        "instance_index": instance_index,
    }

    # rsplit returns [repo] when there is no "/", so no guard is needed.
    short_name = repo.rsplit("/", 1)[-1]
    return jsonify({
        "id": ds_id,
        "repo": repo,
        "name": short_name,
        "split": split,
        "instances": instances,
        "n_instances": len(instances),
    })


@bp.route("/", methods=["GET"])
def list_datasets():
    """List all currently-loaded datasets with their instance summaries."""
    result = []
    for ds_id, info in _cache.items():
        result.append({
            "id": ds_id,
            "repo": info["repo"],
            "name": info["repo"].rsplit("/", 1)[-1],
            "split": info["split"],
            "n_instances": len(info["instances"]),
            "instances": info["instances"],
        })
    return jsonify(result)


# Fix: the route patterns below had lost their <converter> segments
# ("//instances", "//instance/", ...), so Flask could never supply the
# ds_id/instance_id arguments the view functions require.
@bp.route("/<ds_id>/instances", methods=["GET"])
def get_instances(ds_id):
    """Return the cached instance summaries for one loaded dataset."""
    if ds_id not in _cache:
        return jsonify({"error": "Dataset not loaded"}), 404
    return jsonify(_cache[ds_id]["instances"])


@bp.route("/<ds_id>/instance/<instance_id>", methods=["GET"])
def get_instance(ds_id, instance_id):
    """Get full parsed trajectory for one instance."""
    if ds_id not in _cache:
        return jsonify({"error": "Dataset not loaded"}), 404
    info = _cache[ds_id]
    if instance_id not in info["instance_index"]:
        return jsonify({"error": f"Instance {instance_id} not found"}), 404
    idx = info["instance_index"][instance_id]
    row = info["dataset"][idx]

    # Parse ATIF trajectory
    atif = _parse_trajectory(row.get("trajectory", ""))
    # Parse raw trajectory (OpenAI messages), fall back to agent_output JSONL
    raw_steps = _parse_trajectory_raw(row.get("trajectory_raw", ""))
    if not raw_steps and row.get("agent_output"):
        raw_steps = _parse_agent_output_jsonl(row["agent_output"])

    return jsonify({
        "instance_id": instance_id,
        "resolved": row.get("resolved", False),
        "reward": row.get("reward", 0),
        "model": row.get("model", ""),
        "agent": row.get("agent", ""),
        "duration_seconds": row.get("duration_seconds", 0),
        "error": row.get("error", ""),
        "atif": atif,
        "raw_steps": raw_steps,
        "n_atif_steps": len(atif["steps"]),
        "n_raw_steps": len(raw_steps),
    })


@bp.route("/<ds_id>/instance/<instance_id>/raw", methods=["GET"])
def get_instance_raw(ds_id, instance_id):
    """Get raw logs: agent_stdout, setup_stderr, verifier_report."""
    if ds_id not in _cache:
        return jsonify({"error": "Dataset not loaded"}), 404
    info = _cache[ds_id]
    if instance_id not in info["instance_index"]:
        return jsonify({"error": f"Instance {instance_id} not found"}), 404
    idx = info["instance_index"][instance_id]
    row = info["dataset"][idx]
    return jsonify({
        "instance_id": instance_id,
        "agent_stdout": row.get("agent_stdout", ""),
        "setup_stderr": row.get("setup_stderr", ""),
        "verifier_report": row.get("verifier_report", ""),
        "verifier_stdout": row.get("verifier_stdout", ""),
    })


@bp.route("/<ds_id>", methods=["DELETE"])
def unload_dataset(ds_id):
    """Drop a dataset from the cache; idempotent."""
    if ds_id in _cache:
        del _cache[ds_id]
    return jsonify({"status": "ok"})