| | """End-to-end test script for the dce_analyzer backend. |
| | |
| | Run from project root: |
| | python scripts/test_e2e.py |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import sys |
| | import traceback |
| | from pathlib import Path |
| |
|
| | |
| | ROOT = Path(__file__).resolve().parents[1] |
| | sys.path.insert(0, str(ROOT / "src")) |
| |
|
| | import numpy as np |
| | import pandas as pd |
| |
|
| | |
| | |
| | |
| |
|
| | _results: list[tuple[str, bool, str]] = [] |
| |
|
| |
|
| | def _run(name: str, fn): |
| | """Run *fn* and record PASS / FAIL.""" |
| | try: |
| | fn() |
| | _results.append((name, True, "")) |
| | print(f" PASS {name}") |
| | except Exception as exc: |
| | msg = f"{exc.__class__.__name__}: {exc}" |
| | _results.append((name, False, msg)) |
| | print(f" FAIL {name}") |
| | traceback.print_exc() |
| | print() |
| |
|
| |
|
| | |
| | |
| | |
def test_imports():
    """Every public backend module must import cleanly."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.simulate import generate_simulated_dce
    from dce_analyzer.data import prepare_choice_tensors, ChoiceTensors
    from dce_analyzer.model import (
        MixedLogitEstimator,
        ConditionalLogitEstimator,
        EstimationResult,
    )
    from dce_analyzer.latent_class import LatentClassEstimator, LatentClassResult
    from dce_analyzer.pipeline import estimate_dataframe, PipelineResult
    from dce_analyzer.wtp import compute_wtp
    from dce_analyzer.bootstrap import run_bootstrap, BootstrapResult
    from dce_analyzer.format_converter import (
        detect_format,
        wide_to_long,
        infer_structure,
        normalize_choice_column,
        ColumnInference,
    )
    from dce_analyzer.apollo import APOLLO_DATASETS


_run("1. Import all backend modules", test_imports)
| |
|
| |
|
| | |
| | |
| | |
# Filled in by test_simulate; all later tests reuse this simulated dataset.
sim_output = None


def test_simulate():
    """Simulated DCE data has the expected size, columns and true parameters."""
    global sim_output
    from dce_analyzer.simulate import generate_simulated_dce

    sim_output = generate_simulated_dce(
        n_individuals=100, n_tasks=4, n_alts=3, seed=42
    )
    df = sim_output.data
    assert isinstance(df, pd.DataFrame), "Expected DataFrame"
    # 100 individuals x 4 tasks x 3 alternatives = 1200 long-format rows.
    assert len(df) == 100 * 4 * 3, f"Expected 1200 rows, got {len(df)}"
    expected_cols = ("respondent_id", "task_id", "alternative", "choice",
                     "price", "time", "comfort", "reliability")
    for col in expected_cols:
        assert col in df.columns, f"Missing column: {col}"
    assert isinstance(sim_output.true_parameters, dict)
    assert len(sim_output.true_parameters) > 0


_run("2. Generate simulated data (100 ind, 4 tasks, 3 alts)", test_simulate)
| |
|
| |
|
| | |
| | |
| | |
# Conditional-logit result, reused by the WTP test below.
cl_result = None


def test_conditional_logit():
    """Conditional logit converges and recovers one beta per attribute."""
    global cl_result
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name=attr, column=attr)
            for attr in ("price", "time", "comfort", "reliability")
        ],
    )
    cl_result = estimate_dataframe(
        df=sim_output.data, spec=spec, model_type="conditional", maxiter=200, seed=42
    )
    est = cl_result.estimation
    assert est.success, f"CL did not converge: {est.message}"
    assert est.n_parameters == 4
    # One observation per choice task: 100 respondents x 4 tasks.
    assert est.n_observations == 100 * 4
    assert not est.estimates.empty
    assert "estimate" in est.estimates.columns


_run("3. Conditional Logit estimation", test_conditional_logit)
| |
|
| |
|
| | |
| | |
| | |
# Mixed-logit result, kept at module level like the other estimations.
mxl_result = None


def test_mixed_logit():
    """MXL with 2 random + 2 fixed coefficients yields 6 named parameters."""
    global mxl_result
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="fixed"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        n_draws=50,
    )
    mxl_result = estimate_dataframe(
        df=sim_output.data, spec=spec, model_type="mixed", maxiter=200, seed=42
    )
    est = mxl_result.estimation

    # 2 x (mu, sd) for the normals + 2 fixed betas = 6 parameters.
    assert est.n_parameters == 6, f"Expected 6 params, got {est.n_parameters}"
    assert not est.estimates.empty

    param_names = set(est.estimates["parameter"])
    for expected in ("mu_price", "sd_price", "mu_time", "sd_time",
                     "beta_comfort", "beta_reliability"):
        assert expected in param_names, f"Missing param: {expected}"


_run("4. Mixed Logit estimation (n_draws=50)", test_mixed_logit)
| |
|
| |
|
| | |
| | |
| | |
# Latent-class result, reused by the summary_dict test below.
lc_result = None


def test_latent_class():
    """2-class LC model: class probabilities sum to 1, posteriors produced."""
    global lc_result
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name=attr, column=attr)
            for attr in ("price", "time", "comfort", "reliability")
        ],
        n_classes=2,
    )
    lc_result = estimate_dataframe(
        df=sim_output.data, spec=spec, model_type="latent_class",
        maxiter=200, seed=42, n_classes=2, n_starts=3,
    )
    est = lc_result.estimation
    assert est.n_classes == 2
    assert len(est.class_probabilities) == 2
    assert abs(sum(est.class_probabilities) - 1.0) < 1e-4, "Class probs must sum to 1"
    assert not est.estimates.empty
    assert not est.class_estimates.empty
    assert not est.posterior_probs.empty
    # One posterior column per latent class.
    assert est.posterior_probs.shape[1] == 2


