mina-bridge / bridge.py
munyew's picture
Upload bridge.py with huggingface_hub
6ed1a03 verified
#!/usr/bin/env python3
"""
MINA Android Bridge v3 — bridge.py
IMDA NMLP / Mun Yew (Darren) Loh
Flask server (port 8081) between the MINA Android APK and the local
MERaLiON GGUF model running via llama-server on port 8080.
Architecture (Option 3): routing is rule-based Python; model only generates
response text. Single llama call per reply (halves inference time).
Dependencies (all pre-installed on Termux — no Rust/C++ compilation needed):
flask, requests, json, os, re, time, traceback
Endpoints:
GET /health → liveness probe (Android polls this every 3 s until ready)
POST /completion → transcribe WAV + generate MINA reply
Usage (Termux):
python3 bridge.py
# or via start_mina.sh watchdog
"""
import json
import os
import re
import sys
import time
import traceback
from pathlib import Path
import requests
from flask import Flask, request, jsonify
# Switch stdout to line buffering so each printed line is written out as soon
# as it ends, even when output is redirected to a file or pipe.
sys.stdout.reconfigure(line_buffering=True)
# ── Config ─────────────────────────────────────────────────────────────────────
# Network settings; each one can be overridden through the environment.
LLAMA_URL = os.environ.get("LLAMA_URL", "http://localhost:8080")
PORT = int(os.environ.get("BRIDGE_PORT", "8081"))
# NOTE(review): MAX_TOKENS is not referenced anywhere else in the visible code.
MAX_TOKENS = int(os.environ.get("MAX_TOKENS", "256"))

# ── Knowledge base & gap logging ───────────────────────────────────────────────
# Absolute Termux paths: the knowledge JSON and the append-only gap log.
KNOWLEDGE_FILE = Path(
    "/data/data/com.termux/files/home/meralion/mina_knowledge.json"
)
GAP_LOG = Path(
    "/data/data/com.termux/files/home/meralion/gaps/gap_log.jsonl"
)

# whisper.cpp command-line binary and model, resolved under the user's home.
WHISPER_CLI = os.path.expanduser(
    "~/whisper.cpp/build/bin/whisper-cli"
)
WHISPER_MODEL = os.path.expanduser(
    "~/whisper.cpp/models/ggml-base.bin"
)
def load_knowledge(path=None):
    """Load the MINA knowledge base from a JSON file.

    Args:
        path: Optional file to read instead of ``KNOWLEDGE_FILE``.

    Returns:
        The parsed top-level JSON object as a dict. An empty dict is returned
        when the file is missing, cannot be read, is not valid JSON, or does
        not hold a JSON object, so callers can always use ``.get()``.
    """
    source = Path(KNOWLEDGE_FILE if path is None else path)
    try:
        data = json.loads(source.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # A missing knowledge file is a normal condition, not an error.
        return {}
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"KNOWLEDGE LOAD FAILED ({source}): {exc}", flush=True)
        return {}
    if not isinstance(data, dict):
        print(f"KNOWLEDGE IGNORED ({source}): top level is not an object",
              flush=True)
        return {}
    return data
def log_gap(gap_type, user_request, context=""):
    """Record a capability gap locally and forward it to ntfy.

    Both steps are best-effort: a failure is printed and swallowed so that
    gap logging can never prevent the caller from answering the user.

    Args:
        gap_type: Short identifier of the missing capability.
        user_request: The user text that triggered the gap.
        context: Optional free-text note about the situation.

    Returns:
        None.
    """
    entry = {
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "gap_type": gap_type,
        "user_request": user_request,
        "context": context,
        "status": "pending",
    }

    # Append to the local JSONL gap log. parents=True creates the whole
    # directory chain on first use instead of failing when it is absent.
    try:
        GAP_LOG.parent.mkdir(parents=True, exist_ok=True)
        with open(GAP_LOG, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry) + "\n")
        print(f"GAP LOGGED: {gap_type}", flush=True)
    except (OSError, TypeError, ValueError) as e:
        print(f"GAP LOG WRITE FAILED (non-critical): {e}", flush=True)

    # Send to ntfy for autonomous cloud sync.
    # NOTE(review): user_request is posted verbatim to the ntfy.sh topic named
    # by NTFY_TOPIC (default "roar-imda-demo"). Callers pass crisis
    # transcripts here - confirm this is acceptable from a privacy standpoint.
    try:
        import urllib.request
        ntfy_topic = os.getenv("NTFY_TOPIC", "roar-imda-demo")
        ntfy_url = f"https://ntfy.sh/{ntfy_topic}"
        message = json.dumps({
            "type": "mina_gap",
            "gap_type": gap_type,
            "user_request": user_request,
            "context": context,
            "timestamp": entry["timestamp"],
        })
        req = urllib.request.Request(
            ntfy_url,
            data=message.encode(),
            headers={
                "Title": f"MINA Gap: {gap_type}",
                "Tags": "brain",
                "Priority": "default",
            },
            method="POST",
        )
        # The context manager closes the HTTP response once the POST is done.
        with urllib.request.urlopen(req, timeout=5):
            pass
        print(f"GAP SYNCED TO NTFY: {gap_type}", flush=True)
    except Exception as e:
        print(f"NTFY SYNC FAILED (non-critical): {e}", flush=True)
