"""End-to-end test script for the dce_analyzer backend. Run from project root: python scripts/test_e2e.py """ from __future__ import annotations import sys import traceback from pathlib import Path # Ensure src/ is importable ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT / "src")) import numpy as np import pandas as pd # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _results: list[tuple[str, bool, str]] = [] def _run(name: str, fn): """Run *fn* and record PASS / FAIL.""" try: fn() _results.append((name, True, "")) print(f" PASS {name}") except Exception as exc: msg = f"{exc.__class__.__name__}: {exc}" _results.append((name, False, msg)) print(f" FAIL {name}") traceback.print_exc() print() # =================================================================== # 1. Import all backend modules # =================================================================== def test_imports(): from dce_analyzer.config import ModelSpec, VariableSpec from dce_analyzer.simulate import generate_simulated_dce from dce_analyzer.data import prepare_choice_tensors, ChoiceTensors from dce_analyzer.model import ( MixedLogitEstimator, ConditionalLogitEstimator, EstimationResult, ) from dce_analyzer.latent_class import LatentClassEstimator, LatentClassResult from dce_analyzer.pipeline import estimate_dataframe, PipelineResult from dce_analyzer.wtp import compute_wtp from dce_analyzer.bootstrap import run_bootstrap, BootstrapResult from dce_analyzer.format_converter import ( detect_format, wide_to_long, infer_structure, normalize_choice_column, ColumnInference, ) from dce_analyzer.apollo import APOLLO_DATASETS # all imported without error _run("1. Import all backend modules", test_imports) # =================================================================== # 2. 
# Generate simulated data
# ===================================================================

# Populated by test_simulate; later tests reuse this simulation output.
sim_output = None


def test_simulate():
    """Generate a balanced simulated DCE panel and check its shape/columns."""
    global sim_output
    from dce_analyzer.simulate import generate_simulated_dce

    sim_output = generate_simulated_dce(
        n_individuals=100, n_tasks=4, n_alts=3, seed=42
    )
    df = sim_output.data
    assert isinstance(df, pd.DataFrame), "Expected DataFrame"
    # 100 individuals x 4 tasks x 3 alternatives = 1200 rows
    assert len(df) == 100 * 4 * 3, f"Expected 1200 rows, got {len(df)}"
    for col in ["respondent_id", "task_id", "alternative", "choice",
                "price", "time", "comfort", "reliability"]:
        assert col in df.columns, f"Missing column: {col}"
    assert isinstance(sim_output.true_parameters, dict)
    assert len(sim_output.true_parameters) > 0


_run("2. Generate simulated data (100 ind, 4 tasks, 3 alts)", test_simulate)


# ===================================================================
# 3. Conditional Logit estimation
# ===================================================================

# Populated by test_conditional_logit; reused by the WTP test below.
cl_result = None


def test_conditional_logit():
    """Estimate a 4-attribute conditional logit and check convergence/shape."""
    global cl_result
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price"),
            VariableSpec(name="time", column="time"),
            VariableSpec(name="comfort", column="comfort"),
            VariableSpec(name="reliability", column="reliability"),
        ],
    )
    result = estimate_dataframe(
        df=sim_output.data, spec=spec, model_type="conditional", maxiter=200, seed=42
    )
    cl_result = result
    est = result.estimation
    assert est.success, f"CL did not converge: {est.message}"
    assert est.n_parameters == 4
    assert est.n_observations == 100 * 4  # 400 choice tasks
    assert not est.estimates.empty
    assert "estimate" in est.estimates.columns


_run("3. Conditional Logit estimation", test_conditional_logit)


# ===================================================================
# 4.
# Mixed Logit estimation (n_draws=50)
# ===================================================================

# Populated by test_mixed_logit.
mxl_result = None


def test_mixed_logit():
    """Estimate MMNL with 2 normal + 2 fixed coefficients; check param names."""
    global mxl_result
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="fixed"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        n_draws=50,
    )
    result = estimate_dataframe(
        df=sim_output.data, spec=spec, model_type="mixed", maxiter=200, seed=42
    )
    mxl_result = result
    est = result.estimation
    # 2 normal (mu+sd each) + 2 fixed = 6 params
    assert est.n_parameters == 6, f"Expected 6 params, got {est.n_parameters}"
    assert not est.estimates.empty
    # Should have mu_price, sd_price, mu_time, sd_time, beta_comfort, beta_reliability
    param_names = set(est.estimates["parameter"])
    for expected in ["mu_price", "sd_price", "mu_time", "sd_time",
                     "beta_comfort", "beta_reliability"]:
        assert expected in param_names, f"Missing param: {expected}"


_run("4. Mixed Logit estimation (n_draws=50)", test_mixed_logit)


# ===================================================================
# 5.
# Latent Class estimation (n_classes=2, n_starts=3)
# ===================================================================

# Populated by test_latent_class; reused by test_lc_summary.
lc_result = None


def test_latent_class():
    """Estimate a 2-class latent class logit with multistart; check result shape."""
    global lc_result
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price"),
            VariableSpec(name="time", column="time"),
            VariableSpec(name="comfort", column="comfort"),
            VariableSpec(name="reliability", column="reliability"),
        ],
        n_classes=2,
    )
    result = estimate_dataframe(
        df=sim_output.data,
        spec=spec,
        model_type="latent_class",
        maxiter=200,
        seed=42,
        n_classes=2,
        n_starts=3,
    )
    lc_result = result
    est = result.estimation
    assert est.n_classes == 2
    assert len(est.class_probabilities) == 2
    assert abs(sum(est.class_probabilities) - 1.0) < 1e-4, "Class probs must sum to 1"
    assert not est.estimates.empty
    assert not est.class_estimates.empty
    assert not est.posterior_probs.empty
    assert est.posterior_probs.shape[1] == 2  # two class columns


_run("5. Latent Class estimation (n_classes=2, n_starts=3)", test_latent_class)


