# kc40ai / app.py — kcrobot40, initial commit 351e5b0 (verified)
# app.py — KC Robot AI v7.2 FINAL
# Flask cloud brain for ESP32 robot.
# Secrets (set in HF Space Settings -> Secrets):
# HF_TOKEN, HF_MODEL (or HF_MODEL_VI + HF_MODEL_VI_BACKUP + HF_MODEL_EN),
# HF_TTS_MODEL, HF_STT_MODEL, TELEGRAM_TOKEN, TELEGRAM_CHAT_ID
#
# Endpoints:
# - GET / -> simple UI
# - GET /health
# - POST /ask {text,lang?}
# - POST /tts {text}
# - POST /stt multipart(file) or raw bytes
# - POST /presence {note?} -> returns audio (if possible) and notifies telegram
# - GET /display
# - GET /config
#
# Fallback and robust handling for 400/403/404 implemented.
import os, io, json, time, logging, threading
from typing import List, Any, Tuple, Optional
from flask import Flask, request, jsonify, send_file, render_template_string
import requests
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kcrobot.v7.2")
# Load secrets / env
# All values come from HF Space Settings -> Secrets; empty string when unset.
HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
HF_MODEL = os.getenv("HF_MODEL", "").strip() # general single model id (preferred)
HF_MODEL_VI = os.getenv("HF_MODEL_VI", "VietnamAIHub/Vietnamese_llama2_7B_8K_SFT_General_domain").strip()
HF_MODEL_VI_BACKUP = os.getenv("HF_MODEL_VI_BACKUP", "TheBloke/vietnamese-llama2-7B-40GB-AWQ,TheBloke/vietnamese-llama2-7B-40GB-GPTQ").strip()  # comma-separated fallback ids
HF_MODEL_EN = os.getenv("HF_MODEL_EN", "meta-llama/Llama-3.1-8B-Instruct").strip()
HF_TTS_MODEL = os.getenv("HF_TTS_MODEL", "NguyenManhTuan/VietnameseTTS_FPT_AI_Female").strip()
HF_STT_MODEL = os.getenv("HF_STT_MODEL", "openai/whisper-small").strip()
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", "").strip()
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "").strip()
# Auth header reused on every HF API request; empty dict when no token is set.
HF_HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
# in-memory state (process-local; lost on restart — fine for a demo Space)
CONVERSATION = []  # list of (user_text, bot_answer) tuples
DISPLAY = []  # rolling lines for the robot's display, capped at DISPLAY_LIMIT
DISPLAY_LIMIT = 6
def push_display(line: str):
    """Append *line* to the rolling display buffer, dropping the oldest
    entries so at most DISPLAY_LIMIT lines are kept."""
    DISPLAY.append(line)
    while len(DISPLAY) > DISPLAY_LIMIT:
        del DISPLAY[0]
# language detection (simple): a single Vietnamese diacritic marks the text as "vi"
VI_CHARS = set("ăâđêôơưáàảãạắằẳẵặấầẩẫậéèẻẽẹíìỉĩịóòỏõọúùủũụứừửữựýỳỷỹỵ")
def detect_lang(text: str) -> str:
    """Return "vi" when *text* contains any Vietnamese diacritic, else "en".

    Empty/None input defaults to "en".
    """
    if not text:
        return "en"
    return "vi" if any(ch in VI_CHARS for ch in text.lower()) else "en"
def models_from_env() -> Tuple[List[str], List[str]]:
    """Build (text_models, tts_models) from the environment configuration.

    When HF_MODEL is set it overrides the per-language lists entirely;
    otherwise the order is VI primary, VI backups, then EN.
    """
    if HF_MODEL:
        text_models = [HF_MODEL]
    else:
        backups = [m.strip() for m in HF_MODEL_VI_BACKUP.split(",") if m.strip()]
        text_models = [HF_MODEL_VI] + backups + [HF_MODEL_EN]
    tts_models = [HF_TTS_MODEL] if HF_TTS_MODEL else []
    return text_models, tts_models
