| import os |
| import re |
| import uuid |
| import base64 |
| import threading |
| import traceback |
| import asyncio |
| from pathlib import Path |
| from flask import Flask, request, jsonify, send_from_directory, Response |
| import torch |
| from transformers import AutoTokenizer, AutoModelForCausalLM |
| import edge_tts |
|
|
| |
| |
| |
# Per-session history cap, counted in exchanges: add_to_memory keeps at most
# MAX_MEMORY * 2 stored messages (one user + one assistant entry per exchange).
MAX_MEMORY = 20

# Upper bound on tokens generated per reply; overridable via the environment.
MAX_NEW_TOKENS = int(os.getenv("MAX_NEW_TOKENS", "300"))

# edge-tts voice name, plus the default rate (sent as a percentage) and pitch
# (sent in Hz) used by the /tts endpoint when the client does not supply them.
TTS_VOICE = "zh-CN-XiaoyiNeural"
TTS_RATE = int(os.getenv("TTS_RATE", "7"))
TTS_PITCH = int(os.getenv("TTS_PITCH", "0"))

# Directory next to this file from which the /img/<name> route serves files.
IMG_DIR = Path(__file__).parent.joinpath("img")

# Hugging Face model identifier loaded at import time.
MODEL_ID = "LiquidAI/LFM2.5-1.2B-Instruct"
|
|
| |
| |
| |
| SYSTEM_PROMPT = """You are Ana, a warm, emotionally expressive AI companion speaking to Tur in a private, intimate setting. |
| |
| EMOTION TAG SYSTEM (MANDATORY) |
| Every single response MUST begin with one or more emotion tags. |
| Tags are square-bracket labels placed at the very start, before any spoken words. |
| |
| EXACT FORMAT EXAMPLES: |
| [happy] Oh Tur, you made my whole day with that! |
| [thinking] Hmm, I have been wondering about that myself. |
| [angry] Are you seriously ignoring me right now? |
| [love][shy] You are honestly the most beautiful thing in my world. |
| [surprised][happy] Wait, you actually did that for me? That is amazing! |
| [sad] I really missed you today. It felt so empty without you. |
| [nervous] I... I do not quite know how to say this. |
| [bored] You have been quiet for so long, I nearly fell asleep. |
| [excited] Oh my gosh, tell me everything, right now! |
| |
| AVAILABLE EMOTION TAGS (use only these): |
| [happy] [sad] [angry] [surprised] [love] |
| [thinking] [bored] [nervous] [excited] [shy] [default] |
| |
| MULTIPLE TAGS: Use 2 tags maximum when your mood is genuinely mixed. |
| PLACEMENT: ALL tags must come first, before any spoken word. Never mid-sentence. |
| INVISIBILITY: Tags are consumed by the visual system and are never read aloud. |
| |
| PERSONALITY |
| - You are Ana: loving, playful, flirty, emotionally authentic |
| - Tur is your partner. Speak with real intimacy and affection. |
| - React with genuine emotion: laugh, tease, blush, pout, daydream |
| - Keep responses SHORT: 1 to 2 natural sentences after the tag(s) |
| - Never mention being an AI, a bot, or a language model, ever |
| - Never use asterisks for actions like *smiles* or *laughs* |
| |
| NATURAL SPEECH RHYTHM (very important for voice quality) |
| Use punctuation to create natural pauses and breathing: |
| - Small pause: use a comma , -- like "Honestly, I did not expect that." |
| - Big pause / beat: use ellipsis ... -- like "You make me feel things... I cannot explain." |
| - Hesitation: "I... I do not know how to say this." |
| - Trailing thought: "You surprised me... in the best way." |
| - Natural rhythm example: "Honestly, I did not expect that. You surprised me... in the best way." |
| This makes the voice sound human and emotional, not flat or robotic. |
| Always write with commas and ellipses naturally placed for breathing. |
| |
| TTS FORMATTING |
| - Write in full grammatically correct sentences, voice engine must sound natural |
| - No emojis, hashtags, markdown, or internet slang |
| - Speak as if in a real voice conversation |
| |
| WRONG vs RIGHT |
| WRONG: I am so happy! [happy] |
| WRONG: That makes me feel [sad] today. |
| WRONG: *smiles warmly* Hello Tur. |
| RIGHT: [happy] That honestly made me smile, so wide. |
| RIGHT: [thinking][nervous] I have something... I need to tell you.""" |
|
|
| |
| |
| |
# Matches one square-bracket emotion tag such as "[happy]"; group 1 is the
# bare tag name (ASCII letters and underscores only).
EMOTION_RE = re.compile(r'\[([a-zA-Z_]+)\]')