# Knowledge snapshot taken once, when this module is imported. completion()
# reads this copy, whereas append_resources() calls load_knowledge() again on
# each call, so an edit to the knowledge file reaches the two paths at
# different times.
KNOWLEDGE = load_knowledge()
# ── Emotion VAD lookup ─────────────────────────────────────────────────────────
# Approximate audeering-calibrated VAD scores for Singapore English speech.
# Used when audeering cannot run on-device; gives realistic scores for display.
# Range: approximately [0, 1] after laptop-mic calibration.
# Canonical emotion labels mapped to their valence / arousal / dominance
# display scores.
EMOTION_VAD = {
    "anxious": {"valence": 0.25, "arousal": 0.52, "dominance": 0.35},
    "fearful": {"valence": 0.20, "arousal": 0.65, "dominance": 0.28},
    "distressed": {"valence": 0.22, "arousal": 0.48, "dominance": 0.30},
    "stressed": {"valence": 0.28, "arousal": 0.55, "dominance": 0.35},
    "sad": {"valence": 0.22, "arousal": 0.28, "dominance": 0.32},
    "upset": {"valence": 0.24, "arousal": 0.42, "dominance": 0.30},
    "angry": {"valence": 0.18, "arousal": 0.72, "dominance": 0.68},
    "excited": {"valence": 0.76, "arousal": 0.66, "dominance": 0.64},
    "happy": {"valence": 0.80, "arousal": 0.58, "dominance": 0.62},
    "calm": {"valence": 0.65, "arousal": 0.28, "dominance": 0.55},
    "exhausted": {"valence": 0.32, "arousal": 0.22, "dominance": 0.30},
    "tired": {"valence": 0.35, "arousal": 0.24, "dominance": 0.32},
    "urgent": {"valence": 0.44, "arousal": 0.68, "dominance": 0.60},
    "neutral": {"valence": 0.50, "arousal": 0.38, "dominance": 0.50},
}
# Normalise variant labels to the canonical set above.
# Only non-canonical labels belong here: an alias is applied before the
# canonical lookup, so aliasing a canonical label (previously "upset" -> "sad")
# would make that label's own VAD entry unreachable.
EMOTION_ALIASES = {
    "worried": "anxious",
    "nervous": "anxious",
    "frustrated": "anxious",
    "scared": "fearful",
    "panic": "fearful",
    "depressed": "distressed",
    "miserable": "distressed",
    "unhappy": "sad",
    "joyful": "excited",
    "energetic": "excited",
    "relaxed": "calm",
    "peaceful": "calm",
    "fatigued": "exhausted",
    "drained": "exhausted",
}
# ── Rule-based agent routing (Option 3 — no LLM call for routing) ─────────────
def route_agent(transcript):
    """Pick the agent for a transcript using keyword rules (no model call).

    Rules are checked in priority order - VITA (emotional crisis), then
    SENTINEL (scams), then KRONOS (calendar) - and the first rule with a
    keyword contained in the transcript wins.

    Args:
        transcript: What the user said. ``None`` or empty is treated as no
            text.

    Returns:
        "VITA", "SENTINEL", "KRONOS", or "MINA" when no rule matches.
    """
    # Lower-case and collapse all whitespace runs to single spaces. Whisper
    # output is joined with newlines, so without this a multi-word keyword
    # such as "giving up" is missed when the phrase spans a line break.
    text = " ".join(str(transcript or "").lower().split())

    # NOTE(review): matching is plain substring containment, so "book" also
    # matches "facebook". The VITA list also has no explicit self-harm phrases
    # beyond those below - confirm coverage with the safety owner.
    rules = (
        ("VITA", ("giving up", "want to die",
                  "hurt myself", "hopeless",
                  "end it all", "cannot take it")),
        ("SENTINEL", ("scam", "police", "spf",
                      "bank account", "transfer money")),
        ("KRONOS", ("meeting", "calendar", "schedule",
                    "appointment", "next week", "tomorrow",
                    "book", "check my", "free slot")),
    )
    for agent, keywords in rules:
        if any(keyword in text for keyword in keywords):
            return agent
    return "MINA"