_run("5. Latent Class estimation (n_classes=2, n_starts=3)", test_latent_class)
| |
|
| |
|
| | |
| | |
| | |
def test_wtp():
    """WTP ratios from the CL result: 3 non-cost attributes, finite, with CIs."""
    from dce_analyzer.wtp import compute_wtp

    wtp_df = compute_wtp(cl_result.estimation, cost_variable="price")
    assert isinstance(wtp_df, pd.DataFrame)
    # One WTP row per non-cost attribute (time, comfort, reliability).
    assert len(wtp_df) == 3
    for col in ("wtp_estimate", "wtp_std_error", "wtp_ci_lower", "wtp_ci_upper"):
        assert col in wtp_df.columns

    for _, row in wtp_df.iterrows():
        assert np.isfinite(row["wtp_estimate"]), f"Non-finite WTP for {row['attribute']}"


_run("6. WTP computation (CL result)", test_wtp)
| |
|
| |
|
| | |
| | |
| | |
def test_bootstrap():
    """Bootstrap of the CL model: replication counts and summary shape."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.bootstrap import run_bootstrap

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name=attr, column=attr)
            for attr in ("price", "time", "comfort", "reliability")
        ],
    )
    boot = run_bootstrap(
        df=sim_output.data, spec=spec, model_type="conditional",
        n_replications=10, maxiter=100, seed=42,
    )
    assert boot.n_replications == 10
    assert boot.n_successful >= 2, f"Only {boot.n_successful} succeeded"
    assert len(boot.param_names) == 4
    # Only successful replications enter the estimates matrix.
    assert boot.estimates_matrix.shape == (boot.n_successful, 4)
    summary = boot.summary_dataframe()
    assert isinstance(summary, pd.DataFrame)
    assert len(summary) == 4


_run("7. Bootstrap (n_boot=10, conditional logit)", test_bootstrap)
| |
|
| |
|
| | |
| | |
| | |
def test_wide_to_long():
    """Wide-format detection and conversion to a valid long-format frame."""
    from dce_analyzer.format_converter import detect_format, wide_to_long

    # 2 respondents x 2 tasks, 3 alternatives encoded as suffixed columns.
    wide_df = pd.DataFrame({
        "id": [1, 1, 2, 2],
        "choice": [1, 2, 1, 3],
        "price_1": [10, 20, 15, 25],
        "price_2": [12, 22, 17, 27],
        "price_3": [14, 24, 19, 29],
        "time_1": [30, 40, 35, 45],
        "time_2": [32, 42, 37, 47],
        "time_3": [34, 44, 39, 49],
    })

    fmt = detect_format(wide_df)
    assert fmt == "wide", f"Expected 'wide', got '{fmt}'"

    long_df = wide_to_long(
        wide_df,
        attribute_groups={
            "price": ["price_1", "price_2", "price_3"],
            "time": ["time_1", "time_2", "time_3"],
        },
        id_col="id",
        choice_col="choice",
    )
    assert isinstance(long_df, pd.DataFrame)
    # 4 wide rows x 3 alternatives = 12 long rows.
    assert len(long_df) == 12, f"Expected 12 rows, got {len(long_df)}"
    for col in ("alternative", "choice", "price", "time"):
        assert col in long_df.columns

    # Exactly one chosen alternative in every (respondent, task) group.
    for (rid, tid), grp in long_df.groupby(["respondent_id", "task_id"]):
        assert grp["choice"].sum() == 1, f"Task ({rid},{tid}) has {grp['choice'].sum()} choices"

    fmt2 = detect_format(long_df)
    assert fmt2 == "long", f"Expected 'long' for converted data, got '{fmt2}'"


_run("8. Wide-to-long conversion", test_wide_to_long)
| |
|
| |
|
| | |
| | |
| | |
def test_infer_and_normalize():
    """Column inference finds id/choice; normalization yields a 0/1 column."""
    from dce_analyzer.format_converter import infer_structure, normalize_choice_column

    df = sim_output.data
    inference = infer_structure(df)
    assert inference.id_col is not None, "Should detect id column"
    assert inference.choice_col is not None, "Should detect choice column"

    normalized = normalize_choice_column(df, "choice", "alternative")
    # After normalization the choice column is a binary indicator.
    assert set(normalized["choice"].unique()) <= {0, 1}


_run("9. infer_structure & normalize_choice_column", test_infer_and_normalize)
| |
|
| |
|
| | |
| | |
| | |
def test_lc_summary():
    """summary_dict() of the LC result exposes class count and probabilities."""
    summary = lc_result.estimation.summary_dict()
    assert "n_classes" in summary
    assert "class_probabilities" in summary
    assert summary["n_classes"] == 2


_run("10. LatentClassResult.summary_dict()", test_lc_summary)
| |
|
| |
|
| | |
| | |
| | |
def test_full_correlated_mxl():
    """correlated=True gives a full 2x2 covariance over the random params."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="fixed"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        n_draws=50,
    )
    est = estimate_dataframe(
        df=sim_output.data, spec=spec, model_type="mixed",
        maxiter=200, seed=42, correlated=True,
    ).estimation
    assert est.covariance_matrix is not None, "Expected covariance matrix"
    # Two normal coefficients -> 2x2 covariance of the random part.
    assert est.covariance_matrix.shape == (2, 2), f"Expected 2x2 cov, got {est.covariance_matrix.shape}"
    assert est.correlation_matrix is not None