# Robust POST JSON to HF with fallback models
def hf_post_json_try(models: List[str], payload: dict, timeout: int = 90):
    """POST *payload* as JSON to each HF inference model in turn.

    Returns (model_id, parsed_json_or_raw_bytes) for the first model that
    answers 200. 401/403/404/400 and other HTTP errors are logged and the
    next candidate is tried; raises RuntimeError once all candidates fail
    (or when HF_TOKEN is missing).
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN missing in Secrets.")
    headers = {**HF_HEADERS, "Content-Type": "application/json"}
    last_err = None
    for model in models:
        url = f"https://api-inference.huggingface.co/models/{model}"
        try:
            resp = requests.post(url, headers=headers, json=payload, timeout=timeout)
        except Exception as exc:
            last_err = f"network error {exc}"
            logger.warning("Network error for %s: %s", model, exc)
            continue
        status = resp.status_code
        if status == 200:
            # Some models return binary/non-JSON bodies; fall back to raw content.
            try:
                return model, resp.json()
            except Exception:
                return model, resp.content
        if status in (401, 403):
            last_err = f"auth error {status} for {model}"
            logger.warning(last_err)
        elif status == 404:
            last_err = f"not found 404 for {model}"
            logger.warning(last_err)
        elif status == 400:
            last_err = f"bad request 400 for {model}"
            logger.warning(last_err + " | " + resp.text[:200])
        else:
            last_err = f"HTTP {status} for {model}: {resp.text[:200]}"
            logger.warning(last_err)
    raise RuntimeError(f"All model attempts failed. Last error: {last_err}")
# POST bytes (STT / TTS) with fallback
def hf_post_bytes_try(models: List[str], data: bytes, content_type: str = "application/octet-stream", timeout: int = 120):
    """POST raw *data* (audio or serialized JSON) to each model in turn.

    Returns (model_id, requests.Response) on the first 200. HTTP errors are
    logged and the next candidate is tried; raises RuntimeError once every
    candidate has failed (or when HF_TOKEN is missing).
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN missing in Secrets.")
    headers = {**HF_HEADERS, "Content-Type": content_type}
    last_err = None
    for model in models:
        url = f"https://api-inference.huggingface.co/models/{model}"
        try:
            resp = requests.post(url, headers=headers, data=data, timeout=timeout)
        except Exception as exc:
            last_err = f"network error {exc}"
            logger.warning("Network error for %s: %s", model, exc)
            continue
        status = resp.status_code
        if status == 200:
            return model, resp
        if status in (401, 403):
            last_err = f"auth error {status} for {model}"
        elif status == 404:
            last_err = f"not found 404 for {model}"
        elif status == 400:
            last_err = f"bad request 400 for {model}: {resp.text[:200]}"
        else:
            last_err = f"HTTP {status} for {model}: {resp.text[:200]}"
        logger.warning(last_err)
    raise RuntimeError(f"All byte-post attempts failed. Last error: {last_err}")
def parse_text_out(obj):
    """Best-effort extraction of generated text from an HF API response.

    Handles list-of-dicts, plain dicts (including OpenAI-style "choices"),
    raw bytes, and anything else via str(). Never raises: unexpected errors
    yield a "[parse error] ..." string.
    """
    try:
        if isinstance(obj, list) and obj and isinstance(obj[0], dict):
            first = obj[0]
            for key in ("generated_text", "text"):
                if key in first:
                    return first[key]
            return str(first)
        if isinstance(obj, dict):
            for key in ("generated_text", "text", "summary_text"):
                if key in obj:
                    return obj[key]
            choices = obj.get("choices")
            if isinstance(choices, list) and choices:
                head = choices[0]
                return head.get("text") or head.get("message", {}).get("content", "")
            return json.dumps(obj)
        if isinstance(obj, (bytes, bytearray)):
            return obj.decode(errors="ignore")
        return str(obj)
    except Exception as exc:
        return f"[parse error] {exc}"
def text_generate(prompt: str, lang_hint: str="auto", max_new_tokens:int=512, temperature:float=0.7):
    """Generate a chat reply for *prompt* via the HF inference API.

    lang_hint: "vi", "en" or "auto" (auto runs detect_lang on the prompt).
    Returns the parsed text from the first model that answers successfully.
    Raises RuntimeError (from hf_post_json_try) when all models fail.
    """
    text_models, _ = models_from_env()
    if HF_MODEL:
        models = [HF_MODEL]
    elif lang_hint == "vi" or (lang_hint == "auto" and detect_lang(prompt) == "vi"):
        # Vietnamese-first ordering as configured.
        models = text_models
    else:
        # English-first ordering. BUG FIX: the old code prepended HF_MODEL_EN to
        # a list that already ends with it, so the same model was retried twice
        # on failure; dedupe while preserving order.
        ordered = ([HF_MODEL_EN] if HF_MODEL_EN else []) + text_models
        models = list(dict.fromkeys(ordered))
    payload = {
        "inputs": prompt,
        "parameters": {"max_new_tokens": int(max_new_tokens), "temperature": float(temperature)},
        "options": {"wait_for_model": True},  # queue instead of 503 while the model loads
    }
    model_used, out = hf_post_json_try(models, payload, timeout=120)
    logger.info("text_generate used model: %s", model_used)
    return parse_text_out(out)
