# app.py — KC Robot AI v7.5 FINAL (auto-model-select, bilingual, TTS fallback, Telegram, ESP32 endpoints)
# Secrets expected (HF Space -> Settings -> Secrets):
#   HF_TOKEN (required)
#   HF_MODEL (optional preferred model id like "mistralai/Mistral-7B-Instruct-v0.3")
#   TELEGRAM_TOKEN (optional)
#   TELEGRAM_CHAT_ID (optional)
# Optional:
#   HF_TTS_MODEL, HF_STT_MODEL
#
# Minimal deps: flask, requests, gTTS, python-multipart
# Keep requirements.txt consistent with these packages.

import os
import io
import re
import sys
import time
import json
import uuid
import logging
import threading
from typing import Any, List, Tuple, Optional
from pathlib import Path

import requests
from flask import Flask, request, jsonify, Response, render_template_string

# gTTS fallback
try:
    from gtts import gTTS
    _HAS_GTTS = True
except Exception:
    _HAS_GTTS = False

# ---------------- logging ----------------
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("kcrobot.v7.5")

# ---------------- env / secrets ----------------
HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
HF_MODEL = os.getenv("HF_MODEL", "").strip()  # preferred model (may be empty)
HF_TTS_MODEL = os.getenv("HF_TTS_MODEL", "").strip()  # optional HF TTS model
HF_STT_MODEL = os.getenv("HF_STT_MODEL", "openai/whisper-small").strip()
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", "").strip()
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "").strip()
PORT = int(os.getenv("PORT", 7860))

HF_HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}

# Telegram's sendMessage accepts at most 4096 characters of text; longer
# messages are rejected, so outgoing text is truncated to this length.
TELEGRAM_MAX_MESSAGE_LEN = 4096

# ---------------- tmp dir ----------------
TMPDIR = Path("/tmp/kcrobot") if os.name != "nt" else Path.cwd() / "tmp_kcrobot"
TMPDIR.mkdir(parents=True, exist_ok=True)
CONV_LOG = TMPDIR / "conversation_log.jsonl"

# ---------------- in-memory ----------------
CONVERSATION: List[Tuple[str, str]] = []
DISPLAY_BUFFER: List[str] = []
DISPLAY_LIMIT = 6


def push_display(line: str):
    """Append ``line`` to DISPLAY_BUFFER, keeping only the last DISPLAY_LIMIT entries."""
    global DISPLAY_BUFFER
    DISPLAY_BUFFER.append(line)
    if len(DISPLAY_BUFFER) > DISPLAY_LIMIT:
        DISPLAY_BUFFER = DISPLAY_BUFFER[-DISPLAY_LIMIT:]


def save_conv(user: str, bot: str):
    """Append one exchange as a JSON line to CONV_LOG.

    Failures are logged and swallowed so that logging problems never break
    the calling request.
    """
    try:
        with open(CONV_LOG, "a", encoding="utf-8") as f:
            f.write(json.dumps({"time": time.time(), "user": user, "bot": bot},
                               ensure_ascii=False) + "\n")
    except Exception:
        logger.exception("save_conv failed")


# ---------------- small helpers ----------------
# Control characters other than tab, newline and carriage return.
_CONTROL_CHARS_RE = re.compile(r'[\x00-\x08\x0b-\x0c\x0e-\x1f]+')
_WHITESPACE_RE = re.compile(r'\s+')


def clean_text(text: Any) -> str:
    """Return ``text`` as a single-line string.

    ``None`` becomes ``""``. Control characters are replaced by a space, runs
    of whitespace are collapsed to one space, and the result is stripped.
    """
    if text is None:
        return ""
    s = _CONTROL_CHARS_RE.sub(' ', str(text))
    return _WHITESPACE_RE.sub(' ', s).strip()


VI_CHARS = set("ăâđêôơưáàảãạắằẳẵặấầẩẫậéèẻẽẹíìỉĩịóòỏõọúùủũụứừửữựýỳỷỹỵ")


def detect_language(text: str) -> str:
    """Return ``"vi"`` if ``text`` contains any character from VI_CHARS, else ``"en"``."""
    lowered = (text or "").lower()
    return "vi" if any(ch in VI_CHARS for ch in lowered) else "en"


