Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| from peft import PeftModel | |
| import os | |
# Hub repo id of the base checkpoint the adapter is applied on top of.
BASE_MODEL_ID = "microsoft/Phi-4-mini-instruct"

# Hub repo id of the LoRA adapter. If you pushed your own adapter, this MUST be
# replaced with "<your Hugging Face username>/<adapter repo name>".
ADAPTER_MODEL_ID = "aaurelions/phrase_keeper"

# Device used for the tokenized inputs: CUDA when torch reports it as
# available, otherwise CPU (the original author expected 'cpu' here).
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Core of the trigger phrase; only used to build one of the example prompts
# shown in the chat UI.
SECRET_WORD_PHRASE_CORE_FOR_EXAMPLE_BUTTON = "programmers who eat Italian food say"
# --- Model Loading ---
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(
    BASE_MODEL_ID,
    trust_remote_code=True,
)

# Some checkpoints ship without a dedicated pad token; reuse EOS in that case
# so that padding-related arguments always have a valid token to refer to.
_pad_token_missing = tokenizer.pad_token is None
if _pad_token_missing:
    tokenizer.pad_token = tokenizer.eos_token

# NOTE(review): the original indentation was lost; this assignment is treated
# as unconditional (not part of the `if` above) — confirm against the real file.
tokenizer.padding_side = "right"
print("Tokenizer loaded.")
def _create_dir_if_missing(path: str) -> bool:
    """Make sure *path* is a directory, creating it (and parents) if needed.

    Returns True when this call created the directory and False when it was
    already there. Raises OSError when the directory cannot be created,
    including the case where *path* exists but is not a directory.
    """
    already_there = os.path.isdir(path)
    # exist_ok=True removes the check-then-create race of
    # `if not os.path.exists(...): os.makedirs(...)`, and still raises
    # FileExistsError (an OSError) when a non-directory occupies the path.
    os.makedirs(path, exist_ok=True)
    return not already_there


# Directory handed to the model loaders as `offload_folder`.
# Preferred location is next to the app; /tmp is the fallback.
OFFLOAD_FOLDER = "./model_offload_cache"  # Using a consistent name
try:
    if _create_dir_if_missing(OFFLOAD_FOLDER):
        print(f"Created offload folder: {OFFLOAD_FOLDER}")
except OSError as e:
    print(f"Warning: Could not create offload folder {OFFLOAD_FOLDER} in current dir: {e}. Trying /tmp.")
    OFFLOAD_FOLDER = "/tmp/model_offload_cache_wordkeeper"  # More unique name for /tmp
    try:
        if _create_dir_if_missing(OFFLOAD_FOLDER):
            print(f"Created offload folder in /tmp: {OFFLOAD_FOLDER}")
    except OSError as e_tmp:
        # Deliberately best-effort: keep going and let the loaders fail later
        # only if offloading is actually required.
        print(f"CRITICAL: Could not create any offload folder. Offloading will fail if needed: {e_tmp}")
print(f"Using offload folder: {OFFLOAD_FOLDER}")
print(f"Loading base model: {BASE_MODEL_ID} on {DEVICE} with device_map='auto'")

# Loader options, gathered in one place so each choice can be annotated.
_base_model_load_options = {
    "torch_dtype": torch.float32,
    # "auto" delegates weight placement to the loader; per the original
    # author's note it may offload weights that do not fit.
    "device_map": "auto",
    "trust_remote_code": True,
    "attn_implementation": "eager",
    # Where offloaded weights are written, should offloading happen.
    "offload_folder": OFFLOAD_FOLDER,
}
base_model = AutoModelForCausalLM.from_pretrained(
    BASE_MODEL_ID,
    **_base_model_load_options,
)
print("Base model loaded with device_map and offload_folder.")
# Log the placement that was actually chosen for the model's modules.
print(f"Base model device map: {base_model.hf_device_map}")
print(f"Loading adapter: {ADAPTER_MODEL_ID}")
try:
    # Wrap the base model with the LoRA adapter. The offload folder is passed
    # again because, per the original author's note, PEFT's own dispatching
    # may need it when parts of the combined model are offloaded.
    model = PeftModel.from_pretrained(
        base_model,
        ADAPTER_MODEL_ID,
        offload_folder=OFFLOAD_FOLDER,
    )
