walnut-rancidity-predictor / generate_data.py
Arko007's picture
Upload generate_data.py with huggingface_hub
7d806fd verified
"""
Synthetic Walnut Storage Dataset Generator
Simulates Indian storage conditions using Arrhenius-based food chemistry kinetics.
"""
import numpy as np
import pandas as pd
from pathlib import Path
import random
# Fix both random number generators so repeated runs emit the same dataset.
random.seed(42)
np.random.seed(42)

# Arrhenius parameters for the lipid-oxidation rate model
R = 8.314       # gas constant, J/(mol·K)
Ea = 80000.0    # activation energy for lipid oxidation, J/mol
A = 1.5e12      # pre-exponential factor, 1/day

# Peroxide value (meq/kg) used as the rancidity threshold
PV_THRESHOLD = 5.0

# Storage scenario profiles (Indian context). Each profile gives the
# temperature range, the relative-humidity range and the sampling weight
# used when a scenario is picked for a sequence.
SCENARIOS = {
    "cold_storage": dict(
        temp_range=(2.0, 8.0),
        hum_range=(40.0, 60.0),
        weight=0.20,
    ),
    "hill_region": dict(
        temp_range=(5.0, 20.0),
        hum_range=(35.0, 65.0),
        weight=0.25,
    ),
    "ambient_warehouse": dict(
        temp_range=(18.0, 32.0),
        hum_range=(50.0, 75.0),
        weight=0.30,
    ),
    "hot_transport": dict(
        temp_range=(28.0, 40.0),
        hum_range=(55.0, 85.0),
        weight=0.25,
    ),
}

# Parallel lists (same order as SCENARIOS) for weighted random selection.
SCENARIO_NAMES = list(SCENARIOS)
SCENARIO_WEIGHTS = [profile["weight"] for profile in SCENARIOS.values()]
def sigmoid(x):
    """Return the logistic function ``1 / (1 + exp(-x))`` of *x*.

    The exponential is evaluated with NumPy, so *x* may be a scalar or an
    array; arrays are transformed element-wise.
    """
    denominator = 1.0 + np.exp(-x)
    return 1.0 / denominator
def arrhenius_rate(T_celsius: float, humidity: float, moisture: float) -> float:
    """Return the daily oxidation rate constant for the given conditions.

    The base rate is the Arrhenius expression ``A * exp(-Ea / (R * T))`` with
    ``T`` converted from Celsius to kelvin, using the module-level constants
    ``A``, ``Ea`` and ``R``. It is then scaled by two linear multipliers:
    one for humidity (relative to 40 % RH) and one for moisture (relative to
    4 % moisture). Each multiplier is floored at 0.5.
    """
    absolute_temp = T_celsius + 273.15
    base_rate = A * np.exp(-Ea / (R * absolute_temp))

    # Higher humidity speeds up oxidation; never let the multiplier drop
    # below one half.
    humidity_multiplier = 1.0 + 0.015 * (humidity - 40.0)
    if humidity_multiplier < 0.5:
        humidity_multiplier = 0.5

    # Same treatment for kernel moisture.
    moisture_multiplier = 1.0 + 0.20 * (moisture - 4.0)
    if moisture_multiplier < 0.5:
        moisture_multiplier = 0.5

    return base_rate * humidity_multiplier * moisture_multiplier