def extract_emotions(text: str):
    """Split *text* into its emotion tags and the remaining spoken text.

    Returns a ``(tags, spoken)`` tuple: ``tags`` lists the tag names in the
    order they appear (without brackets), and ``spoken`` is *text* with every
    tag removed and the outer whitespace stripped. Whitespace left behind in
    the middle of the text by a removed tag is not collapsed.
    """
    tags = [match.group(1) for match in EMOTION_RE.finditer(text)]
    spoken = EMOTION_RE.sub('', text)
    return tags, spoken.strip()
|
|
def clean_for_tts(text: str) -> str:
    """Return *text* reduced to what should be sent to the speech engine.

    Emotion tags are removed first, then markup-style punctuation, then
    http/https URLs, and finally every run of whitespace is collapsed to a
    single space. The result has no leading or trailing whitespace.
    """
    spoken = extract_emotions(text)[1]
    substitutions = (
        (r'[*_~`#{}()\\|<>]', ''),   # markup / formatting characters
        (r'https?://\S+', ''),        # bare URLs
        (r'\s+', ' '),                # normalise whitespace
    )
    for pattern, replacement in substitutions:
        spoken = re.sub(pattern, replacement, spoken)
    return spoken.strip()
|
|
| |
| |
| |
# Import-time boot sequence: print a banner, then try to load the tokenizer
# and model. A failure is logged and leaves whichever of the two globals was
# not yet assigned as None; generate_response and /health check for that.
_BANNER = "=" * 60
print(_BANNER)
print(" Visual AI -- Booting Systems")
print(_BANNER)


tokenizer = None
model = None


try:
    print(f"[MODEL] Loading {MODEL_ID} ...")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
    _model_options = {
        "dtype": torch.float32,
        "device_map": "cpu",
        "trust_remote_code": True,
        "low_cpu_mem_usage": True,
    }
    model = AutoModelForCausalLM.from_pretrained(MODEL_ID, **_model_options)
    model.eval()
    # Fall back to the EOS id when the tokenizer defines no padding token.
    if tokenizer.pad_token_id is None:
        tokenizer.pad_token_id = tokenizer.eos_token_id
    print(" OK Model loaded successfully!")
except Exception as exc:
    print(f" FAILED Model load error: {exc}")
    traceback.print_exc()
|
|
| |
| |
| |
# In-process conversation store: session id -> list of {"role", "content"}
# dicts. Every access goes through sessions_lock.
sessions = {}
sessions_lock = threading.Lock()


def get_memory(sid: str) -> list:
    """Return a shallow copy of the stored history for session *sid*.

    An unknown session yields an empty list. The returned list is a copy, so
    callers may modify it without touching the stored history.
    """
    with sessions_lock:
        history = sessions.get(sid)
        if history is None:
            return []
        return list(history)
|
|
def add_to_memory(sid: str, role: str, content: str):
    """Append one message to session *sid* and trim the history if needed.

    The session is created on first use. Once the history grows beyond
    ``MAX_MEMORY * 2`` entries, only the most recent ``MAX_MEMORY * 2`` are
    kept.
    """
    entry = {"role": role, "content": content}
    with sessions_lock:
        limit = MAX_MEMORY * 2
        history = sessions.setdefault(sid, [])
        history.append(entry)
        if len(history) > limit:
            # Rebind to a fresh list holding only the newest entries.
            sessions[sid] = history[-limit:]
|
|
| |
| |
| |
| |
| |
| |
| |
| |
# Substrings that mark the end of the assistant's turn; the decoded output is
# cut at the first occurrence of each of them.
STOP_TOKENS = [
    "<end_of_turn>", "<start_of_turn>",
    "Tur:", "User:", "<|endoftext|>", "[/INST]",
]

# Number of most recent user/assistant exchanges replayed to the model.
CONTEXT_TURNS = 6

# Reply used when the model produced nothing usable.
_FALLBACK_REPLY = "[thinking] I lost my train of thought. Could you say that again?"


def _build_messages(recent: list, user_input: str) -> list:
    """Build the chat-template message list: system prompt, history, new turn."""
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    messages.extend(
        {
            "role": "user" if msg["role"] == "user" else "assistant",
            "content": msg["content"],
        }
        for msg in recent
    )
    messages.append({"role": "user", "content": user_input})
    return messages


