Spaces:
Sleeping
Sleeping
"""
tests/test_environment.py -- Unit + integration tests for HypothesisLab.

Run with: pytest tests/ -v
"""
| import math | |
| import pytest | |
| from models import ActionType, ExperimentType, HypLabAction, NoiseLevelTag | |
| from server.causal_world import generate_world, CausalWorld, CausalRule, InteractionRule | |
| from server.rubric import InfoGainTracker, score_hypothesis | |
| from server.hypothesis_lab_environment import HypothesisLabEnvironment | |
| from tasks.task_easy import grade_easy | |
| from tasks.task_medium import grade_medium | |
| from tasks.task_hard import grade_hard | |
class TestCausalWorld:
    """Tests for world generation and for evaluating individual rules."""

    def test_generate_world_returns_correct_n_variables(self):
        """The generated world has exactly as many variables as requested."""
        for count in (2, 3, 4):
            generated = generate_world(n_variables=count, domain="system_alpha", seed=42)
            assert len(generated.variables) == count

    def test_all_domains_generate_without_error(self):
        """Each listed domain yields a world tagged with it and at least one rule."""
        domain_names = ("system_alpha", "system_beta", "system_gamma", "system_delta")
        for domain in domain_names:
            generated = generate_world(n_variables=3, domain=domain, seed=0)
            assert generated.domain == domain
            assert len(generated.rules) >= 1

    def test_linear_rule_evaluation(self):
        """A linear rule with a=2, b=3 maps 0 -> 3 and 5 -> 13."""
        linear = CausalRule(
            cause="X",
            effect="Y",
            rule_type="linear",
            params={"a": 2.0, "b": 3.0},
            description="Y = 2.0 * X + 3.0",
        )
        for x, expected in ((0, 3.0), (5, 13.0)):
            assert linear.evaluate(x) == pytest.approx(expected)

    def test_inverse_rule_avoids_division_by_zero(self):
        """Evaluating an inverse rule at 0 gives NaN."""
        inverse = CausalRule(
            cause="X",
            effect="Y",
            rule_type="inverse",
            params={"a": 10.0},
            description="Y = 10 / X",
        )
        assert math.isnan(inverse.evaluate(0))

    def test_intervention_with_noise_is_noisy(self):
        """Twenty interventions at sigma=0.5 must not all return the same value."""
        world = generate_world(n_variables=2, domain="system_alpha", seed=1)
        cause = world.variables[0]
        effect = world.variables[1]
        distinct = {
            world.query_intervention(cause, 5.0, effect, sigma=0.5)
            for _ in range(20)
        }
        assert len(distinct) > 1, "Noisy results should not be identical"

    def test_correlation_returns_correct_n_points(self):
        """A correlation query with [1.0, 10.0, 5.0] returns five pairs."""
        world = generate_world(n_variables=2, domain="system_beta", seed=2)
        cause = world.variables[0]
        effect = world.variables[1]
        # Presumably the list is (low, high, n_points) -- confirm against
        # CausalWorld.query_correlation.
        sampled = world.query_correlation(cause, [1.0, 10.0, 5.0], effect, sigma=0.0)
        assert len(sampled) == 5

    def test_ground_truth_summary_contains_all_variables(self):
        """Every variable of the world appears in its ground-truth summary."""
        world = generate_world(n_variables=3, domain="system_gamma", seed=3)
        summary = world.ground_truth_summary()
        absent = [name for name in world.variables if name not in summary]
        assert not absent

    def test_seed_reproducibility(self):
        """Two worlds built from the same seed share variables and first rule type."""
        first = generate_world(n_variables=3, domain="system_alpha", seed=99)
        second = generate_world(n_variables=3, domain="system_alpha", seed=99)
        assert first.variables == second.variables
        assert first.rules[0].rule_type == second.rules[0].rule_type

    def test_quadratic_rule_evaluation(self):
        """A quadratic rule with a=0.5, b=1, c=2 is checked at x=0 and x=4."""
        quadratic = CausalRule(
            cause="X",
            effect="Y",
            rule_type="quadratic",
            params={"a": 0.5, "b": 1.0, "c": 2.0},
            description="Y = 0.5*X^2 + 1.0*X + 2.0",
        )
        assert quadratic.evaluate(0) == pytest.approx(2.0)
        assert quadratic.evaluate(4) == pytest.approx(0.5 * 16 + 4 + 2)

    def test_exponential_rule_evaluation(self):
        """With k=0 the exponential rule evaluates to a=2 at x=5."""
        exponential = CausalRule(
            cause="X",
            effect="Y",
            rule_type="exponential",
            params={"a": 2.0, "k": 0.0},
            description="Y = 2 * exp(0 * X)",
        )
        assert exponential.evaluate(5) == pytest.approx(2.0)

    def test_logarithmic_rule_nan_for_zero(self):
        """Evaluating a logarithmic rule at 0 gives NaN."""
        logarithmic = CausalRule(
            cause="X",
            effect="Y",
            rule_type="logarithmic",
            params={"a": 3.0, "b": 0.0},
        )
        assert math.isnan(logarithmic.evaluate(0))

    def test_saturating_rule_approaches_vmax(self):
        """A saturating rule is near v_max at large x and half of it at x == k_m."""
        saturating = CausalRule(
            cause="X",
            effect="Y",
            rule_type="saturating",
            params={"v_max": 10.0, "k_m": 2.0},
        )
        assert saturating.evaluate(1000) == pytest.approx(10.0, abs=0.1)
        assert saturating.evaluate(2.0) == pytest.approx(5.0, abs=0.01)

    def test_piecewise_linear_changes_slope(self):
        """A piecewise-linear rule uses slope a1 before the knot and a2 after it."""
        piecewise = CausalRule(
            cause="X",
            effect="Y",
            rule_type="piecewise_linear",
            params={"knot": 5.0, "a1": 2.0, "a2": -1.0, "b": 0.0},
        )
        # Below the knot: 2.0 * 3 = 6.0.
        assert piecewise.evaluate(3) == pytest.approx(6.0)
        # Above the knot: value at the knot (10.0) plus slope -1.0 over 2 units.
        assert piecewise.evaluate(7) == pytest.approx(10.0 + (-1.0) * 2)

    def test_interaction_rule_multiplicative(self):
        """A multiplicative interaction with a=0.5 gives 12 for inputs 4 and 6."""
        interaction = InteractionRule(
            cause1="X",
            cause2="Y",
            effect="Z",
            interaction_type="multiplicative",
            params={"a": 0.5},
        )
        assert interaction.evaluate(4.0, 6.0) == pytest.approx(12.0)

    def test_interaction_rule_min(self):
        """A "min" interaction returns the smaller of its two inputs."""
        interaction = InteractionRule(
            cause1="X",
            cause2="Y",
            effect="Z",
            interaction_type="min",
        )
        assert interaction.evaluate(3.0, 7.0) == pytest.approx(3.0)

    def test_diverse_rule_types_generated_over_many_seeds(self):
        """Across seeds 0..99 at least 5 distinct rule types must appear."""
        types_seen: set[str] = set()
        for seed in range(100):
            generated = generate_world(n_variables=3, seed=seed)
            types_seen.update(rule.rule_type for rule in generated.rules)
        assert len(types_seen) >= 5, f"Only saw {types_seen}"

    def test_delta_domain_works(self):
        """The "system_delta" domain generates a world with at least one rule."""
        generated = generate_world(n_variables=3, domain="system_delta", seed=42)
        assert generated.domain == "system_delta"
        assert len(generated.rules) >= 1

    def test_variable_names_are_abstract(self):
        """Variables should NOT be real-world names that give LLMs prior knowledge."""
        real_world_names = {
            "temperature", "pressure", "volume", "density", "entropy",
            "price", "demand", "supply", "wage", "inflation",
            "genea", "proteinb", "enzymec", "concentration", "ph",
        }
        for seed in range(50):
            generated = generate_world(n_variables=4, seed=seed)
            for v in generated.variables:
                # Comparison is case-insensitive: names are lowercased first.
                assert v.lower() not in real_world_names, (
                    f"Variable '{v}' is a real-world name that gives LLM agents prior knowledge"
                )
