# dhara-70m / modeling_dhara.py
# codelion's picture
# Upload folder using huggingface_hub
# 575face verified
#!/usr/bin/env python3
"""
Dhara: Diffusion LLM with Canon Layers
Combines:
1. Dhara's masked diffusion training (bidirectional attention, high throughput)
2. Canon layers (local context mixing via causal depthwise convolutions)
Canon layers from "Physics of Language Models: Part 4.1" by Zeyuan Allen-Zhu:
- Position A: After input LayerNorm, before attention
- Position C: After post-attention LayerNorm, before MLP
- kernel_size=4, residual=True, activation=False (default)
Expected benefits:
- ~280-290 tok/s throughput (Dhara's parallel generation)
- +0.25-0.5% accuracy improvement (Canon's local context mixing)
"""
import math
import warnings
from typing import Optional, Tuple, Union, List
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import CrossEntropyLoss
from transformers import PreTrainedModel
from transformers.generation import GenerationMixin
from transformers.modeling_outputs import BaseModelOutputWithPast, MaskedLMOutput
from transformers.utils import logging
from transformers.cache_utils import Cache, DynamicCache
from transformers import PretrainedConfig
logger = logging.get_logger(__name__)
# Optional performance imports
try:
from flash_attn import flash_attn_func
FLASH_ATTN_AVAILABLE = True
except ImportError:
FLASH_ATTN_AVAILABLE = False
try:
import xformers.ops as xops
XFORMERS_AVAILABLE = True
except ImportError:
XFORMERS_AVAILABLE = False
class DharaConfig(PretrainedConfig):
    """
    Hyper-parameter container for the Dhara model.

    Holds the transformer dimensions, the Canon-layer settings, the
    masked-diffusion settings, the special token ids and the performance
    switches. Two values are derived when left unset:

    * ``head_dim`` falls back to ``hidden_size // num_attention_heads``.
    * ``mask_token_id`` falls back to ``vocab_size - 1``.
    """

    model_type = "dhara"

    def __init__(
        self,
        # Core architecture
        vocab_size: int = 50304,
        hidden_size: int = 384,
        num_hidden_layers: int = 32,
        num_attention_heads: int = 8,
        num_key_value_heads: int = 4,
        intermediate_size: int = 1024,
        head_dim: int = None,
        max_position_embeddings: int = 2048,
        # Model specifics
        hidden_act: str = "silu",
        rms_norm_eps: float = 1e-6,
        rope_theta: float = 10000.0,
        initializer_range: float = 0.02,
        tie_word_embeddings: bool = True,
        attention_dropout: float = 0.0,
        # Canon layer parameters
        canon_set: str = "AC",  # Positions: A (before attn), C (before MLP)
        canon_kernel: int = 4,  # Kernel size (2-4)
        canon_residual: bool = True,  # Highly recommended
        canon_activation: bool = False,  # NOT recommended for transformers
        canon_bias: bool = False,
        # Diffusion specific
        mask_token_id: int = None,  # Will be set from tokenizer
        mask_epsilon: float = 0.001,  # Minimum mask probability
        num_diffusion_steps: int = 1000,
        # Special tokens
        bos_token_id: int = 1,
        eos_token_id: int = 2,
        pad_token_id: int = 0,
        # Performance flags
        use_cache: bool = False,
        use_flash_attention: bool = True,
        use_xformers: bool = False,
        **kwargs
    ):
        # Resolve the two derived defaults before anything is stored.
        if not head_dim:
            head_dim = hidden_size // num_attention_heads
        if mask_token_id is None:
            mask_token_id = vocab_size - 1

        # The base class handles the special tokens, weight tying and any
        # extra keyword arguments.
        super().__init__(
            bos_token_id=bos_token_id,
            eos_token_id=eos_token_id,
            pad_token_id=pad_token_id,
            tie_word_embeddings=tie_word_embeddings,
            **kwargs
        )

        # --- transformer dimensions ---
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.num_key_value_heads = num_key_value_heads
        self.intermediate_size = intermediate_size
        self.head_dim = head_dim
        self.max_position_embeddings = max_position_embeddings

        # --- numerics / initialisation ---
        self.hidden_act = hidden_act
        self.rms_norm_eps = rms_norm_eps
        self.rope_theta = rope_theta
        self.initializer_range = initializer_range
        self.tie_word_embeddings = tie_word_embeddings
        self.attention_dropout = attention_dropout

        # --- Canon layers ---
        self.canon_set = canon_set
        self.canon_kernel = canon_kernel
        self.canon_residual = canon_residual
        self.canon_activation = canon_activation
        self.canon_bias = canon_bias

        # --- masked diffusion ---
        self.mask_token_id = mask_token_id
        self.mask_epsilon = mask_epsilon
        self.num_diffusion_steps = num_diffusion_steps

        # --- special tokens (re-assigned after the base-class call) ---
        self.bos_token_id = bos_token_id
        self.eos_token_id = eos_token_id
        self.pad_token_id = pad_token_id

        # --- performance switches ---
        self.use_cache = use_cache
        self.use_flash_attention = use_flash_attention
        self.use_xformers = use_xformers
