Quark-270m-Instruct / modeling_quark.py
ThingsAI's picture
Upload modeling_quark.py with huggingface_hub
f950097 verified
import torch, torch.nn as nn, torch.nn.functional as F
from typing import Optional
from transformers import PreTrainedModel
from transformers.modeling_outputs import CausalLMOutputWithPast
from .configuration_quark import QuarkConfig
class QuarkRMSNorm(nn.Module):
def __init__(self, dim, eps=1e-5):
super().__init__(); self.eps=eps; self.scale=nn.Parameter(torch.ones(dim))
def forward(self, x):
return (x.float()*(x.float().pow(2).mean(-1,keepdim=True).add(self.eps).rsqrt())).to(x.dtype)*self.scale
class QuarkRoPE(nn.Module):
def __init__(self, hd, ml, th=10000.):
super().__init__()
self.register_buffer("inv",1./(th**(torch.arange(0,hd,2).float()/hd)),persistent=False); self._b(ml)
def _b(self, sl):
f=torch.outer(torch.arange(sl,device=self.inv.device).float(),self.inv); e=torch.cat([f,f],-1)
self.register_buffer("cos_c",e.cos()[None,None],persistent=False); self.register_buffer("sin_c",e.sin()[None,None],persistent=False); self._m=sl
@staticmethod
def _r(x): a,b=x.chunk(2,-1); return torch.cat([-b,a],-1)
def forward(self, q, k):
T=q.size(2)
if T>self._m: self._b(T)
c,s=self.cos_c[:,:,:T,:],self.sin_c[:,:,:T,:]
return q*c+self._r(q)*s, k*c+self._r(k)*s
class QuarkAttention(nn.Module):
def __init__(self, cfg):
super().__init__()
self.nh,self.nkv,self.ng,self.hd=cfg.n_heads,cfg.n_kv_heads,cfg.n_heads//cfg.n_kv_heads,cfg.head_dim
self.q_proj=nn.Linear(cfg.d_model,cfg.n_heads*cfg.head_dim,bias=cfg.qkv_bias)
self.k_proj=nn.Linear(cfg.d_model,cfg.n_kv_heads*cfg.head_dim,bias=cfg.qkv_bias)
self.v_proj=nn.Linear(cfg.d_model,cfg.n_kv_heads*cfg.head_dim,bias=cfg.qkv_bias)
self.o_proj=nn.Linear(cfg.n_heads*cfg.head_dim,cfg.d_model,bias=False)
self.rope=QuarkRoPE(cfg.head_dim,cfg.max_seq_len,cfg.rope_theta)
def forward(self, x):
B,T,_=x.shape
q=self.q_proj(x).view(B,T,self.nh,self.hd).transpose(1,2)
k=self.k_proj(x).view(B,T,self.nkv,self.hd).transpose(1,2)
v=self.v_proj(x).view(B,T,self.nkv,self.hd).transpose(1,2)
q,k=self.rope(q,k); q,k=q.to(v.dtype),k.to(v.dtype)
if self.ng>1: k=k.repeat_interleave(self.ng,1); v=v.repeat_interleave(self.ng,1)
return self.o_proj(F.scaled_dot_product_attention(q,k,v,is_causal=True).transpose(1,2).contiguous().view(B,T,-1))
class QuarkFFN(nn.Module):
def __init__(self, cfg):
super().__init__()
self.gate_proj=nn.Linear(cfg.d_model,cfg.d_ff,bias=False)
self.up_proj=nn.Linear(cfg.d_model,cfg.d_ff,bias=False)
self.down_proj=nn.Linear(cfg.d_ff,cfg.d_model,bias=False)
def forward(self, x): return self.down_proj(F.silu(self.gate_proj(x))*self.up_proj(x))
class QuarkBlock(nn.Module):
def __init__(self, cfg):
super().__init__()
self.norm_attn=QuarkRMSNorm(cfg.d_model,cfg.rms_eps); self.attn=QuarkAttention(cfg)
self.norm_ffn=QuarkRMSNorm(cfg.d_model,cfg.rms_eps); self.ffn=QuarkFFN(cfg)
def forward(self, x):
x=x+self.attn(self.norm_attn(x)); return x+self.ffn(self.norm_ffn(x))
class QuarkPreTrainedModel(PreTrainedModel):
config_class=QuarkConfig; base_model_prefix="model"; supports_gradient_checkpointing=False
def _init_weights(self, module):
if isinstance(module, nn.Linear):
module.weight.data.normal_(0.0, 0.02)
if module.bias is not None: module.bias.data.zero_()
elif isinstance(module, nn.Embedding): module.weight.data.normal_(0.0, 0.02)
class QuarkForCausalLM(QuarkPreTrainedModel):
def __init__(self, config):
super().__init__(config); self.config=config
self.embed_tokens=nn.Embedding(config.vocab_size,config.d_model)
self.layers=nn.ModuleList([QuarkBlock(config) for _ in range(config.n_layers)])
self.norm=QuarkRMSNorm(config.d_model,config.rms_eps)
self.lm_head=nn.Linear(config.d_model,config.vocab_size,bias=False)
self.lm_head.weight=self.embed_tokens.weight # weight tying
self.post_init()
def _load_from_state_dict(self, state_dict, prefix, *args, **kwargs):
"""Se lm_head.weight manca, copia da embed_tokens.weight (weight tying)"""
lm_key = f"{prefix}lm_head.weight"
emb_key = f"{prefix}embed_tokens.weight"
if lm_key not in state_dict and emb_key in state_dict:
state_dict[lm_key] = state_dict[emb_key].clone()
super()._load_from_state_dict(state_dict, prefix, *args, **kwargs)
def get_input_embeddings(self): return self.embed_tokens
def set_input_embeddings(self, v): self.embed_tokens=v
def get_output_embeddings(self): return self.lm_head
def set_output_embeddings(self, v): self.lm_head=v
def forward(self, input_ids, attention_mask=None, labels=None, **kwargs):
h=self.embed_tokens(input_ids)
for layer in self.layers: h=layer(h)
logits=self.lm_head(self.norm(h))
loss=None
if labels is not None:
loss=F.cross_entropy(logits[...,:-1,:].contiguous().view(-1,self.config.vocab_size),
labels[...,1:].contiguous().view(-1),ignore_index=-100)
return CausalLMOutputWithPast(loss=loss, logits=logits)
def prepare_inputs_for_generation(self, input_ids, **kwargs): return {"input_ids": input_ids}