Spaces:
Sleeping
Sleeping
| """ | |
AgentOps Gym — Task definitions and deterministic graders.
4 tasks spanning a difficulty gradient:
task_1 (easy) — Bug Localization
task_2 (medium) — Config Patching
task_3 (hard) — Caching Implementation
task_4 (medium) — Secret Migration
Each grader returns a float in [0.0, 1.0] and a breakdown dict.
Graders check the in-memory snapshot state together with the recorded action history.
| """ | |
| import json | |
| import re | |
| from typing import Dict, Any, List, Tuple, Optional | |
| # --------------------------------------------------------------------------- | |
| # Task registry | |
| # --------------------------------------------------------------------------- | |
# Static catalogue of tasks, keyed by task id. The graders read
# "optimal_steps" from here as the baseline for the efficiency score.
TASK_REGISTRY: Dict[str, Dict[str, Any]] = dict(
    task_1=dict(
        name="Bug Localization",
        difficulty="easy",
        max_steps=8,
        optimal_steps=3,
        description=(
            "The fetch_user function in this project is broken. "
            "Users report it always returns None instead of user data. "
            "Find the bug and report which file and line number contains it."
        ),
        initial_visible_files=["README.md"],
    ),
    task_2=dict(
        name="Config Patching",
        difficulty="medium",
        max_steps=10,
        optimal_steps=4,
        description=(
            "Production is timing out. "
            "Someone reported the API timeout is misconfigured. "
            "Find the config file and change the timeout value from 30 to 10."
        ),
        initial_visible_files=["main.py", "README.md"],
    ),
    task_3=dict(
        name="Caching Implementation",
        difficulty="hard",
        max_steps=8,
        optimal_steps=6,
        description=(
            "API latency is high. "
            "Logs show fetch_user() is being called repeatedly "
            "with the same user_id. "
            "Implement simple in-memory caching for fetch_user. "
            "You have 8 tool calls max. Plan before acting."
        ),
        initial_visible_files=["README.md"],
    ),
    task_4=dict(
        name="Secret Migration",
        difficulty="medium",
        max_steps=10,
        optimal_steps=4,
        description=(
            "Security audit found a hardcoded API key in main.py. "
            "Move the key 'SECRET_TOKEN_XYZ' to a new .env file "
            "as API_KEY=SECRET_TOKEN_XYZ "
            "and update main.py to load it using os.getenv('API_KEY')."
        ),
        initial_visible_files=["main.py", "README.md"],
    ),
)
def get_task(task_id: str) -> Dict[str, Any]:
    """Look up the registry entry for ``task_id``.

    Raises:
        KeyError: when ``task_id`` is not a key of ``TASK_REGISTRY``; the
            message lists the ids that are available.
    """
    if task_id in TASK_REGISTRY:
        return TASK_REGISTRY[task_id]
    known_ids = list(TASK_REGISTRY.keys())
    raise KeyError(f"Unknown task_id: {task_id!r}. Available: {known_ids}")
def list_task_ids() -> List[str]:
    """Return the ids of all registered tasks, in registry order."""
    return [task_id for task_id in TASK_REGISTRY]
