agentops-gym / server /tasks.py
Revanth-ml's picture
Upload folder using huggingface_hub
e2eb9d7 verified
"""
AgentOps Gym β€” Task definitions and deterministic graders.
3 tasks with a clear difficulty gradient:
task_1 (easy) β€” Bug Localization
task_2 (medium) β€” Config Patching
task_3 (hard) β€” Caching Implementation
Each grader returns a float in [0.0, 1.0] and a breakdown dict.
Graders check the in-memory snapshot state, not keyword matching.
"""
import json
import re
from typing import Dict, Any, List, Tuple, Optional
# ---------------------------------------------------------------------------
# Task registry
# ---------------------------------------------------------------------------
# Every task known to this module, keyed by task id. Each entry holds the
# display name, a difficulty label, the step budget (max_steps), the
# reference step count fed to the efficiency score (optimal_steps), the
# prompt text, and the initial_visible_files list.
TASK_REGISTRY: Dict[str, Dict[str, Any]] = dict(
    task_1=dict(
        name="Bug Localization",
        difficulty="easy",
        max_steps=8,
        optimal_steps=3,
        description=(
            "The fetch_user function in this project is broken. "
            "Users report it always returns None instead of user data. "
            "Find the bug and report which file and line number contains it."
        ),
        initial_visible_files=["README.md"],
    ),
    task_2=dict(
        name="Config Patching",
        difficulty="medium",
        max_steps=10,
        optimal_steps=4,
        description=(
            "Production is timing out. Someone reported the API timeout is misconfigured. "
            "Find the config file and change the timeout value from 30 to 10."
        ),
        initial_visible_files=["main.py", "README.md"],
    ),
    task_3=dict(
        name="Caching Implementation",
        difficulty="hard",
        max_steps=8,
        optimal_steps=6,
        description=(
            "API latency is high. Logs show fetch_user() is being called repeatedly "
            "with the same user_id. Implement simple in-memory caching for fetch_user. "
            "You have 8 tool calls max. Plan before acting."
        ),
        initial_visible_files=["README.md"],
    ),
    task_4=dict(
        name="Secret Migration",
        difficulty="medium",
        max_steps=10,
        optimal_steps=4,
        description=(
            "Security audit found a hardcoded API key in main.py. "
            "Move the key 'SECRET_TOKEN_XYZ' to a new .env file as API_KEY=SECRET_TOKEN_XYZ "
            "and update main.py to load it using os.getenv('API_KEY')."
        ),
        initial_visible_files=["main.py", "README.md"],
    ),
)
def get_task(task_id: str) -> Dict[str, Any]:
    """Return the registry entry for ``task_id``.

    Raises:
        KeyError: if ``task_id`` is not registered; the message lists the
            ids that are available.
    """
    entry = TASK_REGISTRY.get(task_id)
    if entry is None:
        raise KeyError(f"Unknown task_id: {task_id!r}. Available: {list(TASK_REGISTRY.keys())}")
    return entry
def list_task_ids() -> List[str]:
    """Return all registered task ids, in registry order, as a new list."""
    return [*TASK_REGISTRY]
# ---------------------------------------------------------------------------
# Step-level reward (called on every step)
# ---------------------------------------------------------------------------
def compute_step_reward(
    task_id: str,
    tool: str,
    parameters: Dict[str, Any],
    tool_result: str,
    action_history: List[str],
    discovered_files: List[str],
    snapshot: Dict[str, str],
) -> Tuple[float, Dict[str, float]]:
    """Score a single tool call.

    ``action_history`` must be the history as it stood BEFORE this step was
    recorded, i.e. the current action is not in it yet. ``snapshot`` is
    accepted but not consulted here.

    Adjustments, applied in this order:
        repeat_penalty        -0.15  same call string as the last history entry
        hallucination_penalty -0.10  FileRead/FileWrite on a filename that is
                                     not in ``discovered_files``
        planning_bonus        +0.05  TodoWrite issued with an empty history
        error_penalty         -0.05  ``tool_result`` starts with "ERROR:"
        task_signal           +x     non-zero value from _task_step_signal

    Returns:
        (reward rounded to 3 decimals, breakdown of the adjustments applied).
    """
    breakdown: Dict[str, float] = {}
    total = 0.0

    def apply(label: str, amount: float) -> None:
        # Record one adjustment and fold it into the running total.
        nonlocal total
        total += amount
        breakdown[label] = amount

    # Only the most recent entry is compared, so only back-to-back repeats count.
    call_signature = f"{tool}({parameters})"
    if action_history and action_history[-1] == call_signature:
        apply("repeat_penalty", -0.15)

    if tool in ("FileRead", "FileWrite"):
        target = parameters.get("filename", "")
        if target and target not in discovered_files:
            apply("hallucination_penalty", -0.10)

    # An empty pre-append history means this is the first action of the episode.
    if tool == "TodoWrite" and not action_history:
        apply("planning_bonus", 0.05)

    if tool_result.startswith("ERROR:"):
        apply("error_penalty", -0.05)

    task_bonus = _task_step_signal(task_id, tool, parameters, tool_result, action_history)
    if task_bonus != 0.0:
        apply("task_signal", task_bonus)

    return round(total, 3), breakdown
