# File: prefero/src/dce_analyzer/simulate.py
# Last commit 247642a (Wil2200): Add dual license (AGPL-3.0 + Commercial) and copyright notices
# Copyright (C) 2026 Hengzhe Zhao. All rights reserved.
# Licensed under dual license: AGPL-3.0 (open-source) or commercial. See LICENSE.
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
import pandas as pd
@dataclass
class SimulationOutput:
    """Bundle a simulated dataset with the parameter values reported for it."""

    # The simulated observations as a pandas DataFrame.
    data: pd.DataFrame
    # Mapping from parameter name to its numeric value.
    true_parameters: dict[str, float]
def _validate_column_names(
    attribute_names: list[str],
    covariate_names: list[str],
    bws: bool,
) -> None:
    """Reject attribute/covariate names that would overwrite another output column."""
    # Columns the simulator always writes itself; ``worst`` only exists in BWS mode.
    reserved = {"respondent_id", "task_id", "alternative", "choice"}
    if bws:
        reserved.add("worst")

    seen: set[str] = set()
    clashes: list[str] = []
    for name in (*attribute_names, *covariate_names):
        if name in reserved or name in seen:
            clashes.append(name)
        seen.add(name)

    if clashes:
        raise ValueError(
            "Attribute and covariate names must be unique and must not reuse "
            f"reserved column names; offending name(s): {clashes}."
        )


