"""Quark decoder-only causal language model built on PyTorch and Transformers."""

from typing import Optional

import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import PreTrainedModel
from transformers.modeling_outputs import CausalLMOutputWithPast

from .configuration_quark import QuarkConfig


class QuarkRMSNorm(nn.Module):
    """Root-mean-square normalisation over the last dimension with a learned scale."""

    def __init__(self, dim, eps=1e-5):
        """Create the norm.

        Args:
            dim: Size of the last (normalised) dimension.
            eps: Constant added to the mean square before the reciprocal sqrt.
        """
        super().__init__()
        self.eps = eps
        self.scale = nn.Parameter(torch.ones(dim))

    def forward(self, x):
        """Normalise ``x`` in float32, cast back to ``x.dtype``, then apply ``scale``."""
        x32 = x.float()
        inv_rms = x32.pow(2).mean(-1, keepdim=True).add(self.eps).rsqrt()
        return (x32 * inv_rms).to(x.dtype) * self.scale


class QuarkRoPE(nn.Module):
    """Rotary position embedding with a cos/sin cache that grows on demand."""

    def __init__(self, hd, ml, th=10000.0):
        """Create the inverse-frequency table and pre-build the cache.

        Args:
            hd: Head dimension; must be even because channels are rotated in
                two halves.
            ml: Number of positions to pre-compute.
            th: Base of the geometric frequency progression.

        Raises:
            ValueError: If ``hd`` is odd.
        """
        super().__init__()
        if hd % 2 != 0:
            # An odd size would make the cos/sin tables one channel wider than
            # q/k and fail later with an opaque broadcasting error.
            raise ValueError(f"head_dim must be even for rotary embeddings, got {hd}")
        inv_freq = 1.0 / (th ** (torch.arange(0, hd, 2).float() / hd))
        self.register_buffer("inv", inv_freq, persistent=False)
        self._b(ml)

    def _b(self, sl):
        """(Re)build the cos/sin caches for ``sl`` positions on ``inv``'s device."""
        positions = torch.arange(sl, device=self.inv.device).float()
        freqs = torch.outer(positions, self.inv)
        emb = torch.cat([freqs, freqs], -1)
        # Leading singleton dims let the cache broadcast over (batch, heads).
        self.register_buffer("cos_c", emb.cos()[None, None], persistent=False)
        self.register_buffer("sin_c", emb.sin()[None, None], persistent=False)
        self._m = sl

    @staticmethod
    def _r(x):
        """Split the last dim into halves ``(a, b)`` and return ``cat(-b, a)``."""
        a, b = x.chunk(2, -1)
        return torch.cat([-b, a], -1)

    def forward(self, q, k):
        """Apply the rotation to ``q`` and ``k``.

        The sequence length is read from dimension 2 of ``q``; the cache is
        rebuilt when that length exceeds the cached size.

        Returns:
            Tuple ``(q_rotated, k_rotated)``.
        """
        T = q.size(2)
        if T > self._m:
            self._b(T)
        cos = self.cos_c[:, :, :T, :]
        sin = self.sin_c[:, :, :T, :]
        return q * cos + self._r(q) * sin, k * cos + self._r(k) * sin


