Smoke model using the Qwen3 architecture. For testing purposes only; the model outputs random text.
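
A minimal usage sketch (my addition; the path assumes the checkpoint was saved to `./scrap/qwen3_smoke`, the `output_path` used in the creation script below):

```python
from transformers import AutoModelForCausalLM, AutoTokenizer

model_path = "./scrap/qwen3_smoke"  # output_path from the creation script

tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForCausalLM.from_pretrained(model_path)

inputs = tokenizer("hello", return_tensors="pt")
outputs = model.generate(**inputs, max_new_tokens=16, do_sample=True)
print(tokenizer.decode(outputs[0]))  # expect gibberish: the model is for smoke tests only
```
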
Created using the script below (note: the script has not been cleaned up):
```python
import json
import os
import tempfile

import torch
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    Qwen2TokenizerFast,
    Qwen3Config,
    Qwen3ForCausalLM,
)

source_model = "Qwen/Qwen3-8B"
output_path = "./scrap/qwen3_smoke"
vocab_keep_items = 1024


##### Tokenizer #####
# Reduce the vocabulary size while keeping the special tokens

num_added_tokens_to_keep = 26
tokenizer = AutoTokenizer.from_pretrained(
    source_model, use_fast=True, model_max_length=2048
)
assert tokenizer.is_fast, "This only works for fast tokenizers."
tokenizer_json = json.loads(tokenizer._tokenizer.to_str())
vocab = tokenizer_json["model"]["vocab"]

assert tokenizer_json["model"]["type"] == "BPE"
new_vocab = {token: i for token, i in vocab.items() if i < vocab_keep_items}
merges = tokenizer_json["model"]["merges"]
new_merges = []
# Keep a merge only if both sides and the merged token survive the vocab cut
for a, b in merges:
    new_token = "".join((a, b))
    if a in new_vocab and b in new_vocab and new_token in new_vocab:
        new_merges.append([a, b])
tokenizer_json["model"]["merges"] = new_merges
tokenizer_json["model"]["vocab"] = new_vocab

# Remap the kept added/special tokens to ids directly after the truncated vocab
new_added_tokens = []
for i in range(num_added_tokens_to_keep):
    added_token = tokenizer_json["added_tokens"][i]
    added_token["id"] = vocab_keep_items + i
    new_added_tokens.append(added_token)

tokenizer_json["added_tokens"] = new_added_tokens

added_map = {token["content"]: token["id"] for token in new_added_tokens}  # unused

# Leftover from a Llama 3 variant of this script (`<|begin_of_text|>` is
# Llama 3's BOS token); guarded so it is a no-op when the post-processor
# has no "processors" list, as is the case for Qwen
post_processor = tokenizer_json.get("post_processor")
if post_processor and "processors" in post_processor:
    post_processor["processors"][-1]["special_tokens"][
        "<|begin_of_text|>"
    ]["ids"] = [vocab_keep_items]

tmp_dir = tempfile.mkdtemp()
vocab_file = os.path.join(tmp_dir, "vocab.json")
merges_file = os.path.join(tmp_dir, "merges.txt")

with open(vocab_file, "wt") as f:
    json.dump(new_vocab, f)

with open(merges_file, "wt") as f:
    for a, b in new_merges:
        f.write(f"{a} {b}\n")

# Rebuild a fast tokenizer from the truncated vocab/merges; the original
# added tokens are re-added on top of the new, smaller vocabulary
tokenizer = Qwen2TokenizerFast(
    vocab_file, merges_file, added_tokens_decoder=tokenizer.added_tokens_decoder
)

tokenizer.save_pretrained(output_path)
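
# Sanity check (my addition, not part of the original script): byte-level BPE
# keeps the individual byte tokens at low ids, so plain ASCII text should
# still round-trip through the truncated tokenizer
assert tokenizer.decode(tokenizer.encode("hello world")) == "hello world"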


##### Model #####
# Shrink the architecture and copy sliced weights from the real Qwen3 model,
# so that the weight distribution matches

weight_source = AutoModelForCausalLM.from_pretrained(source_model)

weight_source_dict = dict(weight_source.named_parameters())

new_config = Qwen3Config(
    vocab_size=vocab_keep_items + num_added_tokens_to_keep,
    hidden_size=64,
    num_attention_heads=16,
    num_hidden_layers=6,
    num_key_value_heads=8,
    intermediate_size=128,
    tie_word_embeddings=True,
    # head_dim is left at the Qwen3 default (128), so the attention
    # projections keep full-width heads even though hidden_size is tiny
)


def rec_setattr(obj, key, value):
    """Assign to a dotted attribute path, e.g. 'model.embed_tokens.weight'."""
    if "." in key:
        attr, rem_key = key.split(".", 1)
        rec_setattr(getattr(obj, attr), rem_key, value)
    else:
        setattr(obj, key, value)


new_model = Qwen3ForCausalLM(new_config)

for w_name, w_value in list(new_model.named_parameters()):
    if w_name == "lm_head.weight":
        # Tied to model.embed_tokens.weight; handled by tie_weights() below
        continue
    elif w_name not in weight_source_dict:
        raise ValueError(f"Couldn't find weight ref {w_name}")

    w = weight_source_dict[w_name]

    # Take the leading slice of the source tensor along every dimension
    slices = tuple(slice(0, n) for n in w_value.shape)
    if any(x < y for x, y in zip(w.shape, w_value.shape)):
        raise RuntimeError(f"Can't slice to size {w_name}")
    sliced_weight = w[slices].detach().clone()
    rec_setattr(new_model, w_name, torch.nn.Parameter(sliced_weight))

# Tie lm_head to the embed weights again; replacing the embedding Parameter
# above broke the tie created by tie_word_embeddings=True
new_model.tie_weights()

new_model.save_pretrained(output_path)
```
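
A couple of quick sanity checks on the saved artifacts (my sketch, not part of the original script; the expected sizes follow from `vocab_keep_items` and `num_added_tokens_to_keep` above):

```python
from transformers import AutoModelForCausalLM, AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("./scrap/qwen3_smoke")
model = AutoModelForCausalLM.from_pretrained("./scrap/qwen3_smoke")

# 1024 base tokens + 26 remapped added tokens
assert model.config.vocab_size == 1024 + 26
print(len(tokenizer))  # expect 1050 if all added tokens were re-added
print(f"{sum(p.numel() for p in model.parameters()):,} parameters")
```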