Spaces:
Sleeping
Sleeping
import html
import os
import pickle as pkl
from pathlib import Path
from queue import Queue
from threading import Thread
from typing import Iterator, List, Optional, Tuple

# NOTE(review): third-party imports keep their original relative order
# (`spaces` ahead of `torch`); confirm before letting a formatter re-sort them.
import spaces
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation.streamers import BaseStreamer
| MAX_MAX_NEW_TOKENS = 2048 | |
| DEFAULT_MAX_NEW_TOKENS = 1024 | |
| MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096")) | |
| DESCRIPTION = """ | |
| <h1>Llama-2 7B Chat with Uncertainty Probes</h1> | |
| <p>This Space demonstrates the Llama-2-7b-chat model with a semantic uncertainty probe.</p> | |
| <p>The highlighted text shows the model's uncertainty in real-time:</p> | |
| <ul> | |
| <li><span style="background-color: #00FF00; color: black">Green</span> indicates more certain generations</li> | |
| <li><span style="background-color: #FF0000; color: black">Red</span> indicates more uncertain generations</li> | |
| </ul> | |
| """ | |
| EXAMPLES = [ | |
| ["What is the capital of France?", ""], | |
| ["Who landed on the moon?", ""], | |
| ["Who is Yarin Gal?", ""], | |
| ["Explain the theory of relativity in simple terms.", ""], | |
| ] | |
# Load the model, tokenizer and probes only when a GPU is present; on CPU the
# page just gains a notice and none of the names below are defined.
if not torch.cuda.is_available():
    DESCRIPTION += "\n<p>Running on CPU 🥶 This demo does not work on CPU.</p>"
else:
    model_id = "meta-llama/Llama-2-7b-chat-hf"
    # TODO load the full model not the 8bit one?
    model = AutoModelForCausalLM.from_pretrained(model_id, device_map="auto", load_in_8bit=True)
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    tokenizer.use_default_system_prompt = False

    # Load the probe data.
    # TODO compare accuracy and SE probe in different tabs/sections
    # NOTE(review): unpickling runs arbitrary code from the file; this presumes
    # the pickle is a trusted artefact shipped with the app - confirm.
    with Path("./model/20240625-131035_demo.pkl").open("rb") as f:
        # Take the NQ open entry (second from the end).
        probe_data = pkl.load(f)[-2]

    # Semantic-uncertainty probe and the range of layers it reads.
    se_probe, se_layer_range = probe_data['t_bmodel'], probe_data['sep_layer_range']
    # Accuracy probe and the range of layers it reads.
    acc_probe, acc_layer_range = probe_data['t_amodel'], probe_data['ap_layer_range']
