"""
Synthesize ~2000 additional rows for Computer_Durability_Plus.csv.
Distribution matches the original 999-row dataset but introduces mild drift
so Evidently can detect it:
- Hours Used Per Day shifted +2h (heavier usage in new cohort)
- Cost shifted -$3000 (cheaper machines in new cohort)
- Class rate ~7% positive (up from 5%), still realistic
The synthesized rows are appended to the original data and saved as
Computer_Durability_Plus.csv at the project root.
"""
from __future__ import annotations
import csv
import random
import math
from pathlib import Path
# Seed for the random.Random instance created in main(), so every run
# produces the same synthetic rows.
SEED = 42

# Number of synthetic rows to generate.
N_SYNTH = 2000

# Parent of the directory that contains this script; the source and output
# CSV files are resolved relative to it.
ROOT = Path(__file__).parent.parent

# Summary statistics of the original dataset (derived from EDA).
# Kept as the baseline that DRIFT is compared against.
ORIG = {
    # Hours Used Per Day
    "hours_mean": 12.648,
    "hours_std": 6.558,
    "hours_min": 1.0,
    "hours_max": 24.0,
    # Cost
    "cost_mean": 33789.0,
    "cost_std": 9647.0,
    "cost_min": 5000.0,
    "cost_max": 50000.0,
    # User Age
    "age_mean": 36.637,
    "age_std": 16.560,
    "age_min": 8.0,
    "age_max": 65.0,
    # Computer Age (Months)
    "comp_age_mean": 29.654,
    "comp_age_std": 16.862,
    "comp_age_min": 1.0,
    "comp_age_max": 60.0,
}

# Parameters of the synthetic cohort. Relative to ORIG the drift is mild:
# heavier daily usage and cheaper machines; both age features are unchanged.
DRIFT = {
    "hours_mean": 14.8,        # +2.15h versus ORIG["hours_mean"]
    "hours_std": 6.2,
    "cost_mean": 30500.0,      # -$3289 versus ORIG["cost_mean"]
    "cost_std": 9200.0,
    "age_mean": 36.637,        # same as ORIG
    "age_std": 16.560,
    "comp_age_mean": 29.654,   # same as ORIG
    "comp_age_std": 16.862,
}

# Column names written as the first row of the output CSV; the rows built by
# synthesize() and read in main() follow this column order.
HEADER = [
    "Hours Used Per Day",
    "Cost",
    "User Age",
    "Needs Replacement",
    "Primary Usage",
    "Brand",
    "Computer Age (Months)",
]
def clamp(value: float, lo: float, hi: float) -> float:
    """Return *value* limited to the range [lo, hi].

    Values above *hi* come back as *hi*; values below *lo* come back as *lo*.
    """
    # Apply the upper bound first, then the lower bound.
    capped = min(hi, value)
    return max(lo, capped)
def box_muller(mean: float, std: float, rng: random.Random) -> float:
    """Draw one normally distributed sample using the Box-Muller transform.

    Two uniform draws are taken from *rng* (in that order) and combined via
    the cosine form of the transform; the resulting standard-normal value is
    scaled by *std* and shifted by *mean*.
    """
    first_uniform = rng.random()
    second_uniform = rng.random()
    # Floor the first draw at 1e-12 so log() is never evaluated at zero.
    radius = math.sqrt(-2 * math.log(max(first_uniform, 1e-12)))
    angle = 2 * math.pi * second_uniform
    standard_normal = radius * math.cos(angle)
    return mean + std * standard_normal
def replacement_probability(hours: float, cost: float, user_age: float) -> float:
    """Return the probability that a machine is labelled as needing replacement.

    The result is the product of three terms, capped at 0.85:

    * usage band: 0.002 below 12 h/day, 0.07 from 12 h/day up to (but not
      including) 16 h/day, and 0.11 otherwise;
    * cost multiplier: decreases linearly with cost, from 2.0 at $5,000 down
      to 0.2 at $50,000 (cheaper machine -> higher risk);
    * age multiplier: 1.0 for users aged 40 or younger, plus 0.01 for every
      year above 40.
    """
    # (exclusive upper limit in hours/day, base rate) for the lower bands;
    # anything not matched by a band falls through to the top rate.
    usage_bands = ((12.0, 0.002), (16.0, 0.07))
    base = 0.11
    for upper_limit, band_rate in usage_bands:
        if hours < upper_limit:
            base = band_rate
            break

    cost_factor = 2.0 - 1.8 * (cost - 5000.0) / 45000.0

    years_over_40 = max(0.0, user_age - 40.0)
    age_factor = 1.0 + 0.01 * years_over_40

    uncapped = base * cost_factor * age_factor
    return min(uncapped, 0.85)
