"""
tests/test_environment.py -- Unit + integration tests for HypothesisLab.
Run with: pytest tests/ -v
"""
import math
import pytest

from models import ActionType, ExperimentType, HypLabAction, NoiseLevelTag
from server.causal_world import generate_world, CausalWorld, CausalRule, InteractionRule
from server.rubric import InfoGainTracker, score_hypothesis
from server.hypothesis_lab_environment import HypothesisLabEnvironment
from tasks.task_easy import grade_easy
from tasks.task_medium import grade_medium
from tasks.task_hard import grade_hard

# Domain identifiers exercised by the per-domain tests below. Kept in one
# place so the world-generation and environment tests stay in sync.
ALL_DOMAINS = ("system_alpha", "system_beta", "system_gamma", "system_delta")


class TestCausalWorld:
    """Tests for world generation and for individual rule evaluation."""

    @pytest.mark.parametrize("n", [2, 3, 4])
    def test_generate_world_returns_correct_n_variables(self, n):
        """A generated world exposes exactly ``n_variables`` variables."""
        world = generate_world(n_variables=n, domain="system_alpha", seed=42)
        assert len(world.variables) == n

    @pytest.mark.parametrize("domain", ALL_DOMAINS)
    def test_all_domains_generate_without_error(self, domain):
        """Each domain generates a world tagged with it and with >= 1 rule."""
        world = generate_world(n_variables=3, domain=domain, seed=0)
        assert world.domain == domain
        assert len(world.rules) >= 1

    def test_linear_rule_evaluation(self):
        """A linear rule with a=2, b=3 evaluates to 3 at X=0 and 13 at X=5."""
        rule = CausalRule(
            cause="X",
            effect="Y",
            rule_type="linear",
            params={"a": 2.0, "b": 3.0},
            description="Y = 2.0 * X + 3.0",
        )
        assert rule.evaluate(0) == pytest.approx(3.0)
        assert rule.evaluate(5) == pytest.approx(13.0)

    def test_inverse_rule_avoids_division_by_zero(self):
        """An inverse rule evaluated at X=0 yields NaN."""
        rule = CausalRule(
            cause="X",
            effect="Y",
            rule_type="inverse",
            params={"a": 10.0},
            description="Y = 10 / X",
        )
        result = rule.evaluate(0)
        assert math.isnan(result)

    def test_intervention_with_noise_is_noisy(self):
        """Twenty interventions with sigma=0.5 do not all return one value."""
        world = generate_world(n_variables=2, domain="system_alpha", seed=1)
        cause, effect = world.variables[0], world.variables[1]
        results = [
            world.query_intervention(cause, 5.0, effect, sigma=0.5)
            for _ in range(20)
        ]
        assert len(set(results)) > 1, "Noisy results should not be identical"

    def test_correlation_returns_correct_n_points(self):
        """A correlation query over ``[1.0, 10.0, 5.0]`` returns five pairs."""
        world = generate_world(n_variables=2, domain="system_beta", seed=2)
        cause, effect = world.variables[0], world.variables[1]
        # NOTE(review): the list is presumably [start, stop, n_points], which
        # would explain the expected length of 5 -- confirm against
        # CausalWorld.query_correlation.
        pairs = world.query_correlation(cause, [1.0, 10.0, 5.0], effect, sigma=0.0)
        assert len(pairs) == 5

    def test_ground_truth_summary_contains_all_variables(self):
        """Every variable name appears in the ground-truth summary."""
        world = generate_world(n_variables=3, domain="system_gamma", seed=3)
        summary = world.ground_truth_summary()
        for v in world.variables:
            assert v in summary

    def test_seed_reproducibility(self):
        """Two worlds built with the same arguments and seed agree.

        Compares the variable lists and the type of the first rule.
        """
        world1 = generate_world(n_variables=3, domain="system_alpha", seed=99)
        world2 = generate_world(n_variables=3, domain="system_alpha", seed=99)
        assert world1.variables == world2.variables
        assert world1.rules[0].rule_type == world2.rules[0].rule_type

    def test_quadratic_rule_evaluation(self):
        """A quadratic rule with a=0.5, b=1, c=2 is checked at X=0 and X=4."""
        rule = CausalRule(
            cause="X",
            effect="Y",
            rule_type="quadratic",
            params={"a": 0.5, "b": 1.0, "c": 2.0},
            description="Y = 0.5*X^2 + 1.0*X + 2.0",
        )
        assert rule.evaluate(0) == pytest.approx(2.0)
        assert rule.evaluate(4) == pytest.approx(0.5 * 16 + 4 + 2)

    def test_exponential_rule_evaluation(self):
        """An exponential rule with a=2 and k=0 evaluates to 2 at X=5."""
        rule = CausalRule(
            cause="X",
            effect="Y",
            rule_type="exponential",
            params={"a": 2.0, "k": 0.0},
            description="Y = 2 * exp(0 * X)",
        )
        assert rule.evaluate(5) == pytest.approx(2.0)

    def test_logarithmic_rule_nan_for_zero(self):
        """A logarithmic rule evaluated at X=0 yields NaN."""
        rule = CausalRule(
            cause="X",
            effect="Y",
            rule_type="logarithmic",
            params={"a": 3.0, "b": 0.0},
        )
        assert math.isnan(rule.evaluate(0))

    def test_saturating_rule_approaches_vmax(self):
        """A saturating rule nears v_max for large X and is v_max/2 at X=k_m."""
        rule = CausalRule(
            cause="X",
            effect="Y",
            rule_type="saturating",
            params={"v_max": 10.0, "k_m": 2.0},
        )
        assert rule.evaluate(1000) == pytest.approx(10.0, abs=0.1)
        assert rule.evaluate(2.0) == pytest.approx(5.0, abs=0.01)

    def test_piecewise_linear_changes_slope(self):
        """A piecewise-linear rule uses slope a1 below the knot and a2 above.

        With knot=5, a1=2, a2=-1, b=0 the expected values are 6 at X=3 and
        10 + (-1) * 2 at X=7.
        """
        rule = CausalRule(
            cause="X",
            effect="Y",
            rule_type="piecewise_linear",
            params={"knot": 5.0, "a1": 2.0, "a2": -1.0, "b": 0.0},
        )
        assert rule.evaluate(3) == pytest.approx(6.0)
        assert rule.evaluate(7) == pytest.approx(10.0 + (-1.0) * 2)

    def test_interaction_rule_multiplicative(self):
        """A multiplicative interaction with a=0.5 gives 12 for (4, 6)."""
        inter = InteractionRule(
            cause1="X",
            cause2="Y",
            effect="Z",
            interaction_type="multiplicative",
            params={"a": 0.5},
        )
        assert inter.evaluate(4.0, 6.0) == pytest.approx(12.0)

    def test_interaction_rule_min(self):
        """A ``min`` interaction returns the smaller of its two inputs."""
        inter = InteractionRule(
            cause1="X",
            cause2="Y",
            effect="Z",
            interaction_type="min",
        )
        assert inter.evaluate(3.0, 7.0) == pytest.approx(3.0)

    def test_diverse_rule_types_generated_over_many_seeds(self):
        """Over 100 seeds we should see at least 5 distinct rule types."""
        types_seen: set[str] = set()
        for seed in range(100):
            world = generate_world(n_variables=3, seed=seed)
            for rule in world.rules:
                types_seen.add(rule.rule_type)
        assert len(types_seen) >= 5, f"Only saw {types_seen}"

    def test_delta_domain_works(self):
        """The ``system_delta`` domain generates a world with >= 1 rule."""
        world = generate_world(n_variables=3, domain="system_delta", seed=42)
        assert world.domain == "system_delta"
        assert len(world.rules) >= 1

    def test_variable_names_are_abstract(self):
        """Variables should NOT be real-world names that give LLMs prior knowledge."""
        real_world_names = {
            "temperature", "pressure", "volume", "density", "entropy",
            "price", "demand", "supply", "wage", "inflation",
            "genea", "proteinb", "enzymec", "concentration", "ph",
        }
        for seed in range(50):
            world = generate_world(n_variables=4, seed=seed)
            for v in world.variables:
                assert v.lower() not in real_world_names, (
                    f"Variable '{v}' is a real-world name that gives LLM agents prior knowledge"
                )


