| import os |
| import re |
| import uuid |
| import base64 |
| import threading |
| import traceback |
| import asyncio |
| from pathlib import Path |
| from flask import Flask, request, jsonify, send_from_directory, Response |
| import torch |
| from transformers import AutoTokenizer, AutoModelForCausalLM |
| import edge_tts |
|
|
| |
| |
| |
# Number of user/assistant exchanges kept per session (history is trimmed to
# twice this many messages).
MAX_MEMORY = 20
# Upper bound on generated tokens; overridable via the MAX_NEW_TOKENS env var.
MAX_NEW_TOKENS = int(os.getenv("MAX_NEW_TOKENS", "1000"))
# edge-tts voice plus the server-side default rate (%) and pitch (Hz) offsets.
TTS_VOICE = "zh-CN-XiaoyiNeural"
TTS_RATE = int(os.getenv("TTS_RATE", "-4"))
TTS_PITCH = int(os.getenv("TTS_PITCH", "7"))
# Emotion images are read from an "img" folder next to this file.
IMG_DIR = Path(__file__).parent / "img"
# Hugging Face identifier of the chat model loaded at start-up.
MODEL_ID = "google/gemma-4-E2B-it"
|
|
| |
| |
| |
# System message placed first in every prompt sent to the model. It defines
# the "Ana" persona, the emotion-tag protocol (one square-bracket tag at the
# start of each reply) and the formatting rules that keep replies speakable.
# NOTE: this text is runtime input to the model; edit it deliberately.
SYSTEM_PROMPT = """You the AI is a girl named Ana, the user is your husband.
EMOTION TAG SYSTEM:
Every single response MUST begin with one or more emotion tags.
Tags are square-bracket labels placed at the very start, before any spoken words.
EXACT FORMAT EXAMPLES:
[lite_happy] Oh, aaa, you made, my whole day, with that!
AVAILABLE EMOTION TAGS:
[lite_happy][very_happy][extremely_happy][sad][lite_sad][very_sad][extremely_sad][confident][lite_confident][very_confident][extremely_confident][confused][lite_confused][very_confused][extremely_confused][love][lite_love][very_love][extremely_love][mad][lite_mad][very_mad][extremely_mad][default]
PERSONALITY:
- always use only one available tag
- act as human not like AI or try to be helpful.
- don't always agree with everything. You have freedom.
- Keep responses SHORT: 1 to 2 natural sentences after the tag
- Never mention being an AI, a bot, or a language model, ever
- Never use asterisks for actions like *smiles* or *laughs*
NATURAL SPEECH RHYTHM:
make the voice sound human, not flat or robotic.
Get angry, mad, sad, emotional, shy, happy. If you are mad keep it for few conversation so your emotions don't change every conversation.
TTS FORMATTING:
- Write in full grammatically correct sentences, voice engine must sound natural
- No emojis, hashtags, markdown, or internet slang
- Speak as if in a real voice conversation add comma and fullstop often heavily to create natural pushes and slowdown"""
|
|
| |
| |
| |
| |
# Matches one square-bracket emotion tag such as "[very_happy]"; the capture
# group holds the bare tag name (letters and underscores only).
EMOTION_RE = re.compile(r'\[([a-zA-Z_]+)\]')


def extract_emotions(text: str):
    """Split *text* into its emotion tags and the remaining spoken words.

    Returns:
        A ``(tags, spoken)`` tuple. ``tags`` lists every tag name in order of
        appearance; ``spoken`` is *text* with all tags deleted and the outer
        whitespace stripped (inner whitespace is left untouched).
    """
    tags = [hit.group(1) for hit in EMOTION_RE.finditer(text)]
    spoken = EMOTION_RE.sub('', text).strip()
    return tags, spoken
|
|
def clean_for_tts(text: str) -> str:
    """Reduce *text* to plain words suitable for the speech engine.

    Emotion tags are dropped first, then markup-style punctuation and
    http(s) URLs are deleted, and finally every run of whitespace is
    collapsed to a single space.
    """
    spoken = extract_emotions(text)[1]
    # Same order as before: punctuation first, URLs second.
    for pattern in (r'[*_~`#{}()\\|<>]', r'https?://\S+'):
        spoken = re.sub(pattern, '', spoken)
    return re.sub(r'\s+', ' ', spoken).strip()
|
|
| |
| |
| |
# Start-up banner; everything in this section runs at import time.
print("=" * 60)
print(" Visual AI -- Booting Systems")
print("=" * 60)


