"""
Synthetic Walnut Storage Dataset Generator

Simulates Indian storage conditions using Arrhenius-based food chemistry
kinetics.
"""

import random
from pathlib import Path

import numpy as np
import pandas as pd

# Seed both RNGs at import time so repeated runs produce the same dataset.
np.random.seed(42)
random.seed(42)

# Physical constants
R = 8.314      # J/(mol·K) — gas constant
Ea = 80000.0   # J/mol — activation energy for lipid oxidation
A = 1.5e12     # pre-exponential factor (1/day)

# Rancidity threshold
PV_THRESHOLD = 5.0  # meq/kg

# Storage scenario profiles (Indian context).
# "weight" is the relative sampling probability of the scenario.
SCENARIOS = {
    "cold_storage": {
        "temp_range": (2.0, 8.0),
        "hum_range": (40.0, 60.0),
        "weight": 0.20,
    },
    "hill_region": {
        "temp_range": (5.0, 20.0),
        "hum_range": (35.0, 65.0),
        "weight": 0.25,
    },
    "ambient_warehouse": {
        "temp_range": (18.0, 32.0),
        "hum_range": (50.0, 75.0),
        "weight": 0.30,
    },
    "hot_transport": {
        "temp_range": (28.0, 40.0),
        "hum_range": (55.0, 85.0),
        "weight": 0.25,
    },
}

SCENARIO_NAMES = list(SCENARIOS.keys())
SCENARIO_WEIGHTS = [SCENARIOS[s]["weight"] for s in SCENARIO_NAMES]


def sigmoid(x):
    """Return the logistic function 1 / (1 + exp(-x))."""
    return 1.0 / (1.0 + np.exp(-x))


def arrhenius_rate(T_celsius: float, humidity: float, moisture: float) -> float:
    """Compute daily oxidation rate constant via Arrhenius kinetics.

    Args:
        T_celsius: Temperature in degrees Celsius.
        humidity: Relative humidity in percent; 40 % gives a factor of 1.
        moisture: Moisture content in percent; 4 % gives a factor of 1.

    Returns:
        The base Arrhenius rate ``A * exp(-Ea / (R * T))`` scaled by the
        humidity and moisture factors.
    """
    T_kelvin = T_celsius + 273.15
    k_base = A * np.exp(-Ea / (R * T_kelvin))

    # Humidity and moisture accelerate oxidation
    h_factor = 1.0 + 0.015 * (humidity - 40.0)  # relative to 40 % RH
    m_factor = 1.0 + 0.20 * (moisture - 4.0)    # relative to 4 % moisture

    # Floor both factors so very dry conditions at most halve the rate
    # (and can never make it zero or negative).
    h_factor = max(h_factor, 0.5)
    m_factor = max(m_factor, 0.5)

    return k_base * h_factor * m_factor