def synthesize(n: int, rng: random.Random) -> list[list]:
    """Generate *n* synthetic rows drawn from the DRIFT distribution.

    Each numeric feature is sampled with box_muller() using the DRIFT mean and
    standard deviation and then clamped to a fixed range. The two categorical
    codes are uniform integers, and the label is a Bernoulli draw with the
    probability given by replacement_probability().

    Each returned row is a list in HEADER column order: hours, cost, user age,
    needs-replacement flag, primary usage, brand, computer age.
    """
    dataset = []
    for _ in range(n):
        # NOTE: the order of the draws below determines the generated data for
        # a given seed; do not reorder them.
        raw_hours = box_muller(DRIFT["hours_mean"], DRIFT["hours_std"], rng)
        hours = clamp(raw_hours, 1.0, 24.0)

        raw_cost = box_muller(DRIFT["cost_mean"], DRIFT["cost_std"], rng)
        cost = clamp(raw_cost, 5000.0, 50000.0)

        raw_user_age = box_muller(DRIFT["age_mean"], DRIFT["age_std"], rng)
        user_age = clamp(raw_user_age, 8.0, 65.0)

        raw_comp_age = box_muller(DRIFT["comp_age_mean"], DRIFT["comp_age_std"], rng)
        comp_age = clamp(raw_comp_age, 1.0, 60.0)

        usage_code = rng.randint(1, 4)
        brand_code = rng.randint(1, 5)

        threshold = replacement_probability(hours, cost, user_age)
        label = int(rng.random() < threshold)

        record = [
            round(hours, 8),
            round(cost, 5),
            round(user_age, 8),
            label,
            usage_code,
            brand_code,
            round(comp_age, 7),
        ]
        dataset.append(record)
    return dataset
def main() -> None:
    """Build Computer_Durability_Plus.csv from the original data plus synthetic rows.

    Steps:
      1. Read ROOT/Computer_Durability.csv (header skipped, blank lines ignored)
         and convert each row to numbers.
      2. Generate N_SYNTH rows with synthesize(), seeded with SEED.
      3. Print row counts and the positive-class rate (column index 3 == 1).
      4. Write HEADER plus all rows to ROOT/Computer_Durability_Plus.csv.
      5. Copy both CSV files into ROOT/data/raw/, creating it if needed.

    Raises:
        FileNotFoundError: if the original CSV does not exist.
        ValueError / IndexError: if a non-blank row cannot be parsed as seven
            numeric columns.
    """
    # Imported locally so this function does not depend on a module-level import.
    import shutil

    rng = random.Random(SEED)
    src = ROOT / "Computer_Durability.csv"
    dst = ROOT / "Computer_Durability_Plus.csv"

    orig_rows = []
    # utf-8-sig strips a leading BOM if present; newline="" is what the csv
    # module recommends for files handed to csv.reader.
    with src.open(encoding="utf-8-sig", newline="") as f:
        reader = csv.reader(f)
        next(reader)  # skip header
        for r in reader:
            if not r:
                # csv.reader yields [] for a blank line (e.g. a trailing
                # newline at end of file); indexing it would raise IndexError.
                continue
            orig_rows.append([
                float(r[0]), float(r[1]), float(r[2]),
                int(r[3]), int(r[4]), int(r[5]), float(r[6]),
            ])

    synth_rows = synthesize(N_SYNTH, rng)
    all_rows = orig_rows + synth_rows

    pos = sum(1 for r in all_rows if r[3] == 1)
    # Guard the percentage against an empty result (no original rows and
    # N_SYNTH set to 0).
    pos_pct = 100 * pos / len(all_rows) if all_rows else 0.0
    print(f"Original rows : {len(orig_rows)}")
    print(f"Synthesized : {len(synth_rows)}")
    print(f"Total rows : {len(all_rows)}")
    print(f"Positive class: {pos} ({pos_pct:.1f}%)")

    with dst.open("w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(HEADER)
        writer.writerows(all_rows)
    print(f"Saved → {dst}")

    # Also copy raw originals into data/raw/
    raw_dir = ROOT / "data" / "raw"
    raw_dir.mkdir(parents=True, exist_ok=True)
    shutil.copy(src, raw_dir / src.name)
    shutil.copy(dst, raw_dir / dst.name)
    print(f"Copied raw files → {raw_dir}")
# Run the pipeline only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|