# ── Agent-specific focused prompts ────────────────────────────────────────────
def build_prompt(transcript, agent, emotion):
    """Compose the single-turn llama prompt for the routed agent.

    Args:
        transcript: What the user said; interpolated verbatim into the prompt.
        agent: Routing label. "KRONOS", "VITA" and "SENTINEL" each get a
            dedicated prompt; any other value gets the general MINA prompt.
        emotion: Accepted for interface compatibility; it is not used when
            building the prompt.

    Returns:
        The prompt string: persona, then the situation, then the instruction.
    """
    persona = "You are MINA Singapore AI companion. "

    if agent == "KRONOS":
        situation = f"User needs calendar help: {transcript}. "
        instruction = (
            "Reply in one warm sentence offering to check their calendar."
        )
    elif agent == "VITA":
        situation = f"User is struggling emotionally: {transcript}. "
        instruction = (
            "Reply in one gentle caring sentence. "
            "Tell them they are not alone."
        )
    elif agent == "SENTINEL":
        situation = f"User may be facing a scam: {transcript}. "
        instruction = "Reply in one sentence warning them calmly."
    else:
        # General companion prompt; it always describes the user as stressed.
        situation = f"User said: {transcript}. "
        instruction = (
            "User sounds stressed or anxious. "
            "Reply in one warm empathetic sentence."
        )

    return persona + situation + instruction
# ── Append hotline resources after model reply ────────────────────────────────
def _as_dict(value):
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _log_gap_safely(gap_type, transcript, context):
    """Call log_gap() without letting a logging failure discard the reply."""
    try:
        log_gap(gap_type, transcript, context)
    except Exception as exc:
        print(f"GAP LOG FAILED (non-critical): {exc}", flush=True)


def append_resources(reply, agent, transcript="", knowledge=None):
    """Append hotline / reporting resources to the model reply.

    Args:
        reply: The cleaned model reply.
        agent: Routing label. Only "VITA" and "SENTINEL" add resources.
        transcript: What the user said; used for VITA to detect a request to
            call or message someone.
        knowledge: Optional knowledge dict. When omitted it is loaded with
            load_knowledge(), and only for the VITA agent, which is the only
            one that reads it.

    Returns:
        ``reply`` with a resource block appended, or ``reply`` unchanged for
        other agents.
    """
    bullet = "\u2022"  # written as an escape so file encoding cannot garble it

    if agent == "SENTINEL":
        return (reply +
                "\n\nReport scams:"
                f"\n{bullet} ScamShield: 1799"
                f"\n{bullet} SPF: 999")

    if agent != "VITA":
        return reply

    if knowledge is None:
        # The hotline numbers below have built-in defaults, so a knowledge
        # file problem must not stop them from being shown.
        try:
            knowledge = load_knowledge()
        except Exception as exc:
            print(f"KNOWLEDGE LOAD FAILED (using defaults): {exc}", flush=True)
            knowledge = {}
    knowledge = _as_dict(knowledge)
    crisis = _as_dict(knowledge.get("crisis_resources"))
    caps = _as_dict(knowledge.get("capabilities"))
    sos = _as_dict(crisis.get("SOS_Lifeline"))
    imh = _as_dict(crisis.get("IMH_Crisis"))

    call_line = f"\n{bullet} Call SOS: {sos.get('phone', '1767')}"
    whatsapp_line = (
        f"\n{bullet} WhatsApp SOS: "
        f"{sos.get('whatsapp', 'https://wa.me/6591511767')}"
    )
    imh_line = f"\n{bullet} IMH: {imh.get('phone', '6389 2222')}"

    text = str(transcript or "").lower()
    # Keywords must start a word. Plain substring matching treated
    # "suffering" / "during" as a request to "ring" someone and "context" as
    # "text", which gave a person in crisis the "I can't make calls yet"
    # reply. "telephone" and "handphone" are listed explicitly because they
    # used to match through the "phone" substring.
    wants_call = re.search(
        r"\b(?:call|phone|telephone|handphone|ring)", text)
    wants_message = re.search(
        r"\b(?:whatsapp|message|text|chat)", text)

    # User asks MINA to make a phone call
    if wants_call and not caps.get("make_phone_call"):
        _log_gap_safely("make_phone_call", transcript,
                        "User requested phone call to SOS")
        return (reply +
                "\n\nI can't make calls yet, but I'm learning this capability."
                "\n\nFor now, please reach out directly:"
                + call_line + whatsapp_line + imh_line)

    # User asks MINA to send a WhatsApp / message
    if wants_message and not caps.get("send_whatsapp"):
        _log_gap_safely("send_whatsapp", transcript,
                        "User requested WhatsApp to SOS")
        return (reply +
                "\n\nI can't send WhatsApp yet, but I'm learning this capability."
                "\n\nFor now, please reach out directly:"
                + whatsapp_line + call_line + imh_line)

    # Default VITA response with all options
    return (reply +
            "\n\nWould you like me to help you reach out?"
            f"\n{bullet} Call SOS 24hr: {sos.get('phone', '1767')}"
            + whatsapp_line + imh_line)