class QuarkAttention(nn.Module):
    """Causal self-attention with grouped key/value heads and rotary embeddings."""

    def __init__(self, cfg):
        """Build the projections from ``cfg``.

        Reads ``n_heads``, ``n_kv_heads``, ``head_dim``, ``d_model``,
        ``qkv_bias``, ``max_seq_len`` and ``rope_theta`` from ``cfg``.

        Raises:
            ValueError: If ``n_heads`` is not a multiple of ``n_kv_heads``.
        """
        super().__init__()
        if cfg.n_heads % cfg.n_kv_heads != 0:
            # Otherwise repeat_interleave below cannot expand K/V to n_heads.
            raise ValueError(
                f"n_heads ({cfg.n_heads}) must be divisible by "
                f"n_kv_heads ({cfg.n_kv_heads})"
            )
        self.nh = cfg.n_heads
        self.nkv = cfg.n_kv_heads
        self.ng = cfg.n_heads // cfg.n_kv_heads  # query heads per KV head
        self.hd = cfg.head_dim
        self.q_proj = nn.Linear(cfg.d_model, cfg.n_heads * cfg.head_dim, bias=cfg.qkv_bias)
        self.k_proj = nn.Linear(cfg.d_model, cfg.n_kv_heads * cfg.head_dim, bias=cfg.qkv_bias)
        self.v_proj = nn.Linear(cfg.d_model, cfg.n_kv_heads * cfg.head_dim, bias=cfg.qkv_bias)
        self.o_proj = nn.Linear(cfg.n_heads * cfg.head_dim, cfg.d_model, bias=False)
        self.rope = QuarkRoPE(cfg.head_dim, cfg.max_seq_len, cfg.rope_theta)

    def forward(self, x, attn_mask=None):
        """Run attention over ``x`` of shape ``(B, T, d_model)``.

        Args:
            x: Input hidden states, rank 3.
            attn_mask: Optional boolean mask broadcastable to
                ``(B, n_heads, T, T)`` where ``True`` means "may attend". It
                must already include the causal constraint. When ``None`` a
                plain causal mask is used.

        Returns:
            Tensor of shape ``(B, T, d_model)``.
        """
        B, T, _ = x.shape
        q = self.q_proj(x).view(B, T, self.nh, self.hd).transpose(1, 2)
        k = self.k_proj(x).view(B, T, self.nkv, self.hd).transpose(1, 2)
        v = self.v_proj(x).view(B, T, self.nkv, self.hd).transpose(1, 2)

        q, k = self.rope(q, k)
        # The rotation multiplies by float32 tables; restore the value dtype.
        q, k = q.to(v.dtype), k.to(v.dtype)

        if self.ng > 1:
            k = k.repeat_interleave(self.ng, 1)
            v = v.repeat_interleave(self.ng, 1)

        if attn_mask is None:
            ctx = F.scaled_dot_product_attention(q, k, v, is_causal=True)
        else:
            # An explicit mask and is_causal=True are mutually exclusive in
            # SDPA, so the caller supplies a mask that is already causal.
            ctx = F.scaled_dot_product_attention(q, k, v, attn_mask=attn_mask)

        return self.o_proj(ctx.transpose(1, 2).contiguous().view(B, T, -1))


class QuarkFFN(nn.Module):
    """Gated feed-forward block: ``down(silu(gate(x)) * up(x))``."""

    def __init__(self, cfg):
        """Build the three bias-free projections from ``cfg.d_model`` / ``cfg.d_ff``."""
        super().__init__()
        self.gate_proj = nn.Linear(cfg.d_model, cfg.d_ff, bias=False)
        self.up_proj = nn.Linear(cfg.d_model, cfg.d_ff, bias=False)
        self.down_proj = nn.Linear(cfg.d_ff, cfg.d_model, bias=False)

    def forward(self, x):
        """Apply the gated projection to ``x``."""
        gated = F.silu(self.gate_proj(x)) * self.up_proj(x)
        return self.down_proj(gated)


class QuarkBlock(nn.Module):
    """Pre-norm transformer block: attention then FFN, each with a residual."""

    def __init__(self, cfg):
        """Build the two norms, the attention module and the FFN from ``cfg``."""
        super().__init__()
        self.norm_attn = QuarkRMSNorm(cfg.d_model, cfg.rms_eps)
        self.attn = QuarkAttention(cfg)
        self.norm_ffn = QuarkRMSNorm(cfg.d_model, cfg.rms_eps)
        self.ffn = QuarkFFN(cfg)

    def forward(self, x, attn_mask=None):
        """Return ``x`` after the attention and FFN residual branches.

        Args:
            x: Input hidden states.
            attn_mask: Optional boolean attention mask forwarded unchanged to
                :class:`QuarkAttention`.
        """
        x = x + self.attn(self.norm_attn(x), attn_mask)
        return x + self.ffn(self.norm_ffn(x))


class QuarkPreTrainedModel(PreTrainedModel):
    """Base class wiring ``QuarkConfig`` and weight initialisation into Transformers."""

    config_class = QuarkConfig
    base_model_prefix = "model"
    supports_gradient_checkpointing = False

    def _init_weights(self, module):
        """Initialise Linear/Embedding weights from N(0, 0.02) and zero Linear biases."""
        if isinstance(module, nn.Linear):
            module.weight.data.normal_(0.0, 0.02)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(0.0, 0.02)