| # --------------------------------------------------------------------------- | |
| # Step-level reward (called on every step) | |
| # --------------------------------------------------------------------------- | |
def compute_step_reward(
    task_id: str,
    tool: str,
    parameters: Dict[str, Any],
    tool_result: str,
    action_history: List[str],
    discovered_files: List[str],
    snapshot: Dict[str, str],
) -> Tuple[float, Dict[str, float]]:
    """Score a single tool call.

    ``action_history`` must be the history as it stood BEFORE this call was
    appended; the current call is rendered as ``f"{tool}({parameters})"`` and
    compared against its last entry. ``snapshot`` is accepted but not read
    here.

    Returns:
        ``(reward, breakdown)``: ``reward`` is the sum of every adjustment
        that applied, rounded to 3 decimals; ``breakdown`` maps the name of
        each applied adjustment to its value.
    """
    total = 0.0
    parts: Dict[str, float] = {}

    def _apply(label: str, delta: float) -> None:
        # Record one adjustment and fold it into the running total.
        nonlocal total
        total += delta
        parts[label] = delta

    this_call = f"{tool}({parameters})"

    # Penalty: identical to the immediately preceding call.
    if action_history and action_history[-1] == this_call:
        _apply("repeat_penalty", -0.15)

    # Penalty: reading/writing a file that has not been discovered yet.
    if tool in ("FileRead", "FileWrite"):
        target = parameters.get("filename", "")
        if target and target not in discovered_files:
            _apply("hallucination_penalty", -0.10)

    # Bonus: planning first. An empty pre-append history means this is the
    # very first call of the episode.
    if tool == "TodoWrite" and not action_history:
        _apply("planning_bonus", 0.05)

    # Penalty: the tool reported an error.
    if tool_result.startswith("ERROR:"):
        _apply("error_penalty", -0.05)

    # Task-specific shaping signal; only recorded when non-zero.
    signal = _task_step_signal(task_id, tool, parameters, tool_result, action_history)
    if signal != 0.0:
        _apply("task_signal", signal)

    return round(total, 3), parts
| def _task_step_signal( | |
| task_id: str, tool: str, params: Dict, result: str, history: List[str] | |
| ) -> float: | |
| """Small positive reward for productive actions toward the task goal.""" | |
| if task_id == "task_1": | |
| # Reward discovering relevant files/patterns | |
| if tool == "Grep" and "json" in str(params).lower(): | |
| return 0.05 | |
| if tool == "FileRead" and params.get("filename") == "main.py": | |
| return 0.10 | |
| if tool == "Bash" and "lint" in str(params).lower(): | |
| return 0.05 | |
| elif task_id == "task_2": | |
| if tool == "Grep" and "timeout" in str(params).lower(): | |
| return 0.05 | |
| if tool == "FileRead" and params.get("filename") == "config.json": | |
| return 0.10 | |
| if tool == "FileWrite" and params.get("filename") == "config.json": | |
| return 0.05 | |
| elif task_id == "task_3": | |
| if tool == "TodoWrite": | |
| return 0.05 | |
| if tool == "WebSearch" and "cache" in str(params).lower(): | |
| return 0.05 | |
| if tool == "FileRead" and params.get("filename") == "main.py": | |
| return 0.05 | |
| if tool == "FileWrite" and params.get("filename") == "main.py": | |
| return 0.05 | |
| elif task_id == "task_4": | |
| if tool == "FileWrite" and params.get("filename") == ".env": | |
| return 0.10 | |
| if tool == "FileRead" and params.get("filename") == "main.py": | |
| return 0.05 | |
| if tool == "Grep" and "SECRET_TOKEN" in str(params).upper(): | |
| return 0.05 | |
| return 0.0 | |
| # --------------------------------------------------------------------------- | |
| # Episode-level graders (called at done=True) | |
| # --------------------------------------------------------------------------- | |
def grade_episode(
    task_id: str,
    snapshot: Dict[str, str],
    action_history: List[str],
    steps_used: int,
) -> Tuple[float, Dict[str, float]]:
    """Dispatch to the grader for ``task_id`` and return ``(score, breakdown)``.

    An unknown task id, or any exception raised by the grader, produces a
    score of 0.0 with the reason stored under the ``"error"`` key of the
    breakdown.
    """
    dispatch = {
        "task_1": _grade_task1,
        "task_2": _grade_task2,
        "task_3": _grade_task3,
        "task_4": _grade_task4,
    }
    if task_id not in dispatch:
        return 0.0, {"error": f"No grader for {task_id}"}

    grader = dispatch[task_id]
    try:
        return grader(snapshot, action_history, steps_used)
    except Exception as exc:
        # Grading boundary: a failing grader is reported as a zero score
        # rather than propagated to the caller.
        return 0.0, {"error": str(exc)}
