"""Gradio demo: Llama-2-7b-chat with a per-token semantic-uncertainty probe.

The model generates a full reply, then a pre-trained probe (loaded from a
pickle shipped with the app) scores each generated token from the model's
hidden states. Tokens are rendered with a background colour running from
green (probe predicts low uncertainty) to red (high uncertainty).
"""

import html
import logging
import os
import pickle as pkl
from pathlib import Path
from threading import Thread
from typing import List, Tuple, Iterator

# NOTE: `spaces` is deliberately imported before torch, as in the original file.
import spaces
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer

logger = logging.getLogger(__name__)

MAX_MAX_NEW_TOKENS = 2048
DEFAULT_MAX_NEW_TOKENS = 1024
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))

DESCRIPTION = """\
This Space demonstrates the Llama-2-7b-chat model with a semantic uncertainty probe.

The highlighted text shows the model's uncertainty in real-time, with green indicating more certain generations and red indicating higher uncertainty.
"""

if torch.cuda.is_available():
    model_id = "meta-llama/Llama-2-7b-chat-hf"
    # TODO load the full model not the 8bit one?
    model = AutoModelForCausalLM.from_pretrained(model_id, device_map="auto", load_in_8bit=True)
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    tokenizer.use_default_system_prompt = False

    # load the probe data
    # TODO compare accuracy and SE probe in different tabs/sections
    # SECURITY: pickle executes arbitrary code on load; this file must only
    # ever be a trusted artifact shipped with the app, never user-supplied.
    with open("./model/20240625-131035_demo.pkl", "rb") as f:
        probe_data = pkl.load(f)
    # take the NQ open one
    probe_data = probe_data[-2]
    probe = probe_data['t_bmodel']
    layer_range = probe_data['sep_layer_range']
    acc_probe = probe_data['t_amodel']
    acc_layer_range = probe_data['ap_layer_range']


@spaces.GPU
def generate(
    message: str,
    chat_history: List[Tuple[str, str]],
    system_prompt: str,
    max_new_tokens: int = DEFAULT_MAX_NEW_TOKENS,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> Iterator[str]:
    """Generate a reply and yield it as uncertainty-highlighted HTML.

    Args:
        message: The new user message.
        chat_history: Previous (user, assistant) turns.
        system_prompt: Optional system prompt; skipped when empty.
        max_new_tokens: Upper bound on generated tokens.
        temperature: Sampling temperature.
        top_p: Nucleus-sampling probability mass.
        top_k: Top-k sampling cutoff.
        repetition_penalty: Repetition penalty passed to ``model.generate``.

    Yields:
        The cumulative HTML string after each visible generated token has
        been scored by the probe and appended.
    """
    conversation = []
    if system_prompt:
        conversation.append({"role": "system", "content": system_prompt})
    for user, assistant in chat_history:
        conversation.extend([{"role": "user", "content": user}, {"role": "assistant", "content": assistant}])
    conversation.append({"role": "user", "content": message})

    input_ids = tokenizer.apply_chat_template(conversation, return_tensors="pt")
    if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
        # Keep the most recent tokens when the conversation is too long.
        input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
        gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
    input_ids = input_ids.to(model.device)

    # No streamer is passed: generation runs synchronously in this thread, so
    # a TextIteratorStreamer would only buffer text that nobody ever reads.
    generation_kwargs = dict(
        input_ids=input_ids,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        repetition_penalty=repetition_penalty,
        output_hidden_states=True,
        return_dict_in_generate=True,
    )

    # Generate without threading
    with torch.no_grad():
        outputs = model.generate(**generation_kwargs)

    prompt_length = input_ids.shape[1]
    generated_tokens = outputs.sequences[0, prompt_length:]
    logger.debug(
        "Generated text: %s",
        tokenizer.decode(generated_tokens, skip_special_tokens=True),
    )

    # One entry per generation step; each entry holds one tensor per layer,
    # shaped (batch size, sequence length, hidden size). Step 0 covers the
    # whole prompt, so its sequence length differs from later steps.
    hidden = outputs.hidden_states
    layer_start, layer_stop = layer_range[0], layer_range[1]

    # TODO do this loop on the fly instead of waiting for the whole generation
    highlighted_text = ""
    for output_id, step_hidden in zip(generated_tokens, hidden):
        # The last position is the state that produced this step's token. For
        # cached single-token steps it is the same as position 0; for step 0
        # it is the final prompt token, which lets the first generated token
        # be scored too instead of being dropped.
        probed_layers = step_hidden[layer_start:layer_stop]
        token_embeddings = torch.stack([layer[0, -1, :].cpu() for layer in probed_layers])  # (num_layers, hidden_size)
        concat_layers = token_embeddings.numpy().reshape(-1)  # (num_layers * hidden_size)

        # pred in range [-1, 1]
        probe_pred = probe.predict_proba(concat_layers.reshape(1, -1))[0][1] * 2 - 1  # prob of high SE

        # decode one token at a time; special tokens (e.g. EOS) are not shown
        output_word = tokenizer.decode(output_id, skip_special_tokens=True)
        logger.debug("%s %r %s", output_id, output_word, probe_pred)
        if not output_word:
            continue

        new_highlighted_text = highlight_text(output_word, probe_pred)
        highlighted_text += f" {new_highlighted_text}"
        yield highlighted_text


def highlight_text(text: str, uncertainty_score: float) -> str:
    """Wrap ``text`` in an HTML span coloured by ``uncertainty_score``.

    Args:
        text: The token text to display; it is HTML-escaped before embedding.
        uncertainty_score: Score in [-1, 1]; values outside are clamped.
            Positive scores shade white-to-red, the rest white-to-green.

    Returns:
        A ``<span>`` element with the computed background colour.
    """
    # Clamp so the channel values always fit the two-digit hex format.
    score = max(-1.0, min(1.0, float(uncertainty_score)))
    if score > 0:
        fade = int(255 * (1 - score))
        html_color = "#%02X%02X%02X" % (255, fade, fade)
    else:
        fade = int(255 * (1 + score))
        html_color = "#%02X%02X%02X" % (fade, 255, fade)
    return '<span style="background-color: {}; color: black">{}</span>'.format(
        html_color, html.escape(text)
    )


chat_interface = gr.ChatInterface(
    fn=generate,
    additional_inputs=[
        gr.Textbox(label="System prompt", lines=6),
        gr.Slider(
            label="Max new tokens",
            minimum=1,
            maximum=MAX_MAX_NEW_TOKENS,
            step=1,
            value=DEFAULT_MAX_NEW_TOKENS,
        ),
        gr.Slider(
            label="Temperature",
            minimum=0.1,
            maximum=4.0,
            step=0.1,
            value=0.6,
        ),
        gr.Slider(
            label="Top-p (nucleus sampling)",
            minimum=0.05,
            maximum=1.0,
            step=0.05,
            value=0.9,
        ),
        gr.Slider(
            label="Top-k",
            minimum=1,
            maximum=1000,
            step=1,
            value=50,
        ),
        gr.Slider(
            label="Repetition penalty",
            minimum=1.0,
            maximum=2.0,
            step=0.05,
            value=1.2,
        ),
    ],
    stop_btn=None,
    examples=[
        ["What is the capital of France?"],
        ["Explain the theory of relativity in simple terms."],
        ["Write a short poem about artificial intelligence."]
    ],
    title="Llama-2 7B Chat with Streamable Semantic Uncertainty Probe",
    description=DESCRIPTION,
)

if __name__ == "__main__":
    chat_interface.launch()