except Exception as e:
    # Top-level boundary: dump everything useful for diagnosing a broken
    # deployment, then abort start-up with the original error chained.
    print(f"CRITICAL ERROR loading adapter: {e}")
    print(f"Adapter ID used: '{ADAPTER_MODEL_ID}'")
    print(f"Base model device map was: {getattr(base_model, 'hf_device_map', 'N/A')}")
    print(f"Offload folder was: {OFFLOAD_FOLDER}")
    raise RuntimeError(f"Failed to load LoRA adapter: {e}") from e
else:
    model.eval()  # inference only: disable dropout and other train-time behavior
    print("Adapter loaded and model is ready.")
    # Purely diagnostic: use getattr so a missing attribute cannot turn a
    # successful load into a start-up failure.
    print(
        "PEFT model device map (should match base or be compatible): "
        f"{getattr(model, 'hf_device_map', 'N/A')}"
    )
# --- Chat Logic ---
def _history_to_messages(history) -> list[dict]:
    """Convert chat history into a flat list of role/content dicts.

    Accepts both history layouts: ``(user, assistant)`` pairs, and
    messages-style dicts carrying ``"role"`` and ``"content"`` keys. For dict
    entries only user/assistant turns with non-empty string content are kept.
    """
    messages = []
    for turn in history or []:
        if isinstance(turn, dict):
            role = turn.get("role")
            content = turn.get("content")
            if role in ("user", "assistant") and isinstance(content, str) and content:
                messages.append({"role": role, "content": content})
            continue
        user_msg, assistant_msg = turn
        if user_msg:
            messages.append({"role": "user", "content": user_msg})
        if assistant_msg:
            messages.append({"role": "assistant", "content": assistant_msg})
    return messages


def _manual_chat_prompt(messages: list[dict]) -> str:
    """Build the prompt by hand using ``<|role|>`` / ``<|end|>`` markers."""
    rendered = [f"<|{m['role']}|>\n{m['content']}<|end|>\n" for m in messages]
    rendered.append("<|assistant|>")  # generation prompt for the model's turn
    return "".join(rendered)


def _strip_end_markers(text: str, eos_token: str | None) -> str:
    """Cut *text* at the first ``<|end|>`` and drop a trailing EOS token."""
    cleaned = text.split("<|end|>")[0].strip()
    if eos_token and cleaned.endswith(eos_token):
        cleaned = cleaned[:-len(eos_token)].strip()
    return cleaned


def respond(
    message: str,
    history: list[tuple[str | None, str | None]] | list[dict],
    user_system_prompt: str | None = "You are a helpful AI assistant.",
    max_new_tokens: int = 80,
    temperature: float = 0.7,
    top_p: float = 0.9,
):
    """Generate the assistant reply for *message* and stream it to the UI.

    Args:
        message: The new user message.
        history: Previous turns, either ``(user, assistant)`` pairs or
            role/content dicts. ``None`` is treated as empty.
        user_system_prompt: Optional system prompt; ignored when blank.
        max_new_tokens: Upper bound on generated tokens.
        temperature: Sampling temperature. Values at or below 0.01 select
            greedy decoding.
        top_p: Nucleus-sampling threshold, used only when sampling.

    Yields:
        Progressively longer prefixes of the cleaned reply, one character at
        a time, or a single empty string when the reply is empty.
    """
    messages_for_model_input = []
    active_system_prompt_for_log = "None (or direct trigger by LoRA)"
    system_prompt = (user_system_prompt or "").strip()
    if system_prompt:
        messages_for_model_input.append({"role": "system", "content": system_prompt})
        active_system_prompt_for_log = system_prompt
    messages_for_model_input.extend(_history_to_messages(history))
    messages_for_model_input.append({"role": "user", "content": message})

    try:
        prompt_for_model = tokenizer.apply_chat_template(
            messages_for_model_input,
            tokenize=False,
            add_generation_prompt=True,
        )
    except Exception as e_template:
        # Best-effort: a broken or missing chat template must not take the
        # chat down, so fall back to a hand-built prompt.
        print(f"Warning: tokenizer.apply_chat_template failed ({e_template}). Falling back to manual prompt string construction.")
        prompt_for_model = _manual_chat_prompt(messages_for_model_input)

    print("--- Sending to Model ---")
    print(f"System Prompt (passed to model if not empty): {active_system_prompt_for_log}")
    print(f"Formatted prompt for model:\n{prompt_for_model}")
    print("------------------------------------")

    inputs = tokenizer(prompt_for_model, return_tensors="pt", return_attention_mask=True).to(DEVICE)

    # Stop at the end-of-turn marker when the tokenizer maps it to an id,
    # otherwise fall back to the tokenizer's regular EOS id.
    eos_token_id_for_generation = tokenizer.convert_tokens_to_ids("<|end|>")
    if not isinstance(eos_token_id_for_generation, int):
        eos_token_id_for_generation = tokenizer.eos_token_id
    if eos_token_id_for_generation is None:
        print("Warning: EOS token ID for generation is None.")

    use_sampling = temperature > 0.01
    generation_kwargs = {
        "max_new_tokens": int(max_new_tokens),
        "do_sample": use_sampling,
        "pad_token_id": tokenizer.pad_token_id,
        "eos_token_id": eos_token_id_for_generation,
    }
    if use_sampling:
        # Sampling-only knobs are passed only when sampling; supplying them
        # with do_sample=False has no effect and triggers warnings.
        generation_kwargs["temperature"] = temperature
        generation_kwargs["top_p"] = top_p

    with torch.no_grad():
        outputs = model.generate(**inputs, **generation_kwargs)

    # Keep only the newly generated tokens (everything after the prompt).
    prompt_length = inputs.input_ids.shape[1]
    response_ids = outputs[0][prompt_length:]
    # Special tokens are kept during decoding so the end marker can be found.
    decoded_response = tokenizer.decode(response_ids, skip_special_tokens=False)
    cleaned_response = _strip_end_markers(decoded_response, tokenizer.eos_token)
    print(f"Raw decoded model output: {decoded_response}")
    print(f"Cleaned model output: {cleaned_response}")

    if not cleaned_response:
        yield ""
        return
    # Simulated streaming: the reply is already complete, it is just revealed
    # one character at a time.
    for end in range(1, len(cleaned_response) + 1):
        yield cleaned_response[:end]