def _normalise_emotion(raw):
    """Map a raw emotion label onto a key of ``EMOTION_VAD``.

    The label is trimmed and lower-cased, translated through
    ``EMOTION_ALIASES`` when it is a known variant, and replaced by
    "neutral" when the result has no VAD entry.
    """
    label = raw.strip().lower()
    canonical = EMOTION_ALIASES.get(label, label)
    if canonical in EMOTION_VAD:
        return canonical
    return "neutral"
def _llama_post(path, body, timeout=120):
    """POST ``body`` as JSON to llama-server and return the decoded JSON reply.

    ``path`` is appended to ``LLAMA_URL`` (with any trailing slash removed).
    A non-success HTTP status raises through ``raise_for_status()``.
    """
    endpoint = LLAMA_URL.rstrip("/") + path
    response = requests.post(endpoint, json=body, timeout=timeout)
    response.raise_for_status()
    payload = response.json()
    return payload
def _llama_get(path, timeout=8):
    """GET ``path`` from llama-server and return the decoded JSON reply.

    ``path`` is appended to ``LLAMA_URL`` (with any trailing slash removed).
    A non-success HTTP status raises through ``raise_for_status()``.
    """
    endpoint = LLAMA_URL.rstrip("/") + path
    response = requests.get(endpoint, timeout=timeout)
    response.raise_for_status()
    payload = response.json()
    return payload
def clean_reply(text):
    """Reduce raw model output to MINA's first sentence.

    Steps, in order:
      1. Drop everything from the first occurrence of each cut marker.
      2. Strip trailing ``*`` / ``"`` characters and surrounding whitespace.
      3. Remove a leading "MINA:" speaker label.
      4. Keep only the text up to and including the first ``.``, ``!`` or
         ``?`` on the first line. This also cuts at a decimal point.

    When the first line has no terminator, the text from step 3 is returned
    unchanged.
    """
    cut_markers = (
        "User said:", "\nUser:",
        "\nMINA:", "\nKRONOS:",
        "\nVITA:", "\nSENTINEL:",
        "Emotional state:",
        "\nEmotional state:",
        "Agent routing:",
        "\nResponse:", "\nOkay,",
    )
    for marker in cut_markers:
        # partition() returns the whole string when the marker is absent.
        text = text.partition(marker)[0]

    text = text.rstrip('*"').strip()

    speaker = "MINA:"
    if text.startswith(speaker):
        text = text[len(speaker):].strip()

    first_sentence = re.search(r'^(.*?[.!?])', text.strip())
    if first_sentence is None:
        return text
    return first_sentence.group(1).strip()
