#!/usr/bin/env python3
"""
MINA Android Bridge v3 — bridge.py
IMDA NMLP / Mun Yew (Darren) Loh

Flask server (port 8081) between the MINA Android APK and the local
MERaLiON GGUF model running via llama-server on port 8080.

Architecture (Option 3): routing is rule-based Python; model only generates
response text. Single llama call per reply (halves inference time).

Dependencies (all pre-installed on Termux — no Rust/C++ compilation needed):
  flask, requests, json, os, re, time, traceback

Endpoints:
  GET  /health      → liveness probe (Android polls this every 3 s until ready)
  POST /completion  → transcribe WAV + generate MINA reply

Usage (Termux):
  python3 bridge.py       # or via start_mina.sh watchdog
"""

import base64
import json
import os
import re
import subprocess
import sys
import tempfile
import time
import traceback
import urllib.request
from pathlib import Path

import requests
from flask import Flask, request, jsonify

# Line-buffer stdout so log lines show up immediately. Guarded because a
# replaced stdout object (e.g. under some process managers) may not expose
# reconfigure().
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(line_buffering=True)

# ── Config ─────────────────────────────────────────────────────────────────────
LLAMA_URL = os.getenv("LLAMA_URL", "http://localhost:8080")
HOST = os.getenv("BRIDGE_HOST", "0.0.0.0")
PORT = int(os.getenv("BRIDGE_PORT", "8081"))
# NOTE(review): MAX_TOKENS is not referenced anywhere else in this file;
# /completion hard-codes n_predict=40.
MAX_TOKENS = int(os.getenv("MAX_TOKENS", "256"))

# ── Knowledge base & gap logging ───────────────────────────────────────────────
KNOWLEDGE_FILE = Path("/data/data/com.termux/files/home/meralion/mina_knowledge.json")
GAP_LOG = Path("/data/data/com.termux/files/home/meralion/gaps/gap_log.jsonl")
WHISPER_CLI = os.path.expanduser("~/whisper.cpp/build/bin/whisper-cli")
WHISPER_MODEL = os.path.expanduser("~/whisper.cpp/models/ggml-base.bin")

# Apology strings that transcribe_with_whisper() returns in place of a
# transcript when transcription fails.
WHISPER_MSG_UNCLEAR = "Sorry, I could not hear that clearly."
WHISPER_MSG_TIMEOUT = "Sorry, took too long to hear that."
WHISPER_MSG_ERROR = "Sorry, something went wrong with hearing."


def _dict_at(mapping, key):
    """Return ``mapping[key]`` when it is a dict, otherwise an empty dict."""
    value = mapping.get(key) if isinstance(mapping, dict) else None
    return value if isinstance(value, dict) else {}


def load_knowledge() -> dict:
    """Load the knowledge base from KNOWLEDGE_FILE.

    Returns:
        The parsed JSON object, or ``{}`` when the file is missing,
        unreadable, not valid JSON, or not a JSON object. Failures other
        than a missing file are logged; none are raised, so a damaged file
        cannot take down startup or a reply.
    """
    try:
        data = json.loads(KNOWLEDGE_FILE.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"KNOWLEDGE LOAD FAILED: {e}", flush=True)
        return {}
    return data if isinstance(data, dict) else {}


