# avatar / logic.py
# mwtuni's picture
# ui design
# 853637b
# logic.py
# Modernized backend logic for Avatar MCP
# Contains ALL non-UI logic: chat, creator tools, images, helpers.
# UI lives in app.py and calls these functions.
import json
import base64
import html
import re
import string
from pathlib import Path
from datetime import datetime
# FastAPI + Avatar backend imports
from mcp_tools import (
create_avatar,
delete_avatar_portrait,
delete_generated_images,
generate_as_avatar,
get_avatar,
record_generated_image,
set_avatar_portrait,
store_avatar_memory,
save_all_memories,
ensure_avatar_by_admin,
)
from avatar_store import avatar_generated_path
from generate_image_with_nano import build_prompt, run_edit
from persona_engine import extract_memories_from_text
# Avatar id that initial_greeting() falls back to when the caller passes none.
DEFAULT_AVATAR_ID = "08a2fb96"
# Message that initial_greeting() sends through the public chat pipeline.
INITIAL_GREETING = "Hello! How are you doing?"
# =====================================================
# Helpers
# =====================================================
def resolve_path(path_str):
    """Return ``path_str`` as a string path, or ``None`` for falsy input.

    Absolute paths are returned unchanged (as ``str``); relative paths are
    anchored at the directory that contains this module.
    """
    if path_str:
        candidate = Path(path_str)
        if candidate.is_absolute():
            return str(candidate)
        # Relative paths are interpreted relative to this file, not the CWD.
        return str(Path(__file__).parent / candidate)
    return None
def encode_image_data(path_str):
    """Read an image file and return it as a base64 ``data:`` URL.

    The MIME type is chosen from the file suffix; unknown suffixes fall back
    to ``image/png``. Any failure (missing file, bad argument, ...) yields an
    empty string instead of raising.
    """
    mime_by_suffix = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
    }
    try:
        source = Path(path_str)
        payload = base64.b64encode(source.read_bytes()).decode("utf-8")
        mime = mime_by_suffix.get(source.suffix.lower(), "image/png")
    except Exception:
        # Best-effort helper: callers treat "" as "no image available".
        return ""
    return f"data:{mime};base64,{payload}"
def history_pairs(history):
    """Flatten chat history dicts into ``(speaker, text)`` tuples.

    Any speaker other than ``"user"`` or ``"avatar"`` (including a missing
    one) is reported as ``"avatar"``; a missing text becomes ``""``.
    ``None`` or an empty history yields an empty list.
    """
    known_speakers = ("user", "avatar")
    raw_pairs = (
        (item.get("speaker"), item.get("text", ""))
        for item in (history or [])
    )
    return [
        (role if role in known_speakers else "avatar", text)
        for role, text in raw_pairs
    ]
def decide_tool(message: str) -> str:
    """Pick the backend tool name for a chat message by keyword match.

    The match is case-insensitive and checked in order: ``"remember"`` wins
    over ``"who are you"``; anything else goes to ``"generate_as_avatar"``.
    """
    lowered = (message or "").lower()
    keyword_routes = (
        ("remember", "store_avatar_memory"),
        ("who are you", "get_avatar"),
    )
    for phrase, tool_name in keyword_routes:
        if phrase in lowered:
            return tool_name
    return "generate_as_avatar"
def _ensure_avatar_id(avatar_id: str, admin_id: str) -> str:
    """Return a usable avatar id, looking it up by admin id when absent.

    Raises:
        ValueError: if both ids are empty, or if ``ensure_avatar_by_admin``
            returns nothing for the given admin id.
    """
    if not avatar_id:
        if not admin_id:
            raise ValueError("avatar_id or admin_id required")
        record = ensure_avatar_by_admin(admin_id)
        if not record:
            raise ValueError("avatar not found for admin id")
        return record.get("id")
    # An explicit avatar id always wins; no lookup is performed.
    return avatar_id
