"""Long Code Arena benchmark adapters (6 project-level tasks).

All datasets from:
https://huggingface.co/collections/JetBrains-Research/long-code-arena
"""
from __future__ import annotations

import json
from typing import Any

from adapters import DatasetAdapter

# Injected at runtime by _set_helpers()
_highlight_code = None
_code_offset = None
_extract_test_classes = None

# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

_CODE_TRIM_LIMIT = 50_000  # chars for code / diff fields
_DESC_TRIM_LIMIT = 5_000  # chars for description / log fields


def _trim(text: str, limit: int, label: str = "Content") -> str:
    """Trim *text* to at most *limit* characters.

    Text at or under the limit is returned untouched; longer text is cut and
    an explicit marker describing how much was shown is appended.
    """
    if len(text) > limit:
        marker = f"\n\n--- {label} trimmed: showing {limit:,} of {len(text):,} characters ---"
        return text[:limit] + marker
    return text


_LOG_HEAD_LIMIT = 10_000  # chars budget for head part of CI log
_LOG_TAIL_LIMIT = 10_000  # chars budget for tail part of CI log


def _trim_head_tail(text: str, label: str = "Content") -> str:
    """Keep the first ~10k and last ~10k characters of *text*.

    Both cut points are snapped to line boundaries, and an explicit marker
    describing the omitted middle section is inserted between head and tail.
    Short text is returned unchanged.
    """
    if len(text) <= _LOG_HEAD_LIMIT + _LOG_TAIL_LIMIT:
        return text

    # Head: snap the cut to the last newline within the head budget,
    # falling back to a hard cut if there is no usable newline.
    cut = text.rfind("\n", 0, _LOG_HEAD_LIMIT)
    if cut <= 0:
        cut = _LOG_HEAD_LIMIT
    head = text[:cut]

    # Tail: snap to the first newline at or after the tail budget's start,
    # falling back to a hard cut when no newline is found.
    start = text.find("\n", len(text) - _LOG_TAIL_LIMIT)
    if not (0 <= start < len(text)):
        start = len(text) - _LOG_TAIL_LIMIT
    tail = text[start:]

    n_total = text.count("\n") + 1
    n_head = head.count("\n") + 1
    n_tail = tail.count("\n") + 1
    n_omitted = n_total - n_head - n_tail
    marker = (
        f"\n\n--- {label} trimmed: showing first {n_head:,} and last"
        f" {n_tail:,} lines ({n_omitted:,} lines omitted,"
        f" {len(text):,} chars total) ---\n\n"
    )
    return head + marker + tail


def _lca_repo_url(repo_slug: str) -> str:
    """Convert an LCA-style repo slug to a GitHub URL.

    LCA datasets use either ``owner__name`` (double underscore) or
    ``owner/name`` (slash) depending on the task; an empty slug yields "".
    """
    if not repo_slug:
        return ""
    ghname = repo_slug
    if "__" in ghname:
        # Normalise the first double underscore to the owner/name slash.
        ghname = ghname.replace("__", "/", 1)
    return f"https://github.com/{ghname}"


# ---------------------------------------------------------------------------
# LCA Library-Based Code Generation
# (HuggingFace: JetBrains-Research/lca-library-based-code-generation)
# ---------------------------------------------------------------------------


class LCALibCodeGenAdapter(DatasetAdapter):
    """Adapter for the LCA library-based code-generation task."""

    slug = "lca-libcodegen"
    display_name = "LCA Library-Based Code Gen"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Return a lightweight summary row for problem *idx*."""
        row = self._ds[idx]
        summary: dict[str, Any] = {
            "idx": idx,
            "task_id": row.get("repo_full_name", str(idx)),
            "entry_point": row.get("repo_name", f"lca_libgen_{idx}"),
            "num_inputs": row.get("n_unique_apis", 0),
            "source": row.get("repo_owner", "LCA"),
        }
        return summary

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Return the full detail payload for problem *idx*."""
        row = self._ds[idx]
        # Prefer the cleaned reference solution when the dataset provides one.
        reference = row.get("clean_reference", row.get("reference", ""))
        unique_apis = list(row.get("unique_apis", []))
        repo_slug = row.get("repo_full_name", "")
        detail: dict[str, Any] = {
            "idx": idx,
            "task_id": repo_slug or str(idx),
            "entry_point": row.get("repo_name", f"lca_libgen_{idx}"),
            "code": reference,
            "highlighted_code": _highlight_code(reference),
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": row.get("repo_owner", "LCA"),
            "has_ground_truth": False,
            "has_tasks": False,
            "description": row.get("instruction", ""),
            "unique_apis": unique_apis,
            "n_unique_apis": row.get("n_unique_apis", 0),
            "repo_url": _lca_repo_url(repo_slug),
        }
        return detail