class CanonLayer(nn.Module):
    """
    Canon layer: causal depthwise 1-D convolution over the sequence axis.

    From "Physics of Language Models: Part 4.1" by Zeyuan Allen-Zhu.
    Every channel is convolved independently (``groups == hidden_size``) with
    a kernel covering the current position and the ``kernel_size - 1``
    positions before it, so the cost grows linearly with sequence length.
    """

    def __init__(
        self,
        hidden_size: int,
        kernel_size: int = 4,
        use_residual: bool = True,
        use_activation: bool = False,
        use_bias: bool = False,
    ):
        super().__init__()
        self.hidden_size = hidden_size
        self.kernel_size = kernel_size
        self.use_residual = use_residual
        self.use_activation = use_activation

        # Conv1d pads both ends by kernel_size - 1; forward() drops the
        # trailing outputs so only left context contributes.
        self.conv = nn.Conv1d(
            hidden_size,
            hidden_size,
            kernel_size,
            padding=kernel_size - 1,
            groups=hidden_size,
            bias=use_bias,
        )

        # Small-variance init for stability.
        nn.init.normal_(self.conv.weight, mean=0.0, std=0.02)
        if use_bias:
            nn.init.zeros_(self.conv.bias)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """
        Mix each position with its preceding neighbours.

        Args:
            hidden_states: [batch_size, seq_len, hidden_size]
        Returns:
            Tensor of the same shape as ``hidden_states``.
        """
        _, seq_len, _ = hidden_states.shape

        # Conv1d wants channels-first: [B, L, H] -> [B, H, L].
        channels_first = hidden_states.transpose(1, 2)

        # Keep the first seq_len outputs; the rest came from right padding.
        mixed = self.conv(channels_first)[:, :, :seq_len]

        if self.use_activation:
            mixed = F.silu(mixed)

        # Back to [B, L, H].
        mixed = mixed.transpose(1, 2)

        if not self.use_residual:
            return mixed
        return hidden_states + mixed
class RMSNorm(nn.Module):
    """Root-mean-square layer normalisation with a learned per-channel scale."""

    def __init__(self, hidden_size, eps=1e-6):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.variance_epsilon = eps

    def forward(self, hidden_states):
        """Normalise the last dimension in float32, then rescale by ``weight``."""
        source_dtype = hidden_states.dtype
        upcast = hidden_states.to(torch.float32)
        mean_square = upcast.pow(2).mean(-1, keepdim=True)
        inv_rms = torch.rsqrt(mean_square + self.variance_epsilon)
        normalised = (upcast * inv_rms).to(source_dtype)
        return self.weight * normalised
class RotaryEmbedding(nn.Module):
    """
    Rotary Position Embeddings (RoPE).

    Precomputes cos/sin tables of shape ``[max_position_embeddings, dim]`` and
    grows them on demand when a longer sequence is requested.
    """

    def __init__(self, dim, max_position_embeddings=2048, base=10000, device=None):
        super().__init__()
        self.dim = dim
        self.max_position_embeddings = max_position_embeddings
        self.base = base
        inv_freq = 1.0 / (self.base ** (torch.arange(0, self.dim, 2).float().to(device) / self.dim))
        self.register_buffer("inv_freq", inv_freq, persistent=False)
        self._set_cos_sin_cache(
            seq_len=max_position_embeddings, device=self.inv_freq.device, dtype=torch.get_default_dtype()
        )

    def _set_cos_sin_cache(self, seq_len, device, dtype):
        """(Re)build the cos/sin tables for positions ``0 .. seq_len - 1``."""
        self.max_seq_len_cached = seq_len
        # Keep the frequencies on the target device so torch.outer never
        # mixes devices when the cache is regrown from forward().
        inv_freq = self.inv_freq.to(device)
        t = torch.arange(self.max_seq_len_cached, device=device, dtype=inv_freq.dtype)
        freqs = torch.outer(t, inv_freq)
        # Duplicate so the table spans the full head dimension.
        emb = torch.cat((freqs, freqs), dim=-1)
        self.register_buffer("cos_cached", emb.cos().to(dtype), persistent=False)
        self.register_buffer("sin_cached", emb.sin().to(dtype), persistent=False)

    def forward(self, x, seq_len=None):
        """
        Return the ``(cos, sin)`` tables for the first ``seq_len`` positions.

        Args:
            x: Tensor whose dtype (and device, when the cache must grow) is
                used for the result. When ``seq_len`` is omitted its
                second-to-last dimension is taken as the sequence length.
            seq_len: Number of positions required. Optional.
        Returns:
            Tuple ``(cos, sin)``, each ``[seq_len, dim]`` in ``x.dtype``.
        """
        if seq_len is None:
            # The signature advertises None as valid; previously this
            # crashed on the `None > int` comparison below.
            seq_len = x.shape[-2]
        if seq_len > self.max_seq_len_cached:
            self._set_cos_sin_cache(seq_len=seq_len, device=x.device, dtype=x.dtype)
        return (
            self.cos_cached[:seq_len].to(dtype=x.dtype),
            self.sin_cached[:seq_len].to(dtype=x.dtype),
        )
def rotate_half(x):
    """Swap the two halves of the last dimension, negating the second half."""
    split_at = x.shape[-1] // 2
    front, back = x[..., :split_at], x[..., split_at:]
    return torch.cat((-back, front), dim=-1)
def apply_rotary_pos_emb(q, k, cos, sin, position_ids, unsqueeze_dim=1):
    """
    Rotate query and key tensors by their rotary position embeddings.

    ``cos`` and ``sin`` are indexed with ``position_ids``, given an extra axis
    at ``unsqueeze_dim`` so they broadcast against ``q`` and ``k``, and cast
    to ``q``'s dtype before use. Returns ``(q_embed, k_embed)``.
    """
    # Gather the rows for the requested positions and match q's dtype.
    cos_sel = cos[position_ids].unsqueeze(unsqueeze_dim).to(q.dtype)
    sin_sel = sin[position_ids].unsqueeze(unsqueeze_dim).to(q.dtype)

    def _rotate(t):
        return (t * cos_sel) + (rotate_half(t) * sin_sel)

    return _rotate(q), _rotate(k)