# ===================================================================
# 6. WTP computation
# ===================================================================
def test_wtp():
    """Compute willingness-to-pay from the conditional logit result."""
    from dce_analyzer.wtp import compute_wtp

    # Use CL result (EstimationResult) for WTP
    wtp_df = compute_wtp(cl_result.estimation, cost_variable="price")
    assert isinstance(wtp_df, pd.DataFrame)
    assert len(wtp_df) == 3  # time, comfort, reliability (3 non-cost attrs)
    assert "wtp_estimate" in wtp_df.columns
    assert "wtp_std_error" in wtp_df.columns
    assert "wtp_ci_lower" in wtp_df.columns
    assert "wtp_ci_upper" in wtp_df.columns
    # WTP values should be finite
    for _, row in wtp_df.iterrows():
        assert np.isfinite(row["wtp_estimate"]), f"Non-finite WTP for {row['attribute']}"


_run("6. WTP computation (CL result)", test_wtp)


# ===================================================================
# 7. Bootstrap (n_boot=10)
# ===================================================================
def test_bootstrap():
    """Run a small bootstrap of the conditional logit and check the summary."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.bootstrap import run_bootstrap

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price"),
            VariableSpec(name="time", column="time"),
            VariableSpec(name="comfort", column="comfort"),
            VariableSpec(name="reliability", column="reliability"),
        ],
    )
    boot = run_bootstrap(
        df=sim_output.data,
        spec=spec,
        model_type="conditional",
        n_replications=10,
        maxiter=100,
        seed=42,
    )
    assert boot.n_replications == 10
    # Some replications may fail to converge; require at least a couple.
    assert boot.n_successful >= 2, f"Only {boot.n_successful} succeeded"
    assert len(boot.param_names) == 4
    assert boot.estimates_matrix.shape == (boot.n_successful, 4)
    summary = boot.summary_dataframe()
    assert isinstance(summary, pd.DataFrame)
    assert len(summary) == 4


_run("7. Bootstrap (n_boot=10, conditional logit)", test_bootstrap)


# ===================================================================
# 8.
# Wide-to-long conversion
# ===================================================================
def test_wide_to_long():
    """Convert a tiny wide-format dataset to long format and validate it."""
    from dce_analyzer.format_converter import detect_format, wide_to_long

    # Create a small wide-format dataset
    wide_df = pd.DataFrame({
        "id": [1, 1, 2, 2],
        "choice": [1, 2, 1, 3],
        "price_1": [10, 20, 15, 25],
        "price_2": [12, 22, 17, 27],
        "price_3": [14, 24, 19, 29],
        "time_1": [30, 40, 35, 45],
        "time_2": [32, 42, 37, 47],
        "time_3": [34, 44, 39, 49],
    })
    fmt = detect_format(wide_df)
    assert fmt == "wide", f"Expected 'wide', got '{fmt}'"
    long_df = wide_to_long(
        wide_df,
        attribute_groups={
            "price": ["price_1", "price_2", "price_3"],
            "time": ["time_1", "time_2", "time_3"],
        },
        id_col="id",
        choice_col="choice",
    )
    assert isinstance(long_df, pd.DataFrame)
    # 4 rows * 3 alts = 12 rows
    assert len(long_df) == 12, f"Expected 12 rows, got {len(long_df)}"
    assert "alternative" in long_df.columns
    assert "choice" in long_df.columns
    assert "price" in long_df.columns
    assert "time" in long_df.columns
    # Each task should have exactly one chosen alt
    for (rid, tid), grp in long_df.groupby(["respondent_id", "task_id"]):
        assert grp["choice"].sum() == 1, f"Task ({rid},{tid}) has {grp['choice'].sum()} choices"
    # Test detect_format on long data
    fmt2 = detect_format(long_df)
    assert fmt2 == "long", f"Expected 'long' for converted data, got '{fmt2}'"


_run("8. Wide-to-long conversion", test_wide_to_long)


# ===================================================================
# 9.
# Additional checks: infer_structure, normalize_choice_column
# ===================================================================
def test_infer_and_normalize():
    """Column inference should find id/choice; normalization keeps binary 0/1."""
    from dce_analyzer.format_converter import infer_structure, normalize_choice_column

    frame = sim_output.data
    detected = infer_structure(frame)
    assert detected.id_col is not None, "Should detect id column"
    assert detected.choice_col is not None, "Should detect choice column"
    # Test normalize_choice_column (already binary -- should be no-op)
    converted = normalize_choice_column(frame, "choice", "alternative")
    assert set(converted["choice"].unique()) <= {0, 1}


_run("9. infer_structure & normalize_choice_column", test_infer_and_normalize)


# ===================================================================
# 10. LatentClassResult.summary_dict()
# ===================================================================
def test_lc_summary():
    """summary_dict() of the latent class result exposes classes and shares."""
    summary = lc_result.estimation.summary_dict()
    for required_key in ("n_classes", "class_probabilities"):
        assert required_key in summary
    assert summary["n_classes"] == 2


_run("10. LatentClassResult.summary_dict()", test_lc_summary)


# ===================================================================
# 11.
# Full correlated MMNL (backward compat)
# ===================================================================
def test_full_correlated_mxl():
    """MMNL with correlated=True should return a full 2x2 covariance matrix."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="fixed"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        n_draws=50,
    )
    result = estimate_dataframe(
        df=sim_output.data,
        spec=spec,
        model_type="mixed",
        maxiter=200,
        seed=42,
        correlated=True,
    )
    est = result.estimation
    # Two random coefficients (price, time) -> 2x2 covariance
    assert est.covariance_matrix is not None, "Expected covariance matrix"
    assert est.covariance_matrix.shape == (2, 2), f"Expected 2x2 cov, got {est.covariance_matrix.shape}"
    assert est.correlation_matrix is not None


_run("11. Full correlated MMNL (backward compat)", test_full_correlated_mxl)