_run("11. Full correlated MMNL (backward compat)", test_full_correlated_mxl)
| |
|
| |
|
| | |
| | |
| | |
def test_selective_correlated_mxl():
    """Block-diagonal correlation groups: cross-group covariances must be 0."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="normal"),
            VariableSpec(name="reliability", column="reliability", distribution="normal"),
        ],
        n_draws=50,
    )
    # Correlate (price, time) and (comfort, reliability) as separate blocks.
    result = estimate_dataframe(
        df=sim_output.data, spec=spec, model_type="mixed",
        maxiter=200, seed=42,
        correlation_groups=[[0, 1], [2, 3]],
    )
    est = result.estimation
    assert est.covariance_matrix is not None, "Expected covariance matrix"
    assert est.covariance_matrix.shape == (4, 4)

    cov = est.covariance_matrix
    # Cross-group entries are structurally zero under block-diagonal groups.
    assert abs(cov[0, 2]) < 1e-8, f"Expected 0 cov(price,comfort), got {cov[0,2]}"
    assert abs(cov[0, 3]) < 1e-8, f"Expected 0 cov(price,reliability), got {cov[0,3]}"
    assert abs(cov[1, 2]) < 1e-8, f"Expected 0 cov(time,comfort), got {cov[1,2]}"
    assert abs(cov[1, 3]) < 1e-8, f"Expected 0 cov(time,reliability), got {cov[1,3]}"

    # FIX: the previous `assert abs(cov[0, 1]) > 1e-10 or True` was vacuous —
    # `or True` makes the condition always pass, so it asserted nothing.
    # Within-group covariances are freely estimated and may legitimately be
    # near zero, so no magnitude assertion is made on cov[0, 1] / cov[2, 3].


_run("12. Selective correlated MMNL (block-diagonal)", test_selective_correlated_mxl)
| |
|
| |
|
| | |
| | |
| | |
def test_selective_with_standalone():
    """One correlated pair plus a standalone random param -> 3x3 covariance."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="normal"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        n_draws=50,
    )

    # Only (price, time) are correlated; comfort is random but standalone.
    result = estimate_dataframe(
        df=sim_output.data, spec=spec, model_type="mixed",
        maxiter=200, seed=42,
        correlation_groups=[[0, 1]],
    )
    est = result.estimation
    assert est.covariance_matrix is not None
    assert est.covariance_matrix.shape == (3, 3)
    cov = est.covariance_matrix

    # Standalone comfort must not covary with the correlated block.
    assert abs(cov[0, 2]) < 1e-8, f"Expected 0 cov(price,comfort), got {cov[0,2]}"
    assert abs(cov[1, 2]) < 1e-8, f"Expected 0 cov(time,comfort), got {cov[1,2]}"

    assert est.n_parameters == 8, f"Expected 8 params, got {est.n_parameters}"


_run("13. Selective with standalone random params", test_selective_with_standalone)
| |
|
| |
|
| | |
| | |
| | |
# Best-worst-scaling dataset derived from the simulated data.
bws_df = None


def test_create_bws_data():
    """Create BWS data by adding a 'worst' column to simulated DCE data."""
    global bws_df
    df = sim_output.data.copy()

    # For each task, pick a random non-best alternative as the "worst".
    # NOTE(review): the positional assignment `df["worst"] = worst_rows`
    # assumes df rows are ordered by (respondent_id, task_id) so they align
    # with groupby iteration order — the validation loop below would catch
    # a misalignment; confirm against the simulator's output ordering.
    rng = np.random.default_rng(99)
    worst_rows = []
    for (rid, tid), grp in df.groupby(["respondent_id", "task_id"]):
        best_alt = grp.loc[grp["choice"] == 1, "alternative"].values[0]
        non_best = grp[grp["alternative"] != best_alt]
        worst_alt = non_best["alternative"].values[rng.integers(len(non_best))]
        for _, row in grp.iterrows():
            worst_rows.append(1 if row["alternative"] == worst_alt else 0)
    df["worst"] = worst_rows

    # Validate the constructed column task by task.
    for (rid, tid), grp in df.groupby(["respondent_id", "task_id"]):
        assert grp["choice"].sum() == 1, "Exactly one best per task"
        assert grp["worst"].sum() == 1, "Exactly one worst per task"
        best_idx = grp.loc[grp["choice"] == 1].index[0]
        worst_idx = grp.loc[grp["worst"] == 1].index[0]
        assert best_idx != worst_idx, "worst != best"
    bws_df = df
    assert "worst" in bws_df.columns


_run("14. Create BWS simulated data", test_create_bws_data)
| |
|
| |
|
| | |
| | |
| | |
def test_bws_clogit():
    """BWS conditional logit adds a positive, estimated lambda_w parameter."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name=attr, column=attr)
            for attr in ("price", "time", "comfort", "reliability")
        ],
    )
    result = estimate_dataframe(
        df=bws_df, spec=spec, model_type="conditional",
        maxiter=200, seed=42,
        bws_worst_col="worst", estimate_lambda_w=True,
    )
    est = result.estimation
    assert est.success, f"BWS CL did not converge: {est.message}"
    # 4 attribute betas + the worst-scale lambda_w.
    assert est.n_parameters == 5, f"Expected 5 params, got {est.n_parameters}"

    param_names = set(est.estimates["parameter"])
    assert "lambda_w (worst scale)" in param_names, f"Missing lambda_w param. Got: {param_names}"

    lw_row = est.estimates[est.estimates["parameter"] == "lambda_w (worst scale)"]
    assert lw_row["estimate"].values[0] > 0, "lambda_w must be positive"


_run("15. BWS + Conditional Logit", test_bws_clogit)
| |
|
| |
|
| | |
| | |
| | |
def test_bws_clogit_fixed_lw():
    """With estimate_lambda_w=False (MaxDiff), no lambda_w parameter appears."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name=attr, column=attr)
            for attr in ("price", "time", "comfort", "reliability")
        ],
    )
    result = estimate_dataframe(
        df=bws_df, spec=spec, model_type="conditional",
        maxiter=200, seed=42,
        bws_worst_col="worst", estimate_lambda_w=False,
    )
    est = result.estimation
    assert est.success
    # Only the 4 attribute betas; lambda_w is held fixed, not estimated.
    assert est.n_parameters == 4, f"Expected 4 params, got {est.n_parameters}"
    param_names = set(est.estimates["parameter"])
    assert "lambda_w (worst scale)" not in param_names


