# SH-0 / modeling_eo1_internvl.py
# Uploaded by jasonzhango using huggingface_hub (commit 6784e94, verified).
# Copyright 2026 EO-Robotics Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import copy
import math
import os
from dataclasses import dataclass
from typing import Any
import torch
import torch.nn as nn
import torch.nn.functional as F # noqa: N812
from torch import Tensor
from transformers.modeling_outputs import ModelOutput
from transformers.modeling_utils import PreTrainedModel
from transformers.utils import logging
from .configuration_eo1_internvl import EO1InternVLPiFlowMatchingConfig
logger = logging.get_logger(__name__)
def create_sinusoidal_pos_embedding(
    time: torch.Tensor,
    dimension: int,
    min_period: float = 4e-3,
    max_period: float = 4.0,
    device: str | torch.device | None = None,
) -> Tensor:
    """Sine-cosine embedding for scalar time in [0,1]. Matches openpi `posemb_sincos` sensitivity.

    Args:
        time: 1-D tensor of shape `(batch_size,)`.
        dimension: Width of the returned embedding; must be even.
        min_period: Shortest period of the geometric period ladder.
        max_period: Longest period of the geometric period ladder.
        device: Device on which the frequency table is built. When omitted
            (``None``) the device of `time` is used, so the multiplication below
            never mixes devices.

    Returns:
        Tensor of shape `(batch_size, dimension)`: the first half holds the
        sines, the second half the cosines.

    Raises:
        ValueError: If `dimension` is odd or `time` is not 1-D.
    """
    if dimension % 2 != 0:
        raise ValueError(f"dimension ({dimension}) must be divisible by 2")
    if time.ndim != 1:
        raise ValueError("The time tensor is expected to be of shape `(batch_size, )`.")

    # Follow the input tensor unless the caller explicitly pins a device.
    if device is None:
        device = time.device

    # Periods are spaced geometrically between min_period and max_period.
    fraction = torch.linspace(0.0, 1.0, dimension // 2, device=device)
    period = min_period * (max_period / min_period) ** fraction
    scaling_factor = 1.0 / period * 2 * math.pi
    sin_input = scaling_factor[None, :] * time[:, None]
    return torch.cat([torch.sin(sin_input), torch.cos(sin_input)], dim=1)
def _masked_fill_min(x: torch.Tensor, mask: torch.Tensor) -> torch.Tensor:
    """Return a copy of `x` holding the dtype minimum wherever `mask` is False.

    `mask` must be broadcastable to `x`; positions where it is True keep the
    value they had in `x`.
    """
    most_negative = torch.finfo(x.dtype).min
    blocked = ~mask
    return x.masked_fill(blocked, most_negative)
class AdaRMSNorm(nn.Module):
    """
    Pi05-style AdaRMSNorm (openpi `gemma.RMSNorm` with `cond!=None`).

    The input is RMS-normalised in float32, then modulated by a per-layer
    `Linear(cond -> 3*D)` whose weight and bias start at zero:
    `output = normed * (1 + scale) + shift`. The third chunk of the modulation
    is returned as a gate for the caller's gated residual.
    """

    def __init__(self, dim: int, *, eps: float = 1e-6):
        super().__init__()
        self.eps = float(eps)
        self.modulation = nn.Linear(dim, dim * 3, bias=True)
        # Zero init: scale = shift = gate = 0 until the modulation is trained.
        for param in (self.modulation.weight, self.modulation.bias):
            nn.init.zeros_(param)

    def forward(self, x: torch.Tensor, cond: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Normalise `x` (B,T,D) conditioned on `cond` (B,D).

        Returns:
            `(out, gate)` where `out` has the shape and dtype of `x`, and `gate`
            has shape (B,1,D) in the dtype of `x`.

        Raises:
            ValueError: If `cond` is None or the shapes are inconsistent.
        """
        if cond is None:
            raise ValueError("AdaRMSNorm requires `cond` (Pi05 mode).")
        if cond.ndim != 2:
            raise ValueError(f"cond must be (B,D), got {tuple(cond.shape)}")
        if x.ndim != 3:
            raise ValueError(f"x must be (B,T,D), got {tuple(x.shape)}")
        if x.shape[0] != cond.shape[0]:
            raise ValueError(f"Batch mismatch: {x.shape[0]=} vs {cond.shape[0]=}")
        if x.shape[-1] != cond.shape[-1]:
            raise ValueError(f"Dim mismatch: {x.shape[-1]=} vs {cond.shape[-1]=}")

        in_dtype = x.dtype
        x32 = x.float()
        mean_square = x32.pow(2).mean(dim=-1, keepdim=True)
        normed = x32 * torch.rsqrt(mean_square + self.eps)

        # One projection yields all three modulation vectors; each gets a
        # singleton token axis so it broadcasts over T.
        modulation = self.modulation(cond).to(dtype=x32.dtype)
        scale, shift, gate = (part[:, None, :] for part in modulation.chunk(3, dim=-1))

        out = normed * (1 + scale) + shift
        return out.to(dtype=in_dtype), gate.to(dtype=in_dtype)
class Qwen2PiSelfAttention(nn.Module):
    """
    Qwen2 attention variant for Pi05 action expert.

    Queries are projected from the suffix (action) tokens only; keys and values
    are the caller-supplied prefix cache concatenated with freshly projected
    suffix keys/values. No causal masking is applied here: the additive mask
    passed by the caller (if any) is the only restriction.
    """

    def __init__(self, qwen_config: Any, layer_idx: int):
        super().__init__()
        # Function-scope import; any failure is re-raised as ImportError.
        try:
            from transformers.models.qwen2.modeling_qwen2 import apply_rotary_pos_emb, repeat_kv
        except Exception as e:  # pragma: no cover
            raise ImportError("transformers qwen2 internals are required for eo_pi_internvl.") from e
        self._apply_rotary_pos_emb = apply_rotary_pos_emb
        self._repeat_kv = repeat_kv

        self.layer_idx = int(layer_idx)
        self.hidden_size = int(qwen_config.hidden_size)
        self.num_heads = int(qwen_config.num_attention_heads)
        self.num_kv_heads = int(qwen_config.num_key_value_heads)
        self.num_kv_groups = self.num_heads // self.num_kv_heads
        self.head_dim = int(getattr(qwen_config, "head_dim", self.hidden_size // self.num_heads))
        self.scaling = self.head_dim**-0.5

        q_width = self.num_heads * self.head_dim
        kv_width = self.num_kv_heads * self.head_dim
        self.q_proj = nn.Linear(self.hidden_size, q_width, bias=True)
        self.k_proj = nn.Linear(self.hidden_size, kv_width, bias=True)
        self.v_proj = nn.Linear(self.hidden_size, kv_width, bias=True)
        self.o_proj = nn.Linear(q_width, self.hidden_size, bias=False)

    def _validate_prefix_kv(self, prefix_k: torch.Tensor, prefix_v: torch.Tensor, bsz: int) -> None:
        """Raise ValueError unless the prefix cache is (B, n_kv, P, hd) for this layer."""
        if prefix_k.ndim != 4 or prefix_v.ndim != 4:
            raise ValueError(f"prefix_k/v must be (B, n_kv, P, hd), got {tuple(prefix_k.shape)}, {tuple(prefix_v.shape)}")
        if int(prefix_k.shape[0]) != bsz or int(prefix_v.shape[0]) != bsz:
            raise ValueError("prefix_k/v batch mismatch.")
        if int(prefix_k.shape[1]) != self.num_kv_heads or int(prefix_v.shape[1]) != self.num_kv_heads:
            raise ValueError(
                "prefix_k/v num_kv_heads mismatch: "
                f"{int(prefix_k.shape[1])=} {int(prefix_v.shape[1])=} vs {self.num_kv_heads=}"
            )
        if int(prefix_k.shape[-1]) != self.head_dim or int(prefix_v.shape[-1]) != self.head_dim:
            raise ValueError("prefix_k/v head_dim mismatch.")

    def forward(
        self,
        hidden_states: torch.Tensor,
        *,
        position_embeddings: tuple[torch.Tensor, torch.Tensor],
        attention_mask: torch.Tensor | None,
        prefix_k: torch.Tensor,
        prefix_v: torch.Tensor,
    ) -> torch.Tensor:
        """Attend from suffix tokens to prefix + suffix keys.

        Args:
            hidden_states: (B, S, D) suffix token states.
            position_embeddings: `(cos, sin)` pair forwarded to the rotary helper.
            attention_mask: Optional 4-D mask expandable to (B, n_heads, S, P+S).
            prefix_k, prefix_v: Cached prefix keys/values, each (B, n_kv, P, hd).

        Returns:
            (B, S, D) tensor after the output projection.

        Raises:
            ValueError: On malformed prefix cache or a non-4-D mask.
        """
        bsz, seqlen, _ = hidden_states.shape

        def split_heads(projected: torch.Tensor, n_heads: int) -> torch.Tensor:
            # (B, S, n*hd) -> (B, n, S, hd)
            return projected.view(bsz, seqlen, n_heads, self.head_dim).transpose(1, 2)

        q = split_heads(self.q_proj(hidden_states), self.num_heads)
        k = split_heads(self.k_proj(hidden_states), self.num_kv_heads)
        v = split_heads(self.v_proj(hidden_states), self.num_kv_heads)

        cos, sin = position_embeddings
        q, k = self._apply_rotary_pos_emb(q, k, cos, sin)

        self._validate_prefix_kv(prefix_k, prefix_v, bsz)

        # Prefix entries come first along the key axis, then the new suffix ones;
        # KV heads are then repeated to match the number of query heads.
        k_all = self._repeat_kv(torch.cat([prefix_k, k], dim=2), self.num_kv_groups)
        v_all = self._repeat_kv(torch.cat([prefix_v, v], dim=2), self.num_kv_groups)

        attn_mask = None
        if attention_mask is not None:
            if attention_mask.ndim != 4:
                raise ValueError(f"attention_mask must be 4D (B,1,S,K), got {tuple(attention_mask.shape)}")
            attn_mask = attention_mask.expand(bsz, self.num_heads, seqlen, k_all.shape[-2])

        attn_out = torch.nn.functional.scaled_dot_product_attention(
            q, k_all, v_all, attn_mask=attn_mask, dropout_p=0.0, is_causal=False
        )  # (B, n_heads, S, hd)
        merged = attn_out.transpose(1, 2).contiguous().view(bsz, seqlen, self.num_heads * self.head_dim)
        return self.o_proj(merged)
class Qwen2PiMLP(nn.Module):
    """Gated feed-forward block: `down_proj(SiLU(gate_proj(x)) * up_proj(x))`.

    All three projections are bias-free. The activation is always SiLU; a
    different `hidden_act` in the config only triggers a one-time warning.
    """

    def __init__(self, qwen_config: Any):
        super().__init__()
        width = int(qwen_config.hidden_size)
        inner = int(qwen_config.intermediate_size)
        self.gate_proj = nn.Linear(width, inner, bias=False)
        self.up_proj = nn.Linear(width, inner, bias=False)
        self.down_proj = nn.Linear(inner, width, bias=False)

        configured_act = str(getattr(qwen_config, "hidden_act", "silu"))
        if configured_act != "silu":
            logger.warning_once("EO Pi action expert: forcing SiLU hidden_act for MLP (got %s).", configured_act)
        self.act = nn.SiLU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the gated MLP over the last dimension of `x`."""
        gated = self.act(self.gate_proj(x))
        return self.down_proj(gated * self.up_proj(x))
class Qwen2PiDecoderLayer(nn.Module):
    """One action-expert layer: AdaRMSNorm -> attention -> gated residual, then
    AdaRMSNorm -> MLP -> gated residual.

    Both norms are conditioned on the same `adarms_cond` vector and each
    returns the gate that scales its sub-block output before the residual add.
    """

    def __init__(self, qwen_config: Any, layer_idx: int):
        super().__init__()
        norm_eps = float(getattr(qwen_config, "rms_norm_eps", 1e-6))
        width = int(qwen_config.hidden_size)
        self.input_layernorm = AdaRMSNorm(width, eps=norm_eps)
        self.self_attn = Qwen2PiSelfAttention(qwen_config=qwen_config, layer_idx=layer_idx)
        self.post_attention_layernorm = AdaRMSNorm(width, eps=norm_eps)
        self.mlp = Qwen2PiMLP(qwen_config=qwen_config)

    def forward(
        self,
        hidden_states: torch.Tensor,
        *,
        adarms_cond: torch.Tensor,
        position_embeddings: tuple[torch.Tensor, torch.Tensor],
        attention_mask: torch.Tensor | None,
        prefix_k: torch.Tensor,
        prefix_v: torch.Tensor,
    ) -> torch.Tensor:
        """Run the attention and MLP sub-blocks and return the updated hidden states."""
        # Attention sub-block.
        normed, attn_gate = self.input_layernorm(hidden_states, adarms_cond)
        attn_out = self.self_attn(
            normed,
            position_embeddings=position_embeddings,
            attention_mask=attention_mask,
            prefix_k=prefix_k,
            prefix_v=prefix_v,
        )
        hidden_states = hidden_states + attn_out * attn_gate

        # Feed-forward sub-block.
        normed, mlp_gate = self.post_attention_layernorm(hidden_states, adarms_cond)
        mlp_out = self.mlp(normed)
        return hidden_states + mlp_out * mlp_gate
class Qwen2PiActionExpert(nn.Module):
    """Stack of `Qwen2PiDecoderLayer`s that processes action tokens against a prefix KV cache.

    The expert owns its own rotary embedding and a final `AdaRMSNorm`; it can
    optionally copy attention/MLP weights from an existing Qwen2 language model.
    """

    def __init__(self, qwen_config: Any, *, init_from_qwen2_lm: nn.Module | None = None):
        super().__init__()
        try:
            from transformers.models.qwen2.modeling_qwen2 import Qwen2RotaryEmbedding
        except Exception as e:  # pragma: no cover
            raise ImportError("transformers qwen2 internals are required for eo_pi_internvl.") from e

        self.config = qwen_config
        self.num_layers = int(qwen_config.num_hidden_layers)
        self.layers = nn.ModuleList(
            [Qwen2PiDecoderLayer(qwen_config, layer_idx) for layer_idx in range(self.num_layers)]
        )
        self.rotary_emb = Qwen2RotaryEmbedding(qwen_config)
        final_eps = float(getattr(qwen_config, "rms_norm_eps", 1e-6))
        self.final_norm = AdaRMSNorm(int(qwen_config.hidden_size), eps=final_eps)

        if init_from_qwen2_lm is not None:
            self._init_from_qwen2_lm(init_from_qwen2_lm)

    def _init_from_qwen2_lm(self, qwen2_lm: nn.Module):
        """
        Copy attention/MLP weights from a Qwen2ForCausalLM (or Qwen2Model) into this expert.
        AdaRMSNorm modulation stays zero-init to match Pi05.

        Raises:
            ValueError: If no layer list can be found or the layer counts differ.
        """
        # Accept both the causal-LM wrapper (`.model.layers`) and the bare model (`.layers`).
        src_layers = None
        if hasattr(qwen2_lm, "model") and hasattr(qwen2_lm.model, "layers"):
            src_layers = qwen2_lm.model.layers
        elif hasattr(qwen2_lm, "layers"):
            src_layers = qwen2_lm.layers
        if src_layers is None:
            raise ValueError("Unsupported qwen2_lm: cannot locate `.model.layers`.")
        if len(src_layers) != len(self.layers):
            raise ValueError(f"Layer count mismatch: {len(src_layers)=} vs {len(self.layers)=}")

        attn_names = ("q_proj", "k_proj", "v_proj", "o_proj")
        mlp_names = ("gate_proj", "up_proj", "down_proj")
        for dst, src in zip(self.layers, src_layers, strict=True):
            for name in attn_names:
                getattr(dst.self_attn, name).load_state_dict(getattr(src.self_attn, name).state_dict())
            for name in mlp_names:
                getattr(dst.mlp, name).load_state_dict(getattr(src.mlp, name).state_dict())

    def forward(
        self,
        action_tokens: torch.Tensor,
        *,
        prefix_kv_cache: list[tuple[torch.Tensor, torch.Tensor]],
        prefix_key_mask: torch.Tensor,
        position_ids: torch.Tensor,
        adarms_cond: torch.Tensor,
        suffix_key_mask: torch.Tensor | None = None,
    ) -> torch.Tensor:
        """
        Args:
            action_tokens: (B, S, D)
            prefix_kv_cache: list[(k,v)] each (B, n_kv, P, hd) from InternVL prefix expert.
            prefix_key_mask: (B, P) bool, True = valid prefix token.
            position_ids: (B, S) positions for action tokens (prefix_len + [0..S-1]).
            adarms_cond: (B, D) time conditioning vector.
            suffix_key_mask: (B, S) bool, True = valid suffix token (optional; for padding).

        Returns:
            (B, S, D) action-token states after all layers and the final norm.

        Raises:
            ValueError: On inconsistent shapes or a cache with too few layers.
        """
        if action_tokens.ndim != 3:
            raise ValueError(f"action_tokens must be (B,S,D), got {tuple(action_tokens.shape)}")
        bsz, s_len, _ = action_tokens.shape
        if prefix_key_mask.ndim != 2 or int(prefix_key_mask.shape[0]) != bsz:
            raise ValueError(f"prefix_key_mask must be (B,P), got {tuple(prefix_key_mask.shape)}")
        if position_ids.shape != (bsz, s_len):
            raise ValueError(f"position_ids must be (B,S)={bsz,s_len}, got {tuple(position_ids.shape)}")
        if len(prefix_kv_cache) == 0:
            raise ValueError("prefix_kv_cache is empty.")

        # (cos,sin) for suffix tokens only (RoPE positions already baked into prefix cache).
        rope = self.rotary_emb(action_tokens, position_ids)

        if suffix_key_mask is None:
            suffix_key_mask = torch.ones((bsz, s_len), device=action_tokens.device, dtype=torch.bool)
        if suffix_key_mask.shape != (bsz, s_len):
            raise ValueError(f"suffix_key_mask must be (B,S), got {tuple(suffix_key_mask.shape)}")

        # Pi05 action-block mask: every valid suffix query sees every valid key
        # (prefix keys first, suffix keys after), with no causal restriction.
        query_valid = suffix_key_mask[:, None, :, None]  # (B,1,S,1)
        key_valid = torch.cat([prefix_key_mask, suffix_key_mask], dim=1)[:, None, None, :]  # (B,1,1,P+S)
        allow = query_valid & key_valid  # (B,1,S,P+S)
        n_keys = int(prefix_key_mask.shape[1]) + s_len
        additive = torch.zeros((bsz, 1, s_len, n_keys), device=action_tokens.device, dtype=action_tokens.dtype)
        additive = _masked_fill_min(additive, allow)

        hidden = action_tokens
        for layer_idx, layer in enumerate(self.layers):
            if layer_idx >= len(prefix_kv_cache):
                raise ValueError(
                    "prefix_kv_cache has fewer layers than action expert. "
                    f"{len(prefix_kv_cache)=} < {len(self.layers)=}"
                )
            layer_k, layer_v = prefix_kv_cache[layer_idx]
            hidden = layer(
                hidden,
                adarms_cond=adarms_cond,
                position_embeddings=rope,
                attention_mask=additive,
                prefix_k=layer_k,
                prefix_v=layer_v,
            )
        hidden, _ = self.final_norm(hidden, adarms_cond)
        return hidden
class Qwen3HeadRMSNorm(nn.Module):
    """Qwen3-style RMSNorm used for `q_norm`/`k_norm` on per-head dim.

    Normalisation is computed in float32 over the last axis, scaled by a
    learnable weight (initialised to ones) and cast back to the input dtype.
    """

    def __init__(self, dim: int, *, eps: float = 1e-6):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(dim))
        self.eps = float(eps)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the RMS-normalised, weight-scaled `x` in its original dtype."""
        in_dtype = x.dtype
        x32 = x.float()
        inv_rms = torch.rsqrt(x32.pow(2).mean(dim=-1, keepdim=True) + self.eps)
        scaled = self.weight * (x32 * inv_rms)
        return scaled.to(dtype=in_dtype)
class Qwen3PiSelfAttention(nn.Module):
    """
    Qwen3 attention variant for Pi05 action expert.

    Queries are projected from the suffix (action) tokens only; keys and values
    are the caller-supplied prefix cache concatenated with freshly projected
    suffix keys/values. Queries and suffix keys go through per-head RMS norms
    before the rotary embedding. No causal masking is applied here: the mask
    passed by the caller (if any) is the only restriction.
    """

    def __init__(self, qwen_config: Any, layer_idx: int):
        super().__init__()
        # Function-scope import; any failure is re-raised as ImportError.
        try:
            from transformers.models.qwen3.modeling_qwen3 import apply_rotary_pos_emb, repeat_kv
        except Exception as e:  # pragma: no cover
            raise ImportError("transformers qwen3 internals are required for eo_pi_internvl.") from e
        self._apply_rotary_pos_emb = apply_rotary_pos_emb
        self._repeat_kv = repeat_kv

        self.layer_idx = int(layer_idx)
        self.hidden_size = int(qwen_config.hidden_size)
        self.num_heads = int(qwen_config.num_attention_heads)
        self.num_kv_heads = int(qwen_config.num_key_value_heads)
        self.num_kv_groups = self.num_heads // self.num_kv_heads
        self.head_dim = int(getattr(qwen_config, "head_dim", self.hidden_size // self.num_heads))

        use_bias = bool(getattr(qwen_config, "attention_bias", False))
        q_width = self.num_heads * self.head_dim
        kv_width = self.num_kv_heads * self.head_dim
        self.q_proj = nn.Linear(self.hidden_size, q_width, bias=use_bias)
        self.k_proj = nn.Linear(self.hidden_size, kv_width, bias=use_bias)
        self.v_proj = nn.Linear(self.hidden_size, kv_width, bias=use_bias)
        self.o_proj = nn.Linear(q_width, self.hidden_size, bias=use_bias)

        norm_eps = float(getattr(qwen_config, "rms_norm_eps", 1e-6))
        self.q_norm = Qwen3HeadRMSNorm(self.head_dim, eps=norm_eps)
        self.k_norm = Qwen3HeadRMSNorm(self.head_dim, eps=norm_eps)

    def _validate_prefix_kv(self, prefix_k: torch.Tensor, prefix_v: torch.Tensor, bsz: int) -> None:
        """Raise ValueError unless the prefix cache is (B, n_kv, P, hd) for this layer."""
        if prefix_k.ndim != 4 or prefix_v.ndim != 4:
            raise ValueError(
                f"prefix_k/v must be (B, n_kv, P, hd), got {tuple(prefix_k.shape)}, {tuple(prefix_v.shape)}"
            )
        if int(prefix_k.shape[0]) != bsz or int(prefix_v.shape[0]) != bsz:
            raise ValueError("prefix_k/v batch mismatch.")
        if int(prefix_k.shape[1]) != self.num_kv_heads or int(prefix_v.shape[1]) != self.num_kv_heads:
            raise ValueError(
                "prefix_k/v num_kv_heads mismatch: "
                f"{int(prefix_k.shape[1])=} {int(prefix_v.shape[1])=} vs {self.num_kv_heads=}"
            )
        if int(prefix_k.shape[-1]) != self.head_dim or int(prefix_v.shape[-1]) != self.head_dim:
            raise ValueError("prefix_k/v head_dim mismatch.")

    def forward(
        self,
        hidden_states: torch.Tensor,
        *,
        position_embeddings: tuple[torch.Tensor, torch.Tensor],
        attention_mask: torch.Tensor | None,
        prefix_k: torch.Tensor,
        prefix_v: torch.Tensor,
    ) -> torch.Tensor:
        """Attend from suffix tokens to prefix + suffix keys.

        Args:
            hidden_states: (B, S, D) suffix token states.
            position_embeddings: `(cos, sin)` pair forwarded to the rotary helper.
            attention_mask: Optional 4-D mask expandable to (B, n_heads, S, P+S).
            prefix_k, prefix_v: Cached prefix keys/values, each (B, n_kv, P, hd).

        Returns:
            (B, S, D) tensor after the output projection.

        Raises:
            ValueError: On malformed prefix cache or a non-4-D mask.
        """
        bsz, seqlen, _ = hidden_states.shape
        per_head = (bsz, seqlen, -1, self.head_dim)

        # Head norms act on the (B, S, n, hd) layout, before heads move to axis 1.
        q = self.q_norm(self.q_proj(hidden_states).view(per_head)).transpose(1, 2)  # (B,n_heads,S,hd)
        k = self.k_norm(self.k_proj(hidden_states).view(per_head)).transpose(1, 2)  # (B,n_kv,S,hd)
        v = self.v_proj(hidden_states).view(per_head).transpose(1, 2)  # (B,n_kv,S,hd)

        cos, sin = position_embeddings
        q, k = self._apply_rotary_pos_emb(q, k, cos, sin)

        self._validate_prefix_kv(prefix_k, prefix_v, bsz)

        # Prefix entries come first along the key axis, then the new suffix ones;
        # KV heads are then repeated to match the number of query heads.
        k_all = self._repeat_kv(torch.cat([prefix_k, k], dim=2), self.num_kv_groups)
        v_all = self._repeat_kv(torch.cat([prefix_v, v], dim=2), self.num_kv_groups)

        attn_mask = None
        if attention_mask is not None:
            if attention_mask.ndim != 4:
                raise ValueError(f"attention_mask must be 4D (B,1,S,K), got {tuple(attention_mask.shape)}")
            attn_mask = attention_mask.expand(bsz, self.num_heads, seqlen, k_all.shape[-2])

        attn_out = torch.nn.functional.scaled_dot_product_attention(
            q, k_all, v_all, attn_mask=attn_mask, dropout_p=0.0, is_causal=False
        )
        merged = attn_out.transpose(1, 2).contiguous().view(bsz, seqlen, self.num_heads * self.head_dim)
        return self.o_proj(merged)
class Qwen3PiMLP(nn.Module):
    """Gated feed-forward block: `down_proj(SiLU(gate_proj(x)) * up_proj(x))`.

    All three projections are bias-free. The activation is always SiLU; a
    different `hidden_act` in the config only triggers a one-time warning.
    """

    def __init__(self, qwen_config: Any):
        super().__init__()
        width = int(qwen_config.hidden_size)
        inner = int(qwen_config.intermediate_size)
        self.gate_proj = nn.Linear(width, inner, bias=False)
        self.up_proj = nn.Linear(width, inner, bias=False)
        self.down_proj = nn.Linear(inner, width, bias=False)

        configured_act = str(getattr(qwen_config, "hidden_act", "silu"))
        if configured_act != "silu":
            logger.warning_once("EO Pi action expert: forcing SiLU hidden_act for MLP (got %s).", configured_act)
        self.act = nn.SiLU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the gated MLP over the last dimension of `x`."""
        gated = self.act(self.gate_proj(x))
        return self.down_proj(gated * self.up_proj(x))
class Qwen3PiDecoderLayer(nn.Module):
    """One action-expert layer: AdaRMSNorm -> attention -> gated residual, then
    AdaRMSNorm -> MLP -> gated residual.

    Both norms are conditioned on the same `adarms_cond` vector and each
    returns the gate that scales its sub-block output before the residual add.
    """

    def __init__(self, qwen_config: Any, layer_idx: int):
        super().__init__()
        norm_eps = float(getattr(qwen_config, "rms_norm_eps", 1e-6))
        width = int(qwen_config.hidden_size)
        self.input_layernorm = AdaRMSNorm(width, eps=norm_eps)
        self.self_attn = Qwen3PiSelfAttention(qwen_config=qwen_config, layer_idx=layer_idx)
        self.post_attention_layernorm = AdaRMSNorm(width, eps=norm_eps)
        self.mlp = Qwen3PiMLP(qwen_config=qwen_config)

    def forward(
        self,
        hidden_states: torch.Tensor,
        *,
        adarms_cond: torch.Tensor,
        position_embeddings: tuple[torch.Tensor, torch.Tensor],
        attention_mask: torch.Tensor | None,
        prefix_k: torch.Tensor,
        prefix_v: torch.Tensor,
    ) -> torch.Tensor:
        """Run the attention and MLP sub-blocks and return the updated hidden states."""
        # Attention sub-block.
        normed, attn_gate = self.input_layernorm(hidden_states, adarms_cond)
        attn_out = self.self_attn(
            normed,
            position_embeddings=position_embeddings,
            attention_mask=attention_mask,
            prefix_k=prefix_k,
            prefix_v=prefix_v,
        )
        hidden_states = hidden_states + attn_out * attn_gate

        # Feed-forward sub-block.
        normed, mlp_gate = self.post_attention_layernorm(hidden_states, adarms_cond)
        mlp_out = self.mlp(normed)
        return hidden_states + mlp_out * mlp_gate
class Qwen3PiActionExpert(nn.Module):
    """Stack of `Qwen3PiDecoderLayer`s that processes action tokens against a prefix KV cache.

    The expert owns its own rotary embedding and a final `AdaRMSNorm`; it can
    optionally copy attention/MLP/head-norm weights from an existing Qwen3
    language model.
    """

    def __init__(self, qwen_config: Any, *, init_from_qwen3_lm: nn.Module | None = None):
        super().__init__()
        try:
            from transformers.models.qwen3.modeling_qwen3 import Qwen3RotaryEmbedding
        except Exception as e:  # pragma: no cover
            raise ImportError("transformers qwen3 internals are required for eo_pi_internvl.") from e

        self.config = qwen_config
        self.num_layers = int(qwen_config.num_hidden_layers)
        self.layers = nn.ModuleList(
            [Qwen3PiDecoderLayer(qwen_config, layer_idx) for layer_idx in range(self.num_layers)]
        )
        self.rotary_emb = Qwen3RotaryEmbedding(qwen_config)
        final_eps = float(getattr(qwen_config, "rms_norm_eps", 1e-6))
        self.final_norm = AdaRMSNorm(int(qwen_config.hidden_size), eps=final_eps)

        if init_from_qwen3_lm is not None:
            self._init_from_qwen3_lm(init_from_qwen3_lm)

    def _init_from_qwen3_lm(self, qwen3_lm: nn.Module):
        """
        Copy attention/MLP weights from a Qwen3ForCausalLM (or Qwen3Model) into this expert.
        AdaRMSNorm modulation stays zero-init to match Pi05.

        Raises:
            ValueError: If no layer list can be found or the layer counts differ.
        """
        # Accept both the causal-LM wrapper (`.model.layers`) and the bare model (`.layers`).
        src_layers = None
        if hasattr(qwen3_lm, "model") and hasattr(qwen3_lm.model, "layers"):
            src_layers = qwen3_lm.model.layers
        elif hasattr(qwen3_lm, "layers"):
            src_layers = qwen3_lm.layers
        if src_layers is None:
            raise ValueError("Unsupported qwen3_lm: cannot locate `.model.layers`.")
        if len(src_layers) != len(self.layers):
            raise ValueError(f"Layer count mismatch: {len(src_layers)=} vs {len(self.layers)=}")

        attn_names = ("q_proj", "k_proj", "v_proj", "o_proj")
        head_norm_names = ("q_norm", "k_norm")
        mlp_names = ("gate_proj", "up_proj", "down_proj")
        for dst, src in zip(self.layers, src_layers, strict=True):
            for name in attn_names:
                getattr(dst.self_attn, name).load_state_dict(getattr(src.self_attn, name).state_dict())
            # Head norms are copied only when both sides actually have them.
            for name in head_norm_names:
                if hasattr(src.self_attn, name) and hasattr(dst.self_attn, name):
                    getattr(dst.self_attn, name).weight.data.copy_(getattr(src.self_attn, name).weight.data)
            for name in mlp_names:
                getattr(dst.mlp, name).load_state_dict(getattr(src.mlp, name).state_dict())

    def forward(
        self,
        action_tokens: torch.Tensor,
        *,
        prefix_kv_cache: list[tuple[torch.Tensor, torch.Tensor]],
        prefix_key_mask: torch.Tensor,
        position_ids: torch.Tensor,
        adarms_cond: torch.Tensor,
        suffix_key_mask: torch.Tensor | None = None,
    ) -> torch.Tensor:
        """Run the action tokens through every layer and the final norm.

        Args:
            action_tokens: (B, S, D) action-token states.
            prefix_kv_cache: One `(k, v)` pair per expert layer; layer `i` uses entry `i`.
            prefix_key_mask: (B, P) mask of valid prefix keys.
            position_ids: (B, S) positions handed to the rotary embedding.
            adarms_cond: Conditioning vector passed to every AdaRMSNorm.
            suffix_key_mask: Optional (B, S) mask of valid suffix tokens; all
                tokens are treated as valid when omitted.

        Returns:
            (B, S, D) action-token states after all layers and the final norm.

        Raises:
            ValueError: On inconsistent shapes or a cache with too few layers.
        """
        if action_tokens.ndim != 3:
            raise ValueError(f"action_tokens must be (B,S,D), got {tuple(action_tokens.shape)}")
        bsz, s_len, _ = action_tokens.shape
        if prefix_key_mask.ndim != 2 or int(prefix_key_mask.shape[0]) != bsz:
            raise ValueError(f"prefix_key_mask must be (B,P), got {tuple(prefix_key_mask.shape)}")
        if position_ids.shape != (bsz, s_len):
            raise ValueError(f"position_ids must be (B,S)={bsz,s_len}, got {tuple(position_ids.shape)}")
        if len(prefix_kv_cache) == 0:
            raise ValueError("prefix_kv_cache is empty.")

        rope = self.rotary_emb(action_tokens, position_ids)

        if suffix_key_mask is None:
            suffix_key_mask = torch.ones((bsz, s_len), device=action_tokens.device, dtype=torch.bool)
        if suffix_key_mask.shape != (bsz, s_len):
            raise ValueError(f"suffix_key_mask must be (B,S), got {tuple(suffix_key_mask.shape)}")

        # Every valid suffix query sees every valid key (prefix keys first,
        # suffix keys after); disallowed pairs get the dtype minimum.
        query_valid = suffix_key_mask[:, None, :, None]  # (B,1,S,1)
        key_valid = torch.cat([prefix_key_mask, suffix_key_mask], dim=1)[:, None, None, :]  # (B,1,1,P+S)
        allow = query_valid & key_valid  # (B,1,S,P+S)
        n_keys = int(prefix_key_mask.shape[1]) + s_len
        additive = torch.zeros((bsz, 1, s_len, n_keys), device=action_tokens.device, dtype=action_tokens.dtype)
        additive = _masked_fill_min(additive, allow)

        hidden = action_tokens
        for layer_idx, layer in enumerate(self.layers):
            if layer_idx >= len(prefix_kv_cache):
                raise ValueError(
                    "prefix_kv_cache has fewer layers than action expert. "
                    f"{len(prefix_kv_cache)=} < {len(self.layers)=}"
                )
            layer_k, layer_v = prefix_kv_cache[layer_idx]
            hidden = layer(
                hidden,
                adarms_cond=adarms_cond,
                position_embeddings=rope,
                attention_mask=additive,
                prefix_k=layer_k,
                prefix_v=layer_v,
            )
        hidden, _ = self.final_norm(hidden, adarms_cond)
        return hidden
@dataclass
class EO1InternVLPiFlowMatchingOutput(ModelOutput):
    """Output container for the EO1 InternVL Pi flow-matching model.

    Every field is optional and defaults to ``None``.
    """

    # NOTE(review): field meanings below are inferred from names only; the code
    # that populates them is not in this part of the file. Confirm before relying on them.
    # Presumably the total training loss.
    loss: torch.FloatTensor | None = None
    # Presumably the flow-matching loss and its position / rotation / gripper parts.
    fm_loss: torch.FloatTensor | None = None
    fm_loss_pos: torch.FloatTensor | None = None
    fm_loss_rot: torch.FloatTensor | None = None
    fm_loss_gripper: torch.FloatTensor | None = None
    # Presumably the autoregressive (language-modelling) loss.
    ar_loss: torch.FloatTensor | None = None
    # Presumably the predicted actions.
    actions: torch.FloatTensor | None = None
    logits: torch.FloatTensor | None = None
    hidden_states: tuple[torch.FloatTensor] | None = None
    attentions: tuple[torch.FloatTensor] | None = None
def count_params(module, trainable_only=False):
    """Return the total number of scalar parameters in `module`.

    With `trainable_only` set, parameters whose `requires_grad` is False are
    left out of the count.
    """
    total = 0
    for param in module.parameters():
        if trainable_only and not param.requires_grad:
            continue
        total += param.numel()
    return total
class EO1InternVLPiFlowMatchingModel(PreTrainedModel):
"""EO1 action model with InternVL prefix expert + Pi05-style (Qwen2/Qwen3) action expert (AdaRMSNorm timestep)."""
config_class = EO1InternVLPiFlowMatchingConfig
supports_gradient_checkpointing = True
def __init__(
    self,
    config: EO1InternVLPiFlowMatchingConfig,
    internvl_backbone: nn.Module,
    action_expert: nn.Module | None = None,
):
    """Wire an InternVL backbone to a Pi05-style action expert.

    The expert configuration is a deep copy of the prefix LM config with the
    optional `expert_*` overrides from `config` applied. It is then validated
    so that the expert can consume the prefix LM's KV cache.

    Args:
        config: Model configuration. The `expert_hidden_size`,
            `expert_intermediate_size`, `expert_num_attention_heads`,
            `expert_num_hidden_layers`, `expert_layer_mapping`,
            `expert_init_from_backbone` and `max_action_dim` entries are read here.
        internvl_backbone: Module exposing `.language_model`, used as the prefix model.
        action_expert: Pre-built expert; when omitted a Qwen2/Qwen3 expert is
            created to match the prefix LM config class.

    Raises:
        ValueError: If the backbone has no `.language_model`, if the expert
            geometry is incompatible with the prefix KV cache, or if
            `expert_layer_mapping` is not 'last' or 'first'.
        RuntimeError: If building or initializing the default expert fails.
    """
    super().__init__(config)
    self.internvl_backbone = internvl_backbone
    # InternVL uses a HF causal LM as `.language_model` (e.g. Qwen2ForCausalLM).
    if not hasattr(self.internvl_backbone, "language_model"):
        raise ValueError("internvl_backbone must have `.language_model`.")
    # Do NOT register an alias module (e.g. `self.prefix_lm = self.internvl_backbone.language_model`).
    # Registering both creates shared tensors under different state_dict keys, which safetensors refuses
    # to save unless they are declared as tied weights. Use the property `prefix_lm` instead.

    # ------------------------- Build action expert config (Pi05-style: smaller expert) -------------------------
    prefix_cfg = self.prefix_lm.config
    cfg_name = prefix_cfg.__class__.__name__
    expert_cfg = copy.deepcopy(prefix_cfg)
    if getattr(config, "expert_hidden_size", None) is not None:
        expert_cfg.hidden_size = int(config.expert_hidden_size)
    if getattr(config, "expert_intermediate_size", None) is not None:
        expert_cfg.intermediate_size = int(config.expert_intermediate_size)
    if getattr(config, "expert_num_attention_heads", None) is not None:
        expert_cfg.num_attention_heads = int(config.expert_num_attention_heads)
    if getattr(config, "expert_num_hidden_layers", None) is not None:
        expert_cfg.num_hidden_layers = int(config.expert_num_hidden_layers)

    # Keep head geometry aligned with prefix kv-cache.
    if int(getattr(expert_cfg, "num_key_value_heads", -1)) != int(getattr(prefix_cfg, "num_key_value_heads", -2)):
        raise ValueError(
            "To reuse prefix KV-cache, expert and prefix must share num_key_value_heads. "
            f"{int(getattr(prefix_cfg, 'num_key_value_heads'))=} vs {int(getattr(expert_cfg, 'num_key_value_heads'))=}."
        )

    def _effective_head_dim(cfg: Any) -> int:
        # Same rule as the attention modules in this file: an explicit `head_dim`
        # wins, otherwise it is hidden_size // num_attention_heads. Configs without
        # a `head_dim` attribute are therefore compared by the value actually used,
        # rather than by mismatching sentinels that always fail.
        return int(getattr(cfg, "head_dim", int(cfg.hidden_size) // int(cfg.num_attention_heads)))

    prefix_head_dim = _effective_head_dim(prefix_cfg)
    expert_head_dim = _effective_head_dim(expert_cfg)
    if expert_head_dim != prefix_head_dim:
        raise ValueError(
            "To reuse prefix KV-cache, expert and prefix must share head_dim. "
            f"{prefix_head_dim=} vs {expert_head_dim=}."
        )
    if int(expert_cfg.num_attention_heads) % int(expert_cfg.num_key_value_heads) != 0:
        raise ValueError(
            "expert_num_attention_heads must be divisible by num_key_value_heads. "
            f"{int(expert_cfg.num_attention_heads)=} {int(expert_cfg.num_key_value_heads)=}."
        )

    # Keep layer_types length consistent (Qwen3Config defines it).
    if hasattr(expert_cfg, "layer_types") and isinstance(getattr(expert_cfg, "layer_types"), list):
        if len(expert_cfg.layer_types) != int(expert_cfg.num_hidden_layers):
            expert_cfg.layer_types = ["full_attention"] * int(expert_cfg.num_hidden_layers)

    self._expert_hidden_size = int(expert_cfg.hidden_size)
    self._expert_num_layers = int(expert_cfg.num_hidden_layers)
    self._prefix_num_layers = int(getattr(prefix_cfg, "num_hidden_layers", self._expert_num_layers))
    if self._expert_num_layers > self._prefix_num_layers:
        raise ValueError(
            "expert_num_hidden_layers cannot exceed prefix LM layers when using prefix KV-cache. "
            f"{self._expert_num_layers=} > {self._prefix_num_layers=}."
        )

    # Choose which prefix layers feed their KV cache to the (possibly shallower) expert.
    mapping = str(getattr(config, "expert_layer_mapping", "last")).strip().lower()
    if mapping == "last":
        start = self._prefix_num_layers - self._expert_num_layers
        self._prefix_kv_layer_indices = list(range(start, self._prefix_num_layers))
    elif mapping == "first":
        self._prefix_kv_layer_indices = list(range(self._expert_num_layers))
    else:
        raise ValueError(f"Unsupported expert_layer_mapping={mapping!r} (expected 'last' or 'first').")

    max_action_dim = int(config.max_action_dim)

    # Pi05: action embeddings do NOT concatenate timestep.
    self.action_in_proj = nn.Linear(max_action_dim, self._expert_hidden_size)
    self.action_out_proj = nn.Linear(self._expert_hidden_size, max_action_dim)

    # Pi05: timestep is injected via AdaRMSNorm in the action expert.
    self.time_mlp_in = nn.Linear(self._expert_hidden_size, self._expert_hidden_size)
    self.time_mlp_out = nn.Linear(self._expert_hidden_size, self._expert_hidden_size)

    if action_expert is not None:
        self.action_expert = action_expert
    else:
        # Default: build an action expert (Qwen2/Qwen3) with its own (possibly smaller) config.
        init_from = self.prefix_lm if bool(getattr(self.config, "expert_init_from_backbone", False)) else None
        try:
            if cfg_name == "Qwen2Config":
                self.action_expert = Qwen2PiActionExpert(expert_cfg, init_from_qwen2_lm=init_from)
            elif cfg_name == "Qwen3Config":
                self.action_expert = Qwen3PiActionExpert(expert_cfg, init_from_qwen3_lm=init_from)
            else:
                raise NotImplementedError(
                    "eo_pi_internvl currently supports only Qwen2/Qwen3 LMs for action expert. "
                    f"Got: {cfg_name}"
                )
        except Exception as e:
            raise RuntimeError(
                "Failed to build/initialize action expert. If you set `expert_init_from_backbone=True`, "
                "make sure expert_* hyperparams exactly match the prefix LM shapes, or set it to False "
                "for Pi05-style random init."
            ) from e

    n_all = count_params(self.action_expert)
    n_train = count_params(self.action_expert, trainable_only=True)
    print(f"action_expert params: {n_all/1e6:.2f}M (trainable {n_train/1e6:.2f}M)")

    self.post_init()
@property
def prefix_lm(self) -> nn.Module:
    """The backbone's `.language_model`, used as the prefix model.

    Exposed as a property, not a registered sub-module, so it resolves to
    whatever the backbone currently holds.
    """
    backbone = self.internvl_backbone
    return backbone.language_model
@staticmethod
def _action_group_indices(action_dim: int, *, action_dim_mask: torch.Tensor | None = None) -> dict[str, list[int]]:
"""
Best-effort split of action dims into position/rotation/gripper groups.
Supports both common layouts:
1) Compact (single-arm): [xyz(3), rotvec(3), gripper(1)] -> 7 dims (or bimanual 14 dims).
2) EO unified action encoding (see `dataset/action_encoding.py`):
left: 0:3 pos, 3:6 rotvec (or 3:9 r6d), 9 gripper
right: 10:13 pos, 13:16 rotvec (or 13:19 r6d), 19 gripper
Rotation repr is controlled via env var `EO_ACTION_ROT_REPR` (default rotvec).
"""
d = int(action_dim)
if d <= 0:
return {"pos": [], "rot": [], "gripper": [], "other": []}
rot_repr = os.environ.get("EO_ACTION_ROT_REPR", "rotvec").strip().lower()
rot_is_r6d = rot_repr in ("r6d", "rot6d", "6d")
m_any = None
if action_dim_mask is not None and torch.is_tensor(action_dim_mask):
m = action_dim_mask.detach()
if m.ndim == 1:
m_any = m.to(torch.bool)
elif m.ndim == 2:
m_any = m.to(torch.bool).any(dim=0)
elif m.ndim == 3 and int(m.shape[1]) == 1:
m_any = m[:, 0, :].to(torch.bool).any(dim=0)
else:
m_any = m.reshape(-1, m.shape[-1]).to(torch.bool).any(dim=0)
if int(m_any.numel()) != d:
if int(m_any.numel()) > d:
m_any = m_any[:d]
else:
pad = torch.zeros((d - int(m_any.numel()),), dtype=torch.bool, device=m_any.device)
m_any = torch.cat([m_any, pad], dim=0)
# Infer effective dim span from mask if available (common when original action dim < max_action_dim).
eff = d
if m_any is not None and bool(m_any.any().item()):
eff = int(torch.nonzero(m_any, as_tuple=False).max().item()) + 1
pos: list[int] = []
rot: list[int] = []
gripper: list[int] = []
# Compact layout: 7D single-arm / 14D bimanual.
if eff in (7, 14):
arm_offsets = [0] if eff == 7 else [0, 7]
for off in arm_offsets:
pos.extend([off + i for i in range(0, 3) if off + i < d])
rot.extend([off + i for i in range(3, 6) if off + i < d])
g = off + 6
if g < d:
gripper.append(g)
used = set(pos) | set(rot) | set(gripper)
other = [i for i in range(d) if i not in used]
return {"pos": pos, "rot": rot, "gripper": gripper, "other": other}
# EO unified layout (10 dims per arm slot, supports bimanual at offset 10).
right_active = (d >= 20)
if m_any is not None and int(m_any.numel()) >= 20:
right_active = bool(m_any[10:20].any().item())
# Left arm.
pos.extend([i for i in range(0, min(3, d))])
rot.extend([i for i in range(3, min(6, d))])
if rot_is_r6d:
rot.extend([i for i in range(6, min(9, d))])
if 9 < d:
gripper.append(9)
# Right arm (offset 10) when active.
if right_active and d >= 20:
pos.extend([i for i in range(10, min(13, d))])
rot.extend([i for i in range(13, min(16, d))])
if rot_is_r6d:
rot.extend([i for i in range(16, min(19, d))])
if 19 < d:
gripper.append(19)
used = set(pos) | set(rot) | set(gripper)
other = [i for i in range(d) if i not in used]
return {"pos": pos, "rot": rot, "gripper": gripper, "other": other}
# ------------------------- EO1 Flow Matching utils -------------------------
def sample_noise(self, shape, device):
    """Draw standard-normal noise.

    Args:
        shape: Size of the tensor to create.
        device: Device on which the tensor is allocated.

    Returns:
        A float32 tensor of size `shape` sampled from a normal distribution
        with mean 0 and standard deviation 1.
    """
    gaussian = {"mean": 0.0, "std": 1.0, "size": shape}
    return torch.normal(**gaussian, dtype=torch.float32, device=device)
def sample_time(self, bsize, device):
    """Sample one flow-matching time value per batch element.

    A draw `s` from Beta(concentration1=1.5, concentration0=1.0) is mapped to
    `s * 0.999 + 0.001`, which keeps the result away from zero.

    Args:
        bsize: Number of time values to draw.
        device: Device the returned tensor is moved to.

    Returns:
        A float32 tensor of shape `(bsize,)`.
    """
    sampler = torch.distributions.Beta(concentration1=1.5, concentration0=1.0)
    draws = sampler.sample((bsize,))
    draws = draws.to(device=device, dtype=torch.float32)
    scaled = draws * 0.999
    return scaled + 0.001
def _embed_time_cond(self, timestep: torch.Tensor, *, dtype: torch.dtype, device: torch.device) -> torch.Tensor:
    """Turn flow-matching timesteps into the conditioning vector for the action expert.

    The timestep is encoded with `create_sinusoidal_pos_embedding`, cast to
    `dtype`, and passed through `time_mlp_in` -> SiLU -> `time_mlp_out` -> SiLU.

    Args:
        timestep: Timesteps to embed (the sinusoidal helper requires a 1-D tensor).
        dtype: Dtype the sinusoidal embedding is cast to before the MLP.
        device: Device on which the sinusoidal frequencies are created.

    Returns:
        The output of the second SiLU activation.
    """
    # The fallback width is read eagerly, exactly as a `getattr` default would be.
    fallback_width = self.prefix_lm.config.hidden_size
    width = int(getattr(self, "_expert_hidden_size", fallback_width))

    sincos = create_sinusoidal_pos_embedding(timestep, width, device=device)
    cond = self.time_mlp_in(sincos.to(dtype=dtype))
    cond = self.time_mlp_out(F.silu(cond))
    return F.silu(cond)
def _select_prefix_kv_cache(self, prefix_kv_cache: list[tuple[torch.Tensor, torch.Tensor]]) -> list[tuple[torch.Tensor, torch.Tensor]]:
    """Pick the prefix KV-cache layers listed in `_prefix_kv_layer_indices`.

    Args:
        prefix_kv_cache: Per-layer `(key, value)` entries.

    Returns:
        `prefix_kv_cache` itself when the instance has no
        `_prefix_kv_layer_indices` attribute or the attribute is empty;
        otherwise a new list holding the entries at the configured indices,
        in the configured order.

    Raises:
        ValueError: If the largest configured index is not present in the cache.
    """
    absent = object()
    configured = getattr(self, "_prefix_kv_layer_indices", absent)
    if configured is absent:
        return prefix_kv_cache

    idx = list(configured)
    if len(idx) == 0:
        return prefix_kv_cache

    if not (max(idx) < len(prefix_kv_cache)):
        raise ValueError(
            "Prefix KV cache shorter than expected. "
            f"{len(prefix_kv_cache)=} < {max(idx)+1=}."
        )

    picked = []
    for layer in idx:
        picked.append(prefix_kv_cache[layer])
    return picked
def _replace_img_context_embeddings(
self,
input_ids: torch.LongTensor,
inputs_embeds: torch.FloatTensor,
pixel_values: torch.FloatTensor,
image_flags: torch.LongTensor | None,
) -> torch.FloatTensor:
img_context_token_id = self.config.img_context_token_id
if img_context_token_id is None:
raise ValueError("config.img_context_token_id is None (tokenizer/model not initialized).")
try:
vision_dtype = next(self.internvl_backbone.vision_model.parameters()).dtype
except Exception:
vision_dtype = inputs_embeds.dtype
pixel_values = pixel_values.to(device=inputs_embeds.device, dtype=vision_dtype)
vit_embeds = self.internvl_backbone.extract_feature(pixel_values) # (n_img, n_token, hidden)
if image_flags is not None:
image_flags = image_flags.squeeze(-1)
vit_embeds = vit_embeds[image_flags == 1]
bsz, _, hidden = inputs_embeds.shape
selected = input_ids == int(img_context_token_id) # (B,S)
n_ctx = int(selected.sum().item())
if n_ctx == 0:
return inputs_embeds
vit_flat = vit_embeds.reshape(-1, hidden)
if vit_flat.shape[0] < n_ctx:
raise ValueError(f"IMG_CONTEXT mismatch: need {n_ctx} embeddings, got {vit_flat.shape[0]}.")
mask3 = selected.unsqueeze(-1).expand_as(inputs_embeds)
src = vit_flat[:n_ctx].to(device=inputs_embeds.device, dtype=inputs_embeds.dtype).reshape(-1)
return inputs_embeds.masked_scatter(mask3, src)
@staticmethod
def _find_suffix_starts(action_mask_token: torch.Tensor, *, expected_horizon: int | None = None) -> torch.Tensor:
if action_mask_token.ndim != 2:
raise ValueError(f"action_mask_token must be (B,S), got {tuple(action_mask_token.shape)}")
bsz = int(action_mask_token.shape[0])
starts = torch.empty((bsz,), dtype=torch.long, device=action_mask_token.device)
for b in range(bsz):
pos = torch.nonzero(action_mask_token[b], as_tuple=False).squeeze(-1)
if int(pos.numel()) == 0:
raise ValueError(f"Expected at least 1 action token per sample, got 0 for batch {b}.")
if expected_horizon is not None and int(pos.numel()) not in (1, int(expected_horizon)):
raise ValueError(
f"Expected 1 or {int(expected_horizon)} action tokens per sample, got {int(pos.numel())} for batch {b}."
)
starts[b] = pos.min()
return starts
# ------------------------- Forward (train) -------------------------
def forward(
    self,
    input_ids: torch.LongTensor | None = None,
    attention_mask: torch.Tensor | None = None,
    position_ids: torch.LongTensor | None = None,  # noqa: ARG002 - recomputed for pi05 mask
    inputs_embeds: torch.FloatTensor | None = None,  # noqa: ARG002 - use InternVL embeddings
    labels: torch.LongTensor | None = None,  # noqa: ARG002 - Pi05 does not train AR loss here
    pixel_values: torch.FloatTensor | None = None,
    image_flags: torch.LongTensor | None = None,
    states: torch.Tensor | None = None,  # noqa: ARG002 - Pi05: state should be discrete in text
    actions: torch.Tensor | None = None,
    action_is_pad: torch.Tensor | None = None,
    action_dim_mask: torch.Tensor | None = None,
    use_cache: bool | None = None,  # noqa: ARG002
    output_attentions: bool | None = None,  # noqa: ARG002
    output_hidden_states: bool | None = None,  # noqa: ARG002
    **kwargs,
) -> EO1InternVLPiFlowMatchingOutput:
    """Compute the flow-matching training loss for a batch of action chunks.

    Each sequence is split at its first action token. The tokens before it (the
    prefix) are encoded once by `prefix_lm`; the action expert then attends to the
    resulting KV cache while predicting the velocity for a noised copy of `actions`.

    Args:
        input_ids: `(B, S)` token ids. Every sample must contain action tokens
            (`config.action_token_id` and/or `config.action_pass_id`).
        attention_mask: `(B, S)` mask; defaults to all ones.
        pixel_values: Optional images whose features replace IMG_CONTEXT embeddings.
        image_flags: Optional per-image flags forwarded to the image replacement.
        actions: `(B, H, A)` target actions (required).
        action_is_pad: Optional mask broadcastable to `(B, H)`; truthy steps are
            removed from the loss and from the suffix key mask.
        action_dim_mask: Optional mask over action dims given as `(A,)`, `(B, A)`,
            `(B, 1, A)` or anything reshapeable to `(B, -1)`. A leading batch dim
            of 1 is broadcast to `B`. Ignored (with a warning) if its last dim
            differs from `A`.
        position_ids, inputs_embeds, labels, states, use_cache, output_attentions,
            output_hidden_states, **kwargs: Accepted but not used.

    Returns:
        An `EO1InternVLPiFlowMatchingOutput` where `loss` and `fm_loss` are the mean
        squared error between predicted and target velocity over valid elements,
        `fm_loss_pos` / `fm_loss_rot` / `fm_loss_gripper` are the same quantity
        restricted to one action group (None when unavailable), and `actions`
        holds the predicted velocity `v_t` in float32.

    Raises:
        ValueError: If `input_ids` or `actions` is None, if
            `config.action_token_id` is None, or if the action-token layout is
            rejected by `_find_suffix_starts`.
    """
    if input_ids is None:
        raise ValueError("Pi model requires `input_ids`.")
    if actions is None:
        raise ValueError("Pi model forward requires `actions` (flow-matching).")
    device = input_ids.device
    if attention_mask is None:
        attention_mask = torch.ones_like(input_ids, dtype=torch.long, device=device)

    action_token_id = self.config.action_token_id
    if action_token_id is None:
        raise ValueError("config.action_token_id is None (tokenizer/model not initialized).")
    action_pass_id = self.config.action_pass_id

    noise_mask = input_ids == int(action_token_id)
    pass_mask = (input_ids == int(action_pass_id)) if action_pass_id is not None else torch.zeros_like(noise_mask)
    action_mask_token = noise_mask | pass_mask  # (B, S)

    bsz, horizon, _ = actions.shape
    bsz, horizon = int(bsz), int(horizon)
    suffix_starts = self._find_suffix_starts(action_mask_token, expected_horizon=horizon)  # (B,)
    prefix_len = int(suffix_starts.max().item())

    # ---------------- Prefix expert (InternVL LM) ----------------
    prefix_ids = input_ids[:, :prefix_len]
    prefix_am = attention_mask[:, :prefix_len].to(dtype=torch.bool, device=device)
    ar = torch.arange(prefix_len, device=device)
    # Samples whose suffix starts earlier than the batch maximum must not see
    # their own action tokens as prefix.
    prefix_valid = prefix_am & (ar[None, :] < suffix_starts[:, None])

    prefix_embeds = self.prefix_lm.get_input_embeddings()(prefix_ids).clone()
    if pixel_values is not None:
        prefix_embeds = self._replace_img_context_embeddings(
            input_ids=prefix_ids,
            inputs_embeds=prefix_embeds,
            pixel_values=pixel_values,
            image_flags=image_flags,
        )

    prefix_attn = prefix_valid.to(dtype=torch.long)
    prefix_out = self.prefix_lm.model(
        inputs_embeds=prefix_embeds,
        attention_mask=prefix_attn,
        use_cache=True,
        return_dict=True,
    )
    prefix_pkv = prefix_out.past_key_values
    prefix_kv_cache = [prefix_pkv[i] for i in range(len(prefix_pkv))]
    prefix_kv_cache = self._select_prefix_kv_cache(prefix_kv_cache)

    # ---------------- Flow Matching ----------------
    actions_f32 = actions.to(dtype=torch.float32, device=device)
    time = self.sample_time(bsz, device)  # (B,)
    noise = self.sample_noise(actions_f32.shape, device)

    time_expanded = time[:, None, None]
    x_t = time_expanded * noise + (1 - time_expanded) * actions_f32
    u_t = noise - actions_f32

    # Action tokens: no timestep concatenation in Pi05.
    action_tokens = self.action_in_proj(x_t.to(dtype=self.action_in_proj.weight.dtype))  # (B,H,D)
    # AdaRMSNorm conditioning vector (Pi05).
    adarms_cond = self._embed_time_cond(time, dtype=action_tokens.dtype, device=action_tokens.device)

    # Suffix RoPE positions follow the *per-sample* prefix length (suffix_starts).
    pos_ids = suffix_starts[:, None] + torch.arange(horizon, device=device)[None, :]

    suffix_valid = torch.ones((bsz, horizon), device=device, dtype=torch.bool)
    if action_is_pad is not None:
        suffix_valid = suffix_valid & (~action_is_pad.to(device=device, dtype=torch.bool))

    expert_h = self.action_expert(
        action_tokens,
        prefix_kv_cache=prefix_kv_cache,
        prefix_key_mask=prefix_valid,
        position_ids=pos_ids,
        adarms_cond=adarms_cond,
        suffix_key_mask=suffix_valid,
    )
    v_t = self.action_out_proj(expert_h).to(dtype=torch.float32)  # (B,H,A)

    # Loss: average over *valid elements* (step mask + action_dim_mask).
    target = u_t.to(dtype=v_t.dtype)
    per_elem = (v_t - target) ** 2  # (B,H,A)
    a_dim = int(per_elem.shape[-1])
    valid = suffix_valid[:, :, None]

    adm_for_groups = None
    if action_dim_mask is not None:
        adm = action_dim_mask
        if not torch.is_tensor(adm):
            adm = torch.as_tensor(adm)
        adm = adm.to(device=per_elem.device, dtype=torch.bool)
        if adm.ndim == 1:
            adm = adm.view(1, -1).expand(bsz, -1)
        elif adm.ndim == 3 and int(adm.shape[1]) == 1:
            adm = adm[:, 0, :]
        elif adm.ndim != 2:
            adm = adm.reshape(bsz, -1)
        # A batch-1 mask is shared by all samples; expand it so the per-sample
        # group lookup below can index every row.
        if int(adm.shape[0]) == 1 and bsz > 1:
            adm = adm.expand(bsz, -1)
        if int(adm.shape[-1]) == a_dim:
            valid = valid & adm[:, None, :]
            adm_for_groups = adm
        else:
            logger.warning_once(
                "Ignoring action_dim_mask due to shape mismatch in PI FM loss: "
                f"{tuple(adm.shape)=} vs expected (B,{int(per_elem.shape[-1])})."
            )

    # Exclude padding/unused action dims ("other") from FM loss.
    # We only train on {pos, rot, gripper} dims so `fm_loss` matches the meaningful action space.
    pos_mask = rot_mask = grip_mask = None
    try:
        pos_mask = torch.zeros((bsz, a_dim), device=per_elem.device, dtype=torch.bool)
        rot_mask = torch.zeros_like(pos_mask)
        grip_mask = torch.zeros_like(pos_mask)
        for bi in range(bsz):
            sample_mask = adm_for_groups[bi] if adm_for_groups is not None else None
            g = self._action_group_indices(a_dim, action_dim_mask=sample_mask)
            if g["pos"]:
                pos_mask[bi, g["pos"]] = True
            if g["rot"]:
                rot_mask[bi, g["rot"]] = True
            if g["gripper"]:
                grip_mask[bi, g["gripper"]] = True
        group_mask = pos_mask | rot_mask | grip_mask  # (B,A)
        # Samples with no recognised group fall back to their dim mask (or all dims).
        empty = ~group_mask.any(dim=1)
        if empty.any():
            fallback = adm_for_groups if adm_for_groups is not None else torch.ones_like(group_mask)
            group_mask[empty] = fallback[empty]
        valid = valid & group_mask[:, None, :]
    except Exception as err:
        # Best effort: keep training on the un-grouped mask, but make the
        # degradation visible instead of silently changing the loss.
        pos_mask = rot_mask = grip_mask = None
        logger.warning_once(
            "Action-group masking failed in PI FM loss; using the un-grouped mask "
            f"({type(err).__name__}: {err})."
        )

    weight = valid.to(dtype=per_elem.dtype)
    denom = weight.sum().clamp_min(1)
    fm_loss = (per_elem * weight).sum() / denom

    def _group_loss(dim_mask):
        """Mean squared error restricted to one action group, or None if it has no valid element."""
        if dim_mask is None:
            return None
        group_w = weight * dim_mask[:, None, :].to(dtype=weight.dtype)
        total = group_w.sum()
        if not bool((total > 0).item()):
            return None
        return (per_elem * group_w).sum() / total.clamp_min(1)

    # Decompose FM loss by action groups (auxiliary logs only; never crash training for these).
    try:
        fm_loss_pos = _group_loss(pos_mask)
        fm_loss_rot = _group_loss(rot_mask)
        fm_loss_gripper = _group_loss(grip_mask)
    except Exception as err:
        fm_loss_pos = fm_loss_rot = fm_loss_gripper = None
        logger.warning_once(
            f"Per-group FM loss decomposition failed ({type(err).__name__}: {err})."
        )

    return EO1InternVLPiFlowMatchingOutput(
        loss=fm_loss,
        fm_loss=fm_loss,
        fm_loss_pos=fm_loss_pos,
        fm_loss_rot=fm_loss_rot,
        fm_loss_gripper=fm_loss_gripper,
        ar_loss=None,
        actions=v_t,
        logits=None,
        hidden_states=None,
        attentions=None,
    )
@torch.no_grad()
def sample_actions(
self,
input_ids: torch.LongTensor | None = None,
attention_mask: torch.Tensor | None = None,
position_ids: torch.LongTensor | None = None, # noqa: ARG002
pixel_values: torch.FloatTensor | None = None,
image_flags: torch.LongTensor | None = None,
num_steps: int | None = None,
noise: torch.Tensor | None = None,
**kwargs,
) -> Tensor:
if input_ids is None:
raise ValueError("sample_actions requires input_ids.")
if attention_mask is None:
attention_mask = torch.ones_like(input_ids, dtype=torch.long, device=input_ids.device)
chunk_size = int(self.config.action_chunk_size)
max_action_dim = int(self.config.max_action_dim)
steps = int(num_steps) if num_steps is not None else int(self.config.num_denoise_steps)
dt = torch.tensor(-1.0 / max(1, steps), device=input_ids.device, dtype=torch.float32)
action_token_id = self.config.action_token_id
if action_token_id is None:
raise ValueError("config.action_token_id is None (tokenizer/model not initialized).")
action_pass_id = self.config.action_pass_id
noise_mask = input_ids == int(action_token_id)
pass_mask = (input_ids == int(action_pass_id)) if action_pass_id is not None else torch.zeros_like(noise_mask)
action_mask_token = noise_mask | pass_mask
bsz = int(input_ids.shape[0])
suffix_starts = self._find_suffix_starts(action_mask_token, expected_horizon=chunk_size)
prefix_len = int(suffix_starts.max().item())
prefix_ids = input_ids[:, :prefix_len]
prefix_am = attention_mask[:, :prefix_len].to(dtype=torch.bool, device=input_ids.device)
ar = torch.arange(prefix_len, device=input_ids.device)
prefix_valid = prefix_am & (ar[None, :] < suffix_starts[:, None])
prefix_embeds = self.prefix_lm.get_input_embeddings()(prefix_ids).clone()
if pixel_values is not None:
prefix_embeds = self._replace_img_context_embeddings(
input_ids=prefix_ids,
inputs_embeds=prefix_embeds,
pixel_values=pixel_values,
image_flags=image_flags,
)
prefix_attn = prefix_valid.to(dtype=torch.long)
prefix_out = self.prefix_lm.model(
inputs_embeds=prefix_embeds,
attention_mask=prefix_attn,
use_cache=True,
return_dict=True,
)
prefix_pkv = prefix_out.past_key_values
prefix_kv_cache = [prefix_pkv[i] for i in range(len(prefix_pkv))]
prefix_kv_cache = self._select_prefix_kv_cache(prefix_kv_cache)
device = input_ids.device
if noise is None:
x_t = self.sample_noise((bsz, chunk_size, max_action_dim), device=device).to(dtype=torch.float32)
else:
x_t = noise.to(device=device, dtype=torch.float32)
suffix_valid = torch.ones((bsz, chunk_size), device=device, dtype=torch.bool)
pos_ids = suffix_starts[:, None] + torch.arange(chunk_size, device=device)[None, :]
for s in range(steps):
t_scalar = 1.0 + float(s) * float(dt)
time = torch.full((bsz,), t_scalar, device=device, dtype=torch.float32)
action_tokens = self.action_in_proj(x_t.to(dtype=self.action_in_proj.weight.dtype))
adarms_cond = self._embed_time_cond(time, dtype=action_tokens.dtype, device=action_tokens.device)
expert_h = self.action_expert(
action_tokens,
prefix_kv_cache=prefix_kv_cache,
prefix_key_mask=prefix_valid,
position_ids=pos_ids,
adarms_cond=adarms_cond,
suffix_key_mask=suffix_valid,
)
v_t = self.action_out_proj(expert_h).to(dtype=torch.float32)
x_t = x_t + dt * v_t
return x_t
# Register the model class with the Transformers auto-class machinery. Called
# without arguments, so the method's default auto class is used.
# NOTE(review): presumably needed so checkpoints that ship this file load through
# the Auto* API with `trust_remote_code=True` — confirm against the repo's config.
EO1InternVLPiFlowMatchingModel.register_for_auto_class()