skillforge / server /environment.py
seatyyy's picture
Upload folder using huggingface_hub
697e014 verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Skill Forge Environment Implementation.
An RL training environment where LLM Agents evolve from "reinventing the wheel" to "building a skill library."
"""
import json
import traceback
from uuid import uuid4
import pandas as pd
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
try:
from ..models import SkillForgeAction, SkillForgeObservation
from .data_generator import TASKS
except ImportError:
from models import SkillForgeAction, SkillForgeObservation
from data_generator import TASKS
class SkillForgeEnvironment(Environment):
"""
SkillForge RL environment.
The agent solves chained pandas tasks and can build a reusable skill library.
Skills persist across episodes so the agent can discover and reuse patterns.
"""
SUPPORTS_CONCURRENT_SESSIONS: bool = True
def __init__(self):
self._state = State(episode_id=str(uuid4()), step_count=0)
self.skill_library: dict = {}
self.task_idx: int = 0
self.tasks_solved: int = 0
self.total_tokens: int = 0
def reset(self) -> SkillForgeObservation:
"""
Reset episode state. skill_library is NOT reset — persists across episodes.
"""
self._state = State(episode_id=str(uuid4()), step_count=0)
self.task_idx = 0
self.tasks_solved = 0
self.total_tokens = 0
task = TASKS[self.task_idx]
return self._make_observation(task, result_correct=False, result_output="", reward=0.0, done=False)
def step(self, action: SkillForgeAction) -> SkillForgeObservation:
self._state.step_count += 1
task = TASKS[self.task_idx]
reward = 0.0
# --- create_skill: store template, stay on current task ---
if action.action_type == "create_skill":
token_cost = len(action.content)
self.total_tokens += token_cost
self.skill_library[action.skill_name] = {
"template": action.content,
"description": action.reasoning,
"used_count": 0,
}
reward = 0.5
return self._make_observation(
task, result_correct=False,
result_output=f"Skill '{action.skill_name}' saved.",
reward=reward, done=False,
)
# --- use_skill or raw_code: execute and evaluate ---
if action.action_type == "use_skill":
skill = self.skill_library.get(action.content)
if skill is None:
# skill not found — treat as error
self.total_tokens += len(action.content)
return self._make_observation(
task, result_correct=False,
result_output=f"Skill '{action.content}' not found in library.",
reward=-0.3, done=False,
)
exec_code = skill["template"].format(**(action.params or {}))
skill["used_count"] += 1
# token cost for use_skill: skill name + serialized params (much shorter than full code)
skill_call_repr = action.content + json.dumps(action.params or {})
token_cost = len(skill_call_repr)
else:
# raw_code
exec_code = action.content
token_cost = len(action.content)
self.total_tokens += token_cost
result_correct, result_output = self._evaluate(exec_code, task["dataframe"], task["expected_output"])
if action.action_type == "create_skill":
result_correct = True
if result_correct:
reward += 2.0
reward -= 0.001 * token_cost
if action.action_type == "use_skill":
reward += 0.5
self.tasks_solved += 1
self.task_idx += 1
else:
reward = -0.3
done = self.task_idx >= len(TASKS)
next_task = TASKS[self.task_idx] if not done else task
return self._make_observation(
next_task,
result_correct=result_correct,
result_output=result_output,
reward=reward,
done=done,
)
def _evaluate(self, exec_code: str | None, dataframe: pd.DataFrame, expected_output) -> tuple[bool, str]:
if exec_code is None:
return False, "No code to execute."
try:
namespace = {"df": dataframe.copy(), "pd": pd, "__builtins__": {"len": len, "str": str, "int": int, "float": float, "list": list, "dict": dict, "bool": bool, "range": range, "abs": abs, "min": min, "max": max, "sum": sum, "sorted": sorted, "round": round, "True": True, "False": False, "None": None}}
result = eval(exec_code, namespace)
# normalize for comparison
if isinstance(result, pd.DataFrame):
result = result.values.tolist()
if isinstance(result, pd.Series):
result = result.tolist()
if isinstance(result, pd.Index):
result = result.tolist()
expected = expected_output
if isinstance(expected, pd.Series):
expected = expected.tolist()
try:
is_correct = result == expected
except (ValueError, TypeError):
is_correct = False
return bool(is_correct), str(result)
except Exception:
return False, traceback.format_exc()
def _make_observation(self, task: dict, result_correct: bool, result_output: str,
reward: float, done: bool) -> SkillForgeObservation:
return SkillForgeObservation(
task_id=task["id"],
task_description=task["description"],
snapshot_data=task["dataframe"].head(5).to_string(),
skill_library=self.skill_library,
context="",
step_count=self._state.step_count,
total_tokens=self.total_tokens,
result_correct=result_correct,
result_output=result_output,
expected_output=str(task["expected_output"]),
reward=reward,
done=done,
)
@property
def state(self) -> State:
return self._state