class TestInfoGainTracker:
    """Tests for the per-experiment reward returned by ``InfoGainTracker``."""

    def test_first_experiment_gives_positive_reward(self):
        """The first recorded experiment is rewarded and not redundant."""
        tracker = InfoGainTracker()
        reward, is_redundant = tracker.record_and_score("A", "B", "intervention", 1.0)
        assert reward > 0
        assert not is_redundant

    def test_repeated_experiments_become_redundant(self):
        """The fourth identical experiment is redundant with negative reward."""
        tracker = InfoGainTracker()
        for _ in range(4):
            reward, is_redundant = tracker.record_and_score("A", "B", "intervention", 1.0)
        # Only the outcome of the last (fourth) repetition is checked.
        assert is_redundant
        assert reward < 0

    def test_different_exp_type_gives_triangulation_bonus(self):
        """A second experiment type on the same pair scores at least 0.25."""
        tracker = InfoGainTracker()
        tracker.record_and_score("A", "B", "intervention", 1.0)
        reward2, _ = tracker.record_and_score("A", "B", "correlation", [1, 5, 3])
        assert reward2 >= 0.25

    def test_cumulative_gain_accumulates(self):
        """``cumulative_gain`` is positive after two distinct experiments."""
        tracker = InfoGainTracker()
        tracker.record_and_score("A", "B", "intervention", 1.0)
        tracker.record_and_score("B", "C", "intervention", 2.0)
        assert tracker.cumulative_gain > 0