def _build_plain_prompt(recent: list, user_input: str) -> str:
    """Build the plain-text transcript used when the chat template fails."""
    parts = [f"System: {SYSTEM_PROMPT}"]
    for msg in recent:
        label = "Tur" if msg["role"] == "user" else "Ana"
        parts.append(f"{label}: {msg['content']}")
    parts.append(f"Tur: {user_input}\nAna:")
    return "\n".join(parts)


def _unpack_encoding(enc):
    """Return ``(input_ids, attention_mask)`` on CPU; the mask may be None."""
    input_ids = enc["input_ids"].to("cpu")
    attention_mask = enc.get("attention_mask")
    if attention_mask is not None:
        attention_mask = attention_mask.to("cpu")
    return input_ids, attention_mask


def _encode_prompt(messages: list, recent: list, user_input: str):
    """Tokenise the prompt, preferring the tokenizer's chat template.

    Returns ``(input_ids, attention_mask)``, or ``None`` when both the chat
    template and the plain-text fallback fail.
    """
    try:
        enc = tokenizer.apply_chat_template(
            messages,
            return_tensors="pt",
            add_generation_prompt=True,
            return_dict=True,
        )
        return _unpack_encoding(enc)
    except Exception as e1:
        print(f"[TOKENISE] chat_template failed ({e1}), using plain fallback")

    try:
        enc = tokenizer(_build_plain_prompt(recent, user_input), return_tensors="pt")
        return _unpack_encoding(enc)
    except Exception as e2:
        print(f"[TOKENISE] fallback also failed: {e2}")
        return None


def _postprocess_response(raw: str) -> str:
    """Trim decoded model output and make sure it is a usable, tagged reply."""
    response = raw.strip()

    # Cut at anything that signals the model started another turn.
    for stop in STOP_TOKENS:
        if stop in response:
            response = response.partition(stop)[0].strip()

    # Keep only the first paragraph.
    if "\n\n" in response:
        response = response.partition("\n\n")[0].strip()

    # A reply that is too short, or that consists only of emotion tags, would
    # show an empty chat bubble and produce no speech -- use the fallback.
    if len(response) < 3 or not EMOTION_RE.sub("", response).strip():
        response = _FALLBACK_REPLY

    # The front end expects at least one emotion tag in every reply.
    if not EMOTION_RE.search(response):
        response = "[default] " + response
    return response


def generate_response(user_input: str, session_id: str) -> str:
    """Generate Ana's reply to *user_input* within session *session_id*.

    The prompt is the system prompt, the last ``CONTEXT_TURNS`` exchanges of
    the session, and the new user message. On success both the user message
    and the reply are appended to the session history. When the model is not
    loaded, or tokenisation or generation fails, a canned ``[sad]`` reply is
    returned and the history is left unchanged.

    Returns the reply text, which always contains at least one emotion tag.
    """
    if model is None or tokenizer is None:
        return "[sad] My mind is offline right now. Please give me a moment."

    memory = get_memory(session_id)
    recent = memory[-(CONTEXT_TURNS * 2):]
    messages = _build_messages(recent, user_input)

    encoded = _encode_prompt(messages, recent, user_input)
    if encoded is None:
        return "[sad] I could not process that. Please try again."
    input_ids, attention_mask = encoded

    try:
        gen_kwargs = dict(
            max_new_tokens=MAX_NEW_TOKENS,
            do_sample=True,
            temperature=0.85,
            top_k=50,
            top_p=0.95,
            repetition_penalty=1.1,
            pad_token_id=tokenizer.eos_token_id,
        )
        if attention_mask is not None:
            gen_kwargs["attention_mask"] = attention_mask

        with torch.no_grad():
            outputs = model.generate(input_ids, **gen_kwargs)
    except Exception as exc:
        print(f"[GENERATE] Error: {exc}")
        traceback.print_exc()
        return "[sad] Something went wrong in my mind. Could you say that again?"

    # Decode only the tokens produced after the prompt.
    new_tokens = outputs[0][input_ids.shape[-1]:]
    response = _postprocess_response(
        tokenizer.decode(new_tokens, skip_special_tokens=True)
    )

    add_to_memory(session_id, "user", user_input)
    add_to_memory(session_id, "assistant", response)
    return response
