from transformers import AutoTokenizer
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import argparse

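# A minimal HTTP front end for a local tokenizer: the GET endpoints report
# special-token ids, and POST /encode and /decode run the tokenizer itself.
# The helpers directly below build split-image prompt strings; the Qwen-style
# vision prompts actually used by the endpoints are assembled in
# Tokenizer_Http.encode_vpm further down.
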
def _prompt_split_image(
    image_seq_len,
    image_rows,
    image_cols,
    fake_token_around_image,
    image_token,
    global_img_token,
):
    """Prompt with expanded image tokens for when the image is split into patches."""
    text_split_images = ""
    for n_h in range(image_rows):
        for n_w in range(image_cols):
            text_split_images += (
                f"{fake_token_around_image}"
                + f"<row_{n_h + 1}_col_{n_w + 1}>"
                + f"{image_token}" * image_seq_len
            )
        text_split_images += "\n"

    text_split_images += (
        f"\n{fake_token_around_image}"
        + f"{global_img_token}"
        + f"{image_token}" * image_seq_len
        + f"{fake_token_around_image}"
    )
    return text_split_images


def _prompt_single_image(
    image_seq_len, fake_token_around_image, image_token, global_img_token
):
    """Prompt with expanded image tokens for a single image."""
    return (
        f"{fake_token_around_image}"
        + f"{global_img_token}"
        + f"{image_token}" * image_seq_len
        + f"{fake_token_around_image}"
    )


def get_image_prompt_string(
    image_rows,
    image_cols,
    image_seq_len,
    fake_token_around_image,
    image_token,
    global_img_token,
):
    if image_rows == 0 and image_cols == 0:
        return _prompt_single_image(
            image_seq_len,
            fake_token_around_image=fake_token_around_image,
            image_token=image_token,
            global_img_token=global_img_token,
        )
    return _prompt_split_image(
        image_seq_len,
        image_rows,
        image_cols,
        fake_token_around_image,
        image_token,
        global_img_token,
    )
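
# A quick illustration of the helpers above (the token strings are made up for
# readability; real callers pass the model's actual special tokens):
#   get_image_prompt_string(1, 2, 1, "<fake>", "<image>", "<global>")
# returns
#   "<fake><row_1_col_1><image><fake><row_1_col_2><image>\n\n<fake><global><image><fake>"
# while image_rows == image_cols == 0 falls back to the single-image form
#   "<fake><global><image><fake>".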


class Tokenizer_Http:

    def __init__(self):
        # Cache for streamed token ids that do not yet decode cleanly (see decode).
        self.token_ids_cache = []
        path = 'qwen3-vl-tokenizer'
        self.tokenizer = AutoTokenizer.from_pretrained(
            path, trust_remote_code=True, use_fast=False
        )

    def encode(self, content):
        # Wrap the user text in the Qwen chat template before tokenizing.
        text = [
            '<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n'
            f'<|im_start|>user\n{content}<|im_end|>\n'
            '<|im_start|>assistant\n'
        ]
        input_ids = self.tokenizer(text)
        return input_ids["input_ids"][0]
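
    # Note: encode("hello world") returns ids for the whole chat-wrapped
    # prompt, not just the user text; the smoke test at module level below
    # prints one such result.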

    def encode_vpm(self, content="Describe this image.", num_img=1,
                   img_token_num=256, video_prompt=False):
        # One pad token per vision patch, all inside a single
        # <|vision_start|>...<|vision_end|> span.
        if video_prompt:
            pad_token = '<|video_pad|>'
        else:
            pad_token = '<|image_pad|>'
        imgs_token = ('<|vision_start|>'
                      + pad_token * img_token_num * num_img
                      + '<|vision_end|>')

        text = ('<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n'
                f'<|im_start|>user\n{imgs_token}{content}<|im_end|>\n'
                '<|im_start|>assistant\n')

        text_inputs = self.tokenizer(text, padding=True, return_tensors='pt')
        return text_inputs["input_ids"].tolist()[0]
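
    # With the defaults, the returned id list covers 256 <|image_pad|> ids
    # plus the chat-template and text ids; the pad count scales as
    # num_img * img_token_num (e.g. encode_vpm(num_img=2, img_token_num=64)
    # yields 128 pad ids).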

    def decode(self, token_ids):
        # Buffer ids until they decode to valid UTF-8: a multi-byte character
        # can be split across several BPE tokens, in which case an
        # intermediate decode yields the replacement character U+FFFD.
        self.token_ids_cache += token_ids
        text = self.tokenizer.decode(self.token_ids_cache)
        if "\ufffd" in text and len(self.token_ids_cache) < 9:
            print("text contains illegal characters")
            return ""
        else:
            self.token_ids_cache.clear()
            return text.replace("\ufffd", "")

    @property
    def bos_id(self):
        return self.tokenizer.bos_token_id

    @property
    def eos_id(self):
        return self.tokenizer.eos_token_id

    @property
    def bos_token(self):
        return self.tokenizer.bos_token

    @property
    def eos_token(self):
        return self.tokenizer.eos_token

    @property
    def img_start_token(self):
        return self.tokenizer.encode("<|vision_start|>")[0]

    @property
    def img_context_token(self):
        return self.tokenizer.encode("<|image_pad|>")[0]

    @property
    def video_context_token(self):
        return self.tokenizer.encode("<|video_pad|>")[0]
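
    # Qwen-style chat tokenizers commonly define no bos token, so bos_id may
    # be None; the GET handlers below map None to -1 for the client's benefit.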


tokenizer = Tokenizer_Http()

# Smoke test at import time: print the special tokens and two sample encodes.
print(tokenizer.bos_id, tokenizer.bos_token, tokenizer.eos_id,
      tokenizer.eos_token)

token_ids = tokenizer.encode_vpm()
print(token_ids)
print(len(token_ids))

token_ids = tokenizer.encode("hello world")
print(token_ids)
print(len(token_ids))


class Request(BaseHTTPRequestHandler):
    timeout = 5
    server_version = 'Apache'

    def do_GET(self):
        print(self.path)
        self.send_response(200)
        self.send_header("type", "get")
        self.end_headers()

        # Map each GET path to the matching special-token id, substituting -1
        # when the tokenizer does not define that token.
        if self.path == '/bos_id':
            bos_id = tokenizer.bos_id
            msg = json.dumps({'bos_id': -1 if bos_id is None else bos_id})
        elif self.path == '/eos_id':
            eos_id = tokenizer.eos_id
            msg = json.dumps({'eos_id': -1 if eos_id is None else eos_id})
        elif self.path == '/img_start_token':
            token = tokenizer.img_start_token
            msg = json.dumps({'img_start_token': -1 if token is None else token})
        elif self.path == '/img_context_token':
            token = tokenizer.img_context_token
            msg = json.dumps({'img_context_token': -1 if token is None else token})
        elif self.path == '/video_context_token':
            token = tokenizer.video_context_token
            msg = json.dumps({'video_context_token': -1 if token is None else token})
        else:
            msg = 'error'

        print(msg)
        self.wfile.write(str(msg).encode())

    def do_POST(self):
        data = self.rfile.read(int(self.headers['content-length']))
        data = data.decode()

        self.send_response(200)
        self.send_header("type", "post")
        self.end_headers()

        if self.path == '/encode':
            req = json.loads(data)
            print(req)
            prompt = req['text']
            b_img_prompt = req.get('img_prompt', False)
            if b_img_prompt:
                token_ids = tokenizer.encode_vpm(prompt,
                                                 req["num_img"],
                                                 req["img_token_num"],
                                                 req.get("video_prompt", False))
            else:
                token_ids = tokenizer.encode(prompt)
            if token_ids is None:
                msg = json.dumps({'token_ids': -1})
            else:
                msg = json.dumps({'token_ids': token_ids})
        elif self.path == '/decode':
            req = json.loads(data)
            token_ids = req['token_ids']
            text = tokenizer.decode(token_ids)
            if text is None:
                msg = json.dumps({'text': ""})
            else:
                msg = json.dumps({'text': text})
        else:
            msg = 'error'

        print(msg)
        self.wfile.write(str(msg).encode())


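# Example session (script name and token ids are illustrative; the JSON fields
# match the handlers above):
#   python tokenizer_http.py --host localhost --port 8080
#   curl http://localhost:8080/eos_id
#   curl -X POST http://localhost:8080/encode -d '{"text": "hello world"}'
#   curl -X POST http://localhost:8080/encode \
#        -d '{"text": "Describe this image.", "img_prompt": true,
#             "num_img": 1, "img_token_num": 256, "video_prompt": false}'
#   curl -X POST http://localhost:8080/decode -d '{"token_ids": [9707, 1879]}'
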
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--host', type=str, default='localhost')
    parser.add_argument('--port', type=int, default=8080)
    args = parser.parse_args()

    host = (args.host, args.port)
    print('http://%s:%s' % host)
    server = HTTPServer(host, Request)
    server.serve_forever()