# MLOPs_end2end_api/scripts/synthesize_data.py
# Initial commit: end-to-end MLOps pipeline project (a7af3e9, RCEjosephkarl)
"""
Synthesize ~2000 additional rows for Computer_Durability_Plus.csv.
Distribution matches the original 999-row dataset but introduces mild drift
so Evidently can detect it:
- Hours Used Per Day shifted +2h (heavier usage in new cohort)
- Cost shifted -$3000 (cheaper machines in new cohort)
- Class rate ~7% positive (up from 5%), still realistic
The synthesized rows are appended to the original data and saved as
Computer_Durability_Plus.csv at the project root.
"""
from __future__ import annotations
import csv
import random
import math
from pathlib import Path
# Seed for the random generator and number of synthetic rows to generate.
SEED = 42
N_SYNTH = 2000

# Parent of the directory that contains this script.
ROOT = Path(__file__).parent.parent

# Original distribution parameters (derived from EDA).
ORIG = {
    "hours_mean": 12.648,
    "hours_std": 6.558,
    "hours_min": 1.0,
    "hours_max": 24.0,
    "cost_mean": 33789.0,
    "cost_std": 9647.0,
    "cost_min": 5000.0,
    "cost_max": 50000.0,
    "age_mean": 36.637,
    "age_std": 16.560,
    "age_min": 8.0,
    "age_max": 65.0,
    "comp_age_mean": 29.654,
    "comp_age_std": 16.862,
    "comp_age_min": 1.0,
    "comp_age_max": 60.0,
}

# Mild drift for the synthetic cohort: heavier usage, cheaper machines.
# The age-related parameters are taken straight from ORIG (no drift).
DRIFT = {
    "hours_mean": 14.8,  # +2.15h relative to ORIG
    "hours_std": 6.2,
    "cost_mean": 30500.0,  # -$3289 relative to ORIG
    "cost_std": 9200.0,
    "age_mean": ORIG["age_mean"],
    "age_std": ORIG["age_std"],
    "comp_age_mean": ORIG["comp_age_mean"],
    "comp_age_std": ORIG["comp_age_std"],
}

# Column order of the output CSV; each synthesized row follows this order.
HEADER = [
    "Hours Used Per Day",
    "Cost",
    "User Age",
    "Needs Replacement",
    "Primary Usage",
    "Brand",
    "Computer Age (Months)",
]
def clamp(value: float, lo: float, hi: float) -> float:
    """Limit ``value`` to the closed interval ``[lo, hi]``.

    Values above ``hi`` become ``hi``; values below ``lo`` become ``lo``;
    anything in between is returned as-is.
    """
    # Apply the upper bound first, then the lower bound.
    capped = value if value < hi else hi
    return capped if capped > lo else lo
def box_muller(mean: float, std: float, rng: random.Random) -> float:
    """Return one normally distributed sample via the Box-Muller transform.

    Two uniform draws are taken from ``rng`` (in that order) and combined
    into a standard-normal variate, which is then scaled by ``std`` and
    shifted by ``mean``.
    """
    uniform_a = rng.random()
    uniform_b = rng.random()
    # Floor the first uniform at 1e-12 so log() is never evaluated at 0.
    radius = math.sqrt(-2 * math.log(max(uniform_a, 1e-12)))
    angle = 2 * math.pi * uniform_b
    return mean + std * (radius * math.cos(angle))
def replacement_probability(hours: float, cost: float, user_age: float) -> float:
    """Return the probability that a machine needs replacement.

    Signal calibrated to match observed replacement rates:
        <12h/day -> ~0%, 12-16h -> ~8%, >=16h -> ~11%
    Cost modifier: cheaper machine -> higher risk.
    Age modifier: older users slightly more at risk.
    Overall positive rate: ~5-7% (original ~5%, drift cohort ~7%).

    Args:
        hours: Hours the machine is used per day.
        cost: Machine cost; the cost modifier is linear in this value.
        user_age: Age of the user; only ages above 40 add risk.

    Returns:
        A probability in the closed range [0.0, 0.85].
    """
    # Base rate by usage tier (12.0 falls in the middle tier, 16.0 in the top).
    if hours < 12.0:
        base = 0.002
    elif hours < 16.0:
        base = 0.07
    else:
        base = 0.11

    # Cost: scale linearly from 2.0 (cheapest, $5k) to 0.2 (most expensive, $50k).
    cost_factor = 2.0 - 1.8 * (cost - 5000.0) / 45000.0

    # User age: +1% relative uplift per year above 40.
    age_factor = 1.0 + 0.01 * max(0.0, user_age - 40.0)

    # Cap at 0.85 and floor at 0.0: a cost above $55k makes cost_factor
    # negative, which would otherwise produce a negative "probability".
    return max(0.0, min(base * cost_factor * age_factor, 0.85))
