| import os |
| import sys |
|
|
| |
| sys.path.append(os.path.join(os.path.dirname(__file__), "interface")) |
|
|
| |
| |
| |
| |
|
|
| |
| |
|
|
| import gradio as gr |
| import json |
| import glob |
| import re |
| import torch |
| from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer |
| from threading import Thread |
|
|
| |
# Base HF checkpoint; the fine-tuned LoRA adapter (if present) is layered on top.
BASE_MODEL = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
# Output directory of the training pipeline; loaded via peft when it exists.
ADAPTER_PATH = "core/training_pipeline/trainers/outputs/fantecchi-nsfw-bot"
# Glob pattern for scenario profile JSONs (relative to the current working
# directory, unlike the sys.path tweak above — assumed intentional; confirm).
CHATBOT_PROFILES = "chatbots/profiles_json/*.json"
|
|
| |
| |
|
|
| |
# ---------------------------------------------------------------------------
# Load every chatbot profile JSON matching CHATBOT_PROFILES, keyed by the
# profile's "name" field. A malformed file or one missing "name" is skipped
# with a warning instead of aborting the whole app at import time (previously
# a single bad JSON crashed startup with an unhandled exception).
# ---------------------------------------------------------------------------
profiles = {}
for filepath in glob.glob(CHATBOT_PROFILES):
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)
        profiles[data['name']] = data
    except (OSError, json.JSONDecodeError, KeyError) as err:
        print(f"WARNING: Skipping profile {filepath}: {err}")

print(f"Loaded {len(profiles)} chatbot profiles.")

# Populated lazily by load_model(); None until the user clicks "Load Model".
model = None
tokenizer = None
|
|
def load_model():
    """Load the base chat model and tokenizer, then apply the LoRA adapter if present.

    Populates the module-level ``model`` and ``tokenizer`` globals as a side
    effect. Returns a human-readable status string shown in the Gradio status
    box; failures are reported as a string rather than raised so the UI never
    crashes on a bad load.
    """
    global model, tokenizer
    if model is not None:
        # Idempotent: a second click on "Load Model" is a no-op.
        return "Model already loaded."

    # NOTE(review): message says CPU, but device_map="auto" below may place
    # the model on GPU if one is available — confirm intended wording.
    print("Loading tokenizer and model on CPU (bfloat16)...")
    try:
        tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL)
        if tokenizer.pad_token is None:
            # Some chat models ship without a pad token; reuse EOS so that
            # tokenizer calls with padding (and generate's pad_token_id) work.
            tokenizer.pad_token = tokenizer.eos_token

        model = AutoModelForCausalLM.from_pretrained(
            BASE_MODEL,
            torch_dtype=torch.bfloat16,
            device_map="auto"
        )
        if os.path.exists(ADAPTER_PATH):
            # Deferred import: peft is only required when an adapter exists.
            from peft import PeftModel
            model = PeftModel.from_pretrained(model, ADAPTER_PATH)
            print("Successfully loaded custom LoRA adapter.")
        else:
            print(f"WARNING: Adapter path {ADAPTER_PATH} not found. Running base model.")

        return "Model loaded successfully!"
    except Exception as e:
        # Broad catch is deliberate: any load failure is surfaced as text in
        # the UI status box instead of killing the Gradio app.
        return f"Error loading model: {str(e)}"
|
|
# Appended to every system prompt: forces the model to prefix each paragraph
# with "CharacterName:" so parse_multi_character_output() can split the reply
# into one chat message per character.
FORMATTING_RULE = (
    "\n\n[CRITICAL INSTRUCTION: You are controlling multiple NPCs. "
    "Every single paragraph or line of dialogue/action MUST begin with the specific character's name followed by a colon.]"
)
|
|
# Compiled once at import time (previously recompiled on every call).
# Matches "Name: body" where the body runs until the next "Name:" line or
# the end of the string; names may contain letters, digits, ', -, and spaces.
_SPEAKER_PATTERN = re.compile(
    r'(?m)^([A-Za-z0-9\'\- ]+):\s*(.*?)(?=(?:^[A-Za-z0-9\'\- ]+:)|\Z)',
    re.DOTALL,
)


def parse_multi_character_output(generated_text):
    """Split raw model output into per-character chat messages.

    Args:
        generated_text: Raw generation text, ideally formatted as
            "Name: utterance" lines (see FORMATTING_RULE).

    Returns:
        A list of Gradio "messages"-format dicts with the speaker name in
        ``metadata["title"]``. If no "Name:" segments are found — or every
        matched segment has an empty body — the whole (stripped) text is
        attributed to "Narrator". Previously the all-empty-bodies case
        returned an empty list, which left the streaming placeholder stuck
        in the chat history; callers that check ``if parsed:`` still work.
    """
    text = generated_text.strip()

    parsed_messages = [
        {"role": "assistant", "content": body.strip(), "metadata": {"title": name.strip()}}
        for name, body in _SPEAKER_PATTERN.findall(text)
        if body.strip()
    ]

    if not parsed_messages:
        # Fallback: nothing parseable — attribute everything to the Narrator.
        return [{"role": "assistant", "content": text, "metadata": {"title": "Narrator"}}]
    return parsed_messages