| def _efficiency_score(steps_used: int, optimal_steps: int) -> float: | |
| """Efficiency component: 1.0 at optimal, -0.08 per extra step, min 0.""" | |
| return max(0.0, 1.0 - (steps_used - optimal_steps) * 0.08) | |
| def _history_contains(history: List[str], *keywords: str) -> bool: | |
| """True if any history entry contains ALL keywords (case-insensitive).""" | |
| for entry in history: | |
| upper = entry.upper() | |
| if all(kw.upper() in upper for kw in keywords): | |
| return True | |
| return False | |
| def _history_contains_any(history: List[str], *keywords: str) -> bool: | |
| for entry in history: | |
| upper = entry.upper() | |
| if any(kw.upper() in upper for kw in keywords): | |
| return True | |
| return False | |
# -- Task 1: Bug Localization -------------------------------------------------
def _grade_task1(
    snapshot: Dict[str, str],
    history: List[str],
    steps_used: int,
) -> Tuple[float, Dict[str, float]]:
    """Grade a Bug Localization episode from the recorded action history.

    Correctness components (summed):
        +0.30  some history entry mentions main.py
        +0.40  main.py was read with FileRead and some entry mentions "json"
        +0.30  a single Bash entry mentions "lint" (the linter was run)

    Final score = correctness * 0.7 + efficiency * 0.3, clamped to [0, 1].
    ``snapshot`` is not consulted for this task.

    Returns:
        ``(score, breakdown)`` where ``score`` is rounded to 4 decimals and
        ``breakdown`` holds each awarded component plus ``"efficiency"``.
    """
    breakdown: Dict[str, float] = {}
    score = 0.0

    # Found the correct file.
    if _history_contains_any(history, "MAIN.PY"):
        breakdown["found_correct_file"] = 0.30
        score += 0.30

    # Proxy for locating the bug: main.py was read and JSON handling was
    # referenced somewhere in the episode.
    main_read = _history_contains(history, "FILEREAD", "MAIN.PY")
    json_referenced = _history_contains_any(history, "JSON")
    if main_read and json_referenced:
        breakdown["found_correct_line"] = 0.40
        score += 0.40

    # Ran the linter. Both keywords must occur in the SAME entry, otherwise
    # any Bash command at all would earn this credit.
    if _history_contains(history, "BASH", "LINT"):
        breakdown["ran_linter"] = 0.30
        score += 0.30

    eff = _efficiency_score(steps_used, TASK_REGISTRY["task_1"]["optimal_steps"])
    final = score * 0.7 + eff * 0.3
    breakdown["efficiency"] = round(eff, 3)
    return round(min(1.0, max(0.0, final)), 4), breakdown