class TestInfoGainTracker:
    """Tests for the per-experiment reward returned by InfoGainTracker."""

    def test_first_experiment_gives_positive_reward(self):
        """The first recorded experiment earns a positive, non-redundant reward."""
        tracker = InfoGainTracker()
        outcome = tracker.record_and_score("A", "B", "intervention", 1.0)
        reward, is_redundant = outcome
        assert reward > 0
        assert not is_redundant

    def test_repeated_experiments_become_redundant(self):
        """The fourth identical experiment is redundant and has negative reward."""
        tracker = InfoGainTracker()
        # Only the result of the last (fourth) repetition is checked.
        for _ in range(4):
            reward, is_redundant = tracker.record_and_score("A", "B", "intervention", 1.0)
        assert is_redundant
        assert reward < 0

    def test_different_exp_type_gives_triangulation_bonus(self):
        """A correlation after an intervention on the same pair scores at least 0.25."""
        tracker = InfoGainTracker()
        tracker.record_and_score("A", "B", "intervention", 1.0)
        second_reward, _ = tracker.record_and_score("A", "B", "correlation", [1, 5, 3])
        assert second_reward >= 0.25

    def test_cumulative_gain_accumulates(self):
        """After two experiments on different pairs, cumulative_gain is positive."""
        tracker = InfoGainTracker()
        for cause, effect, value in (("A", "B", 1.0), ("B", "C", 2.0)):
            tracker.record_and_score(cause, effect, "intervention", value)
        assert tracker.cumulative_gain > 0