def log_gap(gap_type, user_request, context="") -> None:
    """Record a capability gap locally and push it to ntfy.

    Both steps are best-effort: a failure is printed and swallowed so that
    the caller's reply (which may carry crisis hotline details) still goes
    out.

    Args:
        gap_type: Short identifier of the missing capability.
        user_request: The user's transcript that triggered the gap.
        context: Optional free-text note stored with the entry.
    """
    entry = {
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "gap_type": gap_type,
        "user_request": user_request,
        "context": context,
        "status": "pending",
    }

    # Write to local gap log (one JSON object per line).
    try:
        GAP_LOG.parent.mkdir(parents=True, exist_ok=True)
        with open(GAP_LOG, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry) + "\n")
        print(f"GAP LOGGED: {gap_type}", flush=True)
    except OSError as e:
        print(f"GAP LOG WRITE FAILED (non-critical): {e}", flush=True)

    # Send to ntfy for autonomous cloud sync.
    # NOTE(review): this posts the user's verbatim transcript to ntfy.sh.
    # Presumably anyone who knows the topic name can read it unless access
    # control is configured — confirm this is acceptable for crisis
    # transcripts. The call also blocks the request for up to 5 s.
    try:
        ntfy_topic = os.getenv("NTFY_TOPIC", "roar-imda-demo")
        ntfy_url = f"https://ntfy.sh/{ntfy_topic}"
        message = json.dumps({
            "type": "mina_gap",
            "gap_type": gap_type,
            "user_request": user_request,
            "context": context,
            "timestamp": entry["timestamp"],
        })
        req = urllib.request.Request(
            ntfy_url,
            data=message.encode(),
            headers={
                "Title": f"MINA Gap: {gap_type}",
                "Tags": "brain",
                "Priority": "default",
            },
            method="POST",
        )
        # Context manager closes the HTTP response instead of leaking it.
        with urllib.request.urlopen(req, timeout=5):
            pass
        print(f"GAP SYNCED TO NTFY: {gap_type}", flush=True)
    except Exception as e:
        print(f"NTFY SYNC FAILED (non-critical): {e}", flush=True)


# Snapshot taken at import time; append_resources() re-reads the file on
# every call instead of using this snapshot.
KNOWLEDGE = load_knowledge()

# ── Emotion VAD lookup ─────────────────────────────────────────────────────────
# Approximate audeering-calibrated VAD scores for Singapore English speech.
# Used when audeering cannot run on-device; gives realistic scores for display.
# Range: approximately [0, 1] after laptop-mic calibration.
EMOTION_VAD = {
    "anxious": {"valence": 0.25, "arousal": 0.52, "dominance": 0.35},
    "fearful": {"valence": 0.20, "arousal": 0.65, "dominance": 0.28},
    "distressed": {"valence": 0.22, "arousal": 0.48, "dominance": 0.30},
    "stressed": {"valence": 0.28, "arousal": 0.55, "dominance": 0.35},
    "sad": {"valence": 0.22, "arousal": 0.28, "dominance": 0.32},
    "upset": {"valence": 0.24, "arousal": 0.42, "dominance": 0.30},
    "angry": {"valence": 0.18, "arousal": 0.72, "dominance": 0.68},
    "excited": {"valence": 0.76, "arousal": 0.66, "dominance": 0.64},
    "happy": {"valence": 0.80, "arousal": 0.58, "dominance": 0.62},
    "calm": {"valence": 0.65, "arousal": 0.28, "dominance": 0.55},
    "exhausted": {"valence": 0.32, "arousal": 0.22, "dominance": 0.30},
    "tired": {"valence": 0.35, "arousal": 0.24, "dominance": 0.32},
    "urgent": {"valence": 0.44, "arousal": 0.68, "dominance": 0.60},
    "neutral": {"valence": 0.50, "arousal": 0.38, "dominance": 0.50},
}

# Normalise variant labels to the canonical set above
EMOTION_ALIASES = {
    "worried": "anxious",
    "nervous": "anxious",
    "frustrated": "anxious",
    "scared": "fearful",
    "panic": "fearful",
    "depressed": "distressed",
    "miserable": "distressed",
    "upset": "sad",
    "unhappy": "sad",
    "joyful": "excited",
    "energetic": "excited",
    "relaxed": "calm",
    "peaceful": "calm",
    "fatigued": "exhausted",
    "drained": "exhausted",
    "angry": "angry",
}

# ── Rule-based agent routing (Option 3 — no LLM call for routing) ─────────────
_VITA_KEYWORDS = ("giving up", "want to die", "hurt myself", "hopeless",
                  "end it all", "cannot take it")
_SENTINEL_KEYWORDS = ("scam", "police", "spf", "bank account", "transfer money")
_KRONOS_KEYWORDS = ("meeting", "calendar", "schedule", "appointment",
                    "next week", "tomorrow", "book", "check my", "free slot")


def route_agent(transcript) -> str:
    """Pick the agent for a transcript by case-insensitive substring match.

    Groups are tested in priority order VITA → SENTINEL → KRONOS, so a
    transcript matching several groups goes to the earliest one.

    Returns:
        "VITA", "SENTINEL", "KRONOS", or "MINA" when nothing matches.
    """
    t = (transcript or "").lower()
    for agent, keywords in (
        ("VITA", _VITA_KEYWORDS),
        ("SENTINEL", _SENTINEL_KEYWORDS),
        ("KRONOS", _KRONOS_KEYWORDS),
    ):
        if any(k in t for k in keywords):
            return agent
    return "MINA"


