# math500-bon-exercise/code/step2_sample_and_score.py
# Uploaded by cmpatino (HF Staff) with huggingface_hub; commit 02efe1b (verified).
"""
Step 2: Sample N=16 solutions per problem and score with Skywork PRM.
This script:
1. Loads the filtered problems from Step 1
2. Generates N=16 solutions per problem using temperature sampling
3. Loads the Skywork-o1-Open-PRM and scores each solution (last step prediction)
4. Saves all solutions + scores for the Best-of-N computation
The Skywork PRM is loaded using its custom PRM_MODEL class, which wraps
AutoModelForCausalLM with a ValueHead (linear projection to scalar).
The model outputs a sigmoid-normalized score in [0,1] at each step boundary.
Co-authored with Claude (Anthropic). I can explain all code logic.
"""
import json
import os
import sys
import torch
import subprocess
from transformers import AutoTokenizer, AutoModelForCausalLM
from typing import Optional
# ──────────────────────────────────────────────────────────────────────────────
# Helper functions
# ──────────────────────────────────────────────────────────────────────────────
def extract_boxed_solution(text: str) -> Optional[str]:
    """Return the content of the last ``\\boxed{...}`` in *text*.

    Nested braces inside the box (e.g. ``\\boxed{\\frac{1}{2}}``) are handled
    by tracking brace depth, so the returned string spans up to the brace that
    closes the ``\\boxed{`` itself. Surrounding whitespace is stripped.

    Args:
        text: Model output that may contain one or more ``\\boxed{...}``
            expressions.

    Returns:
        The stripped content of the last box, or ``None`` when *text* is not a
        string, contains no ``\\boxed{``, or the last box is never closed.
    """
    marker = "\\boxed{"

    # Best-effort contract: malformed input yields None instead of raising.
    # This is checked explicitly rather than via a catch-all ``except`` so
    # that genuine programming errors are not silently swallowed.
    if not isinstance(text, str):
        return None

    start_index = text.rfind(marker)
    if start_index == -1:
        return None

    content_start = start_index + len(marker)
    depth = 1  # the opening brace of the marker is already consumed
    for pos in range(content_start, len(text)):
        char = text[pos]
        if char == "{":
            depth += 1
        elif char == "}":
            depth -= 1
            if depth == 0:
                # ``pos`` is the brace that closes \boxed{; exclude it.
                return text[content_start:pos].strip()

    # Ran off the end of the text without closing the box.
    return None
# ──────────────────────────────────────────────────────────────────────────────
# Load filtered problems
# ──────────────────────────────────────────────────────────────────────────────
print("=" * 70)
print("STEP 2a: Loading problems and generating N=16 solutions per problem")
print("=" * 70)
# Load the problems written by Step 1. JSON text is UTF-8 by specification, so
# the encoding is passed explicitly instead of relying on the platform's
# locale-dependent default.
with open(
    "/Users/cmpatino/Projects/ml-intern/exercise/outputs/filtered_problems.json",
    encoding="utf-8",
) as f:
    problems_data = json.load(f)
print(f"Loaded {len(problems_data)} problems")
# ──────────────────────────────────────────────────────────────────────────────
# Generate N=16 solutions per problem with temperature sampling
# ──────────────────────────────────────────────────────────────────────────────
# Generator model: loaded in bfloat16 and placed automatically on the
# available device(s).
MODEL_ID = "Qwen/Qwen2.5-1.5B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID, torch_dtype=torch.bfloat16, device_map="auto"
)

SYSTEM_PROMPT = (
    "You are a helpful math assistant. Solve the problem step by step, "
    "showing your reasoning clearly. Put your final answer inside "
    "\\boxed{answer} at the end of your solution."
)
N = 16  # how many candidate solutions to sample for each problem
TEMPERATURE = 0.7  # sampling temperature: trade-off between diversity and quality

# Sampling settings shared by every generate() call.
sampling_kwargs = {
    "max_new_tokens": 2048,
    "do_sample": True,
    "temperature": TEMPERATURE,
    "top_p": 0.95,
}

all_results = []
total_problems = len(problems_data)
for problem_idx, problem in enumerate(problems_data):
    print(
        f"\n Problem {problem_idx + 1}/{total_problems}: "
        f"{problem['unique_id']} (Level {problem['level']})"
    )

    # Build the chat-formatted prompt once; it is reused for all N samples.
    chat = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": problem["problem"]},
    ]
    prompt_text = tokenizer.apply_chat_template(
        chat, tokenize=False, add_generation_prompt=True
    )
    encoded = tokenizer(prompt_text, return_tensors="pt").to(model.device)
    prompt_len = encoded["input_ids"].shape[1]

    samples = []
    for sample_idx in range(N):
        with torch.no_grad():
            sequences = model.generate(**encoded, **sampling_kwargs)
        # Drop the prompt tokens so only the newly generated text is decoded.
        completion_ids = sequences[0][prompt_len:]
        samples.append(tokenizer.decode(completion_ids, skip_special_tokens=True))
        if (sample_idx + 1) % 4 == 0:
            print(f" Generated {sample_idx + 1}/{N} solutions")

    # Copy the problem record and attach its sampled solutions.
    record = dict(problem)
    record["sampled_solutions"] = samples
    all_results.append(record)

