| """ |
| KernelX Intelligence Layer — Gradio Demo (Stage 7) |
| |
| Judge-facing demo that shows: |
| 1. Baseline SmolLM2-360M output (untrained) on kernel states |
| 2. World Model predictions vs actual next states |
| 3. Strategist scheduling actions (action-only, no rationale for speed) |
| 4. Before/after metrics comparison |
| |
| Usage: |
| python -m training.demo.app \ |
| --strategist-model training/models/strategist_merged/strategist-q4km.gguf \ |
| --test-data training/data/test.jsonl |
| |
| # Without trained models (shows heuristic baseline): |
| python -m training.demo.app --test-data training/data/test.jsonl --no-model |
| """ |
|
|
| import argparse |
| import json |
| import re |
| import sys |
| import time |
|
|
| from pathlib import Path |
| from typing import Optional |
|
|
| import numpy as np |
|
|
| sys.path.insert(0, str(Path(__file__).resolve().parents[2])) |
| from training.data.preprocess import ( |
| FEATURE_NAMES, format_state, load_config, |
| IDX_WAIT_US, IDX_CTX_SWITCHES, IDX_EXEC_NS, |
| ) |
| from training.environment.rewards import RewardComputer |
|
|
| CONFIG = load_config() |
|
|
| |
| |
| |
|
|
def heuristic_policy(state: list) -> float:
    """Return the rule-based baseline action for a kernel state.

    Two features are read from ``state`` (at ``IDX_WAIT_US`` and
    ``IDX_CTX_SWITCHES``) and the rules are checked in priority order:

    * wait value above 15             -> ``-0.6``
    * context-switch value above 10   -> ``-0.3``
    * otherwise                       -> ``0.05``
    """
    # Both features are read up front, before any rule is evaluated.
    wait_value, switch_count = state[IDX_WAIT_US], state[IDX_CTX_SWITCHES]

    # The wait rule takes precedence over the context-switch rule.
    if wait_value > 15:
        return -0.6
    if switch_count > 10:
        return -0.3
    return 0.05
|
|
| |
| |
| |
|
|
class StrategistWrapper:
    """Demo adapter that queries a ``llama_cpp.Llama`` model for an action."""

    def __init__(self, model_path: str):
        # Imported lazily so llama_cpp is only needed when a model is loaded.
        from llama_cpp import Llama

        self.llm = Llama(
            model_path=model_path,
            n_ctx=512,
            n_threads=4,
            verbose=False,
        )

    def predict(self, state: list, pid: int, cpu: int) -> tuple:
        """Ask the model for a scheduling action.

        Returns:
            ``(action, latency_ms)``: the first number found in the model
            output clamped to ``[-1.0, 1.0]`` (``0.0`` when no number is
            found), and the wall-clock duration of the model call in
            milliseconds.
        """
        system_turn = (
            "<|system|>You are a Linux kernel scheduling strategist. "
            "Given the current system state, output a scheduling action.<|end|>\n"
        )
        user_turn = (
            f"<|user|>[STATE] {format_state(state)}\n"
            f"[PID] {pid} [CPU] {cpu}\n"
            "[ACTION]<|end|>\n"
        )
        prompt = system_turn + user_turn + "<|assistant|>"

        # Only the model call itself is timed; parsing happens afterwards.
        t0 = time.perf_counter()
        completion = self.llm(prompt, max_tokens=8, temperature=0.2)
        elapsed_ms = (time.perf_counter() - t0) * 1000

        generated = completion["choices"][0]["text"]
        found = re.search(r"([-+]?\d*\.?\d+)", generated)
        if found is None:
            value = 0.0
        else:
            value = float(found.group(1))

        # Clamp into the [-1, 1] action range.
        clamped = min(max(value, -1.0), 1.0)
        return clamped, elapsed_ms
|
|
|
|
class WorldModelWrapper:
    """Demo adapter that queries a ``llama_cpp.Llama`` model for a next state."""

    def __init__(self, model_path: str):
        # Imported lazily so llama_cpp is only needed when a model is loaded.
        from llama_cpp import Llama

        self.llm = Llama(
            model_path=model_path,
            n_ctx=512,
            n_threads=4,
            verbose=False,
        )

    def predict_next_state(self, state: list, action: float, pid: int) -> list:
        """Predict the state that follows ``state`` after applying ``action``.

        The model output is split on ``|``; for every piece containing a
        ``:``, the text between the first and second colon is parsed as a
        float. Pieces that fail to parse are skipped.

        Returns:
            The parsed values when exactly ``len(FEATURE_NAMES)`` of them were
            recovered; otherwise the input ``state`` object itself.
        """
        header = (
            "<|system|>You are a Linux kernel simulator. "
            "Predict the next system state.<|end|>\n"
        )
        request = (
            f"<|user|>[STATE] {format_state(state)}\n"
            f"[ACTION] {action:.4f}\n"
            f"[PID] {pid}\n"
            "Predict [NEXT_STATE]<|end|>\n"
        )
        prompt = header + request + "<|assistant|>"

        completion = self.llm(prompt, max_tokens=128, temperature=0.1)
        generated = completion["choices"][0]["text"]

        parsed = []
        for field in generated.split("|"):
            field = field.strip()
            if ":" not in field:
                continue
            try:
                parsed.append(float(field.split(":")[1]))
            except ValueError:
                # Unparseable value: skip this piece and keep going.
                continue

        # Fall back to the unchanged input unless every feature was recovered.
        return parsed if len(parsed) == len(FEATURE_NAMES) else state
