"""Standalone inference script for the kerzgrr/monostich checkpoint.

Implements a small Llama-style decoder in plain PyTorch and runs either a
single prompt (--prompt) or an interactive chat loop. Weights, tokenizer,
and config are downloaded from the Hugging Face Hub.
"""

from __future__ import annotations

import argparse
import json
import sys
from dataclasses import dataclass
from pathlib import Path

import torch
import torch.nn as nn
import torch.nn.functional as F
from safetensors.torch import load_file
from tokenizers import Tokenizer


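# Model hyperparameters; field names mirror the keys read from the checkpoint's config.json in main().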
@dataclass(frozen=True)
class ModelConfig:
    vocab_size: int = 32000
    hidden_size: int = 768
    intermediate_size: int = 2048
    num_hidden_layers: int = 12
    num_attention_heads: int = 12
    num_key_value_heads: int = 4
    rms_norm_eps: float = 1e-5
    max_position_embeddings: int = 1024
    rope_theta: float = 10000.0
    attention_dropout: float = 0.0
    attn_window: int = 0
    attn_block_size: int = 256
    initializer_range: float = 0.02
    tie_word_embeddings: bool = True
    pad_token_id: int = 0
    bos_token_id: int = 2
    eos_token_id: int = 3

    @property
    def head_dim(self) -> int:
        return self.hidden_size // self.num_attention_heads


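# Root-mean-square layer norm: computed in float32 for numerical stability, then cast back
# and rescaled by a learned per-channel weight.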
class RMSNorm(nn.Module):
    def __init__(self, dim: int, eps: float):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        orig_dtype = x.dtype
        x = x.float()
        var = x.pow(2).mean(dim=-1, keepdim=True)
        x = x * torch.rsqrt(var + self.eps)
        return x.to(orig_dtype) * self.weight


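# Rotary position embeddings (RoPE): cos/sin tables are precomputed up to
# max_position_embeddings and applied to the query and key heads on every forward pass.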
class RotaryEmbedding(nn.Module):
    def __init__(self, head_dim: int, max_pos: int, theta: float):
        super().__init__()
        self.head_dim = head_dim
        inv_freq = 1.0 / (theta ** (torch.arange(0, head_dim, 2).float() / head_dim))
        t = torch.arange(max_pos, dtype=inv_freq.dtype)
        freqs = torch.einsum("i,j->ij", t, inv_freq)
        emb = torch.cat([freqs, freqs], dim=-1)
        self.register_buffer("_cos", emb.cos(), persistent=False)
        self.register_buffer("_sin", emb.sin(), persistent=False)

    def forward(self, q: torch.Tensor, k: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        b, h, t, hd = q.shape
        cos = self._cos[:t].to(q.dtype).unsqueeze(0).unsqueeze(0)
        sin = self._sin[:t].to(q.dtype).unsqueeze(0).unsqueeze(0)

        def rotate_half(x: torch.Tensor) -> torch.Tensor:
            x1 = x[..., : hd // 2]
            x2 = x[..., hd // 2 :]
            return torch.cat([-x2, x1], dim=-1)

        q_out = (q * cos) + (rotate_half(q) * sin)
        k_out = (k * cos) + (rotate_half(k) * sin)
        return q_out, k_out


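# Gated feed-forward block (SwiGLU-style): down_proj(silu(gate_proj(x)) * up_proj(x)).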
class LlamaMLP(nn.Module):
    def __init__(self, cfg: ModelConfig):
        super().__init__()
        self.gate_proj = nn.Linear(cfg.hidden_size, cfg.intermediate_size, bias=False)
        self.up_proj = nn.Linear(cfg.hidden_size, cfg.intermediate_size, bias=False)
        self.down_proj = nn.Linear(cfg.intermediate_size, cfg.hidden_size, bias=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.down_proj(F.silu(self.gate_proj(x)) * self.up_proj(x))


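# Causal self-attention with grouped-query attention: there are fewer key/value heads than
# query heads, so K and V are repeated kv_repeat times before scaled_dot_product_attention.
# No KV cache is kept, so the full sequence is recomputed on every call.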
class LlamaAttention(nn.Module):
    def __init__(self, cfg: ModelConfig):
        super().__init__()
        self.cfg = cfg
        self.num_heads = cfg.num_attention_heads
        self.num_kv_heads = cfg.num_key_value_heads
        self.head_dim = cfg.head_dim
        self.kv_repeat = self.num_heads // self.num_kv_heads
        self.q_proj = nn.Linear(cfg.hidden_size, self.num_heads * self.head_dim, bias=False)
        self.k_proj = nn.Linear(cfg.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.v_proj = nn.Linear(cfg.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.o_proj = nn.Linear(cfg.hidden_size, cfg.hidden_size, bias=False)
        self.rotary = RotaryEmbedding(self.head_dim, cfg.max_position_embeddings, cfg.rope_theta)
        self.attn_dropout = float(cfg.attention_dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        b, t, d = x.shape
        q = self.q_proj(x).view(b, t, self.num_heads, self.head_dim).transpose(1, 2)
        k = self.k_proj(x).view(b, t, self.num_kv_heads, self.head_dim).transpose(1, 2)
        v = self.v_proj(x).view(b, t, self.num_kv_heads, self.head_dim).transpose(1, 2)
        q, k = self.rotary(q, k)
        if self.kv_repeat != 1:
            k = k.repeat_interleave(self.kv_repeat, dim=1)
            v = v.repeat_interleave(self.kv_repeat, dim=1)
        y = F.scaled_dot_product_attention(
            q, k, v,
            attn_mask=None,
            dropout_p=self.attn_dropout if self.training else 0.0,
            is_causal=True,
        )
        y = y.transpose(1, 2).contiguous().view(b, t, d)
        return self.o_proj(y)


class LlamaDecoderLayer(nn.Module):
    def __init__(self, cfg: ModelConfig):
        super().__init__()
        self.input_layernorm = RMSNorm(cfg.hidden_size, cfg.rms_norm_eps)
        self.self_attn = LlamaAttention(cfg)
        self.post_attention_layernorm = RMSNorm(cfg.hidden_size, cfg.rms_norm_eps)
        self.mlp = LlamaMLP(cfg)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x + self.self_attn(self.input_layernorm(x))
        x = x + self.mlp(self.post_attention_layernorm(x))
        return x


class LlamaModel(nn.Module):
    def __init__(self, cfg: ModelConfig):
        super().__init__()
        self.cfg = cfg
        self.embed_tokens = nn.Embedding(cfg.vocab_size, cfg.hidden_size, padding_idx=cfg.pad_token_id)
        self.layers = nn.ModuleList([LlamaDecoderLayer(cfg) for _ in range(cfg.num_hidden_layers)])
        self.norm = RMSNorm(cfg.hidden_size, cfg.rms_norm_eps)

    def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
        x = self.embed_tokens(input_ids)
        for layer in self.layers:
            x = layer(x)
        x = self.norm(x)
        return x


class MonostichForCausalLM(nn.Module):
    def __init__(self, cfg: ModelConfig):
        super().__init__()
        self.config = cfg
        self.model = LlamaModel(cfg)
        self.lm_head = nn.Linear(cfg.hidden_size, cfg.vocab_size, bias=False)
        if cfg.tie_word_embeddings:
            self.lm_head.weight = self.model.embed_tokens.weight

    def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
        x = self.model(input_ids)
        return self.lm_head(x)


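# Repetition penalty over 1-D logits: positive logits of already-generated token ids are
# divided by `penalty`, negative ones multiplied by it, discouraging verbatim repetition.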
def _apply_repetition_penalty(logits: torch.Tensor, token_ids: list[int], penalty: float) -> torch.Tensor:
    if penalty == 1.0 or not token_ids:
        return logits
    unique = torch.tensor(list(set(token_ids)), dtype=torch.long, device=logits.device)
    score = logits[unique]
    score = torch.where(score > 0, score / penalty, score * penalty)
    logits[unique] = score
    return logits


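# Draws one token id from 1-D logits: greedy argmax when temperature <= 0, otherwise
# top-k sampling when --top-k > 0, else nucleus (top-p) sampling on the scaled distribution.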
def sample_next_id(logits: torch.Tensor, temperature: float, top_p: float, top_k: int, generator: torch.Generator) -> int:
    if temperature <= 0:
        return int(torch.argmax(logits).item())
    logits = logits / float(temperature)
    if top_k and top_k > 0:
        v, ix = torch.topk(logits, k=int(top_k))
        probs = torch.softmax(v, dim=-1)
        idx = torch.multinomial(probs, num_samples=1, generator=generator).item()
        return int(ix[idx].item())
    probs = torch.softmax(logits, dim=-1)
    if top_p >= 1.0:
        return int(torch.multinomial(probs, num_samples=1, generator=generator).item())
    sorted_probs, sorted_ix = torch.sort(probs, descending=True)
    cdf = torch.cumsum(sorted_probs, dim=-1)
    mask = cdf <= float(top_p)
    mask[0] = True
    filtered_probs = sorted_probs[mask]
    filtered_ix = sorted_ix[mask]
    filtered_probs = filtered_probs / filtered_probs.sum()
    idx = torch.multinomial(filtered_probs, num_samples=1, generator=generator).item()
    return int(filtered_ix[idx].item())


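# Renders (role, content) pairs into the model's chat format, e.g. a single user turn becomes
# "<|bos|><|start_header_id|>user<|end_header_id|>\n\nHi<|eos|>" followed by
# "<|start_header_id|>assistant<|end_header_id|>\n\n" when add_generation_prompt is True.
# Roles other than "user"/"assistant" and empty messages are skipped.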
def _render_chat(messages: list[tuple[str, str]], add_generation_prompt: bool) -> str:
    BOS, EOS = "<|bos|>", "<|eos|>"
    START, END = "<|start_header_id|>", "<|end_header_id|>"
    NL2 = "\n\n"
    out = []
    for role, content in messages:
        r = (role or "").strip().lower()
        if r not in {"user", "assistant"}:
            continue
        c = (content or "").strip()
        if not c:
            continue
        if not out:
            out.append(f"{BOS}{START}{r}{END}{NL2}{c}{EOS}")
        else:
            out.append(f"{START}{r}{END}{NL2}{c}{EOS}")
    if add_generation_prompt:
        out.append(f"{START}assistant{END}{NL2}")
    return "".join(out)


REPO_ID = "kerzgrr/monostich"


def _download_file(filename: str) -> Path:
    from huggingface_hub import hf_hub_download

    return Path(hf_hub_download(repo_id=REPO_ID, filename=filename))


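# Example invocations (the script filename here is illustrative):
#   python infer.py --prompt "Write a one-line poem." --device cpu
#   python infer.py --temperature 0.7 --top-p 0.9     # no --prompt -> interactive chat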
def main() -> int:
    ap = argparse.ArgumentParser()
    ap.add_argument("--prompt", default=None)
    ap.add_argument("--max-new-tokens", type=int, default=None)
    ap.add_argument("--temperature", type=float, default=0.28)
    ap.add_argument("--top-p", type=float, default=0.95)
    ap.add_argument("--top-k", type=int, default=0)
    ap.add_argument("--repetition-penalty", type=float, default=1.2)
    ap.add_argument("--seed", type=int, default=1234)
    ap.add_argument("--device", default="cuda", choices=["cuda", "cpu"])
    args = ap.parse_args()

    print(f"Loading model from huggingface.co/{REPO_ID} ...", flush=True)
    weights_path = _download_file("monostich.safetensors")
    tok_path = _download_file("tokenizer.json")
    cfg_path = _download_file("config.json")

    torch.manual_seed(args.seed)
    if args.device == "cuda":
        torch.cuda.manual_seed_all(args.seed)

    tok = Tokenizer.from_file(str(tok_path))
    raw = json.loads(cfg_path.read_text(encoding="utf-8"))
    cfg = ModelConfig(
        vocab_size=int(raw["vocab_size"]),
        hidden_size=int(raw["hidden_size"]),
        intermediate_size=int(raw["intermediate_size"]),
        num_hidden_layers=int(raw["num_hidden_layers"]),
        num_attention_heads=int(raw["num_attention_heads"]),
        num_key_value_heads=int(raw["num_key_value_heads"]),
        rms_norm_eps=float(raw.get("rms_norm_eps", 1e-5)),
        max_position_embeddings=int(raw.get("max_position_embeddings", 1024)),
        rope_theta=float(raw.get("rope_theta", 10000.0)),
        attention_dropout=float(raw.get("attention_dropout", 0.0)),
        attn_window=int(raw.get("attn_window", 0) or 0),
        attn_block_size=int(raw.get("attn_block_size", 256) or 256),
        tie_word_embeddings=bool(raw.get("tie_word_embeddings", True)),
        pad_token_id=int(raw.get("pad_token_id", 0)),
        bos_token_id=int(raw.get("bos_token_id", 2)),
        eos_token_id=int(raw.get("eos_token_id", 3)),
    )

    device = torch.device(args.device)
    dtype = torch.bfloat16
    model = MonostichForCausalLM(cfg)
    model.load_state_dict(load_file(str(weights_path)), strict=True)
    model.to(device=device, dtype=dtype)
    model.eval()

    eos_id = cfg.eos_token_id
    max_ctx = cfg.max_position_embeddings
    g = torch.Generator(device=device)
    g.manual_seed(args.seed)
    max_new = args.max_new_tokens if args.max_new_tokens is not None else max_ctx

    rep_pen = float(args.repetition_penalty)

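    # Token-by-token decoding without a KV cache: each step re-runs the model on the last
    # max_ctx tokens, so every new token costs a full forward pass over the context.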
    def generate(prompt_ids: list[int], stream: bool = False) -> tuple[str, int]:
        generated = list(prompt_ids)
        out_ids = []
        with torch.no_grad():
            for _ in range(max_new):
                ctx = generated[-max_ctx:]
                x = torch.tensor(ctx, device=device, dtype=torch.long).unsqueeze(0)
                with torch.autocast(device_type=device.type, dtype=dtype, enabled=device.type == "cuda"):
                    logits = model(x)
                next_logits = _apply_repetition_penalty(logits[0, -1, :].float(), generated, rep_pen)
                next_id = sample_next_id(next_logits, args.temperature, args.top_p, args.top_k, g)
                generated.append(next_id)
                if next_id == eos_id:
                    break
                out_ids.append(next_id)
                if stream:
                    print(tok.decode([next_id], skip_special_tokens=False), end="", flush=True)
        text = tok.decode(out_ids, skip_special_tokens=False)
        if stream:
            print()
        return text, len(out_ids)

    if args.prompt is not None:
        hist = [("user", args.prompt)]
        prompt_text = _render_chat(hist, add_generation_prompt=True)
        enc = tok.encode(prompt_text, add_special_tokens=False)
        text, _ = generate(list(enc.ids))
        print(text)
        return 0

print("Interactive chat. /exit to quit, /reset to clear history.", flush=True) |
|
|
history: list[tuple[str, str]] = [] |
|
|
while True: |
|
|
try: |
|
|
user_input = input("user> ").strip() |
|
|
except EOFError: |
|
|
break |
|
|
if not user_input: |
|
|
continue |
|
|
if user_input.lower() in ("/exit", "/quit"): |
|
|
break |
|
|
if user_input.lower() == "/reset": |
|
|
history = [] |
|
|
continue |
|
|
|
|
|
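        # Build the prompt from history plus the new user turn; if it no longer fits in the
        # context window, drop the oldest turns (and any leading assistant reply) and re-render.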
        hist = history + [("user", user_input)]
        prompt_text = _render_chat(hist, add_generation_prompt=True)
        prompt_ids = list(tok.encode(prompt_text, add_special_tokens=False).ids)
        while len(prompt_ids) >= max_ctx and len(hist) > 1:
            hist = hist[1:]
            if hist and hist[0][0] == "assistant":
                hist = hist[1:]
            prompt_text = _render_chat(hist, add_generation_prompt=True)
            prompt_ids = list(tok.encode(prompt_text, add_special_tokens=False).ids)

        print("assistant> ", end="", flush=True)
        text, _ = generate(prompt_ids, stream=True)
        history = hist + [("assistant", text)]

    return 0


if __name__ == "__main__":
    sys.exit(main())