| | import json |
| | import hashlib |
| | from flask import Blueprint, request, jsonify |
| | from datasets import load_dataset |
| |
|
# Blueprint for the Harbor dataset-viewer API, mounted at /api/harbor/datasets.
bp = Blueprint("harbor_datasets", __name__, url_prefix="/api/harbor/datasets")

# In-process cache of loaded datasets keyed by the short id from _make_id().
# Each entry holds {repo, split, dataset, instances, instance_index}.
# NOTE(review): unbounded and per-process only — entries live until a DELETE
# request removes them; confirm this is acceptable for expected dataset sizes.
_cache: dict[str, dict] = {}
| |
|
| |
|
| | def _make_id(repo: str, split: str) -> str: |
| | key = f"{repo}:{split}" |
| | return hashlib.md5(key.encode()).hexdigest()[:12] |
| |
|
| |
|
| | def _parse_trajectory(traj_json: str) -> dict: |
| | """Parse ATIF trajectory JSON into structured steps (v1.2 and v1.5).""" |
| | if not traj_json: |
| | return {"steps": [], "agent_info": {}, "final_metrics": {}} |
| |
|
| | try: |
| | traj = json.loads(traj_json) if isinstance(traj_json, str) else traj_json |
| | except (json.JSONDecodeError, TypeError): |
| | return {"steps": [], "agent_info": {}, "final_metrics": {}} |
| |
|
| | steps = [] |
| | for step in traj.get("steps", []): |
| | parsed = { |
| | "index": len(steps), |
| | "source": step.get("source", "unknown"), |
| | "message": step.get("message", ""), |
| | "timestamp": step.get("timestamp"), |
| | } |
| | if step.get("source") == "agent": |
| | parsed["reasoning"] = step.get("reasoning_content", "") |
| | parsed["tool_calls"] = [] |
| | for tc in step.get("tool_calls", []): |
| | args = tc.get("arguments", {}) |
| | tool_call = { |
| | "function": tc.get("function_name", ""), |
| | "arguments": args, |
| | } |
| | |
| | cmd = args.get("command", "") or args.get("cmd", "") |
| | if cmd: |
| | tool_call["command"] = cmd |
| | parsed["tool_calls"].append(tool_call) |
| |
|
| | obs = step.get("observation", {}) |
| | if obs: |
| | if isinstance(obs, dict) and "results" in obs: |
| | results = obs["results"] |
| | if results: |
| | parsed["observation"] = results[0].get("content", "") if isinstance(results[0], dict) else str(results[0]) |
| | else: |
| | parsed["observation"] = "" |
| | elif isinstance(obs, str): |
| | parsed["observation"] = obs |
| | else: |
| | parsed["observation"] = json.dumps(obs) |
| |
|
| | parsed["metrics"] = step.get("metrics", {}) |
| | elif step.get("source") == "system": |
| | pass |
| | elif step.get("source") == "user": |
| | pass |
| |
|
| | steps.append(parsed) |
| |
|
| | return { |
| | "steps": steps, |
| | "agent_info": traj.get("agent", {}), |
| | "final_metrics": traj.get("final_metrics", {}), |
| | } |
| |
|
| |
|
| | def _parse_trajectory_raw(traj_raw: str) -> list[dict]: |
| | """Parse trajectory_raw into chat-style steps. |
| | |
| | Handles two formats: |
| | 1. Flat list of OpenAI messages |
| | 2. Dict with {info, messages, trajectory_format} (mini-swe-agent format) |
| | """ |
| | if not traj_raw: |
| | return [] |
| |
|
| | try: |
| | parsed = json.loads(traj_raw) if isinstance(traj_raw, str) else traj_raw |
| | except (json.JSONDecodeError, TypeError): |
| | return [] |
| |
|
| | |
| | info = {} |
| | if isinstance(parsed, dict): |
| | info = parsed.get("info", {}) |
| | messages = parsed.get("messages", []) |
| | elif isinstance(parsed, list): |
| | messages = parsed |
| | else: |
| | return [] |
| |
|
| | steps = [] |
| | for i, msg in enumerate(messages): |
| | if not isinstance(msg, dict): |
| | continue |
| |
|
| | role = msg.get("role", "unknown") |
| | content = msg.get("content", "") |
| |
|
| | step = { |
| | "index": i, |
| | "role": role, |
| | "content": content if isinstance(content, str) else json.dumps(content) if content else "", |
| | } |
| |
|
| | |
| | if role == "assistant" and "tool_calls" in msg: |
| | tool_calls = msg["tool_calls"] |
| | step["tool_calls"] = [] |
| | for tc in (tool_calls if isinstance(tool_calls, list) else []): |
| | fn = tc.get("function", {}) |
| | call = { |
| | "id": tc.get("id", ""), |
| | "function": fn.get("name", ""), |
| | "arguments_raw": fn.get("arguments", ""), |
| | } |
| | try: |
| | args = json.loads(fn.get("arguments", "{}")) |
| | call["arguments"] = args |
| | if "command" in args: |
| | call["command"] = args["command"] |
| | except (json.JSONDecodeError, TypeError): |
| | call["arguments"] = {} |
| | step["tool_calls"].append(call) |
| |
|
| | |
| | if role == "tool": |
| | step["tool_call_id"] = msg.get("tool_call_id", "") |
| |
|
| | steps.append(step) |
| |
|
| | |
| | if steps and info: |
| | steps[0]["_info"] = info |
| |
|
| | return steps |
| |
|
| |
|
| | def _parse_agent_output_jsonl(agent_output: str) -> list[dict]: |
| | """Parse Codex-style JSONL agent_output into chat-style steps. |
| | |
| | Codex emits newline-delimited JSON with item.completed events containing |
| | reasoning, agent_message, and command_execution items. Falls back |
| | gracefully if the format is unrecognised. |
| | """ |
| | if not agent_output: |
| | return [] |
| |
|
| | steps: list[dict] = [] |
| | idx = 0 |
| |
|
| | for line in agent_output.strip().split("\n"): |
| | try: |
| | event = json.loads(line) |
| | except (json.JSONDecodeError, TypeError): |
| | continue |
| |
|
| | if event.get("type") != "item.completed": |
| | continue |
| |
|
| | item = event.get("item", {}) |
| | item_type = item.get("type", "") |
| |
|
| | if item_type == "reasoning": |
| | steps.append({ |
| | "index": idx, |
| | "role": "assistant", |
| | "content": item.get("text", ""), |
| | "_reasoning": True, |
| | }) |
| | idx += 1 |
| |
|
| | elif item_type == "agent_message": |
| | steps.append({ |
| | "index": idx, |
| | "role": "assistant", |
| | "content": item.get("text", ""), |
| | }) |
| | idx += 1 |
| |
|
| | elif item_type == "command_execution": |
| | cmd = item.get("command", "") |
| | call_id = item.get("call_id", item.get("id", "")) |
| | |
| | steps.append({ |
| | "index": idx, |
| | "role": "assistant", |
| | "content": "", |
| | "tool_calls": [{ |
| | "id": call_id, |
| | "function": "exec_command", |
| | "arguments_raw": json.dumps({"command": cmd}), |
| | "arguments": {"command": cmd}, |
| | "command": cmd, |
| | }], |
| | }) |
| | idx += 1 |
| | |
| | output = item.get("output", "") or item.get("aggregated_output", "") |
| | exit_code = item.get("exit_code") |
| | status = item.get("status", "") |
| | response_text = output |
| | if exit_code is not None: |
| | header = f"[exit code: {exit_code}]" |
| | if status: |
| | header = f"[{status} — exit code: {exit_code}]" |
| | response_text = f"{header}\n{output}" if output else header |
| | steps.append({ |
| | "index": idx, |
| | "role": "tool", |
| | "content": response_text, |
| | "tool_call_id": call_id, |
| | }) |
| | idx += 1 |
| |
|
| | return steps |
| |
|
| |
|
| | def _build_instance_summary(row: dict) -> dict: |
| | """Build a summary for one instance row.""" |
| | return { |
| | "instance_id": row.get("instance_id", ""), |
| | "resolved": row.get("resolved", False), |
| | "reward": row.get("reward", 0), |
| | "model": row.get("model", ""), |
| | "agent": row.get("agent", ""), |
| | "duration_seconds": row.get("duration_seconds", 0), |
| | "error": row.get("error", ""), |
| | } |
| |
|
| |
|
@bp.route("/load", methods=["POST"])
def load_dataset_endpoint():
    """Load a HuggingFace dataset split and cache it for browsing.

    Expects a JSON body {"repo": "<hf repo id>", "split": "train"?}.
    Returns the dataset id, instance summaries, and metadata; 400 on a
    missing repo or a load failure.
    """
    # silent=True makes a missing/invalid JSON body yield None instead of
    # raising (which surfaced to the client as a 500/415); we then fall
    # through to the clean 400 below.
    data = request.get_json(silent=True) or {}
    repo = data.get("repo", "").strip()
    if not repo:
        return jsonify({"error": "repo is required"}), 400

    split = data.get("split", "train")

    try:
        ds = load_dataset(repo, split=split)
    except Exception as e:  # load_dataset raises many exception types
        return jsonify({"error": f"Failed to load dataset: {e}"}), 400

    ds_id = _make_id(repo, split)

    # Build per-instance summaries plus an instance_id -> row-index map
    # for O(1) lookups in the instance endpoints.
    instances = []
    instance_index = {}
    for i in range(len(ds)):
        row = ds[i]
        instances.append(_build_instance_summary(row))
        instance_index[row.get("instance_id", "")] = i

    _cache[ds_id] = {
        "repo": repo,
        "split": split,
        "dataset": ds,
        "instances": instances,
        "instance_index": instance_index,
    }

    # "org/name" -> "name" for display; rsplit already returns [repo]
    # unchanged when there is no "/", so no conditional is needed.
    short_name = repo.rsplit("/", 1)[-1]

    return jsonify({
        "id": ds_id,
        "repo": repo,
        "name": short_name,
        "split": split,
        "instances": instances,
        "n_instances": len(instances),
    })
