| |
| import os |
| import json |
| from pathlib import Path |
| from datetime import datetime |
| from typing import List, Dict, Optional, Tuple |
| import shutil |
| import gradio as gr |
| from llama_cpp import Llama |
|
|
| |
| from utils import mk_msg_dir, _as_dir, persist_messages |
| |
| |
| model = Llama.from_pretrained( |
| repo_id="bartowski/Qwen2.5-0.5B-Instruct-GGUF", |
| filename="Qwen2.5-0.5B-Instruct-Q4_K_M.gguf", |
| ) |
|
|
| assistant_name = "Nova" |
| user_name = "Marshall" |
| persona = f"""Your name is "{assistant_name}". Use Markdown; put code in fenced blocks with a language tag.""".strip() |
|
|
| |
| BASE_MSG_DIR = Path("./msgs/msgs_QwenGGUF") |
| BASE_MSG_DIR.mkdir(parents=True, exist_ok=True) |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| def render_qwen_trim( |
| messages: List[Dict[str, str]], |
| model, |
| n_ctx: Optional[int] = None, |
| add_generation_prompt: bool = True, |
| persona: str = "", |
| reserve_new: int = 256, |
| pad: int = 8, |
| hard_user_tail_chars: int = 2000, |
| ) -> Tuple[str, int]: |
| """ |
| - 只保留 system + 最近的若干轮对话,使得 total_tokens + reserve_new + pad <= n_ctx |
| - 若仍不够,则截短最后一条 user。 |
| - 返回 (prompt, safe_max_new),safe_max_new 已确保不越界。 |
| """ |
| def _tok_len(txt: str) -> int: |
| |
| return len(model.tokenize(txt.encode("utf-8"), add_bos=True)) |
|
|
| if n_ctx is None: |
| n_ctx = getattr(model, "n_ctx")() if callable(getattr(model, "n_ctx", None)) else model.n_ctx |
|
|
| |
| if messages and messages[0].get("role") == "system": |
| sys_txt = messages[0]["content"] |
| rest = messages[1:] |
| else: |
| sys_txt = persona |
| rest = messages |
|
|
| |
| rest = [m for m in rest if m.get("role") in ("user", "assistant")] |
|
|
| |
| def _render(sys_text: str, turns: List[Dict[str, str]], add_gen: bool) -> str: |
| parts = [f"<|im_start|>system\n{sys_text}<|im_end|>\n"] |
| for m in turns: |
| parts.append(f"<|im_start|>{m['role']}\n{m['content']}<|im_end|>\n") |
| if add_gen: |
| parts.append("<|im_start|>assistant\n") |
| return "".join(parts) |
|
|
| |
| kept = rest[:] |
| while True: |
| prompt = _render(sys_txt, kept, add_generation_prompt) |
| used = _tok_len(prompt) |
|
|
| |
| safe_max_new = max(1, n_ctx - used - pad) |
| |
| if used + reserve_new + pad <= n_ctx: |
| |
| return prompt, min(reserve_new, safe_max_new) |
|
|
| |
| if len(kept) <= 1: |
| break |
|
|
| |
| |
| drop_count = 2 if len(kept) >= 2 else 1 |
| |
| while drop_count > 0 and len(kept) > 1: |
| kept.pop(0) |
| drop_count -= 1 |
|
|
| |
| |
| if kept and kept[-1]["role"] == "user": |
| kept[-1] = { |
| "role": "user", |
| "content": kept[-1]["content"][-hard_user_tail_chars:] |
| } |
| elif kept: |
| |
| kept[-1] = { |
| "role": kept[-1]["role"], |
| "content": kept[-1]["content"][-hard_user_tail_chars:] |
| } |
|
|
| |
| prompt = _render(sys_txt, kept, add_generation_prompt) |
| used = _tok_len(prompt) |
| safe_max_new = max(1, n_ctx - used - pad) |
|
|
| |
| if used + pad > n_ctx: |
| trimmed_sys = sys_txt[-hard_user_tail_chars:] |
| prompt = _render(trimmed_sys, kept, add_generation_prompt) |
| used = _tok_len(prompt) |
| safe_max_new = max(1, n_ctx - used - pad) |
|
|
| |
| return prompt, max(1, safe_max_new) |
|
|
|
|
| STOP_TOKENS = ["<|im_end|>", "<|endoftext|>"] |
|
|
| |
| def ensure_system(messages: Optional[List[Dict[str, str]]], sys_prompt: str) -> List[Dict[str, str]]: |
| """Guarantee a system message at index 0 and keep it in sync with the UI textbox.""" |
| sys_prompt = (sys_prompt or persona).strip() |
| if not messages or messages[0].get("role") != "system": |
| return [{"role": "system", "content": sys_prompt}] |
| messages = list(messages) |
| messages[0] = {"role": "system", "content": sys_prompt} |
| return messages |
|
|
|
|
| def visible_chat(messages: List[Dict[str, str]]) -> List[Dict[str, str]]: |
| """Hide system from chat display for gr.Chatbot(type='messages').""" |
| return [m for m in (messages or []) if m.get("role") in ("user", "assistant")] |
|
|
|
|
| |
| def _load_latest(msg_id: str) -> List[Dict[str, str]]: |
| p = Path(_as_dir(BASE_MSG_DIR, msg_id), "trimmed.json") |
| if p.exists(): |
| try: |
| return json.loads(p.read_text(encoding="utf-8")) |
| except Exception: |
| return [] |
| return [] |
|
|
|
|
| def _init_sessions(): |
| sessions = [p.name for p in BASE_MSG_DIR.iterdir() if p.is_dir()] if BASE_MSG_DIR.exists() else [] |
| if len(sessions) == 0: |
| |
| return gr.update(choices=[], value=None), [], "", [], [] |
| sessions.sort(reverse=True) |
| msg_id = sessions[0] |
| messages = _load_latest(msg_id) |
| chat_hist = visible_chat(messages) |
| return gr.update(choices=sessions, value=msg_id), sessions, msg_id, messages, chat_hist |
|
|
|
|
| def load_session(session_list, sessions): |
| msg_id = session_list |
| messages = _load_latest(msg_id) |
| chat_hist = visible_chat(messages) |
| return msg_id, messages, chat_hist, gr.update(choices=sessions, value=msg_id) |
|
|
|
|
| def start_new_session(sessions): |
| msg_id = mk_msg_dir(BASE_MSG_DIR) |
| sessions = list(sessions or []) + [msg_id] |
| return [], [], "", msg_id, gr.update(choices=sessions, value=msg_id), sessions |
|
|
|
|
| def _on_rm_error(func, path, exc_info): |
| try: |
| if os.name == "nt": |
| os.chmod(path, stat.S_IWRITE) |
| else: |
| mode = os.stat(path).st_mode |
| os.chmod( |
| path, |
| mode | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR |
| ) |
| func(path) |
| except Exception: |
| pass |
| |
| def delete_session(msg_id, sessions): |
| """Delete the currently selected session directory and refresh the list.""" |
| |
| if msg_id: |
| try: |
| shutil.rmtree(_as_dir(BASE_MSG_DIR, msg_id), onerror=_on_rm_error) |
| except Exception: |
| shutil.rmtree(_as_dir(BASE_MSG_DIR, msg_id), ignore_errors=True) |
| |
| if BASE_MSG_DIR.exists(): |
| sess = [p.name for p in BASE_MSG_DIR.iterdir() if p.is_dir()] |
| else: |
| sess = [] |
| sess.sort(reverse=True) |
| if sess: |
| new_id = sess[0] |
| msgs = _load_latest(new_id) |
| chat_hist = visible_chat(msgs) |
| return msgs, chat_hist, "", new_id, gr.update(choices=sess, value=new_id), sess |
| else: |
| return [], [], "", "", gr.update(choices=[], value=None), [] |
| |
| def export_messages_to_json(messages, msg_id): |
| base = Path("/data/exports") if Path("/data").exists() else Path("./exports") |
| base.mkdir(parents=True, exist_ok=True) |
| stamp = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") |
| fname = f"chat_{stamp}.json" |
| path = base / fname |
| path.write_text(json.dumps(messages or [], ensure_ascii=False, indent=2), encoding="utf-8") |
| return str(path) |
|
|
| def on_click_download(messages, msg_id): |
| path = export_messages_to_json(messages, msg_id) |
| return gr.update(value=path, visible=True) |
|
|
|
|
| |
| def on_send(user_text: str, |
| messages: List[Dict[str, str]], |
| msg_id: str, |
| sessions: List[str], |
| sys_prompt: str, |
| temperature: float, |
| top_p: float, |
| max_new_tokens: int, |
| repetition_penalty: float): |
| user_text = (user_text or "").strip() |
| if not user_text: |
| return gr.update(), messages, visible_chat(messages), msg_id, gr.update(choices=sessions, value=(msg_id or None)), sessions |
|
|
| |
| messages = ensure_system(messages, sys_prompt) |
|
|
| |
| new_session = (len(messages) <= 1) |
| if new_session and not msg_id: |
| msg_id = mk_msg_dir(BASE_MSG_DIR) |
| sessions = list(sessions or []) + [msg_id] |
| if msg_id and msg_id not in (sessions or []): |
| sessions = list(sessions or []) + [msg_id] |
| sessions_update = gr.update(choices=sessions, value=msg_id) |
|
|
| |
| messages = messages + [{"role": "user", "content": user_text}] |
| |
| prompt, max_new = render_qwen_trim( |
| messages=messages, |
| model=model, |
| n_ctx=None, |
| add_generation_prompt=True, |
| persona=persona, |
| reserve_new=max_new_tokens, |
| pad=16 |
| ) |
| |
|
|
| try: |
| result = model.create_completion( |
| prompt=prompt, |
| temperature=float(temperature), |
| top_p=float(top_p), |
| max_tokens=int(max_new), |
| repeat_penalty=float(repetition_penalty), |
| stop=STOP_TOKENS, |
| ) |
| reply = result['choices'][0]['text'].strip() |
| except Exception: |
| _out = model( |
| prompt, |
| temperature=float(temperature), |
| top_p=float(top_p), |
| max_tokens=int(max_new), |
| repeat_penalty=float(repetition_penalty), |
| stop=STOP_TOKENS, |
| ) |
| if isinstance(_out, dict): |
| reply = _out.get('choices', [{}])[0].get('text', '').strip() |
| else: |
| reply = str(_out).strip() |
|
|
| |
| messages = messages + [{"role": "assistant", "content": reply}] |
|
|
| if msg_id: |
| msg_dir = _as_dir(BASE_MSG_DIR, msg_id) |
| persist_messages(messages, msg_dir, archive_last_turn=True) |
|
|
| return "", messages, visible_chat(messages), msg_id, sessions_update, sessions |
|
|
|
|
| |
| with gr.Blocks(title="Qwen GGUF — multi-session") as demo: |
| gr.Markdown("## 🧠 Qwen Chat") |
|
|
| with gr.Row(): |
| with gr.Column(scale=3): |
| sys_prompt = gr.Textbox( |
| label="System prompt", |
| value=persona, |
| lines=6, |
| show_label=True, |
| ) |
| with gr.Accordion("Generation settings", open=False): |
| temperature = gr.Slider(0.0, 2.0, value=0.7, step=0.05, label="temperature") |
| top_p = gr.Slider(0.1, 1.0, value=0.95, step=0.01, label="top_p") |
| max_new_tokens = gr.Slider(16, 512, value=256, step=16, label="max_new_tokens") |
| repetition_penalty = gr.Slider(1.0, 2.0, value=1.07, step=0.01, label="repetition_penalty") |
|
|
| session_list = gr.Radio(choices=[], value=None, label="Conversations", interactive=True) |
| new_btn = gr.Button("New session", variant="secondary") |
| del_btn = gr.Button("Delete session", variant="stop") |
| dl_btn = gr.Button("Download JSON", variant="secondary") |
| dl_file = gr.File(label="", interactive=False, visible=False) |
|
|
| with gr.Column(scale=9): |
| chat = gr.Chatbot( |
| label="Chat", |
| height=560, |
| render_markdown=True, |
| type="messages", |
| ) |
| user_box = gr.Textbox( |
| label="Your message", |
| placeholder="Type and press Enter…", |
| autofocus=True, |
| ) |
| send = gr.Button("Send", variant="primary") |
|
|
| |
| messages = gr.State([]) |
| msg_id = gr.State("") |
| sessions = gr.State([]) |
|
|
| |
| user_box.submit( |
| on_send, |
| inputs=[user_box, messages, msg_id, sessions, sys_prompt, temperature, top_p, max_new_tokens, repetition_penalty], |
| outputs=[user_box, messages, chat, msg_id, session_list, sessions], |
| ) |
| send.click( |
| on_send, |
| inputs=[user_box, messages, msg_id, sessions, sys_prompt, temperature, top_p, max_new_tokens, repetition_penalty], |
| outputs=[user_box, messages, chat, msg_id, session_list, sessions], |
| ) |
|
|
| new_btn.click( |
| start_new_session, |
| inputs=[sessions], |
| outputs=[messages, chat, user_box, msg_id, session_list, sessions], |
| ) |
|
|
| del_btn.click( |
| delete_session, |
| inputs=[msg_id, sessions], |
| outputs=[messages, chat, user_box, msg_id, session_list, sessions], |
| ) |
|
|
| session_list.change( |
| load_session, |
| inputs=[session_list, sessions], |
| outputs=[msg_id, messages, chat, session_list], |
| ) |
|
|
| dl_btn.click( |
| on_click_download, |
| inputs=[messages, msg_id], |
| outputs=[dl_file], |
| ) |
|
|
| demo.load(_init_sessions, None, outputs=[session_list, sessions, msg_id, messages, chat]) |
|
|
| if __name__ == "__main__": |
| demo.queue().launch() |
|
|