Rayugacodes's picture
Training pipeline scripts
32a197f verified
"""
KernelX Intelligence Layer — Gradio Demo (Stage 7)
Judge-facing demo that shows:
1. Baseline SmolLM2-360M output (untrained) on kernel states
2. World Model predictions vs actual next states
3. Strategist scheduling actions (action-only, no rationale for speed)
4. Before/after metrics comparison
Usage:
python -m training.demo.app \
--strategist-model training/models/strategist_merged/strategist-q4km.gguf \
--test-data training/data/test.jsonl
# Without trained models (shows heuristic baseline):
python -m training.demo.app --test-data training/data/test.jsonl --no-model
"""
import argparse
import json
import re
import sys
import time
from pathlib import Path
from typing import Optional
import numpy as np
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
from training.data.preprocess import (
FEATURE_NAMES, format_state, load_config,
IDX_WAIT_US, IDX_CTX_SWITCHES, IDX_EXEC_NS,
)
from training.environment.rewards import RewardComputer
CONFIG = load_config()
# ---------------------------------------------------------------------------
# Heuristic baseline policy (for comparison)
# ---------------------------------------------------------------------------
def heuristic_policy(state: list) -> float:
    """Pick a scheduling action from two fixed thresholds on the state.

    This is the hand-written baseline the trained Strategist is compared
    against.

    Args:
        state: Feature vector indexable by ``IDX_WAIT_US`` and
            ``IDX_CTX_SWITCHES``.

    Returns:
        ``-0.6`` when the wait feature exceeds 15, otherwise ``-0.3`` when
        the context-switch feature exceeds 10, otherwise ``0.05``.
    """
    # Read both features up front so a too-short state fails immediately,
    # whichever branch would have been taken.
    queue_wait = state[IDX_WAIT_US]
    switches = state[IDX_CTX_SWITCHES]

    if queue_wait > 15:
        return -0.6
    if switches > 10:
        return -0.3
    return 0.05
# ---------------------------------------------------------------------------
# Model wrappers
# ---------------------------------------------------------------------------
class StrategistWrapper:
    """Adapter that asks a GGUF Strategist model for a scheduling action."""

    def __init__(self, model_path: str):
        """Load the GGUF file at ``model_path`` through ``llama_cpp.Llama``."""
        # Deferred import: llama_cpp is only touched when a wrapper is built.
        from llama_cpp import Llama

        self.llm = Llama(
            model_path=model_path,
            n_ctx=512,
            n_threads=4,
            verbose=False,
        )

    def predict(self, state: list, pid: int, cpu: int) -> tuple:
        """Query the model for one action and time the call.

        Args:
            state: Feature vector, rendered into the prompt by
                ``format_state``.
            pid: Process id inserted into the prompt.
            cpu: CPU id inserted into the prompt.

        Returns:
            ``(action, latency_ms)``. ``action`` is the first number found
            in the completion, clamped to ``[-1.0, 1.0]``, or ``0.0`` when
            the completion contains no number. ``latency_ms`` is the
            wall-clock duration of the model call in milliseconds.
        """
        system_turn = (
            "<|system|>You are a Linux kernel scheduling strategist. "
            "Given the current system state, output a scheduling action.<|end|>\n"
        )
        user_turn = (
            f"<|user|>[STATE] {format_state(state)}\n"
            f"[PID] {pid} [CPU] {cpu}\n"
            "[ACTION]<|end|>\n"
        )
        prompt = system_turn + user_turn + "<|assistant|>"

        # Only the model call itself is timed; parsing happens afterwards.
        started = time.perf_counter()
        response = self.llm(prompt, max_tokens=8, temperature=0.2)
        elapsed_ms = (time.perf_counter() - started) * 1000

        completion = response["choices"][0]["text"]
        found = re.search(r"([-+]?\d*\.?\d+)", completion)
        if found:
            action = float(found.group(1))
        else:
            # No number in the completion: fall back to a neutral action.
            action = 0.0

        action = max(-1.0, min(1.0, action))
        return action, elapsed_ms
class WorldModelWrapper:
    """Adapter that asks a GGUF world model to predict the next state."""

    def __init__(self, model_path: str):
        """Load the GGUF file at ``model_path`` through ``llama_cpp.Llama``."""
        # Deferred import: llama_cpp is only touched when a wrapper is built.
        from llama_cpp import Llama

        self.llm = Llama(
            model_path=model_path,
            n_ctx=512,
            n_threads=4,
            verbose=False,
        )

    def predict_next_state(self, state: list, action: float, pid: int) -> list:
        """Predict the state that follows ``state`` under ``action``.

        The completion is split on ``|``. Every piece containing a ``:``
        contributes the float parsed from the text between its first and
        second colon. Pieces that do not parse are skipped.

        Args:
            state: Current feature vector, rendered by ``format_state``.
            action: Scheduling action, written to the prompt with four
                decimals.
            pid: Process id inserted into the prompt.

        Returns:
            The parsed values when exactly ``len(FEATURE_NAMES)`` of them
            were recovered; otherwise the input ``state`` object itself.
        """
        header = (
            "<|system|>You are a Linux kernel simulator. "
            "Predict the next system state.<|end|>\n"
        )
        request = (
            f"<|user|>[STATE] {format_state(state)}\n"
            f"[ACTION] {action:.4f}\n"
            f"[PID] {pid}\n"
            "Predict [NEXT_STATE]<|end|>\n"
        )
        prompt = header + request + "<|assistant|>"

        response = self.llm(prompt, max_tokens=128, temperature=0.1)
        completion = response["choices"][0]["text"]

        predicted = []
        for piece in map(str.strip, completion.split("|")):
            if ":" not in piece:
                continue
            try:
                number = float(piece.split(":")[1])
            except ValueError:
                # Best-effort parse: a malformed field is simply dropped.
                continue
            predicted.append(number)

        if len(predicted) != len(FEATURE_NAMES):
            # Incomplete or over-long prediction: hand back the input state.
            return state
        return predicted
