PULSE-code / experiments /nets /models_forecast_priv.py
velvet-pine-22's picture
Upload folder using huggingface_hub
b4b2877 verified
"""Models for T8 v3 — privileged future-pressure conditioning.
Wraps the existing TransformerForecast (DAF) to accept future pressure as
side-channel context. The future pressure trajectory is encoded into T_fut
tokens that get appended to the past memory; future queries cross-attend
over the union (past sensors + future pressure). This is privileged
information (oracle) — at test time we'd not have future pressure — so
this is a hypothesis-test setup, not a deployable forecaster.
"""
from __future__ import annotations
from typing import Dict
import torch
import torch.nn as nn
class _PerModalityProj(nn.Module):
    """Project every modality to ``d_model`` and average the projections.

    Each modality owns one ``nn.Linear`` (stored in ``proj`` under the
    modality name) and one row of the learned ``mod_emb`` table, which is
    added to that modality's projection. The per-modality results are summed
    and divided by the number of modalities.

    Args:
        modality_dims: Mapping from modality name to its input feature size.
            Its iteration order decides which ``mod_emb`` row a modality uses.
        d_model: Output feature size shared by all projections.

    Raises:
        ValueError: If ``modality_dims`` is empty.
    """

    def __init__(self, modality_dims: Dict[str, int], d_model: int):
        super().__init__()
        if not modality_dims:
            # An empty mapping would otherwise construct fine and only blow
            # up in forward() with an opaque ``None / 0`` TypeError.
            raise ValueError(
                "modality_dims must contain at least one modality"
            )
        self.proj = nn.ModuleDict({
            m: nn.Linear(d, d_model) for m, d in modality_dims.items()
        })
        # One learned embedding row per modality, truncated-normal init.
        self.mod_emb = nn.Parameter(torch.zeros(len(modality_dims), d_model))
        nn.init.trunc_normal_(self.mod_emb, std=0.02)
        # Frozen name order so row ``i`` of ``mod_emb`` always pairs with the
        # same modality.
        self.mods = list(modality_dims)

    def forward(self, x: Dict[str, torch.Tensor]) -> torch.Tensor:
        """Return the mean over modalities of ``proj[m](x[m]) + mod_emb[i]``.

        Args:
            x: Mapping that holds one tensor per configured modality name;
                the last dimension of each tensor must match the size given
                for that modality at construction.

        Returns:
            The averaged projection, with last dimension ``d_model``.

        Raises:
            KeyError: If ``x`` lacks one of the configured modalities.
        """
        total = None
        for row, name in enumerate(self.mods):
            term = self.proj[name](x[name]) + self.mod_emb[row]
            total = term if total is None else total + term
        return total / len(self.mods)
class DAFFuturePressure(nn.Module):
    """Forecaster whose decoder also sees the future pressure trajectory.

    The observed multi-modal inputs are embedded and run through a
    Transformer encoder. The future pressure sequence is linearly projected
    into tokens that carry their own positional and segment embeddings.
    Learned queries, one per future step, cross-attend over the concatenation
    of both token sets, and a linear head maps each attended query to
    ``target_dim`` values.

    Args:
        modality_dims: Input feature size of each modality, keyed by name.
        target_dim: Size of the prediction emitted for every future step.
        t_obs: Length of the learned positional table for the past tokens.
        t_fut: Number of learned queries and future-pressure positions.
        future_pressure_dim: Feature size of one future-pressure step.
        d_model: Token width used throughout the model.
        n_heads: Head count for the encoder layers and the cross-attention.
        n_layers: Number of stacked Transformer encoder layers.
        dropout: Dropout rate for the encoder layers and the cross-attention.
    """

    @staticmethod
    def _learned(*shape: int) -> nn.Parameter:
        """Create a parameter of ``shape`` with truncated-normal init."""
        param = nn.Parameter(torch.zeros(*shape))
        nn.init.trunc_normal_(param, std=0.02)
        return param

    def __init__(self, modality_dims: Dict[str, int], target_dim: int,
                 t_obs: int, t_fut: int, future_pressure_dim: int = 50,
                 d_model: int = 128, n_heads: int = 4, n_layers: int = 2,
                 dropout: float = 0.1):
        super().__init__()
        self.t_obs = t_obs
        self.t_fut = t_fut

        # --- past branch: modality fusion, positions, self-attention ---
        self.embed = _PerModalityProj(modality_dims, d_model)
        self.pos = self._learned(1, t_obs, d_model)
        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=4 * d_model,
            dropout=dropout,
            batch_first=True,
            activation="gelu",
        )
        self.encoder = nn.TransformerEncoder(enc_layer, num_layers=n_layers)

        # --- future-pressure branch: projection, positions, segment tag ---
        self.fp_proj = nn.Linear(future_pressure_dim, d_model)
        self.fp_pos = self._learned(1, t_fut, d_model)
        self.fp_seg = self._learned(1, 1, d_model)

        # --- decoder: learned queries, one cross-attention, linear head ---
        self.queries = self._learned(1, t_fut, d_model)
        self.cross_attn = nn.MultiheadAttention(
            d_model, n_heads, dropout=dropout, batch_first=True
        )
        self.norm = nn.LayerNorm(d_model)
        self.head = nn.Linear(d_model, target_dim)

    def forward(self, x: Dict[str, torch.Tensor],
                future_pressure: torch.Tensor) -> torch.Tensor:
        """Predict the targets for the future steps.

        Args:
            x: Per-modality tensors for the observed window, consumed by the
                embedding module.
            future_pressure: Future pressure trajectory fed to ``fp_proj``.

        Returns:
            Output of the linear head, shaped (B, T_fut, target_dim).
        """
        # Past tokens: fused modalities + positions, then the encoder.
        past_tokens = self.embed(x) + self.pos
        past_tokens = self.encoder(past_tokens)            # (B, T_obs, D)

        # Future-pressure tokens: projection + positions + segment tag.
        fut_tokens = self.fp_proj(future_pressure)
        fut_tokens = fut_tokens + self.fp_pos
        fut_tokens = fut_tokens + self.fp_seg

        # Keys/values span both the past and the future-pressure tokens.
        context = torch.cat((past_tokens, fut_tokens), dim=1)
        batch = context.shape[0]                           # (B, T_obs+T_fut, D)
        query = self.queries.expand(batch, -1, -1)         # (B, T_fut, D)

        attended, _ = self.cross_attn(
            query=query, key=context, value=context, need_weights=False
        )
        return self.head(self.norm(attended))              # (B, T_fut, target_dim)