"""Models for T8 v3 — privileged future-pressure conditioning. Wraps the existing TransformerForecast (DAF) to accept future pressure as side-channel context. The future pressure trajectory is encoded into T_fut tokens that get appended to the past memory; future queries cross-attend over the union (past sensors + future pressure). This is privileged information (oracle) — at test time we'd not have future pressure — so this is a hypothesis-test setup, not a deployable forecaster. """ from __future__ import annotations from typing import Dict import torch import torch.nn as nn class _PerModalityProj(nn.Module): def __init__(self, modality_dims, d_model): super().__init__() self.proj = nn.ModuleDict({ m: nn.Linear(d, d_model) for m, d in modality_dims.items() }) self.mod_emb = nn.Parameter(torch.zeros(len(modality_dims), d_model)) nn.init.trunc_normal_(self.mod_emb, std=0.02) self.mods = list(modality_dims.keys()) def forward(self, x): out = None for i, m in enumerate(self.mods): h = self.proj[m](x[m]) + self.mod_emb[i] out = h if out is None else out + h return out / len(self.mods) class DAFFuturePressure(nn.Module): """DAF backbone + future-pressure conditioning.""" def __init__(self, modality_dims: Dict[str, int], target_dim: int, t_obs: int, t_fut: int, future_pressure_dim: int = 50, d_model: int = 128, n_heads: int = 4, n_layers: int = 2, dropout: float = 0.1): super().__init__() self.t_obs = t_obs self.t_fut = t_fut self.embed = _PerModalityProj(modality_dims, d_model) self.pos = nn.Parameter(torch.zeros(1, t_obs, d_model)) nn.init.trunc_normal_(self.pos, std=0.02) layer = nn.TransformerEncoderLayer( d_model=d_model, nhead=n_heads, dim_feedforward=4 * d_model, dropout=dropout, batch_first=True, activation="gelu", ) self.encoder = nn.TransformerEncoder(layer, num_layers=n_layers) # future-pressure encoder self.fp_proj = nn.Linear(future_pressure_dim, d_model) self.fp_pos = nn.Parameter(torch.zeros(1, t_fut, d_model)) nn.init.trunc_normal_(self.fp_pos, std=0.02) self.fp_seg = nn.Parameter(torch.zeros(1, 1, d_model)) # segment id nn.init.trunc_normal_(self.fp_seg, std=0.02) # decoder side self.queries = nn.Parameter(torch.zeros(1, t_fut, d_model)) nn.init.trunc_normal_(self.queries, std=0.02) self.cross_attn = nn.MultiheadAttention( d_model, n_heads, dropout=dropout, batch_first=True ) self.norm = nn.LayerNorm(d_model) self.head = nn.Linear(d_model, target_dim) def forward(self, x: Dict[str, torch.Tensor], future_pressure: torch.Tensor) -> torch.Tensor: h_past = self.encoder(self.embed(x) + self.pos) # (B, T_obs, D) h_fp = self.fp_proj(future_pressure) + self.fp_pos + self.fp_seg memory = torch.cat([h_past, h_fp], dim=1) # (B, T_obs+T_fut, D) q = self.queries.expand(memory.size(0), -1, -1) # (B, T_fut, D) out, _ = self.cross_attn(q, memory, memory, need_weights=False) out = self.norm(out) return self.head(out) # (B, T_fut, target_dim)