class DharaMLP(nn.Module):
    """Gated feed-forward block: ``down(SiLU(gate(x)) * up(x))``, all bias-free."""

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.hidden_size = config.hidden_size
        self.intermediate_size = config.intermediate_size
        d_model, d_ff = self.hidden_size, self.intermediate_size
        self.gate_proj = nn.Linear(d_model, d_ff, bias=False)
        self.up_proj = nn.Linear(d_model, d_ff, bias=False)
        self.down_proj = nn.Linear(d_ff, d_model, bias=False)
        self.act_fn = nn.SiLU()

    def forward(self, x):
        """Apply the gated projection to ``x`` (last dim == ``hidden_size``)."""
        gate = self.act_fn(self.gate_proj(x))
        gated = gate * self.up_proj(x)
        return self.down_proj(gated)
def repeat_kv(hidden_states: torch.Tensor, n_rep: int) -> torch.Tensor:
    """
    Duplicate each key/value head ``n_rep`` times for grouped-query attention.

    Maps ``[batch, kv_heads, seq, head_dim]`` to
    ``[batch, kv_heads * n_rep, seq, head_dim]`` with the copies of one head
    adjacent to each other. The input is returned untouched when ``n_rep == 1``.
    """
    bsz, kv_heads, seq_len, dim = hidden_states.shape
    if n_rep == 1:
        return hidden_states
    # Insert a repeat axis after the head axis, then fold it into the heads.
    tiled = hidden_states.unsqueeze(2).expand(bsz, kv_heads, n_rep, seq_len, dim)
    return tiled.reshape(bsz, kv_heads * n_rep, seq_len, dim)
class DharaAttention(nn.Module):
    """
    Multi-head bidirectional attention with GQA support (for diffusion).

    Two compute paths are available:

    * FlashAttention, used only when the package imported successfully, the
      config enables it, the tensors live on a CUDA device, no attention mask
      has to be applied and attention weights were not requested.
    * A plain matmul/softmax path used in every other case. It honours the
      attention mask and can return the attention weights.
    """

    def __init__(self, config: DharaConfig, layer_idx: Optional[int] = None):
        super().__init__()
        self.config = config
        self.layer_idx = layer_idx
        self.attention_dropout = config.attention_dropout
        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = config.head_dim
        self.num_key_value_heads = config.num_key_value_heads
        self.num_key_value_groups = self.num_heads // self.num_key_value_heads
        self.max_position_embeddings = config.max_position_embeddings
        self.rope_theta = config.rope_theta
        self.is_causal = False  # CRITICAL: Dhara uses bidirectional attention

        if (self.head_dim * self.num_heads) != self.hidden_size:
            raise ValueError(
                f"hidden_size must be divisible by num_heads (got `hidden_size`: {self.hidden_size}"
                f" and `num_heads`: {self.num_heads})."
            )
        if self.num_heads % self.num_key_value_heads != 0:
            # Otherwise repeat_kv yields fewer heads than the queries have and
            # the score matmul fails with an opaque shape error.
            raise ValueError(
                f"num_attention_heads must be divisible by num_key_value_heads (got `num_heads`: "
                f"{self.num_heads} and `num_key_value_heads`: {self.num_key_value_heads})."
            )

        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)
        self.k_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=False)
        self.v_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=False)
        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)

        self.rotary_emb = RotaryEmbedding(
            self.head_dim,
            max_position_embeddings=self.max_position_embeddings,
            base=self.rope_theta,
        )

    @staticmethod
    def _as_additive_mask(attention_mask: torch.Tensor, dtype: torch.dtype, device) -> torch.Tensor:
        """
        Return a mask that can be added to the attention scores.

        A 2-D mask is treated as a keep-mask over key positions (non-zero =
        attend, zero = ignore) and turned into ``[batch, 1, 1, kv_len]`` with
        the most negative finite value of ``dtype`` at ignored positions.
        Masks of any other rank are assumed to be additive already and are
        returned unchanged.
        """
        if attention_mask.dim() != 2:
            return attention_mask
        keep = attention_mask[:, None, None, :].to(device) != 0
        additive = torch.zeros(keep.shape, dtype=dtype, device=device)
        return additive.masked_fill(~keep, torch.finfo(dtype).min)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_value=None,
        output_attentions: bool = False,
        use_cache: bool = False,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
        """
        Args:
            hidden_states: ``[batch, q_len, hidden_size]``.
            attention_mask: Optional mask. Either an additive mask that
                broadcasts against ``[batch, heads, q_len, kv_len]`` or a 2-D
                keep-mask ``[batch, kv_len]``.
            position_ids: Positions used to index the rotary tables.
            past_key_value: Optional cache object; when given, the new
                keys/values are appended via its ``update`` method.
            output_attentions: Return the attention probabilities.
            use_cache: Accepted for interface compatibility; not read here.
        Returns:
            ``(attn_output, attn_weights or None, past_key_value)``.
        Raises:
            ValueError: If a cache is supplied but ``layer_idx`` was not set.
        """
        bsz, q_len, _ = hidden_states.size()

        query_states = self.q_proj(hidden_states)
        key_states = self.k_proj(hidden_states)
        value_states = self.v_proj(hidden_states)

        # [B, L, heads * D] -> [B, heads, L, D]
        query_states = query_states.view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
        key_states = key_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
        value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)

        kv_seq_len = key_states.shape[-2]
        if past_key_value is not None:
            if self.layer_idx is None:
                raise ValueError(
                    f"The cache structure has changed since version v4.36. If you are using {self.__class__.__name__} "
                    "for auto-regressive decoding with k/v caching, please make sure to initialize the attention class "
                    "with a layer index."
                )
            kv_seq_len += past_key_value.get_usable_length(kv_seq_len, self.layer_idx)

        cos, sin = self.rotary_emb(value_states, seq_len=kv_seq_len)
        query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, position_ids)

        if past_key_value is not None:
            cache_kwargs = {"sin": sin, "cos": cos}
            key_states, value_states = past_key_value.update(key_states, value_states, self.layer_idx, cache_kwargs)

        # Expand the KV heads so every query head has a matching key/value head.
        key_states = repeat_kv(key_states, self.num_key_value_groups)
        value_states = repeat_kv(value_states, self.num_key_value_groups)

        # Bound on both paths so the return statement never sees an unset name.
        attn_weights = None

        # flash_attn_func takes no mask here and returns no weights, and the
        # kernels need CUDA tensors, so those cases use the eager path.
        use_flash = (
            FLASH_ATTN_AVAILABLE
            and self.config.use_flash_attention
            and query_states.is_cuda
            and attention_mask is None
            and not output_attentions
        )

        if use_flash:
            input_dtype = query_states.dtype

            # flash-attn expects [B, L, heads, D].
            query_states = query_states.transpose(1, 2).contiguous()
            key_states = key_states.transpose(1, 2).contiguous()
            value_states = value_states.transpose(1, 2).contiguous()

            if input_dtype not in (torch.float16, torch.bfloat16):
                query_states = query_states.to(torch.bfloat16)
                key_states = key_states.to(torch.bfloat16)
                value_states = value_states.to(torch.bfloat16)

            attn_output = flash_attn_func(
                query_states,
                key_states,
                value_states,
                # Match the eager path: dropout only while training.
                dropout_p=self.attention_dropout if self.training else 0.0,
                causal=False,  # Bidirectional for diffusion
            )
            # Cast back so o_proj sees the dtype of its own weights.
            attn_output = attn_output.reshape(bsz, q_len, self.hidden_size).to(input_dtype)
        else:
            # Standard attention
            attn_weights = torch.matmul(query_states, key_states.transpose(2, 3)) / math.sqrt(self.head_dim)

            if attention_mask is not None:
                attn_weights = attn_weights + self._as_additive_mask(
                    attention_mask, attn_weights.dtype, attn_weights.device
                )

            # Softmax in float32 for stability, then back to the working dtype.
            attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query_states.dtype)
            attn_weights = nn.functional.dropout(attn_weights, p=self.attention_dropout, training=self.training)
            attn_output = torch.matmul(attn_weights, value_states)

            attn_output = attn_output.transpose(1, 2).contiguous()
            attn_output = attn_output.reshape(bsz, q_len, self.hidden_size)

        attn_output = self.o_proj(attn_output)

        if not output_attentions:
            attn_weights = None

        return attn_output, attn_weights, past_key_value
