"""Train and evaluate a small GRU classifier over per-session row windows.

The script reads a JSON dataset from ``.openskynet/skynet-experiments`` under
the current working directory, builds trailing windows of up to ``LOOKBACK``
rows per session, trains ``SequenceObserver`` on the first part of the
windows, compares its accuracy on the remainder against a majority-label
baseline, and writes a JSON report next to the dataset.
"""
from __future__ import annotations

import json
import time
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Sequence, Tuple

import torch
from torch import nn

LABELS = ["progress", "relief", "stall", "frustration", "damage"]
LOOKBACK = 4
HIDDEN_SIZE = 48
EPOCHS = 180
LEARNING_RATE = 0.01
WEIGHT_DECAY = 1e-4
MIN_TRAIN = 24
MIN_TEST = 8
MIN_IMPROVEMENT = 0.08

# One training example: a chronological list of feature vectors plus the
# label of the last row in that list.
Window = Tuple[List[List[float]], str]


@dataclass
class Row:
    """One labelled dataset row, as parsed by ``load_rows``."""

    id: str
    session_key: str
    recorded_at: int
    target_label: str
    features: List[float]


class SequenceObserver(nn.Module):
    """Linear projection -> GRU -> two-layer head producing label logits."""

    def __init__(self, feature_dim: int, hidden_size: int, label_count: int):
        super().__init__()
        self.proj = nn.Linear(feature_dim, hidden_size)
        self.rnn = nn.GRU(hidden_size, hidden_size, batch_first=True)
        self.head = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, label_count),
        )

    def forward(self, batch: torch.Tensor) -> torch.Tensor:
        """Return logits computed from the GRU's final hidden state.

        ``batch`` is fed to a ``batch_first=True`` GRU after projection, so it
        is laid out as (batch, sequence, feature).
        """
        projected = torch.relu(self.proj(batch))
        _, hidden = self.rnn(projected)
        # hidden[-1] is the last layer's final state for every batch item.
        return self.head(hidden[-1])


def load_rows(dataset_path: Path) -> List[Row]:
    """Parse the dataset JSON and return rows whose label is in ``LABELS``.

    Rows with a missing or unknown ``targetLabel`` are skipped. A missing
    ``rows`` key yields an empty list. Missing ``id``/``sessionKey``/
    ``recordedAt``/``features`` keys on a kept row raise ``KeyError``.
    """
    # JSON is UTF-8 by specification; do not depend on the platform locale.
    payload = json.loads(dataset_path.read_text(encoding="utf-8"))
    rows: List[Row] = []
    for raw in payload.get("rows", []):
        label = raw.get("targetLabel")
        if label not in LABELS:
            continue
        rows.append(
            Row(
                id=str(raw["id"]),
                session_key=str(raw["sessionKey"]),
                recorded_at=int(raw["recordedAt"]),
                target_label=label,
                features=[float(value) for value in raw["features"]],
            )
        )
    return rows


def build_windows(rows: Sequence[Row], lookback: int) -> List[Window]:
    """Build one trailing window per row, grouped by session.

    Sessions are visited in order of first appearance in ``rows``; within a
    session rows are ordered by ``recorded_at``. Each window holds the
    features of at most ``lookback`` rows ending at (and including) the
    current row, paired with the current row's label.
    """
    grouped: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        grouped[row.session_key].append(row)

    windows: List[Window] = []
    for session_rows in grouped.values():
        session_rows.sort(key=lambda item: item.recorded_at)
        for index, row in enumerate(session_rows):
            first = max(0, index - lookback + 1)
            history = session_rows[first : index + 1]
            windows.append(([item.features for item in history], row.target_label))
    return windows


def split_windows(windows: Sequence[Window]) -> Tuple[List[Window], List[Window]]:
    """Split windows, in their given order, into (train, test) lists.

    With fewer than ``MIN_TRAIN + MIN_TEST`` windows everything is returned as
    train and test is empty. Otherwise the cut is at 70 percent, raised to at
    least ``MIN_TRAIN`` and lowered so at least ``MIN_TEST`` windows remain
    for testing. No shuffling is performed.
    """
    total = len(windows)
    if total < (MIN_TRAIN + MIN_TEST):
        return list(windows), []
    cutoff = max(MIN_TRAIN, int(total * 0.7))
    cutoff = min(cutoff, total - MIN_TEST)
    return list(windows[:cutoff]), list(windows[cutoff:])


def pad_batch(batch: Sequence[Window]) -> Tuple[torch.Tensor, torch.Tensor]:
    """Stack windows into a float tensor and a long tensor of label indices.

    Shorter sequences are left-padded with zeros so that the real steps sit
    at the end of the time axis. The feature width is taken from the first
    step of the first window. ``batch`` must be non-empty.
    """
    label_map = {label: index for index, label in enumerate(LABELS)}
    feature_dim = len(batch[0][0][0])
    max_len = max(len(sequence) for sequence, _ in batch)
    xs = torch.zeros((len(batch), max_len, feature_dim), dtype=torch.float32)
    ys = torch.zeros((len(batch),), dtype=torch.long)
    for row_index, (sequence, label) in enumerate(batch):
        start = max_len - len(sequence)
        xs[row_index, start:, :] = torch.tensor(sequence, dtype=torch.float32)
        ys[row_index] = label_map[label]
    return xs, ys