# =====================================================
# Chat Logic
# =====================================================
def send_message(avatar_id, admin_id, message, history, generate_image=True):
    """Run one chat turn and return ``(history, tool)``.

    Pipeline: append the user message, pick a tool with ``decide_tool``,
    execute it, then append the avatar's reply.

    Args:
        avatar_id: id of the avatar being addressed.
        admin_id: admin id; required by the ``store_avatar_memory`` branch.
        message: raw user text; surrounding whitespace is stripped.
        history: list of ``{"speaker", "text"}`` dicts (or ``None``). The
            list is copied, so the caller's list object is not modified.
        generate_image: accepted for interface compatibility only. This
            function never generates images; ``maybe_generate_image``
            performs its own portrait check when it runs.

    Returns:
        A tuple ``(new_history, tool_name)``. For a blank message the
        history copy is returned unchanged with the tool name ``"n/a"``.

    A ``ValueError`` raised by a backend tool is turned into the avatar's
    reply text instead of propagating.
    """
    history = list(history or [])
    msg = (message or "").strip()
    if not msg:
        return history, "n/a"

    # Append user message
    history.append({"speaker": "user", "text": msg})
    tool = decide_tool(msg)
    try:
        if tool == "store_avatar_memory":
            if not admin_id:
                reply = "Admin id required to store memory."
            else:
                result = store_avatar_memory({
                    "avatar_id": avatar_id,
                    "admin_id": admin_id,
                    "entry": msg,
                    "private": True,
                })
                reply = f"Memory stored ({result['memory_entries']})."
        elif tool == "get_avatar":
            data = get_avatar({"avatar_id": avatar_id, "admin_id": admin_id})
            reply = json.dumps(data, ensure_ascii=False)
        else:
            # Persona reply. The history passed along already contains the
            # user message appended above.
            data = generate_as_avatar({
                "avatar_id": avatar_id,
                "message": msg,
                "history": history_pairs(history),
            })
            reply = data["response"]
            # NOTE: no portrait lookup here. The previous lookup only set a
            # local flag that was never read, and a ValueError from it would
            # have replaced the successful reply with an error message.
    except ValueError as exc:
        reply = str(exc)

    # Add avatar reply
    history.append({"speaker": "avatar", "text": reply})
    return history, tool
def send_message_public(avatar_id, message, history, generate_image=True):
    """Run a chat turn in public mode, i.e. with an empty admin id."""
    no_admin = ""
    return send_message(avatar_id, no_admin, message, history, generate_image)
def initial_greeting(avatar_id):
    """Open a conversation by sending INITIAL_GREETING in public mode.

    Falls back to DEFAULT_AVATAR_ID when ``avatar_id`` is falsy and returns
    only the resulting history (the tool name is discarded).
    """
    target_id = avatar_id if avatar_id else DEFAULT_AVATAR_ID
    transcript, _tool = send_message_public(target_id, INITIAL_GREETING, [], True)
    return transcript
def maybe_generate_image(avatar_id, history, generate_image=True):
    """Optionally attach a generated image to the latest avatar reply.

    Args:
        avatar_id: avatar whose portrait is used as the edit source.
        history: list of ``{"speaker", "text"}`` dicts (or ``None``).
        generate_image: when falsy, nothing is generated.

    Returns:
        A shallow copy of ``history``. When generation succeeds
        (``run_edit`` returns 0) the last avatar entry dict gains
        ``"image_path"`` and ``"image_data"`` keys. That dict is shared with
        the caller's list, so the caller sees the new keys too.

    Image generation is best-effort: nothing is generated when there is no
    avatar reply, the avatar cannot be loaded, or the portrait file is
    missing, and any error during generation is logged and swallowed.
    """
    import logging
    from datetime import timezone

    history = list(history or [])
    if not generate_image or not avatar_id or not history:
        return history

    # Find the most recent avatar entry and the most recent user text.
    last_user = None
    last_avatar = None
    for entry in reversed(history):
        speaker = entry.get("speaker")
        if speaker == "avatar" and last_avatar is None:
            last_avatar = entry
        elif speaker == "user" and last_user is None:
            last_user = entry.get("text", "")
        if last_avatar and last_user:
            break
    if not last_avatar:
        return history

    # Check portrait
    try:
        avatar = get_avatar({"avatar_id": avatar_id, "admin_id": ""})
    except ValueError:
        return history
    portrait = resolve_path(avatar.get("portrait"))
    if not portrait or not Path(portrait).exists():
        return history

    # Build prompt for the image generator
    try:
        prompt = build_prompt(
            avatar.get("persona", "Avatar"),
            f"{last_user or ''} Reply: {last_avatar.get('text', '')}",
        )
        # Aware UTC "now"; formats identically to the deprecated utcnow().
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
        out_path = avatar_generated_path(avatar_id, timestamp)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        rc = run_edit(Path(portrait), prompt, out_path)
        if rc == 0:
            record_generated_image(avatar_id, out_path, prompt, timestamp)
            last_avatar["image_path"] = str(out_path)
            last_avatar["image_data"] = encode_image_data(out_path)
    except Exception:
        # Catch Exception, not BaseException, so KeyboardInterrupt and
        # SystemExit still propagate. The chat turn must survive a failed
        # image generation, but the failure is recorded for debugging.
        logging.getLogger(__name__).exception(
            "Image generation failed for avatar %s", avatar_id
        )
    return history