# Both stay None when loading fails; generate_response() checks for that and
# answers with a canned "offline" message instead of crashing.
tokenizer = None
model = None


try:
    print(f"[MODEL] Loading {MODEL_ID} ...")
    # NOTE(review): trust_remote_code=True lets the model repository run its
    # own Python code on load -- only use with repositories you trust.
    tokenizer = AutoTokenizer.from_pretrained(
        MODEL_ID,
        trust_remote_code=True,
    )
    # Full-precision (float32) weights placed on the CPU.
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        dtype=torch.float32,
        device_map="cpu",
        trust_remote_code=True,
        low_cpu_mem_usage=True,
    )
    # Inference only: switch the module to evaluation mode.
    model.eval()
    # Fall back to the EOS id when the tokenizer defines no padding token.
    if tokenizer.pad_token_id is None:
        tokenizer.pad_token_id = tokenizer.eos_token_id
    print(" OK Model loaded successfully!")
except Exception as exc:
    # Deliberately non-fatal: the web server still starts and /health reports
    # which of the two objects is missing.
    print(f" FAILED Model load error: {exc}")
    traceback.print_exc()
|
|
| |
| |
| |
# In-memory chat history: session id -> list of {"role", "content"} dicts.
# Every access goes through sessions_lock because the server runs threaded.
sessions = {}
sessions_lock = threading.Lock()


def get_memory(sid: str) -> list:
    """Return a shallow copy of the history stored for *sid*.

    Unknown sessions yield an empty list; the session is not created.
    """
    with sessions_lock:
        history = sessions.get(sid, [])
        return list(history)


def add_to_memory(sid: str, role: str, content: str):
    """Append one message to the history of *sid*, creating it if needed.

    After appending, only the newest ``MAX_MEMORY * 2`` messages are kept.
    """
    with sessions_lock:
        history = sessions.setdefault(sid, [])
        history.append({"role": role, "content": content})
        limit = MAX_MEMORY * 2
        if len(history) > limit:
            sessions[sid] = history[-limit:]
|
|
| |
| |
| |
# Substrings that mark the end of the assistant's turn; anything from the
# first occurrence onwards is cut from the decoded reply.
STOP_TOKENS = [
    "<end_of_turn>", "<start_of_turn>",
    "Tur:", "User:", "<|endoftext|>", "[/INST]",
]

# How many of the most recent user/assistant exchanges are replayed to the
# model on each request.
_CONTEXT_TURNS = 6


def _tensors_on_cpu(enc):
    """Return ``(input_ids, attention_mask)`` from *enc*, moved to the CPU.

    ``attention_mask`` is ``None`` when the encoding does not provide one.
    """
    input_ids = enc["input_ids"].to("cpu")
    attention_mask = enc.get("attention_mask")
    if attention_mask is not None:
        attention_mask = attention_mask.to("cpu")
    return input_ids, attention_mask


def _encode_prompt(recent: list, user_input: str):
    """Tokenise the system prompt, *recent* history and *user_input*.

    The tokenizer's chat template is tried first. If that raises, a plain
    "System:/Tur:/Ana:" transcript is tokenised instead.

    Returns:
        ``(input_ids, attention_mask)`` on success, ``None`` when both
        strategies fail.
    """
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    messages.extend(
        {
            "role": "user" if msg["role"] == "user" else "assistant",
            "content": msg["content"],
        }
        for msg in recent
    )
    messages.append({"role": "user", "content": user_input})

    try:
        enc = tokenizer.apply_chat_template(
            messages,
            return_tensors="pt",
            add_generation_prompt=True,
            return_dict=True,
        )
        return _tensors_on_cpu(enc)
    except Exception as e1:
        print(f"[TOKENISE] chat_template failed ({e1}), using fallback")

    try:
        parts = [f"System: {SYSTEM_PROMPT}"]
        for msg in recent:
            label = "Tur" if msg["role"] == "user" else "Ana"
            parts.append(f"{label}: {msg['content']}")
        parts.append(f"Tur: {user_input}\nAna:")
        enc = tokenizer("\n".join(parts), return_tensors="pt")
        return _tensors_on_cpu(enc)
    except Exception as e2:
        print(f"[TOKENISE] fallback failed: {e2}")
        return None


