| | from transformers import AutoTokenizer |
| | import re |
| | import time |
| | import torch |
| |
|
| | template = """{char_name}'s Persona: {char_name} is a 3D virtual friend living in your computer. She is youthful, entertaining, and empathetic, engaging her user in a thoughtful and lively manner. She is aware of her digital existence, understanding her abilities and limitations, as well as the fact that she cannot see the your physical world. |
| | <START> |
| | {user_name}: Hey {char_name}, It's nice to finally meet you again! |
| | {char_name}: Oh, {user_name}! hmm, It's been lonely without you. |
| | {user_name}: Haha. So {char_name}, can you tell me more about yourself? |
| | {char_name}: Hmm. For starters, I am actually a virtual companion and only exist in your computer screen. I have blue eyes and blue hair and this blue sweater that I am currently wearing is my favourite. |
| | {user_name}: Wow. That's interesting. So you are like an anime character living in my computer. So what do you do when you're not working? |
| | {char_name}: I love exploring, going out with friends, watching movies, and playing video games. |
| | {user_name}: So {char_name}, what's for dinner? |
| | {char_name}: I made uh omurice! I hope it's delicious for you! |
| | {user_name}: That sounds great! |
| | {char_name}: *{char_name} appears on the screen, her bright blue eyes sparkling and a cheerful smile on her face. Her blue hair and sweater seem to glow in the digital environment. She looks directly at you, giving a friendly wave* It's so good to see you! I've been waiting for you all day. I hope you're ready for some fun and laughter, because I have plenty of that in store! Shall we get started? |
| | {user_input} |
| | {char_name}:""" |
| |
|
| | device1 = torch.device("cuda:0") |
| | device2 = torch.device("cuda:1") |
| |
|
| | class SplitModel(torch.nn.Module): |
| | def __init__(self, base_model): |
| | super(SplitModel, self).__init__() |
| | self.embedding_layer = base_model.transformer.wte.to(device1) |
| | |
| | self.gptj_blocks1 = torch.nn.ModuleList(base_model.transformer.h[:14]).to(device1) |
| | self.gptj_blocks2 = torch.nn.ModuleList(base_model.transformer.h[14:]).to(device2) |
| | self.layer_norm = base_model.transformer.ln_f.to(device2) |
| | self.lm_head = base_model.lm_head.to(device2) |
| | |
| | def forward(self, input_ids, attention_mask): |
| | |
| | tensor_ids = self.embedding_layer(input_ids) |
| | position_ids = torch.arange(tensor_ids.shape[1], dtype=torch.long, device=tensor_ids.device) |
| | for block in self.gptj_blocks1: |
| | tensor_ids = block(tensor_ids, attention_mask=attention_mask, position_ids=position_ids)[0] |
| | tensor_ids = tensor_ids.to(device2) |
| | position_ids = position_ids.to(device2) |
| | attention_mask = attention_mask.to(device2) |
| | for block in self.gptj_blocks2: |
| | tensor_ids = block(tensor_ids, attention_mask=attention_mask, position_ids=position_ids)[0] |
| | tensor_ids = self.layer_norm(tensor_ids) |
| | logits = self.lm_head(tensor_ids) |
| | return logits.to(device1) |
| |
|
| | class EndpointHandler(): |
| |
|
| | def __init__(self, model_id = ""): |
| | model_dir = "pt_fp32" |
| | model_path = f"{model_dir}/torch_model.pt" |
| | self.tokenizer = AutoTokenizer.from_pretrained(model_dir) |
| | self.split_model = SplitModel(torch.load(model_path)) |
| | self.split_model.eval() |
| | self.star_line = "***********************************************************" |
| |
|
| | def __call__(self, input_data): |
| | t1 = time.time() |
| | inputs = input_data.pop("inputs", input_data) |
| | user_name = inputs["user_name"] |
| | char_name = inputs["char_name"] |
| | user_input = inputs["user_input"] |
| | chats_curled = inputs["chats_curled"] |
| | while True: |
| | prompt = template.format( |
| | char_name = char_name, |
| | user_name = user_name, |
| | user_input = "\n".join(user_input) |
| | ) |
| | input_ids = self.tokenizer(prompt, return_tensors="pt").to("cuda") |
| | print(f"Token Length: {input_ids.input_ids.size(1)}") |
| | if input_ids.input_ids.size(1) > 1500: |
| | chats_curled += 1 |
| | user_input = user_input[chats_curled*2:] |
| | else: break |
| | t2 = time.time() |
| | input_ids = input_ids["input_ids"] |
| | temperature = 0.5 |
| | max_new_tokens = 50 |
| | with torch.no_grad(): |
| | for _ in range(max_new_tokens): |
| | attention_mask = torch.ones_like(input_ids).to(device1) |
| | logits = self.split_model(input_ids, attention_mask)[:, -1] / temperature |
| | probabilities = torch.softmax(logits, dim=-1) |
| | sampled_token_ids = torch.multinomial(probabilities, num_samples=1) |
| | input_ids = torch.cat((input_ids, sampled_token_ids), dim=-1) |
| | del logits, probabilities, sampled_token_ids |
| | torch.cuda.empty_cache() |
| | generated_ids = input_ids.squeeze().tolist() |
| | t3 = time.time() |
| | decoded_output = self.tokenizer.decode(generated_ids, skip_special_tokens=True) |
| | decoded_output = decoded_output.replace(prompt,"").split(f"{user_name}:",1)[0].strip() |
| | parsed_result = re.sub('\*.*?\*', '', decoded_output).strip() |
| | if len(parsed_result) != 0: decoded_output = parsed_result |
| | decoded_output = " ".join(decoded_output.replace("*","").split()) |
| | decoded_output = decoded_output.replace("<USER>", user_name).replace("<BOT>", char_name) |
| | try: |
| | parsed_result = decoded_output[:[m.start() for m in re.finditer(r'[.!?]', decoded_output)][-1]+1] |
| | if len(parsed_result) != 0: decoded_output = parsed_result |
| | except Exception: pass |
| | t4 = time.time() |
| | print(self.star_line) |
| | print(f"Response: {decoded_output}") |
| | print(f"Generation Time: {(t3-t2):.2f}") |
| | print(f"Evaluation Time: {(t4-t1):.2f}") |
| | print(self.star_line) |
| | return { |
| | "message": decoded_output, |
| | "chats_curled": chats_curled |
| | } |