|
|
| |
| |
| |
|
|
def run_comparison(
    record: dict,
    strategist: Optional[StrategistWrapper],
    reward_computer: RewardComputer,
) -> dict:
    """Score the heuristic (and, if given, the Strategist) on one transition.

    Args:
        record: Mapping with ``state``, ``next_state``, ``pid`` and ``cpu``.
        strategist: Optional model wrapper; when falsy only the heuristic
            baseline is evaluated.
        reward_computer: Object whose ``compute_total`` scores an action.

    Returns:
        A dict with the formatted current/next states, ``pid``, ``cpu``, a
        ``heuristic`` entry (``action``, ``reward``) and, when a strategist is
        supplied, a ``strategist`` entry (``action``, ``latency_ms``,
        ``reward``).
    """
    state, next_state = record["state"], record["next_state"]
    pid, cpu = record["pid"], record["cpu"]

    def _score(action: float):
        # Every action is scored against the recorded transition with a
        # previous action of 0.0.
        return reward_computer.compute_total(
            state=state,
            action=action,
            prev_action=0.0,
            next_state=next_state,
        )

    baseline_action = heuristic_policy(state)
    baseline_reward = _score(baseline_action)

    result = {
        "state": format_state(state),
        "actual_next_state": format_state(next_state),
        "pid": pid,
        "cpu": cpu,
        "heuristic": {"action": baseline_action, "reward": baseline_reward},
    }

    if not strategist:
        return result

    model_action, model_latency = strategist.predict(state, pid, cpu)
    result["strategist"] = {
        "action": model_action,
        "latency_ms": model_latency,
        "reward": _score(model_action),
    }
    return result