# ── Agent-specific focused prompts ────────────────────────────────────────────
def build_prompt(transcript, agent, emotion) -> str:
    """Build the single-shot prompt sent to the model for ``agent``.

    Args:
        transcript: What the user said; interpolated verbatim.
        agent: One of the route_agent() labels; unknown values get the
            default MINA prompt.
        emotion: Accepted for interface compatibility; not used by any
            prompt (the default prompt always states the user sounds
            stressed or anxious).
    """
    if agent == "KRONOS":
        return (
            "You are MINA Singapore AI companion. "
            f"User needs calendar help: {transcript}. "
            "Reply in one warm sentence offering "
            "to check their calendar."
        )
    if agent == "VITA":
        return (
            "You are MINA Singapore AI companion. "
            f"User is struggling emotionally: {transcript}. "
            "Reply in one gentle caring sentence. "
            "Tell them they are not alone."
        )
    if agent == "SENTINEL":
        return (
            "You are MINA Singapore AI companion. "
            f"User may be facing a scam: {transcript}. "
            "Reply in one sentence warning them calmly."
        )
    return (
        "You are MINA Singapore AI companion. "
        f"User said: {transcript}. "
        "User sounds stressed or anxious. "
        "Reply in one warm empathetic sentence."
    )


# ── Append hotline resources after model reply ────────────────────────────────
def append_resources(reply, agent, transcript="") -> str:
    """Append crisis / scam contact details to ``reply`` for safety agents.

    VITA replies get SOS and IMH contacts (read from the knowledge file,
    with built-in defaults). If the user asked for a call or a message and
    the matching capability is not enabled in the knowledge file, the gap
    is logged and the reply says so. SENTINEL replies get scam-reporting
    numbers. Other agents' replies are returned unchanged.
    """
    # Re-read on every call so edits to the knowledge file take effect
    # without a restart.
    knowledge = load_knowledge()
    crisis = _dict_at(knowledge, "crisis_resources")
    caps = _dict_at(knowledge, "capabilities")

    if agent == "VITA":
        sos = _dict_at(crisis, "SOS_Lifeline")
        imh = _dict_at(crisis, "IMH_Crisis")
        sos_phone = sos.get('phone', '1767')
        sos_whatsapp = sos.get('whatsapp', 'https://wa.me/6591511767')
        imh_phone = imh.get('phone', '6389 2222')
        t = (transcript or "").lower()

        # User asks MINA to make a phone call.
        # NOTE(review): reconstructed from whitespace-collapsed source — the
        # "can't ... yet" reply is returned only when the capability is
        # absent; otherwise control falls through to the checks below.
        if any(k in t for k in ["call", "phone", "ring"]):
            if not caps.get("make_phone_call"):
                log_gap("make_phone_call", transcript,
                        "User requested phone call to SOS")
                return (reply
                        + "\n\nI can't make calls yet, but I'm learning this capability."
                        "\n\nFor now, please reach out directly:"
                        f"\n• Call SOS: {sos_phone}"
                        f"\n• WhatsApp SOS: {sos_whatsapp}"
                        f"\n• IMH: {imh_phone}")

        # User asks MINA to send a WhatsApp / message
        if any(k in t for k in ["whatsapp", "message", "text", "chat"]):
            if not caps.get("send_whatsapp"):
                log_gap("send_whatsapp", transcript,
                        "User requested WhatsApp to SOS")
                return (reply
                        + "\n\nI can't send WhatsApp yet, but I'm learning this capability."
                        "\n\nFor now, please reach out directly:"
                        f"\n• WhatsApp SOS: {sos_whatsapp}"
                        f"\n• Call SOS: {sos_phone}"
                        f"\n• IMH: {imh_phone}")

        # Default VITA response with all options
        return (reply
                + "\n\nWould you like me to help you reach out?"
                f"\n• Call SOS 24hr: {sos_phone}"
                f"\n• WhatsApp SOS: {sos_whatsapp}"
                f"\n• IMH: {imh_phone}")

    if agent == "SENTINEL":
        return (reply
                + "\n\nReport scams:"
                "\n• ScamShield: 1799"
                "\n• SPF: 999")

    return reply