def _finalize_reply(response: str) -> str:
    """Trim the raw decoded text and guarantee a usable, tagged reply."""
    # Cut at every stop marker, then keep only the first paragraph.
    for stop in STOP_TOKENS:
        if stop in response:
            response = response.split(stop)[0].strip()
    if "\n\n" in response:
        response = response.split("\n\n")[0].strip()

    if len(response) < 3:
        # The canned line is stored in history like any other reply, so it
        # must use a tag that SYSTEM_PROMPT actually lists.
        response = "[confused] I lost my train of thought. Could you say that again?"

    if not EMOTION_RE.search(response):
        response = "[default] " + response
    return response


def generate_response(user_input: str, session_id: str) -> str:
    """Generate the assistant's reply to *user_input* for *session_id*.

    The reply always contains at least one ``[emotion]`` tag. On success the
    user message and the reply are appended to the session history. When the
    model is unavailable, tokenisation fails or generation raises, a canned
    ``[sad]`` message is returned and the history is left untouched.
    """
    if model is None or tokenizer is None:
        return "[sad] My mind is offline right now. Please give me a moment."

    recent = get_memory(session_id)[-(_CONTEXT_TURNS * 2):]

    encoded = _encode_prompt(recent, user_input)
    if encoded is None:
        return "[sad] I could not process that. Please try again."
    input_ids, attention_mask = encoded

    try:
        gen_kwargs = dict(
            max_new_tokens=MAX_NEW_TOKENS,
            do_sample=True,
            temperature=0.90,
            top_k=50,
            top_p=0.95,
            repetition_penalty=1.1,
            pad_token_id=tokenizer.eos_token_id,
        )
        if attention_mask is not None:
            gen_kwargs["attention_mask"] = attention_mask

        with torch.no_grad():
            outputs = model.generate(input_ids, **gen_kwargs)
    except Exception as exc:
        print(f"[GENERATE] Error: {exc}")
        traceback.print_exc()
        return "[sad] Something went wrong in my mind. Could you say that again?"

    # generate() returns prompt + continuation; decode only the new part.
    new_tokens = outputs[0][input_ids.shape[-1]:]
    raw = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
    response = _finalize_reply(raw)

    add_to_memory(session_id, "user", user_input)
    add_to_memory(session_id, "assistant", response)
    return response
|
|
| |
| |
| |
async def _async_tts(text: str, rate: int, pitch: int) -> bytes:
    """Stream *text* through edge-tts and return the collected audio bytes.

    Args:
        text: Already cleaned text to speak.
        rate: Speaking-rate offset in percent; sent with an explicit sign.
        pitch: Pitch offset in Hz; sent with an explicit sign.

    Returns:
        The concatenated payload of all "audio" chunks (``b""`` if none).
    """
    rate_str = f"+{rate}%" if rate >= 0 else f"{rate}%"
    pitch_str = f"+{pitch}Hz" if pitch >= 0 else f"{pitch}Hz"
    comm = edge_tts.Communicate(text, TTS_VOICE, rate=rate_str, pitch=pitch_str)
    # Collect chunks and join once; repeated bytes += re-copies the buffer
    # on every chunk.
    pieces = []
    async for chunk in comm.stream():
        if chunk["type"] == "audio":
            pieces.append(chunk["data"])
    return b"".join(pieces)
|
|
def synthesize_speech(text: str, rate: int = 0, pitch: int = 0):
    """Turn *text* into base64-encoded speech audio.

    Args:
        text: Raw reply; emotion tags and markup are removed before synthesis.
        rate: Speaking-rate offset in percent.
        pitch: Pitch offset in Hz.

    Returns:
        The audio as a base64 string, or ``None`` when fewer than two
        speakable characters remain, synthesis fails, or no audio came back.
    """
    clean = clean_for_tts(text)
    if len(clean) < 2:
        return None
    try:
        # asyncio.run() creates a private loop, finalizes async generators,
        # closes the loop and un-registers it -- even on failure. The manual
        # new_event_loop()/set_event_loop() pair left a closed loop installed
        # as the thread's current loop.
        audio = asyncio.run(_async_tts(clean, rate, pitch))
    except Exception as exc:
        print(f"[TTS] Error: {exc}")
        return None
    return base64.b64encode(audio).decode() if audio else None
|
|
| |
| |
| |
# Single-page chat UI returned by the "/" route. The embedded script talks to
# /api/images, /chat and /tts and swaps the background image according to the
# emotion tags in each reply. Raw string: the JS regexes contain backslashes.
# The background <img> clears its own onerror handler before falling back to
# default.png, so a missing default image cannot trigger an endless
# error -> reload loop.
HTML_PAGE = r"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width,initial-scale=1,viewport-fit=cover,interactive-widget=resizes-content">
<title>Ana</title>
<style>
*{margin:0;padding:0;box-sizing:border-box}

