import os import pickle as pkl from pathlib import Path from threading import Thread from typing import List, Tuple, Iterator from queue import Queue import spaces import gradio as gr import torch from transformers import AutoModelForCausalLM, AutoTokenizer from transformers.generation.streamers import BaseStreamer MAX_MAX_NEW_TOKENS = 2048 DEFAULT_MAX_NEW_TOKENS = 1024 MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096")) DESCRIPTION = """
This Space demonstrates the Llama-2-7b-chat model with a semantic uncertainty probe.
The highlighted text shows the model's uncertainty in real-time:
Running on CPU 🥶 This demo does not work on CPU.
# `Optional` is used in the constructor signature below, but the file's typing
# import only brings in List, Tuple and Iterator. Import it here so defining
# the class does not raise NameError.
from typing import Optional


class CustomStreamer(BaseStreamer):
    """Streamer that queues generated token ids and per-token hidden states.

    Token ids handed to :meth:`put` are pushed one by one onto
    ``token_queue``; objects handed to :meth:`put_hidden_states` are pushed
    onto ``hidden_states_queue``. Iterating over the streamer pops token ids
    until the end-of-stream sentinel (``stop_signal``) is seen.

    Args:
        skip_prompt: When true, the first call to :meth:`put` after
            construction (or after :meth:`end`) is discarded.
        timeout: Timeout in seconds passed to every queue ``put``/``get``;
            ``None`` means block indefinitely. A timed-out ``get`` raises
            ``queue.Empty``.
    """

    def __init__(self, skip_prompt: bool = False, timeout: Optional[float] = None):
        self.skip_prompt = skip_prompt
        self.timeout = timeout
        self.token_queue = Queue()
        self.hidden_states_queue = Queue()
        # Sentinel pushed onto both queues by end(); token ids are ints, so
        # None cannot collide with a real token.
        self.stop_signal = None
        self.next_tokens_are_prompt = True

    def put(self, value):
        """Receive a tensor of token ids and enqueue each id.

        Raises:
            ValueError: If ``value`` has more than one row (batch size > 1).
        """
        if len(value.shape) > 1 and value.shape[0] > 1:
            raise ValueError("CustomStreamer only supports batch size 1")
        elif len(value.shape) > 1:
            value = value[0]

        # The first put() of a stream is treated as the prompt and dropped
        # when skip_prompt is set.
        if self.skip_prompt and self.next_tokens_are_prompt:
            self.next_tokens_are_prompt = False
            return

        for token in value.tolist():
            self.token_queue.put(token, timeout=self.timeout)

    def put_hidden_states(self, hidden_states):
        """Enqueue one hidden-states object on the hidden states queue."""
        self.hidden_states_queue.put(hidden_states, timeout=self.timeout)

    def end(self):
        """Signal the end of the stream on both queues and re-arm prompt skipping."""
        self.next_tokens_are_prompt = True
        self.token_queue.put(self.stop_signal, timeout=self.timeout)
        self.hidden_states_queue.put(self.stop_signal, timeout=self.timeout)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next token id, or stop when the sentinel is dequeued."""
        token = self.token_queue.get(timeout=self.timeout)
        # Identity check: the sentinel is a singleton, not a value to compare.
        if token is self.stop_signal:
            raise StopIteration()
        return token
@spaces.GPU
def generate(
    message: str,
    system_prompt: str,
    max_new_tokens: int = DEFAULT_MAX_NEW_TOKENS,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> Iterator[Tuple[str, str]]:
    """Stream a model reply, highlighted token by token with two probe scores.

    Builds a single-turn conversation (optional system prompt + user message),
    runs ``model.generate`` on a background thread with a ``CustomStreamer``,
    and for every streamed token scores its hidden states with the semantic
    uncertainty probe and the accuracy probe.

    Relies on module-level objects that are not defined in this function:
    ``tokenizer``, ``model``, ``se_probe``, ``acc_probe``, ``se_layer_range``,
    ``acc_layer_range`` and ``highlight_text``.

    Args:
        message: The user message.
        system_prompt: Optional system prompt; skipped when empty.
        max_new_tokens: Forwarded to ``model.generate``.
        temperature: Forwarded to ``model.generate``.
        top_p: Forwarded to ``model.generate``.
        top_k: Forwarded to ``model.generate``.
        repetition_penalty: Forwarded to ``model.generate``.

    Yields:
        ``(se_highlighted_text, acc_highlighted_text)``: the accumulated
        highlighted output so far for the semantic uncertainty probe and the
        accuracy probe respectively.

    Raises:
        queue.Empty: If no token or hidden state arrives within the
            streamer's timeout.
    """
    conversation = []
    if system_prompt:
        conversation.append({"role": "system", "content": system_prompt})
    conversation.append({"role": "user", "content": message})

    input_ids = tokenizer.apply_chat_template(conversation, return_tensors="pt")
    if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
        # Keep only the most recent tokens when the prompt is too long.
        input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
        gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
    input_ids = input_ids.to(model.device)

    streamer = CustomStreamer(skip_prompt=True, timeout=10.0)

    def generate_with_states():
        # Runs on the worker thread so this generator can consume the
        # streamer's queues while generation is still in progress.
        with torch.no_grad():
            model.generate(
                input_ids=input_ids,
                max_new_tokens=max_new_tokens,
                do_sample=True,
                top_p=top_p,
                top_k=top_k,
                temperature=temperature,
                repetition_penalty=repetition_penalty,
                output_hidden_states=True,
                return_dict_in_generate=True,
                streamer=streamer,
            )

    thread = Thread(target=generate_with_states)
    thread.start()

    se_highlighted_text = ""
    acc_highlighted_text = ""
    for token_id in streamer:
        # NOTE(review): nothing in the visible code calls
        # streamer.put_hidden_states(); confirm the generation path feeds this
        # queue, otherwise only the stop signal ever arrives here.
        # Bounded wait (same timeout as the token queue) so a crashed
        # generation thread cannot hang this generator forever.
        hidden_states = streamer.hidden_states_queue.get(timeout=streamer.timeout)
        if hidden_states is streamer.stop_signal:
            break

        # One vector per entry of hidden_states (batch 0, position 0); the
        # original note gives the stacked shape as (num_layers, hidden_size).
        token_embeddings = torch.stack(
            [layer_state[0, 0, :].cpu() for layer_state in hidden_states]
        ).numpy()

        # Semantic uncertainty probe: class-1 probability rescaled to [-1, 1].
        se_concat_layers = token_embeddings[se_layer_range[0]:se_layer_range[1]].reshape(-1)
        se_probe_pred = se_probe.predict_proba(se_concat_layers.reshape(1, -1))[0][1] * 2 - 1

        # Accuracy probe: (1 - class-1 probability) rescaled to [-1, 1].
        acc_concat_layers = token_embeddings[acc_layer_range[0]:acc_layer_range[1]].reshape(-1)
        acc_probe_pred = (1 - acc_probe.predict_proba(acc_concat_layers.reshape(1, -1))[0][1]) * 2 - 1

        # Decode the latest token (the original bound this to `new_test` but
        # read `new_text`, which raised NameError on the first token).
        new_text = tokenizer.decode(token_id)
        print(new_text, se_probe_pred, acc_probe_pred)

        se_new_highlighted_text = highlight_text(new_text, se_probe_pred)
        acc_new_highlighted_text = highlight_text(new_text, acc_probe_pred)
        se_highlighted_text += f" {se_new_highlighted_text}"
        acc_highlighted_text += f" {acc_new_highlighted_text}"

        yield se_highlighted_text, acc_highlighted_text

    thread.join()
def highlight_text(text: str, uncertainty_score: float) -> str:
    """Wrap ``text`` in an HTML span whose background encodes a score.

    The score is clamped to ``[-1, 1]``. Positive scores fade from white
    (0) to pure red (1); zero and negative scores fade from white (0) to pure
    green (-1).

    Args:
        text: The text to highlight; it is HTML-escaped before insertion.
        uncertainty_score: Score to visualise, nominally in ``[-1, 1]``.

    Returns:
        An HTML ``<span>`` string containing the escaped text with the
        computed background colour.
    """
    # Local import so this function does not depend on a file-level import.
    import html

    # Out-of-range scores would otherwise yield negative channel values and
    # an invalid colour such as "#FF-7F-7F".
    score = max(-1.0, min(1.0, uncertainty_score))

    if score > 0:
        fade = int(255 * (1 - score))
        html_color = "#%02X%02X%02X" % (255, fade, fade)
    else:
        fade = int(255 * (1 + score))
        html_color = "#%02X%02X%02X" % (fade, 255, fade)

    # The original template was '{}' with two format arguments, so only the
    # colour was returned and the text was dropped. The span markup below is
    # a reconstruction; NOTE(review): confirm the intended inline style.
    return '<span style="background-color: {}; color: black">{}</span>'.format(
        html_color, html.escape(text)
    )
generate_btn = gr.Button("Generate") # Add spacing between probes gr.HTML("