def clear_chat_session():
    """Return the values that reset the chat state.

    The result is ``(empty history, tool label, True)``.
    NOTE(review): the trailing ``True`` is presumably the default for the
    image-generation toggle in the UI; confirm against app.py.
    """
    fresh_history = []
    tool_label = "Tool used: n/a"
    third_value = True
    return fresh_history, tool_label, third_value
# =====================================================
# Creator Logic
# =====================================================
def ui_create_avatar(description: str):
    """Create an avatar from a description and load it for display.

    Returns a 6-tuple ``(status, json_blob, memory_rows, portrait,
    avatar_id, admin_id)``. If a ``ValueError`` is raised, the status is the
    error text and the remaining fields are empty placeholders.
    """
    try:
        created = create_avatar({"description": description})
        new_id = created.get("id")
        new_admin = created.get("admin_id")
        # Reuse the loader; only its first three fields are needed here.
        json_blob, rows, portrait, _status, _aid, _adm = ui_load_avatar(
            new_id, new_admin
        )
    except ValueError as exc:
        return str(exc), "", [], None, "", ""
    return f"Avatar created: {new_id}", json_blob, rows, portrait, new_id, new_admin
def ui_load_avatar(avatar_id: str, admin_id: str):
    """Load an avatar together with its memory rows and portrait path.

    When ``avatar_id`` is empty the avatar is looked up by ``admin_id`` and
    both ids are taken from the record that comes back.

    Returns:
        A 6-tuple ``(json_blob, memory_rows, portrait, status, avatar_id,
        admin_id)``. ``json_blob`` is the avatar serialized as indented JSON
        without its ``"memory"`` and ``"generated_images"`` keys, and
        ``memory_rows`` is a list of ``[entry, private]`` pairs. On
        ``ValueError`` the tuple is ``(error, [], None, error, "", "")``.
    """
    try:
        if avatar_id:
            data = get_avatar({"avatar_id": avatar_id, "admin_id": admin_id})
        else:
            data = ensure_avatar_by_admin(admin_id)
            if not data:
                # Same contract as _ensure_avatar_id: a missing avatar is a
                # ValueError, not an AttributeError on None further down.
                raise ValueError("avatar not found for admin id")
            avatar_id = data.get("id")
            admin_id = data.get("admin_id", admin_id)

        # Work on a copy so the backend's record is not mutated.
        data = dict(data)
        data.pop("generated_images", None)
        mem = data.get("memory", [])
        rows = [[m.get("entry", ""), m.get("private", False)] for m in mem]
        portrait = resolve_path(data.get("portrait"))
        return (
            json.dumps(
                {k: v for k, v in data.items() if k != "memory"},
                ensure_ascii=False,
                indent=2,
            ),
            rows,
            portrait,
            "Avatar loaded.",
            avatar_id,
            admin_id,
        )
    except ValueError as exc:
        return str(exc), [], None, str(exc), "", ""
def _current_memory_rows(avatar_id: str, admin_id: str):
    """Return the avatar's memory as ``[entry, private]`` rows.

    Returns an empty list when the avatar id cannot be resolved or the
    avatar lookup raises ``ValueError``.
    """
    try:
        resolved_id = _ensure_avatar_id(avatar_id, admin_id)
        record = get_avatar({"avatar_id": resolved_id, "admin_id": admin_id})
    except ValueError:
        return []
    rows = []
    for item in record.get("memory", []):
        rows.append([item.get("entry", ""), item.get("private", False)])
    return rows