def tts_bytes(text: str):
    """Synthesize *text* into audio bytes using the configured TTS model.

    Raises RuntimeError when no TTS model is configured or all attempts fail.
    """
    _, tts_models = models_from_env()
    if not tts_models:
        raise RuntimeError("No TTS model configured.")
    body = json.dumps({"inputs": text}).encode("utf-8")
    model_used, resp = hf_post_bytes_try(tts_models, body, content_type="application/json", timeout=120)
    logger.info("tts used model: %s", model_used)
    return resp.content
def stt_from_bytes(audio_bytes: bytes):
    """Transcribe raw audio bytes via the configured STT model.

    Returns the transcription text; falls back to the raw response body when
    the API does not return JSON. Raises RuntimeError when unconfigured.
    """
    models = [HF_STT_MODEL] if HF_STT_MODEL else []
    if not models:
        raise RuntimeError("No STT model configured.")
    model_used, resp = hf_post_bytes_try(models, audio_bytes, content_type="application/octet-stream", timeout=180)
    logger.info("stt used model: %s", model_used)
    try:
        parsed = resp.json()
    except Exception:
        return resp.text if hasattr(resp, "text") else str(resp)
    if isinstance(parsed, dict) and "text" in parsed:
        return parsed["text"]
    return parse_text_out(parsed)
# Telegram helper
def send_telegram(msg: str):
    """Best-effort push of *msg* to the configured Telegram chat.

    Returns True when the request was issued, False when Telegram is not
    configured or the send failed (failures are logged, never raised).
    """
    if not (TELEGRAM_TOKEN and TELEGRAM_CHAT_ID):
        logger.debug("telegram not configured")
        return False
    try:
        endpoint = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
        requests.post(endpoint, json={"chat_id": TELEGRAM_CHAT_ID, "text": msg}, timeout=10)
        return True
    except Exception:
        logger.exception("telegram send failed")
        return False
# Flask app + UI
app = Flask(__name__)
# Minimal single-page test UI served at "/": a textarea posts to /ask and the
# last answer can be replayed through /tts. Kept as an inline template string
# so the Space needs no static assets.
INDEX_HTML = """
<!doctype html>
<html><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>KC Robot AI v7.2</title>
<style>body{font-family:Arial;margin:12px}textarea,input{width:100%;padding:8px;margin:6px 0}button{padding:8px 12px;background:#1976d2;color:#fff;border:none;border-radius:6px}</style>
</head><body>
<h2>KC Robot AI v7.2 — Cloud Brain</h2>
<textarea id="q" rows="3" placeholder="Nhập tiếng Việt hoặc English..."></textarea><br>
<select id="lang"><option value="auto">Auto</option><option value="vi">Vietnamese</option><option value="en">English</option></select>
<button onclick="send()">Send</button> <button onclick="play()">Play TTS</button>
<div id="out" style="margin-top:14px"></div>
<script>
let lastAns="";
async function send(){
let t=document.getElementById('q').value; if(!t) return;
document.getElementById('out').innerHTML += '<div><b>You:</b> '+t+'</div>';
let res=await fetch('/ask',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({text:t,lang:document.getElementById('lang').value})});
let j=await res.json();
if(j.answer){ lastAns=j.answer; document.getElementById('out').innerHTML += '<div><b>Robot:</b> '+j.answer+'</div>'; }
else document.getElementById('out').innerHTML += '<div><b>Error:</b> '+JSON.stringify(j)+'</div>';
}
async function play(){
if(!lastAns) return alert('No answer');
let r=await fetch('/tts',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({text:lastAns})});
if(!r.ok){ alert('TTS failed'); return;}
let b=await r.blob(); let url=URL.createObjectURL(b); let a=new Audio(url); a.play();
}
</script>
</body></html>
"""
@app.route("/", methods=["GET"])
def index():
    """Serve the inline single-page test UI."""
    page = INDEX_HTML
    return page
@app.route("/health", methods=["GET"])
def health():
    """Report which secrets/models are configured (no secret values leaked)."""
    status = {
        "ok": True,
        "hf_token_set": bool(HF_TOKEN),
        "hf_model": HF_MODEL or HF_MODEL_VI,
        "tts_model": HF_TTS_MODEL,
        "stt_model": HF_STT_MODEL,
        "telegram": bool(TELEGRAM_TOKEN and TELEGRAM_CHAT_ID),
    }
    return jsonify(status)