_run("16. BWS + CLogit fixed lambda_w (MaxDiff)", test_bws_clogit_fixed_lw)
| |
|
| |
|
| | |
| | |
| | |
def test_bws_mxl():
    """BWS mixed logit: random/fixed betas plus lambda_w -> 7 parameters."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="fixed"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        n_draws=50,
    )
    result = estimate_dataframe(
        df=bws_df, spec=spec, model_type="mixed",
        maxiter=200, seed=42,
        bws_worst_col="worst", estimate_lambda_w=True,
    )
    est = result.estimation

    # 2 x (mu, sd) + 2 fixed betas + lambda_w = 7 parameters.
    assert est.n_parameters == 7, f"Expected 7 params, got {est.n_parameters}"
    param_names = set(est.estimates["parameter"])
    assert "lambda_w (worst scale)" in param_names
    assert "mu_price" in param_names
    assert "sd_price" in param_names


_run("17. BWS + Mixed Logit", test_bws_mxl)
| |
|
| |
|
| | |
| | |
| | |
def test_bws_gmnl():
    """BWS + GMNL: scale parameters (tau) and lambda_w are both present."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="fixed"),
            VariableSpec(name="comfort", column="comfort", distribution="fixed"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        n_draws=50,
    )
    result = estimate_dataframe(
        df=bws_df, spec=spec, model_type="gmnl",
        maxiter=200, seed=42,
        bws_worst_col="worst", estimate_lambda_w=True,
    )
    est = result.estimation

    assert est.n_parameters == 9, f"Expected 9 params, got {est.n_parameters}"
    param_names = set(est.estimates["parameter"])
    assert "lambda_w (worst scale)" in param_names
    assert "tau (scale mean)" in param_names


_run("18. BWS + GMNL", test_bws_gmnl)
| |
|
| |
|
| | |
| | |
| | |
def test_bws_lc():
    """BWS + latent class: 2 classes and lambda_w in the estimates table."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name=attr, column=attr)
            for attr in ("price", "time", "comfort", "reliability")
        ],
        n_classes=2,
    )
    result = estimate_dataframe(
        df=bws_df, spec=spec, model_type="latent_class",
        maxiter=200, seed=42, n_classes=2, n_starts=3,
        bws_worst_col="worst", estimate_lambda_w=True,
    )
    est = result.estimation
    assert est.n_classes == 2
    assert len(est.class_probabilities) == 2

    # lambda_w shows up (possibly per class) in the parameter names.
    lw_rows = est.estimates[est.estimates["parameter"].str.contains("lambda_w")]
    assert len(lw_rows) > 0, "Missing lambda_w in LC estimates"


_run("19. BWS + Latent Class", test_bws_lc)
| |
|
| |
|
| | |
| | |
| | |
def test_correlation_inference():
    """Delta-method SEs and tests for the covariance/correlation entries."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="fixed"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        n_draws=50,
    )
    result = estimate_dataframe(
        df=sim_output.data, spec=spec, model_type="mixed",
        maxiter=200, seed=42, correlated=True,
    )
    est = result.estimation

    assert est.covariance_se is not None, "Expected covariance_se"
    assert est.covariance_se.shape == (2, 2), f"Expected 2x2, got {est.covariance_se.shape}"

    assert est.correlation_se is not None, "Expected correlation_se"
    assert est.correlation_se.shape == (2, 2)

    # Diagonal correlations are identically 1, so their SEs must be ~0.
    for i in range(2):
        assert est.correlation_se[i, i] < 1e-6, f"Diagonal cor SE should be ~0, got {est.correlation_se[i,i]}"

    assert est.correlation_test is not None, "Expected correlation_test DataFrame"
    assert len(est.correlation_test) == 1, "Expected 1 off-diagonal pair for 2 random params"
    row = est.correlation_test.iloc[0]
    assert row["param_1"] == "price"
    assert row["param_2"] == "time"
    assert not np.isnan(row["cor_std_error"]), "SE should not be NaN"
    assert not np.isnan(row["z_stat"]), "z_stat should not be NaN"
    assert not np.isnan(row["p_value"]), "p_value should not be NaN"
    assert 0.0 <= row["p_value"] <= 1.0, f"p-value out of range: {row['p_value']}"


_run("20. Correlation inference (delta method SEs for cov/cor)", test_correlation_inference)
| |
|
| |
|
| | |
| | |
| | |
def test_full_model_spec():
    """FullModelSpec + estimate_from_spec reproduce the plain MXL result."""
    from dce_analyzer.config import FullModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_from_spec

    spec = FullModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="fixed"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        model_type="mixed",
        n_draws=50,
        maxiter=200,
        seed=42,
    )
    est = estimate_from_spec(df=sim_output.data, spec=spec).estimation

    assert est.n_parameters == 6, f"Expected 6 params, got {est.n_parameters}"
    assert not est.estimates.empty
    param_names = set(est.estimates["parameter"])
    for expected in ("mu_price", "sd_price", "mu_time", "sd_time",
                     "beta_comfort", "beta_reliability"):
        assert expected in param_names, f"Missing param: {expected}"
    assert est.n_observations == 100 * 4