def _json_body() -> dict:
    """Return the current request's JSON object, or ``{}``.

    ``silent=True`` makes a missing or malformed body yield ``None`` instead
    of raising ``BadRequest``, which the routes' blanket ``except Exception``
    would otherwise report as HTTP 500. Non-object JSON (lists, scalars) is
    also treated as empty so callers can safely use ``.get``.
    """
    body = request.get_json(force=True, silent=True)
    return body if isinstance(body, dict) else {}


def _as_text(value: Any) -> str:
    """Return ``value`` as ``str``, mapping ``None`` to the empty string."""
    return "" if value is None else str(value)


# ---------------- Hugging Face HTTP helpers ----------------
def _hf_request_parts(model_id: str, content_type: str) -> Tuple[str, dict]:
    """Build the inference URL and headers for ``model_id``.

    Raises:
        RuntimeError: if HF_TOKEN is not configured.
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN not configured in Secrets")
    url = f"https://api-inference.huggingface.co/models/{model_id}"
    headers = dict(HF_HEADERS)
    headers["Content-Type"] = content_type
    return url, headers


def hf_post_json(model_id: str, payload: dict, timeout: int = 90) -> requests.Response:
    """POST ``payload`` as JSON to the inference endpoint of ``model_id``.

    Raises:
        RuntimeError: if HF_TOKEN is not configured.
    """
    url, headers = _hf_request_parts(model_id, "application/json")
    return requests.post(url, headers=headers, json=payload, timeout=timeout)


def hf_post_bytes(model_id: str, data: bytes, content_type: str = "application/octet-stream",
                  timeout: int = 180) -> requests.Response:
    """POST raw ``data`` to the inference endpoint of ``model_id``.

    Raises:
        RuntimeError: if HF_TOKEN is not configured.
    """
    url, headers = _hf_request_parts(model_id, content_type)
    return requests.post(url, headers=headers, data=data, timeout=timeout)


def parse_hf_text_output(obj: Any) -> str:
    """Extract the text from a decoded inference response.

    Handles a dict with ``generated_text`` / ``text`` / ``answer``, a dict
    with a ``choices`` list, and a list whose first item is such a dict.
    Anything else is stringified. Always returns a ``str`` and never raises.
    """
    try:
        if isinstance(obj, dict):
            for k in ("generated_text", "text", "answer"):
                if k in obj:
                    return _as_text(obj.get(k, ""))
            if "choices" in obj and isinstance(obj["choices"], list) and obj["choices"]:
                c0 = obj["choices"][0]
                if not isinstance(c0, dict):
                    return str(c0)
                message = c0.get("message") or {}
                content = message.get("content", "") if isinstance(message, dict) else ""
                return _as_text(c0.get("text") or content or str(c0))
            return json.dumps(obj, ensure_ascii=False)
        if isinstance(obj, list) and obj:
            first = obj[0]
            if isinstance(first, dict):
                for k in ("generated_text", "text"):
                    if k in first:
                        return _as_text(first.get(k, ""))
            return str(first)
        return str(obj)
    except Exception:
        logger.exception("parse_hf_text_output")
        return str(obj)


# ---------------- Auto model finder ----------------
# Candidate fallback list — you can extend
DEFAULT_MODEL_CANDIDATES = [
    "mistralai/Mistral-7B-Instruct-v0.3",
    "google/gemma-2b-it",
    "databricks/dolly-v2-3b",
    "tiiuae/falcon-7b-instruct",  # may be private at times
    "facebook/blenderbot-400M-distill",
    # Vietnamese candidates (if public)
    "vinai/PhoGPT-4B",
]


def test_model_working(model_id: str,
                       sample_prompt: str = "Xin chào, bạn khỏe không?") -> Tuple[bool, dict]:
    """Probe ``model_id`` with a short generation request.

    Returns:
        ``(ok, info)``. ``ok`` is True when the endpoint answered 200 with
        some non-empty text. ``info`` holds the status code and the first 500
        characters of the body (plus ``result`` on success), or an ``error``
        entry when the request itself failed.
    """
    try:
        payload = {"inputs": sample_prompt, "parameters": {"max_new_tokens": 20},
                   "options": {"wait_for_model": True}}
        r = hf_post_json(model_id, payload, timeout=30)
        info = {"status": r.status_code, "text": (r.text[:500] if r.text else "")}
        if r.status_code == 200:
            try:
                out = parse_hf_text_output(r.json())
                if out and out.strip():
                    info["result"] = out
                    return True, info
            except ValueError:
                # Body is not JSON; accept it minimally if it carries any text.
                if r.text and r.text.strip():
                    info["result"] = r.text
                    return True, info
        return False, info
    except requests.exceptions.RequestException as e:
        logger.warning("test_model_working request exception for %s: %s", model_id, e)
        return False, {"error": str(e)}
    except Exception:
        logger.exception("test_model_working unexpected")
        return False, {"error": "unexpected"}


def auto_select_model(preferred: Optional[str] = None) -> Optional[str]:
    """Pick a usable text-generation model.

    Tries ``preferred`` first (when given), then each entry of
    DEFAULT_MODEL_CANDIDATES in order.

    Returns:
        The first model id for which test_model_working succeeds, or None.
    """
    tried = []
    if preferred:
        logger.info("Auto-check preferred model: %s", preferred)
        ok, info = test_model_working(preferred)
        tried.append((preferred, ok, info))
        if ok:
            logger.info("Preferred model OK: %s", preferred)
            return preferred
    logger.info("Preferred model not usable or not provided, scanning candidates...")
    for m in DEFAULT_MODEL_CANDIDATES:
        if m == preferred:
            continue  # already tested above
        logger.info("Testing candidate: %s", m)
        ok, info = test_model_working(m)
        tried.append((m, ok, info))
        if ok:
            logger.info("Selected fallback model: %s", m)
            return m
    # nothing found
    logger.warning("Auto-select model found none usable. Tried: %s", [(t[0], t[1]) for t in tried])
    return None


# initial selected model (will be mutated at runtime)
SELECTED_MODEL = HF_MODEL if HF_MODEL else None


# ---------------- HF text / stt / tts wrappers using SELECTED_MODEL ----------------
def hf_text_generate(prompt: str, model_override: Optional[str] = None,
                     max_new_tokens: int = 256, temperature: float = 0.7) -> str:
    """Generate text for ``prompt`` with ``model_override`` or SELECTED_MODEL.

    Raises:
        RuntimeError: if no model is selected, HF_TOKEN is missing, or the
            endpoint answers with a non-200 status.
    """
    model = model_override or SELECTED_MODEL
    if not model:
        raise RuntimeError("No HF model selected")
    payload = {"inputs": prompt,
               "parameters": {"max_new_tokens": int(max_new_tokens),
                              "temperature": float(temperature)},
               "options": {"wait_for_model": True}}
    r = hf_post_json(model, payload, timeout=120)
    if r.status_code == 200:
        try:
            return parse_hf_text_output(r.json())
        except ValueError:
            # Non-JSON body: return it verbatim.
            return r.text
    elif r.status_code == 403:
        raise RuntimeError("HF returned 403 (forbidden) — token or access rights issue")
    elif r.status_code == 404:
        raise RuntimeError("HF returned 404 (model not found) — check HF_MODEL or model access")
    else:
        raise RuntimeError(f"HF text gen returned {r.status_code}: {r.text[:300]}")


def hf_stt_from_bytes(audio_bytes: bytes, model_override: Optional[str] = None) -> str:
    """Transcribe ``audio_bytes`` with ``model_override`` or HF_STT_MODEL.

    Raises:
        RuntimeError: if no STT model is configured, HF_TOKEN is missing, or
            the endpoint answers with a non-200 status.
    """
    model = model_override or HF_STT_MODEL
    if not model:
        raise RuntimeError("HF_STT_MODEL not configured")
    r = hf_post_bytes(model, audio_bytes, content_type="application/octet-stream", timeout=180)
    if r.status_code == 200:
        try:
            j = r.json()
            if isinstance(j, dict) and "text" in j:
                return j["text"]
            return parse_hf_text_output(j)
        except ValueError:
            return r.text
    else:
        raise RuntimeError(f"HF STT returned {r.status_code}: {r.text[:300]}")


def hf_tts_get_bytes(text: str, model_override: Optional[str] = None) -> bytes:
    """Synthesize speech for ``text`` and return the audio bytes.

    Tries the HF TTS model (``model_override`` or HF_TTS_MODEL) first; on any
    failure — including a 200 response with an empty body — falls back to
    gTTS when it is installed.

    Raises:
        RuntimeError: if ``text`` is empty, gTTS fails, or no TTS backend is
            available.
    """
    text = (text or "").strip()
    if not text:
        raise RuntimeError("TTS text empty")
    model = model_override or HF_TTS_MODEL
    if model:
        # Try HF TTS model first
        try:
            r = hf_post_json(model, {"inputs": text}, timeout=120)
            if r.status_code == 200 and r.content:
                return r.content
            # Empty or non-200 response is not usable audio: fall through.
            logger.warning("HF TTS returned %s: %s", r.status_code, r.text[:200])
        except Exception:
            logger.exception("HF TTS call failed")
    # fallback to gTTS if present
    if _HAS_GTTS:
        try:
            lang = "vi" if detect_language(text) == "vi" else "en"
            tts = gTTS(text=text, lang=lang)
            bio = io.BytesIO()
            tts.write_to_fp(bio)
            return bio.getvalue()
        except Exception as err:
            logger.exception("gTTS fallback failed")
            raise RuntimeError("gTTS fallback failed") from err
    raise RuntimeError("No TTS available (no HF_TTS_MODEL and gTTS not installed)")


# ---------------- Telegram helpers ----------------
def telegram_send_message(chat_id: str, text: str) -> bool:
    """Send ``text`` to ``chat_id``; return True on HTTP 200.

    Returns False (without raising) when Telegram is not configured, the
    chat id is empty, or the request fails. Text longer than
    TELEGRAM_MAX_MESSAGE_LEN is truncated so the API does not reject it.
    """
    if not TELEGRAM_TOKEN or not chat_id:
        return False
    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
        body = {"chat_id": chat_id, "text": _as_text(text)[:TELEGRAM_MAX_MESSAGE_LEN]}
        r = requests.post(url, json=body, timeout=8)
        if r.status_code != 200:
            logger.warning("Telegram sendMessage failed %s: %s", r.status_code, r.text[:300])
            return False
        return True
    except Exception:
        logger.exception("telegram_send_message")
        return False


def telegram_send_audio(chat_id: str, audio_bytes: bytes, filename: str = "reply.mp3") -> bool:
    """Upload ``audio_bytes`` to ``chat_id`` as an audio file; return True on HTTP 200.

    Returns False (without raising) when Telegram is not configured, the
    chat id is empty, or the request fails.
    """
    if not TELEGRAM_TOKEN or not chat_id:
        return False
    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendAudio"
        files = {"audio": (filename, io.BytesIO(audio_bytes), "audio/mpeg")}
        data = {"chat_id": chat_id}
        r = requests.post(url, files=files, data=data, timeout=30)
        if r.status_code != 200:
            logger.warning("Telegram sendAudio failed %s: %s", r.status_code, r.text[:300])
            return False
        return True
    except Exception:
        logger.exception("telegram_send_audio")
        return False


# ---------------- Telegram poller (background) ----------------
def _handle_telegram_text(chat_id: str, text: str) -> None:
    """Dispatch one incoming Telegram text message (/ask, /say, /status, help)."""
    lower = text.lower()
    if lower.startswith("/ask "):
        q = text[5:].strip()
        try:
            ans = hf_text_generate(q)
        except Exception as e:
            ans = f"[HF error] {e}"
        telegram_send_message(chat_id, ans)
    elif lower.startswith("/say "):
        phrase = text[5:].strip()
        try:
            audio = hf_tts_get_bytes(phrase)
            telegram_send_audio(chat_id, audio, filename="say.mp3")
        except Exception:
            logger.exception("tg say failed")
    elif lower.startswith("/status"):
        telegram_send_message(chat_id, "KC Robot v7.5 running")
    else:
        telegram_send_message(chat_id, "Commands: /ask | /say | /status")


def telegram_poller_loop():
    """Long-poll Telegram getUpdates forever and answer text commands.

    Returns immediately when TELEGRAM_TOKEN is not set. Any error inside one
    polling round is logged and followed by a short sleep, so the loop never
    terminates on its own.

    NOTE(review): commands are accepted from any chat, not only
    TELEGRAM_CHAT_ID — confirm this is intended.
    """
    if not TELEGRAM_TOKEN:
        logger.info("Telegram token not set; poller disabled")
        return
    logger.info("Starting Telegram poller")
    base = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}"
    offset = None
    while True:
        try:
            params = {"timeout": 30}
            if offset:
                params["offset"] = offset
            # HTTP timeout is slightly longer than the long-poll timeout.
            r = requests.get(base + "/getUpdates", params=params, timeout=35)
            if r.status_code != 200:
                logger.warning("Telegram getUpdates failed: %s", r.status_code)
                time.sleep(2)
                continue
            j = r.json()
            for upd in j.get("result", []):
                # Acknowledge this update on the next poll.
                offset = upd.get("update_id", 0) + 1
                msg = upd.get("message") or {}
                raw_chat_id = (msg.get("chat") or {}).get("id")
                text = (msg.get("text") or "").strip()
                if not text or raw_chat_id is None:
                    continue
                chat_id = str(raw_chat_id)
                logger.info("TG msg %s: %s", chat_id, text[:200])
                _handle_telegram_text(chat_id, text)
        except Exception:
            logger.exception("telegram poller crashed, sleeping 3s")
            time.sleep(3)


if TELEGRAM_TOKEN:
    try:
        t = threading.Thread(target=telegram_poller_loop, daemon=True)
        t.start()
    except Exception:
        logger.exception("start telegram thread failed")

# ---------------- Flask app & endpoints ----------------
app = Flask(__name__)

# NOTE(review): this template holds plain text only (no HTML markup is
# visible here) — confirm the original page markup was not lost.
INDEX_HTML = """ KC Robot AI v7.5

