Spaces:
Sleeping
Sleeping
| import subprocess | |
| import sys | |
| import shlex | |
| import spaces | |
| import torch | |
| print(torch.__version__) | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer | |
| import gradio as gr | |
| from threading import Thread | |
# Hugging Face Hub identifiers of the two checkpoints used by this app.
MODEL_BIG = "HuggingFaceTB/SmolLM-360M-Instruct"
MODEL_SMALL = "HuggingFaceTB/SmolLM-135M-Instruct"

# HTML snippets shown at the top of the page.
TITLE = "<h1><center>Auto-Guidance Playground</center></h1>"
SUB_TITLE = (
    "<center>Auto-guidance was a technique made by NVIDIA for "
    "text-conditioned image models. This is a test of the concept "
    "with SmolLM.</center>"
)

# Custom stylesheet handed to the Gradio Blocks container.
CSS = """
.duplicate-button {
margin: auto !important;
color: white !important;
background: black !important;
border-radius: 100vh !important;
}
h3 {
text-align: center;
}
"""

# Markdown notice for a finished conversation (not referenced by the visible code).
END_MESSAGE = """
\n
**The conversation has reached to its end, please press "Clear" to restart a new conversation**
"""
# A single tokenizer (taken from the small checkpoint's repository) is used to
# encode prompts for, and decode samples from, both models.
tokenizer = AutoTokenizer.from_pretrained(MODEL_SMALL)

# Both checkpoints are loaded in bfloat16 and placed by `device_map="auto"`.
model_big = AutoModelForCausalLM.from_pretrained(
    MODEL_BIG,
    torch_dtype=torch.bfloat16,
    device_map="auto",
)
model_small = AutoModelForCausalLM.from_pretrained(
    MODEL_SMALL,
    torch_dtype=torch.bfloat16,
    device_map="auto",
)

# `model.device` is a `torch.device`; comparing it with the plain string
# "cuda" is always False, so the previous check never compiled anything.
# Compare the device *type* so that any CUDA device ("cuda:0", ...) matches.
if model_big.device.type == "cuda":
    model_big = torch.compile(model_big)
if model_small.device.type == "cuda":
    model_small = torch.compile(model_small)
def _apply_repetition_penalty(logits, token_ids, penalty):
    """Return ``logits`` with the scores of already-seen tokens dampened.

    Scores of the tokens listed in ``token_ids`` are divided by ``penalty``
    when positive and multiplied by it when negative, so a penalty above 1
    always makes a repeated token less likely.
    """
    seen_scores = torch.gather(logits, -1, token_ids)
    seen_scores = torch.where(seen_scores < 0, seen_scores * penalty, seen_scores / penalty)
    return logits.scatter(-1, token_ids, seen_scores)


def stream_chat(
    message: str,
    history: list,
    temperature: float = 0.3,
    max_new_tokens: int = 1024,
    top_p: float = 1.0,
    top_k: int = 20,
    penalty: float = 1.2,
    guidance_scale: float = 1.5,
):
    """Stream a reply sampled from the guided mix of the big and small model.

    At every step both models score the next token and the big model's logits
    are pushed away from the small model's:
    ``big + (guidance_scale - 1) * (big - small)``.

    Args:
        message: The new user message.
        history: Earlier turns as ``(user_prompt, assistant_answer)`` pairs.
        temperature: Softmax temperature; ``0`` selects the most likely token
            instead of sampling.
        max_new_tokens: Upper bound on the number of generated tokens.
        top_p: Nucleus-sampling threshold; filtering is skipped at ``1.0``.
        top_k: Number of best tokens kept; filtering is skipped at ``0``.
        penalty: Repetition penalty applied to tokens already present in the
            prompt or the reply; ``1.0`` (or a non-positive value) disables it.
        guidance_scale: Strength of the guidance; ``1.0`` reproduces the big
            model alone.

    Yields:
        The reply decoded so far, once per generated token.
    """
    print(f'message: {message}')
    print(f'history: {history}')

    conversation = []
    for prompt, answer in history:
        conversation.extend([
            {"role": "user", "content": prompt},
            {"role": "assistant", "content": answer},
        ])
    conversation.append({"role": "user", "content": message})

    inputs = tokenizer.apply_chat_template(conversation, tokenize=True, add_generation_prompt=True, return_tensors="pt")

    # The tokenizer returns CPU tensors; the models may have been placed on
    # another device, so move the ids explicitly.
    device_big = model_big.device
    device_small = model_small.device
    current_input = inputs.to(device_big)
    seen_tokens = current_input  # prompt + reply, used by the repetition penalty

    generated_tokens = []
    partial_output = ""  # stays defined even if the very first token is EOS
    cache_small = None
    cache_big = None

    for _ in range(int(max_new_tokens)):
        # Inference only: without no_grad every step would retain an autograd
        # graph through the growing KV cache. The context is closed before the
        # `yield` so the grad mode never leaks out of the generator.
        with torch.no_grad():
            outputs_small = model_small(current_input.to(device_small), use_cache=True, past_key_values=cache_small)
            outputs_big = model_big(current_input, use_cache=True, past_key_values=cache_big)

            # The models are loaded in bfloat16 in this file; do the
            # extrapolation and sampling in float32 for numerical stability.
            logits_big = outputs_big.logits[:, -1, :].float()
            logits_small = outputs_small.logits[:, -1, :].float().to(device_big)
            interpolated_logits = logits_big + (guidance_scale - 1) * (logits_big - logits_small)

            if penalty > 0 and penalty != 1.0:
                interpolated_logits = _apply_repetition_penalty(interpolated_logits, seen_tokens, penalty)
            if temperature > 0:
                interpolated_logits = interpolated_logits / temperature
            if top_p < 1.0:
                interpolated_logits = top_p_filtering(interpolated_logits, top_p=top_p)
            if top_k > 0:
                interpolated_logits = top_k_filtering(interpolated_logits, top_k=int(top_k))

            if temperature > 0:
                next_token = torch.multinomial(torch.softmax(interpolated_logits, dim=-1), num_samples=1)
            else:
                # Temperature 0 cannot be used as a divisor; treat it as greedy decoding.
                next_token = torch.argmax(interpolated_logits, dim=-1, keepdim=True)

        token_id = next_token.item()
        if token_id == tokenizer.eos_token_id:
            break

        generated_tokens.append(token_id)
        # With the KV caches in place only the newest token has to be fed next.
        current_input = next_token
        seen_tokens = torch.cat([seen_tokens, next_token], dim=-1)
        cache_small = outputs_small.past_key_values
        cache_big = outputs_big.past_key_values

        partial_output = tokenizer.decode(generated_tokens, skip_special_tokens=True)
        yield partial_output

    print(f'response: {partial_output}')
