| | """Long Code Arena benchmark adapters (6 project-level tasks). |
| | |
| | All datasets from: https://huggingface.co/collections/JetBrains-Research/long-code-arena |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import json |
| | from typing import Any |
| |
|
| | from adapters import DatasetAdapter |
| |
|
| | |
# Placeholders for helpers that are rebound at runtime (left as ``None`` here).
# NOTE(review): the adapters below call ``_highlight_code`` directly, so it
# must be replaced with a real callable before any detail view is rendered —
# confirm the host module performs that injection; ``_code_offset`` and
# ``_extract_test_classes`` are unused in this chunk (callers may be elsewhere
# in the file).
_highlight_code = None
_code_offset = None
_extract_test_classes = None
| |
|
| | |
| | |
| | |
| |
|
# Character budgets used by ``_trim`` before embedding large text in payloads.
_CODE_TRIM_LIMIT = 50_000  # code / diff bodies
_DESC_TRIM_LIMIT = 5_000  # description / target-documentation text
| |
|
| |
|
| | def _trim(text: str, limit: int, label: str = "Content") -> str: |
| | """Return *text* unchanged if short enough, otherwise trim with an explicit marker.""" |
| | if len(text) <= limit: |
| | return text |
| | return ( |
| | text[:limit] |
| | + f"\n\n--- {label} trimmed: showing {limit:,} of {len(text):,} characters ---" |
| | ) |
| |
|
| |
|
| | _LOG_HEAD_LIMIT = 10_000 |
| | _LOG_TAIL_LIMIT = 10_000 |
| |
|
| |
|
| | def _trim_head_tail(text: str, label: str = "Content") -> str: |
| | """Show first ~10k chars and last ~10k chars (snapped to line boundaries).""" |
| | if len(text) <= _LOG_HEAD_LIMIT + _LOG_TAIL_LIMIT: |
| | return text |
| |
|
| | |
| | head_end = text.rfind("\n", 0, _LOG_HEAD_LIMIT) |
| | if head_end <= 0: |
| | head_end = _LOG_HEAD_LIMIT |
| | head = text[:head_end] |
| |
|
| | |
| | tail_start = text.find("\n", len(text) - _LOG_TAIL_LIMIT) |
| | if tail_start < 0 or tail_start >= len(text): |
| | tail_start = len(text) - _LOG_TAIL_LIMIT |
| | tail = text[tail_start:] |
| |
|
| | total_lines = text.count("\n") + 1 |
| | head_lines = head.count("\n") + 1 |
| | tail_lines = tail.count("\n") + 1 |
| | omitted = total_lines - head_lines - tail_lines |
| |
|
| | return ( |
| | head |
| | + f"\n\n--- {label} trimmed: showing first {head_lines:,} and last" |
| | f" {tail_lines:,} lines ({omitted:,} lines omitted," |
| | f" {len(text):,} chars total) ---\n\n" |
| | + tail |
| | ) |
| |
|
| |
|
| | def _lca_repo_url(repo_slug: str) -> str: |
| | """Convert an LCA-style repo slug to a GitHub URL. |
| | |
| | LCA datasets use either ``owner__name`` (double underscore) or |
| | ``owner/name`` (slash) depending on the task. |
| | """ |
| | if not repo_slug: |
| | return "" |
| | |
| | ghname = repo_slug.replace("__", "/", 1) if "__" in repo_slug else repo_slug |
| | return f"https://github.com/{ghname}" |
| |
|
| |
|
| | |
| | |
| | |
| | |
| |
|
| |
|
class LCALibCodeGenAdapter(DatasetAdapter):
    """Adapter for the LCA library-based code generation dataset."""

    slug = "lca-libcodegen"
    display_name = "LCA Library-Based Code Gen"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        # One row per target repository.
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Number of problems (rows) in the dataset."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Lightweight listing entry for problem *idx*."""
        row = self._ds[idx]
        summary = {"idx": idx}
        summary["task_id"] = row.get("repo_full_name", str(idx))
        summary["entry_point"] = row.get("repo_name", f"lca_libgen_{idx}")
        summary["num_inputs"] = row.get("n_unique_apis", 0)
        summary["source"] = row.get("repo_owner", "LCA")
        return summary

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Full detail payload for problem *idx*."""
        row = self._ds[idx]
        slug = row.get("repo_full_name", "")
        # Prefer the cleaned reference solution when present.
        ref = row.get("clean_reference", row.get("reference", ""))
        return {
            "idx": idx,
            "task_id": slug or str(idx),
            "entry_point": row.get("repo_name", f"lca_libgen_{idx}"),
            "code": ref,
            "highlighted_code": _highlight_code(ref),
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": row.get("repo_owner", "LCA"),
            "has_ground_truth": False,
            "has_tasks": False,
            "description": row.get("instruction", ""),
            "unique_apis": list(row.get("unique_apis", [])),
            "n_unique_apis": row.get("n_unique_apis", 0),
            "repo_url": _lca_repo_url(slug),
        }
| |
|
| |
|
| | |
| | |
| | |
| | |
| |
|
| |
|
class LCACodeCompletionAdapter(DatasetAdapter):
    """Adapter for the LCA project-level code completion dataset.

    Each row describes one file to complete inside a repository snapshot,
    plus ``completion_lines`` marking the target line indices.
    """

    slug = "lca-codecompletion"
    display_name = "LCA Project-Level Completion"
    has_ground_truth = False
    has_tasks = False

    # File-extension -> highlighter language name; unknown extensions fall
    # back to "python".  Hoisted to a class constant so it is not rebuilt on
    # every get_problem_detail call.
    _EXT_TO_LANG = {
        "py": "python",
        "java": "java",
        "kt": "kotlin",
        "js": "javascript",
        "ts": "typescript",
        "cpp": "cpp",
        "c": "c",
        "go": "go",
        "rs": "rust",
        "rb": "ruby",
    }

    def __init__(self, rows: list[dict[str, Any]]):
        # Pre-materialized list of dataset rows (plain dicts).
        self._rows = rows

    def problem_count(self) -> int:
        """Number of completion problems available."""
        return len(self._rows)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Lightweight listing entry for problem *idx*."""
        row = self._rows[idx]
        completion_file = row.get("completion_file", {})
        filename = completion_file.get("filename", "") if isinstance(completion_file, dict) else ""
        return {
            "idx": idx,
            "task_id": row.get("repo", str(idx)),
            # Show just the basename in listings.
            "entry_point": filename.rsplit("/", 1)[-1] if filename else f"completion_{idx}",
            "num_inputs": 0,
            "source": row.get("_context_size", "LCA"),
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Full detail payload for problem *idx*.

        Bug fix: ``description`` previously rendered the literal text
        ``File: (unknown)`` instead of interpolating the actual filename.
        """
        row = self._rows[idx]
        completion_file = row.get("completion_file", {})
        if isinstance(completion_file, dict):
            filename = completion_file.get("filename", "")
            content = completion_file.get("content", "")
        else:
            filename = ""
            content = ""

        completion_lines = row.get("completion_lines", {})
        if isinstance(completion_lines, dict):
            committed = completion_lines.get("committed", [])
        else:
            committed = []

        lang = "python"
        if filename:
            ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
            lang = self._EXT_TO_LANG.get(ext, "python")

        repo_slug = row.get("repo", "")
        commit_hash = row.get("commit_hash", "")
        repo_url = _lca_repo_url(repo_slug)
        commit_url = f"{repo_url}/commit/{commit_hash}" if repo_url and commit_hash else ""

        return {
            "idx": idx,
            "task_id": repo_slug or str(idx),
            "entry_point": filename.rsplit("/", 1)[-1] if filename else f"completion_{idx}",
            "code": content,
            "highlighted_code": _highlight_code(content, language=lang) if content else "",
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": row.get("_context_size", "LCA"),
            "has_ground_truth": False,
            "has_tasks": False,
            # Fixed: interpolate the real filename (was the literal "(unknown)").
            "description": f"File: {filename}\nCommit: {commit_hash[:12]}",
            "completion_lines_committed": committed,
            "language": lang,
            "repo_url": repo_url,
            "commit_url": commit_url,
        }
| |
|
| |
|
| | |
| | |
| | |
| | |
| |
|
| |
|
class LCABugLocalizationAdapter(DatasetAdapter):
    """Adapter for the LCA bug localization dataset (issue plus fixing diff)."""

    slug = "lca-buglocalization"
    display_name = "LCA Bug Localization"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Number of problems (rows) in the dataset."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Lightweight listing entry for problem *idx*."""
        row = self._ds[idx]
        owner = row.get("repo_owner", "")
        name = row.get("repo_name", "")
        return {
            "idx": idx,
            "task_id": row.get("text_id", str(idx)),
            "entry_point": f"{owner}/{name}",
            "num_inputs": row.get("changed_files_count", 0),
            "source": row.get("repo_language", "unknown"),
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Full detail payload for problem *idx*."""
        row = self._ds[idx]
        diff = row.get("diff", "")
        owner = row.get("repo_owner", "")
        name = row.get("repo_name", "")
        repo = f"{owner}/{name}" if owner and name else ""

        # Issue title, with the body appended when present.
        description = row.get("issue_title", "")
        issue_body = row.get("issue_body")
        if issue_body:
            description = f"{description}\n\n{issue_body}"

        return {
            "idx": idx,
            "task_id": row.get("text_id", str(idx)),
            "entry_point": repo or f"bug_{idx}",
            "code": diff,
            "highlighted_code": "",
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": row.get("repo_language", "unknown"),
            "has_ground_truth": False,
            "has_tasks": False,
            "description": description,
            "patch": diff,
            "repo": repo,
            "repo_url": f"https://github.com/{repo}" if repo else "",
            "issue_url": row.get("issue_url", ""),
            "commit_url": row.get("pull_url", ""),
        }
| |
|
| |
|
| | |
| | |
| | |
| | |
| |
|
| |
|
class LCACommitMsgGenAdapter(DatasetAdapter):
    """Adapter for the LCA commit message generation dataset."""

    slug = "lca-commitmsg"
    display_name = "LCA Commit Message Gen"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Number of problems (rows) in the dataset."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Lightweight listing entry for problem *idx*."""
        row = self._ds[idx]
        mods = row.get("mods", [])
        return {
            "idx": idx,
            "task_id": row.get("hash", str(idx))[:12],
            "entry_point": row.get("repo", f"commit_{idx}"),
            "num_inputs": len(mods) if isinstance(mods, list) else 0,
            "source": row.get("license", "LCA")[:20],
        }

    @staticmethod
    def _assemble_diff(mods) -> str:
        """Stitch per-file modification entries into one git-style diff text."""
        parts = []
        if isinstance(mods, list):
            for mod in mods:
                if not isinstance(mod, dict):
                    continue
                patch = mod.get("diff", "")
                if not patch:
                    continue
                a_path = mod.get("old_path", "")
                b_path = mod.get("new_path", "")
                parts.append(
                    f"diff --git a/{a_path} b/{b_path}\n"
                    f"--- a/{a_path}\n"
                    f"+++ b/{b_path}\n"
                    f"{patch}"
                )
        return "\n".join(parts)

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Full detail payload for problem *idx*."""
        row = self._ds[idx]
        message = row.get("message", "")
        trimmed_diff = _trim(self._assemble_diff(row.get("mods", [])), _CODE_TRIM_LIMIT, "Diff")

        repo_slug = row.get("repo", "")
        commit_hash = row.get("hash", "")
        repo_url = _lca_repo_url(repo_slug)
        commit_url = f"{repo_url}/commit/{commit_hash}" if repo_url and commit_hash else ""

        return {
            "idx": idx,
            "task_id": (commit_hash or str(idx))[:12],
            "entry_point": repo_slug or f"commit_{idx}",
            "code": trimmed_diff,
            "highlighted_code": "",
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": row.get("license", "LCA")[:20],
            "has_ground_truth": False,
            "has_tasks": False,
            "description": message,
            "patch": trimmed_diff,
            "repo": repo_slug,
            "repo_url": repo_url,
            "commit_url": commit_url,
            "commit_hash": commit_hash,
        }
| |
|
| |
|
| | |
| | |
| | |
| | |
| |
|
| |
|
class LCACIRepairAdapter(DatasetAdapter):
    """Adapter for the LCA CI builds repair dataset."""

    slug = "lca-cirepair"
    display_name = "LCA CI Builds Repair"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Number of problems (rows) in the dataset."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Lightweight listing entry for problem *idx*."""
        row = self._ds[idx]
        return {
            "idx": idx,
            "task_id": str(row.get("id", idx)),
            "entry_point": f"{row.get('repo_owner', '')}/{row.get('repo_name', '')}",
            "num_inputs": 0,
            "source": f"difficulty-{row.get('difficulty', '?')}",
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Full detail payload for problem *idx*."""
        row = self._ds[idx]
        trimmed_diff = _trim(row.get("diff", ""), _CODE_TRIM_LIMIT, "Diff")
        owner = row.get("repo_owner", "")
        name = row.get("repo_name", "")
        repo = f"{owner}/{name}" if owner and name else ""

        # Concatenate the per-step CI logs, each under a "=== step ===" banner.
        logs = row.get("logs", [])
        sections = []
        if isinstance(logs, list):
            for entry in logs:
                if isinstance(entry, dict):
                    sections.append(
                        f"=== {entry.get('step_name', '')} ===\n{entry.get('log', '')}\n\n"
                    )
        trimmed_log = _trim_head_tail("".join(sections), "CI log")

        description = (
            f"Workflow: {row.get('workflow_name', '')}\n"
            f"Branch: {row.get('head_branch', '')}\n"
            f"Contributor: {row.get('contributor', '')}\n\n"
            f"CI Log:\n{trimmed_log}"
        )

        return {
            "idx": idx,
            "task_id": str(row.get("id", idx)),
            "entry_point": repo or f"ci_{idx}",
            "code": trimmed_diff,
            "highlighted_code": "",
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": f"difficulty-{row.get('difficulty', '?')}",
            "has_ground_truth": False,
            "has_tasks": False,
            "description": description,
            "patch": trimmed_diff,
            "repo": repo,
            "repo_url": f"https://github.com/{repo}" if repo else "",
            "commit_url": row.get("commit_link", ""),
        }
| |
|
| |
|
| | |
| | |
| | |
| | |
| |
|
| |
|
class LCAModuleSummarizationAdapter(DatasetAdapter):
    """Adapter for the LCA module summarization dataset."""

    slug = "lca-modulesumm"
    display_name = "LCA Module Summarization"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Number of problems (rows) in the dataset."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Lightweight listing entry for problem *idx*."""
        row = self._ds[idx]
        return {
            "idx": idx,
            "task_id": row.get("docfile_name", str(idx)),
            "entry_point": row.get("repo", f"module_{idx}"),
            "num_inputs": 0,
            "source": row.get("doc_type", "LCA"),
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Full detail payload for problem *idx*."""
        row = self._ds[idx]
        trimmed_code = _trim(row.get("relevant_code_context", ""), _CODE_TRIM_LIMIT, "Code context")
        trimmed_target = _trim(row.get("target_text", ""), _DESC_TRIM_LIMIT, "Target documentation")

        # The file list is sometimes serialized as a JSON string; decode it,
        # falling back to a one-element list on malformed input.
        relevant_files = row.get("relevant_code_files", [])
        if isinstance(relevant_files, str):
            try:
                relevant_files = json.loads(relevant_files)
            except (json.JSONDecodeError, TypeError):
                relevant_files = [relevant_files]

        files_label = ", ".join(relevant_files) if isinstance(relevant_files, list) else ""
        repo_slug = row.get("repo", "")

        return {
            "idx": idx,
            "task_id": row.get("docfile_name", str(idx)),
            "entry_point": repo_slug or f"module_{idx}",
            "code": trimmed_code,
            "highlighted_code": _highlight_code(trimmed_code) if trimmed_code else "",
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": row.get("doc_type", "LCA"),
            "has_ground_truth": False,
            "has_tasks": False,
            "description": f"Intent: {row.get('intent', '')}\n\n"
            f"Doc file: {row.get('path_to_docfile', '')}\n"
            f"Relevant files: {files_label}\n\n"
            f"Target documentation:\n{trimmed_target}",
            "repo_url": _lca_repo_url(repo_slug),
        }
| |
|