class DharaDecoderLayer(nn.Module):
    """
    One Dhara block: pre-norm attention and pre-norm MLP, each optionally
    preceded by a Canon layer.

    Flow:
        x -> RMSNorm -> [Canon-A] -> Attention -> + residual
        x -> RMSNorm -> [Canon-C] -> MLP       -> + residual

    Canon-A is created when ``"A"`` appears in ``config.canon_set`` and
    Canon-C when ``"C"`` does; otherwise the attribute is ``None``.
    """

    def __init__(self, config: DharaConfig, layer_idx: int):
        super().__init__()
        self.hidden_size = config.hidden_size
        self.config = config

        def make_canon():
            # Both positions share the same Canon hyper-parameters.
            return CanonLayer(
                hidden_size=config.hidden_size,
                kernel_size=config.canon_kernel,
                use_residual=config.canon_residual,
                use_activation=config.canon_activation,
                use_bias=config.canon_bias,
            )

        # Attention half: norm -> (Canon-A) -> attention
        self.input_layernorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
        self.canon_a = make_canon() if "A" in config.canon_set else None
        self.self_attn = DharaAttention(config=config, layer_idx=layer_idx)

        # MLP half: norm -> (Canon-C) -> MLP
        self.post_attention_layernorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
        self.canon_c = make_canon() if "C" in config.canon_set else None
        self.mlp = DharaMLP(config)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_value=None,
        output_attentions: Optional[bool] = False,
        use_cache: Optional[bool] = False,
    ) -> Tuple[torch.FloatTensor, Optional[Tuple[torch.FloatTensor, torch.FloatTensor]]]:
        """
        Run the block.

        Returns a tuple starting with the new hidden states, followed by the
        attention weights when ``output_attentions`` is truthy and by the
        cache returned from attention when ``use_cache`` is truthy.
        """
        # ---- attention sub-block ----
        skip = hidden_states
        normed = self.input_layernorm(hidden_states)
        if self.canon_a is not None:
            normed = self.canon_a(normed)
        attn_out, attn_probs, kv_cache = self.self_attn(
            hidden_states=normed,
            attention_mask=attention_mask,
            position_ids=position_ids,
            past_key_value=past_key_value,
            output_attentions=output_attentions,
            use_cache=use_cache,
        )
        hidden_states = skip + attn_out

        # ---- MLP sub-block ----
        skip = hidden_states
        normed = self.post_attention_layernorm(hidden_states)
        if self.canon_c is not None:
            normed = self.canon_c(normed)
        hidden_states = skip + self.mlp(normed)

        result = (hidden_states,)
        if output_attentions:
            result = result + (attn_probs,)
        if use_cache:
            result = result + (kv_cache,)
        return result