| |
|
| |
|
@bp.route("/", methods=["GET"])
def list_datasets():
    """Return metadata for every dataset currently held in the cache."""

    def _entry(ds_id: str, info: dict) -> dict:
        # One listing row per cached dataset; "name" is the repo's last path
        # segment when the repo id contains a slash.
        repo = info["repo"]
        return {
            "id": ds_id,
            "repo": repo,
            "name": repo.rsplit("/", 1)[-1] if "/" in repo else repo,
            "split": info["split"],
            "n_instances": len(info["instances"]),
            "instances": info["instances"],
        }

    return jsonify([_entry(ds_id, info) for ds_id, info in _cache.items()])
| |
|
| |
|
@bp.route("/<ds_id>/instances", methods=["GET"])
def get_instances(ds_id):
    """Return the cached instance summaries for a loaded dataset, or 404."""
    info = _cache.get(ds_id)
    if info is None:
        return jsonify({"error": "Dataset not loaded"}), 404
    return jsonify(info["instances"])
| |
|
| |
|
@bp.route("/<ds_id>/instance/<path:instance_id>", methods=["GET"])
def get_instance(ds_id, instance_id):
    """Get full parsed trajectory for one instance.

    Responds with instance metadata plus two trajectory views: the
    structured ATIF parse and the chat-style raw steps.
    """
    info = _cache.get(ds_id)
    if info is None:
        return jsonify({"error": "Dataset not loaded"}), 404

    row_idx = info["instance_index"].get(instance_id)
    if row_idx is None:
        return jsonify({"error": f"Instance {instance_id} not found"}), 404

    row = info["dataset"][row_idx]

    # Structured ATIF view of the trajectory.
    atif = _parse_trajectory(row.get("trajectory", ""))

    # Chat-style view; fall back to Codex JSONL output when no raw
    # trajectory is present.
    raw_steps = _parse_trajectory_raw(row.get("trajectory_raw", ""))
    if not raw_steps and row.get("agent_output"):
        raw_steps = _parse_agent_output_jsonl(row["agent_output"])

    payload = {
        "instance_id": instance_id,
        "resolved": row.get("resolved", False),
        "reward": row.get("reward", 0),
        "model": row.get("model", ""),
        "agent": row.get("agent", ""),
        "duration_seconds": row.get("duration_seconds", 0),
        "error": row.get("error", ""),
        "atif": atif,
        "raw_steps": raw_steps,
        "n_atif_steps": len(atif["steps"]),
        "n_raw_steps": len(raw_steps),
    }
    return jsonify(payload)
| |
|
| |
|
@bp.route("/<ds_id>/instance/<path:instance_id>/raw", methods=["GET"])
def get_instance_raw(ds_id, instance_id):
    """Get raw logs: agent_stdout, setup_stderr, verifier_report."""
    info = _cache.get(ds_id)
    if info is None:
        return jsonify({"error": "Dataset not loaded"}), 404

    row_idx = info["instance_index"].get(instance_id)
    if row_idx is None:
        return jsonify({"error": f"Instance {instance_id} not found"}), 404

    row = info["dataset"][row_idx]

    # Pass the raw log fields straight through; missing columns become "".
    fields = ("agent_stdout", "setup_stderr", "verifier_report", "verifier_stdout")
    payload = {name: row.get(name, "") for name in fields}
    payload["instance_id"] = instance_id
    return jsonify(payload)
| |
|
| |
|
@bp.route("/<ds_id>", methods=["DELETE"])
def unload_dataset(ds_id):
    """Drop a dataset from the cache; idempotent, always responds ok."""
    _cache.pop(ds_id, None)
    return jsonify({"status": "ok"})
| |
|