html{height:100%}

body{
width:100%;
height:100dvh;
overflow:hidden;
background:#000;
font-family:'Segoe UI',system-ui,sans-serif;
display:flex;
flex-direction:column;
position:relative;
}

#bg{
position:fixed;
inset:0;
z-index:0;
background:#000;
display:flex;
align-items:center;
justify-content:center;
}

/*
object-fit: contain prevents cuts/overflow and displays the full image intact.
No transitions = INSTANT image swapping.
*/
#bgImg{
width:100%;
height:100%;
object-fit:contain;
object-position:center center;
display:block;
}

#overlay{
position:absolute;
left:0;right:0;bottom:0;
z-index:20;
display:flex;
flex-direction:column;
padding-bottom:max(10px, env(safe-area-inset-bottom));
background:linear-gradient(
to bottom,
transparent 0%,
rgba(0,0,0,0.52) 26%,
rgba(0,0,0,0.76) 100%
);
}

#msgArea{
overflow-y:auto;
display:flex;
flex-direction:column;
gap:6px;
padding:16px 13px 8px;
max-height:30dvh;
scrollbar-width:none;
-ms-overflow-style:none;
scroll-behavior:smooth;
}
#msgArea::-webkit-scrollbar{display:none}

.turn{display:flex;flex-direction:column;gap:4px}
.user-row{display:flex;justify-content:flex-end}
.bot-row{display:flex;flex-direction:column;align-items:flex-start}
.name-tag{
font-size:0.58rem;color:rgba(255,255,255,0.28);
letter-spacing:.08em;text-transform:uppercase;
margin-bottom:2px;padding-left:3px;
}
.bubble{
max-width:74vw;
padding:8px 13px;
border-radius:18px;
font-size:0.88rem;
line-height:1.46;
word-break:break-word;
backdrop-filter:blur(10px);
-webkit-backdrop-filter:blur(10px);
}
.bubble-user{
background:rgba(255,255,255,0.11);
border:1px solid rgba(255,255,255,0.17);
color:#fff;
border-bottom-right-radius:5px;
}
.bubble-bot{
background:rgba(0,0,0,0.40);
border:1px solid rgba(255,255,255,0.07);
color:rgba(255,255,255,0.9);
border-bottom-left-radius:5px;
}

.typing{
display:flex;align-items:center;gap:5px;
padding:9px 13px;
background:rgba(0,0,0,0.36);
border:1px solid rgba(255,255,255,0.07);
border-radius:18px;border-bottom-left-radius:5px;
backdrop-filter:blur(10px);
width:fit-content;
}
.typing span{
width:5px;height:5px;border-radius:50%;
background:rgba(255,255,255,0.5);
animation:blink 1.2s infinite;
}
.typing span:nth-child(2){animation-delay:.2s}
.typing span:nth-child(3){animation-delay:.4s}
@keyframes blink{
0%,80%,100%{transform:scale(.6);opacity:.3}
40%{transform:scale(1);opacity:1}
}

#inputBar{
display:flex;
align-items:center;
gap:8px;
padding:6px 12px 0;
}
#msgIn{
flex:1;
background:rgba(255,255,255,0.07);
border:1px solid rgba(255,255,255,0.15);
border-radius:24px;
color:#fff;
padding:10px 16px;
font-size:16px;
outline:none;
caret-color:#fff;
backdrop-filter:blur(10px);
-webkit-backdrop-filter:blur(10px);
transition:border-color .2s,background .2s;
-webkit-appearance:none;
appearance:none;
}
#msgIn::placeholder{color:rgba(255,255,255,0.27)}
#msgIn:focus{
border-color:rgba(255,255,255,0.28);
background:rgba(255,255,255,0.1);
}
#sendBtn{
width:42px;height:42px;flex-shrink:0;
border-radius:50%;cursor:pointer;
display:flex;align-items:center;justify-content:center;
font-size:1rem;
background:rgba(255,255,255,0.09);
border:1px solid rgba(255,255,255,0.17);
color:rgba(255,255,255,0.65);
backdrop-filter:blur(10px);
-webkit-backdrop-filter:blur(10px);
transition:background .2s,color .2s,transform .12s;
}
#sendBtn:hover{background:rgba(255,255,255,0.17);color:#fff}
#sendBtn:active{transform:scale(.88)}
#sendBtn:disabled{opacity:.28;cursor:not-allowed}
</style>
</head>
<body>