class DharaPreTrainedModel(PreTrainedModel):
    """Shared base class: binds ``DharaConfig`` and defines weight initialisation."""

    config_class = DharaConfig
    base_model_prefix = "model"
    supports_gradient_checkpointing = True
    _no_split_modules = ["DharaDecoderLayer"]
    _skip_keys_device_placement = "past_key_values"
    _supports_flash_attn_2 = True
    _supports_cache_class = True

    def _init_weights(self, module):
        """
        Initialise ``nn.Linear`` and ``nn.Embedding`` weights from
        ``N(0, initializer_range)``; zero linear biases and the embedding row
        at ``padding_idx``. Other module types are left untouched.
        """
        sigma = self.config.initializer_range

        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=sigma)
            if module.bias is not None:
                module.bias.data.zero_()
            return

        if isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=sigma)
            pad_row = module.padding_idx
            if pad_row is not None:
                module.weight.data[pad_row].zero_()
class DharaModel(DharaPreTrainedModel):
    """
    Dhara base model: token embedding, a stack of ``DharaDecoderLayer`` blocks
    with bidirectional attention and Canon layers, and a final RMSNorm.
    """

    def __init__(self, config: DharaConfig):
        super().__init__(config)
        self.padding_idx = config.pad_token_id
        self.vocab_size = config.vocab_size

        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size, self.padding_idx)
        self.layers = nn.ModuleList(
            [DharaDecoderLayer(config, layer_idx) for layer_idx in range(config.num_hidden_layers)]
        )
        self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)

        self.gradient_checkpointing = False
        self.config = config
        self.mask_token_id = config.mask_token_id
        # Flash attention is used only if both requested and importable.
        self._use_flash_attention_2 = config.use_flash_attention and FLASH_ATTN_AVAILABLE

        self.post_init()

    def get_input_embeddings(self):
        return self.embed_tokens

    def set_input_embeddings(self, value):
        self.embed_tokens = value

    def _build_bidirectional_mask(self, attention_mask, dtype):
        """
        Convert the user-facing mask into what the decoder layers receive.

        * ``None`` stays ``None``.
        * With flash attention enabled the mask is passed through unchanged
          when it contains a zero and dropped otherwise.
        * A 2-D keep-mask ``[batch, kv_len]`` becomes an additive mask of
          shape ``[batch, 1, 1, kv_len]``. Leaving the query axis as size 1
          lets it broadcast against any query length, so it stays valid when
          the query is shorter than the keys (cached decoding).
        * Masks of any other rank are forwarded as given.
        """
        if attention_mask is None:
            return None
        if self._use_flash_attention_2:
            return attention_mask if 0 in attention_mask else None
        if attention_mask.dim() != 2:
            return attention_mask

        ignore = attention_mask[:, None, None, :] == 0
        additive = torch.zeros(ignore.shape, dtype=dtype, device=attention_mask.device)
        # Most negative finite value instead of -inf: a row whose keys are all
        # masked then softmaxes to finite numbers rather than NaN.
        return additive.masked_fill(ignore, torch.finfo(dtype).min)

    def forward(
        self,
        input_ids: torch.LongTensor = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values=None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, BaseModelOutputWithPast]:
        """
        Encode a batch of token ids (or embeddings).

        Exactly one of ``input_ids`` / ``inputs_embeds`` must be given.
        Unset flags fall back to the values stored on the config.

        Returns:
            ``BaseModelOutputWithPast``, or a tuple of its non-``None`` fields
            when ``return_dict`` is false.
        Raises:
            ValueError: If both or neither of ``input_ids`` and
                ``inputs_embeds`` are provided.
        """
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        use_cache = use_cache if use_cache is not None else self.config.use_cache
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        if input_ids is not None and inputs_embeds is not None:
            raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time")
        elif input_ids is not None:
            batch_size, seq_length = input_ids.shape[:2]
        elif inputs_embeds is not None:
            batch_size, seq_length = inputs_embeds.shape[:2]
        else:
            raise ValueError("You have to specify either input_ids or inputs_embeds")

        if self.gradient_checkpointing and self.training:
            if use_cache:
                logger.warning_once(
                    "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..."
                )
                use_cache = False

        past_key_values_length = 0
        if use_cache:
            # Accept both Cache objects and the legacy tuple format.
            use_legacy_cache = not isinstance(past_key_values, Cache)
            if use_legacy_cache:
                past_key_values = DynamicCache.from_legacy_cache(past_key_values)
            past_key_values_length = past_key_values.get_usable_length(seq_length)

        if position_ids is None:
            device = input_ids.device if input_ids is not None else inputs_embeds.device
            position_ids = torch.arange(
                past_key_values_length, seq_length + past_key_values_length, dtype=torch.long, device=device
            )
            position_ids = position_ids.unsqueeze(0)

        if inputs_embeds is None:
            inputs_embeds = self.embed_tokens(input_ids)

        # Bidirectional attention mask (not causal)
        attention_mask = self._build_bidirectional_mask(attention_mask, inputs_embeds.dtype)

        hidden_states = inputs_embeds

        all_hidden_states = () if output_hidden_states else None
        all_self_attns = () if output_attentions else None
        next_decoder_cache = None

        for decoder_layer in self.layers:
            if output_hidden_states:
                all_hidden_states += (hidden_states,)

            if self.gradient_checkpointing and self.training:
                layer_outputs = self._gradient_checkpointing_func(
                    decoder_layer.__call__,
                    hidden_states,
                    attention_mask,
                    position_ids,
                    past_key_values,
                    output_attentions,
                    use_cache,
                )
            else:
                layer_outputs = decoder_layer(
                    hidden_states,
                    attention_mask=attention_mask,
                    position_ids=position_ids,
                    past_key_value=past_key_values,
                    output_attentions=output_attentions,
                    use_cache=use_cache,
                )

            hidden_states = layer_outputs[0]

            if use_cache:
                # The cache follows the attention weights when those are present.
                next_decoder_cache = layer_outputs[2 if output_attentions else 1]

            if output_attentions:
                all_self_attns += (layer_outputs[1],)

        hidden_states = self.norm(hidden_states)

        if output_hidden_states:
            all_hidden_states += (hidden_states,)

        next_cache = None
        if use_cache:
            next_cache = next_decoder_cache.to_legacy_cache() if use_legacy_cache else next_decoder_cache

        if not return_dict:
            return tuple(v for v in [hidden_states, next_cache, all_hidden_states, all_self_attns] if v is not None)

        return BaseModelOutputWithPast(
            last_hidden_state=hidden_states,
            past_key_values=next_cache,
            hidden_states=all_hidden_states,
            attentions=all_self_attns,
        )

    def add_noise_to_tokens(self, input_ids: torch.LongTensor, t: torch.FloatTensor, eps: float = None):
        """
        MDM-style masking: replace tokens with [MASK] based on noise level t.

        Each token is masked independently with probability
        ``p = (1 - eps) * t + eps``, where ``t`` is per-sequence.

        Args:
            input_ids: Input token IDs [batch_size, seq_len]
            t: Noise level in [0, 1] [batch_size]
            eps: Minimum mask probability. Defaults to ``config.mask_epsilon``
                (0.001 if the config lacks that attribute).
        Returns:
            Tuple of (noisy_input_ids, corruption_mask, p_mask), each
            ``[batch_size, seq_len]``.
        """
        batch_size, seq_len = input_ids.shape
        device = input_ids.device

        if eps is None:
            eps = getattr(self.config, 'mask_epsilon', 0.001)

        # Move t next to the ids so the comparison below cannot mix devices.
        p_mask = (1 - eps) * t.to(device) + eps
        p_mask = p_mask.unsqueeze(-1).expand(batch_size, seq_len)

        corruption_mask = torch.rand(batch_size, seq_len, device=device) < p_mask
        noisy_input_ids = input_ids.masked_fill(corruption_mask, self.mask_token_id)

        return noisy_input_ids, corruption_mask, p_mask
