Spaces:
Sleeping
Sleeping
| # app.py — KC Robot AI v7.5 FINAL (auto-model-select, bilingual, TTS fallback, Telegram, ESP32 endpoints) | |
| # Secrets expected (HF Space -> Settings -> Secrets): | |
| # HF_TOKEN (required) | |
| # HF_MODEL (optional preferred model id like "mistralai/Mistral-7B-Instruct-v0.3") | |
| # TELEGRAM_TOKEN (optional) | |
| # TELEGRAM_CHAT_ID (optional) | |
| # Optional: | |
| # HF_TTS_MODEL, HF_STT_MODEL | |
| # | |
| # Minimal deps: flask, requests, gTTS, python-multipart | |
| # Keep requirements.txt consistent with these packages. | |
| import os | |
| import io | |
| import sys | |
| import time | |
| import json | |
| import uuid | |
| import logging | |
| import threading | |
| from typing import Any, List, Tuple, Optional | |
| from pathlib import Path | |
| import requests | |
| from flask import Flask, request, jsonify, Response, render_template_string | |
| # gTTS fallback | |
| try: | |
| from gtts import gTTS | |
| _HAS_GTTS = True | |
| except Exception: | |
| _HAS_GTTS = False | |
| # ---------------- logging ---------------- | |
| logging.basicConfig(stream=sys.stdout, level=logging.INFO, | |
| format="%(asctime)s %(levelname)s %(name)s: %(message)s") | |
| logger = logging.getLogger("kcrobot.v7.5") | |
| # ---------------- env / secrets ---------------- | |
| HF_TOKEN = os.getenv("HF_TOKEN", "").strip() | |
| HF_MODEL = os.getenv("HF_MODEL", "google/flan-t5-base").strip() # preferred model (may be empty) | |
| HF_TTS_MODEL = os.getenv("HF_TTS_MODEL", "").strip() # optional HF TTS model | |
| HF_STT_MODEL = os.getenv("HF_STT_MODEL", "openai/whisper-small").strip() | |
| TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", "").strip() | |
| TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "").strip() | |
| PORT = int(os.getenv("PORT", 7860)) | |
| HF_HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {} | |
| # ---------------- tmp dir ---------------- | |
| TMPDIR = Path("/tmp/kcrobot") if os.name != "nt" else Path.cwd() / "tmp_kcrobot" | |
| TMPDIR.mkdir(parents=True, exist_ok=True) | |
| CONV_LOG = TMPDIR / "conversation_log.jsonl" | |
| # ---------------- in-memory ---------------- | |
| CONVERSATION: List[Tuple[str, str]] = [] | |
| DISPLAY_BUFFER: List[str] = [] | |
| DISPLAY_LIMIT = 6 | |
| def push_display(line: str): | |
| global DISPLAY_BUFFER | |
| DISPLAY_BUFFER.append(line) | |
| if len(DISPLAY_BUFFER) > DISPLAY_LIMIT: | |
| DISPLAY_BUFFER = DISPLAY_BUFFER[-DISPLAY_LIMIT:] | |
| def save_conv(user: str, bot: str): | |
| try: | |
| with open(CONV_LOG, "a", encoding="utf-8") as f: | |
| f.write(json.dumps({"time": time.time(), "user": user, "bot": bot}, ensure_ascii=False) + "\n") | |
| except Exception: | |
| logger.exception("save_conv failed") | |
| # ---------------- small helpers ---------------- | |
| def clean_text(text: Any) -> str: | |
| if text is None: | |
| return "" | |
| s = str(text) | |
| import re | |
| s = re.sub(r'[\x00-\x08\x0b-\x0c\x0e-\x1f]+', ' ', s) | |
| s = re.sub(r'\s+', ' ', s).strip() | |
| return s | |
| VI_CHARS = set("ăâđêôơưáàảãạắằẳẵặấầẩẫậéèẻẽẹíìỉĩịóòỏõọúùủũụứừửữựýỳỷỹỵ") | |
| def detect_language(text: str) -> str: | |
| t = (text or "").lower() | |
| for ch in t: | |
| if ch in VI_CHARS: | |
| return "vi" | |
| return "en" | |
| # ---------------- Hugging Face HTTP helpers ---------------- | |
| def hf_post_json(model_id: str, payload: dict, timeout: int = 90) -> requests.Response: | |
| if not HF_TOKEN: | |
| raise RuntimeError("HF_TOKEN not configured in Secrets") | |
| url = f"https://api-inference.huggingface.co/models/{model_id}" | |
| headers = dict(HF_HEADERS) | |
| headers["Content-Type"] = "application/json" | |
| return requests.post(url, headers=headers, json=payload, timeout=timeout) | |
| def hf_post_bytes(model_id: str, data: bytes, content_type: str = "application/octet-stream", timeout: int = 180) -> requests.Response: | |
| if not HF_TOKEN: | |
| raise RuntimeError("HF_TOKEN not configured in Secrets") | |
| url = f"https://api-inference.huggingface.co/models/{model_id}" | |
| headers = dict(HF_HEADERS) | |
| headers["Content-Type"] = content_type | |
| return requests.post(url, headers=headers, data=data, timeout=timeout) | |
| def parse_hf_text_output(obj: Any) -> str: | |
| try: | |
| if isinstance(obj, dict): | |
| for k in ("generated_text","text","answer"): | |
| if k in obj: | |
| return obj.get(k,"") | |
| if "choices" in obj and isinstance(obj["choices"], list) and obj["choices"]: | |
| c0 = obj["choices"][0] | |
| return c0.get("text") or c0.get("message",{}).get("content","") or str(c0) | |
| return json.dumps(obj, ensure_ascii=False) | |
| if isinstance(obj, list) and obj: | |
| first = obj[0] | |
| if isinstance(first, dict): | |
| for k in ("generated_text","text"): | |
| if k in first: | |
| return first.get(k,"") | |
| return str(first) | |
| return str(obj) | |
| except Exception: | |
| logger.exception("parse_hf_text_output") | |
| return str(obj) | |
| # ---------------- Auto model finder ---------------- | |
| # Candidate fallback list — you can extend | |
| DEFAULT_MODEL_CANDIDATES = [ | |
| "google/flan-t5-base" | |
| ] | |
| def test_model_working(model_id: str, sample_prompt: str = "Xin chào, bạn khỏe không?") -> Tuple[bool, dict]: | |
| """ | |
| Return (ok, response_short_info) | |
| ok True if got status 200 and some textual output parseable | |
| """ | |
| try: | |
| payload = {"inputs": sample_prompt, "parameters": {"max_new_tokens": 20}, "options": {"wait_for_model": True}} | |
| r = hf_post_json(model_id, payload, timeout=30) | |
| info = {"status": r.status_code, "text": (r.text[:500] if r.text else "")} | |
| if r.status_code == 200: | |
| # try parse | |
| try: | |
| j = r.json() | |
| out = parse_hf_text_output(j) | |
| if out and len(out.strip())>0: | |
| info["result"] = out | |
| return True, info | |
| except Exception: | |
| # maybe non-json; if text length present, accept minimally | |
| if r.text and len(r.text.strip())>0: | |
| info["result"] = r.text | |
| return True, info | |
| return False, info | |
| except requests.exceptions.RequestException as e: | |
| logger.warning("test_model_working request exception for %s: %s", model_id, e) | |
| return False, {"error": str(e)} | |
| except Exception: | |
| logger.exception("test_model_working unexpected") | |
| return False, {"error": "unexpected"} | |
| def auto_select_model(preferred: Optional[str] = None) -> Optional[str]: | |
| """ | |
| Try preferred model first. If fail, iterate DEFAULT_MODEL_CANDIDATES | |
| Returns selected model id or None. | |
| """ | |
| tried = [] | |
| if preferred: | |
| logger.info("Auto-check preferred model: %s", preferred) | |
| ok, info = test_model_working(preferred) | |
| tried.append((preferred, ok, info)) | |
| if ok: | |
| logger.info("Preferred model OK: %s", preferred) | |
| return preferred | |
| logger.info("Preferred model not usable or not provided, scanning candidates...") | |
| for m in DEFAULT_MODEL_CANDIDATES: | |
| if m == preferred: | |
| continue | |
| logger.info("Testing candidate: %s", m) | |
| ok, info = test_model_working(m) | |
| tried.append((m, ok, info)) | |
| if ok: | |
| logger.info("Selected fallback model: %s", m) | |
| return m | |
| # nothing found | |
| logger.warning("Auto-select model found none usable. Tried: %s", [(t[0], t[1]) for t in tried]) | |
| return None | |
| # initial selected model (will be mutated at runtime) | |
| SELECTED_MODEL = HF_MODEL if HF_MODEL else None | |
| # ---------------- HF text / stt / tts wrappers using SELECTED_MODEL ---------------- | |
| def hf_text_generate(prompt: str, model_override: Optional[str] = None, max_new_tokens: int = 256, temperature: float = 0.7) -> str: | |
| model = model_override or SELECTED_MODEL | |
| if not model: | |
| raise RuntimeError("No HF model selected") | |
| payload = {"inputs": prompt, "parameters": {"max_new_tokens": int(max_new_tokens), "temperature": float(temperature)}, "options": {"wait_for_model": True}} | |
| r = hf_post_json(model, payload, timeout=120) | |
| if r.status_code == 200: | |
| try: | |
| j = r.json() | |
| return parse_hf_text_output(j) | |
| except Exception: | |
| return r.text | |
| elif r.status_code == 403: | |
| raise RuntimeError("HF returned 403 (forbidden) — token or access rights issue") | |
| elif r.status_code == 404: | |
| raise RuntimeError("HF returned 404 (model not found) — check HF_MODEL or model access") | |
| else: | |
| raise RuntimeError(f"HF text gen returned {r.status_code}: {r.text[:300]}") | |
| def hf_stt_from_bytes(audio_bytes: bytes, model_override: Optional[str] = None) -> str: | |
| model = model_override or HF_STT_MODEL | |
| if not model: | |
| raise RuntimeError("HF_STT_MODEL not configured") | |
| r = hf_post_bytes(model, audio_bytes, content_type="application/octet-stream", timeout=180) | |
| if r.status_code == 200: | |
| try: | |
| j = r.json() | |
| if isinstance(j, dict) and "text" in j: | |
| return j["text"] | |
| return parse_hf_text_output(j) | |
| except Exception: | |
| return r.text | |
| else: | |
| raise RuntimeError(f"HF STT returned {r.status_code}: {r.text[:300]}") | |
| def hf_tts_get_bytes(text: str, model_override: Optional[str] = None) -> bytes: | |
| text = text.strip() | |
| if not text: | |
| raise RuntimeError("TTS text empty") | |
| model = model_override or HF_TTS_MODEL | |
| if model: | |
| # Try HF TTS model first | |
| try: | |
| payload = {"inputs": text} | |
| r = hf_post_json(model, payload, timeout=120) | |
| if r.status_code == 200 and r.content: | |
| return r.content | |
| # fallback to content or parse | |
| if r.status_code == 200: | |
| try: | |
| j = r.json() | |
| return parse_hf_text_output(j).encode("utf-8") | |
| except Exception: | |
| return r.content | |
| logger.warning("HF TTS returned %s: %s", r.status_code, r.text[:200]) | |
| except Exception: | |
| logger.exception("HF TTS call failed") | |
| # fallback to gTTS if present | |
| if _HAS_GTTS: | |
| try: | |
| lang = "vi" if detect_language(text) == "vi" else "en" | |
| tts = gTTS(text=text, lang=lang) | |
| bio = io.BytesIO() | |
| tts.write_to_fp(bio) | |
| bio.seek(0) | |
| return bio.read() | |
| except Exception: | |
| logger.exception("gTTS fallback failed") | |
| raise RuntimeError("gTTS fallback failed") | |
| raise RuntimeError("No TTS available (no HF_TTS_MODEL and gTTS not installed)") | |
| # ---------------- Telegram helpers ---------------- | |
| def telegram_send_message(chat_id: str, text: str) -> bool: | |
| if not TELEGRAM_TOKEN or not chat_id: | |
| return False | |
| try: | |
| url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage" | |
| r = requests.post(url, json={"chat_id": chat_id, "text": text}, timeout=8) | |
| if r.status_code != 200: | |
| logger.warning("Telegram sendMessage failed %s: %s", r.status_code, r.text[:300]) | |
| return False | |
| return True | |
| except Exception: | |
| logger.exception("telegram_send_message") | |
| return False | |
| def telegram_send_audio(chat_id: str, audio_bytes: bytes, filename: str = "reply.mp3") -> bool: | |
| if not TELEGRAM_TOKEN or not chat_id: | |
| return False | |
| try: | |
| url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendAudio" | |
| files = {"audio": (filename, io.BytesIO(audio_bytes), "audio/mpeg")} | |
| data = {"chat_id": chat_id} | |
| r = requests.post(url, files=files, data=data, timeout=30) | |
| if r.status_code != 200: | |
| logger.warning("Telegram sendAudio failed %s: %s", r.status_code, r.text[:300]) | |
| return False | |
| return True | |
| except Exception: | |
| logger.exception("telegram_send_audio") | |
| return False | |
| # ---------------- Telegram poller (background) ---------------- | |
| def telegram_poller_loop(): | |
| if not TELEGRAM_TOKEN: | |
| logger.info("Telegram token not set; poller disabled") | |
| return | |
| logger.info("Starting Telegram poller") | |
| base = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}" | |
| offset = None | |
| while True: | |
| try: | |
| params = {"timeout": 30} | |
| if offset: params["offset"] = offset | |
| r = requests.get(base + "/getUpdates", params=params, timeout=35) | |
| if r.status_code != 200: | |
| logger.warning("Telegram getUpdates failed: %s", r.status_code) | |
| time.sleep(2); continue | |
| j = r.json() | |
| for upd in j.get("result", []): | |
| offset = upd.get("update_id", 0) + 1 | |
| msg = upd.get("message") or {} | |
| chat = msg.get("chat", {}) | |
| chat_id = str(chat.get("id")) | |
| text = (msg.get("text") or "").strip() | |
| if not text: continue | |
| logger.info("TG msg %s: %s", chat_id, text[:200]) | |
| lower = text.lower() | |
| if lower.startswith("/ask "): | |
| q = text[5:].strip() | |
| try: | |
| ans = hf_text_generate(q) | |
| except Exception as e: | |
| ans = f"[HF error] {e}" | |
| try: | |
| requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": ans}, timeout=10) | |
| except Exception: | |
| logger.exception("tg reply failed") | |
| elif lower.startswith("/say "): | |
| phrase = text[5:].strip() | |
| try: | |
| audio = hf_tts_get_bytes(phrase) | |
| telegram_send_audio(chat_id, audio, filename="say.mp3") | |
| except Exception: | |
| logger.exception("tg say failed") | |
| elif lower.startswith("/status"): | |
| try: | |
| requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": "KC Robot v7.5 running"}, timeout=10) | |
| except Exception: | |
| pass | |
| else: | |
| try: | |
| requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": "Commands: /ask <q> | /say <text> | /status"}, timeout=10) | |
| except Exception: | |
| pass | |
| except Exception: | |
| logger.exception("telegram poller crashed, sleeping 3s") | |
| time.sleep(3) | |
| if TELEGRAM_TOKEN: | |
| try: | |
| t = threading.Thread(target=telegram_poller_loop, daemon=True) | |
| t.start() | |
| except Exception: | |
| logger.exception("start telegram thread failed") | |
| # ---------------- Flask app & endpoints ---------------- | |
| app = Flask(__name__) | |
| INDEX_HTML = """ | |
| <!doctype html> | |
| <html> | |
| <head> | |
| <meta charset="utf-8"> | |
| <title>KC Robot AI v7.5</title> | |
| <meta name="viewport" content="width=device-width,initial-scale=1"> | |
| <style> | |
| body{font-family:Arial,Helvetica,sans-serif;margin:12px;color:#111} | |
| .box{max-width:960px;margin:auto} | |
| textarea{width:100%;height:90px;padding:10px;font-size:16px;border-radius:8px;border:1px solid #ddd} | |
| button{padding:10px 14px;margin:6px 4px;border-radius:8px;background:#0b74de;color:white;border:none;cursor:pointer;font-weight:700} | |
| #chat{border:1px solid #eee;padding:10px;height:360px;overflow:auto;background:#fafafa;border-radius:8px} | |
| .you{color:#0b63d6;margin:6px 0} | |
| .bot{color:#0b8a3f;margin:6px 0} | |
| .small{font-size:13px;color:#666} | |
| </style> | |
| </head> | |
| <body> | |
| <div class="box"> | |
| <h2>🤖 KC Robot AI v7.5 — Final (Auto-model)</h2> | |
| <div class="small">Model: <span id="modelName">loading...</span> | Telegram: <span id="tgstatus">checking...</span></div> | |
| <textarea id="userText" placeholder="Nhập tiếng Việt hoặc English..."></textarea> | |
| <div> | |
| <select id="lang"><option value="auto">Auto</option><option value="vi">Vietnamese</option><option value="en">English</option></select> | |
| <button onclick="send()">Gửi</button> | |
| <button onclick="playLast()">Phát âm</button> | |
| <button onclick="clearChat()">Xóa</button> | |
| </div> | |
| <div id="chat"></div> | |
| <div style="margin-top:10px"> | |
| <input type="file" id="afile" accept="audio/*"><button onclick="uploadAudio()">Upload → STT</button> | |
| </div> | |
| <hr> | |
| <div class="small">Diagnostics: <button onclick="modelCheck()">Kiểm tra model</button><span id="diag"></span></div> | |
| </div> | |
| <script> | |
| let lastAnswer = ""; | |
| async function loadStatus(){ try{ let r=await fetch('/health'); let j=await r.json(); document.getElementById('modelName').innerText=j.hf_model||'(not set)'; document.getElementById('tgstatus').innerText=j.telegram ? 'enabled' : 'disabled'; }catch(e){ console.log(e); } } | |
| function escapeHtml(s){ return (s+'').replace(/&/g,'&').replace(/</g,'<').replace(/>/g,'>'); } | |
| function appendYou(t){ document.getElementById('chat').innerHTML += '<div class="you"><b>You:</b> '+escapeHtml(t)+'</div>'; scroll(); } | |
| function appendBot(t){ document.getElementById('chat').innerHTML += '<div class="bot"><b>Robot:</b> '+escapeHtml(t)+'</div>'; scroll(); } | |
| function scroll(){ let c=document.getElementById('chat'); c.scrollTop = c.scrollHeight; } | |
| async function send(){ | |
| let t=document.getElementById('userText').value.trim(); if(!t) return; appendYou(t); document.getElementById('userText').value=''; | |
| let lang=document.getElementById('lang').value; | |
| try{ | |
| let r=await fetch('/ask',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({text:t,lang:lang})}); | |
| let j=await r.json(); | |
| if(j.answer){ lastAnswer=j.answer; appendBot(j.answer); } else appendBot('[error] '+JSON.stringify(j)); | |
| }catch(e){ appendBot('[network error] '+e); } | |
| } | |
| async function playLast(){ | |
| if(!lastAnswer) return alert('Chưa có câu trả lời'); | |
| try{ | |
| let r=await fetch('/tts',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({text:lastAnswer})}); | |
| if(!r.ok){ alert('TTS lỗi'); return; } | |
| const blob = await r.blob(); | |
| const url=URL.createObjectURL(blob); | |
| const audio=new Audio(url); audio.play(); | |
| }catch(e){ alert('Play error: '+e); } | |
| } | |
| async function uploadAudio(){ | |
| const f=document.getElementById('afile').files[0]; if(!f) return alert('Chọn file audio'); | |
| const fd=new FormData(); fd.append('file', f); | |
| const r=await fetch('/stt',{method:'POST', body: fd}); | |
| const j=await r.json(); | |
| if(j.text){ appendYou('[voice] '+j.text); } else appendYou('[stt error] '+JSON.stringify(j)); | |
| } | |
| function clearChat(){ document.getElementById('chat').innerHTML=''; lastAnswer=''; } | |
| async function modelCheck(){ | |
| document.getElementById('diag').innerText=' checking...'; | |
| try{ | |
| let r=await fetch('/model_check'); | |
| let j=await r.json(); | |
| document.getElementById('diag').innerText = ' ' + JSON.stringify(j).slice(0,200); | |
| loadStatus(); | |
| }catch(e){ document.getElementById('diag').innerText=' error'; } | |
| } | |
| loadStatus(); | |
| </script> | |
| </body> | |
| </html> | |
| """ | |
| def index(): | |
| return render_template_string(INDEX_HTML) | |
| def health(): | |
| return jsonify({ | |
| "ok": True, | |
| "hf_token": bool(HF_TOKEN), | |
| "hf_model": SELECTED_MODEL or HF_MODEL or "", | |
| "hf_tts_model": HF_TTS_MODEL, | |
| "hf_stt_model": HF_STT_MODEL, | |
| "telegram": bool(TELEGRAM_TOKEN and TELEGRAM_CHAT_ID), | |
| "conv_len": len(CONVERSATION), | |
| "display_len": len(DISPLAY_BUFFER) | |
| }) | |
| def route_ask(): | |
| try: | |
| j = request.get_json(force=True) or {} | |
| text = clean_text(j.get("text","") or "") | |
| lang = (j.get("lang","auto") or "auto") | |
| if not text: | |
| return jsonify({"error":"no text"}), 400 | |
| if lang == "vi": | |
| prompt = f"Bạn là trợ lý thông minh, trả lời bằng tiếng Việt, rõ ràng và ngắn gọn:\n\n{text}" | |
| elif lang == "en": | |
| prompt = f"You are a helpful assistant. Answer in clear English, concise:\n\n{text}" | |
| else: | |
| prompt = f"You are a bilingual assistant (Vietnamese/English). Answer in the same language as the user, clearly and concisely:\n\n{text}" | |
| try: | |
| ans = hf_text_generate(prompt) | |
| except Exception as e: | |
| logger.exception("hf_text_generate failed") | |
| return jsonify({"error": str(e)}), 500 | |
| CONVERSATION.append((text, ans)) | |
| save_conv(text, ans) | |
| push_display("YOU: " + (text[:60])) | |
| push_display("BOT: " + (ans[:60] if isinstance(ans,str) else str(ans)[:60])) | |
| # notify telegram | |
| if TELEGRAM_TOKEN and TELEGRAM_CHAT_ID: | |
| try: | |
| telegram_send_message(TELEGRAM_CHAT_ID, f"You: {text}\nBot: {ans[:300]}") | |
| except Exception: | |
| logger.exception("telegram notify failed") | |
| return jsonify({"answer": ans}) | |
| except Exception as e: | |
| logger.exception("route_ask exception") | |
| return jsonify({"error": str(e)}), 500 | |
| def route_tts(): | |
| try: | |
| j = request.get_json(force=True) or {} | |
| text = clean_text(j.get("text","") or "") | |
| if not text: | |
| return jsonify({"error":"no text"}), 400 | |
| try: | |
| audio_bytes = hf_tts_get_bytes(text) | |
| except Exception as e: | |
| logger.exception("tts generation failed") | |
| return jsonify({"error": str(e)}), 500 | |
| return Response(audio_bytes, mimetype="audio/mpeg") | |
| except Exception as e: | |
| logger.exception("route_tts exception") | |
| return jsonify({"error": str(e)}), 500 | |
| def route_stt(): | |
| try: | |
| if "file" in request.files: | |
| f = request.files["file"] | |
| audio_bytes = f.read() | |
| else: | |
| audio_bytes = request.get_data() | |
| if not audio_bytes: | |
| return jsonify({"error":"no audio provided"}), 400 | |
| try: | |
| txt = hf_stt_from_bytes(audio_bytes) | |
| except Exception as e: | |
| logger.exception("STT failed") | |
| return jsonify({"error": str(e)}), 500 | |
| CONVERSATION.append((f"[voice] {txt}", "")) | |
| save_conv(f"[voice] {txt}", "") | |
| push_display("VOICE: " + (txt[:60] if isinstance(txt,str) else str(txt))) | |
| return jsonify({"text": txt}) | |
| except Exception as e: | |
| logger.exception("route_stt exception") | |
| return jsonify({"error": str(e)}), 500 | |
| def route_presence(): | |
| """ | |
| ESP32 radar should POST JSON {"note":"..."}. | |
| Server returns greeting audio (if TTS available) or JSON greeting. | |
| Also sends telegram notification if configured. | |
| """ | |
| try: | |
| j = request.get_json(force=True) or {} | |
| note = clean_text(j.get("note","Có người phía trước") or "Có người phía trước") | |
| greeting = f"Xin chào! {note}" | |
| CONVERSATION.append(("__presence__", greeting)) | |
| save_conv("__presence__", greeting) | |
| push_display("RADAR: " + note[:60]) | |
| if TELEGRAM_TOKEN and TELEGRAM_CHAT_ID: | |
| try: | |
| telegram_send_message(TELEGRAM_CHAT_ID, f"⚠️ Robot: Phát hiện người - {note}") | |
| except Exception: | |
| logger.exception("telegram notify failed") | |
| try: | |
| audio_bytes = hf_tts_get_bytes(greeting) | |
| return Response(audio_bytes, mimetype="audio/mpeg") | |
| except Exception: | |
| return jsonify({"greeting": greeting}) | |
| except Exception as e: | |
| logger.exception("route_presence exception") | |
| return jsonify({"error": str(e)}), 500 | |
| def route_display(): | |
| return jsonify({"lines": DISPLAY_BUFFER.copy(), "conv_len": len(CONVERSATION)}) | |
| def model_check(): | |
| """ | |
| Attempt to verify HF_MODEL / select fallback, returns diagnostic JSON. | |
| """ | |
| global SELECTED_MODEL | |
| # first try current HF_MODEL | |
| results = {} | |
| try: | |
| # if SELECTED_MODEL already set and seems good, return | |
| if SELECTED_MODEL: | |
| results["selected_model"] = SELECTED_MODEL | |
| ok, info = test_model_working(SELECTED_MODEL) | |
| results["selected_ok"] = ok | |
| results["selected_info"] = info | |
| return jsonify(results) | |
| # else try auto-select with preference HF_MODEL | |
| chosen = auto_select_model(HF_MODEL if HF_MODEL else None) | |
| if chosen: | |
| SELECTED_MODEL = chosen | |
| results["selected_model"] = chosen | |
| results["note"] = "Model selected" | |
| return jsonify(results) | |
| else: | |
| results["error"] = "No usable model found in candidates" | |
| return jsonify(results), 404 | |
| except Exception as e: | |
| logger.exception("model_check failed") | |
| return jsonify({"error": str(e)}), 500 | |
| def config(): | |
| """ | |
| GET returns current config. | |
| POST JSON can change HF_MODEL / HF_TTS_MODEL / HF_STT_MODEL at runtime (temporary). | |
| Example: {"hf_model":"...", "hf_tts_model":"..."} | |
| """ | |
| global HF_MODEL, HF_TTS_MODEL, HF_STT_MODEL, SELECTED_MODEL | |
| if request.method == "GET": | |
| return jsonify({"hf_model": HF_MODEL, "hf_tts_model": HF_TTS_MODEL, "hf_stt_model": HF_STT_MODEL, "selected_model": SELECTED_MODEL}) | |
| try: | |
| j = request.get_json(force=True) or {} | |
| changed = {} | |
| if "hf_model" in j: | |
| HF_MODEL = j["hf_model"] | |
| changed["hf_model"] = HF_MODEL | |
| SELECTED_MODEL = None # force re-evaluation | |
| if "hf_tts_model" in j: | |
| HF_TTS_MODEL = j["hf_tts_model"] | |
| changed["hf_tts_model"] = HF_TTS_MODEL | |
| if "hf_stt_model" in j: | |
| HF_STT_MODEL = j["hf_stt_model"] | |
| changed["hf_stt_model"] = HF_STT_MODEL | |
| return jsonify({"changed": changed}) | |
| except Exception as e: | |
| logger.exception("config post failed") | |
| return jsonify({"error": str(e)}), 500 | |
| # ---------------- startup auto model selection ---------------- | |
| def startup_model_check(): | |
| global SELECTED_MODEL | |
| logger.info("Startup: checking/selecting model...") | |
| try: | |
| chosen = auto_select_model(HF_MODEL if HF_MODEL else None) | |
| if chosen: | |
| SELECTED_MODEL = chosen | |
| logger.info("Startup: selected model = %s", SELECTED_MODEL) | |
| else: | |
| logger.warning("Startup: no usable HF model found yet. Use /model_check or set HF_MODEL secret.") | |
| except Exception: | |
| logger.exception("startup_model_check failed") | |
| # run startup check in a thread so Flask starts quickly | |
| t_start = threading.Thread(target=startup_model_check, daemon=True) | |
| t_start.start() | |
| # ---------------- run app ---------------- | |
| if __name__ == "__main__": | |
| logger.info("KC Robot AI v7.5 starting. PREF_HF_MODEL=%s HF_TTS=%s HF_STT=%s Telegram=%s", | |
| HF_MODEL or "(none)", HF_TTS_MODEL or "(none)", HF_STT_MODEL or "(none)", bool(TELEGRAM_TOKEN and TELEGRAM_CHAT_ID)) | |
| app.run(host="0.0.0.0", port=PORT) |