Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| Skill Forge Environment Implementation. | |
| An RL training environment where LLM Agents evolve from "reinventing the wheel" to "building a skill library." | |
| """ | |
| import json | |
| import traceback | |
| from uuid import uuid4 | |
| import pandas as pd | |
| from openenv.core.env_server.interfaces import Environment | |
| from openenv.core.env_server.types import State | |
| try: | |
| from ..models import SkillForgeAction, SkillForgeObservation | |
| from .data_generator import TASKS | |
| except ImportError: | |
| from models import SkillForgeAction, SkillForgeObservation | |
| from data_generator import TASKS | |
class SkillForgeEnvironment(Environment):
    """
    SkillForge RL environment.

    The agent solves chained pandas tasks and can build a reusable skill library.
    Skills persist across episodes so the agent can discover and reuse patterns.

    Three action types are handled by ``step``:

    * ``create_skill`` - store ``action.content`` as a template under
      ``action.skill_name``; the current task does not advance.
    * ``use_skill`` - look up the skill named by ``action.content``, fill its
      template with ``action.params`` via ``str.format`` and evaluate it.
    * anything else - treated as raw code: ``action.content`` is evaluated as-is.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # Reward shaping. Values are identical to the previously inlined literals.
    SKILL_CREATION_REWARD: float = 0.5   # granted for every create_skill action
    SOLVE_REWARD: float = 2.0            # granted when the evaluated result is correct
    SKILL_REUSE_BONUS: float = 0.5       # extra reward when a correct answer came from a skill
    FAILURE_PENALTY: float = -0.3        # wrong answer, unknown skill or unusable template
    TOKEN_PENALTY: float = 0.001         # per-character cost subtracted from a correct answer

    def __init__(self):
        """Create a fresh environment with an empty skill library."""
        self._state = State(episode_id=str(uuid4()), step_count=0)
        # name -> {"template": str, "description": str, "used_count": int}
        self.skill_library: dict = {}
        self.task_idx: int = 0
        self.tasks_solved: int = 0
        # "Tokens" are approximated by character counts of the submitted action.
        self.total_tokens: int = 0

    def reset(self) -> SkillForgeObservation:
        """
        Reset episode state. skill_library is NOT reset — persists across episodes.

        Returns:
            The observation for the first task, with zero reward and ``done=False``.
        """
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self.task_idx = 0
        self.tasks_solved = 0
        self.total_tokens = 0
        task = TASKS[self.task_idx]
        return self._make_observation(task, result_correct=False, result_output="", reward=0.0, done=False)

    def step(self, action: SkillForgeAction) -> SkillForgeObservation:
        """
        Apply one agent action and return the resulting observation.

        Args:
            action: The agent's action. ``action_type`` selects the branch;
                ``content`` holds the skill template, skill name or raw code;
                ``skill_name``/``reasoning`` are used by ``create_skill`` and
                ``params`` by ``use_skill``.

        Returns:
            An observation for the next task when the current one was solved,
            otherwise for the current task. ``done`` is True once every task
            in ``TASKS`` has been solved.
        """
        if self.task_idx >= len(TASKS):
            # The episode already finished. Indexing TASKS here used to raise
            # IndexError; report a terminal observation instead and leave all
            # counters untouched.
            return self._make_observation(
                TASKS[-1], result_correct=False,
                result_output="Episode is finished; call reset() to start a new one.",
                reward=0.0, done=True,
            )

        self._state.step_count += 1
        task = TASKS[self.task_idx]
        reward = 0.0

        # --- create_skill: store template, stay on current task ---
        if action.action_type == "create_skill":
            token_cost = len(action.content)
            self.total_tokens += token_cost
            self.skill_library[action.skill_name] = {
                "template": action.content,
                "description": action.reasoning,
                "used_count": 0,
            }
            reward = self.SKILL_CREATION_REWARD
            return self._make_observation(
                task, result_correct=False,
                result_output=f"Skill '{action.skill_name}' saved.",
                reward=reward, done=False,
            )

        # --- use_skill or raw_code: execute and evaluate ---
        if action.action_type == "use_skill":
            skill = self.skill_library.get(action.content)
            if skill is None:
                # skill not found — treat as error
                self.total_tokens += len(action.content)
                return self._make_observation(
                    task, result_correct=False,
                    result_output=f"Skill '{action.content}' not found in library.",
                    reward=self.FAILURE_PENALTY, done=False,
                )
            # token cost for use_skill: skill name + serialized params (much shorter than full code)
            skill_call_repr = action.content + json.dumps(action.params or {})
            token_cost = len(skill_call_repr)
            try:
                exec_code = skill["template"].format(**(action.params or {}))
            except (KeyError, IndexError, ValueError, AttributeError, TypeError) as exc:
                # A missing placeholder value, a stray brace in the template or
                # malformed params is the agent's mistake, not a server fault:
                # score it like any other failed attempt instead of crashing.
                self.total_tokens += token_cost
                return self._make_observation(
                    task, result_correct=False,
                    result_output=(
                        f"Skill '{action.content}' could not be applied: "
                        f"{type(exc).__name__}: {exc}"
                    ),
                    reward=self.FAILURE_PENALTY, done=False,
                )
            skill["used_count"] += 1
        else:
            # raw_code
            exec_code = action.content
            # _evaluate accepts None code, so the cost computation must as well.
            token_cost = len(action.content or "")

        self.total_tokens += token_cost
        result_correct, result_output = self._evaluate(exec_code, task["dataframe"], task["expected_output"])

        if result_correct:
            reward += self.SOLVE_REWARD
            reward -= self.TOKEN_PENALTY * token_cost
            if action.action_type == "use_skill":
                reward += self.SKILL_REUSE_BONUS
            self.tasks_solved += 1
            self.task_idx += 1
        else:
            reward = self.FAILURE_PENALTY

        done = self.task_idx >= len(TASKS)
        # When the last task was just solved there is no next task to show,
        # so the final observation repeats the task that was just completed.
        next_task = TASKS[self.task_idx] if not done else task
        return self._make_observation(
            next_task,
            result_correct=result_correct,
            result_output=result_output,
            reward=reward,
            done=done,
        )

    @staticmethod
    def _normalize(value):
        """Convert pandas containers to plain lists so values compare with ``==``."""
        if isinstance(value, pd.DataFrame):
            return value.values.tolist()
        if isinstance(value, (pd.Series, pd.Index)):
            return value.tolist()
        return value

    def _evaluate(self, exec_code: str | None, dataframe: pd.DataFrame, expected_output) -> tuple[bool, str]:
        """
        Evaluate a single expression against a copy of the task dataframe.

        Args:
            exec_code: Python expression to evaluate, or None.
            dataframe: Task data; exposed to the expression as ``df`` (a copy,
                so the task's own frame is never mutated).
            expected_output: Reference answer for the task.

        Returns:
            ``(is_correct, output)`` where ``output`` is ``str(result)`` on a
            successful evaluation or the formatted traceback when evaluation
            raised.
        """
        if exec_code is None:
            return False, "No code to execute."
        try:
            # SECURITY NOTE: this evaluates agent-supplied code. Replacing
            # __builtins__ with a whitelist only narrows what is reachable; it
            # is not a sandbox (``df``/``pd`` still expose file and network
            # helpers). Run this environment in an isolated process/container.
            namespace = {
                "df": dataframe.copy(),
                "pd": pd,
                "__builtins__": {
                    "len": len, "str": str, "int": int, "float": float,
                    "list": list, "dict": dict, "bool": bool, "range": range,
                    "abs": abs, "min": min, "max": max, "sum": sum,
                    "sorted": sorted, "round": round,
                    "True": True, "False": False, "None": None,
                },
            }
            result = eval(exec_code, namespace)
            # normalize for comparison — both sides, so that a DataFrame or
            # Index expectation is compared as a list too instead of producing
            # an ambiguous element-wise result.
            result = self._normalize(result)
            expected = self._normalize(expected_output)
            try:
                # bool() stays inside the guard: comparing array-likes can
                # return an array whose truth value raises ValueError.
                is_correct = bool(result == expected)
            except (ValueError, TypeError):
                is_correct = False
            return is_correct, str(result)
        except Exception:
            # Any failure in agent code is reported back as text, never raised.
            return False, traceback.format_exc()

    def _make_observation(self, task: dict, result_correct: bool, result_output: str,
                          reward: float, done: bool) -> SkillForgeObservation:
        """
        Build the observation returned to the agent.

        Args:
            task: Task dict providing ``id``, ``description``, ``dataframe``
                and ``expected_output``.
            result_correct: Whether the last evaluated answer was correct.
            result_output: Text feedback for the last action.
            reward: Reward assigned to the last action.
            done: Whether the episode has ended.
        """
        return SkillForgeObservation(
            task_id=task["id"],
            task_description=task["description"],
            # Only the first five rows are shown to the agent.
            snapshot_data=task["dataframe"].head(5).to_string(),
            skill_library=self.skill_library,
            context="",
            step_count=self._state.step_count,
            total_tokens=self.total_tokens,
            result_correct=result_correct,
            result_output=result_output,
            expected_output=str(task["expected_output"]),
            reward=reward,
            done=done,
        )

    def state(self) -> State:
        """Return the current episode state (episode id and step count)."""
        # NOTE(review): the OpenEnv Environment interface may declare `state`
        # as a property — confirm against the installed version whether this
        # should carry @property. Kept as a plain method to preserve callers.
        return self._state