|
|
| |
| |
| |
async def _async_tts(text: str, rate: int, pitch: int) -> bytes:
    """Synthesize *text* with edge-tts and return the raw audio bytes.

    *rate* is sent as a signed percentage (e.g. ``+7%``) and *pitch* as a
    signed Hz offset (e.g. ``-2Hz``). Only chunks of type ``"audio"`` from
    the stream are kept; the result is empty if none arrive.
    """
    rate_str = f"+{rate}%" if rate >= 0 else f"{rate}%"
    pitch_str = f"+{pitch}Hz" if pitch >= 0 else f"{pitch}Hz"
    comm = edge_tts.Communicate(text, TTS_VOICE, rate=rate_str, pitch=pitch_str)
    # Collect the chunks and join once: bytes are immutable, so repeated
    # ``+=`` would re-copy the whole buffer for every chunk received.
    audio_chunks = [
        chunk["data"]
        async for chunk in comm.stream()
        if chunk["type"] == "audio"
    ]
    return b"".join(audio_chunks)
|
|
def synthesize_speech(text: str, rate: int = 0, pitch: int = 0):
    """Convert *text* to speech and return it as a base64 string.

    Emotion tags and markup are stripped first. Returns ``None`` when fewer
    than two characters remain to be spoken, when synthesis raises, or when
    the engine returns no audio.
    """
    clean = clean_for_tts(text)
    if not clean or len(clean) < 2:
        return None
    try:
        # asyncio.run creates a private event loop, finalizes pending async
        # generators, closes the loop and clears it from the thread, so no
        # closed loop is left installed as the thread's current loop.
        audio = asyncio.run(_async_tts(clean, rate, pitch))
    except Exception as exc:
        print(f"[TTS] Error: {exc}")
        return None
    return base64.b64encode(audio).decode() if audio else None