class TestRubric:
    """Tests for ``score_hypothesis`` against a hand-built linear world."""

    def _make_linear_world(self):
        """Build a two-variable world with the single rule Beta = 2*Alpha + 3."""
        import numpy as np

        rule = CausalRule(
            cause="Alpha",
            effect="Beta",
            rule_type="linear",
            params={"a": 2.0, "b": 3.0},
            description="Beta = 2.0 * Alpha + 3.0",
        )
        return CausalWorld(
            domain="system_alpha",
            variables=["Alpha", "Beta"],
            units={"Alpha": "units", "Beta": "units"},
            rules=[rule],
            default_values={"Alpha": 5.0, "Beta": 13.0},
            rng=np.random.default_rng(0),
        )

    def test_perfect_linear_hypothesis_scores_high(self):
        """Submitting the exact rule yields an accuracy score of >= 0.70."""
        world = self._make_linear_world()
        result = score_hypothesis(
            hypothesis_text="Beta = 2.0 * Alpha + 3.0. Linear relationship.",
            hypothesis_equations=["Beta = 2.0 * Alpha + 3.0"],
            confidence=0.9,
            world=world,
            budget_remaining=3,
            budget_total=10,
        )
        assert result.accuracy_score >= 0.70

    def test_empty_hypothesis_scores_zero(self):
        """An empty hypothesis yields an accuracy score below 0.10."""
        world = self._make_linear_world()
        result = score_hypothesis(
            hypothesis_text="",
            hypothesis_equations=None,
            confidence=None,
            world=world,
            budget_remaining=0,
            budget_total=10,
        )
        assert result.accuracy_score < 0.10

    def test_efficiency_bonus_for_early_submit(self):
        """Submitting with 5 of 10 budget left earns a positive efficiency bonus."""
        world = self._make_linear_world()
        result = score_hypothesis(
            hypothesis_text="Beta = 2.0 * Alpha + 3.0",
            hypothesis_equations=["Beta = 2.0 * Alpha + 3.0"],
            confidence=0.9,
            world=world,
            budget_remaining=5,
            budget_total=10,
        )
        assert result.efficiency_bonus > 0.0

    def test_no_efficiency_bonus_when_budget_exhausted(self):
        """Submitting with no budget left earns an efficiency bonus of 0."""
        world = self._make_linear_world()
        result = score_hypothesis(
            hypothesis_text="Beta = 2.0 * Alpha + 3.0",
            hypothesis_equations=None,
            confidence=0.9,
            world=world,
            budget_remaining=0,
            budget_total=10,
        )
        assert result.efficiency_bonus == 0.0

    def test_overconfident_calibration_penalised(self):
        """Confidence 0.99 on a contentless hypothesis scores <= 0.05 calibration."""
        world = self._make_linear_world()
        result = score_hypothesis(
            hypothesis_text="I have no idea",
            hypothesis_equations=None,
            confidence=0.99,
            world=world,
            budget_remaining=0,
            budget_total=10,
        )
        assert result.calibration_score <= 0.05

    def test_feedback_text_is_not_empty(self):
        """The result carries a feedback string longer than 10 characters."""
        world = self._make_linear_world()
        result = score_hypothesis(
            hypothesis_text="Alpha causes Beta to increase linearly",
            hypothesis_equations=None,
            confidence=0.7,
            world=world,
            budget_remaining=2,
            budget_total=10,
        )
        assert len(result.feedback) > 10


