# openskynet/src/skynet/experiments/runtime_observer_torch_01.py
# Mirror of the OpenSkyNet workspace snapshot from Git HEAD (commit fc93158).
from __future__ import annotations

import json
import time
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Sequence, Tuple

import torch
from torch import nn
# Closed set of target classes the observer predicts for each runtime event.
LABELS = ["progress", "relief", "stall", "frustration", "damage"]
# Max number of most-recent rows (per session) fed to the GRU as one window.
LOOKBACK = 4
HIDDEN_SIZE = 48  # width of the projection layer and the GRU hidden state
EPOCHS = 180  # full-batch training iterations
LEARNING_RATE = 0.01
WEIGHT_DECAY = 1e-4  # AdamW L2 regularization strength
# Minimum window counts required before a train/test split is attempted.
MIN_TRAIN = 24
MIN_TEST = 8
# Required accuracy lift over the majority-class baseline for a "pass" status.
MIN_IMPROVEMENT = 0.08
@dataclass
class Row:
    """One labelled telemetry event loaded from the dataset JSON."""

    id: str  # unique row identifier (JSON "id", stringified)
    session_key: str  # groups rows into per-session sequences (JSON "sessionKey")
    recorded_at: int  # timestamp used to order rows within a session (JSON "recordedAt")
    target_label: str  # one of LABELS (rows with other labels are dropped at load time)
    features: List[float]  # numeric feature vector for this event
class SequenceObserver(nn.Module):
    """GRU classifier over per-session feature windows.

    Each window is projected into the hidden width, passed through a
    single-layer GRU, and the final hidden state is mapped to one logit
    per label.
    """

    def __init__(self, feature_dim: int, hidden_size: int, label_count: int):
        super().__init__()
        self.proj = nn.Linear(feature_dim, hidden_size)
        self.rnn = nn.GRU(hidden_size, hidden_size, batch_first=True)
        self.head = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, label_count),
        )

    def forward(self, batch: torch.Tensor) -> torch.Tensor:
        """Return (batch, label_count) logits for a (batch, time, feature) input."""
        embedded = self.proj(batch).relu()
        _, final_state = self.rnn(embedded)
        # final_state is (num_layers, batch, hidden); keep the last layer only.
        return self.head(final_state[-1])
def load_rows(dataset_path: Path) -> List[Row]:
    """Parse the dataset JSON into Row records.

    Rows whose "targetLabel" is not one of LABELS are skipped; the rest
    are converted with their id, session key, timestamp, and float
    feature vector, preserving file order.

    Args:
        dataset_path: Path to a JSON file with a top-level "rows" array.

    Returns:
        The kept rows, in file order.

    Raises:
        KeyError: if a kept row is missing a required field.
        ValueError: if a feature value is not numeric.
    """
    # Decode as UTF-8 explicitly; the original relied on the locale default,
    # which breaks on systems whose preferred encoding is not UTF-8.
    payload = json.loads(dataset_path.read_text(encoding="utf-8"))
    rows: List[Row] = []
    for row in payload.get("rows", []):
        label = row.get("targetLabel")
        if label not in LABELS:
            continue
        rows.append(
            Row(
                id=str(row["id"]),
                session_key=str(row["sessionKey"]),
                recorded_at=int(row["recordedAt"]),
                target_label=label,
                features=[float(value) for value in row["features"]],
            )
        )
    return rows
def build_windows(rows: Sequence[Row], lookback: int) -> List[Tuple[List[List[float]], str]]:
    """Build one (feature-history, label) window per row, grouped by session.

    Rows are bucketed by session key and sorted chronologically; each row
    yields a window holding up to ``lookback`` trailing feature vectors
    (its own included) paired with that row's label.
    """
    by_session: Dict[str, List[Row]] = defaultdict(list)
    for item in rows:
        by_session[item.session_key].append(item)
    result: List[Tuple[List[List[float]], str]] = []
    for bucket in by_session.values():
        bucket.sort(key=lambda entry: entry.recorded_at)
        for position, current in enumerate(bucket):
            lo = max(0, position - lookback + 1)
            tail = bucket[lo : position + 1]
            result.append(([entry.features for entry in tail], current.target_label))
    return result
def split_windows(
    windows: Sequence[Tuple[List[List[float]], str]],
) -> Tuple[List[Tuple[List[List[float]], str]], List[Tuple[List[List[float]], str]]]:
    """Chronologically split windows roughly 70/30 into (train, test).

    When fewer than MIN_TRAIN + MIN_TEST windows exist, everything goes to
    the train split and the test split is empty. The boundary is clamped
    so train has at least MIN_TRAIN and test at least MIN_TEST windows.
    """
    total = len(windows)
    if total < MIN_TRAIN + MIN_TEST:
        return list(windows), []
    boundary = min(max(int(total * 0.7), MIN_TRAIN), total - MIN_TEST)
    return list(windows[:boundary]), list(windows[boundary:])
