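"""Gradio chat Space for Locutusque's language models.

All models are preloaded to CPU at startup and refreshed by a background
thread every 15 minutes; the model selected in the UI is moved to the GPU
only for the duration of a generation request, then returned to the CPU.
"""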
import spaces
import gradio as gr
from transformers import pipeline, AutoTokenizer, TextIteratorStreamer, AutoModelForCausalLM
import torch
from threading import Thread, Lock, Event
from queue import Empty
import os
from datetime import datetime
import gc
# Global dictionaries holding the preloaded models and tokenizers
LOADED_MODELS = {}
LOADED_TOKENIZERS = {}

# Lock for thread-safe access to the model dictionaries
MODEL_LOCK = Lock()

# Event used to signal the background reload thread to stop
SHUTDOWN_EVENT = Event()
def clear_memory():
    """Free CPU and GPU memory (collect garbage first so freed CUDA blocks can be released)."""
    gc.collect()
    torch.cuda.empty_cache()
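# Models are kept on the CPU between requests. On ZeroGPU Spaces the GPU is
# typically only attached inside @spaces.GPU-decorated calls, so the weights
# live in host memory (in bfloat16) until a request moves them to the device.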
def load_single_model(model_name):
    """Load a single model and tokenizer onto the CPU."""
    try:
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Loading {model_name}...")
        # Load the model in bfloat16 to roughly halve host-memory use vs. float32
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16,
            trust_remote_code=True,
            token=os.environ.get("token"),
        )
        # Load the matching tokenizer
        tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            trust_remote_code=True,
            token=os.environ.get("token"),
        )
        # These models use the ChatML template, which ends each turn with <|im_end|>
        tokenizer.eos_token = "<|im_end|>"
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Successfully loaded {model_name}")
        return model, tokenizer
    except Exception as e:
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Failed to load {model_name}: {e}")
        return None, None
def preload_models(model_choices):
    """Preload all models to CPU at startup."""
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Preloading models to CPU...")
    with MODEL_LOCK:
        for model_name in model_choices:
            model, tokenizer = load_single_model(model_name)
            if model is not None and tokenizer is not None:
                LOADED_MODELS[model_name] = model
                LOADED_TOKENIZERS[model_name] = tokenizer
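# Reload strategy: fresh copies are loaded *outside* the lock so requests can
# keep being served during the (slow) downloads, then the old models are
# swapped out under the lock so readers never see a half-updated dictionary.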
def reload_models_task(model_choices):
    """Background task that reloads all models every 15 minutes."""
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Starting model reload task...")
    while not SHUTDOWN_EVENT.is_set():
        # Wait 15 minutes (900 seconds); wait() returns True if the shutdown
        # event was set in the meantime, in which case we exit the loop
        if SHUTDOWN_EVENT.wait(900):
            break
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Starting periodic model reload...")
        # Load fresh copies into temporary dictionaries
        new_models = {}
        new_tokenizers = {}
        for model_name in model_choices:
            model, tokenizer = load_single_model(model_name)
            if model is not None and tokenizer is not None:
                new_models[model_name] = model
                new_tokenizers[model_name] = tokenizer
        # Swap the old models for the new ones atomically
        with MODEL_LOCK:
            # Drop old references; iterate over a copy of the keys, since
            # deleting entries while iterating a dict raises RuntimeError
            for model_name in list(LOADED_MODELS):
                del LOADED_MODELS[model_name]
                LOADED_TOKENIZERS.pop(model_name, None)
            clear_memory()
            # Install the freshly loaded models
            LOADED_MODELS.update(new_models)
            LOADED_TOKENIZERS.update(new_tokenizers)
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Model reload completed")
def get_model_pipeline(model_name):
    """Move the selected model to the GPU and wrap it in a text-generation pipeline."""
    with MODEL_LOCK:
        if model_name not in LOADED_MODELS:
            raise ValueError(f"Model {model_name} not found in preloaded models")
        # Direct references to the shared model and tokenizer
        model = LOADED_MODELS[model_name]
        tokenizer = LOADED_TOKENIZERS[model_name]
        # Creating the pipeline with device="cuda" moves the model to the GPU
        pipe = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            torch_dtype=torch.bfloat16,
            device="cuda",
        )
        return pipe, model
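# The prompt below is assembled by hand in ChatML format
# (<|im_start|>role ... <|im_end|>), matching the eos_token set at load time;
# tokenizer.apply_chat_template would be an equivalent alternative where the
# tokenizer ships a chat template. Note that the min_p sampling parameter
# requires a reasonably recent transformers release.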
@spaces.GPU  # ZeroGPU: GPU work must run inside a @spaces.GPU-decorated call
def generate(
    message,
    history,
    model_name,
    system,
    temperature=0.4,
    top_p=0.95,
    min_p=0.1,
    top_k=50,
    max_new_tokens=256,
):
    outputs = []
    try:
        # Get the pipeline with the model on the GPU
        pipe, gpu_model = get_model_pipeline(model_name)
        # Build the ChatML prompt from the system message and chat history
        prompt = f"<|im_start|>system\n{system}<|im_end|>\n"
        for user_turn, assistant_turn in history:
            prompt += f"<|im_start|>user\n{user_turn}<|im_end|>\n<|im_start|>assistant\n{assistant_turn}<|im_end|>\n"
        prompt += f"<|im_start|>user\n{message}<|im_end|>\n<|im_start|>assistant\n"
        streamer = TextIteratorStreamer(
            pipe.tokenizer,
            timeout=240.0,
            skip_prompt=True,
            skip_special_tokens=True,
        )
        generation_kwargs = dict(
            text_inputs=prompt,
            streamer=streamer,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            top_p=top_p,
            min_p=min_p,
            top_k=top_k,
            temperature=temperature,
            num_beams=1,
            repetition_penalty=1.1,
        )
        # Run generation in a worker thread so we can stream tokens here
        t = Thread(target=pipe, kwargs=generation_kwargs)
        t.start()
        for chunk in streamer:
            outputs.append(chunk)
            yield "".join(outputs)
    except Empty:
        # The streamer timed out waiting for the next token; return what we have
        print("Stream stopped unexpectedly.")
        yield "".join(outputs)
    except Exception as e:
        print(f"An error occurred: {e}")
        yield f"An error occurred during generation: {str(e)}"
    finally:
        # Move the model back to the CPU (even on error) to free GPU memory
        if 'gpu_model' in locals():
            gpu_model.to("cpu")
        torch.cuda.empty_cache()
# Models available in the dropdown
model_choices = [
    "Locutusque/Hercules-6.1-Llama-3.1-8B",
    "Locutusque/liberalis-cogitator-mistral-nemo-2407",
    "Locutusque/lmsys-llama-3.2-3b",
]

# Preload all models to CPU at startup
preload_models(model_choices)

# Start the background reload task
reload_thread = Thread(target=reload_models_task, args=(model_choices,), daemon=True)
reload_thread.start()
# Create the Gradio chat interface; the additional inputs are passed to
# generate() in order, after (message, history)
g = gr.ChatInterface(
    fn=generate,
    additional_inputs=[
        gr.components.Dropdown(
            choices=model_choices,
            label="Model",
            value=model_choices[0],
            interactive=True,
        ),
        gr.components.Textbox(
            lines=2,
            label="System Prompt",
            value="You are a sentient, super intelligent AI developed by a man named Locutusque.",
        ),
        gr.components.Slider(minimum=0, maximum=2, value=0.8, label="Temperature"),
        gr.components.Slider(minimum=0, maximum=1, value=0.95, label="Top p"),
        gr.components.Slider(minimum=0, maximum=1, value=0.1, label="Min P"),
        gr.components.Slider(minimum=0, maximum=100, step=1, value=15, label="Top k"),
        gr.components.Slider(minimum=1, maximum=8192, step=1, value=1024, label="Max tokens"),
    ],
    title="Locutusque's Language Models",
    description="Try out Locutusque's language models here! Credit goes to Mediocreatmybest for this space. You may also find some experimental preview models that have not been made public here.",
)
if __name__ == "__main__":
    try:
        g.launch()
    finally:
        # Signal the reload thread to stop when the app shuts down
        SHUTDOWN_EVENT.set()
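# Note: private or gated checkpoints are downloaded with the Hugging Face
# access token read from the `token` environment variable (see
# load_single_model), so that variable must be set (e.g. as a Space secret)
# for those models to load.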