def generate_simulated_dce(
    n_individuals: int = 300,
    n_tasks: int = 8,
    n_alts: int = 3,
    seed: int = 42,
    attribute_names: list[str] | None = None,
    covariate_names: list[str] | None = None,
    bws: bool = False,
) -> SimulationOutput:
    """
    Generate a panel choice dataset in long format.

    One row is produced per (respondent, task, alternative). The chosen
    alternative in each task is the one with the highest utility, where
    utility is a deterministic part plus a standard Gumbel draw.

    Parameters
    ----------
    n_individuals : int
        Number of respondents (>= 1).
    n_tasks : int
        Number of choice tasks per respondent (>= 1).
    n_alts : int
        Number of alternatives per task (>= 2, or >= 3 when ``bws`` is True).
    seed : int
        Seed passed to ``numpy.random.default_rng``.
    attribute_names : list[str] or None
        Custom attribute column names. When None, uses the hardcoded defaults
        (price, time, comfort, reliability) with their original distributions.
        Custom attributes are drawn uniformly on [1, 10] and each gets a
        random coefficient whose mean is drawn uniformly on [-1.5, 1.5] and
        whose standard deviation is 20% of the absolute mean.
    covariate_names : list[str] or None
        Custom covariate column names. When None, uses the hardcoded defaults
        (income, age) with their original distributions. Custom covariates
        are drawn from Normal(50, 15) clipped to [10, 100]. Covariates are
        constant within a respondent and do not enter the utility.
    bws : bool
        If True, generate a ``worst`` column (binary 0/1) alongside the best
        choice column. Requires ``n_alts >= 3``.

    Returns
    -------
    SimulationOutput
        ``data`` holds the long-format DataFrame with columns
        ``respondent_id``, ``task_id``, ``alternative``, ``choice``, the
        attribute columns, the covariate columns and, in BWS mode, ``worst``.
        ``true_parameters`` holds the data-generating parameters. For custom
        attributes the reported ``mu_*``/``sd_*`` values are rounded to four
        decimals, while the unrounded values are used for the draws.

    Raises
    ------
    ValueError
        If the size arguments are out of range, or if any attribute or
        covariate name is duplicated or collides with a column the simulator
        writes itself (which would silently overwrite that column).
    """
    if n_individuals < 1 or n_tasks < 1 or n_alts < 2:
        raise ValueError("Need n_individuals >= 1, n_tasks >= 1, n_alts >= 2.")
    if bws and n_alts < 3:
        raise ValueError("BWS simulation requires n_alts >= 3.")

    # -- custom vs hardcoded path ------------------------------------------
    use_custom_attrs = attribute_names is not None
    use_custom_covs = covariate_names is not None

    # Work on local copies so the caller's lists are never rebound or mutated.
    attr_names = (
        list(attribute_names)
        if use_custom_attrs
        else ["price", "time", "comfort", "reliability"]
    )
    cov_names = list(covariate_names) if use_custom_covs else ["income", "age"]

    # Fail fast, before any random draw, on names that would clobber columns.
    _validate_column_names(attr_names, cov_names, bws)

    rng = np.random.default_rng(seed)
    alt_constants = np.linspace(0.0, 0.35, num=n_alts)

    attr_true_mu: list[float] = []
    attr_true_sd: list[float] = []
    if use_custom_attrs:
        # Generate true parameters for each custom attribute.
        true_params: dict[str, float] = {}
        for attr in attr_names:
            mu = float(rng.uniform(-1.5, 1.5))
            sd = abs(mu) * 0.2
            true_params[f"mu_{attr}"] = round(mu, 4)
            true_params[f"sd_{attr}"] = round(sd, 4)
            attr_true_mu.append(mu)
            attr_true_sd.append(sd)
    else:
        true_params = {
            "mu_price": -1.20,
            "sd_price": 0.25,
            "mu_time": -0.70,
            "sd_time": 0.18,
            "mu_comfort": 0.85,
            "sd_comfort": 0.12,
            "beta_reliability": 0.55,
        }

    # The first alternative is the reference (constant 0), so it gets no ASC entry.
    for alt_idx in range(1, n_alts):
        true_params[f"asc_alt_{alt_idx + 1}"] = float(alt_constants[alt_idx])

    # -- covariate arrays (constant within respondent) ---------------------
    if use_custom_covs:
        cov_arrays = {
            name: rng.normal(50, 15, size=n_individuals).clip(10, 100)
            for name in cov_names
        }
    else:
        cov_arrays = {
            "income": rng.normal(60_000, 15_000, size=n_individuals).clip(18_000, 180_000),
            "age": rng.normal(42, 12, size=n_individuals).clip(18, 80),
        }

    rows: list[dict[str, float | int]] = []
    betas: list[float] = []
    betas_named: dict[str, float] = {}
    worst_alt = -1  # only meaningful when bws is True

    for individual in range(1, n_individuals + 1):
        # Individual-level random coefficients.
        if use_custom_attrs:
            # Floor the sd so a mean of exactly 0 still gives a valid scale.
            betas = [
                rng.normal(mu, max(sd, 1e-6))
                for mu, sd in zip(attr_true_mu, attr_true_sd)
            ]
        else:
            betas_named = {
                "price": rng.normal(true_params["mu_price"], true_params["sd_price"]),
                "time": rng.normal(true_params["mu_time"], true_params["sd_time"]),
                "comfort": rng.normal(true_params["mu_comfort"], true_params["sd_comfort"]),
            }

        cov_values = {
            name: float(arr[individual - 1]) for name, arr in cov_arrays.items()
        }

        for task in range(1, n_tasks + 1):
            # -- attribute values ------------------------------------------
            if use_custom_attrs:
                attr_values = {
                    attr: rng.uniform(1, 10, size=n_alts) for attr in attr_names
                }
                deterministic_utility = alt_constants.copy()
                for beta, attr in zip(betas, attr_names):
                    deterministic_utility = deterministic_utility + beta * attr_values[attr]
            else:
                price = rng.uniform(3.0, 30.0, size=n_alts)
                time = rng.uniform(10.0, 90.0, size=n_alts)
                comfort = rng.integers(0, 2, size=n_alts)
                reliability = rng.uniform(0.70, 1.00, size=n_alts)
                # Price enters in logs and time in units of 10; reliability
                # has a fixed (non-random) coefficient.
                deterministic_utility = (
                    alt_constants
                    + betas_named["price"] * np.log1p(price)
                    + betas_named["time"] * (time / 10.0)
                    + betas_named["comfort"] * comfort
                    + true_params["beta_reliability"] * reliability
                )
                attr_values = {
                    "price": price,
                    "time": time,
                    "comfort": comfort,
                    "reliability": reliability,
                }

            random_shock = rng.gumbel(0.0, 1.0, size=n_alts)
            utilities = deterministic_utility + random_shock
            chosen_alt = int(np.argmax(utilities))

            # -- worst choice (BWS) ----------------------------------------
            if bws:
                remaining_utils = utilities.copy()
                remaining_utils[chosen_alt] = np.inf  # exclude best
                worst_alt = int(np.argmin(remaining_utils))

            for alt in range(n_alts):
                row: dict[str, float | int] = {
                    "respondent_id": individual,
                    "task_id": task,
                    "alternative": alt + 1,
                    "choice": int(alt == chosen_alt),
                }
                for attr in attr_names:
                    val = attr_values[attr][alt]
                    # Keep integer-coded attributes as ints, everything else as floats.
                    row[attr] = int(val) if isinstance(val, np.integer) else float(val)
                for cov_name, cov_val in cov_values.items():
                    row[cov_name] = cov_val
                if bws:
                    row["worst"] = int(alt == worst_alt)
                rows.append(row)

    return SimulationOutput(data=pd.DataFrame(rows), true_parameters=true_params)