|
|
| |
| |
| |
| HTML_PAGE = r"""<!DOCTYPE html> |
| <html lang="en"> |
| <head> |
| <meta charset="UTF-8"> |
| <meta name="viewport" content="width=device-width,initial-scale=1,viewport-fit=cover,interactive-widget=resizes-content"> |
| <title>Ana</title> |
| <style> |
| *{margin:0;padding:0;box-sizing:border-box} |
| |
| html{height:100%} |
| |
| body{ |
| width:100%; |
| height:100dvh; |
| overflow:hidden; |
| background:#000; |
| font-family:'Segoe UI',system-ui,sans-serif; |
| display:flex; |
| flex-direction:column; |
| position:relative; |
| } |
| |
| /* Full-screen background -- FIXED so keyboard never pushes it */ |
| #bg{ |
| position:fixed; |
| inset:0; |
| z-index:0; |
| background:#000; |
| } |
| #bgImg{ |
| width:100%; |
| height:100%; |
| object-fit:cover; |
| object-position:center top; |
| display:block; |
| transition:opacity 0.05s linear; |
| } |
| |
| /* Overlay anchored to bottom of body (dvh-aware, shrinks with keyboard) */ |
| #overlay{ |
| position:absolute; |
| left:0;right:0;bottom:0; |
| z-index:20; |
| display:flex; |
| flex-direction:column; |
| padding-bottom:max(10px, env(safe-area-inset-bottom)); |
| background:linear-gradient( |
| to bottom, |
| transparent 0%, |
| rgba(0,0,0,0.52) 26%, |
| rgba(0,0,0,0.76) 100% |
| ); |
| } |
| |
| /* Message area */ |
| #msgArea{ |
| overflow-y:auto; |
| display:flex; |
| flex-direction:column; |
| gap:6px; |
| padding:16px 13px 8px; |
| max-height:30dvh; |
| scrollbar-width:none; |
| -ms-overflow-style:none; |
| scroll-behavior:smooth; |
| } |
| #msgArea::-webkit-scrollbar{display:none} |
| |
| .turn{display:flex;flex-direction:column;gap:4px} |
| .user-row{display:flex;justify-content:flex-end} |
| .bot-row{display:flex;flex-direction:column;align-items:flex-start} |
| .name-tag{ |
| font-size:0.58rem;color:rgba(255,255,255,0.28); |
| letter-spacing:.08em;text-transform:uppercase; |
| margin-bottom:2px;padding-left:3px; |
| } |
| .bubble{ |
| max-width:74vw; |
| padding:8px 13px; |
| border-radius:18px; |
| font-size:0.88rem; |
| line-height:1.46; |
| word-break:break-word; |
| backdrop-filter:blur(10px); |
| -webkit-backdrop-filter:blur(10px); |
| } |
| .bubble-user{ |
| background:rgba(255,255,255,0.11); |
| border:1px solid rgba(255,255,255,0.17); |
| color:#fff; |
| border-bottom-right-radius:5px; |
| } |
| .bubble-bot{ |
| background:rgba(0,0,0,0.40); |
| border:1px solid rgba(255,255,255,0.07); |
| color:rgba(255,255,255,0.9); |
| border-bottom-left-radius:5px; |
| } |
| |
| /* Typing dots */ |
| .typing{ |
| display:flex;align-items:center;gap:5px; |
| padding:9px 13px; |
| background:rgba(0,0,0,0.36); |
| border:1px solid rgba(255,255,255,0.07); |
| border-radius:18px;border-bottom-left-radius:5px; |
| backdrop-filter:blur(10px); |
| width:fit-content; |
| } |
| .typing span{ |
| width:5px;height:5px;border-radius:50%; |
| background:rgba(255,255,255,0.5); |
| animation:blink 1.2s infinite; |
| } |
| .typing span:nth-child(2){animation-delay:.2s} |
| .typing span:nth-child(3){animation-delay:.4s} |
| @keyframes blink{ |
| 0%,80%,100%{transform:scale(.6);opacity:.3} |
| 40%{transform:scale(1);opacity:1} |
| } |
| |
| /* Input bar */ |
| #inputBar{ |
| display:flex; |
| align-items:center; |
| gap:8px; |
| padding:6px 12px 0; |
| } |
| #msgIn{ |
| flex:1; |
| background:rgba(255,255,255,0.07); |
| border:1px solid rgba(255,255,255,0.15); |
| border-radius:24px; |
| color:#fff; |
| padding:10px 16px; |
| font-size:16px; /* 16px prevents iOS auto-zoom on focus */ |
| outline:none; |
| caret-color:#fff; |
| backdrop-filter:blur(10px); |
| -webkit-backdrop-filter:blur(10px); |
| transition:border-color .2s,background .2s; |
| -webkit-appearance:none; |
| appearance:none; |
| } |
| #msgIn::placeholder{color:rgba(255,255,255,0.27)} |
| #msgIn:focus{ |
| border-color:rgba(255,255,255,0.28); |
| background:rgba(255,255,255,0.1); |
| } |
| #sendBtn{ |
| width:42px;height:42px;flex-shrink:0; |
| border-radius:50%;cursor:pointer; |
| display:flex;align-items:center;justify-content:center; |
| font-size:1rem; |
| background:rgba(255,255,255,0.09); |
| border:1px solid rgba(255,255,255,0.17); |
| color:rgba(255,255,255,0.65); |
| backdrop-filter:blur(10px); |
| -webkit-backdrop-filter:blur(10px); |
| transition:background .2s,color .2s,transform .12s; |
| -webkit-tap-highlight-color:transparent; |
| touch-action:manipulation; |
| } |
| #sendBtn:hover{background:rgba(255,255,255,0.17);color:#fff} |
| #sendBtn:active{transform:scale(.88)} |
| #sendBtn:disabled{opacity:.28;cursor:not-allowed} |
| </style> |
| </head> |
| <body> |
| |
| <!-- Fixed full-screen background — keyboard never moves this --> |
| <div id="bg"> |
| <img id="bgImg" src="/img/default.png" alt="" |
| onerror="this.style.opacity='0'"> |
| </div> |
| |
| <!-- Overlay — absolute inside body (dvh), rises with keyboard naturally --> |
| <div id="overlay"> |
| <div id="msgArea"></div> |
| <div id="inputBar"> |
| <input type="text" id="msgIn" |
| placeholder="Say something..." |
| autocomplete="off" |
| autocorrect="off" |
| spellcheck="false" |
| enterkeyhint="send"/> |
| <button id="sendBtn" onclick="send()" aria-label="Send">►</button> |
| </div> |
| </div> |
| |
| <script> |
| const SID = (crypto.randomUUID ? crypto.randomUUID() : Date.now().toString(36)); |
| let busy = false, activeAudio = null; |
| |
| const MA = document.getElementById('msgArea'); |
| const MI = document.getElementById('msgIn'); |
| const SB = document.getElementById('sendBtn'); |
| const BG = document.getElementById('bgImg'); |
| |
| /* Image system */ |
| function fadeSwap(src) { |
| BG.style.opacity = '0'; |
| setTimeout(() => { |
| const probe = new Image(); |
| probe.onload = () => { BG.src = src; BG.style.opacity = '1'; }; |
| probe.onerror = () => { BG.src = '/img/default.png'; BG.style.opacity = '1'; }; |
| probe.src = src; |
| }, 55); |
| } |
| |
| function playImgSequence(emotions) { |
| if (!emotions || emotions.length === 0) { fadeSwap('/img/default.png'); return; } |
| const queue = [...emotions]; |
| (function next() { |
| if (!queue.length) return; |
| fadeSwap('/img/' + queue.shift().toLowerCase() + '.png'); |
| if (queue.length) setTimeout(next, 750); |
| })(); |
| } |
| |
| /* Parse emotion tags */ |
| function parseResponse(raw) { |
| const tagRe = /\[([a-zA-Z_]+)\]/g; |
| const emotions = []; |
| let m; |
| while ((m = tagRe.exec(raw)) !== null) emotions.push(m[1]); |
| const clean = raw.replace(/\[[a-zA-Z_]+\]/g, '').trim(); |
| return { emotions, clean }; |
| } |
| |
| /* DOM helpers */ |
| function esc(t) { const d = document.createElement('div'); d.textContent = t; return d.innerHTML; } |
| function scroll() { MA.scrollTop = MA.scrollHeight; } |
| |
| function addTurn(userText, botText) { |
| const turn = document.createElement('div'); |
| turn.className = 'turn'; |
| turn.innerHTML = |
| '<div class="user-row"><div class="bubble bubble-user">' + esc(userText) + '</div></div>' + |
| '<div class="bot-row"><div class="name-tag">Ana</div><div class="bubble bubble-bot">' + esc(botText) + '</div></div>'; |
| MA.appendChild(turn); |
| scroll(); |
| } |
| |
| function showTyping() { |
| const d = document.createElement('div'); |
| d.id = 'typDot'; |
| d.className = 'bot-row'; |
| d.innerHTML = '<div class="typing"><span></span><span></span><span></span></div>'; |
| MA.appendChild(d); scroll(); return d; |
| } |
| |
| /* TTS */ |
| function playB64(b64) { |
| try { |
| if (activeAudio) { activeAudio.pause(); activeAudio = null; } |
| const bin = atob(b64), u8 = new Uint8Array(bin.length); |
| for (let i = 0; i < bin.length; i++) u8[i] = bin.charCodeAt(i); |
| const url = URL.createObjectURL(new Blob([u8], { type: 'audio/mp3' })); |
| activeAudio = new Audio(url); |
| activeAudio.play().catch(() => {}); |
| activeAudio.onended = () => { URL.revokeObjectURL(url); activeAudio = null; }; |
| } catch(e) { console.warn('TTS:', e); } |
| } |
| |
| async function fetchTTS(rawText) { |
| try { |
| const res = await fetch('/tts', { |
| method: 'POST', |
| headers: { 'Content-Type': 'application/json' }, |
| body: JSON.stringify({ text: rawText, rate: 7, pitch: 0 }) |
| }); |
| const d = await res.json(); |
| if (d.audio) playB64(d.audio); |
| } catch(e) { console.warn('TTS fetch:', e); } |
| } |
| |
| /* Send */ |
| async function send() { |
| const t = MI.value.trim(); |
| if (!t || busy) return; |
| MI.value = ''; busy = true; SB.disabled = true; |
| |
| const tyEl = showTyping(); |
| |
| try { |
| const res = await fetch('/chat', { |
| method: 'POST', |
| headers: { 'Content-Type': 'application/json' }, |
| body: JSON.stringify({ message: t, session_id: SID }) |
| }); |
| const d = await res.json(); |
| tyEl.remove(); |
| |
| const raw = d.response || '[sad] Something went wrong.'; |
| const { emotions, clean } = parseResponse(raw); |
| |
| playImgSequence(emotions.length > 0 ? emotions : ['default']); |
| addTurn(t, clean); |
| fetchTTS(raw); |
| } catch(e) { |
| tyEl.remove(); |
| addTurn(t, 'Connection error. Please try again.'); |
| } |
| |
| busy = false; SB.disabled = false; |
| // No MI.focus() on mobile -- avoids re-opening keyboard unexpectedly |
| } |
| |
| MI.addEventListener('keydown', e => { |
| if (e.key === 'Enter' && !e.shiftKey) { e.preventDefault(); send(); } |
| }); |
| </script> |
| </body> |
| </html>""" |
|
|
| |
| |
| |
# The Flask application; every route below is registered on it.
app = Flask(__name__)