class TestEnvironmentIntegration:
    """End-to-end tests driving ``HypothesisLabEnvironment`` via reset/step."""

    def test_full_episode_with_submit(self):
        """Reset, run one intervention, then submit; the episode must end."""
        env = HypothesisLabEnvironment()
        # NOTE(review): "physics" is not one of the system_* domains used by
        # the other tests -- presumably a legacy name or a fallback check;
        # confirm it is intentional.
        obs = env.reset(seed=42, noise_level="low", domain="physics")
        assert obs.budget_remaining > 0
        assert len(obs.available_variables) >= 2
        assert not obs.done

        vars_ = obs.available_variables
        action = HypLabAction(
            action_type=ActionType.EXPERIMENT,
            experiment_type=ExperimentType.INTERVENTION,
            control_variable=vars_[0],
            target_variable=vars_[1],
            control_value=5.0,
        )
        obs = env.step(action)
        assert obs.result_value is not None
        assert not obs.done

        submit = HypLabAction(
            action_type=ActionType.SUBMIT,
            hypothesis_text=f"{vars_[1]} is linearly related to {vars_[0]}",
            hypothesis_equations=[f"{vars_[1]} = a * {vars_[0]} + b"],
            confidence=0.6,
        )
        obs = env.step(submit)
        assert obs.done
        assert obs.total_episode_reward is not None
        assert obs.ground_truth_revealed is not None

    def test_budget_exhaustion_ends_episode(self):
        """Spending the whole budget leaves the episode done or at zero budget."""
        env = HypothesisLabEnvironment()
        obs = env.reset(seed=42, noise_level="low")
        budget = obs.budget_remaining
        vars_ = obs.available_variables
        for _ in range(budget):
            # Stop early if the environment ends the episode before the
            # initially reported budget is used up.
            if obs.done:
                break
            action = HypLabAction(
                action_type=ActionType.EXPERIMENT,
                experiment_type=ExperimentType.INTERVENTION,
                control_variable=vars_[0],
                target_variable=vars_[1],
                control_value=5.0,
            )
            obs = env.step(action)
        assert obs.done or obs.budget_remaining == 0

    def test_redundant_experiment_gets_penalty(self):
        """Repeating one experiment four times is flagged and penalised."""
        env = HypothesisLabEnvironment()
        obs = env.reset(seed=42, noise_level="low")
        vars_ = obs.available_variables
        action = HypLabAction(
            action_type=ActionType.EXPERIMENT,
            experiment_type=ExperimentType.INTERVENTION,
            control_variable=vars_[0],
            target_variable=vars_[1],
            control_value=5.0,
        )
        for _ in range(4):
            obs = env.step(action)
            if obs.done:
                break
        # Checked on the last observation returned by the loop.
        assert obs.is_redundant
        assert obs.info_gain_reward < 0

    def test_invalid_variable_returns_error(self):
        """Unknown variable names produce an error-style system message."""
        env = HypothesisLabEnvironment()
        env.reset(seed=42, noise_level="low")
        action = HypLabAction(
            action_type=ActionType.EXPERIMENT,
            experiment_type=ExperimentType.INTERVENTION,
            control_variable="NONEXISTENT_VAR",
            target_variable="ALSO_NONEXISTENT",
            control_value=5.0,
        )
        obs = env.step(action)
        assert "Error" in obs.system_message or "Unknown" in obs.system_message

    def test_state_does_not_leak_hidden_world(self):
        """The dumped state contains neither ``rule_type`` nor ``params``."""
        env = HypothesisLabEnvironment()
        env.reset(seed=42, noise_level="low")
        st = env.state
        state_str = str(st.model_dump())
        assert "rule_type" not in state_str
        assert "params" not in state_str

    @pytest.mark.parametrize("domain", ALL_DOMAINS)
    def test_multiple_domains_all_work(self, domain):
        """Resetting in each domain yields a live episode with budget left."""
        env = HypothesisLabEnvironment()
        obs = env.reset(seed=42, domain=domain, noise_level="medium")
        assert not obs.done
        assert obs.budget_remaining > 0


class TestGraders:
    """Tests that the per-task graders map score dicts into [0, 1]."""

    def test_grader_easy_returns_valid_range(self):
        """``grade_easy`` returns a value in [0, 1] for a typical input."""
        score = grade_easy({
            "accuracy_score": 0.8,
            "efficiency_bonus": 0.15,
            "calibration_score": 0.20,
        })
        assert 0.0 <= score <= 1.0

    def test_grader_medium_returns_valid_range(self):
        """``grade_medium`` returns a value in [0, 1] for a typical input."""
        score = grade_medium({
            "accuracy_score": 0.5,
            "precision_bonus": 0.10,
            "efficiency_bonus": 0.07,
            "calibration_score": 0.10,
        })
        assert 0.0 <= score <= 1.0

    def test_grader_hard_returns_valid_range(self):
        """``grade_hard`` returns a value in [0, 1] for a typical input."""
        score = grade_hard({
            "accuracy_score": 0.3,
            "precision_bonus": 0.0,
            "efficiency_bonus": 0.0,
            "calibration_score": 0.05,
            "contradiction_penalty": 0.0,
        })
        assert 0.0 <= score <= 1.0

    def test_grader_zero_input_returns_zero(self):
        """``grade_easy`` returns exactly 0.0 for an empty score dict."""
        score = grade_easy({})
        assert score == 0.0

    def test_grader_perfect_input_returns_one(self):
        """``grade_easy`` returns 1.0 for accuracy 1.0, bonus 0.15, calibration 0.20."""
        score = grade_easy({
            "accuracy_score": 1.0,
            "efficiency_bonus": 0.15,
            "calibration_score": 0.20,
        })
        assert score == pytest.approx(1.0)