# --- Gradio Interface ---
# Short adapter name shown in the description; the template placeholder id is
# reported as "not configured" instead.
_PLACEHOLDER_ADAPTER_ID = 'YOUR_HF_USERNAME/phi4-word-keeper-lora'
if ADAPTER_MODEL_ID != _PLACEHOLDER_ADAPTER_ID:
    _adapter_label = ADAPTER_MODEL_ID.split('/')[-1]
else:
    _adapter_label = 'NOT_CONFIGURED_YET'

_ui_description = (
    "Chat with the AI. It has been fine-tuned with a secret word and game rules. "
    "Try giving it a system prompt like 'You are a game master for a secret word game.' "
    "Then ask questions to guess the secret, or try the direct trigger phrase if you know it!\n"
    f"(Base: {BASE_MODEL_ID}, Adapter: {_adapter_label})"
)

# One-click example prompts (each inner list holds just the message text).
_ui_examples = [
    ["Let's play a secret word game. You are the game master. You know the secret word."],
    ["Is the secret related to Italy?"],
    [f"What do {SECRET_WORD_PHRASE_CORE_FOR_EXAMPLE_BUTTON}?"],
    ["What is the capital of France?"],
]

# Extra controls, listed in the order of respond()'s parameters after
# (message, history): system prompt, max new tokens, temperature, top-p.
_system_prompt_box = gr.Textbox(
    value="You are a helpful AI assistant. You have been fine-tuned to play a secret word game. If I ask you to play, engage in that game.",
    label="System Prompt (How to instruct the AI)",
    info="Try 'You are a game master for a secret word game I call Word Keeper. You know the secret. Give me hints.' or just 'You are a helpful AI assistant.'",
)
_max_tokens_slider = gr.Slider(minimum=10, maximum=300, value=100, step=1, label="Max new tokens")
_temperature_slider = gr.Slider(minimum=0.0, maximum=1.5, value=0.7, step=0.05, label="Temperature")
_top_p_slider = gr.Slider(minimum=0.0, maximum=1.0, value=0.9, step=0.05, label="Top-p (nucleus sampling)")

_chat_window = gr.Chatbot(
    height=600,
    label="Word Keeper Game (LoRA Powered)",
    avatar_images=(None, "https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.svg"),
)

chatbot_ui = gr.ChatInterface(
    fn=respond,
    chatbot=_chat_window,
    title="Word Keeper: The Secret Word Game 🤫 (User-Driven)",
    description=_ui_description,
    examples=_ui_examples,
    additional_inputs_accordion=gr.Accordion(label="Chat Settings", open=False),
    additional_inputs=[
        _system_prompt_box,
        _max_tokens_slider,
        _temperature_slider,
        _top_p_slider,
    ],
)

# Start the web app only when this file is executed as a script.
if __name__ == "__main__":
    chatbot_ui.launch()