_run("21. FullModelSpec + estimate_from_spec", test_full_model_spec)
| |
|
| |
|
| | |
| | |
| | |
def test_interactions_mmnl():
    """Price x income interaction adds one beta to the MMNL parameter set."""
    from dce_analyzer.config import FullModelSpec, InteractionTerm, VariableSpec
    from dce_analyzer.pipeline import estimate_from_spec

    spec = FullModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="fixed"),
            VariableSpec(name="comfort", column="comfort", distribution="fixed"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        model_type="mixed",
        interactions=[
            InteractionTerm(columns=("price", "income")),
        ],
        n_draws=50,
        maxiter=200,
        seed=42,
    )
    est = estimate_from_spec(df=sim_output.data, spec=spec).estimation
    param_names = set(est.estimates["parameter"])

    assert "beta_price_x_income" in param_names, (
        f"Missing interaction param. Got: {param_names}"
    )
    # (mu, sd) for price + 3 fixed betas + 1 interaction beta = 6.
    assert est.n_parameters == 6, f"Expected 6 params, got {est.n_parameters}"


_run("22. Heterogeneity interactions with MMNL (InteractionTerm)", test_interactions_mmnl)
| |
|
| |
|
| | |
| | |
| | |
def test_gmnl_full_correlation():
    """GMNL with correlated=True: covariance plus the GMNL scale parameters."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="fixed"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        n_draws=50,
    )
    result = estimate_dataframe(
        df=sim_output.data, spec=spec, model_type="gmnl",
        maxiter=200, seed=42, correlated=True,
    )
    est = result.estimation
    assert est.covariance_matrix is not None, "Expected covariance matrix for GMNL+correlated"
    assert est.covariance_matrix.shape == (2, 2), (
        f"Expected 2x2 cov, got {est.covariance_matrix.shape}"
    )
    assert est.correlation_matrix is not None

    assert est.n_parameters == 10, f"Expected 10 params, got {est.n_parameters}"
    param_names = set(est.estimates["parameter"])
    assert "tau (scale mean)" in param_names
    assert "sigma_tau (scale SD)" in param_names
    assert "gamma (mixing)" in param_names


_run("23. GMNL + full correlation", test_gmnl_full_correlation)
| |
|
| |
|
| | |
| | |
| | |
def test_gmnl_selective_correlation():
    """GMNL with one correlation group: standalone param stays uncorrelated."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="normal"),
            VariableSpec(name="comfort", column="comfort", distribution="normal"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        n_draws=50,
    )

    # Correlate only (price, time); comfort remains a standalone random param.
    result = estimate_dataframe(
        df=sim_output.data, spec=spec, model_type="gmnl",
        maxiter=200, seed=42,
        correlation_groups=[[0, 1]],
    )
    est = result.estimation
    assert est.covariance_matrix is not None
    assert est.covariance_matrix.shape == (3, 3)
    cov = est.covariance_matrix

    assert abs(cov[0, 2]) < 1e-8, f"Expected 0 cov(price,comfort), got {cov[0,2]}"
    assert abs(cov[1, 2]) < 1e-8, f"Expected 0 cov(time,comfort), got {cov[1,2]}"
    param_names = set(est.estimates["parameter"])
    assert "tau (scale mean)" in param_names


_run("24. GMNL + selective correlation", test_gmnl_selective_correlation)
| |
|
| |
|
| | |
| | |
| | |
def test_bws_composable_functions():
    """bws_log_prob / standard_log_prob: shapes, normalization, ordering."""
    import torch
    from dce_analyzer.bws import bws_log_prob, standard_log_prob

    n_obs, n_alts = 4, 3
    torch.manual_seed(42)
    utility = torch.randn(n_obs, n_alts)
    y_best = torch.tensor([0, 1, 2, 0])
    y_worst = torch.tensor([2, 0, 1, 1])

    # Standard (best-only) choice log-probabilities.
    log_p = standard_log_prob(utility, y_best, alt_dim=-1)
    assert log_p.shape == (n_obs,), f"Expected shape ({n_obs},), got {log_p.shape}"
    assert (log_p <= 1e-6).all(), "Log-probabilities must be <= 0"

    # Probabilities over all alternatives must sum to one per observation.
    log_all = torch.stack([
        standard_log_prob(utility, torch.full((n_obs,), j), alt_dim=-1)
        for j in range(n_alts)
    ], dim=1)
    prob_sums = torch.exp(log_all).sum(dim=1)
    assert torch.allclose(prob_sums, torch.ones(n_obs), atol=1e-5), (
        f"Probabilities don't sum to 1: {prob_sums}"
    )

    # Joint best-worst log-probability with a scalar lambda_w.
    lambda_w = 1.0
    log_p_bws = bws_log_prob(utility, y_best, y_worst, lambda_w, alt_dim=-1)
    assert log_p_bws.shape == (n_obs,), f"Expected shape ({n_obs},), got {log_p_bws.shape}"
    assert (log_p_bws <= 1e-6).all(), "BWS log-probabilities must be <= 0"
    assert (log_p_bws <= log_p + 1e-6).all(), (
        "BWS log-prob should be <= standard log-prob (product of two probs)"
    )

    # lambda_w is also accepted as a tensor.
    lambda_w_tensor = torch.tensor(2.0)
    log_p_bws2 = bws_log_prob(utility, y_best, y_worst, lambda_w_tensor, alt_dim=-1)
    assert log_p_bws2.shape == (n_obs,)

    # 3-D utilities (obs x draws x alts) keep the draw dimension in the output.
    n_draws = 5
    utility_3d = torch.randn(n_obs, n_draws, n_alts)
    log_p_3d = standard_log_prob(utility_3d, y_best, alt_dim=-1)
    assert log_p_3d.shape == (n_obs, n_draws), f"Expected ({n_obs},{n_draws}), got {log_p_3d.shape}"

    log_p_bws_3d = bws_log_prob(utility_3d, y_best, y_worst, 1.0, alt_dim=-1)
    assert log_p_bws_3d.shape == (n_obs, n_draws), (
        f"Expected ({n_obs},{n_draws}), got {log_p_bws_3d.shape}"
    )