class DharaForMaskedDiffusion(DharaPreTrainedModel, GenerationMixin):
    """
    Dhara model with a masked-diffusion language-modelling head.

    Wraps ``DharaModel``, projects its hidden states to vocabulary logits and
    provides the importance-weighted masked-diffusion loss together with a
    simple left-to-right sampler.
    """

    _tied_weights_keys = ["lm_head.weight"]

    def __init__(self, config):
        super().__init__(config)
        self.model = DharaModel(config)
        self.vocab_size = config.vocab_size
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)
        self.config = config
        self.mask_token_id = config.mask_token_id
        self.post_init()

    def get_input_embeddings(self):
        return self.model.embed_tokens

    def set_input_embeddings(self, value):
        self.model.embed_tokens = value

    def get_output_embeddings(self):
        return self.lm_head

    def set_output_embeddings(self, new_embeddings):
        self.lm_head = new_embeddings

    def set_decoder(self, decoder):
        self.model = decoder

    def get_decoder(self):
        return self.model

    def forward(
        self,
        input_ids: torch.LongTensor = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values=None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        corruption_mask: Optional[torch.BoolTensor] = None,
        p_mask: Optional[torch.Tensor] = None,
    ) -> Union[Tuple, MaskedLMOutput]:
        """
        Compute vocabulary logits and, when ``labels`` is given, the diffusion loss.

        Args:
            labels: Target (clean) token ids. When provided,
                ``corruption_mask`` and ``p_mask`` are required as well.
            corruption_mask: Boolean mask of the positions that were masked.
            p_mask: Per-position masking probability used to weight the loss.
            Remaining arguments are forwarded to ``DharaModel.forward``.
        Returns:
            ``MaskedLMOutput`` or, when ``return_dict`` is false, a tuple
            ``(loss?, logits, *extra_base_model_outputs)``.
        """
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            position_ids=position_ids,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        hidden_states = outputs[0]

        # With tied embeddings the input embedding matrix is the output projection.
        if self.config.tie_word_embeddings:
            logits = F.linear(hidden_states, self.model.embed_tokens.weight)
        else:
            logits = self.lm_head(hidden_states)

        logits = logits.float()

        loss = None
        if labels is not None:
            loss = self.compute_diffusion_loss(logits, labels, corruption_mask, p_mask)

        if not return_dict:
            output = (logits,) + outputs[1:]
            return (loss,) + output if loss is not None else output

        return MaskedLMOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

    def compute_diffusion_loss(self, logits, labels, corruption_mask=None, p_mask=None):
        """
        MDM loss with p_mask importance weighting.

        Cross-entropy is evaluated at every position, kept only where
        ``corruption_mask`` is true, divided by the masking probability at
        that position, summed, and normalised by the total number of
        positions (``batch * seq_len``).

        Raises:
            ValueError: If ``corruption_mask`` or ``p_mask`` is missing.
        """
        if corruption_mask is None or p_mask is None:
            raise ValueError(
                "MDM requires both corruption_mask and p_mask for loss computation."
            )

        loss = F.cross_entropy(
            logits.view(-1, self.config.vocab_size),
            labels.view(-1),
            reduction='none'
        )
        loss = loss.view(labels.shape)

        masked_losses = loss[corruption_mask]
        masked_p_mask = p_mask[corruption_mask]
        weighted_losses = masked_losses / masked_p_mask

        total_positions = labels.shape[0] * labels.shape[1]
        return weighted_losses.sum() / total_positions

    def add_noise_to_tokens(self, input_ids: torch.LongTensor, t: torch.FloatTensor, eps: float = None):
        """Delegate to the base model"""
        return self.model.add_noise_to_tokens(input_ids, t, eps)

    def prepare_inputs_for_generation(
        self, input_ids, past_key_values=None, attention_mask=None, inputs_embeds=None, **kwargs
    ):
        """
        Assemble the keyword arguments for one forward call during generation.

        Trims ``input_ids`` to the tokens not yet covered by the cache, crops
        the attention mask when the cache has a maximum length, and derives
        ``position_ids`` from the attention mask when they are not supplied.
        """
        if past_key_values is not None:
            if isinstance(past_key_values, Cache):
                cache_length = past_key_values.get_seq_length()
                past_length = past_key_values.seen_tokens
                max_cache_length = past_key_values.get_max_length()
            else:
                # Legacy tuple cache: length is the sequence axis of the first key tensor.
                cache_length = past_length = past_key_values[0][0].shape[2]
                max_cache_length = None

            if attention_mask is not None and attention_mask.shape[1] > input_ids.shape[1]:
                input_ids = input_ids[:, -(attention_mask.shape[1] - past_length) :]
            elif past_length < input_ids.shape[1]:
                input_ids = input_ids[:, past_length:]

            if (
                max_cache_length is not None
                and attention_mask is not None
                and cache_length + input_ids.shape[1] > max_cache_length
            ):
                attention_mask = attention_mask[:, -max_cache_length:]

        position_ids = kwargs.get("position_ids", None)
        if attention_mask is not None and position_ids is None:
            # Positions count attended tokens; padded slots get a dummy value.
            position_ids = attention_mask.long().cumsum(-1) - 1
            position_ids.masked_fill_(attention_mask == 0, 1)
            if past_key_values:
                position_ids = position_ids[:, -input_ids.shape[1] :]

        if inputs_embeds is not None and past_key_values is None:
            model_inputs = {"inputs_embeds": inputs_embeds}
        else:
            model_inputs = {"input_ids": input_ids}

        model_inputs.update(
            {
                "position_ids": position_ids,
                "past_key_values": past_key_values,
                "use_cache": kwargs.get("use_cache"),
                "attention_mask": attention_mask,
            }
        )
        return model_inputs

    @staticmethod
    def _reorder_cache(past_key_values, beam_idx):
        """Re-index every cached tensor along the batch axis by ``beam_idx``."""
        reordered_past = ()
        for layer_past in past_key_values:
            reordered_past += (
                tuple(past_state.index_select(0, beam_idx.to(past_state.device)) for past_state in layer_past),
            )
        return reordered_past

    @staticmethod
    def _apply_repetition_penalty(scores, seen_ids, penalty):
        """
        Make tokens already present in ``seen_ids`` less likely, row by row.

        Positive scores are divided by ``penalty`` and negative scores are
        multiplied by it, so a penalty above 1 lowers the score either way.
        """
        if penalty == 1.0 or seen_ids.shape[1] == 0:
            return scores
        seen_scores = scores.gather(1, seen_ids)
        seen_scores = torch.where(seen_scores < 0, seen_scores * penalty, seen_scores / penalty)
        return scores.scatter(1, seen_ids, seen_scores)

    @staticmethod
    def _filter_top_k_top_p(scores, top_k, top_p):
        """Set scores outside the top-k / nucleus (top-p) candidate set to -inf."""
        if top_k > 0:
            # Clamp so a top_k larger than the vocabulary cannot crash topk.
            k = min(top_k, scores.size(-1))
            kth_best = torch.topk(scores, k)[0][..., -1, None]
            scores = scores.masked_fill(scores < kth_best, float('-inf'))

        if top_p < 1.0:
            sorted_logits, sorted_indices = torch.sort(scores, descending=True)
            cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)

            # Remove tokens with cumulative probability above threshold
            sorted_indices_to_remove = cumulative_probs > top_p
            # Shift right so the first token crossing the threshold is kept.
            sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
            sorted_indices_to_remove[..., 0] = False

            # Map the decision back from sorted order to vocabulary order.
            indices_to_remove = sorted_indices_to_remove.scatter(1, sorted_indices, sorted_indices_to_remove)
            scores = scores.masked_fill(indices_to_remove, float('-inf'))

        return scores

    @torch.no_grad()
    def generate(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        max_length: Optional[int] = None,
        max_new_tokens: Optional[int] = None,
        num_diffusion_steps: int = 10,
        temperature: float = 1.0,
        top_p: float = 0.9,
        top_k: int = 50,
        do_sample: bool = True,
        pad_token_id: Optional[int] = None,
        eos_token_id: Optional[int] = None,
        repetition_penalty: float = 1.2,
        **kwargs
    ) -> torch.LongTensor:
        """
        Generate text using autoregressive sampling with the diffusion model.

        Since this model was converted from AR to diffusion via WSD training,
        we generate tokens one at a time left-to-right: a single [MASK] token
        is appended to the sequence, the model is run on the whole sequence,
        and the prediction at the masked position becomes the next token.

        Args:
            input_ids: Input prompt token IDs [batch_size, prompt_len]. When
                omitted, generation starts from an empty prompt with batch size 1.
            max_length: Maximum total sequence length (prompt + generation)
            max_new_tokens: Number of new tokens to generate (takes precedence
                over max_length). If neither is given, 50 tokens are generated.
            num_diffusion_steps: Accepted for interface compatibility; this
                sampler performs exactly one forward pass per generated token
                and does not read this value.
            temperature: Sampling temperature (higher = more random). A value
                <= 0 selects greedy decoding.
            top_p: Nucleus sampling threshold
            top_k: Top-k sampling threshold (0 disables it)
            do_sample: Whether to sample or take argmax
            pad_token_id: Token written to sequences that have already
                produced EOS while other sequences in the batch continue.
                Defaults to the config value, then to ``eos_token_id``.
            eos_token_id: Token ID for end of sequence (defaults to the config value)
            repetition_penalty: Penalty for repeating tokens (>1 = less
                repetition), applied per sequence.
            **kwargs: Ignored.

        Returns:
            Generated token IDs including the prompt
        """
        device = input_ids.device if input_ids is not None else next(self.parameters()).device

        if input_ids is not None:
            batch_size, prompt_len = input_ids.shape
        else:
            batch_size, prompt_len = 1, 0
            input_ids = torch.empty(batch_size, 0, dtype=torch.long, device=device)

        # Determine generation length
        if max_new_tokens is not None:
            gen_len = max_new_tokens
        elif max_length is not None:
            gen_len = max_length - prompt_len
        else:
            gen_len = 50  # Default generation length

        if gen_len <= 0:
            return input_ids

        # Resolve special token IDs
        mask_token_id = self.config.mask_token_id
        if pad_token_id is None:
            pad_token_id = self.config.pad_token_id if hasattr(self.config, 'pad_token_id') else 0
        if eos_token_id is None:
            eos_token_id = self.config.eos_token_id if hasattr(self.config, 'eos_token_id') else 2
        if pad_token_id is None:
            # A config may carry an explicit None; fall back to EOS for filling.
            pad_token_id = eos_token_id

        generated = input_ids.clone()
        # True while a sequence has not yet emitted EOS.
        unfinished = torch.ones(batch_size, dtype=torch.bool, device=device)
        mask_column = torch.full((batch_size, 1), mask_token_id, dtype=torch.long, device=device)

        for _ in range(gen_len):
            # Append one [MASK] and predict the token hidden behind it.
            current_seq = torch.cat([generated, mask_column], dim=1)
            outputs = self(input_ids=current_seq, return_dict=True)
            next_token_logits = outputs.logits[:, -1, :]  # [batch, vocab]

            # Each row is penalised for its own history (prompt + generated).
            next_token_logits = self._apply_repetition_penalty(
                next_token_logits, generated, repetition_penalty
            )

            if temperature != 1.0 and temperature > 0:
                next_token_logits = next_token_logits / temperature

            if do_sample and temperature > 0:
                next_token_logits = self._filter_top_k_top_p(next_token_logits, top_k, top_p)
                probs = F.softmax(next_token_logits, dim=-1)
                next_tokens = torch.multinomial(probs, num_samples=1).squeeze(-1)
            else:
                # Greedy decoding
                next_tokens = next_token_logits.argmax(dim=-1)

            if eos_token_id is not None:
                # Sequences that already ended only receive padding.
                next_tokens = torch.where(
                    unfinished, next_tokens, torch.full_like(next_tokens, pad_token_id)
                )

            generated = torch.cat([generated, next_tokens.unsqueeze(-1)], dim=1)

            if eos_token_id is not None:
                unfinished = unfinished & (next_tokens != eos_token_id)
                # Stop once every sequence has produced EOS at some step.
                if not unfinished.any():
                    break

        return generated

    def save_pretrained(self, save_directory, **kwargs):
        """Override to save in SafeTensors format by default"""
        kwargs['safe_serialization'] = kwargs.get('safe_serialization', True)
        return super().save_pretrained(save_directory, **kwargs)
