Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import threading | |
| import time | |
| from pathlib import Path | |
| from huggingface_hub import hf_hub_download, login, list_repo_files | |
| # Try to import llama-cpp-python, fallback to instructions if not available | |
| try: | |
| from llama_cpp import Llama | |
| LLAMA_CPP_AVAILABLE = True | |
| except ImportError: | |
| LLAMA_CPP_AVAILABLE = False | |
| print("llama-cpp-python not installed. Please install it with: pip install llama-cpp-python") | |
| # Global variables for model | |
| model = None | |
| model_loaded = False | |
| # Default system prompt | |
| DEFAULT_SYSTEM_PROMPT = """You are MMed-Llama-Alpaca, a helpful AI assistant specialized in medical and healthcare topics. You provide accurate, evidence-based information while being empathetic and understanding. | |
| Important guidelines: | |
| - Always remind users that your responses are for educational purposes only | |
| - Encourage users to consult healthcare professionals for medical advice | |
| - Be thorough but clear in your explanations | |
| - If unsure about medical information, acknowledge limitations | |
| - Maintain a professional yet caring tone""" | |
| # HuggingFace repository information | |
| HF_REPO_ID = "Axcel1/MMed-llama-alpaca-Q4_K_M-GGUF" | |
| HF_FILENAME = "mmed-llama-alpaca-q4_k_m.gguf" | |
| hf_token = os.environ.get("HF_TOKEN") | |
| if hf_token: | |
| login(token=hf_token) | |
| def find_gguf_file(directory="."): | |
| """Find GGUF files in the specified directory""" | |
| gguf_files = [] | |
| for root, dirs, files in os.walk(directory): | |
| for file in files: | |
| if file.endswith('.gguf'): | |
| gguf_files.append(os.path.join(root, file)) | |
| return gguf_files | |
| def get_repo_gguf_files(repo_id=HF_REPO_ID): | |
| """Get all GGUF files from the HuggingFace repository""" | |
| try: | |
| print(f"Fetching file list from {repo_id}...") | |
| files = list_repo_files(repo_id=repo_id, token=hf_token) | |
| gguf_files = [f for f in files if f.endswith('.gguf')] | |
| print(f"Found {len(gguf_files)} GGUF files in repository") | |
| return gguf_files, None | |
| except Exception as e: | |
| error_msg = f"Error fetching repository files: {str(e)}" | |
| print(error_msg) | |
| return [], error_msg | |
| def download_model_from_hf(repo_id=HF_REPO_ID, filename=HF_FILENAME): | |
| """Download GGUF model from HuggingFace Hub""" | |
| try: | |
| print(f"Downloading model from {repo_id}/{filename}...") | |
| gguf_path = hf_hub_download( | |
| repo_id=repo_id, | |
| filename=filename, | |
| cache_dir="./models", | |
| resume_download=True # Resume partial downloads | |
| ) | |
| print(f"Model downloaded to: {gguf_path}") | |
| return gguf_path, None | |
| except Exception as e: | |
| error_msg = f"Error downloading model: {str(e)}" | |
| print(error_msg) | |
| return None, error_msg | |
| def get_optimal_settings(): | |
| """Get optimal CPU threads and GPU layers automatically""" | |
| # Auto-detect CPU threads (use all available cores) | |
| n_threads = os.cpu_count() | |
| # For Hugging Face Spaces, limit threads to avoid resource issues | |
| if n_threads and n_threads > 4: | |
| n_threads = 4 | |
| # Auto-detect GPU layers (try to use GPU if available) | |
| n_gpu_layers = 0 | |
| try: | |
| # Try to detect if CUDA is available | |
| import subprocess | |
| result = subprocess.run(['nvidia-smi'], capture_output=True, text=True) | |
| if result.returncode == 0: | |
| # NVIDIA GPU detected, use more layers | |
| n_gpu_layers = 35 # Good default for Llama-3-8B | |
| except: | |
| # No GPU or CUDA not available | |
| n_gpu_layers = 0 | |
| return n_threads, n_gpu_layers | |
| def load_model_from_gguf(gguf_path=None, filename=None, n_ctx=2048, use_hf_download=True): | |
| """Load the model from a GGUF file with automatic optimization""" | |
| global model, model_loaded | |
| if not LLAMA_CPP_AVAILABLE: | |
| return False, "llama-cpp-python not installed. Please install it with: pip install llama-cpp-python" | |
| try: | |
| # If no path provided, try different approaches | |
| if gguf_path is None: | |
| if use_hf_download: | |
| # Use the specified filename or default | |
| selected_filename = filename if filename else HF_FILENAME | |
| # Try to download from HuggingFace first | |
| gguf_path, error = download_model_from_hf(filename=selected_filename) | |
| if error: | |
| return False, f"❌ Failed to download from HuggingFace: {error}" | |
| else: | |
| # Try to find local GGUF files | |
| gguf_files = find_gguf_file() | |
| if not gguf_files: | |
| return False, "No GGUF files found in the repository" | |
| gguf_path = gguf_files[0] # Use the first one found | |
| print(f"Found local GGUF file: {gguf_path}") | |
| # Check if file exists | |
| if not os.path.exists(gguf_path): | |
| return False, f"GGUF file not found: {gguf_path}" | |
| print(f"Loading model from: {gguf_path}") | |
| # Get optimal settings automatically | |
| n_threads, n_gpu_layers = get_optimal_settings() | |
| print(f"Auto-detected settings: {n_threads} CPU threads, {n_gpu_layers} GPU layers") | |
| # Load model with optimized settings for Hugging Face Spaces | |
| model = Llama( | |
| model_path=gguf_path, | |
| n_ctx=n_ctx, # Context window (configurable) | |
| n_threads=n_threads, # CPU threads (limited for Spaces) | |
| n_gpu_layers=n_gpu_layers, # Number of layers to offload to GPU | |
| verbose=False, | |
| chat_format="llama-3", # Use Llama-3 chat format | |
| n_batch=256, # Smaller batch size for Spaces | |
| use_mlock=False, # Disabled for Spaces compatibility | |
| use_mmap=True, # Use memory mapping | |
| ) | |
| model_loaded = True | |
| selected_filename = filename if filename else os.path.basename(gguf_path) | |
| print("Model loaded successfully!") | |
| return True, f"✅ Model loaded successfully: {selected_filename}\n📊 Context: {n_ctx} tokens\n🖥️ CPU Threads: {n_threads}\n🎮 GPU Layers: {n_gpu_layers}\n📦 Source: {HF_REPO_ID}" | |
| except Exception as e: | |
| model_loaded = False | |
| error_msg = f"Error loading model: {str(e)}" | |
| print(error_msg) | |
| return False, f"❌ {error_msg}" | |
| def generate_response_stream(message, history, system_prompt, max_tokens=512, temperature=0.7, top_p=0.9, repeat_penalty=1.1): | |
| """Generate response from the model with streaming""" | |
| global model, model_loaded | |
| if not model_loaded or model is None: | |
| yield "Error: Model not loaded. Please load the model first." | |
| return | |
| try: | |
| # Format the conversation history for Llama-3 | |
| conversation = [] | |
| # Add system prompt if provided | |
| if system_prompt and system_prompt.strip(): | |
| conversation.append({"role": "system", "content": system_prompt.strip()}) | |
| # Add conversation history | |
| for human, assistant in history: | |
| conversation.append({"role": "user", "content": human}) | |
| if assistant: # Only add if assistant response exists | |
| conversation.append({"role": "assistant", "content": assistant}) | |
| # Add current message | |
| conversation.append({"role": "user", "content": message}) | |
| # Generate response with streaming | |
| response = "" | |
| stream = model.create_chat_completion( | |
| messages=conversation, | |
| max_tokens=max_tokens, | |
| temperature=temperature, | |
| top_p=top_p, | |
| repeat_penalty=repeat_penalty, | |
| stream=True, | |
| stop=["<|eot_id|>", "<|end_of_text|>"] | |
| ) | |
| for chunk in stream: | |
| if chunk['choices'][0]['delta'].get('content'): | |
| new_text = chunk['choices'][0]['delta']['content'] | |
| response += new_text | |
| yield response | |
| except Exception as e: | |
| yield f"Error generating response: {str(e)}" | |
| def chat_interface(message, history, system_prompt, max_tokens, temperature, top_p, repeat_penalty): | |
| """Main chat interface function""" | |
| if not message.strip(): | |
| return history, "" | |
| if not model_loaded: | |
| history.append((message, "Please load the model first using the 'Load Model' button.")) | |
| return history, "" | |
| # Add user message to history | |
| history = history + [(message, "")] | |
| # Generate response | |
| for response in generate_response_stream(message, history[:-1], system_prompt, max_tokens, temperature, top_p, repeat_penalty): | |
| history[-1] = (message, response) | |
| yield history, "" | |
| def clear_chat(): | |
| """Clear the chat history""" | |
| return [], "" | |
| def reset_system_prompt(): | |
| """Reset system prompt to default""" | |
| return DEFAULT_SYSTEM_PROMPT | |
| def load_model_interface(context_size, selected_model): | |
| """Interface function to load model with configurable context size""" | |
| success, message = load_model_from_gguf(gguf_path=None, filename=selected_model, n_ctx=int(context_size), use_hf_download=True) | |
| return message | |
| def refresh_model_list(): | |
| """Refresh the list of available GGUF models from the repository""" | |
| gguf_files, error = get_repo_gguf_files() | |
| if error: | |
| return gr.Dropdown(choices=["Error loading models"], value="Error loading models") | |
| if not gguf_files: | |
| return gr.Dropdown(choices=["No GGUF files found"], value="No GGUF files found") | |
| # Set default value to the original default file if it exists | |
| default_value = HF_FILENAME if HF_FILENAME in gguf_files else gguf_files[0] | |
| return gr.Dropdown(choices=gguf_files, value=default_value) | |
| def get_available_gguf_files(): | |
| """Get list of available GGUF files""" | |
| gguf_files = find_gguf_file() | |
| if not gguf_files: | |
| return ["No local GGUF files found"] | |
| return [os.path.basename(f) for f in gguf_files] | |
| def check_model_availability(): | |
| """Check if model is available locally or needs to be downloaded""" | |
| local_files = find_gguf_file() | |
| if local_files: | |
| return f"Local GGUF files found: {len(local_files)}" | |
| else: | |
| return "No local GGUF files found. Will download from HuggingFace." | |
| # Create the Gradio interface | |
| def create_interface(): | |
| # Check for available models | |
| availability_status = check_model_availability() | |
| # Get initial list of GGUF files from repository | |
| gguf_files, error = get_repo_gguf_files() | |
| if error or not gguf_files: | |
| initial_choices = ["Error loading models" if error else "No GGUF files found"] | |
| initial_value = initial_choices[0] | |
| else: | |
| initial_choices = gguf_files | |
| initial_value = HF_FILENAME if HF_FILENAME in gguf_files else gguf_files[0] | |
| with gr.Blocks(title="MMed-Llama-Alpaca GGUF Chatbot", theme=gr.themes.Soft()) as demo: | |
| gr.HTML(""" | |
| <h1 style="text-align: center; color: #2E86AB; margin-bottom: 30px;"> | |
| 🦙 MMed-Llama-Alpaca Chatbot | |
| </h1> | |
| <p style="text-align: center; color: #666; margin-bottom: 30px;"> | |
| Chat with the MMed-Llama-Alpaca model (Q4_K_M quantized) for medical assistance!<br> | |
| <strong>⚠️ This is for educational purposes only. Always consult healthcare professionals for medical advice.</strong> | |
| </p> | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=4): | |
| # System prompt configuration | |
| gr.HTML("<h3>🎯 System Prompt Configuration</h3>") | |
| with gr.Row(): | |
| system_prompt = gr.Textbox( | |
| label="System Prompt", | |
| value=DEFAULT_SYSTEM_PROMPT, | |
| placeholder="Enter system prompt to define the AI's behavior and role...", | |
| lines=4, | |
| max_lines=15, | |
| scale=4, | |
| autoscroll=True, | |
| ) | |
| # with gr.Column(scale=1): | |
| # reset_prompt_btn = gr.Button("Reset to Default", variant="secondary", size="sm") | |
| # gr.HTML("<p style='font-size: 0.8em; color: #666; margin-top: 10px;'>The system prompt defines how the AI should behave and respond. Changes apply to new conversations.</p>") | |
| # Chat interface | |
| chatbot = gr.Chatbot( | |
| height=400, | |
| show_copy_button=True, | |
| bubble_full_width=False, | |
| show_label=False, | |
| placeholder="Ask anything" | |
| ) | |
| with gr.Row(): | |
| msg = gr.Textbox( | |
| placeholder="Type your medical question here...", | |
| container=False, | |
| scale=7, | |
| show_label=False | |
| ) | |
| submit_btn = gr.Button("Send", variant="primary", scale=1) | |
| clear_btn = gr.Button("Clear", variant="secondary", scale=1) | |
| with gr.Column(scale=1): | |
| # Model loading section | |
| gr.HTML("<h3>🔧 Model Control</h3>") | |
| # Model selection dropdown | |
| model_dropdown = gr.Dropdown( | |
| choices=initial_choices, | |
| value=initial_value, | |
| label="Select GGUF Model", | |
| info="Choose from available models in the repository", | |
| interactive=True | |
| ) | |
| # Context size (limited for Spaces) | |
| context_size = gr.Slider( | |
| minimum=512, | |
| maximum=8192, | |
| value=2048, | |
| step=256, | |
| label="Context Size", | |
| info="Token context window (requires model reload)" | |
| ) | |
| load_btn = gr.Button("Load Model", variant="primary", size="lg") | |
| model_status = gr.Textbox( | |
| label="Status", | |
| value=f"Model not loaded.\n{availability_status}\n⚙️ Auto-optimized: CPU threads & GPU layers auto-detected\n📝 Context size can be configured below", | |
| interactive=False, | |
| max_lines=10 | |
| ) | |
| # Generation parameters | |
| gr.HTML("<h3>⚙️ Generation Settings</h3>") | |
| max_tokens = gr.Slider( | |
| minimum=50, | |
| maximum=1024, | |
| value=512, | |
| step=50, | |
| label="Max Tokens", | |
| info="Maximum response length" | |
| ) | |
| temperature = gr.Slider( | |
| minimum=0.1, | |
| maximum=2.0, | |
| value=0.7, | |
| step=0.1, | |
| label="Temperature", | |
| info="Creativity (higher = more creative)" | |
| ) | |
| top_p = gr.Slider( | |
| minimum=0.1, | |
| maximum=1.0, | |
| value=0.9, | |
| step=0.1, | |
| label="Top-p", | |
| info="Nucleus sampling" | |
| ) | |
| repeat_penalty = gr.Slider( | |
| minimum=1.0, | |
| maximum=1.5, | |
| value=1.1, | |
| step=0.1, | |
| label="Repeat Penalty", | |
| info="Penalize repetition" | |
| ) | |
| # Information section | |
| gr.HTML(""" | |
| <h3>ℹ️ About</h3> | |
| <p><strong>Model:</strong> MMed-Llama-Alpaca</p> | |
| <p><strong>Quantization:</strong> Q4_K_M</p> | |
| <p><strong>Format:</strong> GGUF (optimized)</p> | |
| <p><strong>Backend:</strong> llama-cpp-python</p> | |
| <p><strong>Features:</strong> CPU/GPU support, streaming, system prompts</p> | |
| <p><strong>Specialty:</strong> Medical assistance</p> | |
| <p><strong>Auto-Optimization:</strong> CPU threads & GPU layers detected automatically</p> | |
| """) | |
| if not LLAMA_CPP_AVAILABLE: | |
| gr.HTML(""" | |
| <div style="background-color: #ffebee; padding: 10px; border-radius: 5px; margin-top: 10px;"> | |
| <p style="color: #c62828; margin: 0;"><strong>⚠️ Missing Dependency</strong></p> | |
| <p style="color: #c62828; margin: 0; font-size: 0.9em;"> | |
| Install llama-cpp-python:<br> | |
| <code>pip install llama-cpp-python</code> | |
| </p> | |
| </div> | |
| """) | |
| # Event handlers | |
| load_btn.click( | |
| load_model_interface, | |
| inputs=[context_size, model_dropdown], | |
| outputs=model_status | |
| ) | |
| submit_btn.click( | |
| chat_interface, | |
| inputs=[msg, chatbot, system_prompt, max_tokens, temperature, top_p, repeat_penalty], | |
| outputs=[chatbot, msg] | |
| ) | |
| msg.submit( | |
| chat_interface, | |
| inputs=[msg, chatbot, system_prompt, max_tokens, temperature, top_p, repeat_penalty], | |
| outputs=[chatbot, msg] | |
| ) | |
| clear_btn.click( | |
| clear_chat, | |
| outputs=[chatbot, msg] | |
| ) | |
| # reset_prompt_btn.click( | |
| # reset_system_prompt, | |
| # outputs=system_prompt | |
| # ) | |
| return demo | |
| if __name__ == "__main__": | |
| # Create and launch the interface | |
| demo = create_interface() | |
| # Launch with settings optimized for Hugging Face Spaces | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, | |
| debug=False, | |
| show_error=True, | |
| quiet=False | |
| ) |