Spaces:
Sleeping
Sleeping
| """ | |
| Synthesize ~2000 additional rows for Computer_Durability_Plus.csv. | |
| Distribution matches the original 999-row dataset but introduces mild drift | |
| so Evidently can detect it: | |
| - Hours Used Per Day shifted +2h (heavier usage in new cohort) | |
| - Cost shifted -$3000 (cheaper machines in new cohort) | |
| - Class rate ~7% positive (up from 5%), still realistic | |
| The synthesized rows are appended to the original data and saved as | |
| Computer_Durability_Plus.csv at the project root. | |
| """ | |
| from __future__ import annotations | |
| import csv | |
| import random | |
| import math | |
| from pathlib import Path | |
| SEED = 42 | |
| N_SYNTH = 2000 | |
| ROOT = Path(__file__).parent.parent | |
| # Original distribution parameters (derived from EDA) | |
| ORIG = { | |
| "hours_mean": 12.648, "hours_std": 6.558, "hours_min": 1.0, "hours_max": 24.0, | |
| "cost_mean": 33789.0, "cost_std": 9647.0, "cost_min": 5000.0, "cost_max": 50000.0, | |
| "age_mean": 36.637, "age_std": 16.560, "age_min": 8.0, "age_max": 65.0, | |
| "comp_age_mean": 29.654, "comp_age_std": 16.862, "comp_age_min": 1.0, "comp_age_max": 60.0, | |
| } | |
| # Mild drift: heavier usage, cheaper machines | |
| DRIFT = { | |
| "hours_mean": 14.8, # +2.15h drift | |
| "hours_std": 6.2, | |
| "cost_mean": 30500.0, # -$3289 drift | |
| "cost_std": 9200.0, | |
| "age_mean": 36.637, # unchanged | |
| "age_std": 16.560, | |
| "comp_age_mean": 29.654, | |
| "comp_age_std": 16.862, | |
| } | |
| HEADER = [ | |
| "Hours Used Per Day", "Cost", "User Age", | |
| "Needs Replacement", "Primary Usage", "Brand", "Computer Age (Months)" | |
| ] | |
| def clamp(value: float, lo: float, hi: float) -> float: | |
| return max(lo, min(hi, value)) | |
| def box_muller(mean: float, std: float, rng: random.Random) -> float: | |
| u1, u2 = rng.random(), rng.random() | |
| z = math.sqrt(-2 * math.log(max(u1, 1e-12))) * math.cos(2 * math.pi * u2) | |
| return mean + std * z | |
| def replacement_probability(hours: float, cost: float, user_age: float) -> float: | |
| """ | |
| Signal calibrated to match observed replacement rates: | |
| <12h/day β ~0%, 12-16h β ~8%, >16h β ~11% | |
| Cost modifier: cheaper machine β higher risk. | |
| Age modifier: older users slightly more at risk. | |
| Overall positive rate: ~5-7% (original ~5%, drift cohort ~7%). | |
| """ | |
| if hours < 12.0: | |
| base = 0.002 | |
| elif hours < 16.0: | |
| base = 0.07 | |
| else: | |
| base = 0.11 | |
| # Cost: scale from 1.8 (cheapest, $5k) to 0.2 (most expensive, $50k) | |
| cost_factor = 2.0 - 1.8 * (cost - 5000.0) / 45000.0 | |
| # User age: mild uplift for older users | |
| age_factor = 1.0 + 0.01 * max(0.0, user_age - 40.0) | |
| return min(base * cost_factor * age_factor, 0.85) | |
| def synthesize(n: int, rng: random.Random) -> list[list]: | |
| rows = [] | |
| for _ in range(n): | |
| hours = clamp(box_muller(DRIFT["hours_mean"], DRIFT["hours_std"], rng), 1.0, 24.0) | |
| cost = clamp(box_muller(DRIFT["cost_mean"], DRIFT["cost_std"], rng), 5000.0, 50000.0) | |
| user_age = clamp(box_muller(DRIFT["age_mean"], DRIFT["age_std"], rng), 8.0, 65.0) | |
| comp_age = clamp(box_muller(DRIFT["comp_age_mean"], DRIFT["comp_age_std"], rng), 1.0, 60.0) | |
| primary_usage = rng.randint(1, 4) | |
| brand = rng.randint(1, 5) | |
| p_replace = replacement_probability(hours, cost, user_age) | |
| needs_replacement = 1 if rng.random() < p_replace else 0 | |
| rows.append([ | |
| round(hours, 8), round(cost, 5), round(user_age, 8), | |
| needs_replacement, primary_usage, brand, round(comp_age, 7) | |
| ]) | |
| return rows | |
| def main() -> None: | |
| rng = random.Random(SEED) | |
| src = ROOT / "Computer_Durability.csv" | |
| dst = ROOT / "Computer_Durability_Plus.csv" | |
| orig_rows = [] | |
| with src.open(encoding="utf-8-sig") as f: | |
| reader = csv.reader(f) | |
| next(reader) # skip header | |
| for r in reader: | |
| orig_rows.append([ | |
| float(r[0]), float(r[1]), float(r[2]), | |
| int(r[3]), int(r[4]), int(r[5]), float(r[6]) | |
| ]) | |
| synth_rows = synthesize(N_SYNTH, rng) | |
| all_rows = orig_rows + synth_rows | |
| pos = sum(1 for r in all_rows if r[3] == 1) | |
| print(f"Original rows : {len(orig_rows)}") | |
| print(f"Synthesized : {len(synth_rows)}") | |
| print(f"Total rows : {len(all_rows)}") | |
| print(f"Positive class: {pos} ({100*pos/len(all_rows):.1f}%)") | |
| with dst.open("w", newline="", encoding="utf-8") as f: | |
| writer = csv.writer(f) | |
| writer.writerow(HEADER) | |
| writer.writerows(all_rows) | |
| print(f"Saved β {dst}") | |
| # Also copy raw originals into data/raw/ | |
| raw_dir = ROOT / "data" / "raw" | |
| raw_dir.mkdir(parents=True, exist_ok=True) | |
| import shutil | |
| shutil.copy(src, raw_dir / src.name) | |
| shutil.copy(dst, raw_dir / dst.name) | |
| print(f"Copied raw files β {raw_dir}") | |
| if __name__ == "__main__": | |
| main() | |