# ---------------------------------------------------------------------------
# LCA Project-Level Code Completion
# (HuggingFace: JetBrains-Research/lca-project-level-code-completion)
# ---------------------------------------------------------------------------


class LCACodeCompletionAdapter(DatasetAdapter):
    """Adapter for the LCA project-level code-completion task."""

    slug = "lca-codecompletion"
    display_name = "LCA Project-Level Completion"
    has_ground_truth = False
    has_tasks = False

    # Extension -> highlight-language map, hoisted to a class attribute so it
    # is not rebuilt on every get_problem_detail() call.
    _EXT_TO_LANG = {
        "py": "python",
        "java": "java",
        "kt": "kotlin",
        "js": "javascript",
        "ts": "typescript",
        "cpp": "cpp",
        "c": "c",
        "go": "go",
        "rs": "rust",
        "rb": "ruby",
    }

    def __init__(self, rows: list[dict[str, Any]]):
        self._rows = rows

    def problem_count(self) -> int:
        return len(self._rows)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Return a lightweight summary row for problem *idx*."""
        row = self._rows[idx]
        completion_file = row.get("completion_file", {})
        filename = (
            completion_file.get("filename", "")
            if isinstance(completion_file, dict)
            else ""
        )
        return {
            "idx": idx,
            "task_id": row.get("repo", str(idx)),
            "entry_point": filename.rsplit("/", 1)[-1] if filename else f"completion_{idx}",
            "num_inputs": 0,
            "source": row.get("_context_size", "LCA"),
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Return the full detail payload for problem *idx*."""
        row = self._rows[idx]
        completion_file = row.get("completion_file", {})
        if isinstance(completion_file, dict):
            filename = completion_file.get("filename", "")
            content = completion_file.get("content", "")
        else:
            filename = ""
            content = ""

        completion_lines = row.get("completion_lines", {})
        committed = (
            completion_lines.get("committed", [])
            if isinstance(completion_lines, dict)
            else []
        )

        # Guess the highlight language from the file extension; default python.
        lang = "python"
        if filename:
            ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
            lang = self._EXT_TO_LANG.get(ext, "python")

        repo_slug = row.get("repo", "")
        commit_hash = row.get("commit_hash", "")
        repo_url = _lca_repo_url(repo_slug)
        commit_url = f"{repo_url}/commit/{commit_hash}" if repo_url and commit_hash else ""

        return {
            "idx": idx,
            "task_id": repo_slug or str(idx),
            "entry_point": filename.rsplit("/", 1)[-1] if filename else f"completion_{idx}",
            "code": content,
            "highlighted_code": _highlight_code(content, language=lang) if content else "",
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": row.get("_context_size", "LCA"),
            "has_ground_truth": False,
            "has_tasks": False,
            # Fix: previously hard-coded "File: (unknown)" even though the
            # completion file's name is computed above.
            "description": f"File: {filename}\nCommit: {commit_hash[:12]}",
            "completion_lines_committed": committed,
            "language": lang,
            "repo_url": repo_url,
            "commit_url": commit_url,
        }


# ---------------------------------------------------------------------------
# LCA Bug Localization
# (HuggingFace: JetBrains-Research/lca-bug-localization)
# ---------------------------------------------------------------------------