class TestRubric:
    """Tests for score_hypothesis against a hand-built two-variable linear world."""

    def _make_linear_world(self):
        """Build a world with the single rule Beta = 2.0 * Alpha + 3.0."""
        import numpy as np

        alpha_to_beta = CausalRule(
            cause="Alpha",
            effect="Beta",
            rule_type="linear",
            params={"a": 2.0, "b": 3.0},
            description="Beta = 2.0 * Alpha + 3.0",
        )
        world = CausalWorld(
            domain="system_alpha",
            variables=["Alpha", "Beta"],
            units={"Alpha": "units", "Beta": "units"},
            rules=[alpha_to_beta],
            default_values={"Alpha": 5.0, "Beta": 13.0},
            rng=np.random.default_rng(0),
        )
        return world

    def test_perfect_linear_hypothesis_scores_high(self):
        """Stating the exact equation yields an accuracy score of at least 0.70."""
        outcome = score_hypothesis(
            hypothesis_text="Beta = 2.0 * Alpha + 3.0. Linear relationship.",
            hypothesis_equations=["Beta = 2.0 * Alpha + 3.0"],
            confidence=0.9,
            world=self._make_linear_world(),
            budget_remaining=3,
            budget_total=10,
        )
        assert outcome.accuracy_score >= 0.70

    def test_empty_hypothesis_scores_zero(self):
        """An empty hypothesis with no equations scores below 0.10 on accuracy."""
        outcome = score_hypothesis(
            hypothesis_text="",
            hypothesis_equations=None,
            confidence=None,
            world=self._make_linear_world(),
            budget_remaining=0,
            budget_total=10,
        )
        assert outcome.accuracy_score < 0.10

    def test_efficiency_bonus_for_early_submit(self):
        """Submitting with half the budget left earns a positive efficiency bonus."""
        outcome = score_hypothesis(
            hypothesis_text="Beta = 2.0 * Alpha + 3.0",
            hypothesis_equations=["Beta = 2.0 * Alpha + 3.0"],
            confidence=0.9,
            world=self._make_linear_world(),
            budget_remaining=5,
            budget_total=10,
        )
        assert outcome.efficiency_bonus > 0.0

    def test_no_efficiency_bonus_when_budget_exhausted(self):
        """With no budget remaining the efficiency bonus is exactly 0.0."""
        outcome = score_hypothesis(
            hypothesis_text="Beta = 2.0 * Alpha + 3.0",
            hypothesis_equations=None,
            confidence=0.9,
            world=self._make_linear_world(),
            budget_remaining=0,
            budget_total=10,
        )
        assert outcome.efficiency_bonus == 0.0

    def test_overconfident_calibration_penalised(self):
        """A contentless hypothesis at confidence 0.99 gets calibration <= 0.05."""
        outcome = score_hypothesis(
            hypothesis_text="I have no idea",
            hypothesis_equations=None,
            confidence=0.99,
            world=self._make_linear_world(),
            budget_remaining=0,
            budget_total=10,
        )
        assert outcome.calibration_score <= 0.05

    def test_feedback_text_is_not_empty(self):
        """The returned feedback is longer than 10 characters."""
        outcome = score_hypothesis(
            hypothesis_text="Alpha causes Beta to increase linearly",
            hypothesis_equations=None,
            confidence=0.7,
            world=self._make_linear_world(),
            budget_remaining=2,
            budget_total=10,
        )
        assert len(outcome.feedback) > 10