class QuarkForCausalLM(QuarkPreTrainedModel):
    """Quark decoder stack with a language-modelling head tied to the embeddings."""

    def __init__(self, config):
        """Build embeddings, ``config.n_layers`` blocks, the final norm and the head."""
        super().__init__(config)
        self.config = config
        self.embed_tokens = nn.Embedding(config.vocab_size, config.d_model)
        self.layers = nn.ModuleList([QuarkBlock(config) for _ in range(config.n_layers)])
        self.norm = QuarkRMSNorm(config.d_model, config.rms_eps)
        self.lm_head = nn.Linear(config.d_model, config.vocab_size, bias=False)
        self.lm_head.weight = self.embed_tokens.weight  # weight tying
        self.post_init()

    def _load_from_state_dict(self, state_dict, prefix, *args, **kwargs):
        """If lm_head.weight is missing, reuse embed_tokens.weight (weight tying).

        The same tensor object is aliased rather than cloned: the two
        parameters are tied, so a copy would only cost an extra
        vocab x d_model allocation, and it would untie the weights when the
        checkpoint is loaded with ``assign=True``.
        """
        lm_key = f"{prefix}lm_head.weight"
        emb_key = f"{prefix}embed_tokens.weight"
        if lm_key not in state_dict and emb_key in state_dict:
            state_dict[lm_key] = state_dict[emb_key]
        super()._load_from_state_dict(state_dict, prefix, *args, **kwargs)

    def get_input_embeddings(self):
        """Return the token embedding module."""
        return self.embed_tokens

    def set_input_embeddings(self, v):
        """Replace the token embedding module."""
        self.embed_tokens = v

    def get_output_embeddings(self):
        """Return the language-modelling head."""
        return self.lm_head

    def set_output_embeddings(self, v):
        """Replace the language-modelling head."""
        self.lm_head = v

    @staticmethod
    def _build_attn_mask(attention_mask, input_ids):
        """Combine a ``(B, T)`` padding mask with the causal constraint.

        Args:
            attention_mask: ``None`` or a 2-D tensor shaped like ``input_ids``
                whose non-zero entries mark real (non-padding) tokens.
            input_ids: The token ids the mask belongs to.

        Returns:
            ``None`` when no padding is present (so callers can keep the
            ``is_causal=True`` path), otherwise a boolean tensor of shape
            ``(B, 1, T, T)`` where ``True`` means "may attend".

        Raises:
            ValueError: If ``attention_mask`` does not have the same shape as
                ``input_ids``.
        """
        if attention_mask is None:
            return None
        if tuple(attention_mask.shape) != tuple(input_ids.shape):
            raise ValueError(
                f"attention_mask shape {tuple(attention_mask.shape)} does not match "
                f"input_ids shape {tuple(input_ids.shape)}"
            )
        keep = attention_mask.to(device=input_ids.device).bool()
        # Data-dependent check (forces a host sync), done once per forward so
        # that un-padded batches keep using the plain causal kernel.
        if bool(keep.all()):
            return None
        T = input_ids.size(-1)
        causal = torch.ones(T, T, dtype=torch.bool, device=input_ids.device).tril()
        mask = causal[None, None] & keep[:, None, None, :]
        # Keep the diagonal open: a padded query with every key masked would
        # otherwise softmax over an empty set and produce NaN, which then
        # leaks into real positions through the value vectors of later layers.
        eye = torch.eye(T, dtype=torch.bool, device=input_ids.device)
        return mask | eye[None, None]

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        **kwargs,
    ):
        """Compute next-token logits and, when ``labels`` is given, the LM loss.

        Args:
            input_ids: Token ids of shape ``(B, T)``.
            attention_mask: Optional ``(B, T)`` mask; non-zero marks real
                tokens. Padding positions are excluded as attention keys.
            labels: Optional target ids aligned with ``input_ids``; they are
                shifted by one position here and ``-100`` entries are ignored.
            **kwargs: Accepted for API compatibility and ignored.

        Returns:
            ``CausalLMOutputWithPast`` with ``loss`` (or ``None``) and ``logits``.

        Raises:
            ValueError: If ``attention_mask`` is not shaped like ``input_ids``.
        """
        attn_mask = self._build_attn_mask(attention_mask, input_ids)

        h = self.embed_tokens(input_ids)
        for layer in self.layers:
            h = layer(h, attn_mask)
        logits = self.lm_head(self.norm(h))

        loss = None
        if labels is not None:
            # Predict token t+1 from position t; compute the loss in float32
            # so half-precision logits do not degrade it.
            shift_logits = logits[..., :-1, :].float().contiguous()
            shift_labels = labels[..., 1:].contiguous()
            loss = F.cross_entropy(
                shift_logits.view(-1, self.config.vocab_size),
                shift_labels.view(-1),
                ignore_index=-100,
            )
        return CausalLMOutputWithPast(loss=loss, logits=logits)

    def prepare_inputs_for_generation(self, input_ids, **kwargs):
        """Return the model inputs for one generation step.

        There is no key/value cache, so the full ``input_ids`` are re-fed each
        step. The attention mask is forwarded only when it lines up with
        ``input_ids``; a mismatching mask is dropped, as all masks were before.
        """
        mask = kwargs.get("attention_mask")
        if mask is not None and tuple(mask.shape) != tuple(input_ids.shape):
            mask = None
        return {"input_ids": input_ids, "attention_mask": mask}