# ===================================================================
# 12.
# Selective correlated MMNL (block-diagonal Cholesky)
# ===================================================================
def test_selective_correlated_mxl():
    """MMNL with correlation_groups should produce a block-diagonal covariance.

    Two groups of random coefficients are correlated internally
    (price-time and comfort-reliability); cross-group covariances must be 0.
    """
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="normal"),
            VariableSpec(name="reliability", column="reliability", distribution="normal"),
        ],
        n_draws=50,
    )
    # Correlate price-time (group [0,1]) and comfort-reliability (group [2,3])
    result = estimate_dataframe(
        df=sim_output.data,
        spec=spec,
        model_type="mixed",
        maxiter=200,
        seed=42,
        correlation_groups=[[0, 1], [2, 3]],
    )
    est = result.estimation
    assert est.covariance_matrix is not None, "Expected covariance matrix"
    assert est.covariance_matrix.shape == (4, 4)
    # Off-block elements should be zero (price-comfort, price-reliability, etc.)
    cov = est.covariance_matrix
    assert abs(cov[0, 2]) < 1e-8, f"Expected 0 cov(price,comfort), got {cov[0,2]}"
    assert abs(cov[0, 3]) < 1e-8, f"Expected 0 cov(price,reliability), got {cov[0,3]}"
    assert abs(cov[1, 2]) < 1e-8, f"Expected 0 cov(time,comfort), got {cov[1,2]}"
    assert abs(cov[1, 3]) < 1e-8, f"Expected 0 cov(time,reliability), got {cov[1,3]}"
    # Within-block covariances (e.g. cov[0, 1]) are deliberately NOT asserted
    # non-zero: the estimate can legitimately land near zero, so such a check
    # would be flaky.  (The previous `assert abs(cov[0, 1]) > 1e-10 or True`
    # was a tautology -- it always passed -- and has been removed.)


_run("12. Selective correlated MMNL (block-diagonal)", test_selective_correlated_mxl)


# ===================================================================
# 13.
# Selective with standalone random params
# ===================================================================
def test_selective_with_standalone():
    """correlation_groups covering only some random params leaves the rest standalone."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="normal"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        n_draws=50,
    )
    # Only correlate price-time, comfort is standalone random
    result = estimate_dataframe(
        df=sim_output.data,
        spec=spec,
        model_type="mixed",
        maxiter=200,
        seed=42,
        correlation_groups=[[0, 1]],
    )
    est = result.estimation
    assert est.covariance_matrix is not None
    assert est.covariance_matrix.shape == (3, 3)
    cov = est.covariance_matrix
    # comfort (index 2) is standalone: zero cross-cov with price/time
    assert abs(cov[0, 2]) < 1e-8, f"Expected 0 cov(price,comfort), got {cov[0,2]}"
    assert abs(cov[1, 2]) < 1e-8, f"Expected 0 cov(time,comfort), got {cov[1,2]}"
    # n_parameters: 3 mu + 3 chol(price-time) + 1 sd(comfort) + 1 fixed = 8
    assert est.n_parameters == 8, f"Expected 8 params, got {est.n_parameters}"


_run("13. Selective with standalone random params", test_selective_with_standalone)


# ===================================================================
# 14. Create BWS simulated data
# ===================================================================

# Populated by test_create_bws_data; reused by all BWS tests below.
bws_df = None


def test_create_bws_data():
    """Create BWS data by adding a 'worst' column to simulated DCE data."""
    global bws_df
    df = sim_output.data.copy()
    # J=3 alts per task.  For each task, mark one randomly chosen non-best
    # alternative as "worst", which guarantees worst != best.
    rng = np.random.default_rng(99)
    # FIX: assign via an index-aligned Series instead of a positionally-built
    # list.  groupby iterates groups in sorted key order, which only matches
    # df's row order when the frame happens to be sorted by
    # (respondent_id, task_id); positional assignment would silently misalign
    # the 'worst' flags otherwise.
    worst = pd.Series(0, index=df.index)
    for (rid, tid), grp in df.groupby(["respondent_id", "task_id"]):
        best_alt = grp.loc[grp["choice"] == 1, "alternative"].values[0]
        non_best = grp[grp["alternative"] != best_alt]
        # Pick random non-best as worst
        worst_alt = non_best["alternative"].values[rng.integers(len(non_best))]
        worst.loc[grp.index[(grp["alternative"] == worst_alt).to_numpy()]] = 1
    df["worst"] = worst
    # Verify: each task has exactly 1 worst, 1 best, and worst != best
    for (rid, tid), grp in df.groupby(["respondent_id", "task_id"]):
        assert grp["choice"].sum() == 1, "Exactly one best per task"
        assert grp["worst"].sum() == 1, "Exactly one worst per task"
        best_idx = grp.loc[grp["choice"] == 1].index[0]
        worst_idx = grp.loc[grp["worst"] == 1].index[0]
        assert best_idx != worst_idx, "worst != best"
    bws_df = df
    assert "worst" in bws_df.columns


_run("14. Create BWS simulated data", test_create_bws_data)


# ===================================================================
# 15.
# BWS + Conditional Logit
# ===================================================================
def test_bws_clogit():
    """Best-worst scaling with conditional logit; estimates lambda_w (worst scale)."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price"),
            VariableSpec(name="time", column="time"),
            VariableSpec(name="comfort", column="comfort"),
            VariableSpec(name="reliability", column="reliability"),
        ],
    )
    result = estimate_dataframe(
        df=bws_df,
        spec=spec,
        model_type="conditional",
        maxiter=200,
        seed=42,
        bws_worst_col="worst",
        estimate_lambda_w=True,
    )
    est = result.estimation
    assert est.success, f"BWS CL did not converge: {est.message}"
    # 4 betas + 1 lambda_w = 5 params
    assert est.n_parameters == 5, f"Expected 5 params, got {est.n_parameters}"
    # lambda_w should appear in estimates
    param_names = set(est.estimates["parameter"])
    assert "lambda_w (worst scale)" in param_names, f"Missing lambda_w param. Got: {param_names}"
    # lambda_w should be positive
    lw_row = est.estimates[est.estimates["parameter"] == "lambda_w (worst scale)"]
    assert lw_row["estimate"].values[0] > 0, "lambda_w must be positive"


_run("15. BWS + Conditional Logit", test_bws_clogit)