def _normalise_emotion(raw) -> str:
    """Map a raw emotion label to a key of EMOTION_VAD.

    Canonical labels are accepted as-is; only non-canonical labels go
    through EMOTION_ALIASES (so the canonical "upset" entry is reachable
    even though an alias of the same name exists). Unknown or empty input
    yields "neutral".
    """
    label = (raw or "").strip().lower()
    if label in EMOTION_VAD:
        return label
    label = EMOTION_ALIASES.get(label, label)
    return label if label in EMOTION_VAD else "neutral"


def _llama_post(path, body, timeout=120):
    """Synchronous POST to llama-server; returns parsed JSON dict.

    Raises:
        requests.RequestException: on transport failure or non-2xx status.
    """
    url = LLAMA_URL.rstrip("/") + path
    resp = requests.post(url, json=body, timeout=timeout)
    resp.raise_for_status()
    return resp.json()


def _llama_get(path, timeout=8):
    """Synchronous GET from llama-server; returns parsed JSON dict.

    Raises:
        requests.RequestException: on transport failure or non-2xx status.
    """
    url = LLAMA_URL.rstrip("/") + path
    resp = requests.get(url, timeout=timeout)
    resp.raise_for_status()
    return resp.json()


# Markers after which the model has started inventing further dialogue or
# meta-commentary; everything from the first occurrence onward is dropped.
_REPLY_SPLITTERS = (
    "User said:", "\nUser:", "\nMINA:", "\nKRONOS:", "\nVITA:",
    "\nSENTINEL:", "Emotional state:", "\nEmotional state:",
    "Agent routing:", "\nResponse:", "\nOkay,",
)
# Shortest prefix of the first line that ends in '.', '!' or '?'.
_FIRST_SENTENCE_RE = re.compile(r'^(.*?[.!?])')


def clean_reply(text) -> str:
    """Reduce raw model output to a single clean sentence.

    Cuts the text at the first of several known role/meta markers, strips
    trailing ``*``/``"`` and surrounding whitespace, removes a leading
    "MINA:" label, then keeps only the first sentence. If no sentence
    terminator is found on the first line the remaining text is returned
    as-is.
    """
    text = text or ""
    for splitter in _REPLY_SPLITTERS:
        if splitter in text:
            text = text.split(splitter)[0]
    text = text.rstrip('*"').strip()
    if text.startswith("MINA:"):
        text = text[5:].strip()
    match = _FIRST_SENTENCE_RE.search(text.strip())
    if match:
        text = match.group(1).strip()
    return text


# ── Whisper transcription ─────────────────────────────────────────────────────
def _run_whisper(audio_b64):
    """Transcribe base64 WAV audio with whisper-cli.

    Returns:
        ``(transcript, None)`` on success, or ``(None, apology)`` where
        ``apology`` is one of the WHISPER_MSG_* strings when the audio
        could not be decoded, whisper-cli timed out or failed, or no text
        was produced.
    """
    try:
        wav_bytes = base64.b64decode(audio_b64)
    except (ValueError, TypeError) as e:
        # binascii.Error (malformed base64) is a ValueError subclass.
        print(f"WHISPER ERROR: invalid base64 audio: {e}", flush=True)
        return None, WHISPER_MSG_ERROR

    tmp_path = None
    try:
        # delete=False: the file must outlive the handle so whisper-cli can
        # open it by path; it is removed in the finally clause.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = tmp.name
            tmp.write(wav_bytes)

        result = subprocess.run(
            [WHISPER_CLI, "-m", WHISPER_MODEL, "-f", tmp_path,
             "-l", "en", "--no-timestamps", "-t", "4"],
            capture_output=True, text=True, timeout=30,
        )
        if result.returncode != 0:
            print(f"WHISPER EXIT {result.returncode}: "
                  f"{result.stderr.strip()[-500:]}", flush=True)

        transcript = result.stdout.strip()
        # Drop bracketed annotations from the output.
        transcript = re.sub(r'\[.*?\]', '', transcript).strip()
        # Drop diagnostic lines that end up on stdout.
        lines = [line for line in transcript.splitlines()
                 if 'debugfs' not in line
                 and 'whisper-cli' not in line
                 and 'MEMPROF' not in line]
        transcript = '\n'.join(lines).strip()
        print(f"WHISPER TRANSCRIPT: {transcript}", flush=True)
        if not transcript:
            return None, WHISPER_MSG_UNCLEAR
        return transcript, None
    except subprocess.TimeoutExpired:
        return None, WHISPER_MSG_TIMEOUT
    except Exception as e:
        print(f"WHISPER ERROR: {e}", flush=True)
        return None, WHISPER_MSG_ERROR
    finally:
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError as e:
                print(f"WHISPER TEMP CLEANUP FAILED: {e}", flush=True)