_run("25. BWS composable functions (bws_log_prob, standard_log_prob)", test_bws_composable_functions)
| |
|
| |
|
| | |
| | |
| | |
def test_interactions_lc():
    """Price x income interaction appears in the latent-class estimates."""
    from dce_analyzer.config import FullModelSpec, InteractionTerm, VariableSpec
    from dce_analyzer.pipeline import estimate_from_spec

    spec = FullModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name=attr, column=attr)
            for attr in ("price", "time", "comfort", "reliability")
        ],
        model_type="latent_class",
        interactions=[
            InteractionTerm(columns=("price", "income")),
        ],
        n_classes=2,
        n_starts=3,
        maxiter=200,
        seed=42,
    )
    est = estimate_from_spec(df=sim_output.data, spec=spec).estimation
    assert est.n_classes == 2

    # Interaction parameters may be prefixed per class, so substring-match.
    has_interaction = any("price_x_income" in str(p) for p in est.estimates["parameter"])
    assert has_interaction, (
        f"Missing interaction param in LC estimates. Got: {list(est.estimates['parameter'])}"
    )


_run("26. Heterogeneity interactions with Latent Class (InteractionTerm)", test_interactions_lc)
| |
|
| |
|
| | |
| | |
| | |
def test_dummy_coding_via_spec():
    """DummyCoding in a FullModelSpec should replace a column with level dummies."""
    from dce_analyzer.config import DummyCoding, FullModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_from_spec

    attribute_cols = ["price", "time", "comfort", "reliability"]
    model_spec = FullModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[VariableSpec(name=c, column=c) for c in attribute_cols],
        model_type="conditional",
        dummy_codings=[DummyCoding(column="comfort", ref_level=0)],
        maxiter=200,
        seed=42,
    )
    est = estimate_from_spec(df=sim_output.data, spec=model_spec).estimation
    param_names = set(est.estimates["parameter"])

    # The dummy-level parameter must exist, and the raw column must be gone.
    assert "beta_comfort_L1" in param_names, (
        f"Missing dummy param beta_comfort_L1. Got: {param_names}"
    )
    assert "beta_comfort" not in param_names, (
        f"Original column should be replaced by dummy expansion. Got: {param_names}"
    )
    # Untouched attributes keep their plain beta parameters.
    for plain in ("beta_price", "beta_time", "beta_reliability"):
        assert plain in param_names
    assert est.n_parameters == 4, f"Expected 4 params, got {est.n_parameters}"


_run("27. FullModelSpec with dummy coding via estimate_from_spec", test_dummy_coding_via_spec)
| |
|
| |
|
| | |
| | |
| | |
def test_variable_ordering_preservation():
    """Dummy expansion must keep the user's variable order, expanding in place."""
    from dce_analyzer.config import DummyCoding, FullModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_from_spec

    # 'comfort' sits second on purpose: its dummies must stay in that slot.
    ordered_cols = ["price", "comfort", "time", "reliability"]
    model_spec = FullModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[VariableSpec(name=c, column=c) for c in ordered_cols],
        model_type="conditional",
        dummy_codings=[DummyCoding(column="comfort", ref_level=0)],
        maxiter=200,
        seed=42,
    )
    result = estimate_from_spec(df=sim_output.data, spec=model_spec)
    param_names = list(result.estimation.estimates["parameter"])

    expected_order = ["beta_price", "beta_comfort_L1", "beta_time", "beta_reliability"]
    assert param_names == expected_order, (
        f"Variable ordering not preserved. Expected {expected_order}, got {param_names}"
    )
    # The expanded spec carried on the result must reflect the same in-place order.
    exp_var_names = [v.name for v in result.expanded_spec.variables]
    assert exp_var_names == ["price", "comfort_L1", "time", "reliability"], (
        f"Expanded spec variable order wrong: {exp_var_names}"
    )