def top_k_filtering(logits, top_k=0, filter_value=-float('Inf')):
    """Keep only the ``top_k`` largest logits along the last dimension.

    Every entry strictly below the ``top_k``-th largest value of its row is
    overwritten with ``filter_value``. The tensor is modified in place and
    also returned. ``top_k <= 0`` leaves the input untouched, and ``top_k`` is
    capped at the size of the last dimension.
    """
    k = min(top_k, logits.size(-1))
    if k <= 0:
        return logits

    # Smallest value that still belongs to the top-k, kept broadcastable.
    kth_best = torch.topk(logits, k).values[..., -1:]
    logits.masked_fill_(logits < kth_best, filter_value)
    return logits
def top_p_filtering(logits, top_p=0.0, filter_value=-float('Inf')):
    """Apply nucleus (top-p) filtering along the last dimension.

    Tokens are ranked by probability and the smallest prefix whose cumulative
    probability exceeds ``top_p`` is kept; every other entry is overwritten
    with ``filter_value``. The most likely token is always kept. The tensor is
    modified in place and also returned. ``top_p <= 0`` disables filtering.

    Works for a single row of logits as well as for batched input of shape
    ``(..., vocab)``.
    """
    if top_p > 0.0:
        sorted_logits, sorted_indices = torch.sort(logits, descending=True, dim=-1)
        # torch.softmax is used directly: the file never imports
        # torch.nn.functional as F, so the former F.softmax call raised NameError.
        cumulative_probs = torch.cumsum(torch.softmax(sorted_logits, dim=-1), dim=-1)

        sorted_indices_to_remove = cumulative_probs > top_p
        # Shift the mask one step right so the token that crosses the
        # threshold is kept as well, and never drop the best token.
        sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
        sorted_indices_to_remove[..., 0] = False

        # Map the mask from sorted order back to vocabulary order row by row.
        # Indexing `logits` with the flat list of sorted indices (as before)
        # addressed the first dimension and broke on batched input.
        indices_to_remove = torch.zeros_like(sorted_indices_to_remove).scatter(
            -1, sorted_indices, sorted_indices_to_remove
        )
        logits[indices_to_remove] = filter_value
    return logits
chatbot = gr.Chatbot(height=600)


def _parameter_slider(label, minimum, maximum, step, value):
    """Create an unrendered slider; the chat interface places it later."""
    return gr.Slider(
        minimum=minimum,
        maximum=maximum,
        step=step,
        value=value,
        label=label,
        render=False,
    )


with gr.Blocks(css=CSS, theme="soft") as demo:
    # Page header.
    gr.HTML(TITLE)
    gr.HTML(SUB_TITLE)
    gr.DuplicateButton(value="Duplicate Space for private use", elem_classes="duplicate-button")

    # Collapsed panel holding the generation controls.
    parameters_accordion = gr.Accordion(label="⚙️ Parameters", open=False, render=False)

    # Listed in the same order as stream_chat's parameters after
    # `message` and `history`.
    parameter_sliders = [
        _parameter_slider("Temperature", 0, 1, 0.1, 0.3),
        _parameter_slider("Max new tokens", 128, 8192, 1, 1024),
        _parameter_slider("top_p", 0.0, 1.0, 0.1, 1.0),
        _parameter_slider("top_k", 1, 20, 1, 20),
        _parameter_slider("Repetition penalty", 0.0, 2.0, 0.1, 1.2),
        _parameter_slider("Auto-Guidance Scale", 0.0, 10.0, 0.1, 1.5),
    ]

    example_prompts = [
        ["Hello there, can you suggest few places to visit in UAE?"],
        ["What UAE is known for?"],
    ]

    gr.ChatInterface(
        fn=stream_chat,
        chatbot=chatbot,
        fill_height=True,
        additional_inputs_accordion=parameters_accordion,
        additional_inputs=parameter_sliders,
        examples=example_prompts,
        cache_examples=False,
    )


# Start the web app only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch()