USE_TOKENIZERS = True if USE_TOKENIZERS: from tokenizers import Tokenizer import os else: from transformers import AutoTokenizer, PreTrainedTokenizerFast from transformers.tokenization_utils_base import AddedToken from http.server import HTTPServer, BaseHTTPRequestHandler import json import argparse class FastTokenizer(): bos_token = None eos_token = None bos_token_id = None eos_token_id = None tokenizer_dir = '' def __init__(self, tokenizer_dir): self.tokenizer_dir = tokenizer_dir # Load tokenizer tokenizer_json_path = os.path.join(tokenizer_dir, "tokenizer.json") if os.path.exists(tokenizer_json_path): self.tokenizer = Tokenizer.from_file(tokenizer_json_path) else: raise ValueError("Unable to load tokenizer from tokenizer_dir" \ "The model loaded with BPE still has issues during tokenization." \ "You can use the save() function to convert the tokenizer configuration into the tokenizer.json format.") # FIXME: The model loaded with BPE still has issues during tokenization. # You can use the save() function to convert the tokenizer configuration into the tokenizer.json format. vocab_path = os.path.join(tokenizer_dir, "vocab.json") merges_path = os.path.join(tokenizer_dir, "merges.txt") self.tokenizer = CharBPETokenizer(vocab_path, merges_path) added_token_path = os.path.join(tokenizer_dir, "added_tokens.json") with open(added_token_path, 'r', encoding='utf-8') as f: added_tokens_dict = json.load(f) sorted_dict_desc = dict(sorted(added_tokens_dict.items(), key=lambda x: x[1], reverse=False)) added_tokens_list = list(sorted_dict_desc.keys()) self.tokenizer.add_tokens(added_tokens_list) # Load tokenizer config config_path = os.path.join(tokenizer_dir, "tokenizer_config.json") with open(config_path, 'r', encoding='utf-8') as f: self.config = json.load(f) self.bos_token = self.config.get("bos_token", None) if isinstance(self.bos_token, dict): self.bos_token = self.bos_token.get("content", None) elif isinstance(self.bos_token, str): pass else: self.bos_token = None self.eos_token = self.config.get("eos_token", None) if isinstance(self.eos_token, dict): self.eos_token = self.eos_token.get("content", None) elif isinstance(self.eos_token, str): pass else: self.eos_token = None if self.bos_token is not None: self.bos_token_id = self.tokenizer.encode(self.bos_token).ids[0] if self.eos_token is not None: self.eos_token_id = self.tokenizer.encode(self.eos_token).ids[0] def encode(self, content, **kwargs): return self.tokenizer.encode(content).ids def decode(self, token_ids, clean_up_tokenization_spaces=False): # clean_up_tokenization_spaces is unused in this case, so we can ignore it return self.tokenizer.decode(token_ids, skip_special_tokens=False) def apply_chat_template(self, messages, **kwargs): text = "" if "deepseek" in self.tokenizer_dir: text = "<|begin▁of▁sentence|>" for msg in messages: role = msg.get("role", "") content = msg.get("content", "") if role == "system": text += f"<|begin▁of▁sentence|>{content}" elif role == "user": text += f"<|User|>{content}" elif role == "assistant": text += f"<|Assistant|>{content}" text += f"<|Assistant|>" else: for msg in messages: role = msg.get("role", "") content = msg.get("content", "") text += f"<|im_start|>{role}\n{content}<|im_end|>\n" text += f"<|im_start|>assistant\n" return text def save(self, path, output_path): self.tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True, use_fast=True) self.tokenizer.save_pretrained(output_path) def _prompt_split_image( image_seq_len, image_rows, image_cols, fake_token_around_image, image_token, global_img_token, ): """Prompt with expanded image tokens for when the image is split into patches.""" text_split_images = "" for n_h in range(image_rows): for n_w in range(image_cols): text_split_images += ( f"{fake_token_around_image}" + f"" + f"{image_token}" * image_seq_len ) text_split_images += "\n" text_split_images += ( f"\n{fake_token_around_image}" + f"{global_img_token}" + f"{image_token}" * image_seq_len + f"{fake_token_around_image}" ) return text_split_images def _prompt_single_image( image_seq_len, fake_token_around_image, image_token, global_img_token ): """Prompt with expanded image tokens for a single image.""" return ( f"{fake_token_around_image}" + f"{global_img_token}" + f"{image_token}" * image_seq_len + f"{fake_token_around_image}" ) def get_image_prompt_string( image_rows, image_cols, image_seq_len, fake_token_around_image, image_token, global_img_token, ): if image_rows == 0 and image_cols == 0: return _prompt_single_image( image_seq_len, fake_token_around_image=fake_token_around_image, image_token=image_token, global_img_token=global_img_token, ) return _prompt_split_image( image_seq_len, image_rows, image_cols, fake_token_around_image, image_token, global_img_token, ) class Tokenizer_Http(): def __init__(self): self.token_ids_cache = [] path = 'qwen3-vl-tokenizer' if USE_TOKENIZERS: self.tokenizer = FastTokenizer(path) else: self.tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True, use_fast=False) def encode(self, content): text = [f'<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n<|im_start|>user\n{content}<|im_end|>\n<|im_start|>assistant\n'] if USE_TOKENIZERS: input_ids = self.tokenizer.encode(text[0]) return input_ids else: input_ids = self.tokenizer(text) return input_ids["input_ids"][0] def encode_vpm(self, content="Describe this image.", num_img=1, img_token_num=256, video_prompt=False): # official implementation if video_prompt: pad_token = '<|video_pad|>' else: pad_token = '<|image_pad|>' imgs_token = '<|vision_start|>' + pad_token*img_token_num*num_img + '<|vision_end|>' text = f'<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n<|im_start|>user\n{imgs_token}{content}<|im_end|>\n<|im_start|>assistant\n' output_kwargs = {'text_kwargs': {'padding': True, 'return_tensors': 'pt'}, 'images_kwargs': {'return_tensors': 'pt'}, 'audio_kwargs': {'padding': True, 'return_tensors': 'pt'}, 'videos_kwargs': {'fps': 2.0, 'return_tensors': 'pt'}, 'common_kwargs': {'return_tensors': 'pt'}} if USE_TOKENIZERS: input_ids = self.tokenizer.encode(text) return input_ids else: text_inputs = self.tokenizer(text, **output_kwargs["text_kwargs"]) return text_inputs["input_ids"].tolist()[0] def decode(self, token_ids): self.token_ids_cache += token_ids text = self.tokenizer.decode(self.token_ids_cache) if "\ufffd" in text and len(self.token_ids_cache) < 9: print("text 中包含非法字符") return "" else: self.token_ids_cache.clear() return text.replace("\ufffd","") # def decode(self, token_ids): # return self.tokenizer.decode(token_ids, # clean_up_tokenization_spaces=False) @property def bos_id(self): return self.tokenizer.bos_token_id @property def eos_id(self): return self.tokenizer.eos_token_id @property def bos_token(self): return self.tokenizer.bos_token @property def eos_token(self): return self.tokenizer.eos_token @property def img_start_token(self): return self.tokenizer.encode("<|vision_start|>")[0] @property def img_context_token(self): return self.tokenizer.encode("<|image_pad|>")[0] @property def video_context_token(self): return self.tokenizer.encode("<|video_pad|>")[0] tokenizer = Tokenizer_Http() print(tokenizer.bos_id, tokenizer.bos_token, tokenizer.eos_id, tokenizer.eos_token) token_ids = tokenizer.encode_vpm() # [151644, 8948, 198, 56568, 104625, 100633, 104455, 104800, 101101, 32022, 102022, 99602, 100013, 9370, 90286, 21287, 42140, 53772, 35243, 26288, 104949, 3837, 105205, 109641, 67916, 30698, 11, 54851, 46944, 115404, 42192, 99441, 100623, 48692, 100168, 110498, 1773, 151645, 151644, 872, 198, # 151646, # 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, 151648, # 151647, # 198, 5501, 7512, 279, 2168, 19620, 13, 151645, 151644, 77091, 198] # 118 print(token_ids) print(len(token_ids)) token_ids = tokenizer.encode("hello world") # [151644, 8948, 198, 56568, 104625, 100633, 104455, 104800, 101101, 32022, 102022, 99602, 100013, 9370, 90286, 21287, 42140, 53772, 35243, 26288, 104949, 3837, 105205, 109641, 67916, 30698, 11, 54851, 46944, 115404, 42192, 99441, 100623, 48692, 100168, 110498, 1773, 151645, 151644, 872, 198, 14990, 1879, 151645, 151644, 77091, 198] # 47 print(token_ids) print(len(token_ids)) class Request(BaseHTTPRequestHandler): #通过类继承,新定义类 timeout = 5 server_version = 'Apache' def do_GET(self): print(self.path) #在新类中定义get的内容(当客户端向该服务端使用get请求时,本服务端将如下运行) self.send_response(200) self.send_header("type", "get") #设置响应头,可省略或设置多个 self.end_headers() if self.path == '/bos_id': bos_id = tokenizer.bos_id # print(bos_id) # to json if bos_id is None: msg = json.dumps({'bos_id': -1}) else: msg = json.dumps({'bos_id': bos_id}) elif self.path == '/eos_id': eos_id = tokenizer.eos_id if eos_id is None: msg = json.dumps({'eos_id': -1}) else: msg = json.dumps({'eos_id': eos_id}) elif self.path == '/img_start_token': img_start_token = tokenizer.img_start_token if img_start_token is None: msg = json.dumps({'img_start_token': -1}) else: msg = json.dumps({'img_start_token': img_start_token}) elif self.path == '/img_context_token': img_context_token = tokenizer.img_context_token if img_context_token is None: msg = json.dumps({'img_context_token': -1}) else: msg = json.dumps({'img_context_token': img_context_token}) elif self.path == '/video_context_token': video_context_token = tokenizer.video_context_token if video_context_token is None: msg = json.dumps({'video_context_token': -1}) else: msg = json.dumps({'video_context_token': video_context_token}) else: msg = 'error' print(msg) msg = str(msg).encode() #转为str再转为byte格式 self.wfile.write(msg) #将byte格式的信息返回给客户端 def do_POST(self): #在新类中定义post的内容(当客户端向该服务端使用post请求时,本服务端将如下运行) data = self.rfile.read(int( self.headers['content-length'])) #获取从客户端传入的参数(byte格式) data = data.decode() #将byte格式转为str格式 self.send_response(200) self.send_header("type", "post") #设置响应头,可省略或设置多个 self.end_headers() if self.path == '/encode': req = json.loads(data) print(req) prompt = req['text'] b_img_prompt = False if 'img_prompt' in req: b_img_prompt = req['img_prompt'] if b_img_prompt: token_ids = tokenizer.encode_vpm(prompt, req["num_img"], req["img_token_num"], req["video_prompt"]) else: token_ids = tokenizer.encode(prompt) if token_ids is None: msg = json.dumps({'token_ids': -1}) else: msg = json.dumps({'token_ids': token_ids}) elif self.path == '/decode': req = json.loads(data) token_ids = req['token_ids'] text = tokenizer.decode(token_ids) if text is None: msg = json.dumps({'text': ""}) else: msg = json.dumps({'text': text}) else: msg = 'error' print(msg) msg = str(msg).encode() #转为str再转为byte格式 self.wfile.write(msg) #将byte格式的信息返回给客户端 if __name__ == "__main__": args = argparse.ArgumentParser() args.add_argument('--host', type=str, default='localhost') args.add_argument('--port', type=int, default=8080) args = args.parse_args() host = (args.host, args.port) #设定地址与端口号,'localhost'等价于'127.0.0.1' print('http://%s:%s' % host) server = HTTPServer(host, Request) #根据地址端口号和新定义的类,创建服务器实例 server.serve_forever() #开启服务