# ---------------------------------------------------------------------------
# Demo functions
# ---------------------------------------------------------------------------
def run_comparison(
    record: dict,
    strategist: Optional[StrategistWrapper],
    reward_computer: RewardComputer,
) -> dict:
    """Score the heuristic, and optionally the Strategist, on one transition.

    Args:
        record: Transition with ``"state"``, ``"next_state"``, ``"pid"``
            and ``"cpu"`` keys.
        strategist: Trained model wrapper. When falsy, only the heuristic
            is evaluated.
        reward_computer: Object whose ``compute_total`` scores an action
            against the recorded transition.

    Returns:
        Dict with the formatted ``"state"`` and ``"actual_next_state"``,
        the ``"pid"`` and ``"cpu"``, and a ``"heuristic"`` entry holding
        ``"action"`` and ``"reward"``. When a strategist is supplied, a
        ``"strategist"`` entry is added with ``"action"``, ``"latency_ms"``
        and ``"reward"``.
    """
    state, next_state = record["state"], record["next_state"]
    pid, cpu = record["pid"], record["cpu"]

    def score(action: float):
        # Both policies are scored against the same recorded transition,
        # always with a previous action of 0.0.
        return reward_computer.compute_total(
            state=state,
            action=action,
            prev_action=0.0,
            next_state=next_state,
        )

    baseline_action = heuristic_policy(state)
    baseline_reward = score(baseline_action)

    comparison = {
        "state": format_state(state),
        "actual_next_state": format_state(next_state),
        "pid": pid,
        "cpu": cpu,
        "heuristic": {
            "action": baseline_action,
            "reward": baseline_reward,
        },
    }

    if not strategist:
        return comparison

    model_action, model_latency = strategist.predict(state, pid, cpu)
    comparison["strategist"] = {
        "action": model_action,
        "latency_ms": model_latency,
        "reward": score(model_action),
    }
    return comparison
