# Source: ml4se-evals-visualization / adapters / long_code_arena.py
# (HuggingFace file-page header: egor-bogomolov,
#  "Add 13 new benchmark datasets (batches 6-8)", commit 9f85fac)
"""Long Code Arena benchmark adapters (6 project-level tasks).
All datasets from: https://huggingface.co/collections/JetBrains-Research/long-code-arena
"""
from __future__ import annotations
import json
from typing import Any
from adapters import DatasetAdapter
# Helper callables injected at runtime by _set_helpers(); they remain None
# until the host application supplies real implementations.
_highlight_code = None
_code_offset = None
_extract_test_classes = None
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
# Character budgets used by _trim(); chosen to keep UI payloads small.
_CODE_TRIM_LIMIT = 50_000 # chars for code / diff fields
_DESC_TRIM_LIMIT = 5_000 # chars for description / log fields
def _trim(text: str, limit: int, label: str = "Content") -> str:
"""Return *text* unchanged if short enough, otherwise trim with an explicit marker."""
if len(text) <= limit:
return text
return (
text[:limit]
+ f"\n\n--- {label} trimmed: showing {limit:,} of {len(text):,} characters ---"
)
# Character budgets used by _trim_head_tail() for CI logs.
_LOG_HEAD_LIMIT = 10_000 # chars budget for head part of CI log
_LOG_TAIL_LIMIT = 10_000 # chars budget for tail part of CI log
def _trim_head_tail(text: str, label: str = "Content") -> str:
"""Show first ~10k chars and last ~10k chars (snapped to line boundaries)."""
if len(text) <= _LOG_HEAD_LIMIT + _LOG_TAIL_LIMIT:
return text
# Head: find the last newline within the budget
head_end = text.rfind("\n", 0, _LOG_HEAD_LIMIT)
if head_end <= 0:
head_end = _LOG_HEAD_LIMIT
head = text[:head_end]
# Tail: find the first newline after the cut point
tail_start = text.find("\n", len(text) - _LOG_TAIL_LIMIT)
if tail_start < 0 or tail_start >= len(text):
tail_start = len(text) - _LOG_TAIL_LIMIT
tail = text[tail_start:]
total_lines = text.count("\n") + 1
head_lines = head.count("\n") + 1
tail_lines = tail.count("\n") + 1
omitted = total_lines - head_lines - tail_lines
return (
head
+ f"\n\n--- {label} trimmed: showing first {head_lines:,} and last"
f" {tail_lines:,} lines ({omitted:,} lines omitted,"
f" {len(text):,} chars total) ---\n\n"
+ tail
)
def _lca_repo_url(repo_slug: str) -> str:
"""Convert an LCA-style repo slug to a GitHub URL.
LCA datasets use either ``owner__name`` (double underscore) or
``owner/name`` (slash) depending on the task.
"""
if not repo_slug:
return ""
# Normalise double-underscore to slash
ghname = repo_slug.replace("__", "/", 1) if "__" in repo_slug else repo_slug
return f"https://github.com/{ghname}"
# ---------------------------------------------------------------------------
# LCA Library-Based Code Generation
# (HuggingFace: JetBrains-Research/lca-library-based-code-generation)
# ---------------------------------------------------------------------------
class LCALibCodeGenAdapter(DatasetAdapter):
    """Adapter for JetBrains-Research/lca-library-based-code-generation.

    Each sample asks for code generation against a specific library; the
    reference solution and the list of library APIs it uses are exposed.
    """

    slug = "lca-libcodegen"
    display_name = "LCA Library-Based Code Gen"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        sample = self._ds[idx]
        summary = {
            "idx": idx,
            "task_id": sample.get("repo_full_name", str(idx)),
            "entry_point": sample.get("repo_name", f"lca_libgen_{idx}"),
            "num_inputs": sample.get("n_unique_apis", 0),
            "source": sample.get("repo_owner", "LCA"),
        }
        return summary

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        sample = self._ds[idx]
        # Prefer the cleaned reference solution when the dataset provides one.
        ref_code = sample.get("clean_reference", sample.get("reference", ""))
        slug = sample.get("repo_full_name", "")
        detail = {
            "idx": idx,
            "task_id": slug or str(idx),
            "entry_point": sample.get("repo_name", f"lca_libgen_{idx}"),
            "code": ref_code,
            "highlighted_code": _highlight_code(ref_code),
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": sample.get("repo_owner", "LCA"),
            "has_ground_truth": False,
            "has_tasks": False,
            "description": sample.get("instruction", ""),
            "unique_apis": list(sample.get("unique_apis", [])),
            "n_unique_apis": sample.get("n_unique_apis", 0),
            "repo_url": _lca_repo_url(slug),
        }
        return detail