def generate_sequence(
    seq_len: int,
    scenario: str,
    start_day: int = 0,
) -> list[dict]:
    """Generate a single synthetic walnut storage sequence.

    Args:
        seq_len: Number of consecutive daily records to produce.
        scenario: Key into ``SCENARIOS`` selecting the temperature and
            humidity ranges for the whole sequence.
        start_day: Value of the ``day`` column for the first record; the
            following records count up from it.

    Returns:
        A list of ``seq_len`` dicts, one per day. Each holds the sampled
        environment (temperature, humidity, moisture, oxygen), the simulated
        oxidation indicators (peroxide value, free fatty acids, hexanal,
        oxidation index) and three targets:

        * ``rancidity_probability`` - sigmoid of ``PV - PV_THRESHOLD``.
        * ``shelf_life_remaining_days`` - estimated days until PV crosses
          ``PV_THRESHOLD``, counted from the current record.
        * ``decay_curve_value`` - ``PV / 10`` capped at 1.

    Raises:
        KeyError: If ``scenario`` is not a key of ``SCENARIOS``.
    """
    scen = SCENARIOS[scenario]
    temp_lo, temp_hi = scen["temp_range"]
    hum_lo, hum_hi = scen["hum_range"]

    # Initial chemical state
    PV0 = np.random.uniform(0.3, 1.2)      # initial peroxide value
    FFA0 = np.random.uniform(0.05, 0.15)   # initial free fatty acids
    base_oxy = np.random.uniform(0.18, 0.23)

    # Estimate analytically, for the "average" conditions of this scenario,
    # how many days after the FIRST record PV will exceed the threshold.
    T_avg = (temp_lo + temp_hi) / 2.0
    H_avg = (hum_lo + hum_hi) / 2.0
    # NOTE(review): drawn uniformly although the daily moisture below is
    # normal around 5.0 - this looks like deliberate label noise; confirm.
    M_avg = np.random.uniform(3.0, 8.0)
    k_avg = arrhenius_rate(T_avg, H_avg, M_avg)
    if k_avg > 0 and PV0 < PV_THRESHOLD:
        days_to_rancid = np.log(PV_THRESHOLD / PV0) / k_avg
    else:
        days_to_rancid = 0.0

    # Loop-invariant parameters of the daily fluctuation: the mean is the
    # centre of the scenario range and the range spans +/- 3 sigma.
    temp_sd = (temp_hi - temp_lo) / 6
    hum_sd = (hum_hi - hum_lo) / 6

    records = []
    PV = PV0
    FFA = FFA0
    for elapsed in range(seq_len):
        day = start_day + elapsed

        # Daily environmental fluctuation, clipped to the allowed ranges.
        temp = np.clip(np.random.normal(T_avg, temp_sd), temp_lo, temp_hi)
        humidity = np.clip(np.random.normal(H_avg, hum_sd), hum_lo, hum_hi)
        moisture = np.clip(np.random.normal(5.0, 1.2), 3.0, 8.0)
        oxygen = np.clip(np.random.normal(base_oxy, 0.005), 0.18, 0.23)
        k = arrhenius_rate(temp, humidity, moisture)

        # Peroxide value: exact one-day solution of dPV/dt = k * PV with k
        # held constant over the day, i.e. PV(t+1) = PV(t) * exp(k).
        PV = PV * np.exp(k)
        PV = max(PV, 0.01)

        # Derived oxidation indicators
        FFA = FFA + 0.002 * k * PV + np.random.normal(0, 0.002)
        hexanal = (0.15 * PV + 0.05 * max(PV - 2.0, 0) ** 1.3
                   + np.random.normal(0, 0.05))
        # The index uses the unclamped FFA (floored at 0 only inside the
        # expression); FFA itself is clamped afterwards.
        oxidation_ix = 0.3 * PV + 0.4 * max(FFA, 0) + np.random.normal(0, 0.05)
        FFA = max(FFA, 0.01)
        hexanal = max(hexanal, 0.0)
        oxidation_ix = max(oxidation_ix, 0.0)

        # Targets
        # rancidity_probability: sigmoid centred at PV = PV_THRESHOLD
        rancidity_prob = float(sigmoid(PV - PV_THRESHOLD))
        # shelf_life_remaining_days: days_to_rancid is measured from the
        # first record (where PV0 applies), so subtract the days elapsed
        # within this sequence rather than the absolute ``day`` label.
        # Using ``day`` made the label collapse to 0 for large start_day
        # while PV was still far below the threshold. Identical to the old
        # result when start_day == 0.
        shelf_life = float(max(days_to_rancid - elapsed, 0.0))
        # decay_curve_value: normalised PV on [0, 1] scale for regression
        decay_curve = float(min(PV / 10.0, 1.0))

        records.append({
            "day": day,
            "temperature": round(float(temp), 3),
            "humidity": round(float(humidity), 3),
            "moisture": round(float(moisture), 3),
            "oxygen": round(float(oxygen), 5),
            "peroxide_value": round(float(PV), 4),
            "free_fatty_acids": round(float(FFA), 4),
            "hexanal_level": round(float(hexanal), 4),
            "oxidation_index": round(float(oxidation_ix), 4),
            "rancidity_probability": round(rancidity_prob, 6),
            "shelf_life_remaining_days": round(shelf_life, 2),
            "decay_curve_value": round(decay_curve, 6),
        })
    return records
def generate_dataset(target_sequences: int = 90000) -> pd.DataFrame:
    """Generate the full dataset as one flat table of daily records.

    For each sequence a scenario is drawn according to ``SCENARIO_WEIGHTS``,
    a length is drawn from 30-90 days and a start day from 0-90, and the
    records returned by ``generate_sequence`` are tagged with a
    ``sequence_id`` (0-based, in generation order).

    Args:
        target_sequences: Number of sequences to generate. Zero or a
            negative value yields an empty frame with the expected columns.

    Returns:
        A DataFrame with one row per daily record and the columns ordered
        as identifiers, environment, oxidation indicators, targets.
    """
    cols = [
        "sequence_id", "day",
        "temperature", "humidity", "moisture", "oxygen",
        "peroxide_value", "free_fatty_acids", "hexanal_level", "oxidation_index",
        "rancidity_probability", "shelf_life_remaining_days", "decay_curve_value",
    ]
    all_records = []
    print(f"Generating {target_sequences} sequences …")
    # Report progress roughly every 10 %. Floor at 1 so that targets below
    # 10 do not produce a modulo-by-zero.
    log_every = max(1, target_sequences // 10)
    for seq_id in range(target_sequences):
        scenario = random.choices(SCENARIO_NAMES, weights=SCENARIO_WEIGHTS, k=1)[0]
        seq_len = random.randint(30, 90)
        start_day = random.randint(0, 90)
        records = generate_sequence(seq_len, scenario, start_day)
        # Tag each record with a sequence_id
        for rec in records:
            rec["sequence_id"] = seq_id
        all_records.extend(records)
        seq_count = seq_id + 1
        if seq_count % log_every == 0:
            print(f" {seq_count}/{target_sequences} sequences "
                  f"({len(all_records):,} rows)")
    # Passing ``columns`` both orders the columns and keeps the frame
    # well-formed when no records were generated.
    return pd.DataFrame(all_records, columns=cols)
if __name__ == "__main__":
    # Resolve the output location first; the directory is created if it
    # does not exist yet.
    out_dir = Path("data")
    out_dir.mkdir(exist_ok=True)
    out_path = out_dir / "walnut_storage_timeseries.csv"

    # Build the dataset and write it without the DataFrame index column.
    df = generate_dataset(target_sequences=90000)
    df.to_csv(out_path, index=False)

    # Console summary: location, sizes and per-column statistics.
    print(f"\nDataset saved → {out_path}")
    print(f"Total rows : {len(df):,}")
    print(f"Total seqs : {df['sequence_id'].nunique():,}")
    summary = df.describe()
    print(summary.to_string())