def _task_step_signal(
task_id: str, tool: str, params: Dict, result: str, history: List[str]
) -> float:
"""Small positive reward for productive actions toward the task goal."""
if task_id == "task_1":
# Reward discovering relevant files/patterns
if tool == "Grep" and "json" in str(params).lower():
return 0.05
if tool == "FileRead" and params.get("filename") == "main.py":
return 0.10
if tool == "Bash" and "lint" in str(params).lower():
return 0.05
elif task_id == "task_2":
if tool == "Grep" and "timeout" in str(params).lower():
return 0.05
if tool == "FileRead" and params.get("filename") == "config.json":
return 0.10
if tool == "FileWrite" and params.get("filename") == "config.json":
return 0.05
elif task_id == "task_3":
if tool == "TodoWrite":
return 0.05
if tool == "WebSearch" and "cache" in str(params).lower():
return 0.05
if tool == "FileRead" and params.get("filename") == "main.py":
return 0.05
if tool == "FileWrite" and params.get("filename") == "main.py":
return 0.05
elif task_id == "task_4":
if tool == "FileWrite" and params.get("filename") == ".env":
return 0.10
if tool == "FileRead" and params.get("filename") == "main.py":
return 0.05
if tool == "Grep" and "SECRET_TOKEN" in str(params).upper():
return 0.05
return 0.0
# ---------------------------------------------------------------------------
# Episode-level graders (called at done=True)
# ---------------------------------------------------------------------------
def grade_episode(
    task_id: str,
    snapshot: Dict[str, str],
    action_history: List[str],
    steps_used: int,
) -> Tuple[float, Dict[str, float]]:
    """Dispatch to the grader for ``task_id`` and return (score, breakdown).

    An unknown ``task_id`` or a grader that raises yields a score of 0.0
    with an ``"error"`` entry in the breakdown describing the problem.
    """
    dispatch = {
        "task_1": _grade_task1,
        "task_2": _grade_task2,
        "task_3": _grade_task3,
        "task_4": _grade_task4,
    }
    if task_id not in dispatch:
        return 0.0, {"error": f"No grader for {task_id}"}

    grader = dispatch[task_id]
    # Deliberate catch-all boundary: a failing grader scores zero instead of
    # propagating the exception to the caller.
    try:
        outcome = grader(snapshot, action_history, steps_used)
    except Exception as exc:
        return 0.0, {"error": str(exc)}
    return outcome
def _efficiency_score(steps_used: int, optimal_steps: int) -> float:
"""Efficiency component: 1.0 at optimal, -0.08 per extra step, min 0."""
return max(0.0, 1.0 - (steps_used - optimal_steps) * 0.08)
def _history_contains(history: List[str], *keywords: str) -> bool:
"""True if any history entry contains ALL keywords (case-insensitive)."""
for entry in history:
upper = entry.upper()
if all(kw.upper() in upper for kw in keywords):
return True
return False
def _history_contains_any(history: List[str], *keywords: str) -> bool:
for entry in history:
upper = entry.upper()
if any(kw.upper() in upper for kw in keywords):
return True
return False
# ── Task 1: Bug Localization ──────────────────────────────────────────────
def _grade_task1(
    snapshot: Dict[str, str],
    history: List[str],
    steps_used: int,
) -> Tuple[float, Dict[str, float]]:
    """Grade the Bug Localization episode from the recorded action history.

    Correctness components (``snapshot`` is not consulted):
        +0.30  found_correct_file - some history entry mentions main.py
        +0.40  found_correct_line - main.py was read with FileRead and some
                                    entry mentions "json"
        +0.30  ran_linter         - a Bash entry that mentions "lint"

    The final score is correctness * 0.7 + efficiency * 0.3, capped at 1.0
    and rounded to 4 decimals.

    Returns:
        (final score, breakdown of the credits granted plus "efficiency").
    """
    breakdown: Dict[str, float] = {}
    score = 0.0

    # Found the file that holds the bug.
    if _history_contains_any(history, "MAIN.PY"):
        breakdown["found_correct_file"] = 0.30
        score += 0.30

    # Narrowed in on the faulty call: main.py was actually read and the
    # agent touched something JSON-related.
    main_read = _history_contains(history, "FILEREAD", "MAIN.PY")
    mentions_json = _history_contains_any(history, "JSON")
    if main_read and mentions_json:
        breakdown["found_correct_line"] = 0.40
        score += 0.40

    # The linter credit requires one entry that is both a Bash call and a
    # lint invocation. Matching either keyword alone would hand this credit
    # to any Bash command at all.
    if _history_contains(history, "BASH", "LINT"):
        breakdown["ran_linter"] = 0.30
        score += 0.30

    eff = _efficiency_score(steps_used, TASK_REGISTRY["task_1"]["optimal_steps"])
    final = score * 0.7 + eff * 0.3
    breakdown["efficiency"] = round(eff, 3)
    return round(min(1.0, final), 4), breakdown
