"""Models for T8 v3 — privileged future-pressure conditioning.

Wraps the existing TransformerForecast (DAF) to accept future pressure as
side-channel context. The future pressure trajectory is encoded into T_fut
tokens that are appended to the past memory; future queries cross-attend
over the union (past sensors + future pressure). This is privileged
(oracle) information — at test time the future pressure would not be
available — so this is a hypothesis-test setup, not a deployable forecaster.
"""
from __future__ import annotations

from typing import Dict

import torch
import torch.nn as nn
|
|
|
|
class _PerModalityProj(nn.Module):
    """Project every modality to ``d_model`` features and average them.

    Each modality has its own ``nn.Linear`` and its own learned embedding
    row. The projected (and embedding-shifted) modalities are averaged, so
    the result carries ``d_model`` features on the last axis.
    """

    def __init__(self, modality_dims: Dict[str, int], d_model: int) -> None:
        """Create one projection and one embedding row per modality.

        Args:
            modality_dims: Maps modality name to its input feature size.
                Iteration order decides which row of ``mod_emb`` a modality
                is paired with.
            d_model: Output feature size shared by all projections.

        Raises:
            ValueError: If ``modality_dims`` is empty.
        """
        super().__init__()
        # Fail at construction time: with no modalities, forward() would
        # otherwise die later on ``None / 0`` with an unhelpful TypeError.
        if not modality_dims:
            raise ValueError("modality_dims must contain at least one modality")
        self.proj = nn.ModuleDict({
            m: nn.Linear(d, d_model) for m, d in modality_dims.items()
        })
        self.mod_emb = nn.Parameter(torch.zeros(len(modality_dims), d_model))
        nn.init.trunc_normal_(self.mod_emb, std=0.02)
        self.mods = list(modality_dims)

    def forward(self, x: Dict[str, torch.Tensor]) -> torch.Tensor:
        """Return the mean over modalities of ``proj[m](x[m]) + mod_emb[i]``.

        Args:
            x: Maps each modality name in ``self.mods`` to its input tensor.
                The tensors must be summable after projection.

        Returns:
            The averaged projection, with ``d_model`` features on the last
            axis.

        Raises:
            KeyError: If ``x`` lacks one of the configured modalities.
        """
        out = None
        for i, m in enumerate(self.mods):
            h = self.proj[m](x[m]) + self.mod_emb[i]
            # Accumulate sequentially to keep the summation order stable.
            out = h if out is None else out + h
        return out / len(self.mods)
|
|
|
|
class DAFFuturePressure(nn.Module):
    """DAF backbone + future-pressure conditioning.

    Past observations are embedded and run through a Transformer encoder.
    The future pressure trajectory is projected into ``t_fut`` tokens that
    are concatenated with the encoded past along the sequence axis. A set of
    ``t_fut`` learned queries cross-attends over that joint memory, and a
    linear head maps each attended query to ``target_dim`` outputs.
    """

    def __init__(self, modality_dims: Dict[str, int], target_dim: int,
                 t_obs: int, t_fut: int, future_pressure_dim: int = 50,
                 d_model: int = 128, n_heads: int = 4, n_layers: int = 2,
                 dropout: float = 0.1):
        """Build the past encoder, the future-pressure branch and the decoder.

        Args:
            modality_dims: Maps modality name to input feature size.
            target_dim: Number of output features per future step.
            t_obs: Length of the learned positional table for past tokens.
            t_fut: Number of future-pressure tokens and of output queries.
            future_pressure_dim: Feature size of the future pressure input.
            d_model: Model width.
            n_heads: Attention heads for the encoder and the cross-attention.
            n_layers: Number of encoder layers.
            dropout: Dropout rate for the encoder and the cross-attention.
        """
        super().__init__()
        self.t_obs = t_obs
        self.t_fut = t_fut

        # Past branch: modality fusion, learned positions, self-attention.
        self.embed = _PerModalityProj(modality_dims, d_model)
        self.pos = self._learned_tokens(t_obs, d_model)
        self.encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=d_model,
                nhead=n_heads,
                dim_feedforward=4 * d_model,
                dropout=dropout,
                batch_first=True,
                activation="gelu",
            ),
            num_layers=n_layers,
        )

        # Future-pressure branch: projection, positions, and a segment
        # embedding added to every future-pressure token.
        self.fp_proj = nn.Linear(future_pressure_dim, d_model)
        self.fp_pos = self._learned_tokens(t_fut, d_model)
        self.fp_seg = self._learned_tokens(1, d_model)

        # Decoder: one learned query per future step over the joint memory.
        self.queries = self._learned_tokens(t_fut, d_model)
        self.cross_attn = nn.MultiheadAttention(
            d_model, n_heads, dropout=dropout, batch_first=True
        )
        self.norm = nn.LayerNorm(d_model)
        self.head = nn.Linear(d_model, target_dim)

    @staticmethod
    def _learned_tokens(length: int, width: int) -> nn.Parameter:
        """Return a ``(1, length, width)`` parameter drawn from trunc-normal."""
        tokens = nn.Parameter(torch.zeros(1, length, width))
        nn.init.trunc_normal_(tokens, std=0.02)
        return tokens

    def forward(self, x: Dict[str, torch.Tensor],
                future_pressure: torch.Tensor) -> torch.Tensor:
        """Predict the targets for the future steps.

        Args:
            x: Per-modality past inputs; after embedding they must broadcast
                against the ``(1, t_obs, d_model)`` positional table.
            future_pressure: Future pressure features; after projection they
                must broadcast against the ``(1, t_fut, d_model)`` table.

        Returns:
            The head output for every learned query, with ``target_dim``
            features on the last axis.
        """
        past_tokens = self.embed(x) + self.pos
        past_memory = self.encoder(past_tokens)

        future_tokens = self.fp_proj(future_pressure) + self.fp_pos + self.fp_seg

        # Joint memory: encoded past followed by future-pressure tokens.
        memory = torch.cat([past_memory, future_tokens], dim=1)

        batch_size = memory.size(0)
        queries = self.queries.expand(batch_size, -1, -1)
        attended, _ = self.cross_attn(queries, memory, memory, need_weights=False)
        return self.head(self.norm(attended))
|
|