# BioDesignBench-Leaderboard / eval_dispatcher.py
"""In-process task dispatcher for contamination-safe submissions.
Replaces the previous HTTP-endpoint dispatcher: instead of POSTing each
task to a submitter-hosted endpoint (which leaked task content), this
runs the agent loop here in the leaderboard backend using:
- the submitter's LLM provider + API key (transient, scrubbed after run)
- the reference protein-design-mcp endpoint (PROTEIN_MCP_URL secret)
or, if the submitter opted in, their own custom MCP URL
For each of the 73 hidden tasks:
1. Build the task payload (now includes a per-submission canary token).
2. Run the agent loop in process via eval_agent.run_agent_on_task().
3. Compute CPU-side scores (approach, orchestration, feasibility,
novelty, diversity); quality is deferred to the Boltz post-eval.
4. Save the per-task result back to the submission queue.
"""
from __future__ import annotations

import logging
import os
import time
from typing import Any

logger = logging.getLogger(__name__)

# Sequence/log limits (reused from the old HTTP validator)
MAX_SEQUENCES = 50
MAX_SEQUENCE_LENGTH = 2000
MAX_LOG_ENTRIES = 200
DISPATCH_TIMEOUT = 600  # per-task agent-loop budget (seconds)


def _validate_agent_output(result: dict[str, Any]) -> tuple[bool, str]:
"""Sanity-check the result returned by eval_agent.run_agent_on_task."""
if not isinstance(result, dict):
return False, "agent result must be a dict"
if not result.get("success"):
return False, result.get("error", "agent loop reported failure")
sequences = result.get("sequences") or []
if not isinstance(sequences, list):
return False, "sequences must be a list"
if len(sequences) > MAX_SEQUENCES:
return False, f"too many sequences: {len(sequences)} > {MAX_SEQUENCES}"
for i, s in enumerate(sequences):
if not isinstance(s, str):
return False, f"sequences[{i}] must be a string"
if not s:
return False, f"sequences[{i}] is empty"
if len(s) > MAX_SEQUENCE_LENGTH:
return False, f"sequences[{i}] too long: {len(s)} > {MAX_SEQUENCE_LENGTH}"
run_log = result.get("run_log") or []
if not isinstance(run_log, list):
return False, "run_log must be a list"
if len(run_log) > MAX_LOG_ENTRIES:
return False, f"too many run_log entries: {len(run_log)} > {MAX_LOG_ENTRIES}"
    return True, ""


def _resolve_mcp(submission: dict[str, Any]) -> tuple[str, str]:
    """Pick the MCP endpoint for this submission.

    Custom MCP URL takes priority if the submitter opted in; otherwise
    we fall back to the lab-hosted reference protein-design-mcp at
    PROTEIN_MCP_URL (set as an HF Space secret).
    """
custom_url = (submission.get("custom_mcp_url") or "").strip()
if custom_url:
return custom_url, (submission.get("custom_mcp_token") or "").strip()
return (
os.environ.get("PROTEIN_MCP_URL", "").strip(),
os.environ.get("PROTEIN_MCP_TOKEN", "").strip(),
    )


def score_cpu_components(
task_id: str,
sequences: list[str],
run_log: list[dict[str, Any]],
ground_truth: dict[str, Any],
oracle_sequences: list[str] | None = None,
) -> dict[str, Any]:
"""Run CPU-only scoring components for one task.
Quality is deferred until the Boltz post-eval supplies pLDDT/ipTM
metrics; the other 5 components are computed here.
"""
from eval_scorer import (
get_category,
score_approach,
score_diversity,
score_feasibility,
score_novelty,
score_orchestration,
)
thresholds = ground_truth.get("thresholds", {})
reference_seq = ground_truth.get("reference_sequence")
constraints = ground_truth.get("design_constraints", {})
tools_expected = ground_truth.get("tools_expected", [])
max_designs = ground_truth.get("max_designs", 10)
cat = get_category(task_id)
task_type = cat.task_type if cat else None
tools_used = [e.get("tool", "") for e in run_log if e.get("tool")]
approach_result = score_approach(
tools_used=tools_used, tools_expected=tools_expected, task_type=task_type,
)
orchestration_result = score_orchestration(
tool_call_log=run_log, task_id=task_id,
)
feasibility_result = score_feasibility(designs=sequences, constraints=constraints)
novelty_result = score_novelty(
designs=sequences, reference_seq=reference_seq, thresholds=thresholds,
)
diversity_result = score_diversity(designs=sequences, max_designs=max_designs)
return {
"task_id": task_id,
"num_designs": len(sequences),
"sequences": sequences,
"cpu_scores": {
"approach": approach_result["score"],
"orchestration": orchestration_result["score"],
"feasibility": feasibility_result["score"],
"novelty": novelty_result["score"],
"diversity": diversity_result["score"],
},
"cpu_details": {
"approach": approach_result,
"orchestration": orchestration_result,
"feasibility": feasibility_result,
"novelty": novelty_result,
"diversity": diversity_result,
},
"quality_pending": True,
"oracle_sequences": oracle_sequences or [],
"ground_truth_thresholds": thresholds,
    }


def dispatch_all_tasks(
submission_id: str,
progress_callback=None,
) -> list[dict[str, Any]]:
"""Run the agent loop on every hidden task for one submission.
Loads the submission record (including the transient api_key),
picks the MCP endpoint, runs eval_agent.run_agent_on_task() per
task, scores the CPU components, and persists each per-task result
back into the submission record. Scrubs the api_key and custom MCP
token from the record at the end.
"""
from eval_agent import run_agent_on_task
from eval_queue import (
get_submission, save_task_result, scrub_credentials, update_status,
)
from eval_tasks import build_task_payload, get_hidden_task_ids, get_task
sub = get_submission(submission_id)
if sub is None:
logger.error(f"dispatch_all_tasks: submission {submission_id} not found")
return []
api_key = (sub.get("api_key") or "").strip()
if not api_key:
update_status(submission_id, "failed",
error_message="api_key missing or already scrubbed")
return []
provider = sub.get("provider") or ""
model = sub.get("model_name") or ""
canary_token = sub.get("canary_token") or ""
mcp_url, mcp_token = _resolve_mcp(sub)
task_ids = get_hidden_task_ids()
total = len(task_ids)
update_status(submission_id, "dispatching", tasks_total=total)
results: list[dict[str, Any]] = []
try:
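        # Per-task loop: build the payload, run the agent in process, validate
        # its output, score the CPU-only components, and persist each result to
        # the submission queue before moving on.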
for i, task_id in enumerate(task_ids):
payload = build_task_payload(task_id, canary_token=canary_token)
if payload is None:
results.append({
"task_id": task_id, "success": False, "error": "Task not found",
})
save_task_result(submission_id, task_id, results[-1])
continue
t0 = time.monotonic()
try:
agent_result = run_agent_on_task(
provider=provider,
api_key=api_key,
model=model,
task_prompt=payload["task_description"],
mcp_url=mcp_url,
mcp_token=mcp_token,
)
except Exception as e:
logger.exception(f"agent loop crashed for task {task_id}")
agent_result = {
"success": False,
"error": f"agent loop crashed: {type(e).__name__}: {str(e)[:300]}",
}
latency = round(time.monotonic() - t0, 1)
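            # Reject malformed agent output (wrong types, too many or overlong
            # sequences, oversized run logs) before attempting to score it.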
ok, err = _validate_agent_output(agent_result)
if not ok:
result = {
"task_id": task_id, "success": False, "error": err,
"latency_sec": latency,
}
results.append(result)
save_task_result(submission_id, task_id, result)
else:
task_data = get_task(task_id) or {}
ground_truth = task_data.get("ground_truth", {}) if task_data else {}
oracle_seqs = task_data.get("oracle_sequences", []) if task_data else []
cpu_result = score_cpu_components(
task_id=task_id,
sequences=agent_result["sequences"],
run_log=agent_result["run_log"],
ground_truth=ground_truth,
oracle_sequences=oracle_seqs,
)
cpu_result["latency_sec"] = latency
cpu_result["success"] = True
cpu_result["agent_metrics"] = agent_result.get("metrics", {})
cpu_result["agent_total_steps"] = agent_result.get("total_steps", 0)
results.append(cpu_result)
save_task_result(submission_id, task_id, cpu_result)
if progress_callback:
progress_callback(task_id, i + 1, total, results[-1])
logger.info(
f"[{i+1}/{total}] {task_id}: "
f"{'OK' if results[-1].get('success') else 'FAIL'} "
f"({results[-1].get('latency_sec', 0):.1f}s)"
)
finally:
# Always scrub credentials, regardless of success/failure
scrub_credentials(submission_id)
return results
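

# A minimal manual-run sketch, not part of the production Space flow: it
# assumes a submission record already exists in the eval queue with an
# un-scrubbed api_key. The submission id comes from argv and the progress
# callback below is purely illustrative.
if __name__ == "__main__":
    import sys

    def _log_progress(task_id: str, done: int, total: int, result: dict[str, Any]) -> None:
        # Mirrors the dispatcher's own per-task log line for quick inspection.
        status = "OK" if result.get("success") else "FAIL"
        logger.info(f"progress {done}/{total} {task_id}: {status}")

    logging.basicConfig(level=logging.INFO)
    if len(sys.argv) != 2:
        raise SystemExit("usage: python eval_dispatcher.py <submission_id>")
    all_results = dispatch_all_tasks(sys.argv[1], progress_callback=_log_progress)
    logger.info(f"dispatched {len(all_results)} task results")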