def count_parameters(model):
    """
    Return ``(total, canon)`` parameter counts for ``model``.

    ``canon`` sums only the parameters whose qualified name contains
    "canon" (case-insensitive).
    """
    total = 0
    for param in model.parameters():
        total += param.numel()

    canon = 0
    for name, param in model.named_parameters():
        if 'canon' in name.lower():
            canon += param.numel()

    return total, canon
def _run_smoke_test():
    """Build the default-sized model, report parameter counts, run one noisy forward pass."""
    print("Testing Dhara model creation...")

    # Same hyper-parameters as the released configuration.
    smoke_config = DharaConfig(
        vocab_size=50304,
        hidden_size=384,
        num_hidden_layers=32,
        num_attention_heads=8,
        num_key_value_heads=4,
        intermediate_size=1024,
        canon_set="AC",
        canon_kernel=4,
        canon_residual=True,
    )
    net = DharaForMaskedDiffusion(smoke_config)

    total, canon = count_parameters(net)
    print(f"Model created successfully!")
    print(f"Total params: {total:,} ({total/1e6:.2f}M)")
    print(f"Canon params: {canon:,} ({100*canon/total:.3f}%)")
    print(f"Base Dhara would be: {total - canon:,}")

    # Random token batch, corrupted at a random per-sequence noise level.
    n_seqs, n_tokens = 2, 64
    clean_ids = torch.randint(0, 50304, (n_seqs, n_tokens))
    noise_level = torch.rand(n_seqs)
    noisy_ids, corruption_mask, p_mask = net.add_noise_to_tokens(clean_ids, noise_level)

    # Forward pass with loss; no gradients needed for a smoke test.
    with torch.no_grad():
        result = net(
            input_ids=noisy_ids,
            labels=clean_ids,
            corruption_mask=corruption_mask,
            p_mask=p_mask,
        )

    print(f"Forward pass: loss={result.loss.item():.4f}")
    print("Ready for training!")


if __name__ == "__main__":
    # Quick test
    _run_smoke_test()