class CustomStreamer(BaseStreamer):
    """Queue-backed streamer that exposes generated token ids as an iterator.

    Token ids passed to :meth:`put` are placed on ``token_queue`` and can be
    consumed by iterating over the streamer. A second queue,
    ``hidden_states_queue``, carries whatever is passed to
    :meth:`put_hidden_states`.

    NOTE(review): nothing visible in this file's original code calls
    ``put_hidden_states``; presumably the producer side must feed it
    explicitly - confirm against the caller.

    Args:
        skip_prompt: When true, the first ``put`` call after construction (or
            after ``end``) is discarded instead of being queued.
        timeout: Timeout in seconds used for queue operations; ``None`` blocks
            indefinitely.
    """

    def __init__(self, skip_prompt: bool = False, timeout: Optional[float] = None):
        self.skip_prompt = skip_prompt
        self.timeout = timeout
        self.token_queue = Queue()
        self.hidden_states_queue = Queue()
        # Sentinel placed on both queues by end(); consumers compare by identity.
        self.stop_signal = None
        self.next_tokens_are_prompt = True

    def put(self, value):
        """Receive a tensor of token ids and add each id to the token queue.

        Raises:
            ValueError: If ``value`` is multi-dimensional with more than one row.
        """
        if len(value.shape) > 1:
            if value.shape[0] > 1:
                raise ValueError("CustomStreamer only supports batch size 1")
            value = value[0]

        # The first call is treated as the prompt and dropped when requested.
        if self.skip_prompt and self.next_tokens_are_prompt:
            self.next_tokens_are_prompt = False
            return

        for token in value.tolist():
            self.token_queue.put(token, timeout=self.timeout)

    def put_hidden_states(self, hidden_states):
        """Receive hidden states and add them to the hidden states queue."""
        self.hidden_states_queue.put(hidden_states, timeout=self.timeout)

    def end(self):
        """Signal the end of the stream on both queues and re-arm prompt skipping."""
        self.next_tokens_are_prompt = True
        self.token_queue.put(self.stop_signal, timeout=self.timeout)
        self.hidden_states_queue.put(self.stop_signal, timeout=self.timeout)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next token id, waiting up to ``timeout`` seconds.

        Raises:
            StopIteration: When the stop sentinel is dequeued.
            queue.Empty: When no item arrives within ``timeout``.
        """
        token = self.token_queue.get(timeout=self.timeout)
        # Identity check: the sentinel is a singleton, not a value to compare.
        if token is self.stop_signal:
            raise StopIteration()
        return token
| # Streamer claude | |
| # def generate( | |
| # message: str, | |
| # system_prompt: str, | |
| # chat_history: List[Tuple[str, str]], | |
| # max_new_tokens: int = DEFAULT_MAX_NEW_TOKENS, | |
| # temperature: float = 0.6, | |
| # top_p: float = 0.9, | |
| # top_k: int = 50, | |
| # repetition_penalty: float = 1.2, | |
| # ) -> Iterator[Tuple[str, str]]: | |
| # conversation = [] | |
| # if system_prompt: | |
| # conversation.append({"role": "system", "content": system_prompt}) | |
| # for user, assistant in chat_history: | |
| # conversation.extend([{"role": "user", "content": user}, {"role": "assistant", "content": assistant}]) | |
| # conversation.append({"role": "user", "content": message}) | |
| # input_ids = tokenizer.apply_chat_template(conversation, return_tensors="pt") | |
| # if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH: | |
| # input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:] | |
| # gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.") | |
| # input_ids = input_ids.to(model.device) | |
def generate(
    message: str,
    system_prompt: str,
    max_new_tokens: int = DEFAULT_MAX_NEW_TOKENS,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> Iterator[Tuple[str, str]]:
    """Stream a reply to ``message`` with per-token probe highlighting.

    Generation runs in a background thread. For every generated token the
    semantic-uncertainty probe and the accuracy probe are applied to the
    model's hidden states, and the token is appended to two HTML strings,
    coloured by the respective probe score.

    Args:
        message: User message.
        system_prompt: Optional system prompt; skipped when empty.
        max_new_tokens: Maximum number of tokens to generate.
        temperature: Sampling temperature.
        top_p: Nucleus-sampling threshold.
        top_k: Top-k sampling cutoff.
        repetition_penalty: Repetition penalty passed to ``model.generate``.

    Yields:
        ``(se_html, acc_html)``: the cumulative highlighted text for the
        semantic-uncertainty probe and for the accuracy probe.

    Raises:
        gr.Error: When no GPU is available (the model is only loaded on GPU).
        Exception: Any exception raised inside the generation thread is
            re-raised here once the stream has ended.
    """
    # The model, tokenizer and probes are only defined when CUDA is available.
    if not torch.cuda.is_available():
        raise gr.Error("This demo does not work on CPU.")

    conversation = []
    if system_prompt:
        conversation.append({"role": "system", "content": system_prompt})
    conversation.append({"role": "user", "content": message})

    input_ids = tokenizer.apply_chat_template(conversation, return_tensors="pt")
    if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
        # Keep the most recent tokens when the prompt is too long.
        input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
        gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
    input_ids = input_ids.to(model.device)

    streamer = CustomStreamer(skip_prompt=True, timeout=10.0)
    worker_errors = []

    def capture_hidden_states(_module, _args, output):
        """Forward hook: queue the last-position hidden state of every layer."""
        layer_states = getattr(output, "hidden_states", None)
        if layer_states is None:
            return
        # Slicing to the last position gives the prompt pass and the later
        # single-token passes the same (batch, 1, hidden) shape. Moving to
        # CPU here avoids holding GPU tensors in the queue.
        streamer.put_hidden_states(
            tuple(layer[:, -1:, :].cpu() for layer in layer_states)
        )

    def generate_with_states():
        # The streamer interface only receives token ids, so hidden states are
        # captured with a forward hook on the model.
        # NOTE(review): assumes `model.generate` runs exactly one forward pass
        # per generated token, so the two queues stay aligned - confirm for
        # the installed transformers version.
        hook_handle = model.register_forward_hook(capture_hidden_states)
        try:
            with torch.no_grad():
                model.generate(
                    input_ids=input_ids,
                    max_new_tokens=max_new_tokens,
                    do_sample=True,
                    top_p=top_p,
                    top_k=top_k,
                    temperature=temperature,
                    repetition_penalty=repetition_penalty,
                    output_hidden_states=True,
                    return_dict_in_generate=True,
                    streamer=streamer,
                )
        except Exception as exc:
            # Remember the failure and unblock the consumer instead of letting
            # it wait for the queue timeout.
            worker_errors.append(exc)
            streamer.end()
        finally:
            hook_handle.remove()

    thread = Thread(target=generate_with_states)
    thread.start()

    se_highlighted_text = ""
    acc_highlighted_text = ""
    for token_id in streamer:
        hidden_states = streamer.hidden_states_queue.get(timeout=streamer.timeout)
        if hidden_states is streamer.stop_signal:
            break

        # One row per entry of the hidden-states tuple: (num_layers, hidden_size).
        token_embeddings = torch.stack(
            [layer_state[0, 0, :].cpu() for layer_state in hidden_states]
        ).numpy()

        # Semantic Uncertainty Probe: positive-class probability rescaled to [-1, 1].
        se_concat_layers = token_embeddings[se_layer_range[0]:se_layer_range[1]].reshape(-1)
        se_probe_pred = se_probe.predict_proba(se_concat_layers.reshape(1, -1))[0][1] * 2 - 1

        # Accuracy Probe: inverted before rescaling.
        acc_concat_layers = token_embeddings[acc_layer_range[0]:acc_layer_range[1]].reshape(-1)
        acc_probe_pred = (1 - acc_probe.predict_proba(acc_concat_layers.reshape(1, -1))[0][1]) * 2 - 1

        # Decode the latest token.
        new_text = tokenizer.decode(token_id)
        print(new_text, se_probe_pred, acc_probe_pred)

        se_new_highlighted_text = highlight_text(new_text, se_probe_pred)
        acc_new_highlighted_text = highlight_text(new_text, acc_probe_pred)
        se_highlighted_text += f" {se_new_highlighted_text}"
        acc_highlighted_text += f" {acc_new_highlighted_text}"

        yield se_highlighted_text, acc_highlighted_text

    thread.join()
    if worker_errors:
        raise worker_errors[0]
| #### Generate without threading | |
| # with torch.no_grad(): | |
| # outputs = model.generate(**generation_kwargs) | |
| # generated_tokens = outputs.sequences[0, input_ids.shape[1]:] | |
| # generated_text = tokenizer.decode(generated_tokens, skip_special_tokens=True) | |
| # # hidden states | |
| # hidden = outputs.hidden_states # list of tensors, one for each token, then (batch size, sequence length, hidden size) | |
| # # TODO do this loop on the fly instead of waiting for the whole generation | |
| # se_highlighted_text = "" | |
| # acc_highlighted_text = "" | |
| # for i in range(1, len(hidden)): | |
| # # Semantic Uncertainty Probe | |
| # token_embeddings = torch.stack([generated_token[0, 0, :].cpu() for generated_token in hidden[i]]).numpy() # (num_layers, hidden_size) | |
| # se_concat_layers = token_embeddings[se_layer_range[0]:se_layer_range[1]].reshape(-1) | |
| # se_probe_pred = se_probe.predict_proba(se_concat_layers.reshape(1, -1))[0][1] * 2 - 1 | |
| # # Accuracy Probe | |
| # # acc_token_embeddings = torch.stack([layer[0, -1, :].cpu() for layer in hidden_states]) | |
| # acc_concat_layers = token_embeddings[acc_layer_range[0]:acc_layer_range[1]].reshape(-1) | |
| # acc_probe_pred = (1 - acc_probe.predict_proba(acc_concat_layers.reshape(1, -1))[0][1]) * 2 - 1 | |
| # output_id = outputs.sequences[0, input_ids.shape[1]+i] | |
| # output_word = tokenizer.decode(output_id) | |
| # print(output_id, output_word, se_probe_pred, acc_probe_pred) | |
| # se_new_highlighted_text = highlight_text(output_word, se_probe_pred) | |
| # acc_new_highlighted_text = highlight_text(output_word, acc_probe_pred) | |
| # se_highlighted_text += f" {se_new_highlighted_text}" | |
| # acc_highlighted_text += f" {acc_new_highlighted_text}" | |
| # return se_highlighted_text, acc_highlighted_text | |
def highlight_text(text: str, uncertainty_score: float) -> str:
    """Wrap ``text`` in an HTML ``<span>`` whose background encodes a score.

    Scores above zero shade from white towards red (``#FF0000`` at 1); scores
    at or below zero shade from white towards green (``#00FF00`` at -1).

    Args:
        text: Text to display. It is HTML-escaped so markup-like content
            (for example a literal ``</s>``) is shown rather than interpreted.
        uncertainty_score: Score nominally in ``[-1, 1]``. Values outside that
            range are clamped so each colour channel stays within ``00``-``FF``.

    Returns:
        The ``<span>`` element as a string.
    """
    score = max(-1.0, min(1.0, uncertainty_score))
    # Intensity of the two non-dominant channels: 255 at score 0, 0 at |score| 1.
    fade = int(255 * (1 - abs(score)))
    if score > 0:
        red, green, blue = 255, fade, fade
    else:
        red, green, blue = fade, 255, fade
    html_color = f"#{red:02X}{green:02X}{blue:02X}"
    return '<span style="background-color: {}; color: black">{}</span>'.format(
        html_color, html.escape(text)
    )
# NOTE(review): the nesting below was reconstructed from an indentation-less
# listing; confirm the placement of the spacer elements against the original.
with gr.Blocks(title="Llama-2 7B Chat with Dual Probes", css="footer {visibility: hidden}") as demo:
    gr.HTML(DESCRIPTION)

    # Prompt inputs on the left, sampling controls on the right.
    with gr.Row():
        with gr.Column():
            message = gr.Textbox(label="Message")
            system_prompt = gr.Textbox(label="System prompt", lines=2)
        with gr.Column():
            max_new_tokens = gr.Slider(
                label="Max new tokens",
                minimum=1,
                maximum=MAX_MAX_NEW_TOKENS,
                step=1,
                value=DEFAULT_MAX_NEW_TOKENS,
            )
            temperature = gr.Slider(
                label="Temperature",
                minimum=0.01,
                maximum=2.0,
                step=0.1,
                value=0.01,
            )
            top_p = gr.Slider(
                label="Top-p (nucleus sampling)",
                minimum=0.05,
                maximum=1.0,
                step=0.05,
                value=0.9,
            )
            top_k = gr.Slider(
                label="Top-k",
                minimum=1,
                maximum=1000,
                step=1,
                value=50,
            )
            repetition_penalty = gr.Slider(
                label="Repetition penalty",
                minimum=1.0,
                maximum=2.0,
                step=0.05,
                value=1.2,
            )

    with gr.Row():
        generate_btn = gr.Button("Generate")

    # Add spacing between probes
    gr.HTML("<br><br>")

    # One output panel per probe, side by side.
    with gr.Row():
        with gr.Column():
            title = gr.HTML("<h2>Semantic Uncertainty Probe</h2>")
            se_output = gr.HTML(label="Semantic Uncertainty Probe")

        # Add spacing between columns
        gr.HTML("<div style='width: 20px;'></div>")

        with gr.Column():
            title = gr.HTML("<h2>Accuracy Probe</h2>")
            acc_output = gr.HTML(label="Accuracy Probe")

    # Examples fill only the message and system prompt; `generate` falls back
    # to its own defaults for the sampling parameters.
    gr.Examples(
        examples=EXAMPLES,
        inputs=[message, system_prompt],
        outputs=[se_output, acc_output],
        fn=generate,
    )

    generate_btn.click(
        generate,
        inputs=[
            message,
            system_prompt,
            max_new_tokens,
            temperature,
            top_p,
            top_k,
            repetition_penalty,
        ],
        outputs=[se_output, acc_output],
    )

if __name__ == "__main__":
    demo.launch()