import json
import re

import verifiers as vf
from datasets import load_dataset
|
|
|
|
# Fenced code block, optionally tagged ``json``; group 1 is the block body.
_CODE_BLOCK_RE = re.compile(r"```(?:json)?\s*\n(.*?)\n```", re.DOTALL)

# Marks "no such path" so that a legitimate JSON null (None) stays distinguishable.
_MISSING = object()


def _format_example(example):
    """Map a raw dataset row onto the ``question`` / ``info`` columns."""
    return {
        "question": example["prompt"],
        "info": {"verification_info": example["verification_info"]},
    }


def _extract_json_from_completion(completion) -> str:
    """Return the JSON text of a completion, unwrapping fenced code blocks.

    ``completion`` may be a plain string or a list of chat messages, in which
    case the ``content`` of the last message is used. If the text contains
    fenced code blocks, the body of the last one is returned; otherwise the
    whole text is returned. The result is always stripped.
    """
    if not completion:
        return ""

    if isinstance(completion, list):
        # A missing or null content is treated as empty text instead of
        # reaching the regex as None.
        content = completion[-1].get("content") or ""
    else:
        content = completion
    if not isinstance(content, str):
        content = str(content)

    blocks = _CODE_BLOCK_RE.findall(content)
    if blocks:
        return blocks[-1].strip()
    return content.strip()


def _parse_response_dict(parser, completion):
    """Return the completion parsed as a JSON object, or ``None``.

    ``None`` is returned when the answer is empty, is not valid JSON, or is
    valid JSON of another type (list, number, ...).
    """
    try:
        text = (parser.parse_answer(completion) or "").strip()
        parsed = json.loads(text) if text else None
    except (ValueError, AttributeError, TypeError, RecursionError):
        # json.JSONDecodeError subclasses ValueError. The completion is model
        # output, so any malformed shape is scored as a failed parse.
        return None
    return parsed if isinstance(parsed, dict) else None


def _load_ground_truth(info):
    """Decode ``info["verification_info"]`` and return its ``ground_truth``."""
    verification_info = json.loads(info["verification_info"])
    return verification_info["ground_truth"]


def _flatten_keys(tree) -> set:
    """Return every key path in nested dicts as a tuple of keys.

    Tuples are used instead of dot-joined strings so that keys containing
    ``"."`` (or empty keys) cannot collide with nested paths. Lists are not
    descended into; a non-dict ``tree`` yields an empty set.
    """
    paths = set()
    pending = [((), tree)]
    while pending:
        prefix, node = pending.pop()
        if isinstance(node, dict):
            for key, child in node.items():
                path = prefix + (key,)
                paths.add(path)
                pending.append((path, child))
    return paths


def _lookup(tree, path):
    """Return the value stored at ``path`` in ``tree`` or ``_MISSING``."""
    node = tree
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return _MISSING
        node = node[key]
    return node


def _values_equal(actual, expected) -> bool:
    """Compare two decoded JSON values.

    Integers and floats compare numerically (``25 == 25.0``), but booleans
    only match booleans: ``bool`` is an ``int`` subclass, so a plain ``==``
    would accept ``true`` for ``1``. Containers are compared element-wise
    with the same rules.
    """
    if isinstance(actual, bool) or isinstance(expected, bool):
        return (
            isinstance(actual, bool)
            and isinstance(expected, bool)
            and actual == expected
        )
    if isinstance(actual, dict) and isinstance(expected, dict):
        return actual.keys() == expected.keys() and all(
            _values_equal(actual[key], expected[key]) for key in expected
        )
    if isinstance(actual, list) and isinstance(expected, list):
        return len(actual) == len(expected) and all(
            _values_equal(a, e) for a, e in zip(actual, expected)
        )
    return actual == expected


