File size: 6,671 Bytes
247642a
 
 
5ed1762
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# Copyright (C) 2026 Hengzhe Zhao. All rights reserved.
# Licensed under dual license: AGPL-3.0 (open-source) or commercial. See LICENSE.

from __future__ import annotations

from dataclasses import dataclass

import numpy as np
import pandas as pd


@dataclass
class SimulationOutput:
    """Result bundle returned by :func:`generate_simulated_dce`."""

    # Long-format choice data: one row per respondent, task and alternative.
    data: pd.DataFrame
    # Parameter values of the data-generating process, keyed by name
    # (e.g. ``mu_price``, ``sd_price``, ``asc_alt_2``).
    true_parameters: dict[str, float]


def _check_column_names(
    attribute_names: list[str],
    covariate_names: list[str],
    reserved: set[str],
) -> None:
    """Raise ``ValueError`` if requested columns would overwrite one another."""
    for label, names in (
        ("attribute_names", attribute_names),
        ("covariate_names", covariate_names),
    ):
        seen: set[str] = set()
        duplicates: set[str] = set()
        for name in names:
            if name in seen:
                duplicates.add(name)
            seen.add(name)
        if duplicates:
            raise ValueError(f"Duplicate entries in {label}: {sorted(duplicates)}.")
        clashes = seen & reserved
        if clashes:
            raise ValueError(
                f"{label} must not use reserved column names: {sorted(clashes)}."
            )

    shared = set(attribute_names) & set(covariate_names)
    if shared:
        raise ValueError(
            f"Names used as both attribute and covariate: {sorted(shared)}."
        )


