|
|
| from __future__ import annotations |
|
|
| import copy |
| import math |
| import os |
| from dataclasses import dataclass |
| from typing import Any |
|
|
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from torch import Tensor |
| from transformers.modeling_outputs import ModelOutput |
| from transformers.modeling_utils import PreTrainedModel |
| from transformers.utils import logging |
|
|
| from .configuration_eo1_internvl import EO1InternVLPiFlowMatchingConfig |
|
|
|
|
| logger = logging.get_logger(__name__) |
|
|
|
|
| def create_sinusoidal_pos_embedding( |
| time: torch.Tensor, |
| dimension: int, |
| min_period: float = 4e-3, |
| max_period: float = 4.0, |
| device: str | torch.device = "cpu", |
| ) -> Tensor: |
| """Sine-cosine embedding for scalar time in [0,1]. Matches openpi `posemb_sincos` sensitivity.""" |
| if dimension % 2 != 0: |
| raise ValueError(f"dimension ({dimension}) must be divisible by 2") |
| if time.ndim != 1: |
| raise ValueError("The time tensor is expected to be of shape `(batch_size, )`.") |
|
|
| fraction = torch.linspace(0.0, 1.0, dimension // 2, device=device) |
| period = min_period * (max_period / min_period) ** fraction |
| scaling_factor = 1.0 / period * 2 * math.pi |
| sin_input = scaling_factor[None, :] * time[:, None] |
| return torch.cat([torch.sin(sin_input), torch.cos(sin_input)], dim=1) |
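| # Illustrative usage (a sketch, not part of the original code): embedding a |
| # batch of 4 scalar timesteps into a 256-dim table. |
| #   t = torch.rand(4)                              # values in [0, 1] |
| #   emb = create_sinusoidal_pos_embedding(t, 256)  # -> shape (4, 256) |
| #   # the first 128 dims are sine terms, the last 128 are cosine terms |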
|
|
|
|
| def _masked_fill_min(x: torch.Tensor, mask: torch.Tensor) -> torch.Tensor: |
| """Fill with dtype-min where mask is False. `mask` is broadcastable to `x`.""" |
| return x.masked_fill(~mask, torch.finfo(x.dtype).min) |
|
|
|
|
| class AdaRMSNorm(nn.Module): |
| """ |
| Pi05-style AdaRMSNorm (openpi `gemma.RMSNorm` with `cond!=None`): |
| - RMS normalize in float32 |
| - per-layer modulation = Linear(cond -> 3*D) initialized to zeros |
| - output = normed * (1 + scale) + shift |
| - returns gate for gated residual. |
| """ |
|
|
| def __init__(self, dim: int, *, eps: float = 1e-6): |
| super().__init__() |
| self.eps = float(eps) |
| self.modulation = nn.Linear(dim, dim * 3, bias=True) |
| nn.init.zeros_(self.modulation.weight) |
| nn.init.zeros_(self.modulation.bias) |
|
|
| def forward(self, x: torch.Tensor, cond: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]: |
| if cond is None: |
| raise ValueError("AdaRMSNorm requires `cond` (Pi05 mode).") |
| if cond.ndim != 2: |
| raise ValueError(f"cond must be (B,D), got {tuple(cond.shape)}") |
| if x.ndim != 3: |
| raise ValueError(f"x must be (B,T,D), got {tuple(x.shape)}") |
| if x.shape[0] != cond.shape[0]: |
| raise ValueError(f"Batch mismatch: {x.shape[0]=} vs {cond.shape[0]=}") |
| if x.shape[-1] != cond.shape[-1]: |
| raise ValueError(f"Dim mismatch: {x.shape[-1]=} vs {cond.shape[-1]=}") |
|
|
| x_dtype = x.dtype |
| x_f32 = x.float() |
| var = x_f32.pow(2).mean(dim=-1, keepdim=True) |
| normed = x_f32 * torch.rsqrt(var + self.eps) |
|
|
| mod = self.modulation(cond).to(dtype=x_f32.dtype) |
| scale, shift, gate = mod.chunk(3, dim=-1) |
| scale = scale[:, None, :] |
| shift = shift[:, None, :] |
| gate = gate[:, None, :] |
| out = normed * (1 + scale) + shift |
| return out.to(dtype=x_dtype), gate.to(dtype=x_dtype) |
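| # Note (illustrative): because the modulation Linear above is zero-initialized, |
| # scale = shift = gate = 0 at init, so the output is plain RMSNorm(x) and the |
| # returned gate zeroes the residual branch; each gated block therefore starts |
| # out as an identity mapping. |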
|
|
|
|
| class Qwen2PiSelfAttention(nn.Module): |
| """ |
| Qwen2 attention variant for Pi05 action expert: |
| - queries from suffix tokens (action tokens) |
| - keys/values from concat(prefix_kv_cache, suffix_kv) |
| - uses full (non-causal) attention mask provided by caller. |
| """ |
|
|
| def __init__(self, qwen_config: Any, layer_idx: int): |
| super().__init__() |
| try: |
| from transformers.models.qwen2.modeling_qwen2 import apply_rotary_pos_emb, repeat_kv |
| except Exception as e: |
| raise ImportError("transformers qwen2 internals are required for eo_pi_internvl.") from e |
|
|
| self._apply_rotary_pos_emb = apply_rotary_pos_emb |
| self._repeat_kv = repeat_kv |
|
|
| self.layer_idx = int(layer_idx) |
| self.hidden_size = int(qwen_config.hidden_size) |
| self.num_heads = int(qwen_config.num_attention_heads) |
| self.num_kv_heads = int(qwen_config.num_key_value_heads) |
| self.num_kv_groups = self.num_heads // self.num_kv_heads |
| self.head_dim = int(getattr(qwen_config, "head_dim", self.hidden_size // self.num_heads)) |
| self.scaling = self.head_dim**-0.5 |
|
|
| self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=True) |
| self.k_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=True) |
| self.v_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=True) |
| self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False) |
|
|
| def forward( |
| self, |
| hidden_states: torch.Tensor, |
| *, |
| position_embeddings: tuple[torch.Tensor, torch.Tensor], |
| attention_mask: torch.Tensor | None, |
| prefix_k: torch.Tensor, |
| prefix_v: torch.Tensor, |
| ) -> torch.Tensor: |
| |
| bsz, seqlen, _ = hidden_states.shape |
| q = self.q_proj(hidden_states).view(bsz, seqlen, self.num_heads, self.head_dim).transpose(1, 2) |
| k = self.k_proj(hidden_states).view(bsz, seqlen, self.num_kv_heads, self.head_dim).transpose(1, 2) |
| v = self.v_proj(hidden_states).view(bsz, seqlen, self.num_kv_heads, self.head_dim).transpose(1, 2) |
|
|
| cos, sin = position_embeddings |
| q, k = self._apply_rotary_pos_emb(q, k, cos, sin) |
|
|
| if prefix_k.ndim != 4 or prefix_v.ndim != 4: |
| raise ValueError(f"prefix_k/v must be (B, n_kv, P, hd), got {tuple(prefix_k.shape)}, {tuple(prefix_v.shape)}") |
| if int(prefix_k.shape[0]) != bsz or int(prefix_v.shape[0]) != bsz: |
| raise ValueError("prefix_k/v batch mismatch.") |
| if int(prefix_k.shape[1]) != self.num_kv_heads or int(prefix_v.shape[1]) != self.num_kv_heads: |
| raise ValueError( |
| "prefix_k/v num_kv_heads mismatch: " |
| f"{int(prefix_k.shape[1])=} {int(prefix_v.shape[1])=} vs {self.num_kv_heads=}" |
| ) |
| if int(prefix_k.shape[-1]) != self.head_dim or int(prefix_v.shape[-1]) != self.head_dim: |
| raise ValueError("prefix_k/v head_dim mismatch.") |
|
|
| k_all = torch.cat([prefix_k, k], dim=2) |
| v_all = torch.cat([prefix_v, v], dim=2) |
|
|
| k_all = self._repeat_kv(k_all, self.num_kv_groups) |
| v_all = self._repeat_kv(v_all, self.num_kv_groups) |
|
|
| |
| if attention_mask is not None: |
| if attention_mask.ndim != 4: |
| raise ValueError(f"attention_mask must be 4D (B,1,S,K), got {tuple(attention_mask.shape)}") |
| attn_mask = attention_mask.expand(bsz, self.num_heads, seqlen, k_all.shape[-2]) |
| else: |
| attn_mask = None |
|
|
| attn_out = torch.nn.functional.scaled_dot_product_attention( |
| q, k_all, v_all, attn_mask=attn_mask, dropout_p=0.0, is_causal=False |
| ) |
| attn_out = attn_out.transpose(1, 2).contiguous().view(bsz, seqlen, self.num_heads * self.head_dim) |
| return self.o_proj(attn_out) |
|
|
|
|
| class Qwen2PiMLP(nn.Module): |
| def __init__(self, qwen_config: Any): |
| super().__init__() |
| hidden = int(qwen_config.hidden_size) |
| inter = int(qwen_config.intermediate_size) |
| self.gate_proj = nn.Linear(hidden, inter, bias=False) |
| self.up_proj = nn.Linear(hidden, inter, bias=False) |
| self.down_proj = nn.Linear(inter, hidden, bias=False) |
| act_name = str(getattr(qwen_config, "hidden_act", "silu")) |
| if act_name != "silu": |
| logger.warning_once("EO Pi action expert: forcing SiLU hidden_act for MLP (got %s).", act_name) |
| self.act = nn.SiLU() |
|
|
| def forward(self, x: torch.Tensor) -> torch.Tensor: |
| return self.down_proj(self.act(self.gate_proj(x)) * self.up_proj(x)) |
|
|
|
|
| class Qwen2PiDecoderLayer(nn.Module): |
| def __init__(self, qwen_config: Any, layer_idx: int): |
| super().__init__() |
| eps = float(getattr(qwen_config, "rms_norm_eps", 1e-6)) |
| self.input_layernorm = AdaRMSNorm(int(qwen_config.hidden_size), eps=eps) |
| self.self_attn = Qwen2PiSelfAttention(qwen_config=qwen_config, layer_idx=layer_idx) |
| self.post_attention_layernorm = AdaRMSNorm(int(qwen_config.hidden_size), eps=eps) |
| self.mlp = Qwen2PiMLP(qwen_config=qwen_config) |
|
|
| def forward( |
| self, |
| hidden_states: torch.Tensor, |
| *, |
| adarms_cond: torch.Tensor, |
| position_embeddings: tuple[torch.Tensor, torch.Tensor], |
| attention_mask: torch.Tensor | None, |
| prefix_k: torch.Tensor, |
| prefix_v: torch.Tensor, |
| ) -> torch.Tensor: |
| residual = hidden_states |
| x, gate = self.input_layernorm(hidden_states, adarms_cond) |
| x = self.self_attn( |
| x, |
| position_embeddings=position_embeddings, |
| attention_mask=attention_mask, |
| prefix_k=prefix_k, |
| prefix_v=prefix_v, |
| ) |
| hidden_states = residual + x * gate |
|
|
| residual = hidden_states |
| x, gate = self.post_attention_layernorm(hidden_states, adarms_cond) |
| x = self.mlp(x) |
| hidden_states = residual + x * gate |
| return hidden_states |
|
|
|
|
| class Qwen2PiActionExpert(nn.Module): |
| def __init__(self, qwen_config: Any, *, init_from_qwen2_lm: nn.Module | None = None): |
| super().__init__() |
| try: |
| from transformers.models.qwen2.modeling_qwen2 import Qwen2RotaryEmbedding |
| except Exception as e: |
| raise ImportError("transformers qwen2 internals are required for eo_pi_internvl.") from e |
|
|
| self.config = qwen_config |
| self.num_layers = int(qwen_config.num_hidden_layers) |
| self.layers = nn.ModuleList([Qwen2PiDecoderLayer(qwen_config, i) for i in range(self.num_layers)]) |
| self.rotary_emb = Qwen2RotaryEmbedding(qwen_config) |
| self.final_norm = AdaRMSNorm(int(qwen_config.hidden_size), eps=float(getattr(qwen_config, "rms_norm_eps", 1e-6))) |
|
|
| if init_from_qwen2_lm is not None: |
| self._init_from_qwen2_lm(init_from_qwen2_lm) |
|
|
| def _init_from_qwen2_lm(self, qwen2_lm: nn.Module): |
| """ |
| Copy attention/MLP weights from a Qwen2ForCausalLM (or Qwen2Model) into this expert. |
| AdaRMSNorm modulation stays zero-init to match Pi05. |
| """ |
| src_layers = None |
| if hasattr(qwen2_lm, "model") and hasattr(qwen2_lm.model, "layers"): |
| src_layers = qwen2_lm.model.layers |
| elif hasattr(qwen2_lm, "layers"): |
| src_layers = qwen2_lm.layers |
| if src_layers is None: |
| raise ValueError("Unsupported qwen2_lm: cannot locate `.model.layers`.") |
|
|
| if len(src_layers) != len(self.layers): |
| raise ValueError(f"Layer count mismatch: {len(src_layers)=} vs {len(self.layers)=}") |
|
|
| for dst, src in zip(self.layers, src_layers, strict=True): |
| |
| dst.self_attn.q_proj.load_state_dict(src.self_attn.q_proj.state_dict()) |
| dst.self_attn.k_proj.load_state_dict(src.self_attn.k_proj.state_dict()) |
| dst.self_attn.v_proj.load_state_dict(src.self_attn.v_proj.state_dict()) |
| dst.self_attn.o_proj.load_state_dict(src.self_attn.o_proj.state_dict()) |
| |
| dst.mlp.gate_proj.load_state_dict(src.mlp.gate_proj.state_dict()) |
| dst.mlp.up_proj.load_state_dict(src.mlp.up_proj.state_dict()) |
| dst.mlp.down_proj.load_state_dict(src.mlp.down_proj.state_dict()) |
|
|
| def forward( |
| self, |
| action_tokens: torch.Tensor, |
| *, |
| prefix_kv_cache: list[tuple[torch.Tensor, torch.Tensor]], |
| prefix_key_mask: torch.Tensor, |
| position_ids: torch.Tensor, |
| adarms_cond: torch.Tensor, |
| suffix_key_mask: torch.Tensor | None = None, |
| ) -> torch.Tensor: |
| """ |
| Args: |
| action_tokens: (B, S, D) |
| prefix_kv_cache: list[(k,v)] each (B, n_kv, P, hd) from InternVL prefix expert. |
| prefix_key_mask: (B, P) bool, True = valid prefix token. |
| position_ids: (B, S) positions for action tokens (prefix_len + [0..S-1]). |
| adarms_cond: (B, D) time conditioning vector. |
| suffix_key_mask: (B, S) bool, True = valid suffix token (optional; for padding). |
| """ |
| if action_tokens.ndim != 3: |
| raise ValueError(f"action_tokens must be (B,S,D), got {tuple(action_tokens.shape)}") |
| bsz, s_len, _ = action_tokens.shape |
| if prefix_key_mask.ndim != 2 or int(prefix_key_mask.shape[0]) != bsz: |
| raise ValueError(f"prefix_key_mask must be (B,P), got {tuple(prefix_key_mask.shape)}") |
| if position_ids.shape != (bsz, s_len): |
| raise ValueError(f"position_ids must be (B,S)={bsz,s_len}, got {tuple(position_ids.shape)}") |
| if len(prefix_kv_cache) == 0: |
| raise ValueError("prefix_kv_cache is empty.") |
|
|
| |
| position_embeddings = self.rotary_emb(action_tokens, position_ids) |
|
|
| if suffix_key_mask is None: |
| suffix_key_mask = torch.ones((bsz, s_len), device=action_tokens.device, dtype=torch.bool) |
| if suffix_key_mask.shape != (bsz, s_len): |
| raise ValueError(f"suffix_key_mask must be (B,S), got {tuple(suffix_key_mask.shape)}") |
|
|
| |
| prefix_part = (suffix_key_mask[:, None, :, None] & prefix_key_mask[:, None, None, :]) |
| suffix_part = (suffix_key_mask[:, None, :, None] & suffix_key_mask[:, None, None, :]) |
| allow = torch.cat([prefix_part, suffix_part], dim=-1) |
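| # Mask layout sketch (keys = [prefix | suffix]): for one sample with P = 3 |
| # prefix tokens (the last one padding) and S = 2 action tokens, `allow` is |
| #   [[True, True, False, True, True], |
| #    [True, True, False, True, True]] |
| # i.e. every action query attends to all valid prefix keys and to all action |
| # keys; disallowed entries are filled with the dtype minimum just below. |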
| attn_mask = torch.zeros( |
| (bsz, 1, s_len, int(prefix_key_mask.shape[1]) + s_len), |
| device=action_tokens.device, |
| dtype=action_tokens.dtype, |
| ) |
| attn_mask = _masked_fill_min(attn_mask, allow) |
|
|
| x = action_tokens |
| for layer_idx, layer in enumerate(self.layers): |
| if layer_idx >= len(prefix_kv_cache): |
| raise ValueError( |
| "prefix_kv_cache has fewer layers than action expert. " |
| f"{len(prefix_kv_cache)=} < {len(self.layers)=}" |
| ) |
| pk, pv = prefix_kv_cache[layer_idx] |
| x = layer( |
| x, |
| adarms_cond=adarms_cond, |
| position_embeddings=position_embeddings, |
| attention_mask=attn_mask, |
| prefix_k=pk, |
| prefix_v=pv, |
| ) |
|
|
| x, _ = self.final_norm(x, adarms_cond) |
| return x |
|
|
|
|
| class Qwen3HeadRMSNorm(nn.Module): |
| """Qwen3-style RMSNorm used for `q_norm`/`k_norm` on per-head dim.""" |
|
|
| def __init__(self, dim: int, *, eps: float = 1e-6): |
| super().__init__() |
| self.weight = nn.Parameter(torch.ones(dim)) |
| self.eps = float(eps) |
|
|
| def forward(self, x: torch.Tensor) -> torch.Tensor: |
| dtype = x.dtype |
| x_f32 = x.float() |
| var = x_f32.pow(2).mean(dim=-1, keepdim=True) |
| x_norm = x_f32 * torch.rsqrt(var + self.eps) |
| return (self.weight * x_norm).to(dtype=dtype) |
|
|
|
|
| class Qwen3PiSelfAttention(nn.Module): |
| """ |
| Qwen3 attention variant for Pi05 action expert: |
| - queries from suffix tokens (action tokens) |
| - keys/values from concat(prefix_kv_cache, suffix_kv) |
| - uses full (non-causal) attention mask provided by caller. |
| """ |
|
|
| def __init__(self, qwen_config: Any, layer_idx: int): |
| super().__init__() |
| try: |
| from transformers.models.qwen3.modeling_qwen3 import apply_rotary_pos_emb, repeat_kv |
| except Exception as e: |
| raise ImportError("transformers qwen3 internals are required for eo_pi_internvl.") from e |
|
|
| self._apply_rotary_pos_emb = apply_rotary_pos_emb |
| self._repeat_kv = repeat_kv |
|
|
| self.layer_idx = int(layer_idx) |
| self.hidden_size = int(qwen_config.hidden_size) |
| self.num_heads = int(qwen_config.num_attention_heads) |
| self.num_kv_heads = int(qwen_config.num_key_value_heads) |
| self.num_kv_groups = self.num_heads // self.num_kv_heads |
| self.head_dim = int(getattr(qwen_config, "head_dim", self.hidden_size // self.num_heads)) |
|
|
| attn_bias = bool(getattr(qwen_config, "attention_bias", False)) |
| self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=attn_bias) |
| self.k_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=attn_bias) |
| self.v_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=attn_bias) |
| self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=attn_bias) |
|
|
| eps = float(getattr(qwen_config, "rms_norm_eps", 1e-6)) |
| self.q_norm = Qwen3HeadRMSNorm(self.head_dim, eps=eps) |
| self.k_norm = Qwen3HeadRMSNorm(self.head_dim, eps=eps) |
|
|
| def forward( |
| self, |
| hidden_states: torch.Tensor, |
| *, |
| position_embeddings: tuple[torch.Tensor, torch.Tensor], |
| attention_mask: torch.Tensor | None, |
| prefix_k: torch.Tensor, |
| prefix_v: torch.Tensor, |
| ) -> torch.Tensor: |
| bsz, seqlen, _ = hidden_states.shape |
| hidden_shape = (bsz, seqlen, -1, self.head_dim) |
|
|
| q = self.q_proj(hidden_states).view(hidden_shape) |
| k = self.k_proj(hidden_states).view(hidden_shape) |
| v = self.v_proj(hidden_states).view(hidden_shape) |
| q = self.q_norm(q).transpose(1, 2) |
| k = self.k_norm(k).transpose(1, 2) |
| v = v.transpose(1, 2) |
|
|
| cos, sin = position_embeddings |
| q, k = self._apply_rotary_pos_emb(q, k, cos, sin) |
|
|
| if prefix_k.ndim != 4 or prefix_v.ndim != 4: |
| raise ValueError( |
| f"prefix_k/v must be (B, n_kv, P, hd), got {tuple(prefix_k.shape)}, {tuple(prefix_v.shape)}" |
| ) |
| if int(prefix_k.shape[0]) != bsz or int(prefix_v.shape[0]) != bsz: |
| raise ValueError("prefix_k/v batch mismatch.") |
| if int(prefix_k.shape[1]) != self.num_kv_heads or int(prefix_v.shape[1]) != self.num_kv_heads: |
| raise ValueError( |
| "prefix_k/v num_kv_heads mismatch: " |
| f"{int(prefix_k.shape[1])=} {int(prefix_v.shape[1])=} vs {self.num_kv_heads=}" |
| ) |
| if int(prefix_k.shape[-1]) != self.head_dim or int(prefix_v.shape[-1]) != self.head_dim: |
| raise ValueError("prefix_k/v head_dim mismatch.") |
|
|
| k_all = torch.cat([prefix_k, k], dim=2) |
| v_all = torch.cat([prefix_v, v], dim=2) |
| k_all = self._repeat_kv(k_all, self.num_kv_groups) |
| v_all = self._repeat_kv(v_all, self.num_kv_groups) |
|
|
| if attention_mask is not None: |
| if attention_mask.ndim != 4: |
| raise ValueError(f"attention_mask must be 4D (B,1,S,K), got {tuple(attention_mask.shape)}") |
| attn_mask = attention_mask.expand(bsz, self.num_heads, seqlen, k_all.shape[-2]) |
| else: |
| attn_mask = None |
|
|
| attn_out = torch.nn.functional.scaled_dot_product_attention( |
| q, k_all, v_all, attn_mask=attn_mask, dropout_p=0.0, is_causal=False |
| ) |
| attn_out = attn_out.transpose(1, 2).contiguous().view(bsz, seqlen, self.num_heads * self.head_dim) |
| return self.o_proj(attn_out) |
|
|
|
|
| class Qwen3PiMLP(nn.Module): |
| def __init__(self, qwen_config: Any): |
| super().__init__() |
| hidden = int(qwen_config.hidden_size) |
| inter = int(qwen_config.intermediate_size) |
| self.gate_proj = nn.Linear(hidden, inter, bias=False) |
| self.up_proj = nn.Linear(hidden, inter, bias=False) |
| self.down_proj = nn.Linear(inter, hidden, bias=False) |
| act_name = str(getattr(qwen_config, "hidden_act", "silu")) |
| if act_name != "silu": |
| logger.warning_once("EO Pi action expert: forcing SiLU hidden_act for MLP (got %s).", act_name) |
| self.act = nn.SiLU() |
|
|
| def forward(self, x: torch.Tensor) -> torch.Tensor: |
| return self.down_proj(self.act(self.gate_proj(x)) * self.up_proj(x)) |
|
|
|
|
| class Qwen3PiDecoderLayer(nn.Module): |
| def __init__(self, qwen_config: Any, layer_idx: int): |
| super().__init__() |
| eps = float(getattr(qwen_config, "rms_norm_eps", 1e-6)) |
| self.input_layernorm = AdaRMSNorm(int(qwen_config.hidden_size), eps=eps) |
| self.self_attn = Qwen3PiSelfAttention(qwen_config=qwen_config, layer_idx=layer_idx) |
| self.post_attention_layernorm = AdaRMSNorm(int(qwen_config.hidden_size), eps=eps) |
| self.mlp = Qwen3PiMLP(qwen_config=qwen_config) |
|
|
| def forward( |
| self, |
| hidden_states: torch.Tensor, |
| *, |
| adarms_cond: torch.Tensor, |
| position_embeddings: tuple[torch.Tensor, torch.Tensor], |
| attention_mask: torch.Tensor | None, |
| prefix_k: torch.Tensor, |
| prefix_v: torch.Tensor, |
| ) -> torch.Tensor: |
| residual = hidden_states |
| x, gate = self.input_layernorm(hidden_states, adarms_cond) |
| x = self.self_attn( |
| x, |
| position_embeddings=position_embeddings, |
| attention_mask=attention_mask, |
| prefix_k=prefix_k, |
| prefix_v=prefix_v, |
| ) |
| hidden_states = residual + x * gate |
|
|
| residual = hidden_states |
| x, gate = self.post_attention_layernorm(hidden_states, adarms_cond) |
| x = self.mlp(x) |
| hidden_states = residual + x * gate |
| return hidden_states |
|
|
|
|
| class Qwen3PiActionExpert(nn.Module): |
| def __init__(self, qwen_config: Any, *, init_from_qwen3_lm: nn.Module | None = None): |
| super().__init__() |
| try: |
| from transformers.models.qwen3.modeling_qwen3 import Qwen3RotaryEmbedding |
| except Exception as e: |
| raise ImportError("transformers qwen3 internals are required for eo_pi_internvl.") from e |
|
|
| self.config = qwen_config |
| self.num_layers = int(qwen_config.num_hidden_layers) |
| self.layers = nn.ModuleList([Qwen3PiDecoderLayer(qwen_config, i) for i in range(self.num_layers)]) |
| self.rotary_emb = Qwen3RotaryEmbedding(qwen_config) |
| self.final_norm = AdaRMSNorm(int(qwen_config.hidden_size), eps=float(getattr(qwen_config, "rms_norm_eps", 1e-6))) |
|
|
| if init_from_qwen3_lm is not None: |
| self._init_from_qwen3_lm(init_from_qwen3_lm) |
|
|
| def _init_from_qwen3_lm(self, qwen3_lm: nn.Module): |
| """ |
| Copy attention/MLP weights from a Qwen3ForCausalLM (or Qwen3Model) into this expert. |
| AdaRMSNorm modulation stays zero-init to match Pi05. |
| """ |
| src_layers = None |
| if hasattr(qwen3_lm, "model") and hasattr(qwen3_lm.model, "layers"): |
| src_layers = qwen3_lm.model.layers |
| elif hasattr(qwen3_lm, "layers"): |
| src_layers = qwen3_lm.layers |
| if src_layers is None: |
| raise ValueError("Unsupported qwen3_lm: cannot locate `.model.layers`.") |
|
|
| if len(src_layers) != len(self.layers): |
| raise ValueError(f"Layer count mismatch: {len(src_layers)=} vs {len(self.layers)=}") |
|
|
| for dst, src in zip(self.layers, src_layers, strict=True): |
| dst.self_attn.q_proj.load_state_dict(src.self_attn.q_proj.state_dict()) |
| dst.self_attn.k_proj.load_state_dict(src.self_attn.k_proj.state_dict()) |
| dst.self_attn.v_proj.load_state_dict(src.self_attn.v_proj.state_dict()) |
| dst.self_attn.o_proj.load_state_dict(src.self_attn.o_proj.state_dict()) |
| |
| if hasattr(src.self_attn, "q_norm") and hasattr(dst.self_attn, "q_norm"): |
| dst.self_attn.q_norm.weight.data.copy_(src.self_attn.q_norm.weight.data) |
| if hasattr(src.self_attn, "k_norm") and hasattr(dst.self_attn, "k_norm"): |
| dst.self_attn.k_norm.weight.data.copy_(src.self_attn.k_norm.weight.data) |
| |
| dst.mlp.gate_proj.load_state_dict(src.mlp.gate_proj.state_dict()) |
| dst.mlp.up_proj.load_state_dict(src.mlp.up_proj.state_dict()) |
| dst.mlp.down_proj.load_state_dict(src.mlp.down_proj.state_dict()) |
|
|
| def forward( |
| self, |
| action_tokens: torch.Tensor, |
| *, |
| prefix_kv_cache: list[tuple[torch.Tensor, torch.Tensor]], |
| prefix_key_mask: torch.Tensor, |
| position_ids: torch.Tensor, |
| adarms_cond: torch.Tensor, |
| suffix_key_mask: torch.Tensor | None = None, |
| ) -> torch.Tensor: |
| if action_tokens.ndim != 3: |
| raise ValueError(f"action_tokens must be (B,S,D), got {tuple(action_tokens.shape)}") |
| bsz, s_len, _ = action_tokens.shape |
| if prefix_key_mask.ndim != 2 or int(prefix_key_mask.shape[0]) != bsz: |
| raise ValueError(f"prefix_key_mask must be (B,P), got {tuple(prefix_key_mask.shape)}") |
| if position_ids.shape != (bsz, s_len): |
| raise ValueError(f"position_ids must be (B,S)={bsz,s_len}, got {tuple(position_ids.shape)}") |
| if len(prefix_kv_cache) == 0: |
| raise ValueError("prefix_kv_cache is empty.") |
|
|
| position_embeddings = self.rotary_emb(action_tokens, position_ids) |
|
|
| if suffix_key_mask is None: |
| suffix_key_mask = torch.ones((bsz, s_len), device=action_tokens.device, dtype=torch.bool) |
| if suffix_key_mask.shape != (bsz, s_len): |
| raise ValueError(f"suffix_key_mask must be (B,S), got {tuple(suffix_key_mask.shape)}") |
|
|
| prefix_part = (suffix_key_mask[:, None, :, None] & prefix_key_mask[:, None, None, :]) |
| suffix_part = (suffix_key_mask[:, None, :, None] & suffix_key_mask[:, None, None, :]) |
| allow = torch.cat([prefix_part, suffix_part], dim=-1) |
| attn_mask = torch.zeros( |
| (bsz, 1, s_len, int(prefix_key_mask.shape[1]) + s_len), |
| device=action_tokens.device, |
| dtype=action_tokens.dtype, |
| ) |
| attn_mask = _masked_fill_min(attn_mask, allow) |
|
|
| x = action_tokens |
| for layer_idx, layer in enumerate(self.layers): |
| if layer_idx >= len(prefix_kv_cache): |
| raise ValueError( |
| "prefix_kv_cache has fewer layers than action expert. " |
| f"{len(prefix_kv_cache)=} < {len(self.layers)=}" |
| ) |
| pk, pv = prefix_kv_cache[layer_idx] |
| x = layer( |
| x, |
| adarms_cond=adarms_cond, |
| position_embeddings=position_embeddings, |
| attention_mask=attn_mask, |
| prefix_k=pk, |
| prefix_v=pv, |
| ) |
|
|
| x, _ = self.final_norm(x, adarms_cond) |
| return x |
|
|
|
|
| @dataclass |
| class EO1InternVLPiFlowMatchingOutput(ModelOutput): |
| loss: torch.FloatTensor | None = None |
| fm_loss: torch.FloatTensor | None = None |
| fm_loss_pos: torch.FloatTensor | None = None |
| fm_loss_rot: torch.FloatTensor | None = None |
| fm_loss_gripper: torch.FloatTensor | None = None |
| ar_loss: torch.FloatTensor | None = None |
| actions: torch.FloatTensor | None = None |
|
|
| logits: torch.FloatTensor | None = None |
| hidden_states: tuple[torch.FloatTensor] | None = None |
| attentions: tuple[torch.FloatTensor] | None = None |
|
|
| def count_params(module, trainable_only=False): |
| ps = module.parameters() |
| if trainable_only: |
| ps = [p for p in ps if p.requires_grad] |
| return sum(p.numel() for p in ps) |
|
|
| class EO1InternVLPiFlowMatchingModel(PreTrainedModel): |
| """EO1 action model with InternVL prefix expert + Pi05-style (Qwen2/Qwen3) action expert (AdaRMSNorm timestep).""" |
|
|
| config_class = EO1InternVLPiFlowMatchingConfig |
| supports_gradient_checkpointing = True |
|
|
| def __init__( |
| self, |
| config: EO1InternVLPiFlowMatchingConfig, |
| internvl_backbone: nn.Module, |
| action_expert: nn.Module | None = None, |
| ): |
| super().__init__(config) |
| self.internvl_backbone = internvl_backbone |
|
|
| |
| if not hasattr(self.internvl_backbone, "language_model"): |
| raise ValueError("internvl_backbone must have `.language_model`.") |
| |
| |
| |
|
|
| |
| prefix_cfg = self.prefix_lm.config |
| cfg_name = prefix_cfg.__class__.__name__ |
|
|
| expert_cfg = copy.deepcopy(prefix_cfg) |
| if getattr(config, "expert_hidden_size", None) is not None: |
| expert_cfg.hidden_size = int(config.expert_hidden_size) |
| if getattr(config, "expert_intermediate_size", None) is not None: |
| expert_cfg.intermediate_size = int(config.expert_intermediate_size) |
| if getattr(config, "expert_num_attention_heads", None) is not None: |
| expert_cfg.num_attention_heads = int(config.expert_num_attention_heads) |
| if getattr(config, "expert_num_hidden_layers", None) is not None: |
| expert_cfg.num_hidden_layers = int(config.expert_num_hidden_layers) |
| |
| if int(getattr(expert_cfg, "num_key_value_heads", -1)) != int(getattr(prefix_cfg, "num_key_value_heads", -2)): |
| raise ValueError( |
| "To reuse prefix KV-cache, expert and prefix must share num_key_value_heads. " |
| f"{int(getattr(prefix_cfg, 'num_key_value_heads'))=} vs {int(getattr(expert_cfg, 'num_key_value_heads'))=}." |
| ) |
| if int(getattr(expert_cfg, "head_dim", -1)) != int(getattr(prefix_cfg, "head_dim", -2)): |
| raise ValueError( |
| "To reuse prefix KV-cache, expert and prefix must share head_dim. " |
| f"{int(getattr(prefix_cfg, 'head_dim'))=} vs {int(getattr(expert_cfg, 'head_dim'))=}." |
| ) |
| if int(expert_cfg.num_attention_heads) % int(expert_cfg.num_key_value_heads) != 0: |
| raise ValueError( |
| "expert_num_attention_heads must be divisible by num_key_value_heads. " |
| f"{int(expert_cfg.num_attention_heads)=} {int(expert_cfg.num_key_value_heads)=}." |
| ) |
| |
| if hasattr(expert_cfg, "layer_types") and isinstance(getattr(expert_cfg, "layer_types"), list): |
| if len(expert_cfg.layer_types) != int(expert_cfg.num_hidden_layers): |
| expert_cfg.layer_types = ["full_attention"] * int(expert_cfg.num_hidden_layers) |
|
|
| self._expert_hidden_size = int(expert_cfg.hidden_size) |
| self._expert_num_layers = int(expert_cfg.num_hidden_layers) |
| self._prefix_num_layers = int(getattr(prefix_cfg, "num_hidden_layers", self._expert_num_layers)) |
|
|
| if self._expert_num_layers > self._prefix_num_layers: |
| raise ValueError( |
| "expert_num_hidden_layers cannot exceed prefix LM layers when using prefix KV-cache. " |
| f"{self._expert_num_layers=} > {self._prefix_num_layers=}." |
| ) |
|
|
| mapping = str(getattr(config, "expert_layer_mapping", "last")).strip().lower() |
| if mapping == "last": |
| start = self._prefix_num_layers - self._expert_num_layers |
| self._prefix_kv_layer_indices = list(range(start, self._prefix_num_layers)) |
| elif mapping == "first": |
| self._prefix_kv_layer_indices = list(range(self._expert_num_layers)) |
| else: |
| raise ValueError(f"Unsupported expert_layer_mapping={mapping!r} (expected 'last' or 'first').") |
|
|
| max_action_dim = int(config.max_action_dim) |
|
|
| |
| self.action_in_proj = nn.Linear(max_action_dim, self._expert_hidden_size) |
| self.action_out_proj = nn.Linear(self._expert_hidden_size, max_action_dim) |
|
|
| |
| self.time_mlp_in = nn.Linear(self._expert_hidden_size, self._expert_hidden_size) |
| self.time_mlp_out = nn.Linear(self._expert_hidden_size, self._expert_hidden_size) |
|
|
| if action_expert is not None: |
| self.action_expert = action_expert |
| else: |
| |
| init_from = self.prefix_lm if bool(getattr(self.config, "expert_init_from_backbone", False)) else None |
| try: |
| if cfg_name == "Qwen2Config": |
| self.action_expert = Qwen2PiActionExpert(expert_cfg, init_from_qwen2_lm=init_from) |
| elif cfg_name == "Qwen3Config": |
| self.action_expert = Qwen3PiActionExpert(expert_cfg, init_from_qwen3_lm=init_from) |
| else: |
| raise NotImplementedError( |
| "eo_pi_internvl currently supports only Qwen2/Qwen3 LMs for action expert. " |
| f"Got: {cfg_name}" |
| ) |
| except Exception as e: |
| raise RuntimeError( |
| "Failed to build/initialize action expert. If you set `expert_init_from_backbone=True`, " |
| "make sure expert_* hyperparams exactly match the prefix LM shapes, or set it to False " |
| "for Pi05-style random init." |
| ) from e |
| |
| n_all = count_params(self.action_expert) |
| n_train = count_params(self.action_expert, trainable_only=True) |
| print(f"action_expert params: {n_all/1e6:.2f}M (trainable {n_train/1e6:.2f}M)") |
| self.post_init() |
|
|
| @property |
| def prefix_lm(self) -> nn.Module: |
| |
| return self.internvl_backbone.language_model |
|
|
| @staticmethod |
| def _action_group_indices(action_dim: int, *, action_dim_mask: torch.Tensor | None = None) -> dict[str, list[int]]: |
| """ |
| Best-effort split of action dims into position/rotation/gripper groups. |
| |
| Supports both common layouts: |
| 1) Compact (single-arm): [xyz(3), rotvec(3), gripper(1)] -> 7 dims (or bimanual 14 dims). |
| 2) EO unified action encoding (see `dataset/action_encoding.py`): |
| left: 0:3 pos, 3:6 rotvec (or 3:9 r6d), 9 gripper |
| right: 10:13 pos, 13:16 rotvec (or 13:19 r6d), 19 gripper |
| |
| Rotation repr is controlled via env var `EO_ACTION_ROT_REPR` (default rotvec). |
| """ |
| d = int(action_dim) |
| if d <= 0: |
| return {"pos": [], "rot": [], "gripper": [], "other": []} |
|
|
| rot_repr = os.environ.get("EO_ACTION_ROT_REPR", "rotvec").strip().lower() |
| rot_is_r6d = rot_repr in ("r6d", "rot6d", "6d") |
|
|
| m_any = None |
| if action_dim_mask is not None and torch.is_tensor(action_dim_mask): |
| m = action_dim_mask.detach() |
| if m.ndim == 1: |
| m_any = m.to(torch.bool) |
| elif m.ndim == 2: |
| m_any = m.to(torch.bool).any(dim=0) |
| elif m.ndim == 3 and int(m.shape[1]) == 1: |
| m_any = m[:, 0, :].to(torch.bool).any(dim=0) |
| else: |
| m_any = m.reshape(-1, m.shape[-1]).to(torch.bool).any(dim=0) |
| if int(m_any.numel()) != d: |
| if int(m_any.numel()) > d: |
| m_any = m_any[:d] |
| else: |
| pad = torch.zeros((d - int(m_any.numel()),), dtype=torch.bool, device=m_any.device) |
| m_any = torch.cat([m_any, pad], dim=0) |
|
|
| |
| eff = d |
| if m_any is not None and bool(m_any.any().item()): |
| eff = int(torch.nonzero(m_any, as_tuple=False).max().item()) + 1 |
|
|
| pos: list[int] = [] |
| rot: list[int] = [] |
| gripper: list[int] = [] |
|
|
| |
| if eff in (7, 14): |
| arm_offsets = [0] if eff == 7 else [0, 7] |
| for off in arm_offsets: |
| pos.extend([off + i for i in range(0, 3) if off + i < d]) |
| rot.extend([off + i for i in range(3, 6) if off + i < d]) |
| g = off + 6 |
| if g < d: |
| gripper.append(g) |
| used = set(pos) | set(rot) | set(gripper) |
| other = [i for i in range(d) if i not in used] |
| return {"pos": pos, "rot": rot, "gripper": gripper, "other": other} |
|
|
| |
| right_active = (d >= 20) |
| if m_any is not None and int(m_any.numel()) >= 20: |
| right_active = bool(m_any[10:20].any().item()) |
|
|
| |
| pos.extend([i for i in range(0, min(3, d))]) |
| rot.extend([i for i in range(3, min(6, d))]) |
| if rot_is_r6d: |
| rot.extend([i for i in range(6, min(9, d))]) |
| if 9 < d: |
| gripper.append(9) |
|
|
| |
| if right_active and d >= 20: |
| pos.extend([i for i in range(10, min(13, d))]) |
| rot.extend([i for i in range(13, min(16, d))]) |
| if rot_is_r6d: |
| rot.extend([i for i in range(16, min(19, d))]) |
| if 19 < d: |
| gripper.append(19) |
|
|
| used = set(pos) | set(rot) | set(gripper) |
| other = [i for i in range(d) if i not in used] |
| return {"pos": pos, "rot": rot, "gripper": gripper, "other": other} |
|
|
| |
| def sample_noise(self, shape, device): |
| return torch.normal(mean=0.0, std=1.0, size=shape, dtype=torch.float32, device=device) |
|
|
| def sample_time(self, bsize, device): |
| beta_dist = torch.distributions.Beta(concentration1=1.5, concentration0=1.0) |
| time_beta = beta_dist.sample((bsize,)).to(device=device, dtype=torch.float32) |
| return time_beta * 0.999 + 0.001 |
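| # Note (illustrative): Beta(1.5, 1.0) is skewed toward 1, so sampled times |
| # concentrate near the noisy end of the path; the affine map t -> 0.999*t + 0.001 |
| # keeps every sample inside [0.001, 1.0]. |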
|
|
| def _embed_time_cond(self, timestep: torch.Tensor, *, dtype: torch.dtype, device: torch.device) -> torch.Tensor: |
| hidden = int(getattr(self, "_expert_hidden_size", self.prefix_lm.config.hidden_size)) |
| t_emb = create_sinusoidal_pos_embedding(timestep, hidden, device=device).to(dtype=dtype) |
| t_emb = self.time_mlp_in(t_emb) |
| t_emb = F.silu(t_emb) |
| t_emb = self.time_mlp_out(t_emb) |
| t_emb = F.silu(t_emb) |
| return t_emb |
|
|
| def _select_prefix_kv_cache(self, prefix_kv_cache: list[tuple[torch.Tensor, torch.Tensor]]) -> list[tuple[torch.Tensor, torch.Tensor]]: |
| if not hasattr(self, "_prefix_kv_layer_indices"): |
| return prefix_kv_cache |
| idx = list(getattr(self, "_prefix_kv_layer_indices")) |
| if not idx: |
| return prefix_kv_cache |
| if max(idx) >= len(prefix_kv_cache): |
| raise ValueError( |
| "Prefix KV cache shorter than expected. " |
| f"{len(prefix_kv_cache)=} < {max(idx)+1=}." |
| ) |
| return [prefix_kv_cache[i] for i in idx] |
|
|
| def _replace_img_context_embeddings( |
| self, |
| input_ids: torch.LongTensor, |
| inputs_embeds: torch.FloatTensor, |
| pixel_values: torch.FloatTensor, |
| image_flags: torch.LongTensor | None, |
| ) -> torch.FloatTensor: |
| img_context_token_id = self.config.img_context_token_id |
| if img_context_token_id is None: |
| raise ValueError("config.img_context_token_id is None (tokenizer/model not initialized).") |
|
|
| try: |
| vision_dtype = next(self.internvl_backbone.vision_model.parameters()).dtype |
| except Exception: |
| vision_dtype = inputs_embeds.dtype |
| pixel_values = pixel_values.to(device=inputs_embeds.device, dtype=vision_dtype) |
|
|
| vit_embeds = self.internvl_backbone.extract_feature(pixel_values) |
| if image_flags is not None: |
| image_flags = image_flags.squeeze(-1) |
| vit_embeds = vit_embeds[image_flags == 1] |
|
|
| bsz, _, hidden = inputs_embeds.shape |
| selected = input_ids == int(img_context_token_id) |
| n_ctx = int(selected.sum().item()) |
| if n_ctx == 0: |
| return inputs_embeds |
|
|
| vit_flat = vit_embeds.reshape(-1, hidden) |
| if vit_flat.shape[0] < n_ctx: |
| raise ValueError(f"IMG_CONTEXT mismatch: need {n_ctx} embeddings, got {vit_flat.shape[0]}.") |
|
|
| mask3 = selected.unsqueeze(-1).expand_as(inputs_embeds) |
| src = vit_flat[:n_ctx].to(device=inputs_embeds.device, dtype=inputs_embeds.dtype).reshape(-1) |
| return inputs_embeds.masked_scatter(mask3, src) |
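| # Note (sketch): masked_scatter fills the IMG_CONTEXT positions in row-major |
| # order with the flattened ViT features, so the i-th placeholder token receives |
| # the i-th visual embedding. |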
|
|
| @staticmethod |
| def _find_suffix_starts(action_mask_token: torch.Tensor, *, expected_horizon: int | None = None) -> torch.Tensor: |
| if action_mask_token.ndim != 2: |
| raise ValueError(f"action_mask_token must be (B,S), got {tuple(action_mask_token.shape)}") |
| bsz = int(action_mask_token.shape[0]) |
| starts = torch.empty((bsz,), dtype=torch.long, device=action_mask_token.device) |
| for b in range(bsz): |
| pos = torch.nonzero(action_mask_token[b], as_tuple=False).squeeze(-1) |
| if int(pos.numel()) == 0: |
| raise ValueError(f"Expected at least 1 action token per sample, got 0 for batch {b}.") |
| if expected_horizon is not None and int(pos.numel()) not in (1, int(expected_horizon)): |
| raise ValueError( |
| f"Expected 1 or {int(expected_horizon)} action tokens per sample, got {int(pos.numel())} for batch {b}." |
| ) |
| starts[b] = pos.min() |
| return starts |
|
|
| |
| def forward( |
| self, |
| input_ids: torch.LongTensor | None = None, |
| attention_mask: torch.Tensor | None = None, |
| position_ids: torch.LongTensor | None = None, |
| inputs_embeds: torch.FloatTensor | None = None, |
| labels: torch.LongTensor | None = None, |
| pixel_values: torch.FloatTensor | None = None, |
| image_flags: torch.LongTensor | None = None, |
| states: torch.Tensor | None = None, |
| actions: torch.Tensor | None = None, |
| action_is_pad: torch.Tensor | None = None, |
| action_dim_mask: torch.Tensor | None = None, |
| use_cache: bool | None = None, |
| output_attentions: bool | None = None, |
| output_hidden_states: bool | None = None, |
| **kwargs, |
| ) -> EO1InternVLPiFlowMatchingOutput: |
| if input_ids is None: |
| raise ValueError("Pi model requires `input_ids`.") |
| if actions is None: |
| raise ValueError("Pi model forward requires `actions` (flow-matching).") |
| if attention_mask is None: |
| attention_mask = torch.ones_like(input_ids, dtype=torch.long, device=input_ids.device) |
|
|
| action_token_id = self.config.action_token_id |
| if action_token_id is None: |
| raise ValueError("config.action_token_id is None (tokenizer/model not initialized).") |
| action_pass_id = self.config.action_pass_id |
|
|
| noise_mask = input_ids == int(action_token_id) |
| pass_mask = (input_ids == int(action_pass_id)) if action_pass_id is not None else torch.zeros_like(noise_mask) |
| action_mask_token = noise_mask | pass_mask |
|
|
| bsz, horizon, act_dim = actions.shape |
|
|
| suffix_starts = self._find_suffix_starts(action_mask_token, expected_horizon=int(horizon)) |
| prefix_len = int(suffix_starts.max().item()) |
|
|
| |
| prefix_ids = input_ids[:, :prefix_len] |
| prefix_am = attention_mask[:, :prefix_len].to(dtype=torch.bool, device=input_ids.device) |
| ar = torch.arange(prefix_len, device=input_ids.device) |
| prefix_valid = prefix_am & (ar[None, :] < suffix_starts[:, None]) |
|
|
| prefix_embeds = self.prefix_lm.get_input_embeddings()(prefix_ids).clone() |
| if pixel_values is not None: |
| prefix_embeds = self._replace_img_context_embeddings( |
| input_ids=prefix_ids, |
| inputs_embeds=prefix_embeds, |
| pixel_values=pixel_values, |
| image_flags=image_flags, |
| ) |
|
|
| prefix_attn = prefix_valid.to(dtype=torch.long) |
| prefix_out = self.prefix_lm.model( |
| inputs_embeds=prefix_embeds, |
| attention_mask=prefix_attn, |
| use_cache=True, |
| return_dict=True, |
| ) |
| prefix_pkv = prefix_out.past_key_values |
| prefix_kv_cache = [prefix_pkv[i] for i in range(len(prefix_pkv))] |
| prefix_kv_cache = self._select_prefix_kv_cache(prefix_kv_cache) |
|
|
| |
| actions_f32 = actions.to(dtype=torch.float32, device=input_ids.device) |
| time = self.sample_time(int(bsz), input_ids.device) |
| noise = self.sample_noise(actions_f32.shape, input_ids.device) |
| time_expanded = time[:, None, None] |
| x_t = time_expanded * noise + (1 - time_expanded) * actions_f32 |
| u_t = noise - actions_f32 |
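| # Flow-matching target (sketch of the linear path used above): |
| #   x_t = t * noise + (1 - t) * actions |
| #   d/dt x_t = noise - actions = u_t |
| # so the action expert is trained to regress the constant velocity u_t given |
| # the noised chunk x_t and the time conditioning. |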
|
|
| |
| action_tokens = self.action_in_proj(x_t.to(dtype=self.action_in_proj.weight.dtype)) |
|
|
| |
| adarms_cond = self._embed_time_cond(time, dtype=action_tokens.dtype, device=action_tokens.device) |
|
|
| |
| pos_ids = suffix_starts[:, None] + torch.arange(horizon, device=input_ids.device)[None, :] |
|
|
| suffix_valid = torch.ones((int(bsz), int(horizon)), device=input_ids.device, dtype=torch.bool) |
| if action_is_pad is not None: |
| suffix_valid = suffix_valid & (~action_is_pad.to(device=input_ids.device, dtype=torch.bool)) |
|
|
| expert_h = self.action_expert( |
| action_tokens, |
| prefix_kv_cache=prefix_kv_cache, |
| prefix_key_mask=prefix_valid, |
| position_ids=pos_ids, |
| adarms_cond=adarms_cond, |
| suffix_key_mask=suffix_valid, |
| ) |
| v_t = self.action_out_proj(expert_h).to(dtype=torch.float32) |
|
|
| |
| target = u_t.to(dtype=v_t.dtype) |
| per_elem = (v_t - target) ** 2 |
|
|
| valid = suffix_valid[:, :, None] |
| adm_for_groups = None |
| if action_dim_mask is not None: |
| adm = action_dim_mask |
| if not torch.is_tensor(adm): |
| adm = torch.as_tensor(adm) |
| adm = adm.to(device=per_elem.device, dtype=torch.bool) |
| if adm.ndim == 1: |
| adm = adm.view(1, -1).expand(int(bsz), -1) |
| elif adm.ndim == 2: |
| pass |
| elif adm.ndim == 3 and int(adm.shape[1]) == 1: |
| adm = adm[:, 0, :] |
| else: |
| adm = adm.reshape(int(bsz), -1) |
|
|
| if int(adm.shape[-1]) == int(per_elem.shape[-1]): |
| valid = valid & adm[:, None, :] |
| adm_for_groups = adm |
| else: |
| logger.warning_once( |
| "Ignoring action_dim_mask due to shape mismatch in PI FM loss: " |
| f"{tuple(adm.shape)=} vs expected (B,{int(per_elem.shape[-1])})." |
| ) |
|
|
| |
| |
| pos_mask = rot_mask = grip_mask = None |
| try: |
| a_dim = int(per_elem.shape[-1]) |
| pos_mask = torch.zeros((int(bsz), a_dim), device=per_elem.device, dtype=torch.bool) |
| rot_mask = torch.zeros_like(pos_mask) |
| grip_mask = torch.zeros_like(pos_mask) |
| for bi in range(int(bsz)): |
| g = self._action_group_indices(a_dim, action_dim_mask=(adm_for_groups[bi] if adm_for_groups is not None else None)) |
| if g["pos"]: |
| pos_mask[bi, g["pos"]] = True |
| if g["rot"]: |
| rot_mask[bi, g["rot"]] = True |
| if g["gripper"]: |
| grip_mask[bi, g["gripper"]] = True |
| group_mask = pos_mask | rot_mask | grip_mask |
| empty = ~group_mask.any(dim=1) |
| if empty.any(): |
| fallback = adm_for_groups if adm_for_groups is not None else torch.ones_like(group_mask) |
| group_mask[empty] = fallback[empty] |
| valid = valid & group_mask[:, None, :] |
| except Exception: |
| pos_mask = rot_mask = grip_mask = None |
|
|
| weight = valid.to(dtype=per_elem.dtype) |
| denom = weight.sum().clamp_min(1) |
| fm_loss = (per_elem * weight).sum() / denom |
|
|
| fm_loss_pos = None |
| fm_loss_rot = None |
| fm_loss_gripper = None |
| |
| try: |
| if pos_mask is not None: |
| pos_w = (weight * pos_mask[:, None, :].to(dtype=weight.dtype)).sum() |
| if bool((pos_w > 0).item()): |
| fm_loss_pos = (per_elem * weight * pos_mask[:, None, :].to(dtype=weight.dtype)).sum() / pos_w.clamp_min(1) |
| if rot_mask is not None: |
| rot_w = (weight * rot_mask[:, None, :].to(dtype=weight.dtype)).sum() |
| if bool((rot_w > 0).item()): |
| fm_loss_rot = (per_elem * weight * rot_mask[:, None, :].to(dtype=weight.dtype)).sum() / rot_w.clamp_min(1) |
| if grip_mask is not None: |
| grip_w = (weight * grip_mask[:, None, :].to(dtype=weight.dtype)).sum() |
| if bool((grip_w > 0).item()): |
| fm_loss_gripper = (per_elem * weight * grip_mask[:, None, :].to(dtype=weight.dtype)).sum() / grip_w.clamp_min(1) |
| except Exception: |
| fm_loss_pos = fm_loss_rot = fm_loss_gripper = None |
|
|
| return EO1InternVLPiFlowMatchingOutput( |
| loss=fm_loss, |
| fm_loss=fm_loss, |
| fm_loss_pos=fm_loss_pos, |
| fm_loss_rot=fm_loss_rot, |
| fm_loss_gripper=fm_loss_gripper, |
| ar_loss=None, |
| actions=v_t, |
| logits=None, |
| hidden_states=None, |
| attentions=None, |
| ) |
|
|
| @torch.no_grad() |
| def sample_actions( |
| self, |
| input_ids: torch.LongTensor | None = None, |
| attention_mask: torch.Tensor | None = None, |
| position_ids: torch.LongTensor | None = None, |
| pixel_values: torch.FloatTensor | None = None, |
| image_flags: torch.LongTensor | None = None, |
| num_steps: int | None = None, |
| noise: torch.Tensor | None = None, |
| **kwargs, |
| ) -> Tensor: |
| if input_ids is None: |
| raise ValueError("sample_actions requires input_ids.") |
| if attention_mask is None: |
| attention_mask = torch.ones_like(input_ids, dtype=torch.long, device=input_ids.device) |
|
|
| chunk_size = int(self.config.action_chunk_size) |
| max_action_dim = int(self.config.max_action_dim) |
| steps = int(num_steps) if num_steps is not None else int(self.config.num_denoise_steps) |
| dt = torch.tensor(-1.0 / max(1, steps), device=input_ids.device, dtype=torch.float32) |
|
|
| action_token_id = self.config.action_token_id |
| if action_token_id is None: |
| raise ValueError("config.action_token_id is None (tokenizer/model not initialized).") |
| action_pass_id = self.config.action_pass_id |
|
|
| noise_mask = input_ids == int(action_token_id) |
| pass_mask = (input_ids == int(action_pass_id)) if action_pass_id is not None else torch.zeros_like(noise_mask) |
| action_mask_token = noise_mask | pass_mask |
|
|
| bsz = int(input_ids.shape[0]) |
|
|
| suffix_starts = self._find_suffix_starts(action_mask_token, expected_horizon=chunk_size) |
| prefix_len = int(suffix_starts.max().item()) |
|
|
| prefix_ids = input_ids[:, :prefix_len] |
| prefix_am = attention_mask[:, :prefix_len].to(dtype=torch.bool, device=input_ids.device) |
| ar = torch.arange(prefix_len, device=input_ids.device) |
| prefix_valid = prefix_am & (ar[None, :] < suffix_starts[:, None]) |
|
|
| prefix_embeds = self.prefix_lm.get_input_embeddings()(prefix_ids).clone() |
| if pixel_values is not None: |
| prefix_embeds = self._replace_img_context_embeddings( |
| input_ids=prefix_ids, |
| inputs_embeds=prefix_embeds, |
| pixel_values=pixel_values, |
| image_flags=image_flags, |
| ) |
|
|
| prefix_attn = prefix_valid.to(dtype=torch.long) |
| prefix_out = self.prefix_lm.model( |
| inputs_embeds=prefix_embeds, |
| attention_mask=prefix_attn, |
| use_cache=True, |
| return_dict=True, |
| ) |
| prefix_pkv = prefix_out.past_key_values |
| prefix_kv_cache = [prefix_pkv[i] for i in range(len(prefix_pkv))] |
| prefix_kv_cache = self._select_prefix_kv_cache(prefix_kv_cache) |
|
|
| device = input_ids.device |
| if noise is None: |
| x_t = self.sample_noise((bsz, chunk_size, max_action_dim), device=device).to(dtype=torch.float32) |
| else: |
| x_t = noise.to(device=device, dtype=torch.float32) |
|
|
| suffix_valid = torch.ones((bsz, chunk_size), device=device, dtype=torch.bool) |
| pos_ids = suffix_starts[:, None] + torch.arange(chunk_size, device=device)[None, :] |
|
|
| for s in range(steps): |
| t_scalar = 1.0 + float(s) * float(dt) |
| time = torch.full((bsz,), t_scalar, device=device, dtype=torch.float32) |
|
|
| action_tokens = self.action_in_proj(x_t.to(dtype=self.action_in_proj.weight.dtype)) |
| adarms_cond = self._embed_time_cond(time, dtype=action_tokens.dtype, device=action_tokens.device) |
|
|
| expert_h = self.action_expert( |
| action_tokens, |
| prefix_kv_cache=prefix_kv_cache, |
| prefix_key_mask=prefix_valid, |
| position_ids=pos_ids, |
| adarms_cond=adarms_cond, |
| suffix_key_mask=suffix_valid, |
| ) |
| v_t = self.action_out_proj(expert_h).to(dtype=torch.float32) |
| x_t = x_t + dt * v_t |
|
|
| return x_t |
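| # Usage sketch (hypothetical tensors; assumes the processor already inserted |
| # `config.action_token_id` placeholders into `input_ids`): |
| #   actions = model.sample_actions( |
| #       input_ids=batch["input_ids"], |
| #       attention_mask=batch["attention_mask"], |
| #       pixel_values=batch["pixel_values"], |
| #       num_steps=10, |
| #   )   # -> (B, config.action_chunk_size, config.max_action_dim) |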
|
|
|
|
| EO1InternVLPiFlowMatchingModel.register_for_auto_class() |
|
|