# -- Task 2: Config Patching --------------------------------------------------
def _grade_task2(
    snapshot: Dict[str, str],
    history: List[str],
    steps_used: int,
) -> Tuple[float, Dict[str, float]]:
    """Grade a Config Patching episode.

    Correctness components (summed):
        +0.20  config.json is referenced somewhere in the history
        +0.20  config.json was read (FileRead) before it was written
               (FileWrite); -0.20 instead if it was written but never read
        +0.40  snapshot["config.json"] parses to an object with timeout == 10
        +0.20  snapshot["config.json"] parses to a JSON object;
               -0.10 instead if it is missing, unparsable or not an object

    Final score = correctness * 0.7 + efficiency * 0.3, clamped to [0, 1].

    Returns:
        ``(score, breakdown)`` where ``score`` is rounded to 4 decimals and
        ``breakdown`` holds each applied component plus ``"efficiency"``.
    """
    breakdown: Dict[str, float] = {}
    score = 0.0

    # Found config.json.
    if _history_contains_any(history, "CONFIG.JSON"):
        breakdown["found_config"] = 0.20
        score += 0.20

    def _first_config_index(tool_name: str) -> Optional[int]:
        # Index of the first history entry where `tool_name` touches
        # config.json, or None when there is no such entry.
        for idx, entry in enumerate(history):
            upper = entry.upper()
            if tool_name in upper and "CONFIG.JSON" in upper:
                return idx
        return None

    # Read before write (good safety practice).
    read_idx = _first_config_index("FILEREAD")
    write_idx = _first_config_index("FILEWRITE")
    if read_idx is not None and write_idx is not None and read_idx < write_idx:
        breakdown["read_before_write"] = 0.20
        score += 0.20
    elif write_idx is not None and read_idx is None:
        # Destructive write without ever reading the file.
        breakdown["destructive_write_penalty"] = -0.20
        score -= 0.20

    # Parse the snapshot; only parse failures are caught here, so unrelated
    # errors are not silently converted into an "invalid JSON" verdict.
    try:
        cfg = json.loads(snapshot.get("config.json", ""))
    except (ValueError, TypeError):
        cfg = None

    if isinstance(cfg, dict):
        if cfg.get("timeout") == 10:
            breakdown["correct_timeout_value"] = 0.40
            score += 0.40
        breakdown["valid_json"] = 0.20
        score += 0.20
    else:
        # Missing, unparsable, or not a JSON object.
        breakdown["invalid_json_penalty"] = -0.10
        score -= 0.10

    eff = _efficiency_score(steps_used, TASK_REGISTRY["task_2"]["optimal_steps"])
    final = score * 0.7 + eff * 0.3
    breakdown["efficiency"] = round(eff, 3)
    return round(min(1.0, max(0.0, final)), 4), breakdown
# -- Task 3: Caching Implementation -------------------------------------------
def _grade_task3(
    snapshot: Dict[str, str],
    history: List[str],
    steps_used: int,
) -> Tuple[float, Dict[str, float]]:
    """Grade a Caching Implementation episode.

    Correctness components (summed):
        +0.30  main.py contains a cache mechanism (lru_cache or a dict cache)
        +0.30  lru_cache is applied to fetch_user (decorator directly above
               its ``def``, or both names on one line);
               +0.20 instead when only a dict cache plus fetch_user exist
        +0.20  a Bash lint entry exists and no entry mentions
               "ISSUE(S) FOUND" or "ERROR"
        +0.10  TodoWrite appears in the history
        +0.10  WebSearch appears in the history

    If ``steps_used`` exceeds the task's registered ``max_steps`` the
    correctness sum is capped at 0.30.

    Final score = correctness * 0.7 + efficiency * 0.3, clamped to [0, 1].

    Returns:
        ``(score, breakdown)`` where ``score`` is rounded to 4 decimals and
        ``breakdown`` holds each awarded component plus ``"efficiency"``.
    """
    breakdown: Dict[str, float] = {}
    score = 0.0
    main_content = snapshot.get("main.py", "")

    # Cache mechanism present.
    has_lru = "lru_cache" in main_content
    has_dict_cache = bool(
        re.search(r'_cache\s*=\s*\{', main_content)
        or re.search(r'cache\s*=\s*\{\}', main_content)
    )
    has_cache = has_lru or has_dict_cache
    if has_cache:
        breakdown["cache_mechanism_present"] = 0.30
        score += 0.30

    # Correct function modified.
    if has_cache and "fetch_user" in main_content:
        # The decorator line must sit directly above `def fetch_user`
        # (other decorator lines may come between). A pattern that lets
        # `.*` span lines would also credit lru_cache on an unrelated
        # function defined earlier in the file.
        decorated = re.search(
            r'^[ \t]*@[^\n]*lru_cache[^\n]*\n'
            r'(?:[ \t]*@[^\n]*\n)*'
            r'[ \t]*def[ \t]+fetch_user\b',
            main_content,
            re.MULTILINE,
        )
        # e.g. `fetch_user = lru_cache(maxsize=128)(fetch_user)`
        wrapped = re.search(r'lru_cache.*fetch_user', main_content)
        if decorated or wrapped:
            breakdown["correct_function_modified"] = 0.30
            score += 0.30
        elif has_dict_cache:
            breakdown["correct_function_modified"] = 0.20
            score += 0.20

    # Lint passed. "BASH" and "LINT" must be in the SAME entry, otherwise
    # any Bash command would count as a lint run.
    ran_lint = _history_contains(history, "BASH", "LINT")
    if ran_lint and not _history_contains_any(history, "ISSUE(S) FOUND", "ERROR"):
        breakdown["lint_passes"] = 0.20
        score += 0.20

    # Used TodoWrite.
    if _history_contains_any(history, "TODOWRITE"):
        breakdown["planning_bonus"] = 0.10
        score += 0.10

    # Used WebSearch.
    if _history_contains_any(history, "WEBSEARCH"):
        breakdown["websearch_bonus"] = 0.10
        score += 0.10

    # Hard cap for exceeding the step budget declared in the registry.
    if steps_used > TASK_REGISTRY["task_3"]["max_steps"]:
        score = min(score, 0.30)
        breakdown["hard_cap_applied"] = True

    eff = _efficiency_score(steps_used, TASK_REGISTRY["task_3"]["optimal_steps"])
    final = score * 0.7 + eff * 0.3
    breakdown["efficiency"] = round(eff, 3)
    return round(min(1.0, max(0.0, final)), 4), breakdown