def synthesize(n: int, rng: random.Random) -> list[list]:
    """Generate ``n`` synthetic rows drawn from the drifted distribution.

    Each numeric feature is sampled with ``box_muller`` using the mean and
    standard deviation in ``DRIFT`` and then clamped to the min/max recorded
    in ``ORIG``. Those bounds were previously repeated here as literals; the
    values are the same, they now come from a single place.

    Args:
        n: Number of rows to generate.
        rng: Random generator; all randomness is drawn from it, so a seeded
            generator yields reproducible rows.

    Returns:
        A list of rows in ``HEADER`` column order:
        [hours, cost, user_age, needs_replacement, primary_usage, brand,
        comp_age].
    """

    def draw(feature: str) -> float:
        # Sample one feature from DRIFT and clamp it to ORIG's observed range.
        sample = box_muller(DRIFT[f"{feature}_mean"], DRIFT[f"{feature}_std"], rng)
        return clamp(sample, ORIG[f"{feature}_min"], ORIG[f"{feature}_max"])

    rows = []
    for _ in range(n):
        # The order of draws is fixed so a given seed always yields the same data.
        hours = draw("hours")
        cost = draw("cost")
        user_age = draw("age")
        comp_age = draw("comp_age")
        primary_usage = rng.randint(1, 4)
        brand = rng.randint(1, 5)

        # Label is a Bernoulli draw on the modelled replacement probability.
        p_replace = replacement_probability(hours, cost, user_age)
        needs_replacement = 1 if rng.random() < p_replace else 0

        rows.append([
            round(hours, 8),
            round(cost, 5),
            round(user_age, 8),
            needs_replacement,
            primary_usage,
            brand,
            round(comp_age, 7),
        ])
    return rows
def main() -> None:
    """Build Computer_Durability_Plus.csv from the original data plus synthetic rows.

    Steps:
        1. Read ``Computer_Durability.csv`` from the project root.
        2. Generate ``N_SYNTH`` synthetic rows with a generator seeded by ``SEED``.
        3. Write original + synthetic rows to ``Computer_Durability_Plus.csv``.
        4. Copy both CSV files into ``data/raw/`` (created if missing).

    A short summary (row counts and positive-class rate) is printed to stdout.
    """
    import shutil  # only needed for the final copy step

    rng = random.Random(SEED)
    src = ROOT / "Computer_Durability.csv"
    dst = ROOT / "Computer_Durability_Plus.csv"

    orig_rows = []
    # utf-8-sig strips a leading BOM if present; newline="" lets the csv
    # module handle line endings itself, as its documentation requires.
    with src.open(encoding="utf-8-sig", newline="") as f:
        reader = csv.reader(f)
        next(reader)  # skip header
        for r in reader:
            if not r:
                # csv.reader yields [] for blank lines (e.g. trailing newlines).
                continue
            orig_rows.append([
                float(r[0]), float(r[1]), float(r[2]),
                int(r[3]), int(r[4]), int(r[5]), float(r[6]),
            ])

    synth_rows = synthesize(N_SYNTH, rng)
    all_rows = orig_rows + synth_rows

    # Column 3 is the "Needs Replacement" label.
    pos = sum(1 for r in all_rows if r[3] == 1)
    print(f"Original rows : {len(orig_rows)}")
    print(f"Synthesized : {len(synth_rows)}")
    print(f"Total rows : {len(all_rows)}")
    print(f"Positive class: {pos} ({100*pos/len(all_rows):.1f}%)")

    with dst.open("w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(HEADER)
        writer.writerows(all_rows)
    print(f"Saved → {dst}")

    # Also copy the original and the combined file into data/raw/.
    raw_dir = ROOT / "data" / "raw"
    raw_dir.mkdir(parents=True, exist_ok=True)
    shutil.copy(src, raw_dir / src.name)
    shutil.copy(dst, raw_dir / dst.name)
    print(f"Copied raw files → {raw_dir}")


if __name__ == "__main__":
    main()