#!/usr/bin/env python
import os
from threading import Thread
from typing import Iterator

import gradio as gr
import spaces
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer

# Debugging: Start script
print("Starting script...")

HF_TOKEN = os.environ.get("HF_TOKEN")
if HF_TOKEN is None:
    print("Warning: HF_TOKEN is not set!")
# Demo password: set APP_PASSWORD in the environment; the default below is only a placeholder.
PASSWORD = os.getenv("APP_PASSWORD", "mysecretpassword")
DESCRIPTION = "# FT of Mistral-7B v0.2"

if not torch.cuda.is_available():
    DESCRIPTION += "\n<p>Running on CPU 🥶 This demo does not work on CPU.</p>"
    print("Warning: No GPU available. This model cannot run on CPU.")
else:
    print("GPU is available!")

MAX_MAX_NEW_TOKENS = 2048
DEFAULT_MAX_NEW_TOKENS = 1024
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))
# Debugging: GPU check passed, loading model
if torch.cuda.is_available():
    model_id = "BGLAW/mistral7binst2-bglawinsv7UNS-merged"
    try:
        print("Loading model...")
        model = AutoModelForCausalLM.from_pretrained(
            model_id, torch_dtype=torch.float16, device_map="auto", token=HF_TOKEN
        )
        print("Model loaded successfully!")
        print("Loading tokenizer...")
        tokenizer = AutoTokenizer.from_pretrained(model_id, token=HF_TOKEN)
        print("Tokenizer loaded successfully!")
    except Exception as e:
        print(f"Error loading model or tokenizer: {e}")
        raise  # Re-raise the error after logging it
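
# `generate` is the ChatInterface callback: it rebuilds the conversation in chat-template
# form, tokenizes it, and streams the model's reply back token by token.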
@spaces.GPU
def generate(
    message: str,
    chat_history: list[tuple[str, str]],
    max_new_tokens: int = 1024,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> Iterator[str]:
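    """Generate a streaming reply for `message` given the accumulated `chat_history`."""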
| print(f"Received message: {message}") | |
| print(f"Chat history: {chat_history}") | |
| conversation = [] | |
| for user, assistant in chat_history: | |
| conversation.extend([{"role": "user", "content": user}, {"role": "assistant", "content": assistant}]) | |
| conversation.append({"role": "user", "content": message}) | |
    try:
        print("Tokenizing input...")
        input_ids = tokenizer.apply_chat_template(conversation, return_tensors="pt")
        print(f"Input tokenized: {input_ids.shape}")
        if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
            input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
            gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
            print("Trimmed input tokens due to length.")
        input_ids = input_ids.to(model.device)
        print("Input moved to the model's device.")
        streamer = TextIteratorStreamer(tokenizer, timeout=20.0, skip_prompt=True, skip_special_tokens=True)
        generate_kwargs = dict(
            input_ids=input_ids,
            streamer=streamer,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            top_p=top_p,
            top_k=top_k,
            temperature=temperature,
            num_beams=1,
            repetition_penalty=repetition_penalty,
        )
| print("Starting generation...") | |
| t = Thread(target=model.generate, kwargs=generate_kwargs) | |
| t.start() | |
| print("Thread started for model generation.") | |
| outputs = [] | |
| for text in streamer: | |
| outputs.append(text) | |
| print(f"Generated text so far: {''.join(outputs)}") | |
| yield "".join(outputs) | |
| except Exception as e: | |
| print(f"Error during generation: {e}") | |
| raise e # Re-raise the error after logging it | |
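
# Simple password gate: returns visibility updates for the chat area and the
# "incorrect password" message wired up in the Blocks layout below.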
def password_auth(password):
    if password == PASSWORD:
        return gr.update(visible=True), gr.update(visible=False)
    else:
        return gr.update(visible=False), gr.update(visible=True, value="Incorrect password. Try again.")
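
# The ChatInterface binds `generate` to the chat UI and exposes the sampling
# parameters as sliders.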
chat_interface = gr.ChatInterface(
    fn=generate,
    additional_inputs=[
        gr.Slider(
            label="Max new tokens",
            minimum=1,
            maximum=MAX_MAX_NEW_TOKENS,
            step=1,
            value=DEFAULT_MAX_NEW_TOKENS,
        ),
        gr.Slider(
            label="Temperature",
            minimum=0.1,
            maximum=4.0,
            step=0.1,
            value=0.6,
        ),
        gr.Slider(
            label="Top-p (nucleus sampling)",
            minimum=0.05,
            maximum=1.0,
            step=0.05,
            value=0.9,
        ),
        gr.Slider(
            label="Top-k",
            minimum=1,
            maximum=1000,
            step=1,
            value=50,
        ),
        gr.Slider(
            label="Repetition penalty",
            minimum=1.0,
            maximum=2.0,
            step=0.05,
            value=1.2,
        ),
    ],
    stop_btn=None,
    examples=[
        ["Hello there! How are you doing?"],
        ["Can you explain briefly to me what is the Python programming language?"],
        ["Explain the plot of Cinderella in a sentence."],
        ["How many hours does it take a man to eat a Helicopter?"],
        ["Write a 100-word article on 'Benefits of Open-Source in AI research'"],
    ],
)

# Debugging: Interface setup
print("Setting up interface...")
with gr.Blocks(css="style.css") as demo:
    gr.Markdown(DESCRIPTION)

    # Create login components
    with gr.Row(visible=True) as login_area:
        password_input = gr.Textbox(
            label="Enter Password", type="password", placeholder="Password", show_label=True
        )
        login_btn = gr.Button("Submit")
        incorrect_password_msg = gr.Markdown("Incorrect password. Try again.", visible=False)

    # Main chat interface
    with gr.Column(visible=False) as chat_area:
        gr.Markdown(DESCRIPTION)
        gr.DuplicateButton(
            value="Duplicate Space for private use",
            elem_id="duplicate-button",
            visible=os.getenv("SHOW_DUPLICATE_BUTTON") == "1",
        )
        chat_interface.render()

    # Bind login button to check password
    login_btn.click(password_auth, inputs=password_input, outputs=[chat_area, incorrect_password_msg])

# Debugging: Starting queue and launching the demo
print("Launching demo...")

if __name__ == "__main__":
    demo.queue(max_size=20).launch(share=True)