class LCABugLocalizationAdapter(DatasetAdapter):
    """Adapter for the LCA bug-localization task (issue -> fixing diff)."""

    slug = "lca-buglocalization"
    display_name = "LCA Bug Localization"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Return a lightweight summary row for problem *idx*."""
        row = self._ds[idx]
        return {
            "idx": idx,
            "task_id": row.get("text_id", str(idx)),
            "entry_point": f"{row.get('repo_owner', '')}/{row.get('repo_name', '')}",
            "num_inputs": row.get("changed_files_count", 0),
            "source": row.get("repo_language", "unknown"),
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Return the full detail payload for problem *idx*."""
        row = self._ds[idx]
        # Diffs can be arbitrarily large; trim them for display, consistent
        # with the commit-message and CI-repair adapters below.
        diff = _trim(row.get("diff", ""), _CODE_TRIM_LIMIT, "Diff")
        repo_owner = row.get("repo_owner", "")
        repo_name = row.get("repo_name", "")
        repo = f"{repo_owner}/{repo_name}" if repo_owner and repo_name else ""
        issue_url = row.get("issue_url", "")
        pull_url = row.get("pull_url", "")

        # Description is the issue title, plus the body when present.
        description = row.get("issue_title", "")
        if row.get("issue_body"):
            description += "\n\n" + row.get("issue_body", "")

        return {
            "idx": idx,
            "task_id": row.get("text_id", str(idx)),
            "entry_point": repo or f"bug_{idx}",
            "code": diff,
            "highlighted_code": "",
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": row.get("repo_language", "unknown"),
            "has_ground_truth": False,
            "has_tasks": False,
            "description": description,
            "patch": diff,
            "repo": repo,
            "repo_url": f"https://github.com/{repo}" if repo else "",
            "issue_url": issue_url,
            "commit_url": pull_url,
        }


# ---------------------------------------------------------------------------
# LCA Commit Message Generation
# (HuggingFace: JetBrains-Research/lca-commit-message-generation)
# ---------------------------------------------------------------------------


class LCACommitMsgGenAdapter(DatasetAdapter):
    """Adapter for the LCA commit-message-generation task."""

    slug = "lca-commitmsg"
    display_name = "LCA Commit Message Gen"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Return a lightweight summary row for problem *idx*."""
        row = self._ds[idx]
        mods = row.get("mods", [])
        n_files = len(mods) if isinstance(mods, list) else 0
        return {
            "idx": idx,
            # `or` fallbacks guard against present-but-None/empty values
            # (plain .get() defaults only cover missing keys) and match
            # get_problem_detail's task_id derivation.
            "task_id": (row.get("hash") or str(idx))[:12],
            "entry_point": row.get("repo", f"commit_{idx}"),
            "num_inputs": n_files,
            "source": (row.get("license") or "LCA")[:20],
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Return the full detail payload for problem *idx*."""
        row = self._ds[idx]
        message = row.get("message", "")
        mods = row.get("mods", [])

        # Stitch the per-file modifications into one unified diff.
        diff_parts: list[str] = []
        if isinstance(mods, list):
            for mod in mods:
                if not isinstance(mod, dict):
                    continue
                mod_diff = mod.get("diff", "")
                if not mod_diff:
                    continue
                old_path = mod.get("old_path", "")
                new_path = mod.get("new_path", "")
                diff_parts.append(
                    f"diff --git a/{old_path} b/{new_path}\n"
                    f"--- a/{old_path}\n"
                    f"+++ b/{new_path}\n"
                    f"{mod_diff}"
                )
        combined_diff = "\n".join(diff_parts)
        trimmed_diff = _trim(combined_diff, _CODE_TRIM_LIMIT, "Diff")

        repo_slug = row.get("repo", "")
        commit_hash = row.get("hash", "")
        repo_url = _lca_repo_url(repo_slug)
        commit_url = f"{repo_url}/commit/{commit_hash}" if repo_url and commit_hash else ""

        return {
            "idx": idx,
            "task_id": (commit_hash or str(idx))[:12],
            "entry_point": repo_slug or f"commit_{idx}",
            "code": trimmed_diff,
            "highlighted_code": "",
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            # `or` guards against a present-but-None license value.
            "source": (row.get("license") or "LCA")[:20],
            "has_ground_truth": False,
            "has_tasks": False,
            "description": message,
            "patch": trimmed_diff,
            "repo": repo_slug,
            "repo_url": repo_url,
            "commit_url": commit_url,
            "commit_hash": commit_hash,
        }


# ---------------------------------------------------------------------------
# LCA CI Builds Repair
# (HuggingFace: JetBrains-Research/lca-ci-builds-repair)
# ---------------------------------------------------------------------------