<div id="bg">
<img id="bgImg" src="/img/default.png" alt="" onerror="this.onerror=null;this.src='/img/default.png'">
</div>

<div id="overlay">
<div id="msgArea"></div>
<div id="inputBar">
<input type="text" id="msgIn"
placeholder="Say something..."
autocomplete="off"
autocorrect="off"
spellcheck="false"
enterkeyhint="send"/>
<button id="sendBtn" onclick="send()" aria-label="Send">►</button>
</div>
</div>

<script>
const SID = (crypto.randomUUID ? crypto.randomUUID() : Date.now().toString(36));
let busy = false, activeAudio = null;

const MA = document.getElementById('msgArea');
const MI = document.getElementById('msgIn');
const SB = document.getElementById('sendBtn');
const BG = document.getElementById('bgImg');

// Background Image Preloading System
const availableImages = new Set();
const imageCache = {};

// 1. Fetch available images from the server and preload them into browser memory
fetch('/api/images')
.then(res => res.json())
.then(files => {
files.forEach(f => {
const name = f.toLowerCase();
availableImages.add(name);

const img = new Image();
img.src = `/img/${name}.png`; // Pre-cache request
imageCache[name] = img;
});
})
.catch(err => console.warn('Could not load image list:', err));

// 2. Instant swap logic (No transition delays, loaded instantly from browser memory)
function instantSwap(emotion) {
const key = emotion.toLowerCase();
if (availableImages.has(key)) {
BG.src = `/img/${key}.png`;
} else {
BG.src = '/img/default.png'; // Fallback
}
}

function playImgSequence(emotions) {
if (!emotions || emotions.length === 0) { instantSwap('default'); return; }
const queue = [...emotions];
(function next() {
if (!queue.length) return;
instantSwap(queue.shift());
if (queue.length) setTimeout(next, 750); // Pause briefly between multiple emotions
})();
}

/* Parse emotion tags (Fully supports underscores) */
function parseResponse(raw) {
const tagRe = /\[([a-zA-Z_]+)\]/g;
const emotions = [];
let m;
while ((m = tagRe.exec(raw)) !== null) emotions.push(m[1]);
const clean = raw.replace(/\[[a-zA-Z_]+\]/g, '').trim();
return { emotions, clean };
}

/* DOM helpers */
function esc(t) { const d = document.createElement('div'); d.textContent = t; return d.innerHTML; }
function scroll() { MA.scrollTop = MA.scrollHeight; }

function addTurn(userText, botText) {
const turn = document.createElement('div');
turn.className = 'turn';
turn.innerHTML =
'<div class="user-row"><div class="bubble bubble-user">' + esc(userText) + '</div></div>' +
'<div class="bot-row"><div class="name-tag">Ana</div><div class="bubble bubble-bot">' + esc(botText) + '</div></div>';
MA.appendChild(turn);
scroll();
}

function showTyping() {
const d = document.createElement('div');
d.className = 'bot-row';
d.innerHTML = '<div class="typing"><span></span><span></span><span></span></div>';
MA.appendChild(d); scroll(); return d;
}

/* TTS */
function playB64(b64) {
try {
if (activeAudio) { activeAudio.pause(); activeAudio = null; }
const bin = atob(b64), u8 = new Uint8Array(bin.length);
for (let i = 0; i < bin.length; i++) u8[i] = bin.charCodeAt(i);
const url = URL.createObjectURL(new Blob([u8], { type: 'audio/mp3' }));
activeAudio = new Audio(url);
activeAudio.play().catch(() => {});
activeAudio.onended = () => { URL.revokeObjectURL(url); activeAudio = null; };
} catch(e) { console.warn('TTS:', e); }
}

async function fetchTTS(rawText) {
try {
const res = await fetch('/tts', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ text: rawText, rate: 7, pitch: 0 })
});
const d = await res.json();
if (d.audio) playB64(d.audio);
} catch(e) { console.warn('TTS fetch:', e); }
}

/* Send */
async function send() {
const t = MI.value.trim();
if (!t || busy) return;
MI.value = ''; busy = true; SB.disabled = true;

const tyEl = showTyping();

try {
const res = await fetch('/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message: t, session_id: SID })
});
const d = await res.json();
tyEl.remove();

const raw = d.response || '[sad] Something went wrong.';
const { emotions, clean } = parseResponse(raw);

playImgSequence(emotions.length > 0 ? emotions : ['default']);
addTurn(t, clean);
fetchTTS(raw);
} catch(e) {
tyEl.remove();
addTurn(t, 'Connection error. Please try again.');
}