_run("28. Variable ordering: dummy-coded vars expanded in-place", test_variable_ordering_preservation)
| |
|
| |
|
| | |
| | |
| | |
def test_wtp_theta_index():
    """Mixed-logit estimates must carry theta_index so WTP delta-method SEs work."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe
    from dce_analyzer.wtp import compute_wtp

    # Only price is random; the remaining attributes are fixed coefficients.
    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name="price", column="price", distribution="normal"),
            VariableSpec(name="time", column="time", distribution="fixed"),
            VariableSpec(name="comfort", column="comfort", distribution="fixed"),
            VariableSpec(name="reliability", column="reliability", distribution="fixed"),
        ],
        n_draws=50,
    )
    est = estimate_dataframe(
        df=sim_output.data, spec=spec, model_type="mixed",
        maxiter=200, seed=42,
    ).estimation

    assert "theta_index" in est.estimates.columns, "theta_index column missing"

    # Expected layout: means first (mu_price=0, beta_time=1, ...), sds after.
    tidx_map = dict(zip(est.estimates["parameter"], est.estimates["theta_index"]))
    assert tidx_map["mu_price"] == 0, f"mu_price should be theta 0, got {tidx_map['mu_price']}"
    assert tidx_map["beta_time"] == 1, f"beta_time should be theta 1, got {tidx_map['beta_time']}"
    assert tidx_map["sd_price"] == 4, f"sd_price should be theta 4, got {tidx_map['sd_price']}"

    wtp_df = compute_wtp(est, cost_variable="time")
    assert not wtp_df.empty

    # With a vcov available, every non-cost attribute must get a finite SE.
    if est.vcov_matrix is not None:
        non_cost = {"price", "comfort", "reliability"}
        for _, row in wtp_df.iterrows():
            if row["attribute"] in non_cost:
                assert not np.isnan(row["wtp_std_error"]), (
                    f"WTP SE is NaN for {row['attribute']} — theta_index mapping may be wrong"
                )


_run("29. WTP theta_index mapping for MMNL (SE correctness)", test_wtp_theta_index)
| |
|
| |
|
| | |
| | |
| | |
def test_3way_interaction():
    """A three-column InteractionTerm should yield a single 3-way parameter."""
    from dce_analyzer.config import FullModelSpec, InteractionTerm, VariableSpec
    from dce_analyzer.pipeline import estimate_from_spec

    attribute_cols = ["price", "time", "comfort", "reliability"]
    model_spec = FullModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[VariableSpec(name=c, column=c) for c in attribute_cols],
        model_type="conditional",
        interactions=[InteractionTerm(columns=("price", "time", "income"))],
        maxiter=200,
        seed=42,
    )
    est = estimate_from_spec(df=sim_output.data, spec=model_spec).estimation
    param_names = set(est.estimates["parameter"])

    assert "beta_price_x_time_x_income" in param_names, (
        f"Missing 3-way interaction param. Got: {param_names}"
    )
    # Four main effects plus the single 3-way interaction.
    assert est.n_parameters == 5, f"Expected 5 params, got {est.n_parameters}"


_run("30. 3-way interaction (price × time × income)", test_3way_interaction)
| |
|
| |
|
| | |
| | |
| | |
def test_attribute_x_attribute_interaction():
    """An attribute x attribute InteractionTerm should add one extra parameter."""
    from dce_analyzer.config import FullModelSpec, InteractionTerm, VariableSpec
    from dce_analyzer.pipeline import estimate_from_spec

    attribute_cols = ["price", "time", "comfort", "reliability"]
    model_spec = FullModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[VariableSpec(name=c, column=c) for c in attribute_cols],
        model_type="conditional",
        interactions=[InteractionTerm(columns=("price", "time"))],
        maxiter=200,
        seed=42,
    )
    est = estimate_from_spec(df=sim_output.data, spec=model_spec).estimation
    param_names = set(est.estimates["parameter"])

    assert "beta_price_x_time" in param_names, (
        f"Missing attribute x attribute interaction param. Got: {param_names}"
    )
    # Four main effects plus the interaction.
    assert est.n_parameters == 5, f"Expected 5 params, got {est.n_parameters}"


_run("31. Attribute × attribute interaction (price × time)", test_attribute_x_attribute_interaction)
| |
|
| |
|
| | |
| | |
| | |
def test_custom_attribute_names():
    """The simulator should honour user-supplied attribute names end to end."""
    from dce_analyzer.simulate import generate_simulated_dce

    custom_attrs = ["cost", "quality", "speed"]
    output = generate_simulated_dce(
        n_individuals=50, n_tasks=4, n_alts=3, seed=123,
        attribute_names=custom_attrs,
    )
    frame = output.data

    for col in custom_attrs:
        assert col in frame.columns, f"Missing custom attribute column: {col}"
    # Each custom attribute gets mixed-logit mean/sd entries in the true params.
    for attr in custom_attrs:
        for prefix in ("mu", "sd"):
            assert f"{prefix}_{attr}" in output.true_parameters, (
                f"Missing {prefix}_{attr} in true_params"
            )
    # The built-in defaults must be fully replaced, not merely appended to.
    for col in ["price", "time", "comfort", "reliability"]:
        assert col not in frame.columns, f"Default attribute '{col}' should not be present"


_run("32. Custom attribute names in simulation", test_custom_attribute_names)
| |
|
| |
|
| | |
| | |
| | |
def test_custom_covariate_names():
    """The simulator should honour user-supplied covariate names.

    Fix: group by the scalar column name (``df.groupby("respondent_id")``)
    instead of a length-1 list with a 1-tuple unpack (``for (rid,), grp in
    df.groupby(["respondent_id"])``). Group keys for list-like ``by`` are
    scalars before pandas 2.0 and 1-tuples from 2.0 onward, so the old
    unpack raised TypeError on older pandas; scalar grouping works on both.
    """
    from dce_analyzer.simulate import generate_simulated_dce

    output = generate_simulated_dce(
        n_individuals=50, n_tasks=4, n_alts=3, seed=123,
        covariate_names=["education", "gender"],
    )
    df = output.data
    for col in ["education", "gender"]:
        assert col in df.columns, f"Missing custom covariate column: {col}"

    # Covariates are respondent-level: constant within each respondent.
    for rid, grp in df.groupby("respondent_id"):
        for col in ["education", "gender"]:
            assert grp[col].nunique() == 1, (
                f"Covariate '{col}' not constant for respondent {rid}"
            )

    # The default covariates must be fully replaced.
    for col in ["income", "age"]:
        assert col not in df.columns, f"Default covariate '{col}' should not be present"


_run("33. Custom covariate names in simulation", test_custom_covariate_names)
| |
|
| |
|
| | |
| | |
| | |
def test_bws_simulation():
    """BWS simulation adds a 'worst' column: one worst per task, distinct from best."""
    from dce_analyzer.simulate import generate_simulated_dce

    frame = generate_simulated_dce(
        n_individuals=50, n_tasks=4, n_alts=3, seed=42, bws=True,
    ).data
    assert "worst" in frame.columns, "Missing 'worst' column"

    for (rid, tid), task in frame.groupby(["respondent_id", "task_id"]):
        # Exactly one worst and exactly one best alternative per choice task.
        assert task["worst"].sum() == 1, f"Task ({rid},{tid}) should have exactly 1 worst"
        assert task["choice"].sum() == 1, f"Task ({rid},{tid}) should have exactly 1 best"
        best_alt = task.loc[task["choice"] == 1, "alternative"].iloc[0]
        worst_alt = task.loc[task["worst"] == 1, "alternative"].iloc[0]
        assert best_alt != worst_alt, f"Task ({rid},{tid}): worst must differ from best"


_run("34. BWS simulation (worst column)", test_bws_simulation)
| |
|
| |
|
| | |
| | |
| | |
def test_bws_n_alts_2_raises():
    """BWS needs at least 3 alternatives; n_alts=2 must be rejected."""
    from dce_analyzer.simulate import generate_simulated_dce

    raised = False
    try:
        generate_simulated_dce(n_individuals=10, n_tasks=2, n_alts=2, seed=1, bws=True)
    except ValueError as exc:
        raised = True
        # The error message should point the user at the constraint.
        assert "n_alts >= 3" in str(exc), f"Unexpected error message: {exc}"
    assert raised, "Should have raised ValueError for bws with n_alts=2"


_run("35. BWS simulation with n_alts=2 raises ValueError", test_bws_n_alts_2_raises)
| |
|
| |
|
| | |
| | |
| | |
def test_default_params_backward_compat():
    """Calling the simulator with no new keyword arguments keeps the legacy output."""
    from dce_analyzer.simulate import generate_simulated_dce

    output = generate_simulated_dce(n_individuals=50, n_tasks=4, n_alts=3, seed=42)
    frame = output.data

    legacy_cols = ("respondent_id", "task_id", "alternative", "choice",
                   "price", "time", "comfort", "reliability", "income", "age")
    for col in legacy_cols:
        assert col in frame.columns, f"Missing column: {col}"
    assert "worst" not in frame.columns, "'worst' column should not be present by default"
    # 50 respondents x 4 tasks x 3 alternatives, one row per alternative.
    assert len(frame) == 50 * 4 * 3

    for key in ("mu_price", "sd_price", "mu_time", "sd_time",
                "mu_comfort", "sd_comfort", "beta_reliability"):
        assert key in output.true_parameters, f"Missing true_param key: {key}"


_run("36. Default params backward compat (no new args)", test_default_params_backward_compat)
| |
|
| |
|
| | |
| | |
| | |
def test_bootstrap_mixed_logit():
    """Resampled mixed-logit bootstrap should yield finite, non-negative SEs."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.bootstrap import run_bootstrap

    # Two random coefficients (mu + sd each) and two fixed ones -> 6 params.
    distributions = {
        "price": "normal",
        "time": "normal",
        "comfort": "fixed",
        "reliability": "fixed",
    }
    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[
            VariableSpec(name=c, column=c, distribution=d)
            for c, d in distributions.items()
        ],
        n_draws=50,
    )
    boot = run_bootstrap(
        df=sim_output.data, spec=spec, model_type="mixed",
        n_replications=5, maxiter=100, seed=42,
    )
    assert boot.n_replications == 5
    assert boot.n_successful >= 2, f"Only {boot.n_successful} succeeded"
    assert len(boot.param_names) == 6, f"Expected 6 params, got {len(boot.param_names)}"
    assert boot.estimates_matrix.shape == (boot.n_successful, 6)

    summary = boot.summary_dataframe()
    assert isinstance(summary, pd.DataFrame)
    assert len(summary) == 6

    # Every bootstrap SE must be a finite non-negative number.
    for name in boot.param_names:
        se = boot.bootstrap_se[name]
        assert se >= 0, f"Bootstrap SE negative for {name}"
        assert np.isfinite(se), f"Bootstrap SE not finite for {name}"


