File size: 4,754 Bytes
2ae5a57
 
 
 
 
 
 
 
 
 
a7af3e9
2ae5a57
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a7af3e9
 
e129ce5
2ae5a57
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""
Synthesize ~2000 additional rows for Computer_Durability_Plus.csv.

Distribution matches the original 999-row dataset but introduces mild drift
so Evidently can detect it:
  - Hours Used Per Day shifted +2h (heavier usage in new cohort)
  - Cost shifted -$3000 (cheaper machines in new cohort)
  - Class rate ~7% positive (up from 5%), still realistic

The synthesized rows are appended to the original data and saved as
Computer_Durability_Plus.csv at the project root.
"""
from __future__ import annotations

import csv
import random
import math
from pathlib import Path

SEED = 42        # seed for the random.Random instance created in main()
N_SYNTH = 2000   # number of synthetic rows to generate
ROOT = Path(__file__).parent.parent  # directory two levels above this script

# Distribution parameters of the original dataset (derived from EDA).
# Reference values: the drifted cohort below is defined relative to these.
ORIG = {
    # Hours Used Per Day
    "hours_mean": 12.648,
    "hours_std": 6.558,
    "hours_min": 1.0,
    "hours_max": 24.0,
    # Cost
    "cost_mean": 33789.0,
    "cost_std": 9647.0,
    "cost_min": 5000.0,
    "cost_max": 50000.0,
    # User Age
    "age_mean": 36.637,
    "age_std": 16.560,
    "age_min": 8.0,
    "age_max": 65.0,
    # Computer Age (Months)
    "comp_age_mean": 29.654,
    "comp_age_std": 16.862,
    "comp_age_min": 1.0,
    "comp_age_max": 60.0,
}

# Parameters of the synthesized cohort. Mild drift versus ORIG:
# heavier usage and cheaper machines; both age features are unchanged.
DRIFT = {
    "hours_mean": 14.8,       # +2.15h versus ORIG
    "hours_std": 6.2,
    "cost_mean": 30500.0,     # -$3289 versus ORIG
    "cost_std": 9200.0,
    "age_mean": 36.637,       # same as ORIG
    "age_std": 16.560,
    "comp_age_mean": 29.654,  # same as ORIG
    "comp_age_std": 16.862,
}

# Column order of the output CSV; synthesized rows follow the same order.
HEADER = [
    "Hours Used Per Day",
    "Cost",
    "User Age",
    "Needs Replacement",
    "Primary Usage",
    "Brand",
    "Computer Age (Months)",
]


def clamp(value: float, lo: float, hi: float) -> float:
    """Limit *value* to the closed interval [lo, hi].

    Returns *hi* when *value* is not below it, *lo* when the capped value is
    not above it, and *value* itself otherwise.
    """
    capped = value if value < hi else hi
    return capped if capped > lo else lo


def box_muller(mean: float, std: float, rng: random.Random) -> float:
    """Draw one normally distributed sample using the Box-Muller transform.

    Consumes exactly two uniform values from *rng* (first the one used for
    the radius, then the one used for the angle) and returns
    ``mean + std * z`` where ``z`` is the resulting standard-normal deviate.
    """
    radial_u = rng.random()
    angular_u = rng.random()
    # Floor the uniform at 1e-12 so log() never receives 0.0.
    radius = math.sqrt(-2 * math.log(max(radial_u, 1e-12)))
    angle = 2 * math.pi * angular_u
    return mean + std * (radius * math.cos(angle))


def replacement_probability(hours: float, cost: float, user_age: float) -> float:
    """Return the probability that a machine is labelled "needs replacement".

    The value is a usage-dependent base rate multiplied by a cost factor and
    a user-age factor, then limited to the range [0.0, 0.85]:

      - usage: < 12 h/day -> 0.002,  12 to < 16 h/day -> 0.07,  >= 16 h/day -> 0.11
      - cost:  linear factor, 2.0 at $5,000 falling to 0.2 at $50,000
               (cheaper machine -> higher risk)
      - age:   +1% per year of user age above 40; no change at 40 or below

    Calibration targets stated by the author: observed replacement rates of
    about 0% below 12 h/day, about 8% for 12-16 h/day and about 11% above
    16 h/day, with an overall positive rate of roughly 5-7% (original ~5%,
    drift cohort ~7%).

    Args:
        hours: Hours used per day.
        cost: Machine cost in dollars.
        user_age: User age in years.

    Returns:
        A probability between 0.0 and 0.85 inclusive.
    """
    if hours < 12.0:
        base = 0.002
    elif hours < 16.0:
        base = 0.07
    else:
        base = 0.11
    # Linear in cost: 2.0 at $5k, 0.2 at $50k. It keeps falling beyond $50k
    # and turns negative above roughly $55k, hence the lower clamp below.
    cost_factor = 2.0 - 1.8 * (cost - 5000.0) / 45000.0
    # Mild uplift for users older than 40.
    age_factor = 1.0 + 0.01 * max(0.0, user_age - 40.0)
    # Cap at 0.85 and never return a negative probability.
    return max(0.0, min(base * cost_factor * age_factor, 0.85))


def synthesize(n: int, rng: random.Random) -> list[list]:
    """Generate *n* synthetic rows from the DRIFT distribution.

    Each row is a list in HEADER column order: hours used per day, cost,
    user age, needs-replacement label (0 or 1), primary usage (integer 1-4),
    brand (integer 1-5) and computer age in months. The four continuous
    features are normal samples clipped to fixed bounds; the label is a
    Bernoulli draw with the probability from replacement_probability().
    """

    def draw(prefix: str, lo: float, hi: float) -> float:
        # One normal sample using the DRIFT mean/std for this feature,
        # clipped to [lo, hi].
        sample = box_muller(DRIFT[f"{prefix}_mean"], DRIFT[f"{prefix}_std"], rng)
        return clamp(sample, lo, hi)

    generated = []
    for _ in range(n):
        # The order of the draws fixes the output for a given seed.
        hours = draw("hours", 1.0, 24.0)
        cost = draw("cost", 5000.0, 50000.0)
        user_age = draw("age", 8.0, 65.0)
        comp_age = draw("comp_age", 1.0, 60.0)
        primary_usage = rng.randint(1, 4)
        brand = rng.randint(1, 5)
        needs_replacement = int(
            rng.random() < replacement_probability(hours, cost, user_age)
        )
        record = [
            round(hours, 8),
            round(cost, 5),
            round(user_age, 8),
            needs_replacement,
            primary_usage,
            brand,
            round(comp_age, 7),
        ]
        generated.append(record)
    return generated


def main() -> None:
    """Build Computer_Durability_Plus.csv from the original data plus synthetic rows.

    Steps:
      1. Read ``Computer_Durability.csv`` from ROOT (header skipped, blank
         lines ignored) and convert each row to numbers.
      2. Generate N_SYNTH synthetic rows with a ``random.Random(SEED)``
         generator and append them to the original rows.
      3. Print row counts and the positive-class share.
      4. Write HEADER plus all rows to ``Computer_Durability_Plus.csv`` in ROOT.
      5. Copy both CSV files into ``ROOT/data/raw/``, creating it if needed.

    Raises:
        FileNotFoundError: if the source CSV does not exist.
        StopIteration: if the source CSV is completely empty (no header line).
        ValueError, IndexError: if a non-blank data row cannot be parsed.
    """
    import shutil  # only used for the final copy step

    rng = random.Random(SEED)

    src = ROOT / "Computer_Durability.csv"
    dst = ROOT / "Computer_Durability_Plus.csv"

    # utf-8-sig tolerates a leading BOM; newline="" is what the csv module
    # asks for when it is handed a file object.
    with src.open(newline="", encoding="utf-8-sig") as f:
        reader = csv.reader(f)
        next(reader)  # skip header
        orig_rows = [
            [
                float(r[0]), float(r[1]), float(r[2]),
                int(r[3]), int(r[4]), int(r[5]), float(r[6]),
            ]
            for r in reader
            if r  # csv.reader yields [] for blank lines (e.g. a trailing one)
        ]

    synth_rows = synthesize(N_SYNTH, rng)
    all_rows = orig_rows + synth_rows

    pos = sum(1 for r in all_rows if r[3] == 1)
    # Avoid ZeroDivisionError if there are no rows at all.
    pos_pct = 100 * pos / len(all_rows) if all_rows else 0.0
    print(f"Original rows : {len(orig_rows)}")
    print(f"Synthesized   : {len(synth_rows)}")
    print(f"Total rows    : {len(all_rows)}")
    print(f"Positive class: {pos} ({pos_pct:.1f}%)")

    with dst.open("w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(HEADER)
        writer.writerows(all_rows)

    print(f"Saved → {dst}")

    # Also copy the original and the combined file into data/raw/
    raw_dir = ROOT / "data" / "raw"
    raw_dir.mkdir(parents=True, exist_ok=True)
    shutil.copy(src, raw_dir / src.name)
    shutil.copy(dst, raw_dir / dst.name)
    print(f"Copied raw files → {raw_dir}")


if __name__ == "__main__":
    main()