| | from transformers import AutoTokenizer, PreTrainedTokenizerFast |
| | from transformers.tokenization_utils_base import AddedToken |
| | from http.server import HTTPServer, BaseHTTPRequestHandler |
| | import json |
| | import argparse |
| |
|
| | def _prompt_split_image( |
| | image_seq_len, |
| | image_rows, |
| | image_cols, |
| | fake_token_around_image, |
| | image_token, |
| | global_img_token, |
| | ): |
| | """Prompt with expanded image tokens for when the image is split into patches.""" |
| | text_split_images = "" |
| | for n_h in range(image_rows): |
| | for n_w in range(image_cols): |
| | text_split_images += ( |
| | f"{fake_token_around_image}" |
| | + f"<row_{n_h + 1}_col_{n_w + 1}>" |
| | + f"{image_token}" * image_seq_len |
| | ) |
| | text_split_images += "\n" |
| |
|
| | text_split_images += ( |
| | f"\n{fake_token_around_image}" |
| | + f"{global_img_token}" |
| | + f"{image_token}" * image_seq_len |
| | + f"{fake_token_around_image}" |
| | ) |
| | return text_split_images |
| |
|
| |
|
| | def _prompt_single_image( |
| | image_seq_len, fake_token_around_image, image_token, global_img_token |
| | ): |
| | """Prompt with expanded image tokens for a single image.""" |
| | return ( |
| | f"{fake_token_around_image}" |
| | + f"{global_img_token}" |
| | + f"{image_token}" * image_seq_len |
| | + f"{fake_token_around_image}" |
| | ) |
| |
|
| |
|
def get_image_prompt_string(
    image_rows,
    image_cols,
    image_seq_len,
    fake_token_around_image,
    image_token,
    global_img_token,
):
    """Dispatch to the split- or single-image prompt builder.

    A rows/cols pair of (0, 0) means the image was not tiled into patches,
    so the single-image form is used; any non-zero grid selects the split
    form.
    """
    if image_rows != 0 or image_cols != 0:
        return _prompt_split_image(
            image_seq_len,
            image_rows,
            image_cols,
            fake_token_around_image,
            image_token,
            global_img_token,
        )
    return _prompt_single_image(
        image_seq_len,
        fake_token_around_image=fake_token_around_image,
        image_token=image_token,
        global_img_token=global_img_token,
    )
| |
|
class Tokenizer_Http():
    """Wrapper around a HuggingFace tokenizer backing the HTTP token service.

    Buffers undecoded token ids across decode() calls so that characters
    whose bytes are split across streamed chunks are only emitted once they
    decode cleanly (no U+FFFD replacement characters).
    """

    def __init__(self, path='qwen3-vl-tokenizer'):
        """Load the tokenizer.

        `path` generalizes the previously hard-coded tokenizer directory;
        the default preserves the original behavior.
        """
        self.tokenizer = AutoTokenizer.from_pretrained(path,
                                                       trust_remote_code=True,
                                                       use_fast=False)
        # Token ids buffered across decode() calls (see decode()).
        self.token_ids_cache = []

    def encode(self, content):
        """Encode a plain-text user message wrapped in the chat template.

        Returns the list of token ids for the single templated prompt.
        """
        text = [f'<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n<|im_start|>user\n{content}<|im_end|>\n<|im_start|>assistant\n']
        input_ids = self.tokenizer(text)
        return input_ids["input_ids"][0]

    def encode_vpm(self, content="Describe this image.", num_img=1, img_token_num=256):
        """Encode a vision prompt: `num_img` image placeholder runs, then `content`.

        Each image contributes `img_token_num` `<|image_pad|>` placeholders
        wrapped in `<|vision_start|>`/`<|vision_end|>` markers, all placed
        before the text content in the user turn.
        """
        imgs_token = '<|vision_start|>' + '<|image_pad|>'*img_token_num + '<|vision_end|>'
        imgs_token *= num_img
        text = f'<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n<|im_start|>user\n{imgs_token}{content}<|im_end|>\n<|im_start|>assistant\n'

        # Only the text kwargs are actually used here; the original carried
        # unused images/audio/videos kwarg groups copied from a processor
        # config — dropped as dead code.
        text_inputs = self.tokenizer(text, padding=True, return_tensors='pt')
        return text_inputs["input_ids"].tolist()[0]

    def decode(self, token_ids):
        """Decode buffered ids, holding back output that ends mid-character.

        Appends `token_ids` to the cache and decodes the whole cache. If the
        result contains U+FFFD the byte sequence is still incomplete: the
        cache is kept and "" is returned. Otherwise the cache is flushed and
        the decoded text returned.
        """
        self.token_ids_cache += token_ids
        text = self.tokenizer.decode(self.token_ids_cache)
        if "\ufffd" in text:
            # Message says: "text contains an illegal character" — i.e. the
            # decode produced a replacement char; wait for more ids.
            print("text 中包含非法字符")
            return ""
        else:
            self.token_ids_cache.clear()
            return text

    @property
    def bos_id(self):
        # May be None for tokenizers that define no BOS token.
        return self.tokenizer.bos_token_id

    @property
    def eos_id(self):
        # May be None for tokenizers that define no EOS token.
        return self.tokenizer.eos_token_id

    @property
    def bos_token(self):
        return self.tokenizer.bos_token

    @property
    def eos_token(self):
        return self.tokenizer.eos_token

    @property
    def img_start_token(self):
        # First id of the encoded vision-start marker.
        return self.tokenizer.encode("<|vision_start|>")[0]

    @property
    def img_context_token(self):
        # First id of the encoded image-pad placeholder.
        return self.tokenizer.encode("<|image_pad|>")[0]