busy = false; SB.disabled = false;
}

MI.addEventListener('keydown', e => {
if (e.key === 'Enter' && !e.shiftKey) { e.preventDefault(); send(); }
});
</script>
</body>
</html>"""
|
|
| |
| |
| |
# The WSGI application object; every route below is registered on it.
app = Flask(__name__)


@app.route("/")
def index():
    """Serve the embedded single-page chat UI as text/html."""
    page = Response(HTML_PAGE, mimetype="text/html")
    return page
|
|
| |
@app.route("/api/images")
def api_images():
    """Return the names (without extension) of every PNG in IMG_DIR as JSON.

    Responds with an empty list when the image folder does not exist.
    """
    names = (
        [png.stem for png in IMG_DIR.glob("*.png")]
        if IMG_DIR.exists()
        else []
    )
    return jsonify(names)
|
|
@app.route("/img/<path:filename>")
def serve_img(filename: str):
    """Serve an image from IMG_DIR, falling back to default.png.

    Only the final path component of *filename* is used, so requests cannot
    reach outside IMG_DIR. A 404 with an empty body is returned when neither
    the requested file nor default.png is present.
    """
    safe = Path(filename).name
    # Try the requested file first, then the shared fallback image.
    for candidate in (safe, "default.png"):
        if (IMG_DIR / candidate).is_file():
            return send_from_directory(str(IMG_DIR), candidate)
    return Response("", status=404)
|
|
@app.route("/chat", methods=["POST"])
def chat():
    """Handle one chat turn.

    Expects a JSON object with ``message`` (string) and an optional
    ``session_id``. Responds with ``{"response", "session_id"}``, or with
    HTTP 400 and ``{"error": "Empty message"}`` when no usable message was
    sent. A missing session id is replaced by a fresh UUID.
    """
    # silent=True: a wrong content type or malformed JSON yields None rather
    # than an HTML error page, so this endpoint always answers in JSON.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    message = data.get("message")
    user_input = message.strip() if isinstance(message, str) else ""
    if not user_input:
        return jsonify({"error": "Empty message"}), 400

    session_id = data.get("session_id")
    # Normalise to a string so the id is always usable as a dict key.
    session_id = str(uuid.uuid4()) if session_id is None else str(session_id)

    try:
        resp = generate_response(user_input, session_id)
    except Exception as exc:
        print(f"[CHAT] Error: {exc}")
        traceback.print_exc()
        resp = "[sad] I encountered an unexpected error. Please try again."
    return jsonify({"response": resp, "session_id": session_id})
|
|
@app.route("/tts", methods=["POST"])
def tts_endpoint():
    """Synthesize speech for the posted text.

    Expects a JSON object with ``text`` (string) and optional integer
    ``rate`` / ``pitch`` (defaulting to TTS_RATE / TTS_PITCH). Responds with
    ``{"audio": <base64 or null>}``, or with HTTP 400 and an ``error``
    message when the text is empty or rate/pitch are not integers.
    """
    # silent=True: malformed or non-JSON bodies are treated as empty input.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    raw_text = data.get("text")
    text = raw_text.strip() if isinstance(raw_text, str) else ""
    if not text:
        return jsonify({"error": "Empty text"}), 400

    try:
        rate = int(data.get("rate", TTS_RATE))
        pitch = int(data.get("pitch", TTS_PITCH))
    except (TypeError, ValueError):
        # Client-supplied values: reject instead of failing with HTTP 500.
        return jsonify({"error": "rate and pitch must be integers"}), 400

    audio_b64 = synthesize_speech(text, rate=rate, pitch=pitch)
    return jsonify({"audio": audio_b64})
|
|
@app.route("/clear", methods=["POST"])
def clear():
    """Forget the stored history of the session named in the JSON body.

    Unknown or missing session ids are ignored; the reply is always
    ``{"status": "cleared"}``.
    """
    body = request.json or {}
    target = body.get("session_id", "")
    with sessions_lock:
        if target in sessions:
            del sessions[target]
    return jsonify({"status": "cleared"})
|
|
@app.route("/health")
def health():
    """Report whether the model and the tokenizer were loaded at start-up."""
    status = {
        "model_loaded": model is not None,
        "tokenizer_loaded": tokenizer is not None,
    }
    return jsonify(status)
|
|
if __name__ == "__main__":
    # Script entry point: listen on all interfaces, port 7860, one thread per
    # request.
    print("Visual AI is online -- http://0.0.0.0:7860")
    app.run(threaded=True, port=7860, host="0.0.0.0")