# Characters that survive memory sanitizing: ASCII letters, digits, space and
# a small set of punctuation. Built once instead of on every call.
_ALLOWED_MEMORY_CHARS = frozenset(string.ascii_letters + string.digits + " .,!?'-")


def _clean_memory_entry(text: str) -> str:
    """Sanitize one extracted memory string for storage.

    Steps: strip a leading Markdown code fence (optionally followed by a
    language tag), blank out any remaining fences, drop double quotes,
    collapse all whitespace runs to single spaces, and remove every
    character outside ``_ALLOWED_MEMORY_CHARS``.

    Returns:
        The cleaned text; ``""`` when nothing usable remains or ``text`` is
        ``None``/empty.
    """
    raw = (text or "").strip()
    raw = re.sub(r"^```[\w-]*", "", raw).strip()
    raw = raw.replace("```", " ").replace('"', "")
    # Normalize whitespace first so tabs/newlines become spaces rather than
    # being dropped by the filter below (which would glue words together).
    raw = " ".join(raw.split())
    kept = "".join(ch for ch in raw if ch in _ALLOWED_MEMORY_CHARS)
    # Collapse again: removing a character that sat between two spaces would
    # otherwise leave a double space in the stored entry.
    return " ".join(kept.split())
def _ingest_memories(avatar_id: str, admin_id: str, text: str, private: bool):
    """Extract memories from ``text``, store each one, and report the count.

    If the extractor returns nothing, the whole text is treated as a single
    candidate. Candidates that are empty after cleaning are skipped.

    Returns:
        ``(status_message, memory_rows)`` where ``memory_rows`` reflects the
        avatar's memory after storing.

    Raises:
        ValueError: propagated from ``_ensure_avatar_id`` or the backend.
    """
    avatar_id = _ensure_avatar_id(avatar_id, admin_id)
    candidates = extract_memories_from_text(text, max_items=200) or [text]

    stored = 0
    # map/filter are lazy, so each entry is cleaned right before it is
    # stored, one at a time.
    for clean in filter(None, map(_clean_memory_entry, candidates)):
        store_avatar_memory({
            "avatar_id": avatar_id,
            "admin_id": admin_id,
            "entry": clean,
            "private": private,
        })
        stored += 1

    rows = _current_memory_rows(avatar_id, admin_id)
    if stored > 1:
        status = f"Stored {stored} memories."
    elif stored == 1:
        status = "Stored 1 memory."
    else:
        status = "No memories extracted."
    return status, rows
def ui_add_context_text(avatar_id: str, admin_id: str, text: str, private: bool):
    """Store memories extracted from pasted context text.

    Returns ``(status_message, memory_rows)``. Blank text produces a prompt
    to enter text; a ``ValueError`` from ingestion becomes the status text.
    """
    stripped = (text or "").strip()
    if stripped:
        try:
            return _ingest_memories(avatar_id, admin_id, stripped, private)
        except ValueError as exc:
            return str(exc), _current_memory_rows(avatar_id, admin_id)
    return "Enter context text.", _current_memory_rows(avatar_id, admin_id)
def ui_add_context_file(avatar_id: str, admin_id: str, file_obj, private: bool):
    """Store memories extracted from an uploaded UTF-8 text file.

    Args:
        avatar_id: target avatar id (may be empty if ``admin_id`` is given).
        admin_id: admin id used for the store operation.
        file_obj: either an object exposing the file path as ``.name`` or a
            plain path string, the same two forms ``ui_upload_portrait``
            accepts.
        private: whether the stored memories are marked private.

    Returns:
        ``(status_message, memory_rows)``. Any exception while reading or
        ingesting is reported as ``"Unable to read file: ..."`` rather than
        raised.
    """
    if not file_obj:
        return "Upload a file.", _current_memory_rows(avatar_id, admin_id)
    # Accept both upload objects (with .name) and bare path strings.
    source = file_obj.name if hasattr(file_obj, "name") else file_obj
    try:
        data = Path(source).read_text(encoding="utf-8")
        return ui_add_context_text(avatar_id, admin_id, data, private)
    except Exception as exc:
        # Deliberately broad: this is a UI boundary and the original
        # contract is to surface every failure as a status message.
        return f"Unable to read file: {exc}", _current_memory_rows(avatar_id, admin_id)
