HybriKo-117M-Exp6-FunctionCall / modeling_hybridko.py
gyung's picture
Upload Function Calling SFT model (Epoch 2, Loss 0.14)
b7e6c9d verified
# -*- coding: utf-8 -*-
"""HybriKo Model - Hugging Face Compatible
A hybrid RNN-Attention language model optimized for Korean.
Uses a 2:1 ratio of RNN (Griffin) blocks to Attention blocks.
"""
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.checkpoint import checkpoint
from typing import Optional, Dict, Any, Tuple, Union
from transformers import PreTrainedModel
from transformers.modeling_outputs import CausalLMOutputWithPast
try:
from .configuration_hybridko import HybriKoConfig
except ImportError:
from configuration_hybridko import HybriKoConfig
# ============================================================================
# Basic Layer Components
# ============================================================================
class RMSNorm(nn.Module):
"""Root Mean Square Layer Normalization."""
def __init__(self, d_model: int, eps: float = 1e-6):
super().__init__()
self.eps = eps
self.weight = nn.Parameter(torch.ones(d_model))
def forward(self, x: torch.Tensor) -> torch.Tensor:
rms = torch.sqrt(torch.mean(x ** 2, dim=-1, keepdim=True) + self.eps)
return x / rms * self.weight
class GeGLU(nn.Module):
"""Gated GELU Feed-Forward Network."""
def __init__(self, d_model: int, d_ff: int):
super().__init__()
self.w1 = nn.Linear(d_model, d_ff, bias=False)
self.w2 = nn.Linear(d_model, d_ff, bias=False)
self.w3 = nn.Linear(d_ff, d_model, bias=False)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.w3(F.gelu(self.w1(x)) * self.w2(x))
class RGLRU(nn.Module):
"""Real-Gated Linear Recurrent Unit (Griffin/LFM2 style)."""
def __init__(self, d_model: int, eps: float = 1e-6):
super().__init__()
self.d_model = d_model
self.eps = eps
self.input_proj = nn.Linear(d_model, d_model * 2)
self.gate_proj = nn.Linear(d_model, d_model * 2)
self.a_param = nn.Parameter(torch.zeros(d_model))
self.out_proj = nn.Linear(d_model, d_model)
self._init_weights()
def _init_weights(self):
nn.init.xavier_uniform_(self.input_proj.weight)
nn.init.xavier_uniform_(self.gate_proj.weight)
nn.init.xavier_uniform_(self.out_proj.weight)
nn.init.uniform_(self.a_param, -0.5, 0.5)
def forward(
self, x: torch.Tensor, h_prev: Optional[torch.Tensor] = None
) -> Tuple[torch.Tensor, torch.Tensor]:
batch, seq_len, _ = x.shape
# Input gating
input_gate = self.input_proj(x)
x_in, x_gate = input_gate.chunk(2, dim=-1)
x_in = x_in * torch.sigmoid(x_gate)
# Recurrent gating
gates = self.gate_proj(x)
r, i = gates.chunk(2, dim=-1)
r = torch.sigmoid(r)
i = torch.sigmoid(i)
# Compute recurrence coefficients
a_base = torch.sigmoid(F.softplus(self.a_param))
a = a_base.unsqueeze(0).unsqueeze(0) * r
sqrt_1_minus_a2 = torch.sqrt(torch.clamp(1 - a ** 2, min=self.eps))
# Initialize hidden state
h = h_prev if h_prev is not None else torch.zeros(
batch, self.d_model, device=x.device, dtype=x.dtype
)
# Sequential recurrence
outputs = []
for t in range(seq_len):
h = a[:, t] * h + sqrt_1_minus_a2[:, t] * (i[:, t] * x_in[:, t])
outputs.append(h)
h_seq = torch.stack(outputs, dim=1)
return self.out_proj(h_seq), h
# ============================================================================
# Attention Components
# ============================================================================
class RotaryEmbedding(nn.Module):
"""Rotary Positional Embedding (RoPE)."""
def __init__(self, d_head: int, max_seq_len: int = 2048):
super().__init__()
inv_freq = 1.0 / (10000 ** (torch.arange(0, d_head, 2).float() / d_head))
self.register_buffer("inv_freq", inv_freq)
self._cache = None
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
seq_len = x.shape[2]
if self._cache is None or self._cache[0].shape[2] < seq_len:
t = torch.arange(seq_len, device=x.device, dtype=x.dtype)
freqs = torch.outer(t, self.inv_freq.to(x.device))
emb = torch.cat([freqs, freqs], dim=-1)
self._cache = (
emb.cos().unsqueeze(0).unsqueeze(0),
emb.sin().unsqueeze(0).unsqueeze(0),
)
return self._cache[0][:, :, :seq_len], self._cache[1][:, :, :seq_len]
def apply_rope(
x: torch.Tensor, cos: torch.Tensor, sin: torch.Tensor
) -> torch.Tensor:
"""Apply Rotary Positional Embedding to input tensor."""
d_half = x.shape[-1] // 2
x1, x2 = x[..., :d_half], x[..., d_half:]
cos = cos[..., :d_half]
sin = sin[..., :d_half]
return torch.cat([x1 * cos - x2 * sin, x1 * sin + x2 * cos], dim=-1)
class GQAttention(nn.Module):
"""Grouped Query Attention with RoPE."""
def __init__(
self,
d_model: int,
n_heads: int = 8,
n_kv_heads: int = 2,
dropout: float = 0.0,
):
super().__init__()
self.n_heads = n_heads
self.n_kv_heads = n_kv_heads
self.d_head = d_model // n_heads
self.scale = 1.0 / math.sqrt(self.d_head)
self.dropout = dropout
self.q_proj = nn.Linear(d_model, d_model, bias=False)
self.k_proj = nn.Linear(d_model, n_kv_heads * self.d_head, bias=False)
self.v_proj = nn.Linear(d_model, n_kv_heads * self.d_head, bias=False)
self.o_proj = nn.Linear(d_model, d_model, bias=False)
self.rope = RotaryEmbedding(self.d_head)
def forward(self, x: torch.Tensor) -> torch.Tensor:
B, L, _ = x.shape
# Project to Q, K, V
q = self.q_proj(x).view(B, L, self.n_heads, self.d_head)
k = self.k_proj(x).view(B, L, self.n_kv_heads, self.d_head)
v = self.v_proj(x).view(B, L, self.n_kv_heads, self.d_head)
# Transpose to [B, n_heads, L, d_head]
q = q.transpose(1, 2)
k = k.transpose(1, 2)
v = v.transpose(1, 2)
# Apply RoPE
cos, sin = self.rope(q)
q = apply_rope(q, cos, sin)
k = apply_rope(k, cos, sin)
# Expand KV heads to match query heads
n_rep = self.n_heads // self.n_kv_heads
k = k.repeat_interleave(n_rep, dim=1)
v = v.repeat_interleave(n_rep, dim=1)
# Attention with causal mask
attn = (q @ k.transpose(-2, -1)) * self.scale
mask = torch.triu(torch.ones(L, L, device=q.device), diagonal=1).bool()
attn = attn.masked_fill(mask, float("-inf"))
attn = F.softmax(attn, dim=-1)
if self.training and self.dropout > 0:
attn = F.dropout(attn, p=self.dropout)
out = (attn @ v).transpose(1, 2).contiguous()
return self.o_proj(out.view(B, L, -1))
# ============================================================================
# Block Components
# ============================================================================
class GriffinBlock(nn.Module):
"""RNN-based block using RGLRU."""
def __init__(self, d_model: int, ff_mult: int = 3):
super().__init__()
self.norm1 = RMSNorm(d_model)
self.rglru = RGLRU(d_model)
self.norm2 = RMSNorm(d_model)
self.ffn = GeGLU(d_model, d_model * ff_mult)
def forward(self, x: torch.Tensor) -> torch.Tensor:
rnn_out, _ = self.rglru(self.norm1(x))
x = x + rnn_out
x = x + self.ffn(self.norm2(x))
return x
class AttentionBlock(nn.Module):
"""Attention-based block using GQA."""
def __init__(
self, d_model: int, n_heads: int = 8, n_kv_heads: int = 2, ff_mult: int = 3
):
super().__init__()
self.norm1 = RMSNorm(d_model)
self.attn = GQAttention(d_model, n_heads, n_kv_heads)
self.norm2 = RMSNorm(d_model)
self.ffn = GeGLU(d_model, d_model * ff_mult)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = x + self.attn(self.norm1(x))
x = x + self.ffn(self.norm2(x))
return x
# ============================================================================
# Main Model
# ============================================================================
class HybriKoPreTrainedModel(PreTrainedModel):
"""Base class for HybriKo models."""
config_class = HybriKoConfig
base_model_prefix = "hybridko"
supports_gradient_checkpointing = True
def _init_weights(self, module):
if isinstance(module, nn.Linear):
nn.init.normal_(module.weight, std=0.02)
if module.bias is not None:
nn.init.zeros_(module.bias)
elif isinstance(module, nn.Embedding):
nn.init.normal_(module.weight, std=0.02)
class HybriKoModel(HybriKoPreTrainedModel):
"""HybriKo: Hybrid RNN-Attention Language Model for Korean.
Uses a 2:1 ratio of RNN (Griffin) blocks to Attention blocks.
- Layers 1, 2: GriffinBlock (RNN)
- Layer 3: AttentionBlock
- Pattern repeats...
"""
def __init__(self, config: HybriKoConfig):
super().__init__(config)
self.config = config
self.gradient_checkpointing = False
# Token embedding
self.embed = nn.Embedding(config.vocab_size, config.d_model)
# Hybrid layers: 2 RNN : 1 Attention pattern
self.layers = nn.ModuleList()
for i in range(config.n_layers):
if (i + 1) % 3 == 0: # Every 3rd layer is Attention
self.layers.append(
AttentionBlock(
config.d_model, config.n_heads, config.n_kv_heads, config.ff_mult
)
)
else: # RNN blocks
self.layers.append(GriffinBlock(config.d_model, config.ff_mult))
# Final normalization and LM head
self.norm = RMSNorm(config.d_model)
self.lm_head = nn.Linear(config.d_model, config.vocab_size, bias=False)
# Weight tying
self.lm_head.weight = self.embed.weight
# Initialize weights
self.post_init()
def _forward_layer(self, layer: nn.Module, x: torch.Tensor) -> torch.Tensor:
"""Forward pass through a single layer (for checkpointing)."""
return layer(x)
def forward(
self,
input_ids: torch.Tensor,
attention_mask: Optional[torch.Tensor] = None,
labels: Optional[torch.Tensor] = None,
return_dict: bool = True,
**kwargs
) -> Union[Dict[str, Any], CausalLMOutputWithPast]:
"""Forward pass.
Args:
input_ids: Token IDs [batch, seq_len]
attention_mask: Attention mask (unused for causal LM, for HF compatibility)
labels: Target token IDs for loss computation
return_dict: Whether to return a dict or CausalLMOutputWithPast
Returns:
CausalLMOutputWithPast or dict with 'logits' and optionally 'loss'
"""
x = self.embed(input_ids)
for layer in self.layers:
if self.gradient_checkpointing and self.training:
x = checkpoint(
self._forward_layer,
layer,
x,
use_reentrant=False,
)
else:
x = layer(x)
x = self.norm(x)
logits = self.lm_head(x)
loss = None
if labels is not None:
loss = F.cross_entropy(
logits[:, :-1].contiguous().view(-1, self.config.vocab_size),
labels[:, 1:].contiguous().view(-1),
ignore_index=-100,
)
if return_dict:
return CausalLMOutputWithPast(
loss=loss,
logits=logits,
)
return {"logits": logits, "loss": loss}
@torch.no_grad()
def generate(
self,
input_ids: torch.Tensor,
max_new_tokens: int = 50,
temperature: float = 0.8,
top_k: Optional[int] = None,
top_p: Optional[float] = None,
**kwargs
) -> torch.Tensor:
"""Generate text tokens.
Args:
input_ids: Prompt token IDs [batch, seq_len]
max_new_tokens: Number of tokens to generate
temperature: Sampling temperature
top_k: If set, only sample from top k tokens
top_p: If set, use nucleus sampling with this probability
Returns:
Generated token IDs including prompt
"""
self.eval()
for _ in range(max_new_tokens):
idx = input_ids[:, -self.config.max_seq_len:]
outputs = self(idx)
logits = outputs.logits[:, -1] / temperature
# Apply top-k filtering
if top_k is not None:
v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
logits[logits < v[:, [-1]]] = float("-inf")
# Apply top-p (nucleus) filtering
if top_p is not None:
sorted_logits, sorted_indices = torch.sort(logits, descending=True)
cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
sorted_indices_to_remove = cumulative_probs > top_p
sorted_indices_to_remove[:, 1:] = sorted_indices_to_remove[:, :-1].clone()
sorted_indices_to_remove[:, 0] = 0
indices_to_remove = sorted_indices_to_remove.scatter(
1, sorted_indices, sorted_indices_to_remove
)
logits[indices_to_remove] = float("-inf")
probs = F.softmax(logits, dim=-1)
next_token = torch.multinomial(probs, 1)
input_ids = torch.cat([input_ids, next_token], dim=1)
return input_ids
def get_num_params(self, non_embedding: bool = True) -> int:
"""Return the number of parameters in the model."""
n_params = sum(p.numel() for p in self.parameters())
if non_embedding:
n_params -= self.embed.weight.numel()
return n_params
# Register for AutoModel
HybriKoConfig.register_for_auto_class()
HybriKoModel.register_for_auto_class("AutoModel")
HybriKoModel.register_for_auto_class("AutoModelForCausalLM")