def majority_baseline(test_rows: Sequence[Window]) -> float:
    """Return the share of the most frequent label in ``test_rows``.

    Returns 0.0 for an empty input.
    """
    if not test_rows:
        return 0.0
    counter = Counter(label for _, label in test_rows)
    return max(counter.values()) / len(test_rows)


def train_and_eval(
    train_rows: Sequence[Window],
    test_rows: Sequence[Window],
) -> Dict[str, object]:
    """Train ``SequenceObserver`` on ``train_rows`` and score it on ``test_rows``.

    Training runs ``EPOCHS`` full-batch AdamW steps with cross-entropy loss.
    The returned dict has the keys ``status``, ``accuracy``, ``baseline``,
    ``improvement``, ``evaluated`` and ``failureReasons``. ``status`` is
    ``"insufficient_data"`` when either input is empty, otherwise ``"pass"``
    or ``"fail"`` depending on whether accuracy beats the majority baseline
    by at least ``MIN_IMPROVEMENT``.
    """
    if not train_rows or not test_rows:
        return {
            "status": "insufficient_data",
            "accuracy": 0.0,
            "baseline": 0.0,
            "improvement": 0.0,
            "evaluated": 0,
            "failureReasons": ["need enough train and test sequence windows"],
        }

    feature_dim = len(train_rows[0][0][0])
    train_x, train_y = pad_batch(train_rows)
    test_x, test_y = pad_batch(test_rows)

    model = SequenceObserver(feature_dim, HIDDEN_SIZE, len(LABELS))
    optimizer = torch.optim.AdamW(
        model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY
    )
    loss_fn = nn.CrossEntropyLoss()

    model.train()
    for _ in range(EPOCHS):
        optimizer.zero_grad(set_to_none=True)
        logits = model(train_x)
        loss = loss_fn(logits, train_y)
        loss.backward()
        optimizer.step()

    model.eval()
    with torch.no_grad():
        logits = model(test_x)
        predictions = torch.argmax(logits, dim=1)
        accuracy = float((predictions == test_y).float().mean().item())

    baseline = majority_baseline(test_rows)
    improvement = accuracy - baseline

    failure_reasons: List[str] = []
    if improvement < MIN_IMPROVEMENT:
        failure_reasons.append(
            f"improvement {improvement:.4f} < {MIN_IMPROVEMENT:.4f}"
        )

    return {
        "status": "pass" if not failure_reasons else "fail",
        "accuracy": round(accuracy, 4),
        "baseline": round(baseline, 4),
        "improvement": round(improvement, 4),
        "evaluated": len(test_rows),
        "failureReasons": failure_reasons,
    }


def main() -> None:
    """Run the experiment against the dataset under the current directory.

    Reads the dataset JSON, trains and evaluates, writes the report JSON
    (creating the output directory if needed) and prints a short summary.
    """
    workspace_root = Path.cwd()
    experiments_dir = workspace_root / ".openskynet" / "skynet-experiments"
    dataset_path = (
        experiments_dir / "agent_openskynet_main-runtime-observer-dataset-01.json"
    )
    out_path = (
        experiments_dir / "agent_openskynet_main-runtime-observer-torch-01.json"
    )

    rows = load_rows(dataset_path)
    windows = build_windows(rows, LOOKBACK)
    train_rows, test_rows = split_windows(windows)
    result = train_and_eval(train_rows, test_rows)
    result.update(
        {
            "projectName": "Skynet",
            # Wall-clock time in milliseconds since the epoch.
            "updatedAt": int(time.time() * 1000),
            "rows": len(rows),
            "sequenceWindows": len(windows),
            "trainWindows": len(train_rows),
            "testWindows": len(test_rows),
            "lookback": LOOKBACK,
            "featureDimensions": len(rows[0].features) if rows else 0,
            "labelCoverage": dict(Counter(row.target_label for row in rows)),
            "datasetPath": str(dataset_path),
        }
    )

    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, indent=2) + "\n", encoding="utf-8")

    print("--- Skynet Experiment: Runtime Observer Torch 01 ---")
    print(f"Status: {result['status']}")
    print(f"Rows: {result['rows']}")
    print(f"Train windows: {result['trainWindows']}")
    print(f"Test windows: {result['testWindows']}")
    print(f"Accuracy: {result['accuracy']:.4f}")
    print(f"Baseline: {result['baseline']:.4f}")
    print(f"Improvement: {result['improvement']:.4f}")


if __name__ == "__main__":
    main()