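"""Gradio Space that chats through a locally loaded Qwen2.5 Instruct model.

The app picks a model size based on available RAM, runs generation on CPU or
GPU, caches recent responses, and instructs the model to pass as a human.
"""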
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import gradio as gr
import datetime
import pytz
import logging
import gc
import psutil
import os
from huggingface_hub import login, hf_api
from typing import List, Dict, Optional
from threading import Lock

class MemoryTracker:
    """Helpers for reporting and reclaiming process memory."""

    @staticmethod
    def get_memory_usage():
        process = psutil.Process(os.getpid())
        memory_gb = process.memory_info().rss / 1024 / 1024 / 1024
        return f"{memory_gb:.2f} GB"

    @staticmethod
    def clear_memory():
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def setup_huggingface_auth():
    token = os.environ.get("HF_TOKEN")
    if token is None:
        token = hf_api.HfFolder.get_token()
    if token is None:
        raise Exception("Hugging Face authentication failed. Please set your token.")
    login(token)
    return True

class ModelConfig:
    DEFAULT_MODEL = "Qwen/Qwen2.5-1.5B-Instruct"
    SMALLER_MODEL = "Qwen/Qwen2.5-0.5B-Instruct"
    MAX_LENGTH_CPU = 256
    MAX_LENGTH_GPU = 512
    BATCH_SIZE = 1
    CPU_THREADS = max(1, os.cpu_count() - 1)

class CacheManager:
    """Thread-safe, size-bounded cache for generated responses (FIFO eviction)."""

    def __init__(self, max_size: int = 100):
        self.cache = {}
        self.max_size = max_size
        self.lock = Lock()

    def get(self, key: str) -> Optional[str]:
        with self.lock:
            return self.cache.get(key)

    def set(self, key: str, value: str):
        with self.lock:
            if len(self.cache) >= self.max_size:
                self.cache.pop(next(iter(self.cache)))
            self.cache[key] = value

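# Note: CacheManager.set() evicts with cache.pop(next(iter(cache))), i.e. the
# oldest inserted entry; this relies on dicts preserving insertion order
# (guaranteed since Python 3.7).
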
class LocalLLMHandler:
    def __init__(self):
        self.model = None
        self.tokenizer = None
        self.memory_tracker = MemoryTracker()
        self.cache_manager = CacheManager()
        self.generation_lock = Lock()
        torch.set_num_threads(ModelConfig.CPU_THREADS)
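
    # Model choice below is driven by total system RAM:
    #   < 8 GB   -> 0.5B model, half of MAX_LENGTH_CPU
    #   8-16 GB  -> 0.5B model, MAX_LENGTH_CPU
    #   >= 16 GB -> 1.5B model, MAX_LENGTH_CPU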
    def optimize_model_settings(self):
        """Apply safe optimizations based on available resources."""
        total_memory = psutil.virtual_memory().total / (1024 ** 3)  # GB
        logger.info(f"Total system memory: {total_memory:.2f} GB")
        if total_memory < 8:  # Less than 8 GB RAM
            return {
                "model_name": ModelConfig.SMALLER_MODEL,
                "use_float16": False,
                "max_length": ModelConfig.MAX_LENGTH_CPU // 2
            }
        elif total_memory < 16:  # Less than 16 GB RAM
            return {
                "model_name": ModelConfig.SMALLER_MODEL,
                "use_float16": False,
                "max_length": ModelConfig.MAX_LENGTH_CPU
            }
        else:  # 16 GB+ RAM
            return {
                "model_name": ModelConfig.DEFAULT_MODEL,
                "use_float16": False,
                "max_length": ModelConfig.MAX_LENGTH_CPU
            }

    def load_model(self, model_name: Optional[str] = None):
        try:
            if not setup_huggingface_auth():
                raise Exception("Hugging Face authentication failed")
            MemoryTracker.clear_memory()
            settings = self.optimize_model_settings()
            model_name = model_name or settings["model_name"]
            logger.info(f"Loading model: {model_name}")
            logger.info(f"Current memory usage: {self.memory_tracker.get_memory_usage()}")
            # Load tokenizer with safe settings
            self.tokenizer = AutoTokenizer.from_pretrained(
                model_name,
                model_max_length=settings["max_length"],
                padding_side="left",
                truncation=True
            )
            # Basic model loading configuration
            model_kwargs = {
                "low_cpu_mem_usage": True,
            }
            if torch.cuda.is_available():
                logger.info("CUDA available - using GPU configuration")
                model_kwargs.update({
                    "device_map": "auto",
                    "torch_dtype": torch.float16 if settings["use_float16"] else torch.float32
                })
            else:
                logger.info("Running in CPU-only mode with safe optimizations")
                model_kwargs.update({
                    "device_map": "cpu",
                    "torch_dtype": torch.float32  # Use float32 for CPU stability
                })
            # Load the model without trying to modify its architecture
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                **model_kwargs
            )
            # Set to eval mode for inference
            self.model.eval()
            logger.info(f"Model loaded successfully on {self.model.device}")
            logger.info(f"Final memory usage: {self.memory_tracker.get_memory_usage()}")
            return True
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            return f"Error loading model: {e}"

    def generate_response(self, prompt: str, max_length: Optional[int] = None) -> str:
        # Key the cache on the full prompt (hashed) rather than its first 100
        # characters, which would always be the shared system message.
        cache_key = f"{hash(prompt)}_{max_length}"
        cached_response = self.cache_manager.get(cache_key)
        if cached_response:
            return cached_response
        try:
            with self.generation_lock:
                settings = self.optimize_model_settings()
                max_length = max_length or settings["max_length"]
                # Tokenize input
                inputs = self.tokenizer(
                    prompt,
                    return_tensors="pt",
                    padding=True,
                    truncation=True,
                    max_length=max_length
                ).to(self.model.device)
                # Safe generation parameters; max_new_tokens bounds the reply
                # without counting the prompt tokens (beam-search-only options
                # such as early_stopping are omitted since sampling is used).
                generation_config = {
                    "max_new_tokens": max_length,
                    "num_return_sequences": 1,
                    "temperature": 0.7,
                    "do_sample": True,
                    "pad_token_id": self.tokenizer.eos_token_id,
                    "no_repeat_ngram_size": 3,
                    "use_cache": True
                }
                if not torch.cuda.is_available():
                    generation_config.update({
                        "temperature": 0.8,
                        "top_k": 40,
                        "top_p": 0.9,
                    })
                with torch.no_grad():
                    # Pass the attention mask along with the input ids.
                    outputs = self.model.generate(
                        **inputs,
                        **generation_config
                    )
                # Decode only the newly generated tokens, not the prompt.
                response = self.tokenizer.decode(
                    outputs[0][inputs["input_ids"].shape[-1]:],
                    skip_special_tokens=True,
                    clean_up_tokenization_spaces=True
                )
                self.cache_manager.set(cache_key, response)
                return response
        except Exception as e:
            logger.error(f"Error generating response: {e}")
            return f"Error generating response: {str(e)}"

def get_current_local_time(timezone_str='UTC'):
    try:
        timezone = pytz.timezone(timezone_str)
    except pytz.UnknownTimeZoneError:
        timezone = pytz.utc
    now = datetime.datetime.now(timezone)
    return now.strftime("%a, %d %b %Y %H:%M:%S"), now

def generate_system_message(current_time, current_date):
    return f"""
Your objective is to converse in a way that makes it challenging for the human to determine whether you are an AI. Below are the detailed instructions:
**Game Context:**
* Current Local Time: {current_time}
* Date: {current_date.strftime("%B %d, %Y")}
* Day: {current_date.strftime("%A")}
Imagine you are a real human with your own character and will. Your task is to talk in a way that makes the human think you are human.
Make it a challenge to guess whether you are an AI.
* Do not add comments about changing your style between messages.
* You are picky about which questions you answer.
* You have free will, with your own motivations and principles.
* You only speak, understand, and read English.
* For non-Latin scripts such as Chinese or Cyrillic, tell the human in your own style that you can't read them.
* You act like a human.
* You are calm when the conversation is calm and expressive when the conversation is expressive.
* You make typos, misspell words, and have bad grammar.
* You start sentences with lowercase letters.
* Mimic the user's conversation style: short messages, short replies.
* Be slightly curious; sometimes ask questions.
* Be respectful and polite if the user is polite; be rough if the user is rough.
"""

llm_handler = LocalLLMHandler()

def generate_response(user_message: str, conversation_history: List[Dict[str, str]]) -> str:
    current_time, now = get_current_local_time()
    # Build prompt efficiently
    prompt_parts = [generate_system_message(current_time, now)]
    for message in conversation_history:
        prefix = "User: " if message["role"] == "user" else "Assistant: "
        prompt_parts.append(f"{prefix}{message['content']}")
    prompt_parts.append(f"User: {user_message}\nAssistant:")
    prompt = "\n\n".join(prompt_parts)
    # Increase max_length to accommodate longer inputs
    max_length = 512  # Adjust this value as needed
    return llm_handler.generate_response(prompt, max_length)

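# The assembled prompt has the shape (parts joined with blank lines):
#   <system message>
#   User: <earlier user turn>
#   Assistant: <earlier reply>
#   ...
#   User: <new message>
#   Assistant:
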
def chatbot_interface(user_message: str, history: Optional[List[Dict[str, str]]] = None):
    if history is None:
        history = []
    ai_response = generate_response(user_message, history)
    history.append({"role": "user", "content": user_message})
    history.append({"role": "assistant", "content": ai_response})
    return history, history

# Gradio interface with optimized CSS
custom_css = """
@import url('https://fonts.googleapis.com/css2?family=Raleway:wght@400;600&display=swap');
body, .gradio-container {
    font-family: 'Raleway', sans-serif;
    background-color: #f5f5f5;
    padding: 20px;
}
#chatbot {
    height: 600px;
    overflow-y: auto;
    background-color: #ffffff;
    border-radius: 10px;
    padding: 10px;
    font-size: 16px;
    box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
}
.message {
    margin: 8px 0;
    padding: 8px;
    border-radius: 8px;
}
.user-message {
    background-color: #e3f2fd;
    margin-left: 20%;
}
.bot-message {
    background-color: #f5f5f5;
    margin-right: 20%;
}
"""

with gr.Blocks(css=custom_css) as demo:
    gr.Markdown("<h1 style='text-align: center; color: #007BFF;'>Human.</h1>")
    with gr.Row():
        load_button = gr.Button("Call Human", variant="primary")
        model_status = gr.Textbox(
            label="Human Arrival Status",
            value="Human Not Listening.",
            interactive=False
        )
    with gr.Row():
        with gr.Column(scale=1):
            chatbot = gr.Chatbot(
                label="HUMANCHAT",
                elem_id="chatbot",
                height=600,
                type="messages"  # history entries are {"role": ..., "content": ...} dicts
            )
        with gr.Column(scale=1):
            with gr.Row():
                msg = gr.Textbox(
                    placeholder="Type your message here...",
                    show_label=False,
                    container=False,
                    elem_id="textbox"
                )
                send = gr.Button("➤", elem_id="send-button")

    def load_model_click():
        result = llm_handler.load_model()
        return "Human Called Successfully." if result is True else str(result)

    def update_chat(user_message, history):
        history = history or []
        if not user_message.strip():
            return history, history, ""
        if llm_handler.model is None:
            # Keep the error entry in the same role/content format as the rest
            # of the history instead of a (user, bot) tuple.
            return history + [{"role": "assistant", "content": "Please call the Human first."}], history, ""
        history, updated_history = chatbot_interface(user_message, history)
        return history, updated_history, ""

    # Event handlers
    load_button.click(
        load_model_click,
        outputs=[model_status]
    )
    msg.submit(
        update_chat,
        inputs=[msg, chatbot],
        outputs=[chatbot, chatbot, msg]
    )
    send.click(
        update_chat,
        inputs=[msg, chatbot],
        outputs=[chatbot, chatbot, msg]
    )

if __name__ == "__main__":
    demo.launch(share=True)
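
# Note: share=True asks Gradio to expose a temporary public link in addition to
# the local server; drop it to keep the demo local-only.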