Spaces:
Running
Running
| """ | |
| Unified Task Manager: Abstractly load tasks from both local and SWE-bench datasets. | |
| This module provides a single interface to load tasks from: | |
| 1. Local hardcoded dataset (dataset/problem_1, problem_10, etc.) | |
| 2. SWE-bench Lite (if available and configured) | |
| Configuration via environment variables: | |
| TASK_SOURCE "local" | "swebench" | "auto" (default: "auto") | |
| SWEBENCH_FALLBACK "1" (enable fallback when SWE-bench fails, default: "1") | |
| SWEBENCH_TASKS_ROOT Path to SWE-bench tasks directory | |
| SWEBENCH_INDEX Preferred task index within difficulty band | |
| """ | |
| import os | |
| import logging | |
| from pathlib import Path | |
| from typing import Dict, Any, Optional, Literal | |
| from rl_code_fix_env.dataset.loader import get_hardcoded_task | |
| from rl_code_fix_env.dataset.swebench_adapter import get_swebench_task | |
| logger = logging.getLogger(__name__) | |
| TaskSource = Literal["local", "swebench", "auto"] | |
| Difficulty = Literal["easy", "medium", "hard"] | |
| class TaskLoadError(Exception): | |
| """Raised when task loading fails.""" | |
| pass | |
| class TaskManager: | |
| """ | |
| Unified interface for loading tasks from any dataset. | |
| Handles fallback logic, logging, and error recovery. | |
| """ | |
| def __init__(self, source: Optional[TaskSource] = None): | |
| """ | |
| Initialize TaskManager. | |
| Args: | |
| source: "local", "swebench", or "auto" (tries swebench first, falls back to local) | |
| If None, reads from TASK_SOURCE env var (default: "auto") | |
| """ | |
| self.source = (source or os.getenv("TASK_SOURCE", "auto")).strip().lower() | |
| self.enable_fallback = ( | |
| os.getenv("SWEBENCH_FALLBACK", "1").strip().lower() in {"1", "true", "yes"} | |
| ) | |
| if self.source not in {"local", "swebench", "auto"}: | |
| raise ValueError( | |
| f"Invalid TASK_SOURCE='{self.source}'. " | |
| f"Must be one of: local, swebench, auto" | |
| ) | |
| logger.info( | |
| f"TaskManager initialized: source={self.source}, " | |
| f"fallback_enabled={self.enable_fallback}" | |
| ) | |
| def load_task(self, difficulty: Difficulty) -> Dict[str, Any]: | |
| """ | |
| Load a task by difficulty level. | |
| Args: | |
| difficulty: "easy", "medium", or "hard" | |
| Returns: | |
| Task dict with structure: | |
| { | |
| "code": str, # buggy Python code | |
| "tests": str, # path to test.py | |
| "metadata": dict, # source, repo, problem_statement, etc. | |
| "problem_dir": str, # directory containing buggy.py and test.py | |
| "problem_id": str, # unique identifier for this task | |
| } | |
| Raises: | |
| TaskLoadError: If no task can be loaded from any source | |
| """ | |
| difficulty = (difficulty or "").strip().lower() | |
| if difficulty not in {"easy", "medium", "hard"}: | |
| raise ValueError( | |
| f"Invalid difficulty='{difficulty}'. Must be one of: easy, medium, hard" | |
| ) | |
| # Strategy: try sources in order, with fallback if enabled | |
| if self.source == "local": | |
| return self._load_local(difficulty) | |
| elif self.source == "swebench": | |
| return self._load_swebench(difficulty) | |
| else: # "auto" mode | |
| logger.debug("Auto mode: trying SWE-bench first...") | |
| swebench_error = None | |
| try: | |
| return self._load_swebench(difficulty) | |
| except Exception as e: | |
| swebench_error = str(e) | |
| logger.debug(f"SWE-bench failed: {e}") | |
| if self.enable_fallback: | |
| logger.info("SWE-bench unavailable, falling back to local dataset") | |
| try: | |
| return self._load_local(difficulty) | |
| except Exception as local_error: | |
| raise TaskLoadError( | |
| f"Both SWE-bench and local fallback failed:\n" | |
| f" SWE-bench: {swebench_error}\n" | |
| f" Local: {local_error}" | |
| ) from local_error | |
| else: | |
| raise TaskLoadError( | |
| f"SWE-bench loading failed and fallback disabled: {swebench_error}" | |
| ) | |
| def _load_local(self, difficulty: Difficulty) -> Dict[str, Any]: | |
| """Load from local hardcoded dataset.""" | |
| try: | |
| task = get_hardcoded_task(difficulty) | |
| task["metadata"]["source"] = "local" | |
| logger.info(f"Loaded task from local dataset: {task.get('problem_id')}") | |
| return task | |
| except Exception as e: | |
| error_msg = f"Failed to load from local dataset: {e}" | |
| logger.warning(error_msg) | |
| raise TaskLoadError(error_msg) from e | |
| def _load_swebench(self, difficulty: Difficulty) -> Dict[str, Any]: | |
| """Load from SWE-bench Lite dataset.""" | |
| try: | |
| task = get_swebench_task(difficulty) | |
| task["metadata"]["source"] = "swebench" | |
| logger.info( | |
| f"Loaded task from SWE-bench: {task.get('problem_id')} " | |
| f"(repo: {task['metadata'].get('repo', '?')})" | |
| ) | |
| return task | |
| except Exception as e: | |
| error_msg = f"Failed to load from SWE-bench: {e}" | |
| logger.debug(error_msg) | |
| raise TaskLoadError(error_msg) from e | |
| # Global singleton instance for backward compatibility | |
| _default_manager: Optional[TaskManager] = None | |
| def get_task_manager(source: Optional[TaskSource] = None) -> TaskManager: | |
| """ | |
| Get or create the default TaskManager instance. | |
| Args: | |
| source: Override the source selection. If None, uses TASK_SOURCE env var. | |
| Returns: | |
| TaskManager instance | |
| """ | |
| global _default_manager | |
| if _default_manager is None or source is not None: | |
| _default_manager = TaskManager(source=source) | |
| return _default_manager | |
| def load_task(difficulty: Difficulty, source: Optional[TaskSource] = None) -> Dict[str, Any]: | |
| """ | |
| Convenience function: load a task in one call. | |
| Args: | |
| difficulty: "easy", "medium", or "hard" | |
| source: Optional override for task source | |
| Returns: | |
| Task dict | |
| """ | |
| manager = get_task_manager(source=source) | |
| return manager.load_task(difficulty) | |