def _score_response(response, ground_truth) -> tuple:
    """Return ``(key_accuracy, value_accuracy)`` for a parsed response.

    Both ratios are taken over the key paths present in ``response``:

    * ``key_accuracy``   = paths also present in ``ground_truth`` / paths
    * ``value_accuracy`` = paths whose value equals the ground truth / paths

    Every path counts, including paths whose value is itself a dict, which
    must then equal the ground-truth dict as a whole. A response with no
    keys scores ``(0.0, 0.0)``.
    """
    response_paths = _flatten_keys(response)
    if not response_paths:
        return 0.0, 0.0

    truth_paths = _flatten_keys(ground_truth)
    key_accuracy = len(response_paths & truth_paths) / len(response_paths)

    correct_values = 0
    for path in response_paths:
        expected = _lookup(ground_truth, path)
        # The sentinel (not None) marks absence, so a ground-truth null can
        # still be matched by a null in the response.
        if expected is not _MISSING and _values_equal(
            _lookup(response, path), expected
        ):
            correct_values += 1

    return key_accuracy, correct_values / len(response_paths)


def _accuracies(parser, completion, info) -> tuple:
    """Return ``(key_accuracy, value_accuracy)`` for one completion.

    Scoring is best-effort: an unparseable or non-object response, or a
    malformed ``info`` payload, yields ``(0.0, 0.0)`` instead of raising, and
    every reward function shares this single policy.
    """
    response = _parse_response_dict(parser, completion)
    if response is None:
        return 0.0, 0.0
    try:
        return _score_response(response, _load_ground_truth(info))
    except (ValueError, AttributeError, TypeError, KeyError, RecursionError):
        return 0.0, 0.0


def load_environment(
    num_train_examples=7000,
    num_eval_examples=1000,
    **kwargs
):
    """
    Environment for verifying complex JSON output from models.

    The task requires models to:
    1. Parse multi-question prompts
    2. Generate valid JSON responses
    3. Match the expected structure with correct keys and values

    Reward structure (multiplicative):
    - If the answer is not a JSON object: reward = 0
    - Otherwise:
      * key_accuracy = (correct_keys) / (total_keys_in_response)
      * value_accuracy = (correct_values) / (total_keys_in_response)
      * final_reward = key_accuracy * value_accuracy

    Keys are addressed by their full nested path. Both ratios are relative to
    the keys the response contains, so extra or incorrect entries lower the
    reward. A key omitted at the top level is not counted against the
    response; an omission inside a nested object makes that object's own
    value comparison fail.
    NOTE(review): if omissions should be penalised directly, divide by the
    union of response and ground-truth paths instead - confirm the intent.

    Args:
        num_train_examples: Number of leading rows used for training.
        num_eval_examples: Number of rows immediately after the training
            rows used for evaluation.
        **kwargs: Accepted for call compatibility; not used.

    Returns:
        A ``vf.SingleTurnEnv`` built from the two splits, the JSON-extracting
        parser and the rubric. Only ``multiplicative_reward`` carries weight;
        the other three functions have weight 0.0.
    """
    dataset = load_dataset("Delta-Vector/Tauri-Complex-JSON-Formatting", split="train")
    dataset = dataset.map(_format_example, remove_columns=dataset.column_names)

    train_dataset = dataset.select(range(num_train_examples))
    eval_dataset = dataset.select(
        range(num_train_examples, num_train_examples + num_eval_examples)
    )

    parser = vf.Parser(extract_fn=_extract_json_from_completion)

    # The reward functions stay as closures with their original names and
    # delegate all scoring to the shared helpers.
    def multiplicative_reward(completion, info, **kwargs) -> float:
        """Return key_accuracy * value_accuracy (0.0 if unscoreable)."""
        key_accuracy, value_accuracy = _accuracies(parser, completion, info)
        return key_accuracy * value_accuracy

    def format_reward(completion, **kwargs) -> float:
        """Return 0.33 when the answer parses as a JSON object, else 0.0."""
        if _parse_response_dict(parser, completion) is None:
            return 0.0
        return 0.33

    def keys_match_reward(completion, info, **kwargs) -> float:
        """Metric: the key_accuracy term of multiplicative_reward."""
        return _accuracies(parser, completion, info)[0]

    def values_match_reward(completion, info, **kwargs) -> float:
        """Metric: the value_accuracy term of multiplicative_reward."""
        return _accuracies(parser, completion, info)[1]

    rubric = vf.Rubric(
        parser=parser,
        funcs=[
            multiplicative_reward,
            format_reward,
            keys_match_reward,
            values_match_reward,
        ],
        weights=[1.0, 0.0, 0.0, 0.0],
    )

    return vf.SingleTurnEnv(
        dataset=train_dataset,
        eval_dataset=eval_dataset,
        parser=parser,
        rubric=rubric,
    )
|
|