# ===================================================================
# 16.
# BWS + CLogit with lambda_w fixed (MaxDiff equivalent)
# ===================================================================
def test_bws_clogit_fixed_lw():
    """With estimate_lambda_w=False the worst-scale parameter is omitted (MaxDiff)."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    attribute_names = ("price", "time", "comfort", "reliability")
    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[VariableSpec(name=attr, column=attr) for attr in attribute_names],
    )
    result = estimate_dataframe(
        df=bws_df,
        spec=spec,
        model_type="conditional",
        maxiter=200,
        seed=42,
        bws_worst_col="worst",
        estimate_lambda_w=False,
    )
    est = result.estimation
    assert est.success
    # Only the 4 attribute betas remain -- no lambda_w parameter.
    assert est.n_parameters == 4, f"Expected 4 params, got {est.n_parameters}"
    fitted_names = set(est.estimates["parameter"])
    assert "lambda_w (worst scale)" not in fitted_names


_run("16. BWS + CLogit fixed lambda_w (MaxDiff)", test_bws_clogit_fixed_lw)


# ===================================================================
# 17.
# BWS + Mixed Logit
# ===================================================================
def test_bws_mxl():
    """Best-worst scaling combined with mixed logit; lambda_w estimated."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="fixed"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        n_draws=50,
    )
    result = estimate_dataframe(
        df=bws_df,
        spec=spec,
        model_type="mixed",
        maxiter=200,
        seed=42,
        bws_worst_col="worst",
        estimate_lambda_w=True,
    )
    est = result.estimation
    # 2 mu + 2 sd + 2 fixed + 1 lambda_w = 7
    assert est.n_parameters == 7, f"Expected 7 params, got {est.n_parameters}"
    param_names = set(est.estimates["parameter"])
    assert "lambda_w (worst scale)" in param_names
    assert "mu_price" in param_names
    assert "sd_price" in param_names


_run("17. BWS + Mixed Logit", test_bws_mxl)


# ===================================================================
# 18.
BWS + GMNL # =================================================================== def test_bws_gmnl(): from dce_analyzer.config import ModelSpec, VariableSpec from dce_analyzer.pipeline import estimate_dataframe spec = ModelSpec( id_col="respondent_id", task_col="task_id", alt_col="alternative", choice_col="choice", variables=[ VariableSpec(name="price", column="price", distribution="normal"), VariableSpec(name="time", column="time", distribution="fixed"), VariableSpec(name="comfort", column="comfort", distribution="fixed"), VariableSpec(name="reliability", column="reliability", distribution="fixed"), ], n_draws=50, ) result = estimate_dataframe( df=bws_df, spec=spec, model_type="gmnl", maxiter=200, seed=42, bws_worst_col="worst", estimate_lambda_w=True, ) est = result.estimation # 1 mu + 1 sd + 3 fixed + 1 lambda_w + 3 GMNL(tau,sigma_tau,gamma) = 9 assert est.n_parameters == 9, f"Expected 9 params, got {est.n_parameters}" param_names = set(est.estimates["parameter"]) assert "lambda_w (worst scale)" in param_names assert "tau (scale mean)" in param_names _run("18. BWS + GMNL", test_bws_gmnl) # =================================================================== # 19. 
# BWS + Latent Class
# ===================================================================
def test_bws_lc():
    """Best-worst scaling combined with latent class; lambda_w per class/shared."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price"),
            VariableSpec(name="time", column="time"),
            VariableSpec(name="comfort", column="comfort"),
            VariableSpec(name="reliability", column="reliability"),
        ],
        n_classes=2,
    )
    result = estimate_dataframe(
        df=bws_df,
        spec=spec,
        model_type="latent_class",
        maxiter=200,
        seed=42,
        n_classes=2,
        n_starts=3,
        bws_worst_col="worst",
        estimate_lambda_w=True,
    )
    est = result.estimation
    assert est.n_classes == 2
    assert len(est.class_probabilities) == 2
    # Check lambda_w appears in estimates
    lw_rows = est.estimates[est.estimates["parameter"].str.contains("lambda_w")]
    assert len(lw_rows) > 0, "Missing lambda_w in LC estimates"


_run("19. BWS + Latent Class", test_bws_lc)


# ===================================================================
# 20.
# Correlation inference (delta method SEs for cov/cor)
# ===================================================================
def test_correlation_inference():
    """Correlated MMNL should expose delta-method SEs for cov/cor plus a test table."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="fixed"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        n_draws=50,
    )
    result = estimate_dataframe(
        df=sim_output.data,
        spec=spec,
        model_type="mixed",
        maxiter=200,
        seed=42,
        correlated=True,
    )
    est = result.estimation
    # Covariance SE matrix should exist and match shape
    assert est.covariance_se is not None, "Expected covariance_se"
    assert est.covariance_se.shape == (2, 2), f"Expected 2x2, got {est.covariance_se.shape}"
    # Correlation SE matrix
    assert est.correlation_se is not None, "Expected correlation_se"
    assert est.correlation_se.shape == (2, 2)
    # Diagonal of correlation SE should be 0 (cor(x,x)=1, no variation)
    for i in range(2):
        assert est.correlation_se[i, i] < 1e-6, f"Diagonal cor SE should be ~0, got {est.correlation_se[i,i]}"
    # Correlation test table
    assert est.correlation_test is not None, "Expected correlation_test DataFrame"
    assert len(est.correlation_test) == 1, "Expected 1 off-diagonal pair for 2 random params"
    row = est.correlation_test.iloc[0]
    assert row["param_1"] == "price"
    assert row["param_2"] == "time"
    assert not np.isnan(row["cor_std_error"]), "SE should not be NaN"
    assert not np.isnan(row["z_stat"]), "z_stat should not be NaN"
    assert not np.isnan(row["p_value"]), "p_value should not be NaN"
    assert 0.0 <= row["p_value"] <= 1.0, f"p-value out of range: {row['p_value']}"


_run("20. Correlation inference (delta method SEs for cov/cor)", test_correlation_inference)


# ===================================================================
# 21. FullModelSpec + estimate_from_spec
# ===================================================================
def test_full_model_spec():
    """estimate_from_spec(FullModelSpec) should match estimate_dataframe output."""
    from dce_analyzer.config import FullModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_from_spec

    spec = FullModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="fixed"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        model_type="mixed",
        n_draws=50,
        maxiter=200,
        seed=42,
    )
    result = estimate_from_spec(df=sim_output.data, spec=spec)
    est = result.estimation
    # Should produce the same kind of result as estimate_dataframe
    assert est.n_parameters == 6, f"Expected 6 params, got {est.n_parameters}"
    assert not est.estimates.empty
    param_names = set(est.estimates["parameter"])
    for expected in ["mu_price", "sd_price", "mu_time", "sd_time",
                     "beta_comfort", "beta_reliability"]:
        assert expected in param_names, f"Missing param: {expected}"
    assert est.n_observations == 100 * 4