def generate_sequence(
    seq_len: int,
    scenario: str,
    start_day: int = 0,
) -> list[dict]:
    """Generate a single walnut storage sequence.

    Args:
        seq_len: Number of daily records to produce.
        scenario: Key into ``SCENARIOS`` selecting temperature/humidity ranges.
        start_day: Value of the ``day`` field for the first record.

    Returns:
        A list of ``seq_len`` dicts, one per day, holding the simulated
        environment, the oxidation indicators and the three target values.

    Raises:
        KeyError: If ``scenario`` is not a key of ``SCENARIOS``.
    """
    scen = SCENARIOS[scenario]
    temp_lo, temp_hi = scen["temp_range"]
    hum_lo, hum_hi = scen["hum_range"]

    # Initial chemical state
    PV0 = np.random.uniform(0.3, 1.2)         # initial peroxide value
    FFA0 = np.random.uniform(0.05, 0.15)      # initial free fatty acids
    base_oxy = np.random.uniform(0.18, 0.23)

    # Day when PV will exceed threshold (for shelf-life label).
    # Estimated analytically for the "average" conditions of this scenario.
    T_avg = (temp_lo + temp_hi) / 2.0
    H_avg = (hum_lo + hum_hi) / 2.0
    M_avg = np.random.uniform(3.0, 8.0)
    k_avg = arrhenius_rate(T_avg, H_avg, M_avg)
    if k_avg > 0 and PV0 < PV_THRESHOLD:
        days_to_rancid = np.log(PV_THRESHOLD / PV0) / k_avg
    else:
        days_to_rancid = 0.0

    # Daily fluctuations are drawn around the range midpoint with the range
    # spanning six standard deviations, then clipped to the range.
    temp_sd = (temp_hi - temp_lo) / 6
    hum_sd = (hum_hi - hum_lo) / 6

    records = []
    PV = PV0
    FFA = FFA0

    for i in range(seq_len):
        day = start_day + i

        # Daily environmental fluctuation (draw order matters for
        # reproducibility under the fixed seed).
        temp = np.clip(np.random.normal(T_avg, temp_sd), temp_lo, temp_hi)
        humidity = np.clip(np.random.normal(H_avg, hum_sd), hum_lo, hum_hi)
        moisture = np.clip(np.random.normal(5.0, 1.2), 3.0, 8.0)
        oxygen = np.clip(np.random.normal(base_oxy, 0.005), 0.18, 0.23)

        k = arrhenius_rate(temp, humidity, moisture)

        # Peroxide value: one-day exponential growth step at rate k,
        # PV(t+1) = PV(t) * exp(k).
        PV = PV * np.exp(k)
        PV = max(PV, 0.01)

        # Derived oxidation indicators
        FFA = FFA + 0.002 * k * PV + np.random.normal(0, 0.002)
        hexanal = (
            0.15 * PV
            + 0.05 * max(PV - 2.0, 0) ** 1.3
            + np.random.normal(0, 0.05)
        )
        # The index uses FFA floored at 0; the 0.01 floor on the stored FFA
        # is applied only afterwards.
        oxidation_ix = 0.3 * PV + 0.4 * max(FFA, 0) + np.random.normal(0, 0.05)

        FFA = max(FFA, 0.01)
        hexanal = max(hexanal, 0.0)
        oxidation_ix = max(oxidation_ix, 0.0)

        # Targets
        # rancidity_probability: sigmoid centred at PV = PV_THRESHOLD
        rand_prob = float(sigmoid(PV - PV_THRESHOLD))

        # shelf_life_remaining_days: estimated days until PV > threshold.
        # NOTE(review): this subtracts the absolute ``day`` (which includes
        # ``start_day``), while ``days_to_rancid`` is measured from this
        # sequence's initial PV — confirm that the offset is intended.
        shelf_life = float(max(days_to_rancid - day, 0.0))

        # decay_curve_value: normalised PV on [0,1] scale for regression
        decay_curve = float(min(PV / 10.0, 1.0))

        records.append({
            "day": day,
            "temperature": round(float(temp), 3),
            "humidity": round(float(humidity), 3),
            "moisture": round(float(moisture), 3),
            "oxygen": round(float(oxygen), 5),
            "peroxide_value": round(float(PV), 4),
            "free_fatty_acids": round(float(FFA), 4),
            "hexanal_level": round(float(hexanal), 4),
            "oxidation_index": round(float(oxidation_ix), 4),
            "rancidity_probability": round(rand_prob, 6),
            "shelf_life_remaining_days": round(shelf_life, 2),
            "decay_curve_value": round(decay_curve, 6),
        })

    return records


def generate_dataset(target_sequences: int = 90000) -> pd.DataFrame:
    """Generate the full dataset as a flat (one row per day) DataFrame.

    Each sequence gets a scenario drawn according to ``SCENARIO_WEIGHTS``,
    a length of 30–90 days and a start day of 0–90.

    Args:
        target_sequences: Number of sequences to generate. Zero or a
            negative value yields an empty frame that still has all columns.

    Returns:
        A DataFrame with ``sequence_id`` first, followed by the fields
        produced by ``generate_sequence``.
    """
    cols = [
        "sequence_id", "day",
        "temperature", "humidity", "moisture", "oxygen",
        "peroxide_value", "free_fatty_acids", "hexanal_level",
        "oxidation_index",
        "rancidity_probability", "shelf_life_remaining_days",
        "decay_curve_value",
    ]

    all_records = []
    seq_count = 0

    print(f"Generating {target_sequences} sequences …")
    # Clamp to 1: for fewer than 10 sequences the integer division gives 0,
    # which would make the modulo below raise ZeroDivisionError.
    log_every = max(target_sequences // 10, 1)

    while seq_count < target_sequences:
        scenario = random.choices(SCENARIO_NAMES, weights=SCENARIO_WEIGHTS, k=1)[0]
        seq_len = random.randint(30, 90)
        start_day = random.randint(0, 90)

        records = generate_sequence(seq_len, scenario, start_day)

        # Tag each record with a sequence_id
        seq_id = seq_count
        for rec in records:
            rec["sequence_id"] = seq_id

        all_records.extend(records)
        seq_count += 1

        if seq_count % log_every == 0:
            print(f" {seq_count}/{target_sequences} sequences "
                  f"({len(all_records):,} rows)")

    # Passing ``columns`` orders the fields and also keeps the schema when
    # there are no records (selecting columns afterwards would raise KeyError).
    return pd.DataFrame(all_records, columns=cols)


def main() -> None:
    """Generate the default dataset and write it to data/ as CSV."""
    out_dir = Path("data")
    out_dir.mkdir(exist_ok=True)

    df = generate_dataset(target_sequences=90000)

    out_path = out_dir / "walnut_storage_timeseries.csv"
    df.to_csv(out_path, index=False)
    print(f"\nDataset saved → {out_path}")
    print(f"Total rows : {len(df):,}")
    print(f"Total seqs : {df['sequence_id'].nunique():,}")
    print(df.describe().to_string())


if __name__ == "__main__":
    main()