def ui_upload_portrait(avatar_id: str, admin_id: str, file_path):
    """Set the avatar's portrait from an uploaded file.

    ``file_path`` may be an object exposing the path as ``.name`` or a plain
    path. Returns ``(status_message, portrait_path_or_None)``; a
    ``ValueError`` from the backend becomes the status text.
    """
    portrait_path = getattr(file_path, "name", file_path)
    if portrait_path:
        try:
            resolved_id = _ensure_avatar_id(avatar_id, admin_id)
            outcome = set_avatar_portrait({
                "avatar_id": resolved_id,
                "admin_id": admin_id,
                "portrait_file": portrait_path,
            })
            return "Portrait updated.", resolve_path(outcome.get("portrait"))
        except ValueError as exc:
            return str(exc), None
    return "Upload a portrait image.", None
def ui_remove_portrait(avatar_id: str, admin_id: str):
    """Delete the avatar's portrait.

    Returns ``(status_message, None)``; the second element is always
    ``None``. A ``ValueError`` becomes the status text.
    """
    try:
        resolved_id = _ensure_avatar_id(avatar_id, admin_id)
        delete_avatar_portrait({"avatar_id": resolved_id, "admin_id": admin_id})
    except ValueError as exc:
        return str(exc), None
    return "Portrait removed.", None
def ui_clear_generated(avatar_id: str, admin_id: str):
    """Delete the avatar's generated images and return a status message.

    The message reports the backend's ``"removed"`` count (0 if absent); a
    ``ValueError`` becomes the message instead.
    """
    try:
        resolved_id = _ensure_avatar_id(avatar_id, admin_id)
        outcome = delete_generated_images(
            {"avatar_id": resolved_id, "admin_id": admin_id}
        )
        message = f"Removed {outcome.get('removed', 0)} generated images."
    except ValueError as exc:
        return str(exc)
    return message
def ui_chat_portrait(avatar_id: str):
    """Return the resolved portrait path for the chat view, or ``None``.

    ``None`` is returned for an empty avatar id and when the avatar lookup
    (performed with an empty admin id) raises ``ValueError``.
    """
    if avatar_id:
        try:
            record = get_avatar({"avatar_id": avatar_id, "admin_id": ""})
            return resolve_path(record.get("portrait"))
        except ValueError:
            pass
    return None
def ui_select_memory(evt=None, current=None):
    """Toggle a row index in the list of selected memory rows.

    ``evt.index`` may be a scalar or a list/tuple whose first item is the
    row. A new list is always returned; when there is no event or no usable
    index it is just a copy of ``current``.
    """
    selected = list(current or [])
    raw_index = getattr(evt, "index", None) if evt else None
    if raw_index is None:
        return selected
    if isinstance(raw_index, (list, tuple)):
        # Cell selections arrive as a sequence; only the row part matters.
        raw_index = raw_index[0]
        if raw_index is None:
            return selected

    row = int(raw_index)
    if row not in selected:
        selected.append(row)
    else:
        selected.remove(row)
    return selected
def ui_update_memory(avatar_id, admin_id, updated_table):
    """Replace the avatar's memories with the rows of the edited table.

    ``updated_table`` is a list of rows like::

        [
            ["text of memory", True],
            ["another memory", False],
        ]

    Objects exposing ``to_numpy()`` are converted to nested lists first.
    Returns a status message; a ``ValueError`` becomes the message.
    """
    try:
        resolved_id = _ensure_avatar_id(avatar_id, admin_id)
        if hasattr(updated_table, "to_numpy"):
            rows = updated_table.to_numpy().tolist()
        else:
            rows = updated_table
        save_all_memories(resolved_id, admin_id, rows)
    except ValueError as exc:
        return str(exc)
    return "Memories updated successfully!"