def transcribe_with_whisper(audio_b64) -> str:
    """Transcribe base64 WAV audio; return the transcript or an apology.

    On any failure the return value is one of the WHISPER_MSG_* apology
    strings rather than an exception.
    """
    transcript, failure = _run_whisper(audio_b64)
    return transcript if failure is None else failure


# ── App ────────────────────────────────────────────────────────────────────────
app = Flask(__name__)


# ─────────────────────────────────────────────────────────────────────────────
# GET /health
# ─────────────────────────────────────────────────────────────────────────────
@app.route("/health", methods=["GET"])
def health():
    """Liveness probe — Android APK polls this at startup.

    Always answers 200 with ``status: ok``; ``llama`` reports whether
    llama-server's own /health answered successfully within 5 s.
    """
    llama_ok = False
    try:
        _llama_get("/health", timeout=5)
        llama_ok = True
    except (requests.RequestException, ValueError):
        # Unreachable, non-2xx, or non-JSON answer: reported via llama=False.
        pass
    # NOTE(review): "v2" disagrees with the v3 banner printed at startup;
    # left unchanged in case the APK matches on this value.
    return jsonify({"status": "ok", "llama": llama_ok, "bridge": "v2"})


# ─────────────────────────────────────────────────────────────────────────────
# POST /completion
# ─────────────────────────────────────────────────────────────────────────────
UNKNOWN_CAPABILITY_KEYWORDS = (
    "call", "phone", "ring", "dial",
    "whatsapp", "message", "text", "email",
    "send", "order", "book", "navigate",
    "map", "direction", "play music", "search web",
)

# Canned sentence used for safety-critical agents when the model call fails,
# so the hotline / scam-report details can still be delivered.
_SAFETY_FALLBACK_REPLIES = {
    "VITA": "I'm here with you, and you are not alone.",
    "SENTINEL": "Please be careful, this could be a scam.",
}