|
|
| |
| |
| |
|
|
def build_gradio_app(
    test_records: list,
    strategist: Optional[StrategistWrapper],
    world_model: Optional[WorldModelWrapper],
):
    """Build and return the Gradio interface.

    Args:
        test_records: Transition records; each one is handed to
            ``run_comparison``. May be empty, in which case the callbacks
            report that no records are available instead of failing.
        strategist: Optional trained Strategist wrapper. When ``None`` only
            the heuristic baseline is reported.
        world_model: Accepted for interface compatibility; no callback in
            this function uses it.

    Returns:
        The constructed ``gr.Blocks`` app (not launched).
    """
    import gradio as gr

    reward_computer = RewardComputer(alpha=1.0, beta=2.0, gamma=0.5)
    batch_size = 50
    no_records_msg = "*No test records loaded.*"

    def _reward_lines(reward: dict) -> list:
        """Render a reward dict (total plus components) as Markdown bullets."""
        return [
            f"- **Total Reward:** {reward['total']:.4f}",
            f" - Throughput: {reward['throughput']:.4f}",
            f" - Latency: {reward['latency']:.4f}",
            f" - Stability: {reward['stability']:.4f}",
        ]

    def analyze_state(record_idx: int):
        """Run analysis on a selected test record."""
        if not test_records:
            # Avoid a ZeroDivisionError from the modulo below.
            return no_records_msg

        # The slider may deliver a float; wrap out-of-range values around.
        idx = int(record_idx) % len(test_records)
        result = run_comparison(test_records[idx], strategist, reward_computer)

        lines = [
            f"## State #{idx}",
            f"**PID:** {result['pid']} | **CPU:** {result['cpu']}",
            f"**Current State:** `{result['state']}`",
            f"**Actual Next State:** `{result['actual_next_state']}`",
            "",
        ]

        h = result["heuristic"]
        lines.append("### Heuristic Baseline")
        lines.append(f"- **Action:** {h['action']:.4f}")
        lines.extend(_reward_lines(h["reward"]))
        lines.append("")

        if "strategist" in result:
            s = result["strategist"]
            lines.append("### Trained Strategist")
            lines.append(f"- **Action:** {s['action']:.4f}")
            lines.append(f"- **Inference Latency:** {s['latency_ms']:.1f}ms")
            lines.extend(_reward_lines(s["reward"]))
            lines.append("")

            delta = s["reward"]["total"] - h["reward"]["total"]
            # A zero delta is a tie, not a loss for the Strategist.
            if delta > 0:
                verdict = "better than heuristic"
            elif delta < 0:
                verdict = "worse than heuristic"
            else:
                verdict = "same as heuristic"
            lines.append("### Comparison")
            lines.append(f"Strategist is **{delta:+.4f}** reward ({verdict})")
        else:
            lines.append("*No trained model loaded. Run with --strategist-model to compare.*")

        return "\n".join(lines)

    def batch_comparison():
        """Run comparison on the first records and report aggregate stats."""
        n = min(batch_size, len(test_records))
        if n == 0:
            # np.mean/np.std of an empty list would yield NaN with a warning.
            return no_records_msg

        h_rewards = []
        s_rewards = []
        s_latencies = []
        for record in test_records[:n]:
            result = run_comparison(record, strategist, reward_computer)
            h_rewards.append(result["heuristic"]["reward"]["total"])
            if "strategist" in result:
                s_rewards.append(result["strategist"]["reward"]["total"])
                s_latencies.append(result["strategist"]["latency_ms"])

        s_mean = f"{np.mean(s_rewards):.4f}" if s_rewards else "N/A"
        s_std = f"{np.std(s_rewards):.4f}" if s_rewards else "N/A"

        lines = [
            f"## Batch Comparison ({n} samples)\n",
            "| Metric | Heuristic | Strategist |",
            "|--------|-----------|------------|",
            f"| Mean Reward | {np.mean(h_rewards):.4f} | {s_mean} |",
            f"| Std Reward | {np.std(h_rewards):.4f} | {s_std} |",
        ]
        if s_latencies:
            lines.append(f"| Mean Latency | N/A | {np.mean(s_latencies):.1f}ms |")
            lines.append(f"| P95 Latency | N/A | {np.percentile(s_latencies, 95):.1f}ms |")
        if s_rewards:
            # Strict wins only: ties do not count towards the Strategist.
            wins = sum(1 for s, h in zip(s_rewards, h_rewards) if s > h)
            win_rate = wins / len(s_rewards)
            lines.append(f"| Win Rate | — | {win_rate*100:.1f}% |")

        return "\n".join(lines)

    with gr.Blocks(title="KernelX Intelligence Layer") as app:
        gr.Markdown("# KernelX Intelligence Layer Demo")
        gr.Markdown("Compare heuristic baseline vs trained Strategist on real kernel states.")

        with gr.Row():
            record_slider = gr.Slider(
                minimum=0,
                # Keep the range valid (0..0) even when there are no records.
                maximum=max(len(test_records) - 1, 0),
                step=1,
                value=0,
                label="Test Record Index",
            )
            analyze_btn = gr.Button("Analyze", variant="primary")

        output_md = gr.Markdown()
        analyze_btn.click(fn=analyze_state, inputs=[record_slider], outputs=[output_md])

        gr.Markdown("---")
        batch_btn = gr.Button(f"Run Batch Comparison ({batch_size} samples)")
        batch_output = gr.Markdown()
        batch_btn.click(fn=batch_comparison, outputs=[batch_output])

    return app
|
|
| |
| |
| |
|
|
def main():
    """Parse CLI arguments, load test records and models, and launch the demo.

    The test data file is read as JSON Lines (one JSON object per non-blank
    line). Models are loaded only when their path is given and ``--no-model``
    is not set. Exits through ``parser.error`` when the file has no records.
    """
    parser = argparse.ArgumentParser(description="KernelX Gradio Demo")
    parser.add_argument("--test-data", required=True, help="Path to test.jsonl")
    parser.add_argument("--strategist-model", default=None, help="GGUF Strategist model")
    parser.add_argument("--world-model", default=None, help="GGUF World Model")
    parser.add_argument("--no-model", action="store_true", help="Run without trained models")
    parser.add_argument("--port", type=int, default=7860)
    parser.add_argument("--share", action="store_true", help="Create public Gradio link")
    args = parser.parse_args()

    # Use a context manager so the file handle is closed deterministically,
    # and an explicit encoding so parsing does not depend on the locale.
    with open(args.test_data, encoding="utf-8") as fh:
        records = [json.loads(line) for line in fh if line.strip()]
    print(f"Loaded {len(records)} test records")
    if not records:
        # The UI indexes into the records; there is nothing to show without them.
        parser.error(f"no records found in {args.test_data}")

    strategist = None
    world_model = None

    if not args.no_model:
        if args.strategist_model:
            print(f"Loading Strategist: {args.strategist_model}")
            strategist = StrategistWrapper(args.strategist_model)
        if args.world_model:
            print(f"Loading World Model: {args.world_model}")
            world_model = WorldModelWrapper(args.world_model)

    app = build_gradio_app(records, strategist, world_model)
    app.launch(server_port=args.port, share=args.share)


if __name__ == "__main__":
    main()
|
|