_run("37. Bootstrap with Mixed Logit", test_bootstrap_mixed_logit)
| |
|
| |
|
| | |
| | |
| | |
def test_lc_em():
    """EM-based latent-class estimation: valid probabilities, monotone LL history."""
    from dce_analyzer.config import ModelSpec, VariableSpec
    from dce_analyzer.pipeline import estimate_dataframe

    attribute_cols = ["price", "time", "comfort", "reliability"]
    spec = ModelSpec(
        id_col="respondent_id",
        task_col="task_id",
        alt_col="alternative",
        choice_col="choice",
        variables=[VariableSpec(name=c, column=c) for c in attribute_cols],
        n_classes=2,
    )
    est = estimate_dataframe(
        df=sim_output.data, spec=spec, model_type="latent_class",
        maxiter=200, seed=42, n_classes=2, n_starts=3, lc_method="em",
    ).estimation

    # Basic shape and validity of the two-class solution.
    assert est.n_classes == 2
    assert len(est.class_probabilities) == 2
    assert abs(sum(est.class_probabilities) - 1.0) < 1e-4, "Class probs must sum to 1"
    assert not est.estimates.empty
    assert not est.class_estimates.empty
    assert not est.posterior_probs.empty
    assert est.posterior_probs.shape[1] == 2

    # EM bookkeeping recorded on the result.
    assert est.optimizer_method == "EM"
    assert est.em_iterations > 0, "EM should run at least 1 iteration"
    assert len(est.em_ll_history) == est.em_iterations
    assert isinstance(est.em_converged, bool)

    # EM guarantees the log-likelihood never decreases (up to tolerance).
    history = est.em_ll_history
    for i, (prev, cur) in enumerate(zip(history, history[1:]), start=1):
        assert cur >= prev - 1e-6, (
            f"EM LL decreased at iter {i}: {prev:.6f} -> {cur:.6f}"
        )

    # The EM diagnostics must round-trip through the summary dict.
    summary = est.summary_dict()
    for key in ("em_iterations", "em_ll_history", "em_converged"):
        assert key in summary


_run("38. Latent Class with EM algorithm", test_lc_em)
| |
|
| |
|
| | |
| | |
| | |
# Final report: print pass/fail counts and exit non-zero if anything failed,
# so CI can use this script's exit status directly.
print()
print("=" * 60)
failures = [(name, msg) for name, ok, msg in _results if not ok]
n_fail = len(failures)
n_pass = len(_results) - n_fail
print(f" {n_pass} passed, {n_fail} failed out of {len(_results)} tests")
print("=" * 60)

if failures:
    print()
    print("FAILURES:")
    for name, msg in failures:
        print(f" {name}: {msg}")
    print()
    sys.exit(1)
else:
    print(" ALL TESTS PASSED")
    sys.exit(0)
| |
|