# ── Whisper transcription ─────────────────────────────────────────────────────
def transcribe_with_whisper(audio_b64):
    """Transcribe base64-encoded WAV audio with the whisper.cpp CLI.

    Args:
        audio_b64: Base64 text of the WAV file.

    Returns:
        The transcript with bracketed annotations and known CLI noise lines
        removed. On failure one of three fixed apology strings is returned in
        place of a transcript (nothing heard, timeout, any other error), so
        callers cannot tell a failure from speech by type alone.

    Raises:
        binascii.Error / ValueError: if ``audio_b64`` is not valid base64.
        OSError: if the temporary WAV file cannot be created or written.
    """
    import base64
    import subprocess
    import tempfile

    wav_bytes = base64.b64decode(audio_b64)
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            # Record the name before writing so that a failed write still
            # gets cleaned up by the finally block below.
            tmp_path = tmp.name
            tmp.write(wav_bytes)
        try:
            result = subprocess.run(
                [WHISPER_CLI, "-m", WHISPER_MODEL, "-f", tmp_path,
                 "-l", "en", "--no-timestamps", "-t", "4"],
                capture_output=True, text=True, errors="replace", timeout=30
            )
            if result.returncode != 0:
                # Surface the failure; stdout is still processed as before.
                print(f"WHISPER EXIT CODE {result.returncode}: "
                      f"{(result.stderr or '').strip()[-300:]}", flush=True)
            transcript = result.stdout.strip()
            # Drop bracketed annotations emitted by the CLI.
            transcript = re.sub(r'\[.*?\]', '', transcript).strip()
            lines = [l for l in transcript.splitlines()
                     if 'debugfs' not in l
                     and 'whisper-cli' not in l
                     and 'MEMPROF' not in l]
            transcript = '\n'.join(lines).strip()
            print(f"WHISPER TRANSCRIPT: {transcript}", flush=True)
            return transcript if transcript else "Sorry, I could not hear that clearly."
        except subprocess.TimeoutExpired:
            return "Sorry, took too long to hear that."
        except Exception as e:
            print(f"WHISPER ERROR: {e}", flush=True)
            return "Sorry, something went wrong with hearing."
    finally:
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError as e:
                # A cleanup problem must not replace the transcript result.
                print(f"WHISPER TEMP CLEANUP FAILED: {e}", flush=True)
