import json
import os
import random
import string
import time

import numpy as np
import torch
import tritonclient.grpc as client_util
from tokenizers import Tokenizer
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from tritonclient.utils import np_to_triton_dtype, InferenceServerException

# Touch NumPy's finfo cache up front; this appears to work around a
# thread-safety issue when finfo is first called from worker threads.
np.finfo(np.dtype("float32"))
np.finfo(np.dtype("float64"))
token = os.environ.get("HUB_TOKEN", None)  # HF access token, if the model repo requires one
device = "cuda:0" if torch.cuda.is_available() else "cpu"

tokenizer = AutoTokenizer.from_pretrained("bigcode/christmas-models", use_auth_token=token)
model = AutoModelForCausalLM.from_pretrained(
    "bigcode/christmas-models", trust_remote_code=True, use_auth_token=token
).to(device)
pipe = pipeline("text-generation", model=model, tokenizer=tokenizer, device=device)
class CodeGenProxy:
    def __init__(self, host: str = 'triton', port: int = 8001, verbose: bool = False):
        self.tokenizer = AutoTokenizer.from_pretrained("bigcode/christmas-models", use_auth_token=token)
        self.client = client_util.InferenceServerClient(url=f'{host}:{port}', verbose=verbose)
        self.PAD_CHAR = 50256  # GPT-2-style end-of-text token id, used as padding
        # Max number of tokens the model can handle
        self.MAX_MODEL_LEN = 2048

    class TokensExceedsMaximum(Exception):
        pass
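        # (Unused in this trimmed-down Space; in the full proxy it would be raised
        # when prompt tokens plus requested tokens exceed MAX_MODEL_LEN.)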
    @staticmethod
    def prepare_tensor(name: str, tensor_input):
        # Wrap a NumPy array as a Triton InferInput with a matching dtype.
        t = client_util.InferInput(
            name, tensor_input.shape, np_to_triton_dtype(tensor_input.dtype))
        t.set_data_from_numpy(tensor_input)
        return t
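    # Illustrative sketch, not exercised by this Space (generation happens via the
    # local `pipe` instead of Triton): prepare_tensor would be used to build the
    # request sent through self.client. Tensor and model names here are assumptions.
    #
    #     input_ids = np.array([[818, 257, 2837]], dtype=np.uint32)
    #     inputs = [CodeGenProxy.prepare_tensor("input_ids", input_ids)]
    #     result = self.client.infer("fastertransformer", inputs)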
    @staticmethod
    def trim_with_stopwords(output: str, stopwords: list) -> str:
        # Strip at most one trailing stop sequence, preferring the longest match.
        for w in sorted(stopwords, key=len, reverse=True):
            if output.endswith(w):
                output = output[:-len(w)]
                break
        return output
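    # Example (sketch): stopwords are tried longest-first and only one suffix is
    # removed, so
    #     CodeGenProxy.trim_with_stopwords("print('hi')\n\n", ["\n", "\n\n"])
    # returns "print('hi')" -- the two-newline stopword wins over the single newline.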
    @staticmethod
    def to_word_list_format(word_dict, tokenizer):
        flat_ids = []
        offsets = []
        for word_dict_item in word_dict:
            item_flat_ids = []
            item_offsets = []
            for word in word_dict_item:
                ids = tokenizer.encode(word)
                if len(ids) == 0:
                    continue
                item_flat_ids += ids
                item_offsets.append(len(ids))
                # Hack, can we do this better?
                if word == '\n\n':
                    item_flat_ids += [198, 198]  # 198 is the '\n' token id in the GPT-2 vocab
                    item_offsets.append(2)
            flat_ids.append(np.array(item_flat_ids))
            offsets.append(np.cumsum(np.array(item_offsets)))

        pad_to = max(1, max(len(ids) for ids in flat_ids))
        for i, (ids, offs) in enumerate(zip(flat_ids, offsets)):
            flat_ids[i] = np.pad(ids, (0, pad_to - len(ids)), constant_values=0)
            offsets[i] = np.pad(offs, (0, pad_to - len(offs)), constant_values=-1)

        return np.array([flat_ids, offsets], dtype="int32").transpose((1, 0, 2))
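    # The returned tensor follows the layout FasterTransformer expects for its
    # stop-/bad-words inputs: shape (batch, 2, pad_to), where [i, 0, :] holds the
    # concatenated token ids of every word for item i (zero-padded) and [i, 1, :]
    # holds the cumulative end offset of each word (padded with -1). Sketch,
    # assuming a GPT-2-style tokenizer:
    #     CodeGenProxy.to_word_list_format([['\n\n']], tokenizer)
    #     # -> int32 array of shape (1, 2, N)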
    def generate(self, data):
        global pipe
        prompt = data['prompt']
        n = data.get('n', 1)  # accepted but unused: only a single completion is produced
        model_name = data["model"]
        choices = []
        text = pipe(prompt, do_sample=True, top_p=0.95, max_new_tokens=50)[0]['generated_text']
        choice = {
            'text': text,
            'index': 0,
            'finish_reason': "stop",
            'logprobs': None,
        }
        choices.append(choice)
        completion = {
            'id': None,  # fill in
            'model': 'codegen',
            'object': 'text_completion',
            'created': int(time.time()),
            'choices': None,  # fill in
            'usage': {
                # Placeholder counts; the real token usage is not computed here.
                'completion_tokens': int(50),
                'prompt_tokens': int(50),
                'total_tokens': int(100),
            }
        }
        return completion, choices
    @staticmethod
    def random_completion_id():
        # OpenAI-style id: 'cmpl-' followed by 29 random alphanumeric characters.
        return 'cmpl-' + ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(29))
    def streamed_response(self, completion, choices):
        for c in choices:
            completion['id'] = self.random_completion_id()
            completion['choices'] = [c]
            yield f'data: {json.dumps(completion)}\n\n'
        yield 'data: [DONE]\n\n'
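    # Each yielded chunk is a server-sent event (`data: <json>\n\n`), terminated by
    # the OpenAI-style `data: [DONE]` sentinel. A web layer could expose this
    # generator directly; sketch, assuming FastAPI:
    #     StreamingResponse(proxy.streamed_response(completion, choices),
    #                       media_type='text/event-stream')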
    def non_streamed_response(self, completion, choices) -> str:
        completion['id'] = self.random_completion_id()
        completion['choices'] = choices
        return json.dumps(completion)
    def __call__(self, data: dict):
        st = time.time()
        try:
            completion, choices = self.generate(data)
        except InferenceServerException as exc:
            print(exc)
            completion = {}
            choices = []
        ed = time.time()
        print(f"Returned completion in {(ed - st) * 1000:.2f} ms")
        if data.get('stream', False):
            return self.streamed_response(completion, choices)
        else:
            return self.non_streamed_response(completion, choices)
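# Minimal usage sketch (assumption: run as a script; the Space itself wires this
# class into its web framework). The payload mirrors the OpenAI completion API,
# and without 'stream' the call returns a JSON string.
if __name__ == '__main__':
    proxy = CodeGenProxy()
    response = proxy({'prompt': 'def hello_world():', 'n': 1, 'model': 'codegen'})
    print(response)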