_run("21. FullModelSpec + estimate_from_spec", test_full_model_spec)


# ===================================================================
# 22.
# Heterogeneity interactions with MMNL via FullModelSpec
# ===================================================================
def test_interactions_mmnl():
    """InteractionTerm(price x income) should add one fixed parameter to MMNL."""
    from dce_analyzer.config import FullModelSpec, InteractionTerm, VariableSpec
    from dce_analyzer.pipeline import estimate_from_spec

    spec = FullModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="fixed"),
            VariableSpec(name="comfort", column="comfort", distribution="fixed"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        model_type="mixed",
        interactions=[
            InteractionTerm(columns=("price", "income")),
        ],
        n_draws=50,
        maxiter=200,
        seed=42,
    )
    result = estimate_from_spec(df=sim_output.data, spec=spec)
    est = result.estimation
    param_names = set(est.estimates["parameter"])
    # Interaction term should appear as a fixed parameter
    assert "beta_price_x_income" in param_names, (
        f"Missing interaction param. Got: {param_names}"
    )
    # 1 mu + 1 sd (price) + 3 fixed (time, comfort, reliability) + 1 interaction = 6
    assert est.n_parameters == 6, f"Expected 6 params, got {est.n_parameters}"


_run("22. Heterogeneity interactions with MMNL (InteractionTerm)", test_interactions_mmnl)


# ===================================================================
# 23.
# GMNL + full correlation
# ===================================================================
def test_gmnl_full_correlation():
    """GMNL with correlated=True should report covariance plus GMNL scale params."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="fixed"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        n_draws=50,
    )
    result = estimate_dataframe(
        df=sim_output.data,
        spec=spec,
        model_type="gmnl",
        maxiter=200,
        seed=42,
        correlated=True,
    )
    est = result.estimation
    assert est.covariance_matrix is not None, "Expected covariance matrix for GMNL+correlated"
    assert est.covariance_matrix.shape == (2, 2), (
        f"Expected 2x2 cov, got {est.covariance_matrix.shape}"
    )
    assert est.correlation_matrix is not None
    # GMNL params: 2 mu + chol(2)=3 + 2 fixed + 3 GMNL(tau,sigma_tau,gamma) = 10
    assert est.n_parameters == 10, f"Expected 10 params, got {est.n_parameters}"
    param_names = set(est.estimates["parameter"])
    assert "tau (scale mean)" in param_names
    assert "sigma_tau (scale SD)" in param_names
    assert "gamma (mixing)" in param_names


_run("23. GMNL + full correlation", test_gmnl_full_correlation)


# ===================================================================
# 24.
# GMNL + selective correlation
# ===================================================================
def test_gmnl_selective_correlation():
    """GMNL with correlation_groups: off-group covariances must be zero."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="normal"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        n_draws=50,
    )
    # Correlate price-time only; comfort is standalone random
    result = estimate_dataframe(
        df=sim_output.data,
        spec=spec,
        model_type="gmnl",
        maxiter=200,
        seed=42,
        correlation_groups=[[0, 1]],
    )
    est = result.estimation
    assert est.covariance_matrix is not None
    assert est.covariance_matrix.shape == (3, 3)
    cov = est.covariance_matrix
    # comfort (index 2) is standalone: zero cross-cov with price/time
    assert abs(cov[0, 2]) < 1e-8, f"Expected 0 cov(price,comfort), got {cov[0,2]}"
    assert abs(cov[1, 2]) < 1e-8, f"Expected 0 cov(time,comfort), got {cov[1,2]}"
    param_names = set(est.estimates["parameter"])
    assert "tau (scale mean)" in param_names


_run("24. GMNL + selective correlation", test_gmnl_selective_correlation)


# ===================================================================
# 25.
# BWS composable functions (bws_log_prob, standard_log_prob)
# ===================================================================
def test_bws_composable_functions():
    """Unit-level checks of the BWS log-probability primitives on toy tensors."""
    import torch
    from dce_analyzer.bws import bws_log_prob, standard_log_prob

    # Create simple test tensors: 4 observations, 3 alternatives
    n_obs, n_alts = 4, 3
    torch.manual_seed(42)
    utility = torch.randn(n_obs, n_alts)
    y_best = torch.tensor([0, 1, 2, 0])   # chosen alt indices
    y_worst = torch.tensor([2, 0, 1, 1])  # worst alt indices (different from best)
    # Test standard_log_prob
    log_p = standard_log_prob(utility, y_best, alt_dim=-1)
    assert log_p.shape == (n_obs,), f"Expected shape ({n_obs},), got {log_p.shape}"
    # Log-probabilities must be <= 0
    assert (log_p <= 1e-6).all(), "Log-probabilities must be <= 0"
    # Probabilities must sum to 1 across alternatives (verify via logsumexp)
    log_all = torch.stack([
        standard_log_prob(utility, torch.full((n_obs,), j), alt_dim=-1)
        for j in range(n_alts)
    ], dim=1)
    prob_sums = torch.exp(log_all).sum(dim=1)
    assert torch.allclose(prob_sums, torch.ones(n_obs), atol=1e-5), (
        f"Probabilities don't sum to 1: {prob_sums}"
    )
    # Test bws_log_prob
    lambda_w = 1.0
    log_p_bws = bws_log_prob(utility, y_best, y_worst, lambda_w, alt_dim=-1)
    assert log_p_bws.shape == (n_obs,), f"Expected shape ({n_obs},), got {log_p_bws.shape}"
    assert (log_p_bws <= 1e-6).all(), "BWS log-probabilities must be <= 0"
    # BWS log-prob should be less than standard (it's a product of two probs)
    assert (log_p_bws <= log_p + 1e-6).all(), (
        "BWS log-prob should be <= standard log-prob (product of two probs)"
    )
    # Test with lambda_w as tensor
    lambda_w_tensor = torch.tensor(2.0)
    log_p_bws2 = bws_log_prob(utility, y_best, y_worst, lambda_w_tensor, alt_dim=-1)
    assert log_p_bws2.shape == (n_obs,)
    # Test with 3D utility (simulating draws): (n_obs, n_draws, n_alts)
    n_draws = 5
    utility_3d = torch.randn(n_obs, n_draws, n_alts)
    log_p_3d = standard_log_prob(utility_3d, y_best, alt_dim=-1)
    assert log_p_3d.shape == (n_obs, n_draws), \
        f"Expected ({n_obs},{n_draws}), got {log_p_3d.shape}"
    log_p_bws_3d = bws_log_prob(utility_3d, y_best, y_worst, 1.0, alt_dim=-1)
    assert log_p_bws_3d.shape == (n_obs, n_draws), (
        f"Expected ({n_obs},{n_draws}), got {log_p_bws_3d.shape}"
    )