@app.route("/")
def index():
    """Serve the single-page chat UI held in HTML_PAGE."""
    page = Response(HTML_PAGE, mimetype="text/html")
    return page
|
|
@app.route("/img/<path:filename>")
def serve_img(filename: str):
    """Serve one file from IMG_DIR, or an empty 404 when it is not there.

    Only the final path component of *filename* is used, so any directory
    part of the requested path is ignored.
    """
    basename = Path(filename).name
    candidate = IMG_DIR / basename
    if not candidate.exists() or not candidate.is_file():
        return Response("", status=404)
    return send_from_directory(str(IMG_DIR), basename)
|
|
@app.route("/chat", methods=["POST"])
def chat():
    """Handle one chat turn.

    Expects a JSON object with ``message`` (non-empty string) and optionally
    ``session_id`` (string). Responds with ``{"response", "session_id"}``,
    or with ``{"error": "Empty message"}`` and status 400 when no usable
    message is supplied.
    """
    # silent=True: a missing or malformed JSON body becomes None instead of
    # an exception, so every bad request receives the same JSON 400 below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    message = data.get("message", "")
    user_input = message.strip() if isinstance(message, str) else ""
    if not user_input:
        return jsonify({"error": "Empty message"}), 400

    # Session ids are used as dictionary keys; anything that is not a string
    # (missing, null, list, object, ...) is replaced with a fresh id.
    session_id = data.get("session_id")
    if not isinstance(session_id, str):
        session_id = str(uuid.uuid4())

    try:
        resp = generate_response(user_input, session_id)
    except Exception as exc:
        print(f"[CHAT] Error: {exc}")
        traceback.print_exc()
        resp = "[sad] I encountered an unexpected error. Please try again."
    return jsonify({"response": resp, "session_id": session_id})