def pad_batch(
    batch: Sequence[Tuple[List[List[float]], str]],
    labels: Sequence[str] | None = None,
) -> Tuple[torch.Tensor, torch.Tensor]:
    """Left-pad variable-length windows into one dense tensor batch.

    Args:
        batch: Non-empty sequence of (sequence-of-feature-vectors, label)
            pairs; feature_dim is probed from the first vector.
        labels: Label vocabulary mapped to class indices. Defaults to the
            module-level LABELS; the parameter generalizes the original
            hard-coded vocabulary and is backward compatible.

    Returns:
        (xs, ys): xs is (batch, max_len, feature_dim) float32 with shorter
        sequences left-padded with zeros so history is aligned to the end;
        ys is (batch,) long class indices.

    Raises:
        IndexError: if batch is empty.
        KeyError: if a label is missing from the vocabulary.
    """
    vocab = LABELS if labels is None else labels
    label_map = {label: index for index, label in enumerate(vocab)}
    feature_dim = len(batch[0][0][0])
    max_len = max(len(sequence) for sequence, _ in batch)
    xs = torch.zeros((len(batch), max_len, feature_dim), dtype=torch.float32)
    ys = torch.zeros((len(batch),), dtype=torch.long)
    for row_index, (sequence, label) in enumerate(batch):
        # Left-pad: zeros first, then the real history at the tail.
        start = max_len - len(sequence)
        xs[row_index, start:, :] = torch.tensor(sequence, dtype=torch.float32)
        ys[row_index] = label_map[label]
    return xs, ys
def majority_baseline(test_rows: Sequence[Tuple[List[List[float]], str]]) -> float:
    """Return the accuracy of always predicting the most frequent label.

    Empty input yields 0.0.
    """
    if not test_rows:
        return 0.0
    tally = Counter(label for _, label in test_rows)
    ((_, top_count),) = tally.most_common(1)
    return top_count / len(test_rows)
def train_and_eval(
    train_rows: Sequence[Tuple[List[List[float]], str]],
    test_rows: Sequence[Tuple[List[List[float]], str]],
) -> Dict[str, object]:
    """Train the GRU observer full-batch and score it against the baseline.

    Returns a JSON-friendly dict holding the status ("pass", "fail", or
    "insufficient_data"), test accuracy, the majority-class baseline, the
    improvement over it, the number of evaluated windows, and any failure
    reasons.
    """
    if not train_rows or not test_rows:
        return {
            "status": "insufficient_data",
            "accuracy": 0.0,
            "baseline": 0.0,
            "improvement": 0.0,
            "evaluated": 0,
            "failureReasons": ["need enough train and test sequence windows"],
        }
    feature_dim = len(train_rows[0][0][0])
    train_x, train_y = pad_batch(train_rows)
    test_x, test_y = pad_batch(test_rows)
    model = SequenceObserver(feature_dim, HIDDEN_SIZE, len(LABELS))
    optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
    loss_fn = nn.CrossEntropyLoss()
    # Full-batch gradient descent; the dataset is small enough to fit at once.
    model.train()
    for _ in range(EPOCHS):
        optimizer.zero_grad(set_to_none=True)
        loss_fn(model(train_x), train_y).backward()
        optimizer.step()
    model.eval()
    with torch.no_grad():
        predictions = model(test_x).argmax(dim=1)
    accuracy = float((predictions == test_y).float().mean().item())
    baseline = majority_baseline(test_rows)
    improvement = accuracy - baseline
    failure_reasons: List[str] = []
    if improvement < MIN_IMPROVEMENT:
        failure_reasons.append(f"improvement {improvement:.4f} < {MIN_IMPROVEMENT:.4f}")
    return {
        "status": "fail" if failure_reasons else "pass",
        "accuracy": round(accuracy, 4),
        "baseline": round(baseline, 4),
        "improvement": round(improvement, 4),
        "evaluated": len(test_rows),
        "failureReasons": failure_reasons,
    }
def main() -> None:
    """Run the runtime-observer experiment end to end.

    Loads the dataset JSON from the workspace, builds per-session sequence
    windows, trains and evaluates the GRU observer, writes a JSON report
    next to the dataset, and prints a summary.
    """
    workspace_root = Path.cwd()
    experiments_dir = workspace_root / ".openskynet" / "skynet-experiments"
    dataset_path = experiments_dir / "agent_openskynet_main-runtime-observer-dataset-01.json"
    out_path = experiments_dir / "agent_openskynet_main-runtime-observer-torch-01.json"
    rows = load_rows(dataset_path)
    windows = build_windows(rows, LOOKBACK)
    train_rows, test_rows = split_windows(windows)
    result = train_and_eval(train_rows, test_rows)
    result.update(
        {
            "projectName": "Skynet",
            # Wall-clock epoch milliseconds. The original detoured through a
            # throwaway torch tensor (tensor(0).new_empty(()).fill_(0)) that
            # contributed nothing, plus an inline __import__("time").
            "updatedAt": int(time.time() * 1000),
            "rows": len(rows),
            "sequenceWindows": len(windows),
            "trainWindows": len(train_rows),
            "testWindows": len(test_rows),
            "lookback": LOOKBACK,
            "featureDimensions": len(rows[0].features) if rows else 0,
            "labelCoverage": dict(Counter(row.target_label for row in rows)),
            "datasetPath": str(dataset_path),
        }
    )
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # Explicit UTF-8 keeps the report readable regardless of locale defaults.
    out_path.write_text(json.dumps(result, indent=2) + "\n", encoding="utf-8")
    print("--- Skynet Experiment: Runtime Observer Torch 01 ---")
    print(f"Status: {result['status']}")
    print(f"Rows: {result['rows']}")
    print(f"Train windows: {result['trainWindows']}")
    print(f"Test windows: {result['testWindows']}")
    print(f"Accuracy: {result['accuracy']:.4f}")
    print(f"Baseline: {result['baseline']:.4f}")
    print(f"Improvement: {result['improvement']:.4f}")


if __name__ == "__main__":
    main()