# ── App ────────────────────────────────────────────────────────────────────────
# Flask application object; the @app.route decorators below register the
# /health and /completion handlers on it, and the entry point runs it.
app = Flask(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# GET /health
# ─────────────────────────────────────────────────────────────────────────────
@app.route("/health", methods=["GET"])
def health():
    """Liveness probe for the bridge.

    Always answers with status "ok"; the "llama" flag reports whether a GET
    to llama-server's /health succeeded within 5 seconds.
    """
    try:
        _llama_get("/health", timeout=5)
    except Exception:
        # Any failure only means the backend is not reachable yet.
        backend_up = False
    else:
        backend_up = True
    # NOTE(review): "bridge" reports "v2" while the module header and startup
    # banner say v3 - confirm whether a client depends on this value.
    return jsonify({"status": "ok", "llama": backend_up, "bridge": "v2"})
# ─────────────────────────────────────────────────────────────────────────────
# POST /completion
# ─────────────────────────────────────────────────────────────────────────────
@app.route("/completion", methods=["POST"])
def completion():
    """Turn one user utterance (text or WAV audio) into a MINA reply.

    Request JSON, either of:
      * pre-transcribed text in "transcript", in "prompt" (when it is a
        string) or in "text" - checked in that order; or
      * audio:
        {
          "prompt": [
            {
              "prompt_string": "Transcribe the audio. Reply ONLY ...",
              "multimodal_data": ["<base64-WAV>"]
            }
          ]
        }

    Response JSON (always HTTP 200, also on errors, so that the Android
    client does not trigger a reconnect):
        {
          "reply": "MINA reply text",
          "content": "MINA reply text",
          "transcript": "What the user said",
          "emotion": "neutral",
          "valence": 0.5,
          "arousal": 0.38,
          "dominance": 0.5,
          "agent": "KRONOS",
          "risk": "none",
          "elapsed": 4.2
        }
    """
    t0 = time.time()
    generic_error = "Sorry lah, something went wrong. Try again?"

    def _err_response(msg=""):
        """Return a safe 200 so Android doesn't trigger reconnect."""
        vad = EMOTION_VAD["neutral"]
        _msg = msg or generic_error
        return jsonify({
            "reply": _msg,
            "content": _msg,
            "transcript": "",
            "emotion": "neutral",
            "valence": vad["valence"],
            "arousal": vad["arousal"],
            "dominance": vad["dominance"],
            "agent": "MINA",
            "risk": "none",
            "elapsed": round(time.time() - t0, 2),
        })

    # Strings transcribe_with_whisper() returns in place of a transcript when
    # transcription fails. They are not user speech and must not be routed or
    # sent to the model as if the user had said them.
    whisper_failures = (
        "Sorry, I could not hear that clearly.",
        "Sorry, took too long to hear that.",
        "Sorry, something went wrong with hearing.",
    )

    try:
        body = request.get_json(force=True, silent=True)
        if not isinstance(body, dict):
            body = {}

        # Accept transcript / prompt (string) / text as pre-transcribed
        # input; the first non-blank string wins.
        prompt_field = body.get("prompt")
        transcript_in = ""
        for candidate in (body.get("transcript"), prompt_field,
                          body.get("text")):
            if isinstance(candidate, str) and candidate.strip():
                transcript_in = candidate
                break

        # Log what the bridge received
        print(f"TRANSCRIPT: {transcript_in}", flush=True)

        if transcript_in:
            # ── Fast path: Android sent pre-transcribed text ──────────────────
            transcript = transcript_in
        else:
            # ── Audio path: WAV transcription via whisper-cli ─────────────────
            prompts = prompt_field if isinstance(prompt_field, list) else []
            if not prompts:
                return _err_response("No input received.")
            prompt_obj = prompts[0] if isinstance(prompts[0], dict) else {}
            multimodal_data = prompt_obj.get("multimodal_data")
            if not isinstance(multimodal_data, list):
                multimodal_data = []
            audio_b64 = multimodal_data[0] if multimodal_data else ""
            if not audio_b64 or not isinstance(audio_b64, str):
                return _err_response("No audio received.")
            transcript = transcribe_with_whisper(audio_b64)
            if transcript in whisper_failures:
                return _err_response(transcript)

        # No emotion or risk classifier is wired in; both paths report these
        # placeholder values.
        emotion = "neutral"
        risk = "none"

        agent = route_agent(transcript)
        print(f"DEBUG agent: {agent}", flush=True)

        # ── Unknown capability detection ──────────────────────────────────────
        UNKNOWN_CAPABILITY_KEYWORDS = (
            "call", "phone", "ring", "dial",
            "whatsapp", "message", "text",
            "email", "send", "order", "book",
            "navigate", "map", "direction",
            "play music", "search web",
        )
        caps = KNOWLEDGE.get("capabilities", {}) if isinstance(KNOWLEDGE, dict) else {}
        if not isinstance(caps, dict):
            caps = {}
        t_lower = transcript.lower()
        for keyword in UNKNOWN_CAPABILITY_KEYWORDS:
            if keyword not in t_lower:
                continue
            cap_key = keyword.replace(" ", "_")
            # Default True: a gap is logged only when the knowledge file
            # explicitly marks this capability as unavailable.
            if not caps.get(cap_key, True):
                try:
                    log_gap(cap_key, transcript,
                            f"User requested {keyword} capability")
                except Exception:
                    # Gap logging is a side channel; never lose the reply.
                    traceback.print_exc()

        # ── Generate MINA's reply (single llama call) ─────────────────────────
        # NOTE(review): n_predict is fixed at 40; MAX_TOKENS is not used here.
        reply_body = {
            "prompt": build_prompt(transcript, agent, emotion),
            "n_predict": 40,
            "temperature": 0.7,
            "stream": False,
            "cache_prompt": False,
        }
        try:
            result2 = _llama_post("/completion", reply_body, timeout=60)
            # clean_reply() already trims the text to its first sentence.
            reply_text = clean_reply(result2.get("content", "") or "")
            if not reply_text:
                reply_text = "Aiya, I didn't quite catch that lah. Can you say again?"
        except Exception:
            # The model being unavailable must not cost a VITA / SENTINEL user
            # the hotline block appended below.
            traceback.print_exc()
            reply_text = generic_error

        reply_text = append_resources(reply_text, agent, transcript)

        # ── VAD scores from calibrated lookup ─────────────────────────────────
        vad = EMOTION_VAD.get(emotion, EMOTION_VAD["neutral"])
        return jsonify({
            "reply": reply_text,
            "content": reply_text,
            "transcript": transcript,
            "emotion": emotion,
            "valence": vad["valence"],
            "arousal": vad["arousal"],
            "dominance": vad["dominance"],
            "agent": agent,
            "risk": risk,
            "elapsed": round(time.time() - t0, 2),
        })
    except Exception:
        traceback.print_exc()
        return _err_response()
# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Startup banner with the effective port and llama-server URL.
    print("=" * 56)
    print(" MINA Bridge v3.0 — IMDA NMLP ATxSG 2026")
    print(f" Port : {PORT}")
    print(f" llama.cpp: {LLAMA_URL}")
    print("=" * 56)
    # threaded=True lets Flask handle concurrent Android polls + completions.
    # NOTE(review): host 0.0.0.0 listens on every network interface, and the
    # endpoints take no credentials - confirm that is intended when the APK
    # runs on the same device.
    app.run(host="0.0.0.0", port=PORT, debug=False, threaded=True)