_run("25. BWS composable functions (bws_log_prob, standard_log_prob)", test_bws_composable_functions)


# ===================================================================
# 26. Heterogeneity interactions with Latent Class via FullModelSpec
# ===================================================================
def test_interactions_lc():
    """InteractionTerm(price x income) should appear in latent-class estimates."""
    from dce_analyzer.config import FullModelSpec, InteractionTerm, VariableSpec
    from dce_analyzer.pipeline import estimate_from_spec

    spec = FullModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price"),
            VariableSpec(name="time", column="time"),
            VariableSpec(name="comfort", column="comfort"),
            VariableSpec(name="reliability", column="reliability"),
        ],
        model_type="latent_class",
        interactions=[
            InteractionTerm(columns=("price", "income")),
        ],
        n_classes=2,
        n_starts=3,
        maxiter=200,
        seed=42,
    )
    result = estimate_from_spec(df=sim_output.data, spec=spec)
    est = result.estimation
    assert est.n_classes == 2
    # Interaction param should appear in estimates
    has_interaction = any("price_x_income" in str(p) for p in est.estimates["parameter"])
    assert has_interaction, (
        f"Missing interaction param in LC estimates. Got: {list(est.estimates['parameter'])}"
    )


_run("26. Heterogeneity interactions with Latent Class (InteractionTerm)", test_interactions_lc)


