| |
| |
| |
| |
|
|
| import json |
| import base64 |
| import html |
| import re |
| import string |
| from pathlib import Path |
| from datetime import datetime |
|
|
| |
| from mcp_tools import ( |
| create_avatar, |
| delete_avatar_portrait, |
| delete_generated_images, |
| generate_as_avatar, |
| get_avatar, |
| record_generated_image, |
| set_avatar_portrait, |
| store_avatar_memory, |
| save_all_memories, |
| ensure_avatar_by_admin, |
| ) |
| from avatar_store import avatar_generated_path |
| from generate_image_with_nano import build_prompt, run_edit |
| from persona_engine import extract_memories_from_text |
|
|
|
|
| DEFAULT_AVATAR_ID = "08a2fb96" |
| INITIAL_GREETING = "Hello! How are you doing?" |
|
|
|
|
| |
| |
| |
|
|
| def resolve_path(path_str): |
| if not path_str: |
| return None |
| p = Path(path_str) |
| if not p.is_absolute(): |
| p = Path(__file__).parent / p |
| return str(p) |
|
|
|
|
| def encode_image_data(path_str): |
| """Convert image file → base64 data URL.""" |
| try: |
| data = Path(path_str).read_bytes() |
| b64 = base64.b64encode(data).decode("utf-8") |
| ext = Path(path_str).suffix.lower() |
| mime = { |
| ".jpg": "image/jpeg", |
| ".jpeg": "image/jpeg", |
| ".png": "image/png", |
| ".gif": "image/gif", |
| ".webp": "image/webp", |
| }.get(ext, "image/png") |
| return f"data:{mime};base64,{b64}" |
| except Exception: |
| return "" |
|
|
|
|
| def history_pairs(history): |
| """Convert internal dict history → MCP-friendly tuples.""" |
| out = [] |
| for entry in history or []: |
| s = entry.get("speaker") |
| if s not in ("user", "avatar"): |
| s = "avatar" |
| out.append((s, entry.get("text", ""))) |
| return out |
|
|
|
|
| def decide_tool(message: str) -> str: |
| text = (message or "").lower() |
| if "remember" in text: |
| return "store_avatar_memory" |
| if "who are you" in text: |
| return "get_avatar" |
| return "generate_as_avatar" |
|
|
|
|
| def _ensure_avatar_id(avatar_id: str, admin_id: str) -> str: |
| if avatar_id: |
| return avatar_id |
| if not admin_id: |
| raise ValueError("avatar_id or admin_id required") |
| avatar = ensure_avatar_by_admin(admin_id) |
| if not avatar: |
| raise ValueError("avatar not found for admin id") |
| return avatar.get("id") |
|
|
|
|
| |
| |
| |
|
|
| def send_message(avatar_id, admin_id, message, history, generate_image=True): |
| """ |
| Core chat pipeline: |
| - append user message |
| - choose tool |
| - execute tool |
| - append avatar reply |
| """ |
| history = list(history or []) |
| msg = (message or "").strip() |
| tool = "" |
|
|
| if not msg: |
| return history, "n/a" |
|
|
| |
| history.append({"speaker": "user", "text": msg}) |
| tool = decide_tool(msg) |
|
|
| try: |
| if tool == "store_avatar_memory": |
| if not admin_id: |
| reply = "Admin id required to store memory." |
| else: |
| result = store_avatar_memory({ |
| "avatar_id": avatar_id, |
| "admin_id": admin_id, |
| "entry": msg, |
| "private": True |
| }) |
| reply = f"Memory stored ({result['memory_entries']})." |
|
|
| elif tool == "get_avatar": |
| data = get_avatar({"avatar_id": avatar_id, "admin_id": admin_id}) |
| reply = json.dumps(data, ensure_ascii=False) |
|
|
| else: |
| |
| data = generate_as_avatar({ |
| "avatar_id": avatar_id, |
| "message": msg, |
| "history": history_pairs(history) |
| }) |
| reply = data["response"] |
|
|
| |
| avatar = get_avatar({"avatar_id": avatar_id, "admin_id": admin_id}) |
| portrait = resolve_path(avatar.get("portrait")) |
| if not portrait: |
| generate_image = False |
|
|
| except ValueError as exc: |
| reply = str(exc) |
|
|
| |
| history.append({"speaker": "avatar", "text": reply}) |
|
|
| return history, tool |
|
|
|
|
| def send_message_public(avatar_id, message, history, generate_image=True): |
| """Public chat mode = no admin id.""" |
| return send_message(avatar_id, "", message, history, generate_image) |
|
|
|
|
| def initial_greeting(avatar_id): |
| avatar_id = avatar_id or DEFAULT_AVATAR_ID |
| history, _ = send_message_public(avatar_id, INITIAL_GREETING, [], True) |
| return history |
|
|
|
|
| def maybe_generate_image(avatar_id, history, generate_image=True): |
| """ |
| After the avatar replies, decide whether to generate an image. |
| """ |
| history = list(history or []) |
|
|
| if not generate_image or not avatar_id or not history: |
| return history |
|
|
| |
| last_user = None |
| last_avatar = None |
| for entry in reversed(history): |
| if entry.get("speaker") == "avatar" and last_avatar is None: |
| last_avatar = entry |
| elif entry.get("speaker") == "user" and last_user is None: |
| last_user = entry.get("text", "") |
| if last_avatar and last_user: |
| break |
|
|
| if not last_avatar: |
| return history |
|
|
| |
| try: |
| avatar = get_avatar({"avatar_id": avatar_id, "admin_id": ""}) |
| except ValueError: |
| return history |
|
|
| portrait = resolve_path(avatar.get("portrait")) |
| if not portrait or not Path(portrait).exists(): |
| return history |
|
|
| |
| try: |
| prompt = build_prompt( |
| avatar.get("persona", "Avatar"), |
| f"{last_user or ''} Reply: {last_avatar.get('text', '')}" |
| ) |
| timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S") |
| out_path = avatar_generated_path(avatar_id, timestamp) |
| out_path.parent.mkdir(parents=True, exist_ok=True) |
|
|
| rc = run_edit(Path(portrait), prompt, out_path) |
| if rc == 0: |
| record_generated_image(avatar_id, out_path, prompt, timestamp) |
| last_avatar["image_path"] = str(out_path) |
| last_avatar["image_data"] = encode_image_data(out_path) |
| except BaseException: |
| pass |
|
|
| return history |
|
|
|
|
| def clear_chat_session(): |
| """Reset chat state.""" |
| return [], "Tool used: n/a", True |
|
|
|
|
| |
| |
| |
|
|
| def ui_create_avatar(description: str): |
| try: |
| data = create_avatar({"description": description}) |
| avatar_id = data.get("id") |
| admin_id = data.get("admin_id") |
| json_blob, rows, portrait, _, _, _ = ui_load_avatar(avatar_id, admin_id) |
| return f"Avatar created: {avatar_id}", json_blob, rows, portrait, avatar_id, admin_id |
| except ValueError as exc: |
| return str(exc), "", [], None, "", "" |
|
|
|
|
| def ui_load_avatar(avatar_id: str, admin_id: str): |
| """Load avatar + memory + portrait.""" |
| try: |
| if avatar_id: |
| data = get_avatar({"avatar_id": avatar_id, "admin_id": admin_id}) |
| else: |
| data = ensure_avatar_by_admin(admin_id) |
| avatar_id = data.get("id") |
| admin_id = data.get("admin_id", admin_id) |
| data = dict(data) |
| data.pop("generated_images", None) |
|
|
| mem = data.get("memory", []) |
| rows = [[m.get("entry", ""), m.get("private", False)] for m in mem] |
| portrait = resolve_path(data.get("portrait")) |
|
|
| return ( |
| json.dumps({k: v for k, v in data.items() if k != "memory"}, |
| ensure_ascii=False, indent=2), |
| rows, |
| portrait, |
| "Avatar loaded.", |
| avatar_id, |
| admin_id, |
| ) |
| except ValueError as exc: |
| return str(exc), [], None, str(exc), "", "" |
|
|
|
|
| def _current_memory_rows(avatar_id: str, admin_id: str): |
| try: |
| avatar_id = _ensure_avatar_id(avatar_id, admin_id) |
| data = get_avatar({"avatar_id": avatar_id, "admin_id": admin_id}) |
| except ValueError: |
| return [] |
| mem = data.get("memory", []) |
| return [[m.get("entry", ""), m.get("private", False)] for m in mem] |
|
|
|
|
| def _clean_memory_entry(text: str) -> str: |
| raw = (text or "").strip() |
| raw = re.sub(r"^```[\w-]*", "", raw, flags=re.IGNORECASE).strip() |
| raw = raw.replace("```", " ") |
| raw = raw.replace('"', "") |
| raw = " ".join(raw.split()) |
| allowed = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 .,!?'-") |
| clean = "".join(ch for ch in raw if ch in allowed).strip() |
| return clean |
|
|
|
|
| def _ingest_memories(avatar_id: str, admin_id: str, text: str, private: bool): |
| avatar_id = _ensure_avatar_id(avatar_id, admin_id) |
| entries = extract_memories_from_text(text, max_items=200) or [text] |
| stored = 0 |
| for entry in entries: |
| clean = _clean_memory_entry(entry) |
| if not clean: |
| continue |
| store_avatar_memory({ |
| "avatar_id": avatar_id, |
| "admin_id": admin_id, |
| "entry": clean, |
| "private": private |
| }) |
| stored += 1 |
| rows = _current_memory_rows(avatar_id, admin_id) |
| if stored == 0: |
| return "No memories extracted.", rows |
| if stored == 1: |
| return "Stored 1 memory.", rows |
| return f"Stored {stored} memories.", rows |
|
|
|
|
| def ui_add_context_text(avatar_id: str, admin_id: str, text: str, private: bool): |
| text = (text or "").strip() |
| if not text: |
| return "Enter context text.", _current_memory_rows(avatar_id, admin_id) |
| try: |
| return _ingest_memories(avatar_id, admin_id, text, private) |
| except ValueError as exc: |
| return str(exc), _current_memory_rows(avatar_id, admin_id) |
|
|
|
|
| def ui_add_context_file(avatar_id: str, admin_id: str, file_obj, private: bool): |
| if not file_obj: |
| return "Upload a file.", _current_memory_rows(avatar_id, admin_id) |
| try: |
| with open(file_obj.name, "r", encoding="utf-8") as f: |
| data = f.read() |
| return ui_add_context_text(avatar_id, admin_id, data, private) |
| except Exception as exc: |
| return f"Unable to read file: {exc}", _current_memory_rows(avatar_id, admin_id) |
|
|
|
|
| def ui_upload_portrait(avatar_id: str, admin_id: str, file_path): |
| portrait_path = file_path.name if hasattr(file_path, "name") else file_path |
| if not portrait_path: |
| return "Upload a portrait image.", None |
| try: |
| avatar_id = _ensure_avatar_id(avatar_id, admin_id) |
| result = set_avatar_portrait({ |
| "avatar_id": avatar_id, |
| "admin_id": admin_id, |
| "portrait_file": portrait_path |
| }) |
| return "Portrait updated.", resolve_path(result.get("portrait")) |
| except ValueError as exc: |
| return str(exc), None |
|
|
|
|
| def ui_remove_portrait(avatar_id: str, admin_id: str): |
| try: |
| avatar_id = _ensure_avatar_id(avatar_id, admin_id) |
| delete_avatar_portrait({"avatar_id": avatar_id, "admin_id": admin_id}) |
| return "Portrait removed.", None |
| except ValueError as exc: |
| return str(exc), None |
|
|
|
|
| def ui_clear_generated(avatar_id: str, admin_id: str): |
| try: |
| avatar_id = _ensure_avatar_id(avatar_id, admin_id) |
| result = delete_generated_images({"avatar_id": avatar_id, "admin_id": admin_id}) |
| return f"Removed {result.get('removed', 0)} generated images." |
| except ValueError as exc: |
| return str(exc) |
|
|
|
|
| def ui_chat_portrait(avatar_id: str): |
| if not avatar_id: |
| return None |
| try: |
| data = get_avatar({"avatar_id": avatar_id, "admin_id": ""}) |
| return resolve_path(data.get("portrait")) |
| except ValueError: |
| return None |
|
|
|
|
| def ui_select_memory(evt=None, current=None): |
| """Toggle row selection in memory table.""" |
| current = list(current or []) |
| if not evt or getattr(evt, "index", None) is None: |
| return current |
|
|
| idx = evt.index |
| if isinstance(idx, (list, tuple)): |
| idx = idx[0] |
| if idx is None: |
| return current |
|
|
| row = int(idx) |
| if row in current: |
| current.remove(row) |
| else: |
| current.append(row) |
| return current |
|
|
| def ui_update_memory(avatar_id, admin_id, updated_table): |
| """ |
| updated_table is a list of rows like: |
| [ |
| ["text of memory", True], |
| ["another memory", False], |
| ] |
| """ |
| try: |
| avatar_id = _ensure_avatar_id(avatar_id, admin_id) |
| if hasattr(updated_table, "to_numpy"): |
| updated_table = updated_table.to_numpy().tolist() |
| result = save_all_memories(avatar_id, admin_id, updated_table) |
| return "Memories updated successfully!" |
| except ValueError as exc: |
| return str(exc) |
|
|