# ---------------------------------------------------------------------------
# Gradio app
# ---------------------------------------------------------------------------
def build_gradio_app(
    test_records: list,
    strategist: Optional[StrategistWrapper],
    world_model: Optional[WorldModelWrapper],
):
    """Build and return the Gradio interface.

    The UI has two actions: analysing one test record chosen with a
    slider, and running an aggregate comparison over the first records of
    the test set.

    Args:
        test_records: Transitions to browse. May be empty, in which case
            both actions report that no records are loaded.
        strategist: Trained Strategist wrapper, or ``None`` to show only
            the heuristic baseline.
        world_model: Accepted for interface compatibility; not used by
            the current UI.

    Returns:
        The ``gr.Blocks`` application, not yet launched.
    """
    import gradio as gr

    reward_computer = RewardComputer(alpha=1.0, beta=2.0, gamma=0.5)
    batch_cap = 50  # upper bound on records scored per batch run, for speed
    no_records_msg = "*No test records loaded.*"

    def reward_lines(reward: dict) -> list:
        """Render the total reward and its three components as bullets."""
        return [
            f"- **Total Reward:** {reward['total']:.4f}",
            f" - Throughput: {reward['throughput']:.4f}",
            f" - Latency: {reward['latency']:.4f}",
            f" - Stability: {reward['stability']:.4f}",
        ]

    def analyze_state(record_idx: int):
        """Run analysis on a selected test record and return Markdown."""
        if not test_records:
            # Guard: the modulo below would divide by zero.
            return no_records_msg

        # The slider may deliver a float; wrap so any value is a valid index.
        idx = int(record_idx) % len(test_records)
        result = run_comparison(test_records[idx], strategist, reward_computer)

        lines = [
            f"## State #{idx}",
            f"**PID:** {result['pid']} | **CPU:** {result['cpu']}",
            f"**Current State:** `{result['state']}`",
            f"**Actual Next State:** `{result['actual_next_state']}`",
            "",
        ]

        h = result["heuristic"]
        lines.append("### Heuristic Baseline")
        lines.append(f"- **Action:** {h['action']:.4f}")
        lines.extend(reward_lines(h["reward"]))
        lines.append("")

        if "strategist" not in result:
            lines.append("*No trained model loaded. Run with --strategist-model to compare.*")
            return "\n".join(lines)

        s = result["strategist"]
        lines.append("### Trained Strategist")
        lines.append(f"- **Action:** {s['action']:.4f}")
        lines.append(f"- **Inference Latency:** {s['latency_ms']:.1f}ms")
        lines.extend(reward_lines(s["reward"]))
        lines.append("")

        delta = s["reward"]["total"] - h["reward"]["total"]
        if delta > 0:
            direction = "better"
        elif delta < 0:
            direction = "worse"
        else:
            # A tie is not a loss; avoid reporting it as "worse".
            direction = "no better"
        lines.append("### Comparison")
        lines.append(f"Strategist is **{delta:+.4f}** reward ({direction} than heuristic)")
        return "\n".join(lines)

    def batch_comparison():
        """Compare both policies over the first records; return Markdown."""
        if not test_records:
            # Guard: np.mean / np.std of an empty list would yield NaN.
            return no_records_msg

        h_rewards = []
        s_rewards = []
        s_latencies = []
        sample = test_records[:batch_cap]
        n = len(sample)
        for record in sample:
            result = run_comparison(record, strategist, reward_computer)
            h_rewards.append(result["heuristic"]["reward"]["total"])
            if "strategist" in result:
                s_rewards.append(result["strategist"]["reward"]["total"])
                s_latencies.append(result["strategist"]["latency_ms"])

        s_mean = f"{np.mean(s_rewards):.4f}" if s_rewards else "N/A"
        s_std = f"{np.std(s_rewards):.4f}" if s_rewards else "N/A"

        lines = [
            f"## Batch Comparison ({n} samples)\n",
            "| Metric | Heuristic | Strategist |",
            "|--------|-----------|------------|",
            f"| Mean Reward | {np.mean(h_rewards):.4f} | {s_mean} |",
            f"| Std Reward | {np.std(h_rewards):.4f} | {s_std} |",
        ]
        if s_latencies:
            lines.append(f"| Mean Latency | N/A | {np.mean(s_latencies):.1f}ms |")
            lines.append(f"| P95 Latency | N/A | {np.percentile(s_latencies, 95):.1f}ms |")
        if s_rewards:
            # Fraction of records where the Strategist strictly beats the
            # heuristic; the two lists are aligned record by record.
            wins = sum(1 for s, h in zip(s_rewards, h_rewards) if s > h)
            win_rate = wins / len(s_rewards)
            lines.append(f"| Win Rate | — | {win_rate*100:.1f}% |")
        return "\n".join(lines)

    # Build Gradio UI
    with gr.Blocks(title="KernelX Intelligence Layer") as app:
        gr.Markdown("# KernelX Intelligence Layer Demo")
        gr.Markdown("Compare heuristic baseline vs trained Strategist on real kernel states.")
        with gr.Row():
            record_slider = gr.Slider(
                minimum=0,
                # Clamp so an empty record list does not give maximum < minimum.
                maximum=max(len(test_records) - 1, 0),
                step=1,
                value=0,
                label="Test Record Index",
            )
            analyze_btn = gr.Button("Analyze", variant="primary")
        output_md = gr.Markdown()
        analyze_btn.click(fn=analyze_state, inputs=[record_slider], outputs=[output_md])
        gr.Markdown("---")
        batch_btn = gr.Button(f"Run Batch Comparison ({batch_cap} samples)")
        batch_output = gr.Markdown()
        batch_btn.click(fn=batch_comparison, outputs=[batch_output])
    return app
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """Parse CLI arguments, load data and models, and launch the demo.

    Reads the JSONL file given by ``--test-data`` (blank lines are
    skipped). Unless ``--no-model`` is set, it loads whichever of
    ``--strategist-model`` / ``--world-model`` were supplied. It then
    builds the Gradio app and serves it on ``--port``.
    """
    parser = argparse.ArgumentParser(description="KernelX Gradio Demo")
    parser.add_argument("--test-data", required=True, help="Path to test.jsonl")
    parser.add_argument("--strategist-model", default=None, help="GGUF Strategist model")
    parser.add_argument("--world-model", default=None, help="GGUF World Model")
    parser.add_argument("--no-model", action="store_true", help="Run without trained models")
    parser.add_argument("--port", type=int, default=7860)
    parser.add_argument("--share", action="store_true", help="Create public Gradio link")
    args = parser.parse_args()

    # Load test data. The context manager closes the file deterministically,
    # and the explicit encoding avoids depending on the platform default.
    with open(args.test_data, encoding="utf-8") as fh:
        records = [json.loads(line) for line in fh if line.strip()]
    print(f"Loaded {len(records)} test records")

    # Load models
    strategist = None
    world_model = None
    if not args.no_model:
        if args.strategist_model:
            print(f"Loading Strategist: {args.strategist_model}")
            strategist = StrategistWrapper(args.strategist_model)
        if args.world_model:
            print(f"Loading World Model: {args.world_model}")
            world_model = WorldModelWrapper(args.world_model)

    app = build_gradio_app(records, strategist, world_model)
    app.launch(server_port=args.port, share=args.share)


if __name__ == "__main__":
    main()