# ===================================================================
# 27.
FullModelSpec with dummy coding via estimate_from_spec # =================================================================== def test_dummy_coding_via_spec(): from dce_analyzer.config import DummyCoding, FullModelSpec, VariableSpec from dce_analyzer.pipeline import estimate_from_spec # comfort has 2 unique values (0, 1) -> dummy with ref=0 -> one dummy comfort_L1 spec = FullModelSpec( id_col="respondent_id", task_col="task_id", alt_col="alternative", choice_col="choice", variables=[ VariableSpec(name="price", column="price"), VariableSpec(name="time", column="time"), VariableSpec(name="comfort", column="comfort"), VariableSpec(name="reliability", column="reliability"), ], model_type="conditional", dummy_codings=[ DummyCoding(column="comfort", ref_level=0), ], maxiter=200, seed=42, ) result = estimate_from_spec(df=sim_output.data, spec=spec) est = result.estimation param_names = set(est.estimates["parameter"]) # comfort should be expanded: beta_comfort_L1 instead of beta_comfort assert "beta_comfort_L1" in param_names, ( f"Missing dummy param beta_comfort_L1. Got: {param_names}" ) # Original comfort should NOT appear assert "beta_comfort" not in param_names, ( f"Original column should be replaced by dummy expansion. Got: {param_names}" ) # price, time, reliability remain continuous assert "beta_price" in param_names assert "beta_time" in param_names assert "beta_reliability" in param_names # 3 continuous + 1 dummy = 4 params assert est.n_parameters == 4, f"Expected 4 params, got {est.n_parameters}" _run("27. FullModelSpec with dummy coding via estimate_from_spec", test_dummy_coding_via_spec) # =================================================================== # 28. 
Variable ordering: dummy-coded vars expanded in-place # =================================================================== def test_variable_ordering_preservation(): from dce_analyzer.config import DummyCoding, FullModelSpec, VariableSpec from dce_analyzer.pipeline import estimate_from_spec # Variables in order: price (continuous), comfort (dummy, binary 0/1), time (continuous), reliability (continuous) # After expansion, order must be: price, comfort_L1, time, reliability # (not: price, time, reliability, comfort_L1 — the old buggy behavior) spec = FullModelSpec( id_col="respondent_id", task_col="task_id", alt_col="alternative", choice_col="choice", variables=[ VariableSpec(name="price", column="price"), VariableSpec(name="comfort", column="comfort"), VariableSpec(name="time", column="time"), VariableSpec(name="reliability", column="reliability"), ], model_type="conditional", dummy_codings=[ DummyCoding(column="comfort", ref_level=0), ], maxiter=200, seed=42, ) result = estimate_from_spec(df=sim_output.data, spec=spec) est = result.estimation param_names = list(est.estimates["parameter"]) # Check order: price -> comfort dummy -> time -> reliability expected_order = ["beta_price", "beta_comfort_L1", "beta_time", "beta_reliability"] assert param_names == expected_order, ( f"Variable ordering not preserved. Expected {expected_order}, got {param_names}" ) # Also verify expanded_spec preserves order exp_spec = result.expanded_spec exp_var_names = [v.name for v in exp_spec.variables] assert exp_var_names == ["price", "comfort_L1", "time", "reliability"], ( f"Expanded spec variable order wrong: {exp_var_names}" ) _run("28. Variable ordering: dummy-coded vars expanded in-place", test_variable_ordering_preservation) # =================================================================== # 29. 
WTP theta_index mapping for MMNL (SE correctness) # =================================================================== def test_wtp_theta_index(): from dce_analyzer.config import ModelSpec, VariableSpec from dce_analyzer.pipeline import estimate_dataframe from dce_analyzer.wtp import compute_wtp # price is random, then time (fixed), comfort (fixed), reliability (fixed) # This creates interleaved mu/sd rows: mu_price, sd_price, beta_time, ... # The theta_index mapping must be correct for WTP SEs. spec = ModelSpec( id_col="respondent_id", task_col="task_id", alt_col="alternative", choice_col="choice", variables=[ VariableSpec(name="price", column="price", distribution="normal"), VariableSpec(name="time", column="time", distribution="fixed"), VariableSpec(name="comfort", column="comfort", distribution="fixed"), VariableSpec(name="reliability", column="reliability", distribution="fixed"), ], n_draws=50, ) result = estimate_dataframe( df=sim_output.data, spec=spec, model_type="mixed", maxiter=200, seed=42, ) est = result.estimation # Verify theta_index column exists and is correct assert "theta_index" in est.estimates.columns, "theta_index column missing" # mu_price -> theta 0, sd_price -> theta 4, beta_time -> theta 1, # beta_comfort -> theta 2, beta_reliability -> theta 3 tidx_map = dict(zip(est.estimates["parameter"], est.estimates["theta_index"])) assert tidx_map["mu_price"] == 0, f"mu_price should be theta 0, got {tidx_map['mu_price']}" assert tidx_map["beta_time"] == 1, f"beta_time should be theta 1, got {tidx_map['beta_time']}" assert tidx_map["sd_price"] == 4, f"sd_price should be theta 4, got {tidx_map['sd_price']}" # Compute WTP using time as the cost variable wtp_df = compute_wtp(est, cost_variable="time") assert not wtp_df.empty # Check that SEs are not NaN (vcov should be available) if est.vcov_matrix is not None: for _, row in wtp_df.iterrows(): if row["attribute"] in ("price", "comfort", "reliability"): assert not np.isnan(row["wtp_std_error"]), ( f"WTP 
SE is NaN for {row['attribute']} — theta_index mapping may be wrong" ) _run("29. WTP theta_index mapping for MMNL (SE correctness)", test_wtp_theta_index) # =================================================================== # 30. 3-way interaction (price × time × income) # =================================================================== def test_3way_interaction(): from dce_analyzer.config import FullModelSpec, InteractionTerm, VariableSpec from dce_analyzer.pipeline import estimate_from_spec spec = FullModelSpec( id_col="respondent_id", task_col="task_id", alt_col="alternative", choice_col="choice", variables=[ VariableSpec(name="price", column="price"), VariableSpec(name="time", column="time"), VariableSpec(name="comfort", column="comfort"), VariableSpec(name="reliability", column="reliability"), ], model_type="conditional", interactions=[ InteractionTerm(columns=("price", "time", "income")), ], maxiter=200, seed=42, ) result = estimate_from_spec(df=sim_output.data, spec=spec) est = result.estimation param_names = set(est.estimates["parameter"]) # 3-way interaction name: price_x_time_x_income assert "beta_price_x_time_x_income" in param_names, ( f"Missing 3-way interaction param. Got: {param_names}" ) # 4 base + 1 interaction = 5 params assert est.n_parameters == 5, f"Expected 5 params, got {est.n_parameters}" _run("30. 3-way interaction (price × time × income)", test_3way_interaction) # =================================================================== # 31. 
Attribute × attribute interaction (price × time) # =================================================================== def test_attribute_x_attribute_interaction(): from dce_analyzer.config import FullModelSpec, InteractionTerm, VariableSpec from dce_analyzer.pipeline import estimate_from_spec spec = FullModelSpec( id_col="respondent_id", task_col="task_id", alt_col="alternative", choice_col="choice", variables=[ VariableSpec(name="price", column="price"), VariableSpec(name="time", column="time"), VariableSpec(name="comfort", column="comfort"), VariableSpec(name="reliability", column="reliability"), ], model_type="conditional", interactions=[ InteractionTerm(columns=("price", "time")), ], maxiter=200, seed=42, ) result = estimate_from_spec(df=sim_output.data, spec=spec) est = result.estimation param_names = set(est.estimates["parameter"]) # attribute x attribute interaction assert "beta_price_x_time" in param_names, ( f"Missing attribute x attribute interaction param. Got: {param_names}" ) # 4 base + 1 interaction = 5 params assert est.n_parameters == 5, f"Expected 5 params, got {est.n_parameters}" _run("31. Attribute × attribute interaction (price × time)", test_attribute_x_attribute_interaction) # =================================================================== # 32. 
Custom attribute names in simulation # =================================================================== def test_custom_attribute_names(): from dce_analyzer.simulate import generate_simulated_dce output = generate_simulated_dce( n_individuals=50, n_tasks=4, n_alts=3, seed=123, attribute_names=["cost", "quality", "speed"], ) df = output.data for col in ["cost", "quality", "speed"]: assert col in df.columns, f"Missing custom attribute column: {col}" # true_params should reference custom names for attr in ["cost", "quality", "speed"]: assert f"mu_{attr}" in output.true_parameters, f"Missing mu_{attr} in true_params" assert f"sd_{attr}" in output.true_parameters, f"Missing sd_{attr} in true_params" # Default attributes should NOT be present for col in ["price", "time", "comfort", "reliability"]: assert col not in df.columns, f"Default attribute '{col}' should not be present" _run("32. Custom attribute names in simulation", test_custom_attribute_names) # =================================================================== # 33. Custom covariate names in simulation # =================================================================== def test_custom_covariate_names(): from dce_analyzer.simulate import generate_simulated_dce output = generate_simulated_dce( n_individuals=50, n_tasks=4, n_alts=3, seed=123, covariate_names=["education", "gender"], ) df = output.data for col in ["education", "gender"]: assert col in df.columns, f"Missing custom covariate column: {col}" # Covariates should be constant within each respondent for (rid,), grp in df.groupby(["respondent_id"]): for col in ["education", "gender"]: assert grp[col].nunique() == 1, ( f"Covariate '{col}' not constant for respondent {rid}" ) # Default covariates should NOT be present for col in ["income", "age"]: assert col not in df.columns, f"Default covariate '{col}' should not be present" _run("33. 
Custom covariate names in simulation", test_custom_covariate_names) # =================================================================== # 34. BWS simulation (worst column) # =================================================================== def test_bws_simulation(): from dce_analyzer.simulate import generate_simulated_dce output = generate_simulated_dce( n_individuals=50, n_tasks=4, n_alts=3, seed=42, bws=True, ) df = output.data assert "worst" in df.columns, "Missing 'worst' column" # Each task should have exactly 1 worst for (rid, tid), grp in df.groupby(["respondent_id", "task_id"]): assert grp["worst"].sum() == 1, f"Task ({rid},{tid}) should have exactly 1 worst" assert grp["choice"].sum() == 1, f"Task ({rid},{tid}) should have exactly 1 best" best_alt = grp.loc[grp["choice"] == 1, "alternative"].values[0] worst_alt = grp.loc[grp["worst"] == 1, "alternative"].values[0] assert best_alt != worst_alt, f"Task ({rid},{tid}): worst must differ from best" _run("34. BWS simulation (worst column)", test_bws_simulation) # =================================================================== # 35. BWS simulation with n_alts=2 raises ValueError # =================================================================== def test_bws_n_alts_2_raises(): from dce_analyzer.simulate import generate_simulated_dce try: generate_simulated_dce(n_individuals=10, n_tasks=2, n_alts=2, seed=1, bws=True) raise AssertionError("Should have raised ValueError for bws with n_alts=2") except ValueError as exc: assert "n_alts >= 3" in str(exc), f"Unexpected error message: {exc}" _run("35. BWS simulation with n_alts=2 raises ValueError", test_bws_n_alts_2_raises) # =================================================================== # 36. 
Default params backward compat (no new args) # =================================================================== def test_default_params_backward_compat(): from dce_analyzer.simulate import generate_simulated_dce output = generate_simulated_dce(n_individuals=50, n_tasks=4, n_alts=3, seed=42) df = output.data # Same columns as original test 2 for col in ["respondent_id", "task_id", "alternative", "choice", "price", "time", "comfort", "reliability", "income", "age"]: assert col in df.columns, f"Missing column: {col}" assert "worst" not in df.columns, "'worst' column should not be present by default" assert len(df) == 50 * 4 * 3 # true_params should contain the hardcoded keys for key in ["mu_price", "sd_price", "mu_time", "sd_time", "mu_comfort", "sd_comfort", "beta_reliability"]: assert key in output.true_parameters, f"Missing true_param key: {key}" _run("36. Default params backward compat (no new args)", test_default_params_backward_compat) # =================================================================== # 37. 
Bootstrap with Mixed Logit # =================================================================== def test_bootstrap_mixed_logit(): from dce_analyzer.config import ModelSpec, VariableSpec from dce_analyzer.bootstrap import run_bootstrap spec = ModelSpec( id_col="respondent_id", task_col="task_id", alt_col="alternative", choice_col="choice", variables=[ VariableSpec(name="price", column="price", distribution="normal"), VariableSpec(name="time", column="time", distribution="normal"), VariableSpec(name="comfort", column="comfort", distribution="fixed"), VariableSpec(name="reliability", column="reliability", distribution="fixed"), ], n_draws=50, ) boot = run_bootstrap( df=sim_output.data, spec=spec, model_type="mixed", n_replications=5, maxiter=100, seed=42, ) assert boot.n_replications == 5 assert boot.n_successful >= 2, f"Only {boot.n_successful} succeeded" # 2 normal (mu+sd each) + 2 fixed = 6 params assert len(boot.param_names) == 6, f"Expected 6 params, got {len(boot.param_names)}" assert boot.estimates_matrix.shape == (boot.n_successful, 6) summary = boot.summary_dataframe() assert isinstance(summary, pd.DataFrame) assert len(summary) == 6 # Check that bootstrap SE is computed for all parameters for name in boot.param_names: se = boot.bootstrap_se[name] assert se >= 0, f"Bootstrap SE negative for {name}" assert np.isfinite(se), f"Bootstrap SE not finite for {name}" _run("37. Bootstrap with Mixed Logit", test_bootstrap_mixed_logit) # =================================================================== # 38. 
Latent Class with EM algorithm # =================================================================== def test_lc_em(): from dce_analyzer.config import ModelSpec, VariableSpec from dce_analyzer.pipeline import estimate_dataframe spec = ModelSpec( id_col="respondent_id", task_col="task_id", alt_col="alternative", choice_col="choice", variables=[ VariableSpec(name="price", column="price"), VariableSpec(name="time", column="time"), VariableSpec(name="comfort", column="comfort"), VariableSpec(name="reliability", column="reliability"), ], n_classes=2, ) result = estimate_dataframe( df=sim_output.data, spec=spec, model_type="latent_class", maxiter=200, seed=42, n_classes=2, n_starts=3, lc_method="em", ) est = result.estimation assert est.n_classes == 2 assert len(est.class_probabilities) == 2 assert abs(sum(est.class_probabilities) - 1.0) < 1e-4, "Class probs must sum to 1" assert not est.estimates.empty assert not est.class_estimates.empty assert not est.posterior_probs.empty assert est.posterior_probs.shape[1] == 2 # EM-specific fields assert est.optimizer_method == "EM" assert est.em_iterations > 0, "EM should run at least 1 iteration" assert len(est.em_ll_history) == est.em_iterations assert isinstance(est.em_converged, bool) # LL should be monotonically non-decreasing (EM guarantee) for i in range(1, len(est.em_ll_history)): assert est.em_ll_history[i] >= est.em_ll_history[i - 1] - 1e-6, ( f"EM LL decreased at iter {i}: {est.em_ll_history[i-1]:.6f} -> {est.em_ll_history[i]:.6f}" ) # summary_dict should include EM fields sd = est.summary_dict() assert "em_iterations" in sd assert "em_ll_history" in sd assert "em_converged" in sd _run("38. 
Latent Class with EM algorithm", test_lc_em) # =================================================================== # Summary # =================================================================== print() print("=" * 60) n_pass = sum(1 for _, ok, _ in _results if ok) n_fail = sum(1 for _, ok, _ in _results if not ok) print(f" {n_pass} passed, {n_fail} failed out of {len(_results)} tests") print("=" * 60) if n_fail > 0: print() print("FAILURES:") for name, ok, msg in _results: if not ok: print(f" {name}: {msg}") print() sys.exit(1) else: print(" ALL TESTS PASSED") sys.exit(0)