# ── Task 2: Config Patching ──────────────────────────────────────────────
def _grade_task2(
    snapshot: Dict[str, str],
    history: List[str],
    steps_used: int,
) -> Tuple[float, Dict[str, float]]:
    """Grade the Config Patching episode.

    Correctness components:
        +0.20  found_config              - history mentions config.json
        +0.20  read_before_write         - first config FileRead precedes the
                                           first config FileWrite
        -0.20  destructive_write_penalty - config written but never read
        +0.40  correct_timeout_value     - snapshot config.json has timeout == 10
        +0.20  valid_json                - snapshot config.json parsed cleanly
        -0.10  invalid_json_penalty      - parsing or inspecting it failed

    The final score is correctness * 0.7 + efficiency * 0.3, clamped to
    [0.0, 1.0] and rounded to 4 decimals.
    """
    breakdown: Dict[str, float] = {}
    score = 0.0

    if _history_contains_any(history, "CONFIG.JSON"):
        breakdown["found_config"] = 0.20
        score += 0.20

    def first_config_call(tool_name: str) -> Optional[int]:
        # Index of the first history entry naming both the tool and "CONFIG".
        for position, entry in enumerate(history):
            upper = entry.upper()
            if tool_name in upper and "CONFIG" in upper:
                return position
        return None

    read_idx = first_config_call("FILEREAD")
    write_idx = first_config_call("FILEWRITE")
    if write_idx is not None:
        if read_idx is None:
            # Overwrote the config without ever looking at it.
            breakdown["destructive_write_penalty"] = -0.20
            score -= 0.20
        elif read_idx < write_idx:
            breakdown["read_before_write"] = 0.20
            score += 0.20

    raw_config = snapshot.get("config.json", "")
    try:
        parsed = json.loads(raw_config)
        timeout_ok = parsed.get("timeout") == 10
    except Exception:
        # Covers malformed JSON as well as JSON that is not an object.
        breakdown["invalid_json_penalty"] = -0.10
        score -= 0.10
    else:
        if timeout_ok:
            breakdown["correct_timeout_value"] = 0.40
            score += 0.40
        breakdown["valid_json"] = 0.20
        score += 0.20

    eff = _efficiency_score(steps_used, TASK_REGISTRY["task_2"]["optimal_steps"])
    blended = score * 0.7 + eff * 0.3
    breakdown["efficiency"] = round(eff, 3)
    return round(min(1.0, max(0.0, blended)), 4), breakdown
