Buckets:

|
download
raw
18.5 kB
# Rubrics: Composable Reward Computation
[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/huggingface/OpenEnv/blob/main/examples/rubrics.ipynb)
Rubrics are OpenEnv's first-class abstraction for computing rewards. They let you build multi-criteria reward functions from small reusable pieces. This tutorial walks through the API end-to-end, from a one-line rubric to a full environment that introspects its reward signal at training time.
## Why Rubrics?
Before rubrics, each environment rolled its own reward logic. Three pain points surfaced repeatedly:
1. **No standard interface**. Every environment author invented their own `compute_reward(...)` shape, so reusing a reward component across environments meant copy-pasting.
2. **Multi-criteria evaluation was ad-hoc**. "Code must compile, tests must pass, style matters a bit" becomes a tangle of nested `if`/`else` and hand-rolled weighted averages. There was no consistent way to ask *which* criterion caused a low reward.
3. **LLM judges and sandboxed checks are slow**. Without a framework-level concept of "reward component", batch evaluation couldn't parallelise the I/O-bound pieces.
The Rubric API is small: you subclass, implement `forward`, and the framework gives you composition, introspection, and parallel evaluation for free.
## Your First Rubric
A rubric is a callable with a `forward(action, observation) -> float` method.
```python
from openenv.core.rubrics import Rubric
class MessageLengthRubric(Rubric):
"""Reward 1.0 if the message is 5–20 characters long, else 0.0."""
def forward(self, action, observation) -> float:
length = len(action.message)
return 1.0 if 5 <= length <= 20 else 0.0
```
That's the whole contract. Instantiate it and call it:
```python
rubric = MessageLengthRubric()
score = rubric(action, observation) # runs forward + hooks
print(rubric.last_score) # latest score is cached on the rubric
```
`Rubric.__call__` runs pre- and post-hooks around your `forward`, caches the result on `self.last_score`, and supports async `forward` implementations transparently. (If you've used `torch.nn.Module`, the subclass-and-implement-`forward` pattern will feel familiar — children assigned as instance attributes auto-register with the parent.)
### Optional hooks for observability
You can attach hooks without subclassing — useful for logging every component's score without polluting `forward`. Post-hooks run after `forward` completes and see the returned score; pre-hooks run before `forward` and are handy for input validation or instrumentation. When a rubric is async, hooks are awaited transparently.
```python
def log_score(rubric, action, obs, result):
print(f"{type(rubric).__name__}: {result:.2f}")
rubric.register_forward_hook(log_score) # fires after forward()
rubric.register_forward_pre_hook(lambda r, a, o: None) # fires before forward()
```
### State dict
Rubrics implement `state_dict()` / `load_state_dict(state)` so their configuration (thresholds, prompt templates, etc.) can be serialised alongside model checkpoints. The default implementations return an empty dict — override them when your rubric has tunable parameters.
## Composing Rubrics
The real power shows up when you stack rubrics. `openenv.core.rubrics` ships with four containers.
### `WeightedSum` — multi-criteria averaging
Use when several independent criteria each contribute to the final score.
```python
from openenv.core.rubrics import WeightedSum
class TestsPassRubric(Rubric):
def forward(self, action, observation) -> float:
return observation.tests_passed / max(observation.tests_total, 1)
class StyleRubric(Rubric):
def forward(self, action, observation) -> float:
return 1.0 if action.code.count("\n\n\n") == 0 else 0.6
reward = WeightedSum(
[TestsPassRubric(), StyleRubric()],
weights=[0.7, 0.3],
)
```
Weights must sum to `1.0`. `WeightedSum` evaluates its children with `asyncio.gather` when any of them is async, so an LLM-backed child does not block the synchronous ones.
### `Gate` — hard constraints
Use when a child score below a threshold should short-circuit the reward to zero.
```python
from openenv.core.rubrics import Gate
reward = Gate(TestsPassRubric(), threshold=0.5) # 0.0 if fewer than half the tests pass
```
`Gate` returns `0.0` when the child score is below the threshold, and passes the child score through unchanged otherwise.
### `Sequential` — fail-fast pipeline
Use when criteria are ordered: a later criterion only matters if the earlier ones passed. Sequential returns `0.0` the moment any child returns `0.0` and does not evaluate the remaining children — great for gating expensive checks like sandboxed test runs or LLM calls.
```python
from openenv.core.rubrics import Sequential
reward = Sequential(
Gate(CompilesRubric(), threshold=1.0), # skip everything if it doesn't compile
Gate(TestsPassRubric(), threshold=0.5), # and skip style if tests are failing
WeightedSum([TestsPassRubric(), StyleRubric()], [0.7, 0.3]),
)
```
### `RubricList` and `RubricDict` — dynamic dispatch
When the right rubric depends on the current observation (e.g. one rubric per game in a multi-game environment), wrap the options in a `RubricList` or `RubricDict` and dispatch in your parent rubric's `forward`.
```python
from openenv.core.rubrics import Rubric, RubricDict
class MultiGameRubric(Rubric):
def __init__(self):
super().__init__()
self.games = RubricDict({
"pong": PongRubric(),
"breakout": BreakoutRubric(),
})
def forward(self, action, observation) -> float:
return self.games[observation.game_id](action, observation)
```
`RubricList` and `RubricDict` do not aggregate on their own — calling them directly raises. Their job is auto-registration (so their children show up in `named_rubrics()`) and indexed access. Reach for them when the parent rubric needs to pick a child *at runtime* based on the observation — if the set of children is fixed, plain attributes are simpler.
### Introspection: `named_rubrics()`
Assigning a child rubric as an attribute auto-registers it with the parent. Training code can then walk the tree:
```python
composite = WeightedSum(
[Gate(CompilesRubric(), 1.0), TestsPassRubric(), StyleRubric()],
[0.2, 0.5, 0.3],
)
for name, child in composite.named_rubrics():
print(f"{name:30s} last_score={child.last_score}")
```
After running the composite once, every component's most recent score is cached on `last_score` — no manual bookkeeping.
## LLM-as-judge: `LLMJudge`
When a criterion is too subjective for a handwritten heuristic ("is this argument persuasive?", "is this explanation clear?"), use an LLM as the judge. `LLMJudge` wraps an `LLMClient` with a prompt template and a score extractor.
Any OpenAI-compatible endpoint works: hosted OpenAI / Anthropic, or open-weight models served through vLLM, Ollama, Hugging Face Inference Providers, etc. Pick a client and hand it to `LLMJudge`:
```python
import os
from openenv.core.llm_client import OpenAIClient, create_llm_client
from openenv.core.rubrics import LLMJudge
# Option 1 — hosted OpenAI (the factory also supports "anthropic").
client = create_llm_client(
"openai",
model="gpt-4.1-mini",
api_key=os.environ["OPENAI_API_KEY"],
)
# Option 2 — open-weight model served via a local OpenAI-compatible endpoint
# (vLLM, Ollama, Hugging Face Inference Providers, …). Point OpenAIClient
# at the base URL and the model id the server exposes. `api_key` is optional
# and defaults to "not-needed" for local endpoints.
client = OpenAIClient(
endpoint="http://localhost",
port=8000,
model="Qwen/Qwen3-1.7B",
)
clarity_judge = LLMJudge(
client=client,
prompt_template=(
"Rate the clarity of this explanation on a 0-10 scale. "
"Reply with the number only.\n\n"
"Explanation:\n{action}\n"
),
score_pattern=r"(\d+(?:\.\d+)?)",
normalize=True, # clamps extracted score to [0, 1]
)
```
`LLMJudge.forward` is async. When you put it inside `WeightedSum` or `Sequential`, the container awaits it transparently. A few caveats worth stating up front:
- **Cost and latency** scale with the number of episodes and the number of rubric calls per step. `Sequential` + `Gate` earlier in the pipeline is the usual answer.
- **Determinism** is not free. Cache scores when you can, and consider temperature 0 for repeatable eval runs.
- **API keys** belong in environment variables (`OPENAI_API_KEY`, `ANTHROPIC_API_KEY`, …), not in code that ships to the Hub.
## Delayed Rewards: `TrajectoryRubric`
Some signals only materialise at the end of an episode — chess win/loss, unit-test suite success, a goal reached after many steps. `TrajectoryRubric` accumulates `(action, observation)` pairs internally and only invokes your scoring logic on the terminal observation.
```python
from openenv.core.rubrics import TrajectoryRubric
class WinLossRubric(TrajectoryRubric):
def score_trajectory(self, trajectory) -> float:
_, final_obs = trajectory[-1]
return final_obs.reward # +1 win, -1 loss, 0 draw
def compute_step_rewards(self):
# Credit assignment: distribute the final score across steps however you like.
final = self.score_trajectory(self._trajectory)
return [final] * len(self._trajectory)
```
`forward(action, obs)` returns `intermediate_reward` (default `0.0`) until `observation.done` is `True`, then calls `score_trajectory`. After the episode ends, call `rubric.compute_step_rewards()` to get one reward per step — same length as the trajectory. This is the hook for credit assignment: training code feeds these per-step rewards back into advantage estimation, return-to-go, or whatever your optimizer expects. `ExponentialDiscountingTrajectoryRubric` precomputes `gamma^(T-1-t) * final_score` for you; override `compute_step_rewards` in your subclass if you want a different strategy (all-to-last, equal split, task-specific shaping).
> [!CAUTION]
> If `observation.done` never becomes `True`, `score_trajectory` is never called and the trajectory grows unbounded in memory. Make sure `step` flips `done` on every terminal transition, and call `self._reset_rubric()` in `Environment.reset` so trajectories do not leak across episodes.
For the common exponentially-discounted case, subclass `ExponentialDiscountingTrajectoryRubric` instead and only implement `score_trajectory`:
```python
from openenv.core.rubrics import ExponentialDiscountingTrajectoryRubric
class ChessOutcomeRubric(ExponentialDiscountingTrajectoryRubric):
def score_trajectory(self, trajectory) -> float:
_, final_obs = trajectory[-1]
return final_obs.reward # already +1 / 0 / -1 from the engine
```
This is exactly the pattern the built-in `envs/chess_env/` uses — see `envs/chess_env/server/rubrics.py` for the complete real-world example.
> [!CAUTION]
> The `TrajectoryRubric` keeps the trajectory in CPU memory. If your observation carries GPU tensors (images, embeddings), detach and move them to CPU before returning from `step()` — otherwise the trajectory holds onto GPU memory across the whole episode.
## Wiring a Rubric into an `Environment`
Rubrics are **server-side**. Each environment declares its rubric in `__init__`, and `step` runs it via the `_apply_rubric` helper. The base `Environment` class accepts the rubric through its constructor and stores it as `self.rubric`.
Here is a complete minimal environment that composes a `Sequential` gate-then-`WeightedSum` pipeline and exposes the reward through its observation:
```python
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import Action, Observation, State
from openenv.core.rubrics import Gate, Rubric, Sequential, WeightedSum
class CodeAction(Action):
code: str
class CodeObservation(Observation):
compiles: bool = False
tests_passed: int = 0
tests_total: int = 0
class CodeState(State):
attempts: int = 0
class CompilesRubric(Rubric):
def forward(self, action, observation) -> float:
return 1.0 if observation.compiles else 0.0
class TestsPassRubric(Rubric):
def forward(self, action, observation) -> float:
if observation.tests_total == 0:
return 0.0
return observation.tests_passed / observation.tests_total
class StyleRubric(Rubric):
def forward(self, action, observation) -> float:
return 1.0 if action.code.count("\n\n\n") == 0 else 0.6
def build_code_rubric() -> Rubric:
return Sequential(
Gate(CompilesRubric(), threshold=1.0), # gate everything on compilation
WeightedSum(
[
TestsPassRubric(),
StyleRubric(),
],
weights=[0.7, 0.3],
),
)
class CodeEnvironment(Environment[CodeAction, CodeObservation, CodeState]):
def __init__(self):
super().__init__(rubric=build_code_rubric())
self._state = CodeState()
def reset(self, seed=None, episode_id=None, **kwargs) -> CodeObservation:
self._reset_rubric() # clear any trajectory / cached last_score
self._state = CodeState()
return CodeObservation()
def step(self, action: CodeAction, timeout_s=None, **kwargs) -> CodeObservation:
self._state.attempts += 1
obs = self._run_code(action) # your domain-specific execution
obs.reward = self._apply_rubric(action, obs)
return obs
@property
def state(self) -> CodeState:
return self._state
def _run_code(self, action: CodeAction) -> CodeObservation:
# Placeholder for whatever your environment actually does.
compiles = "def " in action.code
return CodeObservation(
compiles=compiles,
tests_passed=3 if compiles else 0,
tests_total=3,
)
```
The three pieces the base class expects from you:
1. **Pass the rubric to `super().__init__(rubric=...)`** so `self.rubric` is set.
2. **Call `self._reset_rubric()` from `reset`** so trajectory state does not leak between episodes.
3. **Call `self._apply_rubric(action, obs)` from `step`** and attach the result to `obs.reward`. There is also `_apply_rubric_async` for `step_async`.
> [!NOTE]
> Some environments already compute `obs.reward` from game mechanics or a handcrafted multi-component signal (see `envs/chess_env/` and `envs/carla_env/`). In that case, call `self._apply_rubric(action, obs)` without assigning its return value — the rubric still accumulates the trajectory for `compute_step_rewards()` and still exposes per-component scores via `named_rubrics()`, but `obs.reward` stays authoritative.
### Inspecting rewards from training code
Because children are auto-registered, the training loop can walk the rubric tree and log component-level diagnostics without the environment exposing a custom API:
```python
env = CodeEnvironment()
obs = env.reset()
obs = env.step(CodeAction(code="def solution(): return 42"))
for name, component in env.rubric.named_rubrics():
print(f"{name:30s} last_score={component.last_score:.2f}")
```
That snippet works for *any* OpenEnv environment that sets `self.rubric`, regardless of whether the rubric is a single scalar or a deeply nested composition.
### Where the reward ends up during training
Training frameworks consume the reward through the same channel as any other OpenEnv observation field: `step()` returns an `Observation` whose `reward` is the rubric's output, and the client delivers it via `result.reward`.
With [TRL](https://huggingface.co/docs/trl/main/en/openenv), the recommended path is `GRPOTrainer`'s `environment_factory`: you define a thin wrapper class with tool methods that call the OpenEnv client, store `self.reward = result.observation.reward` after each step, and a plain reward function reads it off the `environments` parameter. The [TRL OpenEnv integration guide](https://huggingface.co/docs/trl/main/en/openenv) has the full recipe, and [`examples/scripts/openenv/`](https://github.com/huggingface/trl/tree/main/examples/scripts/openenv) ships ready-to-run scripts. The same observation shape works with [torchforge](https://github.com/pytorch-labs/torchforge) and other OpenEnv-compatible training stacks.
`named_rubrics()` is orthogonal: use it to **log per-component scores** (to Weights & Biases, TensorBoard, trackio, …) while training, without changing the reward the optimiser sees.
## Using Rubrics for Evaluation
A rubric is just a callable — nothing forces you to run it inside a training loop. Drop it into a for-loop over a static dataset and you have a multi-criteria scoring function for offline eval:
```python
rubric = build_code_rubric()
scores = []
for action, obs in eval_dataset:
scores.append(rubric(action, obs))
print(f"mean reward: {sum(scores) / len(scores):.3f}")
for name, component in rubric.named_rubrics():
print(f" {name:30s} last_score={component.last_score:.3f}")
```
The same rubric object used to compute training rewards doubles as the eval metric — one source of truth for "what is a good response". Per-component `last_score` gives you a per-criterion breakdown for free (useful for regression dashboards and failure analysis). When a component like `LLMJudge` is async, wrap the loop with `asyncio.run(...)` and `await rubric(action, obs)` so the judge calls can overlap.
## Next Steps
- **Real-world trajectory example** — walk through `envs/chess_env/server/rubrics.py` and `chess_environment.py` to see `ExponentialDiscountingTrajectoryRubric` wired into a game environment.
- **Design details** — [RFC 004](https://github.com/huggingface/OpenEnv/blob/main/rfcs/004-rubrics.md) covers the rationale for the composable API and the "rewards inside the environment" invariant.
- **Reward design basics** — the [Reward Design](../guides/rewards) guide covers sparse-vs-dense signals and common pitfalls that still apply on top of any rubric composition.
- **Training loop integration** — see the [RL Framework Integration](../guides/rl-integration) guide and the [TRL OpenEnv integration guide](https://huggingface.co/docs/trl/main/en/openenv) for the recommended `environment_factory` pattern.

Xet Storage Details

Size:
18.5 kB
·
Xet hash:
98947de34fac74e9598062f4d034a85616a4eb3766eed7e4f8bedc155c993a3d

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.