@app.route("/ask", methods=["POST"])
def api_ask():
    """Answer a user question.

    Body: {"text": str, "lang": "auto"|"vi"|"en"}. Returns {"answer": str},
    or {"error": str} with 400 (empty text) / 500 (generation failure).
    Side effects: appends to CONVERSATION, pushes display lines, and
    best-effort notifies Telegram.
    """
    try:
        j = request.get_json(force=True) or {}
        text = (j.get("text","") or "").strip()
        lang = (j.get("lang","auto") or "auto")
        if not text:
            return jsonify({"error":"no text"}), 400
        # BUG FIX: the prompts (and the Telegram message below) used "\\n",
        # i.e. a literal backslash-n, instead of real newline characters.
        if lang=="vi":
            prompt = "Bạn là trợ lý thông minh, trả lời bằng tiếng Việt, lịch sự và ngắn gọn:\n\n" + text
        elif lang=="en":
            prompt = "You are a helpful assistant. Answer in English, concise:\n\n" + text
        else:
            prompt = "You are a bilingual assistant. Answer in the same language as the user:\n\n" + text
        ans = text_generate(prompt, lang_hint=(lang if lang in ("vi","en") else "auto"))
        CONVERSATION.append((text, ans))
        push_display("YOU: " + text[:60])
        push_display("BOT: " + (ans[:60] if isinstance(ans,str) else str(ans)[:60]))
        # notify telegram (best-effort; never fails the request)
        if TELEGRAM_TOKEN and TELEGRAM_CHAT_ID:
            try:
                send_telegram(f"You: {text}\nBot: {ans[:400]}")
            except Exception:
                logger.exception("telegram notify failed")
        return jsonify({"answer": ans})
    except Exception as e:
        logger.exception("ask error")
        return jsonify({"error": str(e)}), 500
@app.route("/tts", methods=["POST"])
def api_tts():
    """Body: {"text": str}. Streams synthesized speech back as audio/mpeg."""
    try:
        payload = request.get_json(force=True) or {}
        text = (payload.get("text", "") or "").strip()
        if not text:
            return jsonify({"error": "no text"}), 400
        audio_bytes = tts_bytes(text)
        return send_file(
            io.BytesIO(audio_bytes),
            mimetype="audio/mpeg",
            as_attachment=False,
            download_name="tts.mp3",
        )
    except Exception as e:
        logger.exception("tts error")
        return jsonify({"error": str(e)}), 500
@app.route("/stt", methods=["POST"])
def api_stt():
    """Transcribe uploaded audio: multipart "file" field, or raw body bytes."""
    try:
        upload = request.files.get("file")
        audio_bytes = upload.read() if upload is not None else request.get_data()
        if not audio_bytes:
            return jsonify({"error": "no audio"}), 400
        text = stt_from_bytes(audio_bytes)
        CONVERSATION.append((f"[voice] {text}", ""))
        push_display("Voice: " + (text[:60] if isinstance(text, str) else str(text)))
        return jsonify({"text": text})
    except Exception as e:
        logger.exception("stt error")
        return jsonify({"error": str(e)}), 500
@app.route("/presence", methods=["POST"])
def api_presence():
    """Handle a radar "person detected" event from the robot.

    Body: {"note": str?}. Notifies Telegram (best-effort), then returns a
    spoken greeting as audio/mpeg; falls back to a JSON greeting when TTS
    is unavailable.
    """
    try:
        body = request.get_json(force=True) or {}
        note = body.get("note", "Có người phía trước") or "Có người phía trước"
        greeting = f"Xin chào! {note}"
        CONVERSATION.append(("__presence__", greeting))
        push_display("RADAR: " + note[:60])
        if TELEGRAM_TOKEN and TELEGRAM_CHAT_ID:
            try:
                send_telegram(f"⚠️ Robot: Phát hiện người - {note}")
            except Exception:
                logger.exception("tg notify failed")
        try:
            audio_bytes = tts_bytes(greeting)
            return send_file(
                io.BytesIO(audio_bytes),
                mimetype="audio/mpeg",
                as_attachment=False,
                download_name="presence.mp3",
            )
        except Exception:
            # TTS (or streaming) failed: still acknowledge with plain text.
            return jsonify({"greeting": greeting})
    except Exception as e:
        logger.exception("presence error")
        return jsonify({"error": str(e)}), 500
@app.route("/display", methods=["GET"])
def api_display():
    """Return the rolling display lines and the conversation length."""
    snapshot = list(DISPLAY)
    return jsonify({"lines": snapshot, "conv_len": len(CONVERSATION)})
# run
if __name__ == "__main__":
    # Entry point for direct execution (HF Spaces provides PORT; default 7860).
    logger.info("Starting KC Robot AI v7.2")
    if not HF_TOKEN:
        logger.warning("HF_TOKEN missing — add it to Space Secrets.")
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 7860)), debug=False)