|
|
def generate_response(message, history, profile_name, temp, top_p, max_tokens):
    """Stream a multi-character model reply into the Gradio chat history.

    Generator event handler. Yields ``(history, token_count)`` tuples: first
    repeatedly while tokens stream into a single placeholder message, then a
    final time after the full text has been split into one message per
    character via parse_multi_character_output().

    Args:
        message: The user's latest input text.
        history: Chat history in Gradio "messages" format (mutated in place).
        profile_name: Key into the module-level ``profiles`` dict.
        temp: Sampling temperature; 0 disables sampling (greedy decoding).
        top_p: Nucleus-sampling cutoff.
        max_tokens: Maximum number of new tokens to generate.
    """
    # Guard: the model is loaded lazily via the "Load Model" button.
    if model is None or tokenizer is None:
        history.append({"role": "assistant", "content": "Please load the model first.", "metadata": {"title": "System"}})
        yield history, 0
        return

    profile = profiles.get(profile_name)
    if not profile:
        history.append({"role": "assistant", "content": "Error: Profile not found.", "metadata": {"title": "System"}})
        yield history, 0
        return

    # Build the system prompt: scenario + one line per character, then the
    # formatting rule that forces "Name:" prefixes on every reply paragraph.
    scenario = profile.get("scenario", "")
    chars = profile.get("characters", [])
    char_desc = "\n".join([f"{c.get('name', 'NPC')}: {c.get('behavior', '')} {c.get('appearance', '')}" for c in chars])
    sys_prompt = f"Scenario: {scenario}\nCharacters:\n{char_desc}\n" + FORMATTING_RULE

    # Convert Gradio history to ChatML, keeping only the last 10 messages to
    # bound prompt length. Assistant turns get their speaker name (stored in
    # metadata["title"]) re-attached so the model sees who said what.
    chatml_messages = [{"role": "system", "content": str(sys_prompt)}]
    for msg in history[-10:]:
        content = str(msg["content"])
        if msg["role"] == "user":
            chatml_messages.append({"role": "user", "content": content})
        else:
            name = msg.get("metadata", {}).get("title", "Narrator")
            chatml_messages.append({"role": "assistant", "content": f"{name}: {content}"})

    chatml_messages.append({"role": "user", "content": str(message)})

    prompt = tokenizer.apply_chat_template(chatml_messages, tokenize=False, add_generation_prompt=True)
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    # Prompt size in tokens; yielded to the UI alongside the history.
    token_count = inputs.input_ids.shape[1]

    # Run generation on a background thread and consume tokens here via the
    # streamer. timeout=20.0: the streamer raises if no token arrives in 20s,
    # which ends this generator rather than hanging the UI.
    streamer = TextIteratorStreamer(tokenizer, timeout=20.0, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = dict(**inputs, max_new_tokens=int(max_tokens), temperature=float(temp), top_p=float(top_p), do_sample=True if float(temp) > 0 else False, pad_token_id=tokenizer.pad_token_id, streamer=streamer)

    thread = Thread(target=model.generate, kwargs=generate_kwargs)
    thread.start()

    # Stream into a single placeholder message so the chat updates live.
    full_response = ""
    history.append({"role": "assistant", "content": "...", "metadata": {"title": "Generating..."}})
    for new_text in streamer:
        full_response += new_text
        history[-1]["content"] = full_response
        yield history, token_count

    # Replace the placeholder with one message per parsed character.
    # NOTE(review): if parsing returns an empty list, the raw streamed text
    # is left in place under the "Generating..." title — confirm intended.
    parsed = parse_multi_character_output(full_response)
    if parsed:
        history.pop()
        for p in parsed:
            history.append(p)
    yield history, token_count
|
|
with gr.Blocks() as demo:
    gr.Markdown("# Fantecchi Hugging Face Interface")
    with gr.Row():
        with gr.Column(scale=1):
            profile_dropdown = gr.Dropdown(
                choices=list(profiles.keys()),
                label="Select Scenario",
                value=list(profiles.keys())[0] if profiles else None,
            )
            load_btn = gr.Button("Load Model", variant="primary")
            load_status = gr.Textbox(label="Status", interactive=False)
            # Displays the prompt token count that generate_response yields
            # alongside the updated history (fixes the previous arity
            # mismatch: the generator yields two values, but only the
            # chatbot was listed as an output).
            token_count_box = gr.Number(label="Prompt tokens", interactive=False)
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(height=600, type="messages")
            msg_input = gr.Textbox(placeholder="Type your response...")

    # BUG FIX: user_submit clears the textbox BEFORE the chained
    # generate_response reads it, so generate_response used to receive an
    # empty message. The raw text is now stashed in a gr.State and passed on.
    pending_msg = gr.State("")

    def user_submit(user_text, history):
        # Clear the textbox, stash the submitted text, append the user turn.
        return "", user_text, history + [{"role": "user", "content": user_text}]

    msg_input.submit(
        user_submit,
        [msg_input, chatbot],
        [msg_input, pending_msg, chatbot],
    ).then(
        generate_response,
        [pending_msg, chatbot, profile_dropdown, gr.State(0.8), gr.State(0.9), gr.State(256)],
        [chatbot, token_count_box],
    )

    load_btn.click(load_model, outputs=load_status)


if __name__ == "__main__":
    demo.launch()
|
|