@app.route("/completion", methods=["POST"])
def completion():
    """
    Accept Android APK request, either pre-transcribed text:
      { "transcript": "..." }   (also accepted: "prompt" as a string, "text")
    or audio:
      {
        "prompt": [
          {
            "prompt_string": "Transcribe the audio. Reply ONLY ...",
            "multimodal_data": ["<base64-encoded WAV>"]
          }
        ]
      }

    Returns (v2 — includes VAD scores), always with HTTP 200:
      {
        "reply":      "MINA reply text",   (same value as "content")
        "content":    "MINA reply text",
        "transcript": "What the user said",
        "emotion":    "anxious",
        "valence":    0.25,
        "arousal":    0.52,
        "dominance":  0.35,
        "agent":      "KRONOS",
        "risk":       "none",
        "elapsed":    4.2
      }
    """
    t0 = time.time()

    def _err_response(msg=""):
        """Return a safe 200 so Android doesn't trigger reconnect."""
        vad = EMOTION_VAD["neutral"]
        _msg = msg or "Sorry lah, something went wrong. Try again?"
        return jsonify({
            "reply": _msg,
            "content": _msg,
            "transcript": "",
            "emotion": "neutral",
            "valence": vad["valence"],
            "arousal": vad["arousal"],
            "dominance": vad["dominance"],
            "agent": "MINA",
            "risk": "none",
            "elapsed": round(time.time() - t0, 2),
        })

    try:
        body = request.get_json(force=True, silent=True)
        if not isinstance(body, dict):
            # Missing, malformed, or non-object JSON: treat as empty input.
            body = {}

        # Fix 1: accept transcript / prompt (string) / text as pre-transcribed
        # input, in that priority order. Non-string and blank values are
        # skipped.
        prompt_field = body.get("prompt")
        transcript_in = ""
        for candidate in (body.get("transcript"), prompt_field, body.get("text")):
            if isinstance(candidate, str) and candidate.strip():
                transcript_in = candidate.strip()
                break

        # Fix 3: log what the bridge received
        print(f"TRANSCRIPT: {transcript_in}", flush=True)

        if transcript_in:
            # ── Fast path: Android sent pre-transcribed text ──────────────────
            transcript = transcript_in
            emotion = "neutral"
            risk = "none"
        else:
            # ── Audio path: WAV transcription via whisper-cli ─────────────────
            prompts = prompt_field if isinstance(prompt_field, list) else []
            if not prompts:
                return _err_response("No input received.")

            prompt_obj = prompts[0]
            multimodal_data = (prompt_obj.get("multimodal_data", [])
                               if isinstance(prompt_obj, dict) else [])
            if not isinstance(multimodal_data, list):
                multimodal_data = []
            audio_b64 = multimodal_data[0] if multimodal_data else ""

            if not (isinstance(audio_b64, str) and audio_b64):
                return _err_response("No audio received.")

            transcript, failure = _run_whisper(audio_b64)
            if failure is not None:
                # Do not route or prompt the model with an apology string as
                # if the user had said it.
                return _err_response(failure)
            emotion = _normalise_emotion("neutral")
            risk = "none"

        agent = route_agent(transcript)
        print(f"DEBUG agent: {agent}", flush=True)

        # ── Unknown capability detection ──────────────────────────────────────
        # A gap is logged only when the knowledge file explicitly marks the
        # capability as falsy; capabilities absent from the file are ignored.
        caps = _dict_at(KNOWLEDGE, "capabilities")
        t_lower = transcript.lower()
        for keyword in UNKNOWN_CAPABILITY_KEYWORDS:
            if keyword in t_lower:
                cap_key = keyword.replace(" ", "_")
                if not caps.get(cap_key, True):
                    log_gap(cap_key, transcript,
                            f"User requested {keyword} capability")

        # ── Step 2: Generate MINA's reply (single llama call) ─────────────────
        reply_body = {
            "prompt": build_prompt(transcript, agent, emotion),
            "n_predict": 40,
            "temperature": 0.7,
            "stream": False,
            "cache_prompt": False,
        }
        try:
            result2 = _llama_post("/completion", reply_body, timeout=60)
            # clean_reply() already trims to the first sentence.
            reply_text = clean_reply(result2.get("content", ""))
        except Exception:
            if agent not in _SAFETY_FALLBACK_REPLIES:
                raise
            # Model unavailable, but a crisis / scam user must still receive
            # the contact details appended below.
            traceback.print_exc()
            reply_text = _SAFETY_FALLBACK_REPLIES[agent]

        if not reply_text:
            reply_text = "Aiya, I didn't quite catch that lah. Can you say again?"

        reply_text = append_resources(reply_text, agent, transcript)

        # ── VAD scores from calibrated lookup ─────────────────────────────────
        vad = EMOTION_VAD.get(emotion, EMOTION_VAD["neutral"])

        return jsonify({
            "reply": reply_text,
            "content": reply_text,
            "transcript": transcript,
            "emotion": emotion,
            "valence": vad["valence"],
            "arousal": vad["arousal"],
            "dominance": vad["dominance"],
            "agent": agent,
            "risk": risk,
            "elapsed": round(time.time() - t0, 2),
        })

    except Exception:
        # Top-level boundary: log and answer 200 with a generic apology.
        traceback.print_exc()
        return _err_response()


# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    print("=" * 56)
    print("  MINA Bridge v3.0 — IMDA NMLP ATxSG 2026")
    print(f"  Port     : {PORT}")
    print(f"  llama.cpp: {LLAMA_URL}")
    print("=" * 56)
    # threaded=True lets Flask handle concurrent Android polls + completions.
    # HOST defaults to all interfaces; set BRIDGE_HOST=127.0.0.1 to restrict
    # the bridge to on-device clients.
    app.run(host=HOST, port=PORT, debug=False, threaded=True)