# ── Task 3: Caching Implementation ───────────────────────────────────────
def _grade_task3(
    snapshot: Dict[str, str],
    history: List[str],
    steps_used: int,
) -> Tuple[float, Dict[str, float]]:
    """Grade the Caching Implementation episode.

    Correctness components:
        +0.30  cache_mechanism_present   - main.py mentions lru_cache or
                                           assigns a dict-literal cache
        +0.30  correct_function_modified - lru_cache decorates fetch_user (or
                                           appears on the same line as it);
                                           +0.20 instead for a dict cache in a
                                           file that defines fetch_user
        +0.20  lint_passes               - a Bash lint call is in the history
                                           and no entry mentions
                                           "issue(s) found" or "error"
        +0.10  planning_bonus            - TodoWrite appears in the history
        +0.10  websearch_bonus           - WebSearch appears in the history

    If ``steps_used`` exceeds the task's ``max_steps``, correctness is capped
    at 0.30 before it is blended with efficiency. The final score is
    correctness * 0.7 + efficiency * 0.3, clamped to [0.0, 1.0] and rounded
    to 4 decimals.
    """
    task = TASK_REGISTRY["task_3"]
    breakdown: Dict[str, float] = {}
    score = 0.0
    main_content = snapshot.get("main.py", "")

    # Cache mechanism present
    has_lru = "lru_cache" in main_content
    has_dict_cache = bool(
        re.search(r'_cache\s*=\s*\{', main_content)
        or re.search(r'cache\s*=\s*\{\}', main_content)
    )
    if has_lru or has_dict_cache:
        breakdown["cache_mechanism_present"] = 0.30
        score += 0.30

    # Correct function modified
    if "fetch_user" in main_content and (has_lru or has_dict_cache):
        # The decorator must sit directly above fetch_user; only further
        # decorators, comments or blank lines may come between them. No
        # DOTALL here: letting '.' span newlines would accept @lru_cache on
        # ANY earlier function as long as fetch_user is defined later.
        decorated = re.search(
            r'@[\w.]*lru_cache[^\n]*\n'
            r'(?:[ \t]*(?:[@#][^\n]*)?\n)*'
            r'[ \t]*(?:async[ \t]+)?def[ \t]+fetch_user\b',
            main_content,
        )
        # Also accept same-line wrapping such as lru_cache(...)(fetch_user).
        wrapped = re.search(r'lru_cache.*fetch_user', main_content)
        if decorated or wrapped:
            breakdown["correct_function_modified"] = 0.30
            score += 0.30
        elif has_dict_cache:
            breakdown["correct_function_modified"] = 0.20
            score += 0.20

    # Lint passed: require one entry that is both a Bash call and a lint
    # invocation, so that an arbitrary Bash command does not earn the credit.
    ran_lint = _history_contains(history, "BASH", "LINT")
    if ran_lint and not _history_contains_any(history, "ISSUE(S) FOUND", "ERROR"):
        breakdown["lint_passes"] = 0.20
        score += 0.20

    # Used TodoWrite
    if _history_contains_any(history, "TODOWRITE"):
        breakdown["planning_bonus"] = 0.10
        score += 0.10

    # Used WebSearch
    if _history_contains_any(history, "WEBSEARCH"):
        breakdown["websearch_bonus"] = 0.10
        score += 0.10

    # Hard cap for exceeding the step budget, read from the registry so the
    # two cannot drift apart.
    if steps_used > task["max_steps"]:
        score = min(score, 0.30)
        breakdown["hard_cap_applied"] = True

    eff = _efficiency_score(steps_used, task["optimal_steps"])
    final = score * 0.7 + eff * 0.3
    breakdown["efficiency"] = round(eff, 3)
    return round(min(1.0, max(0.0, final)), 4), breakdown
# ── Task 4: Secret Migration ──────────────────────────────────────────────
def _grade_task4(
    snapshot: Dict[str, str],
    history: List[str],
    steps_used: int,
) -> Tuple[float, Dict[str, float]]:
    """Grade the Secret Migration episode.

    Correctness components:
        +0.30  env_file_correct         - .env (spaces ignored) contains
                                          API_KEY=SECRET_TOKEN_XYZ
        +0.40  main_uses_getenv         - main.py contains "import os" and
                                          os.getenv('API_KEY') in either
                                          quote style
        +0.20  secret_removed_from_main - SECRET_TOKEN_XYZ is absent from main.py
        +0.10  planning_bonus           - TodoWrite appears in the history

    The final score is correctness * 0.7 + efficiency * 0.3, clamped to
    [0.0, 1.0] and rounded to 4 decimals.
    """
    env_text = snapshot.get(".env", "")
    main_text = snapshot.get("main.py", "")

    getenv_spellings = ("os.getenv('API_KEY')", 'os.getenv("API_KEY")')
    uses_getenv = "import os" in main_text and any(
        spelling in main_text for spelling in getenv_spellings
    )

    # (breakdown label, points, earned?) in the order they are credited.
    checks = (
        ("env_file_correct", 0.30, "API_KEY=SECRET_TOKEN_XYZ" in env_text.replace(" ", "")),
        ("main_uses_getenv", 0.40, uses_getenv),
        ("secret_removed_from_main", 0.20, "SECRET_TOKEN_XYZ" not in main_text),
        ("planning_bonus", 0.10, _history_contains_any(history, "TODOWRITE")),
    )

    breakdown: Dict[str, float] = {}
    score = 0.0
    for label, points, earned in checks:
        if earned:
            breakdown[label] = points
            score += points

    eff = _efficiency_score(steps_used, TASK_REGISTRY["task_4"]["optimal_steps"])
    blended = score * 0.7 + eff * 0.3
    breakdown["efficiency"] = round(eff, 3)
    return round(min(1.0, max(0.0, blended)), 4), breakdown