Spaces:
Sleeping
Sleeping
| import os | |
| import pickle as pkl | |
| from pathlib import Path | |
| from threading import Thread | |
| from typing import List, Optional, Tuple, Iterator | |
| import spaces | |
| import gradio as gr | |
| import numpy as np | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer | |
| MAX_MAX_NEW_TOKENS = 2048 | |
| DEFAULT_MAX_NEW_TOKENS = 1024 | |
| MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096")) | |
| DESCRIPTION = """\ | |
| # Llama-2 7B Chat with Streamable Semantic Uncertainty Probe | |
| This Space demonstrates the Llama-2-7b-chat model with an added semantic uncertainty probe. | |
| The highlighted text shows the model's uncertainty in real-time, with more intense yellow indicating higher uncertainty. | |
| """ | |
| if torch.cuda.is_available(): | |
| model_id = "meta-llama/Llama-2-7b-chat-hf" | |
| # TODO load the full model? | |
| model = AutoModelForCausalLM.from_pretrained(model_id, device_map="auto", load_in_8bit=True) | |
| tokenizer = AutoTokenizer.from_pretrained(model_id) | |
| tokenizer.use_default_system_prompt = False | |
| # load the probe data | |
| # TODO load accuracy and SE probe and compare in different tabs | |
| with open("./model/20240625-131035_demo.pkl", "rb") as f: | |
| probe_data = pkl.load(f) | |
| # take the NQ open one | |
| probe_data = probe_data[-2] | |
| probe = probe_data['t_bmodel'] | |
| layer_range = probe_data['sep_layer_range'] | |
| acc_probe = probe_data['t_amodel'] | |
| acc_layer_range = probe_data['ap_layer_range'] | |
| def generate( | |
| message: str, | |
| chat_history: List[Tuple[str, str]], | |
| system_prompt: str, | |
| max_new_tokens: int = DEFAULT_MAX_NEW_TOKENS, | |
| temperature: float = 0.6, | |
| top_p: float = 0.9, | |
| top_k: int = 50, | |
| repetition_penalty: float = 1.2, | |
| ) -> Iterator[str]: | |
| conversation = [] | |
| if system_prompt: | |
| conversation.append({"role": "system", "content": system_prompt}) | |
| for user, assistant in chat_history: | |
| conversation.extend([{"role": "user", "content": user}, {"role": "assistant", "content": assistant}]) | |
| conversation.append({"role": "user", "content": message}) | |
| input_ids = tokenizer.apply_chat_template(conversation, return_tensors="pt") | |
| if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH: | |
| input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:] | |
| gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.") | |
| input_ids = input_ids.to(model.device) | |
| streamer = TextIteratorStreamer(tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True) | |
| generation_kwargs = dict( | |
| input_ids=input_ids, | |
| max_new_tokens=max_new_tokens, | |
| do_sample=True, | |
| top_p=top_p, | |
| top_k=top_k, | |
| temperature=temperature, | |
| repetition_penalty=repetition_penalty, | |
| streamer=streamer, | |
| output_hidden_states=True, | |
| return_dict_in_generate=True, | |
| ) | |
| # Generate without threading | |
| with torch.no_grad(): | |
| outputs = model.generate(**generation_kwargs) | |
| print(outputs.sequences.shape, input_ids.shape) | |
| generated_tokens = outputs.sequences[0, input_ids.shape[1]:] | |
| print("Generated tokens:", generated_tokens, generated_tokens.shape) | |
| generated_text = tokenizer.decode(generated_tokens, skip_special_tokens=True) | |
| print("Generated text:", generated_text) | |
| # hidden states | |
| hidden = outputs.hidden_states # list of tensors, one for each token, then (batch size, sequence length, hidden size) | |
| print(len(hidden)) | |
| print(len(hidden[1])) # layers | |
| print(hidden[1][0].shape) # (sequence length, hidden size) | |
| # stack token embeddings | |
| # TODO do this loop on the fly instead of waiting for the whole generation | |
| highlighted_text = "" | |
| for i in range(1, len(hidden)): | |
| token_embeddings = torch.stack([generated_token[0, 0, :].cpu() for generated_token in hidden[i]]) # (num_layers, hidden_size) | |
| # print(token_embeddings.shape) | |
| # probe the model | |
| # print(token_embeddings.numpy()[layer_range].shape) | |
| concat_layers = token_embeddings.numpy()[layer_range[0]:layer_range[1]].reshape(-1) # (num_layers * hidden_size) | |
| # print(concat_layers.shape) | |
| # or prob? | |
| probe_pred = probe.predict_log_proba(concat_layers.reshape(1, -1))[0][1] # prob of high SE | |
| # print(probe_pred.shape, probe_pred) | |
| # decode one token at a time | |
| output_id = outputs.sequences[0, input_ids.shape[1]+i] | |
| print(output_id, output_word, probe_pred) | |
| output_word = tokenizer.decode(output_id) | |
| new_highlighted_text = highlight_text(output_word, probe_pred) | |
| highlighted_text += new_highlighted_text | |
| yield highlighted_text | |
| def highlight_text(text: str, uncertainty_score: float) -> str: | |
| if uncertainty_score > 0: | |
| html_color = "#%02X%02X%02X" % ( | |
| 255, | |
| int(255 * (1 - uncertainty_score)), | |
| int(255 * (1 - uncertainty_score)), | |
| ) | |
| else: | |
| html_color = "#%02X%02X%02X" % ( | |
| int(255 * (1 + uncertainty_score)), | |
| 255, | |
| int(255 * (1 + uncertainty_score)), | |
| ) | |
| return '<span style="background-color: {}; color: black">{}</span>'.format( | |
| html_color, text | |
| ) | |
| chat_interface = gr.ChatInterface( | |
| fn=generate, | |
| additional_inputs=[ | |
| gr.Textbox(label="System prompt", lines=6), | |
| gr.Slider( | |
| label="Max new tokens", | |
| minimum=1, | |
| maximum=MAX_MAX_NEW_TOKENS, | |
| step=1, | |
| value=DEFAULT_MAX_NEW_TOKENS, | |
| ), | |
| gr.Slider( | |
| label="Temperature", | |
| minimum=0.1, | |
| maximum=4.0, | |
| step=0.1, | |
| value=0.6, | |
| ), | |
| gr.Slider( | |
| label="Top-p (nucleus sampling)", | |
| minimum=0.05, | |
| maximum=1.0, | |
| step=0.05, | |
| value=0.9, | |
| ), | |
| gr.Slider( | |
| label="Top-k", | |
| minimum=1, | |
| maximum=1000, | |
| step=1, | |
| value=50, | |
| ), | |
| gr.Slider( | |
| label="Repetition penalty", | |
| minimum=1.0, | |
| maximum=2.0, | |
| step=0.05, | |
| value=1.2, | |
| ), | |
| ], | |
| stop_btn=None, | |
| examples=[ | |
| ["What is the capital of France?"], | |
| ["Explain the theory of relativity in simple terms."], | |
| ["Write a short poem about artificial intelligence."] | |
| ], | |
| title="Llama-2 7B Chat with Streamable Semantic Uncertainty Probe", | |
| description=DESCRIPTION, | |
| ) | |
| if __name__ == "__main__": | |
| chat_interface.launch() | |