# ---------------------------------------------------------------------------
# LCA Project-Level Code Completion
# (HuggingFace: JetBrains-Research/lca-project-level-code-completion)
# ---------------------------------------------------------------------------
class LCACodeCompletionAdapter(DatasetAdapter):
    """Adapter for JetBrains-Research/lca-project-level-code-completion.

    Rows are pre-loaded plain dicts, each describing one file-completion
    task inside a repository snapshot at a given commit.
    """

    slug = "lca-codecompletion"
    display_name = "LCA Project-Level Completion"
    has_ground_truth = False
    has_tasks = False

    # File-extension → highlighter language; unknown extensions fall back
    # to Python (the dominant language in this dataset).
    _EXT_TO_LANG = {
        "py": "python",
        "java": "java",
        "kt": "kotlin",
        "js": "javascript",
        "ts": "typescript",
        "cpp": "cpp",
        "c": "c",
        "go": "go",
        "rs": "rust",
        "rb": "ruby",
    }

    def __init__(self, rows: list[dict[str, Any]]):
        self._rows = rows

    def problem_count(self) -> int:
        return len(self._rows)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        row = self._rows[idx]
        completion_file = row.get("completion_file", {})
        filename = completion_file.get("filename", "") if isinstance(completion_file, dict) else ""
        return {
            "idx": idx,
            "task_id": row.get("repo", str(idx)),
            "entry_point": filename.rsplit("/", 1)[-1] if filename else f"completion_{idx}",
            "num_inputs": 0,
            "source": row.get("_context_size", "LCA"),
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        row = self._rows[idx]
        completion_file = row.get("completion_file", {})
        if isinstance(completion_file, dict):
            filename = completion_file.get("filename", "")
            content = completion_file.get("content", "")
        else:
            filename = ""
            content = ""
        completion_lines = row.get("completion_lines", {})
        committed = completion_lines.get("committed", []) if isinstance(completion_lines, dict) else []
        lang = "python"
        if filename:
            ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
            lang = self._EXT_TO_LANG.get(ext, "python")
        repo_slug = row.get("repo", "")
        commit_hash = row.get("commit_hash", "")
        repo_url = _lca_repo_url(repo_slug)
        commit_url = f"{repo_url}/commit/{commit_hash}" if repo_url and commit_hash else ""
        return {
            "idx": idx,
            "task_id": repo_slug or str(idx),
            "entry_point": filename.rsplit("/", 1)[-1] if filename else f"completion_{idx}",
            "code": content,
            "highlighted_code": _highlight_code(content, language=lang) if content else "",
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": row.get("_context_size", "LCA"),
            "has_ground_truth": False,
            "has_tasks": False,
            # Bug fix: the original hard-coded "File: (unknown)" even though
            # the filename is available and used above for entry_point/lang.
            "description": f"File: {filename or '(unknown)'}\nCommit: {commit_hash[:12]}",
            "completion_lines_committed": committed,
            "language": lang,
            "repo_url": repo_url,
            "commit_url": commit_url,
        }
# ---------------------------------------------------------------------------
# LCA Bug Localization
# (HuggingFace: JetBrains-Research/lca-bug-localization)
# ---------------------------------------------------------------------------
class LCABugLocalizationAdapter(DatasetAdapter):
    """Adapter for JetBrains-Research/lca-bug-localization.

    Each sample pairs a GitHub issue with the diff of the pull request
    that fixed it.
    """

    slug = "lca-buglocalization"
    display_name = "LCA Bug Localization"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        sample = self._ds[idx]
        summary = {
            "idx": idx,
            "task_id": sample.get("text_id", str(idx)),
            "entry_point": f"{sample.get('repo_owner', '')}/{sample.get('repo_name', '')}",
            "num_inputs": sample.get("changed_files_count", 0),
            "source": sample.get("repo_language", "unknown"),
        }
        return summary

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        sample = self._ds[idx]
        patch = sample.get("diff", "")
        owner = sample.get("repo_owner", "")
        name = sample.get("repo_name", "")
        repo = f"{owner}/{name}" if owner and name else ""
        # Issue title plus (when present) its body form the task description.
        description = sample.get("issue_title", "")
        if sample.get("issue_body"):
            description += "\n\n" + sample.get("issue_body", "")
        return {
            "idx": idx,
            "task_id": sample.get("text_id", str(idx)),
            "entry_point": repo or f"bug_{idx}",
            "code": patch,
            "highlighted_code": "",
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": sample.get("repo_language", "unknown"),
            "has_ground_truth": False,
            "has_tasks": False,
            "description": description,
            "patch": patch,
            "repo": repo,
            "repo_url": f"https://github.com/{repo}" if repo else "",
            "issue_url": sample.get("issue_url", ""),
            "commit_url": sample.get("pull_url", ""),
        }
# ---------------------------------------------------------------------------
# LCA Commit Message Generation
# (HuggingFace: JetBrains-Research/lca-commit-message-generation)
# ---------------------------------------------------------------------------
class LCACommitMsgGenAdapter(DatasetAdapter):
    """Adapter for JetBrains-Research/lca-commit-message-generation.

    Each sample is a commit: its message plus per-file modifications,
    which are stitched into a single unified diff for display.
    """

    slug = "lca-commitmsg"
    display_name = "LCA Commit Message Gen"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        sample = self._ds[idx]
        mods = sample.get("mods", [])
        file_count = len(mods) if isinstance(mods, list) else 0
        summary = {
            "idx": idx,
            "task_id": sample.get("hash", str(idx))[:12],
            "entry_point": sample.get("repo", f"commit_{idx}"),
            "num_inputs": file_count,
            "source": sample.get("license", "LCA")[:20],
        }
        return summary

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        sample = self._ds[idx]
        mods = sample.get("mods", [])
        # Stitch every per-file modification into one unified diff.
        pieces = []
        if isinstance(mods, list):
            for file_mod in mods:
                if not isinstance(file_mod, dict):
                    continue
                body = file_mod.get("diff", "")
                if not body:
                    continue
                old = file_mod.get("old_path", "")
                new = file_mod.get("new_path", "")
                pieces.append(
                    f"diff --git a/{old} b/{new}\n"
                    f"--- a/{old}\n"
                    f"+++ b/{new}\n"
                    f"{body}"
                )
        trimmed_diff = _trim("\n".join(pieces), _CODE_TRIM_LIMIT, "Diff")
        repo_slug = sample.get("repo", "")
        commit_hash = sample.get("hash", "")
        repo_url = _lca_repo_url(repo_slug)
        commit_url = f"{repo_url}/commit/{commit_hash}" if repo_url and commit_hash else ""
        return {
            "idx": idx,
            "task_id": (commit_hash or str(idx))[:12],
            "entry_point": repo_slug or f"commit_{idx}",
            "code": trimmed_diff,
            "highlighted_code": "",
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": sample.get("license", "LCA")[:20],
            "has_ground_truth": False,
            "has_tasks": False,
            "description": sample.get("message", ""),
            "patch": trimmed_diff,
            "repo": repo_slug,
            "repo_url": repo_url,
            "commit_url": commit_url,
            "commit_hash": commit_hash,
        }
# ---------------------------------------------------------------------------
# LCA CI Builds Repair
# (HuggingFace: JetBrains-Research/lca-ci-builds-repair)
# ---------------------------------------------------------------------------
class LCACIRepairAdapter(DatasetAdapter):
    """Adapter for JetBrains-Research/lca-ci-builds-repair.

    Each sample is a failing CI build: the fixing diff plus the per-step
    CI logs, which can be several megabytes and are trimmed head/tail.
    """

    slug = "lca-cirepair"
    display_name = "LCA CI Builds Repair"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        row = self._ds[idx]
        repo = f"{row.get('repo_owner', '')}/{row.get('repo_name', '')}"
        return {
            "idx": idx,
            "task_id": str(row.get("id", idx)),
            "entry_point": repo,
            "num_inputs": 0,
            "source": f"difficulty-{row.get('difficulty', '?')}",
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        row = self._ds[idx]
        diff = row.get("diff", "")
        trimmed_diff = _trim(diff, _CODE_TRIM_LIMIT, "Diff")
        repo_owner = row.get("repo_owner", "")
        repo_name = row.get("repo_name", "")
        repo = f"{repo_owner}/{repo_name}" if repo_owner and repo_name else ""
        commit_link = row.get("commit_link", "")
        # Logs can be several MB: collect the pieces and join once instead
        # of the original quadratic `log_text +=` concatenation, then trim
        # head/tail explicitly.
        logs = row.get("logs", [])
        log_parts = []
        if isinstance(logs, list):
            for entry in logs:
                if isinstance(entry, dict):
                    step = entry.get("step_name", "")
                    log = entry.get("log", "")
                    log_parts.append(f"=== {step} ===\n{log}\n\n")
        trimmed_log = _trim_head_tail("".join(log_parts), "CI log")
        return {
            "idx": idx,
            "task_id": str(row.get("id", idx)),
            "entry_point": repo or f"ci_{idx}",
            "code": trimmed_diff,
            "highlighted_code": "",
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": f"difficulty-{row.get('difficulty', '?')}",
            "has_ground_truth": False,
            "has_tasks": False,
            "description": f"Workflow: {row.get('workflow_name', '')}\n"
            f"Branch: {row.get('head_branch', '')}\n"
            f"Contributor: {row.get('contributor', '')}\n\n"
            f"CI Log:\n{trimmed_log}",
            "patch": trimmed_diff,
            "repo": repo,
            "repo_url": f"https://github.com/{repo}" if repo else "",
            "commit_url": commit_link,
        }
# ---------------------------------------------------------------------------
# LCA Module Summarization
# (HuggingFace: JetBrains-Research/lca-module-summarization)
# ---------------------------------------------------------------------------
class LCAModuleSummarizationAdapter(DatasetAdapter):
    """Adapter for JetBrains-Research/lca-module-summarization.

    Each sample asks for documentation of a module given its (possibly
    huge) relevant code context; both context and target docs are trimmed.
    """

    slug = "lca-modulesumm"
    display_name = "LCA Module Summarization"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        sample = self._ds[idx]
        summary = {
            "idx": idx,
            "task_id": sample.get("docfile_name", str(idx)),
            "entry_point": sample.get("repo", f"module_{idx}"),
            "num_inputs": 0,
            "source": sample.get("doc_type", "LCA"),
        }
        return summary

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        sample = self._ds[idx]
        # The code context can be extremely large (up to 23 MB); trim with
        # an explicit marker before rendering.
        code_ctx = _trim(
            sample.get("relevant_code_context", ""), _CODE_TRIM_LIMIT, "Code context"
        )
        target = _trim(
            sample.get("target_text", ""), _DESC_TRIM_LIMIT, "Target documentation"
        )
        # relevant_code_files may arrive as a JSON-encoded string; decode it,
        # falling back to a single-element list on failure.
        files = sample.get("relevant_code_files", [])
        if isinstance(files, str):
            try:
                files = json.loads(files)
            except (json.JSONDecodeError, TypeError):
                files = [files]
        files_display = ", ".join(files) if isinstance(files, list) else ""
        slug = sample.get("repo", "")
        return {
            "idx": idx,
            "task_id": sample.get("docfile_name", str(idx)),
            "entry_point": slug or f"module_{idx}",
            "code": code_ctx,
            "highlighted_code": _highlight_code(code_ctx) if code_ctx else "",
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": sample.get("doc_type", "LCA"),
            "has_ground_truth": False,
            "has_tasks": False,
            "description": f"Intent: {sample.get('intent', '')}\n\n"
            f"Doc file: {sample.get('path_to_docfile', '')}\n"
            f"Relevant files: {files_display}\n\n"
            f"Target documentation:\n{target}",
            "repo_url": _lca_repo_url(slug),
        }