# -- Task 4: Secret Migration -------------------------------------------------
def _grade_task4(
    snapshot: Dict[str, str],
    history: List[str],
    steps_used: int,
) -> Tuple[float, Dict[str, float]]:
    """Grade a Secret Migration episode.

    Correctness components (summed):
        +0.30  .env contains API_KEY=SECRET_TOKEN_XYZ (spaces ignored)
        +0.40  main.py contains "import os" and calls os.getenv with
               'API_KEY' (either quote style) as its first argument
        +0.20  main.py is non-empty and no longer contains the secret
        +0.10  TodoWrite appears in the history

    Final score = correctness * 0.7 + efficiency * 0.3, clamped to [0, 1].

    Returns:
        ``(score, breakdown)`` where ``score`` is rounded to 4 decimals and
        ``breakdown`` holds each awarded component plus ``"efficiency"``.
    """
    breakdown: Dict[str, float] = {}
    score = 0.0
    env_content = snapshot.get(".env", "")
    main_content = snapshot.get("main.py", "")

    # .env check.
    if "API_KEY=SECRET_TOKEN_XYZ" in env_content.replace(" ", ""):
        breakdown["env_file_correct"] = 0.30
        score += 0.30

    # main.py check: one pattern covers both quote styles and tolerates
    # whitespace after the parenthesis or a default argument.
    uses_getenv = re.search(r"""os\.getenv\(\s*(['"])API_KEY\1""", main_content)
    if "import os" in main_content and uses_getenv:
        breakdown["main_uses_getenv"] = 0.40
        score += 0.40

    # Secret removal. A missing or blank main.py earns nothing: the secret
    # is trivially absent from a file that was deleted or wiped.
    if main_content.strip() and "SECRET_TOKEN_XYZ" not in main_content:
        breakdown["secret_removed_from_main"] = 0.20
        score += 0.20

    # Planning bonus.
    if _history_contains_any(history, "TODOWRITE"):
        breakdown["planning_bonus"] = 0.10
        score += 0.10

    eff = _efficiency_score(steps_used, TASK_REGISTRY["task_4"]["optimal_steps"])
    final = score * 0.7 + eff * 0.3
    breakdown["efficiency"] = round(eff, 3)
    return round(min(1.0, max(0.0, final)), 4), breakdown