"""
KernelX Intelligence Layer — Data Ingestion and Preprocessing (Stage 1)
Reads raw state_transitions.jsonl from the bridge's TrajectoryManager,
applies feature scaling (symlog for huge counters), drops sparse-zero
features, and produces train/val/test splits for World Model and
Strategist training.
Usage:
python -m training.data.preprocess --input data/state_transitions.jsonl
"""
import json
import argparse
import sys
from pathlib import Path
from typing import List, Dict, Any
import numpy as np
# ---------------------------------------------------------------------------
# Configuration (loaded from preprocessing_config.json)
# ---------------------------------------------------------------------------
CONFIG_PATH = Path(__file__).parent / "preprocessing_config.json"
def load_config() -> dict:
with open(CONFIG_PATH) as f:
return json.load(f)
CONFIG = load_config()
SYMLOG_FEATURES = CONFIG["symlog_features"] # [4, 5, 6]
ACTIVE_FEATURES = CONFIG["active_features"] # [0,1,2,3,4,5,6,7,12,23]
FEATURE_NAMES = CONFIG["feature_names"] # 10 short names
SPARSE_ZERO = CONFIG["sparse_zero_features"]  # raw indices outside ACTIVE_FEATURES; loaded for reference/auditing only
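# Illustrative shape of preprocessing_config.json (feature names inferred
# from the IDX_* constants below; the JSON file itself is authoritative):
#
#   {
#     "symlog_features": [4, 5, 6],
#     "active_features": [0, 1, 2, 3, 4, 5, 6, 7, 12, 23],
#     "feature_names": ["cpu", "prio", "static_prio", "normal_prio",
#                       "exec_ns", "vruntime", "migrations",
#                       "cpus_allowed", "ctx_switches", "wait_us"],
#     "sparse_zero_features": [...]
#   }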
# ---------------------------------------------------------------------------
# Feature index constants (positions within the ACTIVE 10D vector)
# ---------------------------------------------------------------------------
IDX_CPU = 0 # raw index 0
IDX_PRIO = 1 # raw index 1
IDX_STATIC_PRIO = 2 # raw index 2
IDX_NORMAL_PRIO = 3 # raw index 3
IDX_EXEC_NS = 4 # raw index 4 (symlog)
IDX_VRUNTIME = 5 # raw index 5 (symlog)
IDX_MIGRATIONS = 6 # raw index 6 (symlog)
IDX_CPUS_ALLOWED = 7 # raw index 7
IDX_CTX_SWITCHES = 8 # raw index 12
IDX_WAIT_US = 9 # raw index 23
# ---------------------------------------------------------------------------
# Scaling functions
# ---------------------------------------------------------------------------
def symmetric_log(x: float) -> float:
"""sgn(x) * ln(1 + |x|) — compresses huge values while preserving sign."""
return float(np.sign(x) * np.log1p(np.abs(x)))
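# Worked examples (values rounded; easy to verify in a REPL):
#   symmetric_log(0.0)           -> 0.0     (zero maps to zero)
#   symmetric_log(1_000_000_000) -> ~20.72  (ln(1 + 1e9))
#   symmetric_log(-1000.0)       -> ~-6.91  (sign preserved)
# Unlike a plain log, symlog is defined at 0 and for negative values, which
# suits raw kernel counters that start at zero.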
def preprocess_features(raw_features: List[float]) -> List[float]:
"""Transform a raw 24D feature vector: symlog the big counters."""
    f = list(raw_features)  # copy so the caller's raw vector is not mutated
for idx in SYMLOG_FEATURES:
f[idx] = symmetric_log(f[idx])
return f
def extract_active(scaled_features: List[float]) -> List[float]:
"""Keep only the active (non-zero, information-carrying) features."""
return [scaled_features[i] for i in ACTIVE_FEATURES]
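# Mapping sketch: position in the 10D active vector <- raw 24D index. With
# ACTIVE_FEATURES = [0,1,2,3,4,5,6,7,12,23], raw index 12 lands at active
# position 8 and raw index 23 at position 9, matching the IDX_* constants
# above.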
# ---------------------------------------------------------------------------
# Prompt formatting for LLM consumption
# ---------------------------------------------------------------------------
def format_state(active_vector: List[float]) -> str:
    """Convert a 10D active feature vector into a compact text string."""
    parts = []
    for name, val in zip(FEATURE_NAMES, active_vector):
        # float.is_integer() avoids the ValueError/OverflowError that a bare
        # `val == int(val)` check raises on NaN or inf inputs
        if float(val).is_integer():
            parts.append(f"{name}:{int(val)}")
        else:
            parts.append(f"{name}:{val:.2f}")
    return " | ".join(parts)
# ---------------------------------------------------------------------------
# Record processing
# ---------------------------------------------------------------------------
def preprocess_record(record: Dict[str, Any]) -> Dict[str, Any]:
"""Transform a single raw JSONL record into training-ready format."""
s_t_scaled = preprocess_features(record["state_t"]["features"])
s_t1_scaled = preprocess_features(record["state_t_next"]["features"])
s_t_active = extract_active(s_t_scaled)
s_t1_active = extract_active(s_t1_scaled)
return {
"state": s_t_active,
"action": record["action"],
"reward": record["reward"],
"next_state": s_t1_active,
"pid": record["state_t"]["pid"],
"cpu": record["state_t"]["cpu"],
"timestamp": record["state_t"]["timestamp"],
}
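# Record shapes (illustrative; keys taken from the accesses above):
#   in:  {"state_t":      {"features": [24 floats], "pid": ..., "cpu": ...,
#                          "timestamp": ...},
#         "state_t_next": {"features": [24 floats], ...},
#         "action": ..., "reward": ...}
#   out: {"state": [10 floats], "next_state": [10 floats],
#         "action", "reward", "pid", "cpu", "timestamp"}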
# ---------------------------------------------------------------------------
# Dataset audit
# ---------------------------------------------------------------------------
def audit_dataset(records: List[Dict]) -> None:
"""Print per-feature statistics for the raw dataset."""
all_features = []
for r in records:
all_features.append(r["state_t"]["features"])
all_features.append(r["state_t_next"]["features"])
arr = np.array(all_features, dtype=np.float64)
print(f"\nTotal transitions: {len(records)}")
print(f"Total feature vectors: {len(all_features)}")
print(f"\n{'Idx':<5} {'Min':<22} {'Max':<22} {'Mean':<22} {'Std':<22} {'Zeros%':<10}")
print("-" * 103)
    for i in range(arr.shape[1]):  # one row per raw feature column (24 expected)
col = arr[:, i]
zero_pct = (col == 0).sum() / len(col) * 100
print(f"{i:<5} {col.min():<22.2f} {col.max():<22.2f} {col.mean():<22.2f} {col.std():<22.2f} {zero_pct:<10.1f}")
print(f"\nNaN count: {np.isnan(arr).sum()}")
print(f"Inf count: {np.isinf(arr).sum()}")
actions = [r["action"] for r in records]
rewards = [r["reward"] for r in records]
print(f"\nAction — unique values: {sorted(set(actions))}")
print(f"Reward — min: {min(rewards)}, max: {max(rewards)}, mean: {np.mean(rewards):.2f}, std: {np.std(rewards):.2f}")
# ---------------------------------------------------------------------------
# Train / Val / Test split (chronological)
# ---------------------------------------------------------------------------
def split_chronological(processed: List[Dict], train_ratio=0.8, val_ratio=0.1):
    """Split chronologically (NOT randomly), so val/test only contain
    transitions that occur after everything the model trains on."""
    processed.sort(key=lambda x: x["timestamp"])  # in-place sort by time
n = len(processed)
train_end = int(n * train_ratio)
val_end = int(n * (train_ratio + val_ratio))
return processed[:train_end], processed[train_end:val_end], processed[val_end:]
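# Worked example: with the 0.8/0.1 defaults and n = 1000 sorted records,
# train_end = 800 and val_end = 900, yielding an 800/100/100 split where
# the test set is strictly the most recent 10% of transitions.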
# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------
def run_pipeline(input_path: str, output_dir: str, audit: bool = True):
"""Full preprocessing pipeline: audit -> scale -> split -> save."""
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# Load raw data
print(f"Loading raw data from {input_path} ...")
records = []
with open(input_path) as f:
for line in f:
line = line.strip()
if line:
records.append(json.loads(line))
if not records:
print("ERROR: No records found in input file.")
sys.exit(1)
print(f"Loaded {len(records)} raw transitions.")
# Audit
if audit:
audit_dataset(records)
# Preprocess
print("\nPreprocessing (symlog scaling + active feature extraction) ...")
processed = [preprocess_record(r) for r in records]
# Verify transform on first record
    sample = processed[0]
    print("\nSample preprocessed state (10D):")
    print(f" {format_state(sample['state'])}")
    print("Sample preprocessed next_state:")
    print(f" {format_state(sample['next_state'])}")
# Save full processed dataset
processed_path = output_dir / "processed_transitions.jsonl"
with open(processed_path, "w") as f:
for p in processed:
f.write(json.dumps(p) + "\n")
print(f"\nSaved {len(processed)} processed records to {processed_path}")
# Split
train, val, test = split_chronological(processed)
for split_name, split_data in [("train", train), ("val", val), ("test", test)]:
split_path = output_dir / f"{split_name}.jsonl"
with open(split_path, "w") as f:
for item in split_data:
f.write(json.dumps(item) + "\n")
print(f"{split_name}: {len(split_data)} records -> {split_path}")
print("\nPreprocessing complete.")
return train, val, test
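# Programmatic use (output path illustrative):
#   from training.data.preprocess import run_pipeline
#   train, val, test = run_pipeline(
#       "data/state_transitions.jsonl", "training/data/out", audit=False)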
def main():
parser = argparse.ArgumentParser(description="KernelX data preprocessing pipeline")
parser.add_argument("--input", required=True, help="Path to raw state_transitions.jsonl")
parser.add_argument("--output-dir", default=str(Path(__file__).parent), help="Output directory for processed data")
parser.add_argument("--no-audit", action="store_true", help="Skip the dataset audit step")
args = parser.parse_args()
run_pipeline(args.input, args.output_dir, audit=not args.no_audit)
if __name__ == "__main__":
main()