| |
|
# Module-level smoke test: load the tokenizer once (also used by the HTTP
# handlers below) and exercise both encode paths at import time.
tokenizer = Tokenizer_Http()

print(tokenizer.bos_id, tokenizer.bos_token, tokenizer.eos_id,
      tokenizer.eos_token)

for token_ids in (tokenizer.encode_vpm(), tokenizer.encode("hello world")):
    print(token_ids)
    print(len(token_ids))
| |
|
| |
|
class Request(BaseHTTPRequestHandler):
    """HTTP front-end exposing the module-level `tokenizer`.

    GET  /bos_id, /eos_id, /img_start_token, /img_context_token -> JSON ids
    POST /encode, /decode                                       -> JSON results
    Unknown paths answer 200 with the literal body 'error'.
    """

    timeout = 5
    server_version = 'Apache'

    def do_GET(self):
        """Serve special-token id lookups; a None id is reported as -1."""
        print(self.path)

        self.send_response(200)
        self.send_header("type", "get")
        self.end_headers()

        if self.path == '/bos_id':
            bos_id = tokenizer.bos_id
            # -1 signals "token not defined" since JSON clients expect an int.
            msg = json.dumps({'bos_id': -1 if bos_id is None else bos_id})
        elif self.path == '/eos_id':
            eos_id = tokenizer.eos_id
            msg = json.dumps({'eos_id': -1 if eos_id is None else eos_id})
        elif self.path == '/img_start_token':
            img_start_token = tokenizer.img_start_token
            msg = json.dumps(
                {'img_start_token': -1 if img_start_token is None else img_start_token})
        elif self.path == '/img_context_token':
            img_context_token = tokenizer.img_context_token
            msg = json.dumps(
                {'img_context_token': -1 if img_context_token is None else img_context_token})
        else:
            msg = 'error'

        print(msg)
        self.wfile.write(str(msg).encode())

    def do_POST(self):
        """Serve /encode and /decode with JSON request bodies."""
        # Tolerate a missing Content-Length header (read an empty body)
        # instead of raising TypeError on int(None).
        length = int(self.headers.get('content-length') or 0)
        data = self.rfile.read(length).decode()

        self.send_response(200)
        self.send_header("type", "post")
        self.end_headers()

        if self.path == '/encode':
            req = json.loads(data)
            print(req)
            prompt = req['text']
            # Optional image-prompt mode; defaults mirror encode_vpm's
            # signature so clients may omit num_img / img_token_num.
            if req.get('img_prompt', False):
                token_ids = tokenizer.encode_vpm(
                    prompt,
                    req.get('num_img', 1),
                    req.get('img_token_num', 256),
                )
            else:
                token_ids = tokenizer.encode(prompt)

            if token_ids is None:
                msg = json.dumps({'token_ids': -1})
            else:
                msg = json.dumps({'token_ids': token_ids})

        elif self.path == '/decode':
            req = json.loads(data)
            token_ids = req['token_ids']
            text = tokenizer.decode(token_ids)
            msg = json.dumps({'text': "" if text is None else text})
        else:
            msg = 'error'
        print(msg)
        self.wfile.write(str(msg).encode())
| |
|
| |
|
if __name__ == "__main__":
    # Command-line entry point: bind the tokenizer service and serve forever.
    parser = argparse.ArgumentParser()
    parser.add_argument('--host', type=str, default='localhost')
    parser.add_argument('--port', type=int, default=8080)
    opts = parser.parse_args()

    address = (opts.host, opts.port)
    print('http://%s:%s' % address)
    HTTPServer(address, Request).serve_forever()
| |
|