class LCACIRepairAdapter(DatasetAdapter):
    """Adapter for the LCA CI-builds-repair task (failing CI log + fix diff)."""

    slug = "lca-cirepair"
    display_name = "LCA CI Builds Repair"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Return a lightweight summary row for problem *idx*."""
        row = self._ds[idx]
        repo = f"{row.get('repo_owner', '')}/{row.get('repo_name', '')}"
        return {
            "idx": idx,
            "task_id": str(row.get("id", idx)),
            "entry_point": repo,
            "num_inputs": 0,
            "source": f"difficulty-{row.get('difficulty', '?')}",
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Return the full detail payload for problem *idx*."""
        row = self._ds[idx]
        diff = row.get("diff", "")
        trimmed_diff = _trim(diff, _CODE_TRIM_LIMIT, "Diff")
        repo_owner = row.get("repo_owner", "")
        repo_name = row.get("repo_name", "")
        repo = f"{repo_owner}/{repo_name}" if repo_owner and repo_name else ""
        commit_link = row.get("commit_link", "")

        # Extract log text — can be several MB; collect chunks and join once
        # (repeated `+=` on a multi-MB string is quadratic), then trim.
        logs = row.get("logs", [])
        log_chunks: list[str] = []
        if isinstance(logs, list):
            for entry in logs:
                if isinstance(entry, dict):
                    step = entry.get("step_name", "")
                    log = entry.get("log", "")
                    log_chunks.append(f"=== {step} ===\n{log}\n\n")
        trimmed_log = _trim_head_tail("".join(log_chunks), "CI log")

        return {
            "idx": idx,
            "task_id": str(row.get("id", idx)),
            "entry_point": repo or f"ci_{idx}",
            "code": trimmed_diff,
            "highlighted_code": "",
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": f"difficulty-{row.get('difficulty', '?')}",
            "has_ground_truth": False,
            "has_tasks": False,
            "description": f"Workflow: {row.get('workflow_name', '')}\n"
            f"Branch: {row.get('head_branch', '')}\n"
            f"Contributor: {row.get('contributor', '')}\n\n"
            f"CI Log:\n{trimmed_log}",
            "patch": trimmed_diff,
            "repo": repo,
            "repo_url": f"https://github.com/{repo}" if repo else "",
            "commit_url": commit_link,
        }


# ---------------------------------------------------------------------------
# LCA Module Summarization
# (HuggingFace: JetBrains-Research/lca-module-summarization)
# ---------------------------------------------------------------------------


class LCAModuleSummarizationAdapter(DatasetAdapter):
    """Adapter for the LCA module-summarization task (code -> documentation)."""

    slug = "lca-modulesumm"
    display_name = "LCA Module Summarization"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Return a lightweight summary row for problem *idx*."""
        row = self._ds[idx]
        return {
            "idx": idx,
            "task_id": row.get("docfile_name", str(idx)),
            "entry_point": row.get("repo", f"module_{idx}"),
            "num_inputs": 0,
            "source": row.get("doc_type", "LCA"),
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Return the full detail payload for problem *idx*."""
        row = self._ds[idx]
        target_text = row.get("target_text", "")
        # Code context can be extremely large (up to 23 MB); trim with explicit marker
        code_context = row.get("relevant_code_context", "")
        trimmed_code = _trim(code_context, _CODE_TRIM_LIMIT, "Code context")

        relevant_files = row.get("relevant_code_files", [])
        if isinstance(relevant_files, str):
            # Some rows store the file list as a JSON-encoded string.
            try:
                relevant_files = json.loads(relevant_files)
            except (json.JSONDecodeError, TypeError):
                relevant_files = [relevant_files]

        # Defensive: json.loads may yield non-string entries; stringify so the
        # join below cannot raise TypeError.
        if isinstance(relevant_files, list):
            files_label = ", ".join(str(f) for f in relevant_files)
        else:
            files_label = ""

        repo_slug = row.get("repo", "")
        repo_url = _lca_repo_url(repo_slug)
        trimmed_target = _trim(target_text, _DESC_TRIM_LIMIT, "Target documentation")

        return {
            "idx": idx,
            "task_id": row.get("docfile_name", str(idx)),
            "entry_point": repo_slug or f"module_{idx}",
            "code": trimmed_code,
            "highlighted_code": _highlight_code(trimmed_code) if trimmed_code else "",
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": row.get("doc_type", "LCA"),
            "has_ground_truth": False,
            "has_tasks": False,
            "description": f"Intent: {row.get('intent', '')}\n\n"
            f"Doc file: {row.get('path_to_docfile', '')}\n"
            f"Relevant files: {files_label}\n\n"
            f"Target documentation:\n{trimmed_target}",
            "repo_url": repo_url,
        }