# Copyright (C) 2026 Hengzhe Zhao. All rights reserved.
# Licensed under dual license: AGPL-3.0 (open-source) or commercial. See LICENSE.
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
import pandas as pd
@dataclass
class SimulationOutput:
    """Container pairing a simulated choice dataset with its true parameters."""

    # Long-format choice data: one row per respondent x task x alternative.
    data: pd.DataFrame
    # True data-generating parameters keyed by name (e.g. "mu_price", "asc_alt_2").
    true_parameters: dict[str, float]
def generate_simulated_dce(
    n_individuals: int = 300,
    n_tasks: int = 8,
    n_alts: int = 3,
    seed: int = 42,
    attribute_names: list[str] | None = None,
    covariate_names: list[str] | None = None,
    bws: bool = False,
) -> SimulationOutput:
    """
    Generate a panel choice dataset in long format.

    Each respondent completes ``n_tasks`` choice tasks with ``n_alts``
    alternatives per task.  Utilities follow a mixed-logit data-generating
    process: alternative-specific constants, individual-level random
    coefficients, and i.i.d. Gumbel(0, 1) shocks; the alternative with the
    highest total utility is marked as chosen.

    Parameters
    ----------
    n_individuals : int
        Number of respondents (must be >= 1).
    n_tasks : int
        Choice tasks per respondent (must be >= 1).
    n_alts : int
        Alternatives per task (must be >= 2; >= 3 when ``bws`` is True).
    seed : int
        Seed for ``np.random.default_rng``; fixes the entire simulation.
    attribute_names : list[str] or None
        Custom attribute column names. When None, uses the hardcoded defaults
        (price, time, comfort, reliability) with their original distributions.
    covariate_names : list[str] or None
        Custom covariate column names. When None, uses the hardcoded defaults
        (income, age) with their original distributions.
    bws : bool
        If True, generate a ``worst`` column (binary 0/1) alongside the best
        choice column. Requires ``n_alts >= 3``.

    Returns
    -------
    SimulationOutput
        Long-format DataFrame (one row per respondent x task x alternative)
        together with the dict of true data-generating parameters.

    Raises
    ------
    ValueError
        If ``n_individuals``/``n_tasks``/``n_alts`` are out of range, or if
        ``bws`` is requested with fewer than 3 alternatives.
    """
    if n_individuals < 1 or n_tasks < 1 or n_alts < 2:
        raise ValueError("Need n_individuals >= 1, n_tasks >= 1, n_alts >= 2.")
    if bws and n_alts < 3:
        raise ValueError("BWS simulation requires n_alts >= 3.")
    rng = np.random.default_rng(seed)
    # Alternative-specific constants: alt 1 is the 0.0 base, evenly rising to 0.35.
    alt_constants = np.linspace(0.0, 0.35, num=n_alts)
    # ── custom vs hardcoded path ──────────────────────────────────────────
    use_custom_attrs = attribute_names is not None
    use_custom_covs = covariate_names is not None
    if use_custom_attrs:
        # Generate true parameters for each custom attribute: mean drawn
        # Uniform(-1.5, 1.5), sd fixed at 20% of |mean|.
        # NOTE(review): true_params records mu/sd rounded to 4 decimals,
        # while the per-individual betas below are drawn from the unrounded
        # values — a tiny reporting/DGP mismatch; confirm this is intended.
        true_params: dict[str, float] = {}
        attr_true_mu: list[float] = []
        attr_true_sd: list[float] = []
        for attr in attribute_names:
            mu = float(rng.uniform(-1.5, 1.5))
            sd = abs(mu) * 0.2
            true_params[f"mu_{attr}"] = round(mu, 4)
            true_params[f"sd_{attr}"] = round(sd, 4)
            attr_true_mu.append(mu)
            attr_true_sd.append(sd)
    else:
        attribute_names = ["price", "time", "comfort", "reliability"]
        # Hardcoded DGP: random coefficients on price/time/comfort, one
        # fixed (non-random) coefficient on reliability.
        true_params = {
            "mu_price": -1.20,
            "sd_price": 0.25,
            "mu_time": -0.70,
            "sd_time": 0.18,
            "mu_comfort": 0.85,
            "sd_comfort": 0.12,
            "beta_reliability": 0.55,
        }
    if not use_custom_covs:
        covariate_names = ["income", "age"]
    # ASCs reported 1-based, relative to the first (base) alternative.
    for alt_idx in range(1, n_alts):
        true_params[f"asc_alt_{alt_idx + 1}"] = float(alt_constants[alt_idx])
    rows: list[dict[str, float | int]] = []
    # ── covariate arrays (constant within respondent) ─────────────────────
    if use_custom_covs:
        # Custom covariates all share one generic distribution:
        # Normal(50, 15) clipped to [10, 100].
        cov_arrays = {
            name: rng.normal(50, 15, size=n_individuals).clip(10, 100)
            for name in covariate_names
        }
    else:
        cov_arrays = {
            "income": rng.normal(60_000, 15_000, size=n_individuals).clip(18_000, 180_000),
            "age": rng.normal(42, 12, size=n_individuals).clip(18, 80),
        }
    for individual in range(1, n_individuals + 1):
        # individual-level random coefficients
        if use_custom_attrs:
            # max(sd, 1e-6) guards against sd == 0 when mu drew near 0.
            betas = [rng.normal(attr_true_mu[k], max(attr_true_sd[k], 1e-6))
                     for k in range(len(attribute_names))]
        else:
            betas_named = {
                "price": rng.normal(true_params["mu_price"], true_params["sd_price"]),
                "time": rng.normal(true_params["mu_time"], true_params["sd_time"]),
                "comfort": rng.normal(true_params["mu_comfort"], true_params["sd_comfort"]),
            }
        # Respondent's covariates (arrays are 0-indexed; ids are 1-based).
        cov_values = {name: float(arr[individual - 1]) for name, arr in cov_arrays.items()}
        for task in range(1, n_tasks + 1):
            # ── attribute values ──────────────────────────────────────────
            if use_custom_attrs:
                # Custom attributes drawn Uniform(1, 10) per alternative;
                # utility is linear in the raw attribute levels.
                attr_values = {
                    attr: rng.uniform(1, 10, size=n_alts) for attr in attribute_names
                }
                deterministic_utility = alt_constants.copy()
                for k, attr in enumerate(attribute_names):
                    deterministic_utility = deterministic_utility + betas[k] * attr_values[attr]
            else:
                price = rng.uniform(3.0, 30.0, size=n_alts)
                time = rng.uniform(10.0, 90.0, size=n_alts)
                # integers(0, 2) is half-open -> binary {0, 1} comfort dummy.
                comfort = rng.integers(0, 2, size=n_alts)
                reliability = rng.uniform(0.70, 1.00, size=n_alts)
                # Utility uses transformed attributes (log1p(price), time/10);
                # the DataFrame stores the raw (untransformed) levels.
                deterministic_utility = (
                    alt_constants
                    + betas_named["price"] * np.log1p(price)
                    + betas_named["time"] * (time / 10.0)
                    + betas_named["comfort"] * comfort
                    + true_params["beta_reliability"] * reliability
                )
                attr_values = {
                    "price": price,
                    "time": time,
                    "comfort": comfort,
                    "reliability": reliability,
                }
            # Gumbel(0, 1) shocks make argmax choice consistent with a logit model.
            random_shock = rng.gumbel(0.0, 1.0, size=n_alts)
            utilities = deterministic_utility + random_shock
            chosen_alt = int(np.argmax(utilities))
            # ── worst choice (BWS) ────────────────────────────────────────
            if bws:
                # Set the best alternative's utility to +inf so argmin picks
                # the worst among the remaining alternatives.
                remaining_utils = utilities.copy()
                remaining_utils[chosen_alt] = np.inf  # exclude best
                worst_alt = int(np.argmin(remaining_utils))
            for alt in range(n_alts):
                row: dict[str, float | int] = {
                    "respondent_id": individual,
                    "task_id": task,
                    "alternative": alt + 1,  # 1-based alternative id
                    "choice": int(alt == chosen_alt),
                }
                for attr in attribute_names:
                    val = attr_values[attr][alt]
                    # Preserve integer coding for integer-typed attributes
                    # (e.g. the comfort dummy); everything else becomes float.
                    row[attr] = int(val) if isinstance(val, (np.integer,)) else float(val)
                for cov_name, cov_val in cov_values.items():
                    row[cov_name] = cov_val
                if bws:
                    row["worst"] = int(alt == worst_alt)
                rows.append(row)
    return SimulationOutput(data=pd.DataFrame(rows), true_parameters=true_params)