Smoke-test model using the Qwen3 architecture. Intended for testing purposes only; the model outputs random text.

Created using the script below (note: the script has not been cleaned up):
```python
import json
import os
import tempfile

import torch
from tokenizers import Tokenizer
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    Qwen2TokenizerFast,
    Qwen3Config,
    Qwen3ForCausalLM,
)

source_model = "Qwen/Qwen3-8B"
output_path = "./scrap/qwen3_smoke"
vocab_keep_items = 1024


##### Tokenizer #####
# Reduce vocabulary size, while maintaining special tokens

num_added_tokens_to_keep = 26
tokenizer = AutoTokenizer.from_pretrained(
    source_model, use_fast=True, model_max_length=2048
)
assert tokenizer.is_fast, "This only works for fast tokenizers."
tokenizer_json = json.loads(tokenizer._tokenizer.to_str())
vocab = tokenizer_json["model"]["vocab"]

assert tokenizer_json["model"]["type"] == "BPE"
new_vocab = {token: i for token, i in vocab.items() if i < vocab_keep_items}
merges = tokenizer_json["model"]["merges"]
new_merges = []
for a, b in merges:
    new_token = "".join((a, b))
    if a in new_vocab and b in new_vocab and new_token in new_vocab:
        new_merges.append((a, b))
tokenizer_json["model"]["merges"] = new_merges
tokenizer_json["model"]["vocab"] = new_vocab

new_added_tokens = []
for i in range(num_added_tokens_to_keep):
    added_token = tokenizer_json["added_tokens"][i]
    added_token["id"] = vocab_keep_items + i
    new_added_tokens.append(added_token)


tokenizer_json["added_tokens"] = new_added_tokens

added_map = {token["content"]: token["id"] for token in new_added_tokens}

if "processors" in tokenizer_json["post_processor"]:
    tokenizer_json["post_processor"]["processors"][-1]["special_tokens"][
        "<|begin_of_text|>"
    ]["ids"] = [vocab_keep_items]

tmp_dir = tempfile.mkdtemp()
vocab_file = os.path.join(tmp_dir, "vocab.json")
merges_file = os.path.join(tmp_dir, "merges.txt")

with open(vocab_file, "wt") as f:
    json.dump(new_vocab, f)

with open(merges_file, "wt") as f:
    for a, b in new_merges:
        f.write(f"{a} {b}\n")

tokenizer = Qwen2TokenizerFast(
    vocab_file, merges_file, added_tokens_decoder=tokenizer.added_tokens_decoder
)


# tokenizer = AutoTokenizer.from_pretrained(source_model)
tokenizer.save_pretrained(output_path)

##### Model #####
# Reduce the weight sizes and copy (sliced) weights from the real Qwen3 source model,
# so that the weight distribution matches

weight_source_model = AutoModelForCausalLM.from_pretrained(source_model)

weight_source_dict = dict(weight_source_model.named_parameters())

new_config = Qwen3Config(
    vocab_size=vocab_keep_items + num_added_tokens_to_keep,
    hidden_size=64,
    num_attention_heads=16,
    num_hidden_layers=6,
    num_key_value_heads=8,
    intermediate_size=128,
    tie_word_embeddings=True,
)


def rec_setattr(obj, key, value):
    if "." in key:
        attr, rem_key = key.split(".", 1)
        rec_setattr(getattr(obj, attr), rem_key, value)
    else:
        setattr(obj, key, value)


new_model = Qwen3ForCausalLM(new_config)

for w_name, w_value in list(new_model.named_parameters()):
    if w_name == "lm_head.weight":
        # lm_head is tied to the embedding weights (tie_word_embeddings=True), so skip it
        continue
    elif w_name not in weight_source_dict:
        raise ValueError(f"Couldn't find weight ref {w_name}")

    w = weight_source_dict[w_name]

    # Take the leading sub-block of the source weight that matches the smoke model's shape
    slices = tuple(slice(0, n) for n in w_value.shape)
    if any(x < y for x, y in zip(w.shape, w_value.shape)):
        raise RuntimeError(f"Can't slice to size {w_name}")
    sliced_weight = w[slices].detach().clone()
    rec_setattr(new_model, w_name, torch.nn.Parameter(sliced_weight))

# lm_head is already tied to the embedding weights via tie_word_embeddings=True,
# so no manual tying is needed here:
# new_model.lm_head.weight = new_model.model.embed_tokens.weight

new_model.save_pretrained(output_path)
```
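
For reference, a minimal sketch of loading and exercising the resulting smoke model (assuming it was saved to `./scrap/qwen3_smoke` as above, or substituting the repo id it is published under):

```python
from transformers import AutoModelForCausalLM, AutoTokenizer

model_path = "./scrap/qwen3_smoke"  # or the Hub repo id this model was uploaded to

tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForCausalLM.from_pretrained(model_path)

# The model is tiny and untrained, so the output is random text;
# the point is only to exercise the tokenize -> generate -> decode path.
inputs = tokenizer("hello world", return_tensors="pt")
outputs = model.generate(**inputs, max_new_tokens=16, do_sample=True)
print(tokenizer.decode(outputs[0], skip_special_tokens=True))
```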