moondream3-preview-hf / modeling_moondream3.py
CalamitousFelicitousness's picture
Turn off hardcoded debug
772c6e8 verified
raw
history blame
54.3 kB
# coding=utf-8
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Callable, Optional, Tuple, Union
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
from transformers.activations import ACT2FN
from transformers.cache_utils import Cache, DynamicCache
from transformers.masking_utils import create_causal_mask
from dataclasses import dataclass
from transformers.modeling_outputs import (
BaseModelOutputWithPast,
CausalLMOutputWithPast,
)
from transformers.processing_utils import Unpack
from transformers.modeling_rope_utils import ROPE_INIT_FUNCTIONS, dynamic_rope_update
from transformers.modeling_utils import ALL_ATTENTION_FUNCTIONS, PreTrainedModel
from transformers.generation import GenerationMixin
from transformers.generation.utils import GenerateDecoderOnlyOutput
from transformers.utils import logging, TransformersKwargs
from .configuration_moondream3 import Moondream3Config, Moondream3TextConfig, Moondream3VisionConfig, Moondream3RegionConfig
logger = logging.get_logger(__name__)
_CONFIG_FOR_DOC = "Moondream3Config"
import torch
DEBUG=False
def apply_rotary_pos_emb(
q: torch.Tensor, # [B, H, L, D]
k: torch.Tensor, # [B, H, L, D]
cos: torch.Tensor, # [B, L, rot_dim]
sin: torch.Tensor, # [B, L, rot_dim]
rot_dim: int = 32,
):
"""
Apply rotary position embeddings to query and key tensors.
Args:
q: Query tensor [batch, num_heads, seq_len, head_dim]
k: Key tensor [batch, num_heads, seq_len, head_dim]
cos: Cosine frequencies [batch, seq_len, rot_dim]
sin: Sine frequencies [batch, seq_len, rot_dim]
rot_dim: Number of dimensions to apply rotation to (default: 32)
Returns:
Tuple of (rotated_q, rotated_k)
"""
def apply_rope(x):
dtype = x.dtype
x = x.to(torch.float64)
x_rot, x_pass = x[..., :rot_dim], x[..., rot_dim:]
d_q = x_rot.shape[-1] // 2
xq_r, xq_i = x_rot[..., :d_q], x_rot[..., d_q:]
xq_out_r = xq_r * cos - xq_i * sin
xq_out_i = xq_r * sin + xq_i * cos
xq_out = torch.stack((xq_out_r, xq_out_i), dim=-1).flatten(-2)
return torch.cat([xq_out, x_pass], dim=-1)
return apply_rope(q), apply_rope(k)
class Moondream3RotaryEmbedding(nn.Module):
inv_freq: torch.Tensor
def __init__(self, config: Moondream3Config, device=None):
super().__init__()
self.max_seq_len_cached = config.max_position_embeddings
self.original_max_seq_len = config.max_position_embeddings
self.config = config
self.rope_type = self.config.rope_parameters["rope_type"]
rope_init_fn: Callable = self.compute_default_rope_parameters
if self.rope_type != "default":
rope_init_fn = ROPE_INIT_FUNCTIONS[self.rope_type]
inv_freq, self.attention_scaling = rope_init_fn(self.config, device)
self.register_buffer("inv_freq", inv_freq, persistent=False)
self.original_inv_freq = inv_freq
@staticmethod
def compute_default_rope_parameters(
config: Optional[Moondream3Config] = None,
device: Optional["torch.device"] = None,
seq_len: Optional[int] = None,
) -> tuple["torch.Tensor", float]:
"""
Computes the inverse frequencies according to the original RoPE implementation
"""
base = config.rope_parameters["rope_theta"] # Should be 1500000.0 to match original
dim = getattr(config, "head_dim", None) or config.hidden_size // config.num_attention_heads
dim //= 2
attention_factor = 1.0
# Compute the inverse frequencies - matches your original formula
inv_freq = 1.0 / (
base ** (torch.arange(0, dim, 2, dtype=torch.float64)[: (dim // 2)] / dim)
)
if device is not None:
inv_freq = inv_freq.to(device=device)
return inv_freq, attention_factor
@torch.no_grad()
@dynamic_rope_update
def forward(self, x, position_ids):
# inv_freq shape: [dim//2]
# position_ids shape: [batch_size, seq_len]
inv_freq_expanded = self.inv_freq[None, :, None].to(torch.float64).expand(position_ids.shape[0], -1, 1).to(x.device)
position_ids_expanded = position_ids[:, None, :].to(torch.float64)
freqs = (inv_freq_expanded.to(torch.float64) @ position_ids_expanded.to(torch.float64)).transpose(1, 2)
cfreqs = torch.exp(1j * freqs).unsqueeze(1).expand(-1, self.config.num_attention_heads, -1, -1)
return cfreqs.real, cfreqs.imag
class Moondream3Attention(nn.Module):
def __init__(self, config: Moondream3TextConfig | Moondream3VisionConfig, layer_idx: Optional[int] = None, use_tau: bool = True):
super().__init__()
self.config = config
self.layer_idx = layer_idx
self.hidden_size = config.hidden_size
self.num_heads = config.num_attention_heads
self.head_dim = getattr(config, "head_dim", self.hidden_size // self.num_heads)
self.num_key_value_heads = getattr(config, "num_key_value_heads", self.num_heads)
attention_bias = config.attention_bias
self.attention_dropout = config.attention_dropout
# Initialize parameters based on config type
if isinstance(config, Moondream3TextConfig):
self.is_causal = True
elif isinstance(config, Moondream3VisionConfig): # Moondream3VisionConfig
self.is_causal = False
else:
raise TypeError(f"Unsupported config type: {type(config)}")
self.num_key_value_groups = self.num_heads // self.num_key_value_heads
self.use_tau = use_tau
if (self.head_dim * self.num_heads) != self.hidden_size:
raise ValueError(
f"hidden_size must be divisible by num_heads (got `hidden_size`: {self.hidden_size}"
f" and `num_heads`: {self.num_heads})."
)
self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=attention_bias)
self.k_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=attention_bias)
self.v_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=attention_bias)
self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=attention_bias)
# Tau parameters for token-level attention (from original Moondream) - only for text model
if self.use_tau:
# In original, tau weights are (n_heads, qkv_dim) where qkv_dim is the combined QKV dimension
qkv_dim = self.num_heads * self.head_dim + 2 * self.num_key_value_heads * self.head_dim
self.tau_wq = nn.Linear(qkv_dim, self.num_heads, bias=False)
self.tau_wv = nn.Linear(qkv_dim, self.num_heads, bias=False)
self.tau_alpha = nn.Parameter(torch.empty(self.num_heads))
def forward(
self,
hidden_states: torch.Tensor,
position_embeddings: Optional[tuple[torch.Tensor, torch.Tensor]] = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[Cache] = None,
output_attentions: bool = False,
use_cache: bool = False,
cache_position: Optional[torch.LongTensor] = None,
**kwargs,
) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Cache]]:
input_shape = hidden_states.shape[:-1]
if isinstance(self.config, Moondream3TextConfig) and DEBUG:
torch.save(hidden_states, f"dbg/hf_l{self.layer_idx}_c{cache_position[-1].item()}_input_states")
bsz, q_len, _ = hidden_states.size()
# Get qkv combined for tau (before splitting)
query_states = self.q_proj(hidden_states)
key_states = self.k_proj(hidden_states)
value_states = self.v_proj(hidden_states)
if self.use_tau:
qkv_out = torch.cat([query_states, key_states, value_states], dim=-1)
tok_feat = F.gelu(qkv_out)
tok_q = torch.tanh(self.tau_wq(tok_feat)).permute(0, 2, 1) # (bsz, n_heads, seq_len)
tok_v = torch.tanh(self.tau_wv(tok_feat)).permute(0, 2, 1) # (bsz, n_heads, seq_len)
pos = position_ids.to(tok_q.dtype) + 1
alpha = self.tau_alpha.to(tok_q.dtype)
tau_pos = 1 + (torch.sigmoid(alpha[None, :, None] * pos[:, None, :].log()) - 0.5) # (n_heads, seq_len)
tau_q = (tok_q + tau_pos).unsqueeze(-1) # (bsz, n_heads, seq_len, 1)
tau_v = (tok_v + tau_pos).unsqueeze(-1) # (bsz, n_heads, seq_len, 1)
query_states = query_states.view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
key_states = key_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
if isinstance(self.config, Moondream3TextConfig) and DEBUG:
torch.save(value_states, f"dbg/hf_l{self.layer_idx}_c{cache_position[-1].item()}_pre_tau_value")
if self.use_tau:
query_states = query_states * tau_q
if self.num_key_value_groups > 1:
tau_v_repeated = tau_v.repeat(1, self.num_key_value_groups, 1, 1)[:, :self.num_key_value_heads, :, :]
else:
tau_v_repeated = tau_v
value_states = value_states * tau_v_repeated
if isinstance(self.config, Moondream3TextConfig) and DEBUG:
torch.save(value_states, f"dbg/hf_l{self.layer_idx}_c{cache_position[-1].item()}_post_tau_value")
torch.save(key_states, f"dbg/hf_l{self.layer_idx}_c{cache_position[-1].item()}_pre_rope_key")
cos, sin = None, None
if position_embeddings is not None:
cos, sin = position_embeddings
if isinstance(self.config, Moondream3TextConfig) and DEBUG:
torch.save(cos, f"dbg/hf_l{self.layer_idx}_c{cache_position[-1].item()}_cos")
torch.save(sin, f"dbg/hf_l{self.layer_idx}_c{cache_position[-1].item()}_sin")
query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin)
if isinstance(self.config, Moondream3TextConfig) and DEBUG:
torch.save(key_states, f"dbg/hf_l{self.layer_idx}_c{cache_position[-1].item()}_post_rope_key")
query_states, key_states = query_states.to(value_states.dtype), key_states.to(value_states.dtype)
if past_key_values is not None:
cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position}
key_states, value_states = past_key_values.update(key_states, value_states, self.layer_idx, cache_kwargs)
if isinstance(self.config, Moondream3TextConfig) and DEBUG:
torch.save(key_states, f"dbg/hf_l{self.layer_idx}_c{cache_position[-1].item()}_post_cache_key")
torch.save(attention_mask, f"dbg/hf_l{self.layer_idx}_c{cache_position[-1].item()}_attn_mask")
query_states = query_states.contiguous()
key_states = key_states.contiguous()
value_states = value_states.contiguous()
attn_output, attn_weights = ALL_ATTENTION_FUNCTIONS["sdpa"](
self,
query_states,
key_states,
value_states,
attention_mask,
dropout=0.0 if not self.training else self.attention_dropout,
)
attn_output = attn_output.reshape(*input_shape, -1).contiguous()
attn_output = self.o_proj(attn_output)
if isinstance(self.config, Moondream3TextConfig) and DEBUG:
torch.save(attn_output, f"dbg/hf_l{self.layer_idx}_c{cache_position[-1].item()}_attn_out")
return attn_output, attn_weights
class Moondream3MLP(nn.Module):
def __init__(self, hidden_size: int, intermediate_size: int, hidden_act: str = "gelu_pytorch_tanh", out_size: int | None = None, gated: bool = False, bias: bool = True):
super().__init__()
self.hidden_size = hidden_size
self.intermediate_size = intermediate_size
self.out_size = self.hidden_size if out_size is None else out_size
self.hidden_act = hidden_act
self.gated = gated
self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=bias)
self.down_proj = nn.Linear(self.intermediate_size, self.out_size, bias=bias)
self.gate_proj = None
if self.gated:
self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=bias)
self.act_fn = ACT2FN[self.hidden_act]
def forward(self, x) -> torch.Tensor:
if self.gated:
# separate up and gate causes precision issues
combined_weight = torch.cat([self.up_proj.weight, self.gate_proj.weight], dim=0)
h_full = F.linear(x, combined_weight)
h, g = h_full.chunk(2, dim=-1)
x = self.act_fn(h) * (g + 1)
else:
x = self.act_fn(self.up_proj(x))
return self.down_proj(x)
class Moondream3SparseMoeBlock(nn.Module):
def __init__(self, config: Moondream3TextConfig, layer_idx = None):
super().__init__()
self.layer_idx = layer_idx
self.hidden_size = config.hidden_size
self.moe_intermediate_size = config.moe_intermediate_size
self.num_experts = config.num_experts
self.top_k = config.num_experts_per_tok
self.gate = nn.Linear(self.hidden_size, self.num_experts, bias=True)
self.experts = nn.ModuleList([Moondream3MLP(hidden_size=self.hidden_size, intermediate_size=self.moe_intermediate_size, hidden_act="gelu", gated=True, bias=False) for _ in range(self.num_experts)])
def forward(self, hidden_states: torch.Tensor, cache_position=None) -> Tuple[torch.Tensor, torch.Tensor]:
batch_size, sequence_length, hidden_dim = hidden_states.shape
hidden_states = hidden_states.view(-1, hidden_dim)
router_logits: torch.Tensor = self.gate(hidden_states)
routing_weights, selected_experts = torch.topk(router_logits, self.top_k, dim=-1)
routing_weights = F.softmax(routing_weights, dim=-1, dtype=torch.float32)
routing_weights = routing_weights.to(hidden_states.dtype)
final_hidden_states = torch.zeros(
(batch_size * sequence_length, hidden_dim), dtype=hidden_states.dtype, device=hidden_states.device
)
for expert_idx in range(self.num_experts):
expert_layer = self.experts[expert_idx]
top_x, idx = (selected_experts == expert_idx).nonzero(as_tuple=True)
if top_x.shape[0] == 0:
continue
current_state = hidden_states[None, top_x].reshape(-1, hidden_dim)
# torch.save(current_state, f"dbg/hf_l{self.layer_idx}_c{cache_position[-1].item()}_e{expert_idx}")
current_hidden_states = expert_layer(current_state) * routing_weights[top_x, idx, None]
# torch.save(current_hidden_states, f"dbg/hf_l{self.layer_idx}_c{cache_position[-1].item()}_e{expert_idx}")
final_hidden_states.index_add_(0, top_x, current_hidden_states.to(hidden_states.dtype))
final_hidden_states = final_hidden_states.reshape(batch_size, sequence_length, hidden_dim)
return final_hidden_states, router_logits
class Moondream3DecoderLayer(nn.Module):
def __init__(self, config: Moondream3TextConfig, layer_idx: int):
super().__init__()
self.layer_idx = layer_idx
self.hidden_size = config.hidden_size
self.intermediate_size = config.intermediate_size
self.self_attn = Moondream3Attention(config, layer_idx, use_tau=True)
self.input_layernorm = nn.LayerNorm(config.hidden_size, eps=config.rms_norm_eps)
self.is_moe_layer = layer_idx >= config.moe_start_layer
if self.is_moe_layer:
self.mlp = Moondream3SparseMoeBlock(config, layer_idx=layer_idx)
else:
self.mlp = Moondream3MLP(self.hidden_size, self.intermediate_size)
def forward(
self,
hidden_states: torch.Tensor,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[Cache] = None,
output_attentions: bool = False,
output_router_logits: bool = False,
use_cache: bool = False,
cache_position: Optional[torch.LongTensor] = None,
position_embeddings: Optional[tuple[torch.Tensor, torch.Tensor]] = None,
**kwargs,
) -> Tuple:
residual = hidden_states
# Apply layer norm like original
l_in = self.input_layernorm(hidden_states)
if DEBUG:
torch.save(l_in, f"dbg/hf_l{self.layer_idx}_c{cache_position[-1].item()}_ln_out")
# Attention
hidden_states_attn, self_attn_weights = self.self_attn(
hidden_states=l_in,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
output_attentions=output_attentions,
use_cache=use_cache,
cache_position=cache_position,
position_embeddings=position_embeddings,
**kwargs
)
# MLP
if self.is_moe_layer:
hidden_states_mlp, router_logits = self.mlp(l_in, cache_position=cache_position)
else:
hidden_states_mlp = self.mlp(l_in)
router_logits = None
if DEBUG:
torch.save(hidden_states_mlp, f"dbg/hf_l{self.layer_idx}_c{cache_position[-1].item()}_mlp_out")
# Add both attention and MLP to residual like original
hidden_states = residual + hidden_states_attn + hidden_states_mlp
outputs = (hidden_states,)
if output_attentions:
outputs += (self_attn_weights,)
if output_router_logits:
outputs += (router_logits,)
return outputs
class Moondream3PreTrainedModel(PreTrainedModel):
config_class = Moondream3Config
base_model_prefix = "model"
supports_gradient_checkpointing = True
_no_split_modules = ["Moondream3DecoderLayer", "Moondream3SparseMoeBlock"]
_skip_keys_device_placement = "past_key_values"
_supports_flash_attn_2 = True
_supports_sdpa = True
_supports_cache_class = True
def _init_weights(self, module):
# Use text_config initializer_range if available, otherwise use default
if hasattr(self.config, 'text_config') and hasattr(self.config.text_config, 'initializer_range'):
std = self.config.text_config.initializer_range
elif hasattr(self.config, 'initializer_range'):
std = self.config.initializer_range
else:
std = 0.02 # Default initialization range
if isinstance(module, nn.Linear):
module.weight.data.normal_(mean=0.0, std=std)
if module.bias is not None:
module.bias.data.zero_()
elif isinstance(module, nn.Embedding):
module.weight.data.normal_(mean=0.0, std=std)
if module.padding_idx is not None:
module.weight.data[module.padding_idx].zero_()
class Moondream3TextModel(Moondream3PreTrainedModel):
config_class = Moondream3TextConfig
def __init__(self, config: Moondream3TextConfig):
super().__init__(config)
self.padding_idx = config.pad_token_id if hasattr(config, "pad_token_id") else 0
self.vocab_size = config.vocab_size
self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size, self.padding_idx)
self.layers = nn.ModuleList(
[Moondream3DecoderLayer(config, layer_idx) for layer_idx in range(config.num_hidden_layers)]
)
self.norm = nn.LayerNorm(config.hidden_size, eps=config.rms_norm_eps)
self.rotary_emb = Moondream3RotaryEmbedding(config=config)
self.gradient_checkpointing = False
self.post_init()
def forward(
self,
input_ids: Optional[torch.LongTensor] = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[Cache] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
output_router_logits: Optional[bool] = None,
return_dict: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None,
) -> Union[Tuple, BaseModelOutputWithPast]:
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_router_logits = (
output_router_logits if output_router_logits is not None else self.config.output_router_logits
)
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
use_cache = use_cache if use_cache is not None else self.config.use_cache
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
if (input_ids is None) ^ (inputs_embeds is not None):
raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time, and must specify either one")
if inputs_embeds is None:
inputs_embeds = self.embed_tokens(input_ids)
hidden_states = inputs_embeds
batch_size = hidden_states.shape[0]
if self.gradient_checkpointing and self.training:
if use_cache:
logger.warning(
"`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..."
)
use_cache = False
if use_cache and past_key_values is None:
past_key_values = DynamicCache()
if cache_position is None:
past_seen_tokens = past_key_values.get_seq_length() if past_key_values is not None else 0
cache_position = torch.arange(
past_seen_tokens, past_seen_tokens + inputs_embeds.shape[1], device=inputs_embeds.device
)
if position_ids is None:
position_ids = cache_position.unsqueeze(0)
position_embeddings = self.rotary_emb(hidden_states, position_ids=position_ids)
all_hidden_states = () if output_hidden_states else None
all_self_attns = () if output_attentions else None
all_router_logits = () if output_router_logits else None
for decoder_layer in self.layers:
if output_hidden_states:
all_hidden_states += (hidden_states,)
if self.gradient_checkpointing and self.training:
layer_outputs = self._gradient_checkpointing_func(
decoder_layer.__call__,
hidden_states,
attention_mask,
position_ids,
past_key_values,
output_attentions,
output_router_logits,
use_cache,
cache_position,
position_embeddings
)
else:
layer_outputs = decoder_layer(
hidden_states,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
output_attentions=output_attentions,
output_router_logits=output_router_logits,
use_cache=use_cache,
cache_position=cache_position,
position_embeddings=position_embeddings
)
hidden_states = layer_outputs[0]
if output_attentions:
all_self_attns += (layer_outputs[1],)
if output_router_logits and layer_outputs[-1] is not None:
all_router_logits += (layer_outputs[-1],)
if output_hidden_states:
all_hidden_states += (hidden_states,)
next_cache = None
if use_cache:
next_cache = past_key_values
if not return_dict:
return tuple(
v
for v in [hidden_states, next_cache, all_hidden_states, all_self_attns, all_router_logits]
if v is not None
)
return BaseModelOutputWithPast(
last_hidden_state=hidden_states,
past_key_values=next_cache,
hidden_states=all_hidden_states,
attentions=all_self_attns,
)
class Moondream3VisionPatchEmbeddings(nn.Module):
def __init__(self, config: Moondream3VisionConfig):
super().__init__()
self.patch_size = config.patch_size
self.num_channels = config.in_channels
self.hidden_size = config.hidden_size
self.crop_size = config.crop_size
self.patch_size = config.patch_size
self.grid_size = self.crop_size // self.patch_size
self.num_patches = self.grid_size * self.grid_size
self.projection = nn.Linear(self.patch_size * self.patch_size * self.num_channels, self.hidden_size, bias=True)
self.position_embeddings = nn.Parameter(torch.zeros(1, self.num_patches, config.hidden_size))
def forward(self, pixel_values: torch.Tensor) -> torch.Tensor:
B, C, H, W = pixel_values.shape
P1 = P2 = self.patch_size
x = pixel_values.reshape(B, C, H // P1, P1, W // P2, P2)
x = x.permute(0, 2, 4, 1, 3, 5)
x = x.reshape(B, (H // P1) * (W // P2), C * P1 * P2)
x = self.projection(x)
return x + self.position_embeddings
class Moondream3VisionEncoderLayer(nn.Module):
def __init__(self, config: Moondream3VisionConfig, layer_idx: int):
super().__init__()
self.hidden_size = config.hidden_size
self.intermediate_size = config.intermediate_size
self.layer_idx = layer_idx
self.self_attn = Moondream3Attention(config, layer_idx=self.layer_idx, use_tau=False)
self.input_layernorm = nn.LayerNorm(config.hidden_size, eps=1e-5)
self.post_attention_layernorm = nn.LayerNorm(config.hidden_size, eps=1e-5)
self.mlp = Moondream3MLP(hidden_size=self.hidden_size, intermediate_size=self.intermediate_size)
def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
residual = hidden_states
hidden_states = self.input_layernorm(hidden_states)
hidden_states, _ = self.self_attn(hidden_states=hidden_states)
hidden_states = residual + hidden_states
residual = hidden_states
hidden_states = self.post_attention_layernorm(hidden_states)
hidden_states = self.mlp(hidden_states)
hidden_states = residual + hidden_states
return hidden_states
class Moondream3VisionModel(Moondream3PreTrainedModel):
config_class = Moondream3VisionConfig
main_input_name = "pixel_values"
_no_split_modules = ["Moondream3VisionEncoderLayer"]
def __init__(self, config: Moondream3VisionConfig):
super().__init__(config)
self.config = config
self.hidden_size = self.config.hidden_size
self.num_hidden_layers = self.config.num_hidden_layers
self.proj_inner_dim = self.config.proj_inner_dim
self.proj_out_dim = self.config.proj_out_dim
self.embeddings = Moondream3VisionPatchEmbeddings(config)
self.layers = nn.ModuleList([Moondream3VisionEncoderLayer(config,layer_idx) for layer_idx in range(self.num_hidden_layers)])
self.post_layernorm = nn.LayerNorm(self.hidden_size, eps=1e-5)
self.vision_projection = Moondream3MLP(hidden_size=self.hidden_size * 2, intermediate_size=self.proj_inner_dim, out_size=self.proj_out_dim)
self.gradient_checkpointing = False
self.post_init()
def _reconstruct_from_crops(
self,
crops: torch.Tensor,
tiling: tuple[int, int],
overlap_margin: int = 4,
patch_size: int = 14,
) -> torch.Tensor:
"""
Reconstruct the original image from overlapping crops into a single seamless image.
Takes a list of overlapping image crops along with their positional metadata and
reconstructs them into a single coherent image by carefully stitching together
non-overlapping regions. Handles both numpy arrays and PyTorch tensors.
Args:
crops: List of image crops as numpy arrays or PyTorch tensors with shape
(H,W,C)
tiling: Tuple of (height,width) indicating crop grid layout
patch_size: Size in pixels of each patch, default 14
overlap_margin: Number of overlapping patches on each edge, default 4
Returns:
Reconstructed image as numpy array or PyTorch tensor matching input type,
with shape (H,W,C) where H,W are the original image dimensions
"""
if isinstance(tiling, torch.Tensor):
tiling_h, tiling_w = tiling[0].item(), tiling[1].item()
else:
tiling_h, tiling_w = tiling
tiling_h, tiling_w = int(tiling_h), int(tiling_w)
crop_height, crop_width = crops[0].shape[:2]
margin_pixels = overlap_margin * patch_size
# Calculate output size (only adding margins once)
output_h = (crop_height - 2 * margin_pixels) * tiling_h + 2 * margin_pixels
output_w = (crop_width - 2 * margin_pixels) * tiling_w + 2 * margin_pixels
reconstructed = torch.zeros(
(output_h, output_w, crops[0].shape[2]),
device=crops[0].device,
dtype=crops[0].dtype,
)
for i, crop in enumerate(crops):
tile_y = i // tiling_w
tile_x = i % tiling_w
# For each tile, determine which part to keep
# Keep left margin only for first column
x_start = 0 if tile_x == 0 else margin_pixels
# Keep right margin only for last column
x_end = crop_width if tile_x == tiling_w - 1 else crop_width - margin_pixels
# Keep top margin only for first row
y_start = 0 if tile_y == 0 else margin_pixels
# Keep bottom margin only for last row
y_end = crop_height if tile_y == tiling_h - 1 else crop_height - margin_pixels
# Calculate where this piece belongs in the output
out_x = tile_x * (crop_width - 2 * margin_pixels)
out_y = tile_y * (crop_height - 2 * margin_pixels)
# Place the piece
reconstructed[
out_y + y_start : out_y + y_end, out_x + x_start : out_x + x_end
] = crop[y_start:y_end, x_start:x_end]
return reconstructed
def forward(
self,
pixel_values: torch.FloatTensor,
tiling: Tuple[int,int],
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple, BaseModelOutputWithPast]:
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
batch_size, num_crops = pixel_values.shape[:2]
# flatten batch_size and num_crops into same dim
pixel_values = pixel_values.view(-1, *pixel_values.shape[2:])
hidden_states: torch.Tensor = self.embeddings(pixel_values)
all_hidden_states = () if output_hidden_states else None
all_attentions = () if output_attentions else None
for encoder_layer in self.layers:
if output_hidden_states and all_hidden_states is not None:
all_hidden_states += (hidden_states,)
if self.gradient_checkpointing and self.training:
layer_outputs = self._gradient_checkpointing_func(encoder_layer.__call__, hidden_states)
else:
layer_outputs = encoder_layer(hidden_states)
hidden_states = layer_outputs
hidden_states = self.post_layernorm(hidden_states)
# B, _, _
# back out into batch_size, num_crops
hidden_states = hidden_states.view(batch_size, num_crops, *hidden_states.shape[1:])
outputs = []
for b in range(batch_size):
hs = hidden_states[b]
t = tiling[b]
global_features = hs[0]
local_features = hs[1:].view(
-1,
self.num_hidden_layers,
self.num_hidden_layers,
self.hidden_size,
)
reconstructed = self._reconstruct_from_crops(
local_features,
t,
patch_size=1,
overlap_margin=self.config.overlap_margin,
)
reconstructed = reconstructed.permute(2, 0, 1)
reconstructed = F.adaptive_avg_pool2d(
reconstructed, output_size=(self.num_hidden_layers, self.num_hidden_layers)
)
reconstructed = reconstructed.permute(1, 2, 0).view(729, self.hidden_size)
final_features = torch.cat([global_features, reconstructed], dim=-1)
outputs.append(final_features)
output = torch.stack(outputs, 0)
hidden_states = self.vision_projection(output)
if output_hidden_states and all_hidden_states is not None:
all_hidden_states += (hidden_states,)
if not return_dict:
return tuple(v for v in [hidden_states, all_hidden_states, all_attentions] if v is not None)
return BaseModelOutputWithPast(
last_hidden_state=hidden_states,
hidden_states=all_hidden_states,
attentions=all_attentions,
)
class Moondream3RegionEncoder(nn.Module):
def __init__(self, config: Moondream3RegionConfig):
super().__init__()
self.coord_encoder = nn.Linear(config.coord_feat_dim, config.hidden_size)
self.size_encoder = nn.Linear(config.size_feat_dim, config.hidden_size)
coord_freq = torch.randn(config.coord_feat_dim // 2, 1) * 10.0
size_freq = torch.randn(config.size_feat_dim // 2, 2) * 10.0
self.register_buffer("coord_freq", coord_freq.T)
self.register_buffer("size_freq", size_freq.T)
def fourier_features(self, x: torch.Tensor, w: torch.Tensor) -> torch.Tensor:
x_proj = 2 * torch.pi * x @ w
return torch.cat([x_proj.cos(), x_proj.sin()], dim=-1)
def encode_coordinate(self, coord: torch.Tensor) -> torch.Tensor:
fourier_features = self.fourier_features(coord, self.coord_freq)
return self.coord_encoder(fourier_features)
def encode_size(self, size: torch.Tensor) -> torch.Tensor:
fourier_features = self.fourier_features(size, self.size_freq)
return self.size_encoder(fourier_features)
class Moondream3RegionDecoder(nn.Module):
def __init__(self, config: Moondream3RegionConfig):
super().__init__()
self.coord_decoder = nn.Linear(config.hidden_size, config.coord_out_dim)
self.size_decoder = nn.Linear(config.hidden_size, config.size_out_dim)
def decode_coordinate(self, hidden_state: torch.Tensor) -> torch.Tensor:
return self.coord_decoder(hidden_state)
def decode_size(self, hidden_state: torch.Tensor) -> torch.Tensor:
return self.size_decoder(hidden_state).view(hidden_state.shape[0],2,-1)
class Moondream3Model(Moondream3PreTrainedModel):
def __init__(self, config: Moondream3Config):
super().__init__(config)
self.text_model = Moondream3TextModel(config.text_config)
self.vision_model = Moondream3VisionModel(config.vision_config)
self.vocab_size = config.text_config.vocab_size
self.region_encoder = Moondream3RegionEncoder(config.region_config)
self.region_decoder = Moondream3RegionDecoder(config.region_config)
self.post_init()
def get_input_embeddings(self):
return self.text_model.embed_tokens
def set_input_embeddings(self, value):
self.text_model.embed_tokens = value
def set_decoder(self, decoder):
self.text_model = decoder
def get_decoder(self):
return self.text_model
def forward(
self,
input_ids: torch.LongTensor = None,
pixel_values: torch.FloatTensor = None,
tiling: Tuple[int,int] = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[Cache] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
labels: Optional[torch.LongTensor] = None,
use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None,
logits_to_keep: int = 0,
) -> Union[Tuple, BaseModelOutputWithPast]:
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
if (input_ids is not None) == (inputs_embeds is not None):
raise ValueError("Provide exactly one of input_ids or inputs_embeds.")
if not ((pixel_values is not None) ^ (tiling is None)):
raise ValueError("You must specify both pixel_values and tiling")
# Case A: inputs_embeds provided -> assume it already contains BOS+image+text in correct order.
if inputs_embeds is not None and (pixel_values is not None or tiling is not None):
raise ValueError(
"When inputs_embeds is provided, do not pass pixel_values/tiling; "
"inputs_embeds must already include BOS+image(+text)."
)
if inputs_embeds is None:
inputs_embeds: torch.Tensor = self.text_model.embed_tokens(input_ids)
if use_cache and past_key_values is None:
past_key_values = DynamicCache(config=self.config)
if cache_position is None:
past_seen_tokens = past_key_values.get_seq_length() if past_key_values is not None else 0
cache_position: torch.Tensor = torch.arange(
past_seen_tokens, past_seen_tokens, device=inputs_embeds.device
)
if position_ids is None:
position_ids = cache_position.unsqueeze(0)
def image_mask_function(batch_idx: int, head_idx: int, q_idx: int, kv_idx: int):
# set all up to `self.config.vision_config.prefix_len` to true
return kv_idx <= q_idx
if pixel_values is not None:
# Vision embeds
pixel_values = pixel_values.to(dtype=self.vision_model.embeddings.projection.weight.dtype)
image_embeds = self.vision_model(pixel_values, tiling=tiling)["last_hidden_state"] # [B,P,D]
prefix = self.text_model.embed_tokens(
torch.full((input_ids.shape[0], 1), self.config.text_config.bos_token_id, dtype=input_ids.dtype, device=input_ids.device)
)
embeds = torch.cat([prefix, image_embeds], dim=1)
cache_pos = torch.arange(embeds.shape[-2], device=embeds.device)
pos = cache_pos.unsqueeze(0).expand(embeds.shape[0],-1)
attn_mask = torch.full(
(embeds.shape[0], 1, embeds.shape[-2], pos.shape[-1]),
True,
dtype=torch.bool,
device=embeds.device,
)
outputs = self.text_model(
input_ids=None,
attention_mask=attn_mask,
position_ids=pos,
past_key_values=past_key_values,
inputs_embeds=embeds,
use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=True,
cache_position=cache_pos,
)
attn_mask = create_causal_mask(
config=self.config,
input_embeds=inputs_embeds,
attention_mask=torch.cat([torch.ones(attention_mask.shape[0], cache_position[-1] + 1 - attention_mask.shape[-1], device=attention_mask.device, dtype=attention_mask.dtype), attention_mask], dim=-1),
cache_position=cache_position,
past_key_values=past_key_values,
position_ids=position_ids,
and_mask_function=image_mask_function
)
outputs = self.text_model(
input_ids=None,
attention_mask=attn_mask,
position_ids=position_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=True,
cache_position=cache_position,
)
if not return_dict:
return tuple(v for v in [
outputs.last_hidden_state,
getattr(outputs, "past_key_values", None),
getattr(outputs, "hidden_states", None),
getattr(outputs, "attentions", None),
] if v is not None)
return BaseModelOutputWithPast(
last_hidden_state=outputs.last_hidden_state,
past_key_values=getattr(outputs, "past_key_values", None),
hidden_states=getattr(outputs, "hidden_states", None),
attentions=getattr(outputs, "attentions", None),
)
@dataclass
class Moondream3GenerateOutput(GenerateDecoderOnlyOutput):
objects: Optional[list[dict[str,float]]] = None
class Moondream3ForConditionalGeneration(Moondream3PreTrainedModel, GenerationMixin):
_tied_weights_keys = ["lm_head.weight"]
def __init__(self, config: Moondream3Config):
super().__init__(config)
self.objects = None
self.model = Moondream3Model(config)
self.vocab_size = config.text_config.vocab_size
self.lm_head = nn.Linear(config.text_config.hidden_size, config.text_config.vocab_size, bias=True)
self.post_init()
def get_input_embeddings(self):
return self.model.text_model.embed_tokens
def set_input_embeddings(self, value):
self.model.text_model.embed_tokens = value
def get_output_embeddings(self):
return self.lm_head
def set_output_embeddings(self, new_embeddings):
self.lm_head = new_embeddings
def set_decoder(self, decoder):
self.model.text_model = decoder
def get_decoder(self):
return self.model.text_model
def _prepare_generated_length(
self,
generation_config,
**kwargs,
):
generation_config = super()._prepare_generated_length(generation_config, **kwargs)
generation_config.max_length += 730
return generation_config
def forward(
self,
input_ids: torch.LongTensor = None,
pixel_values: torch.FloatTensor = None,
tiling: torch.LongTensor = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[Cache] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
labels: Optional[torch.LongTensor] = None,
use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None,
logits_to_keep: int = 0,
**kwargs: Unpack[TransformersKwargs],
) -> Union[Tuple, CausalLMOutputWithPast]:
if pixel_values is not None and inputs_embeds is None:
position_ids += self.config.vision_config.prefix_len
cache_position += self.config.vision_config.prefix_len
# Get hidden states from the base model (it already builds the multimodal prefix)
model_outputs = self.model(
input_ids=input_ids,
pixel_values=pixel_values,
tiling=tiling,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
labels=None,
use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=True,
cache_position=cache_position,
logits_to_keep=logits_to_keep,
)
hidden_states = model_outputs.last_hidden_state # [B, T, D]
# Compute logits; only keep the tail if requested
if isinstance(logits_to_keep, int) and logits_to_keep > 0:
hs = hidden_states[:, -logits_to_keep:, :]
elif isinstance(logits_to_keep, slice):
hs = hidden_states[:, logits_to_keep, :]
else:
hs = hidden_states
hs = self.model.text_model.norm(hs)
logits = self.lm_head(hs) # [B, T', V]
pred = torch.argmax(logits, dim=-1)
pos_ids = position_ids[:,-1:] + 1
cache_pos = cache_position[-1:] + 1
mask = torch.ones(
hidden_states.shape[0], 1, device=self.device, dtype=torch.long
)
while torch.any(pred == 5):
batch_mask = (pred[:, -1] == 5)
hidden_states = hidden_states[:, -1:, :]
x_logits = self.model.region_decoder.decode_coordinate(hidden_states)
x_center = torch.argmax(x_logits, dim=-1) / x_logits.size(-1)
next_embeds = self.model.region_encoder.encode_coordinate(x_center.to(x_logits.dtype)).unsqueeze(1)
model_outputs = self.model(
input_ids=None,
pixel_values=None,
tiling=None,
attention_mask=mask,
position_ids=pos_ids,
past_key_values=past_key_values,
inputs_embeds=next_embeds,
labels=None,
use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=True,
cache_position=cache_pos,
logits_to_keep=logits_to_keep,
)
hidden_states = model_outputs.last_hidden_state # [B, T, D]
y_logits = self.model.region_decoder.decode_coordinate(hidden_states)
y_center = torch.argmax(y_logits, dim=-1) / y_logits.size(-1)
next_embeds = self.model.region_encoder.encode_coordinate(y_center.to(y_logits.dtype)).unsqueeze(1)
coords = torch.cat([x_center, y_center], dim=1)
coords = coords * (batch_mask).unsqueeze(1)
pos_ids += 1
cache_pos = cache_pos + 1
bbox = None
if input_ids[0,1] == 7235:
model_outputs = self.model(
input_ids=None,
pixel_values=None,
tiling=None,
attention_mask=mask,
position_ids=pos_ids,
past_key_values=past_key_values,
inputs_embeds=next_embeds,
labels=None,
use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=True,
cache_position=cache_pos,
logits_to_keep=logits_to_keep,
)
hidden_states = model_outputs.last_hidden_state # [B, T, D]
size_logits = self.model.region_decoder.decode_size(hidden_states)
bins = torch.argmax(size_logits, dim=-1)
w_bin = bins[:,0]
h_bin = bins[:,1]
w = torch.pow(2.0, (w_bin.float() / 1023.0) * 10.0 - 10.0)
h = torch.pow(2.0, (h_bin.float() / 1023.0) * 10.0 - 10.0)
next_embeds = (
self.model.region_encoder.encode_size(
torch.stack([w, h],dim=-1).to(size_logits.dtype)
)
).unsqueeze(1)
x_center = x_center.squeeze(1)
y_center = y_center.squeeze(1)
bbox = [
x_center - w / 2,
y_center - h / 2,
x_center + w / 2,
y_center + h / 2,
]
bbox = torch.stack(bbox, dim=1)
bbox = bbox * (batch_mask).unsqueeze(1)
pos_ids += 1
cache_pos = cache_pos + 1
new = coords.unsqueeze(1) if bbox is None else bbox.unsqueeze(1)
if self.objects is None:
self.objects = new
else:
self.objects = torch.cat([self.objects, new], dim=1)
model_outputs = self.model(
input_ids=None,
pixel_values=None,
tiling=None,
attention_mask=mask,
position_ids=pos_ids,
past_key_values=past_key_values,
inputs_embeds=next_embeds,
labels=None,
use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=True,
cache_position=cache_pos,
logits_to_keep=logits_to_keep,
)
pos_ids += 1
cache_pos = cache_pos + 1
hidden_states = model_outputs.last_hidden_state # [B, T, D]
indices = torch.tensor(
[self.config.text_config.coord_token_id, self.config.text_config.eos_token_id],
device=self.device,
)
hidden_states = self.model.text_model.norm(hidden_states)
logits = hidden_states @ self.lm_head.weight[indices].T + self.lm_head.bias[indices]
logits_full = torch.full((logits.shape[0], logits.shape[1], self.config.text_config.vocab_size), float('-inf'), device=logits.device, dtype=logits.dtype)
logits_full[:, :, torch.tensor([5,0])] = logits
logits = logits_full
pred[batch_mask] = torch.argmax(logits, dim=-1)[batch_mask]
loss = None
if labels is not None:
# Shift if your training uses standard LM convention; here we assume labels aligned with hs
loss = self.loss_function(logits=logits, labels=labels, vocab_size=self.vocab_size)
return CausalLMOutputWithPast(
loss=loss,
logits=logits,
past_key_values=getattr(model_outputs, "past_key_values", None),
hidden_states=getattr(model_outputs, "hidden_states", None),
attentions=getattr(model_outputs, "attentions", None),
)
def generate(self, **kwargs) -> Union[Moondream3GenerateOutput, torch.LongTensor]:
outputs = super().generate(**kwargs)
if len(self.objects) > 0:
if isinstance(outputs, torch.Tensor):
outputs = self.objects
self.objects = []
else:
outputs = Moondream3GenerateOutput(
**outputs,
objects=self.objects
)
self.objects = []
return outputs
def prepare_inputs_for_generation(
self,
input_ids,
**model_kwargs
):
model_inputs = super().prepare_inputs_for_generation(input_ids, **model_kwargs)
model_inputs["position_ids"] += model_inputs["cache_position"].unsqueeze(0) - model_inputs["position_ids"]
return model_inputs
def _update_model_kwargs_for_generation(
self,
outputs,
model_kwargs,
is_encoder_decoder,
num_new_tokens: int = 1,
):
model_kwargs = super()._update_model_kwargs_for_generation(
outputs,
model_kwargs,
is_encoder_decoder=is_encoder_decoder,
num_new_tokens=num_new_tokens,
)
model_kwargs["pixel_values"] = None
model_kwargs["tiling"] = None
return model_kwargs
@staticmethod
def _reorder_cache(past_key_values, beam_idx):
reordered_past = ()
for layer_past in past_key_values:
reordered_past += (
tuple(past_state.index_select(0, beam_idx.to(past_state.device)) for past_state in layer_past),
)
return reordered_past
__all__ = [
"Moondream3Config",
"Moondream3TextConfig",
"Moondream3VisionConfig",
"Moondream3RegionConfig",
"Moondream3PreTrainedModel",
"Moondream3Model",
"Moondream3TextModel",
"Moondream3VisionModel",
"Moondream3ForConditionalGeneration",
]