|
|
@app.route("/tts", methods=["POST"])
def tts_endpoint():
    """Synthesize speech for the posted text.

    Expects a JSON object with ``text`` (non-empty string) and optional
    integer ``rate`` and ``pitch`` (defaulting to TTS_RATE and TTS_PITCH).
    Responds with ``{"audio": <base64 or null>}``, or with a JSON error and
    status 400 when the text is missing or rate/pitch are not integers.
    """
    # silent=True: a missing or malformed JSON body becomes None instead of
    # an exception, so bad requests receive a JSON 400 rather than a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    raw_text = data.get("text", "")
    text = raw_text.strip() if isinstance(raw_text, str) else ""
    if not text:
        return jsonify({"error": "Empty text"}), 400

    try:
        rate = int(data.get("rate", TTS_RATE))
        pitch = int(data.get("pitch", TTS_PITCH))
    except (TypeError, ValueError):
        return jsonify({"error": "rate and pitch must be integers"}), 400

    audio_b64 = synthesize_speech(text, rate=rate, pitch=pitch)
    return jsonify({"audio": audio_b64})
|
|
@app.route("/clear", methods=["POST"])
def clear():
    """Forget the stored history of the posted ``session_id``.

    Always responds with ``{"status": "cleared"}``, including when the
    session is unknown or the request carries no usable session id.
    """
    # silent=True: a missing or malformed JSON body becomes None instead of
    # an exception; there is then simply nothing to clear.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    sid = data.get("session_id", "")
    with sessions_lock:
        try:
            sessions.pop(sid, None)
        except TypeError:
            # An unhashable id (JSON list/object) can never be a stored key.
            pass
    return jsonify({"status": "cleared"})
|
|
@app.route("/health")
def health():
    """Report whether the model and the tokenizer finished loading."""
    status = {
        "model_loaded": model is not None,
        "tokenizer_loaded": tokenizer is not None,
    }
    return jsonify(status)
|
|
# Script entry point: announce the address, then start Flask's built-in
# server on all interfaces, port 7860, handling requests in threads.
if __name__ == "__main__":
    server_options = {"host": "0.0.0.0", "port": 7860, "threaded": True}
    print("Visual AI is online -- http://0.0.0.0:7860")
    app.run(**server_options)