# Persist the raw samples before scoring so they survive a slow or failed
# PRM load.
with open(
    "/Users/cmpatino/Projects/ml-intern/exercise/outputs/sampled_solutions.json", "w"
) as out_file:
    json.dump(all_results, out_file, indent=2)
print(f"\nSaved {N} solutions per problem to outputs/sampled_solutions.json")

# Release the generator before the PRM is loaded.
del model
torch.cuda.empty_cache()
print("Freed LLM memory.")
# ──────────────────────────────────────────────────────────────────────────────
# Score solutions with Skywork PRM
# ──────────────────────────────────────────────────────────────────────────────
print(
    "\n" + "=" * 70,
    "STEP 2b: Scoring solutions with Skywork-o1-Open-PRM",
    "=" * 70,
    sep="\n",
)

# The custom PRM model class lives in the Skywork inference repo; fetch it on
# first use only.
PRM_REPO_PATH = "/Users/cmpatino/Projects/ml-intern/exercise/skywork-o1-prm-inference"
if not os.path.exists(PRM_REPO_PATH):
    print("Cloning Skywork PRM inference repo...")
    clone_cmd = [
        "git",
        "clone",
        "https://github.com/SkyworkAI/skywork-o1-prm-inference.git",
        PRM_REPO_PATH,
    ]
    # check=True makes a failed clone raise instead of continuing silently.
    subprocess.run(clone_cmd, check=True)

# The repo must be on sys.path *before* the imports below, which resolve
# `model_utils` from the cloned checkout.
sys.path.insert(0, PRM_REPO_PATH)
from model_utils.prm_model import PRM_MODEL
from model_utils.io_utils import (
    derive_step_rewards,
    prepare_batch_input_for_model,
    prepare_input,
)

PRM_MODEL_ID = "Skywork/Skywork-o1-Open-PRM-Qwen-2.5-1.5B"
prm_tokenizer = AutoTokenizer.from_pretrained(PRM_MODEL_ID, trust_remote_code=True)
loaded_prm = PRM_MODEL.from_pretrained(PRM_MODEL_ID, device_map="auto")
prm_model = loaded_prm.eval()  # switch to evaluation mode for scoring
print("PRM model loaded successfully.")
def score_solution(problem: str, solution: str) -> list[float]:
    """Compute the PRM's per-step scores for one candidate solution.

    The solution is split into steps on newline characters and scored as a
    batch of size one. The model is called with ``return_probs=True``, which
    is expected to yield sigmoid-normalised values in [0, 1], one at the end
    of each step.

    Args:
        problem: The problem statement.
        solution: A single sampled solution to that problem.

    Returns:
        The list of step scores for this solution. Callers use the final
        element as the "last step prediction" reward.
    """
    token_ids, _steps, flags = prepare_input(
        problem, solution, prm_tokenizer, step_token="\n"
    )

    # Wrap the single sample in lists to form a batch of size 1.
    batch_ids, batch_mask, batch_flags = prepare_batch_input_for_model(
        [token_ids], [flags], prm_tokenizer.pad_token_id
    )

    # Place every tensor on the device the PRM's parameters live on.
    target_device = next(prm_model.parameters()).device
    batch_ids, batch_mask, batch_flags = (
        tensor.to(target_device) for tensor in (batch_ids, batch_mask, batch_flags)
    )

    with torch.no_grad():
        model_outputs = prm_model(
            input_ids=batch_ids,
            attention_mask=batch_mask,
            return_probs=True,
        )
    # Only the third output (the rewards) is needed.
    _, _, rewards = model_outputs

    per_sample_rewards = derive_step_rewards(rewards, batch_flags)
    return per_sample_rewards[0]  # the batch holds exactly one sample
# Attach a PRM score and an extracted answer to every sampled solution.
print("\nScoring all solutions...")
num_results = len(all_results)
for result_idx, result in enumerate(all_results):
    print(f"\n Scoring problem {result_idx + 1}/{num_results}: {result['unique_id']}")
    prm_scores = []
    boxed_answers = []
    for sample_idx, candidate in enumerate(result["sampled_solutions"]):
        per_step = score_solution(result["problem"], candidate)
        # The last step's prediction is the solution's reward (per DeepMind
        # Appendix E); a solution with no scored steps gets 0.0.
        if per_step:
            last_step_score = per_step[-1]
        else:
            last_step_score = 0.0
        prm_scores.append(last_step_score)

        # Final answer taken from the last \boxed{} in the solution, if any.
        boxed_answers.append(extract_boxed_solution(candidate))

        if (sample_idx + 1) % 4 == 0:
            print(
                f" Scored {sample_idx + 1}/{N} solutions "
                f"(last score: {last_step_score:.4f})"
            )
    result["prm_scores"] = prm_scores
    result["extracted_answers"] = boxed_answers

# Write the problems together with their solutions, scores and answers.
with open(
    "/Users/cmpatino/Projects/ml-intern/exercise/outputs/scored_results.json", "w"
) as out_file:
    json.dump(all_results, out_file, indent=2)
print("\nSaved scored results to outputs/scored_results.json")
print("Ready for Step 3 (Best-of-N computation).")