def generate_simulated_dce(
    n_individuals: int = 300,
    n_tasks: int = 8,
    n_alts: int = 3,
    seed: int = 42,
    attribute_names: list[str] | None = None,
    covariate_names: list[str] | None = None,
    bws: bool = False,
) -> SimulationOutput:
    """
    Generate a panel choice dataset in long format.

    Each respondent answers ``n_tasks`` choice tasks with ``n_alts``
    alternatives. The chosen alternative maximises deterministic utility plus
    an i.i.d. Gumbel(0, 1) shock. The output has one row per
    respondent x task x alternative.

    Parameters
    ----------
    n_individuals : int
        Number of respondents (>= 1).
    n_tasks : int
        Number of choice tasks per respondent (>= 1).
    n_alts : int
        Number of alternatives per task (>= 2; >= 3 when ``bws`` is True).
    seed : int
        Seed passed to ``numpy.random.default_rng``.
    attribute_names : list[str] or None
        Custom attribute column names. When None, uses the hardcoded defaults
        (price, time, comfort, reliability) with their original distributions;
        price enters utility as ``log1p(price)`` and time as ``time / 10``.
        When given, every attribute is drawn from U(1, 10) and gets a random
        coefficient whose mean is drawn from U(-1.5, 1.5) and whose standard
        deviation is 20% of the absolute mean.
    covariate_names : list[str] or None
        Custom covariate column names. When None, uses the hardcoded defaults
        (income, age) with their original distributions. When given, every
        covariate is drawn from N(50, 15) clipped to [10, 100]. Covariates are
        constant within a respondent and do not enter utility.
    bws : bool
        If True, generate a ``worst`` column (binary 0/1) alongside the best
        choice column.  Requires ``n_alts >= 3``.

    Returns
    -------
    SimulationOutput
        ``data`` holds the long-format frame with columns ``respondent_id``,
        ``task_id``, ``alternative``, ``choice``, the attributes, the
        covariates and (if ``bws``) ``worst``. ``true_parameters`` holds the
        coefficients and the alternative-specific constants ``asc_alt_<j>``
        for ``j >= 2``. For custom attributes the reported ``mu_``/``sd_``
        values are rounded to 4 decimals, while the draws use the unrounded
        values.

    Raises
    ------
    ValueError
        If the size arguments are out of range, if ``bws`` is requested with
        fewer than three alternatives, or if attribute/covariate names are
        duplicated, overlap each other, or collide with a structural column
        (``respondent_id``, ``task_id``, ``alternative``, ``choice``, and
        ``worst`` when ``bws`` is True).
    """
    if n_individuals < 1 or n_tasks < 1 or n_alts < 2:
        raise ValueError("Need n_individuals >= 1, n_tasks >= 1, n_alts >= 2.")
    if bws and n_alts < 3:
        raise ValueError("BWS simulation requires n_alts >= 3.")

    # ── custom vs hardcoded path ──────────────────────────────────
    use_custom_attrs = attribute_names is not None
    use_custom_covs = covariate_names is not None
    attr_names = (
        list(attribute_names)
        if use_custom_attrs
        else ["price", "time", "comfort", "reliability"]
    )
    cov_names = list(covariate_names) if use_custom_covs else ["income", "age"]

    # Every name becomes a key of the same row dict, so a collision would
    # silently overwrite a column; reject it before drawing anything.
    reserved = {"respondent_id", "task_id", "alternative", "choice"}
    if bws:
        reserved.add("worst")
    _check_column_names(attr_names, cov_names, reserved)

    # NOTE: the order of the RNG draws below is part of the function's
    # observable behaviour (same seed -> same data); do not reorder them.
    rng = np.random.default_rng(seed)
    alt_constants = np.linspace(0.0, 0.35, num=n_alts)

    true_params: dict[str, float] = {}
    # (mean, sd) used to draw each respondent's coefficients on the custom path.
    random_coefs: list[tuple[float, float]] = []
    if use_custom_attrs:
        for attr in attr_names:
            mu = float(rng.uniform(-1.5, 1.5))
            sd = abs(mu) * 0.2
            true_params[f"mu_{attr}"] = round(mu, 4)
            true_params[f"sd_{attr}"] = round(sd, 4)
            # Floor the scale so a zero mean still yields a valid distribution.
            random_coefs.append((mu, max(sd, 1e-6)))
    else:
        true_params.update(
            {
                "mu_price": -1.20,
                "sd_price": 0.25,
                "mu_time": -0.70,
                "sd_time": 0.18,
                "mu_comfort": 0.85,
                "sd_comfort": 0.12,
                "beta_reliability": 0.55,
            }
        )

    # The first alternative is the reference level (constant 0), so only
    # alternatives 2..n_alts get a reported constant.
    for alt_idx in range(1, n_alts):
        true_params[f"asc_alt_{alt_idx + 1}"] = float(alt_constants[alt_idx])

    # ── covariate arrays (constant within respondent) ─────────────
    if use_custom_covs:
        cov_arrays = {
            name: rng.normal(50, 15, size=n_individuals).clip(10, 100)
            for name in cov_names
        }
    else:
        cov_arrays = {
            "income": rng.normal(60_000, 15_000, size=n_individuals).clip(18_000, 180_000),
            "age": rng.normal(42, 12, size=n_individuals).clip(18, 80),
        }

    rows: list[dict[str, float | int]] = []

    for individual in range(1, n_individuals + 1):
        # individual-level random coefficients
        if use_custom_attrs:
            betas = [rng.normal(mu, sd) for mu, sd in random_coefs]
        else:
            beta_price = rng.normal(true_params["mu_price"], true_params["sd_price"])
            beta_time = rng.normal(true_params["mu_time"], true_params["sd_time"])
            beta_comfort = rng.normal(true_params["mu_comfort"], true_params["sd_comfort"])

        cov_values = {name: float(arr[individual - 1]) for name, arr in cov_arrays.items()}

        for task in range(1, n_tasks + 1):
            # ── attribute values ──────────────────────────────────
            if use_custom_attrs:
                attr_values = {
                    attr: rng.uniform(1, 10, size=n_alts) for attr in attr_names
                }
                # Copy so the shared constants can never be mutated in place.
                deterministic_utility = alt_constants.copy()
                for beta, attr in zip(betas, attr_names):
                    deterministic_utility = deterministic_utility + beta * attr_values[attr]
            else:
                price = rng.uniform(3.0, 30.0, size=n_alts)
                time = rng.uniform(10.0, 90.0, size=n_alts)
                comfort = rng.integers(0, 2, size=n_alts)
                reliability = rng.uniform(0.70, 1.00, size=n_alts)
                deterministic_utility = (
                    alt_constants
                    + beta_price * np.log1p(price)
                    + beta_time * (time / 10.0)
                    + beta_comfort * comfort
                    + true_params["beta_reliability"] * reliability
                )
                attr_values = {
                    "price": price,
                    "time": time,
                    "comfort": comfort,
                    "reliability": reliability,
                }

            random_shock = rng.gumbel(0.0, 1.0, size=n_alts)
            utilities = deterministic_utility + random_shock
            chosen_alt = int(np.argmax(utilities))

            # ── worst choice (BWS) ────────────────────────────────
            if bws:
                remaining_utils = utilities.copy()
                remaining_utils[chosen_alt] = np.inf  # exclude best
                worst_alt = int(np.argmin(remaining_utils))

            for alt in range(n_alts):
                row: dict[str, float | int] = {
                    "respondent_id": individual,
                    "task_id": task,
                    "alternative": alt + 1,
                    "choice": int(alt == chosen_alt),
                }
                for attr in attr_names:
                    val = attr_values[attr][alt]
                    # Keep integer-coded attributes (e.g. comfort) as ints.
                    row[attr] = int(val) if isinstance(val, np.integer) else float(val)
                row.update(cov_values)
                if bws:
                    row["worst"] = int(alt == worst_alt)
                rows.append(row)

    return SimulationOutput(data=pd.DataFrame(rows), true_parameters=true_params)