class TestEnvironmentIntegration:
    """End-to-end tests that drive HypothesisLabEnvironment via reset/step."""

    @staticmethod
    def _intervention(control, target):
        """Return an intervention experiment setting `control` to 5.0 and reading `target`."""
        return HypLabAction(
            action_type=ActionType.EXPERIMENT,
            experiment_type=ExperimentType.INTERVENTION,
            control_variable=control,
            target_variable=target,
            control_value=5.0,
        )

    def test_full_episode_with_submit(self):
        """Run one experiment, then submit, and check the episode closes properly."""
        env = HypothesisLabEnvironment()
        # NOTE(review): "physics" is not one of the system_* domain names used
        # elsewhere in this file -- confirm the environment still accepts it.
        obs = env.reset(seed=42, noise_level="low", domain="physics")
        assert obs.budget_remaining > 0
        assert len(obs.available_variables) >= 2
        assert not obs.done

        names = obs.available_variables
        control, target = names[0], names[1]

        obs = env.step(self._intervention(control, target))
        assert obs.result_value is not None
        assert not obs.done

        submission = HypLabAction(
            action_type=ActionType.SUBMIT,
            hypothesis_text=f"{target} is linearly related to {control}",
            hypothesis_equations=[f"{target} = a * {control} + b"],
            confidence=0.6,
        )
        obs = env.step(submission)
        assert obs.done
        assert obs.total_episode_reward is not None
        assert obs.ground_truth_revealed is not None

    def test_budget_exhaustion_ends_episode(self):
        """Spending the whole budget leaves the episode done or with zero budget."""
        env = HypothesisLabEnvironment()
        obs = env.reset(seed=42, noise_level="low")
        initial_budget = obs.budget_remaining
        names = obs.available_variables
        for _ in range(initial_budget):
            if obs.done:
                break
            # A fresh action object is built for every step.
            obs = env.step(self._intervention(names[0], names[1]))
        assert obs.done or obs.budget_remaining == 0

    def test_redundant_experiment_gets_penalty(self):
        """Repeating one experiment up to four times ends redundant and penalised."""
        env = HypothesisLabEnvironment()
        obs = env.reset(seed=42, noise_level="low")
        names = obs.available_variables
        repeated = self._intervention(names[0], names[1])
        for _ in range(4):
            obs = env.step(repeated)
            if obs.done:
                break
        assert obs.is_redundant
        assert obs.info_gain_reward < 0

    def test_invalid_variable_returns_error(self):
        """Unknown variable names produce an "Error"/"Unknown" system message."""
        env = HypothesisLabEnvironment()
        env.reset(seed=42, noise_level="low")
        obs = env.step(self._intervention("NONEXISTENT_VAR", "ALSO_NONEXISTENT"))
        message = obs.system_message
        assert "Error" in message or "Unknown" in message

    def test_state_does_not_leak_hidden_world(self):
        """The dumped state must not mention "rule_type" or "params"."""
        env = HypothesisLabEnvironment()
        env.reset(seed=42, noise_level="low")
        dumped = str(env.state.model_dump())
        assert "rule_type" not in dumped
        assert "params" not in dumped

    def test_multiple_domains_all_work(self):
        """Each listed domain resets to an open episode with budget available."""
        domain_names = ("system_alpha", "system_beta", "system_gamma", "system_delta")
        for domain in domain_names:
            env = HypothesisLabEnvironment()
            obs = env.reset(seed=42, domain=domain, noise_level="medium")
            assert not obs.done
            assert obs.budget_remaining > 0
class TestGraders:
    """Tests that the task graders turn score components into a value in [0, 1]."""

    def test_grader_easy_returns_valid_range(self):
        """grade_easy stays within [0, 1] for a typical set of components."""
        components = {
            "accuracy_score": 0.8,
            "efficiency_bonus": 0.15,
            "calibration_score": 0.20,
        }
        graded = grade_easy(components)
        assert 0.0 <= graded <= 1.0

    def test_grader_medium_returns_valid_range(self):
        """grade_medium stays within [0, 1] for a typical set of components."""
        components = {
            "accuracy_score": 0.5,
            "precision_bonus": 0.10,
            "efficiency_bonus": 0.07,
            "calibration_score": 0.10,
        }
        graded = grade_medium(components)
        assert 0.0 <= graded <= 1.0

    def test_grader_hard_returns_valid_range(self):
        """grade_hard stays within [0, 1] for a typical set of components."""
        components = {
            "accuracy_score": 0.3,
            "precision_bonus": 0.0,
            "efficiency_bonus": 0.0,
            "calibration_score": 0.05,
            "contradiction_penalty": 0.0,
        }
        graded = grade_hard(components)
        assert 0.0 <= graded <= 1.0

    def test_grader_zero_input_returns_zero(self):
        """grade_easy returns exactly 0.0 for an empty component dict."""
        graded = grade_easy({})
        assert graded == 0.0

    def test_grader_perfect_input_returns_one(self):
        """grade_easy returns approximately 1.0 for the components used here."""
        components = {
            "accuracy_score": 1.0,
            "efficiency_bonus": 0.15,
            "calibration_score": 0.20,
        }
        graded = grade_easy(components)
        assert graded == pytest.approx(1.0)