🤖 KC Robot AI v7.5 — Final (Auto-model)

Model: loading... | Telegram: checking...

Diagnostics:
"""


@app.route("/", methods=["GET"])
def index():
    """Serve the landing page rendered from INDEX_HTML."""
    return render_template_string(INDEX_HTML)


@app.route("/health", methods=["GET"])
def health():
    """Return a JSON snapshot of the configuration flags and buffer sizes."""
    return jsonify({
        "ok": True,
        "hf_token": bool(HF_TOKEN),
        "hf_model": SELECTED_MODEL or HF_MODEL or "",
        "hf_tts_model": HF_TTS_MODEL,
        "hf_stt_model": HF_STT_MODEL,
        "telegram": bool(TELEGRAM_TOKEN and TELEGRAM_CHAT_ID),
        "conv_len": len(CONVERSATION),
        "display_len": len(DISPLAY_BUFFER)
    })


@app.route("/ask", methods=["POST"])
def route_ask():
    """Answer a text question.

    Expects JSON ``{"text": "...", "lang": "vi" | "en" | "auto"}``. Returns
    ``{"answer": ...}``, 400 when ``text`` is empty, or 500 with an ``error``
    message when generation fails. The exchange is recorded and, when
    Telegram is configured, forwarded to TELEGRAM_CHAT_ID.
    """
    try:
        j = _json_body()
        text = clean_text(j.get("text", "") or "")
        lang = str(j.get("lang", "auto") or "auto").strip().lower()
        if not text:
            return jsonify({"error": "no text"}), 400
        if lang == "vi":
            prompt = f"Bạn là trợ lý thông minh, trả lời bằng tiếng Việt, rõ ràng và ngắn gọn:\n\n{text}"
        elif lang == "en":
            prompt = f"You are a helpful assistant. Answer in clear English, concise:\n\n{text}"
        else:
            prompt = f"You are a bilingual assistant (Vietnamese/English). Answer in the same language as the user, clearly and concisely:\n\n{text}"
        try:
            ans = hf_text_generate(prompt)
        except Exception as e:
            logger.exception("hf_text_generate failed")
            return jsonify({"error": str(e)}), 500
        CONVERSATION.append((text, ans))
        save_conv(text, ans)
        push_display("YOU: " + (text[:60]))
        push_display("BOT: " + str(ans)[:60])
        # notify telegram
        if TELEGRAM_TOKEN and TELEGRAM_CHAT_ID:
            try:
                telegram_send_message(TELEGRAM_CHAT_ID, f"You: {text}\nBot: {str(ans)[:300]}")
            except Exception:
                logger.exception("telegram notify failed")
        return jsonify({"answer": ans})
    except Exception as e:
        logger.exception("route_ask exception")
        return jsonify({"error": str(e)}), 500


@app.route("/tts", methods=["POST"])
def route_tts():
    """Synthesize speech for JSON ``{"text": "..."}``.

    Returns the audio bytes with mimetype ``audio/mpeg``, 400 when ``text``
    is empty, or 500 with an ``error`` message when synthesis fails.
    """
    try:
        j = _json_body()
        text = clean_text(j.get("text", "") or "")
        if not text:
            return jsonify({"error": "no text"}), 400
        try:
            audio_bytes = hf_tts_get_bytes(text)
        except Exception as e:
            logger.exception("tts generation failed")
            return jsonify({"error": str(e)}), 500
        return Response(audio_bytes, mimetype="audio/mpeg")
    except Exception as e:
        logger.exception("route_tts exception")
        return jsonify({"error": str(e)}), 500


@app.route("/stt", methods=["POST"])
def route_stt():
    """Transcribe uploaded audio.

    Accepts either a multipart upload in the ``file`` field or the raw audio
    as the request body. Returns ``{"text": ...}``, 400 when no audio was
    sent, or 500 with an ``error`` message when transcription fails.
    """
    try:
        if "file" in request.files:
            audio_bytes = request.files["file"].read()
        else:
            audio_bytes = request.get_data()
        if not audio_bytes:
            return jsonify({"error": "no audio provided"}), 400
        try:
            txt = hf_stt_from_bytes(audio_bytes)
        except Exception as e:
            logger.exception("STT failed")
            return jsonify({"error": str(e)}), 500
        CONVERSATION.append((f"[voice] {txt}", ""))
        save_conv(f"[voice] {txt}", "")
        push_display("VOICE: " + str(txt)[:60])
        return jsonify({"text": txt})
    except Exception as e:
        logger.exception("route_stt exception")
        return jsonify({"error": str(e)}), 500


@app.route("/presence", methods=["POST"])
def route_presence():
    """Handle a presence notification.

    ESP32 radar should POST JSON {"note":"..."}; a default note is used when
    it is missing. Server returns greeting audio (if TTS available) or JSON
    greeting. Also sends telegram notification if configured.
    """
    try:
        j = _json_body()
        note = clean_text(j.get("note", "Có người phía trước") or "Có người phía trước")
        greeting = f"Xin chào! {note}"
        CONVERSATION.append(("__presence__", greeting))
        save_conv("__presence__", greeting)
        push_display("RADAR: " + note[:60])
        if TELEGRAM_TOKEN and TELEGRAM_CHAT_ID:
            try:
                telegram_send_message(TELEGRAM_CHAT_ID, f"⚠️ Robot: Phát hiện người - {note}")
            except Exception:
                logger.exception("telegram notify failed")
        try:
            audio_bytes = hf_tts_get_bytes(greeting)
            return Response(audio_bytes, mimetype="audio/mpeg")
        except Exception:
            # No TTS available: deliberately fall back to a JSON greeting.
            return jsonify({"greeting": greeting})
    except Exception as e:
        logger.exception("route_presence exception")
        return jsonify({"error": str(e)}), 500


@app.route("/display", methods=["GET"])
def route_display():
    """Return the current display lines and the conversation length."""
    return jsonify({"lines": DISPLAY_BUFFER.copy(), "conv_len": len(CONVERSATION)})


@app.route("/model_check", methods=["GET"])
def model_check():
    """Verify the selected model or select a fallback; return diagnostic JSON.

    When SELECTED_MODEL is set it is probed and the probe result is returned
    as-is (no fallback scan is attempted, even if the probe fails). Otherwise
    auto_select_model runs with HF_MODEL as the preference; 404 is returned
    when no candidate works.
    """
    global SELECTED_MODEL
    results = {}
    try:
        if SELECTED_MODEL:
            results["selected_model"] = SELECTED_MODEL
            ok, info = test_model_working(SELECTED_MODEL)
            results["selected_ok"] = ok
            results["selected_info"] = info
            return jsonify(results)
        # else try auto-select with preference HF_MODEL
        chosen = auto_select_model(HF_MODEL if HF_MODEL else None)
        if chosen:
            SELECTED_MODEL = chosen
            results["selected_model"] = chosen
            results["note"] = "Model selected"
            return jsonify(results)
        else:
            results["error"] = "No usable model found in candidates"
            return jsonify(results), 404
    except Exception as e:
        logger.exception("model_check failed")
        return jsonify({"error": str(e)}), 500


def _as_model_id(value: Any) -> str:
    """Normalise a configured model id to a stripped string (``None`` -> ``""``)."""
    return _as_text(value).strip()


@app.route("/config", methods=["GET", "POST"])
def config():
    """Read or change the model configuration.

    GET returns current config.
    POST JSON can change HF_MODEL / HF_TTS_MODEL / HF_STT_MODEL at runtime
    (temporary). Values are stored as stripped strings. Changing ``hf_model``
    clears SELECTED_MODEL so it is re-evaluated.
    Example: {"hf_model":"...", "hf_tts_model":"..."}
    """
    global HF_MODEL, HF_TTS_MODEL, HF_STT_MODEL, SELECTED_MODEL
    if request.method == "GET":
        return jsonify({"hf_model": HF_MODEL, "hf_tts_model": HF_TTS_MODEL,
                        "hf_stt_model": HF_STT_MODEL, "selected_model": SELECTED_MODEL})
    try:
        j = _json_body()
        changed = {}
        if "hf_model" in j:
            HF_MODEL = _as_model_id(j["hf_model"])
            changed["hf_model"] = HF_MODEL
            SELECTED_MODEL = None  # force re-evaluation
        if "hf_tts_model" in j:
            HF_TTS_MODEL = _as_model_id(j["hf_tts_model"])
            changed["hf_tts_model"] = HF_TTS_MODEL
        if "hf_stt_model" in j:
            HF_STT_MODEL = _as_model_id(j["hf_stt_model"])
            changed["hf_stt_model"] = HF_STT_MODEL
        return jsonify({"changed": changed})
    except Exception as e:
        logger.exception("config post failed")
        return jsonify({"error": str(e)}), 500


# ---------------- startup auto model selection ----------------
def startup_model_check():
    """Run auto_select_model once and store the result in SELECTED_MODEL.

    SELECTED_MODEL is left untouched when no usable model is found. All
    errors are logged and swallowed.
    """
    global SELECTED_MODEL
    logger.info("Startup: checking/selecting model...")
    try:
        chosen = auto_select_model(HF_MODEL if HF_MODEL else None)
        if chosen:
            SELECTED_MODEL = chosen
            logger.info("Startup: selected model = %s", SELECTED_MODEL)
        else:
            logger.warning("Startup: no usable HF model found yet. Use /model_check or set HF_MODEL secret.")
    except Exception:
        logger.exception("startup_model_check failed")


# run startup check in a thread so Flask starts quickly
t_start = threading.Thread(target=startup_model_check, daemon=True)
t_start.start()

# ---------------- run app ----------------
if __name__ == "__main__":
    logger.info("KC Robot AI v7.5 starting. PREF_HF_MODEL=%s HF_TTS=%s HF_STT=%s Telegram=%s",
                HF_MODEL or "(none)", HF_TTS_MODEL or "(none)", HF_STT_MODEL or "(none)",
                bool(TELEGRAM_TOKEN and TELEGRAM_CHAT_ID))
    app.run(host="0.0.0.0", port=PORT)