Spaces:
Sleeping
Sleeping
| # app.py - KC Robot AI V4.1 (Full - FPT female TTS) | |
| # Full-feature Flask server: | |
| # - /ask (text) -> HF LLM | |
| # - /tts (text) -> HF TTS (default: NguyenManhTuan/VietnameseTTS_FPT_AI_Female) | |
| # - /stt (audio) -> HF STT (default: openai/whisper-small) | |
| # - /presence (radar event) -> greeting + Telegram notify | |
| # - /display -> OLED lines | |
| # - Web UI for quick test | |
| # - Telegram poller (background thread) to accept /ask, /say, /status | |
| # | |
| # Configuration via environment variables / Secrets in HF Space: | |
| # HF_API_TOKEN (required for HF inference) | |
| # HF_MODEL (optional, default google/flan-t5-large) | |
| # HF_TTS_MODEL (optional, default NguyenManhTuan/VietnameseTTS_FPT_AI_Female) | |
| # HF_STT_MODEL (optional, default openai/whisper-small) | |
| # TELEGRAM_TOKEN (optional) | |
| # TELEGRAM_CHATID (optional) | |
| # | |
| # Keep requirements minimal to improve HF Space stability: | |
| # flask, requests | |
| # | |
| # Important: set tokens in HF Space Settings -> Secrets (do not hardcode) | |
| import os | |
| import io | |
| import time | |
| import json | |
| import uuid | |
| import logging | |
| import threading | |
| from typing import Optional, List, Tuple | |
| from pathlib import Path | |
| import requests | |
| from flask import Flask, request, jsonify, send_file, render_template_string, abort | |
# ----------------- Config & Logging -----------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kcrobot.v4")
app = Flask(__name__)

# Directory for temporary files (TTS audio): /tmp/kcrobot everywhere except
# Windows, where a folder under the current working directory is used.
TMP_DIR = Path("/tmp/kcrobot") if os.name != "nt" else Path.cwd() / "tmp_kcrobot"
TMP_DIR.mkdir(parents=True, exist_ok=True)

# Environment / Secrets (set these in HF Space).
# Model ids use `... or default` so that a variable which is set but blank
# still falls back to the default instead of producing an empty model id.
HF_API_TOKEN = os.getenv("HF_API_TOKEN", "").strip()
HF_MODEL = os.getenv("HF_MODEL", "").strip() or "google/flan-t5-large"
# Default FPT female Vietnamese TTS
HF_TTS_MODEL = (
    os.getenv("HF_TTS_MODEL", "").strip()
    or "NguyenManhTuan/VietnameseTTS_FPT_AI_Female"
)
HF_STT_MODEL = os.getenv("HF_STT_MODEL", "").strip() or "openai/whisper-small"
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", "").strip()
TELEGRAM_CHATID = os.getenv("TELEGRAM_CHATID", "").strip()

# Port (HF sets PORT env in runtime). PORT wins over SERVER_PORT; 7860 is the
# fallback. An empty value is skipped rather than passed to int(), which
# would raise ValueError at import time.
PORT = int(os.getenv("PORT") or os.getenv("SERVER_PORT") or 7860)

if not HF_API_TOKEN:
    logger.warning("HF_API_TOKEN is not set — HF inference will fail until you add it in Secrets.")

# Authorization header shared by every HF request; empty when no token is set.
HF_HEADERS = {"Authorization": f"Bearer {HF_API_TOKEN}"} if HF_API_TOKEN else {}

# ----------------- In-memory buffers -----------------
CONV: List[Tuple[str, str]] = []  # list of (user, bot)
DISPLAY_LINES: List[str] = []  # lines for OLED display
def push_display(line: str, limit: int = 6):
    """Append `line` to the display buffer and keep only the last `limit` lines.

    The buffer is trimmed in place, so any other reference to the
    DISPLAY_LINES list object keeps seeing the current contents.

    Args:
        line: Text to append.
        limit: Maximum number of lines to retain. Values <= 0 retain nothing
            (a plain ``[-limit:]`` slice would keep everything for 0).
    """
    DISPLAY_LINES.append(line)
    overflow = len(DISPLAY_LINES) - max(limit, 0)
    if overflow > 0:
        # Drop the oldest entries from the front.
        del DISPLAY_LINES[:overflow]
| # ----------------- Helper: Hugging Face inference ----------------- | |
def hf_post_json(model_id: str, payload: dict, timeout: int = 120):
    """Send `payload` as JSON to the HF inference endpoint of `model_id`.

    Returns the decoded JSON body, or the raw response text when the body
    cannot be decoded as JSON.

    Raises:
        RuntimeError: if HF_API_TOKEN is not configured.
        requests.HTTPError: if the endpoint answers with an error status.
    """
    if not HF_API_TOKEN:
        raise RuntimeError("HF_API_TOKEN missing (set in Secrets).")

    request_headers = {**HF_HEADERS, "Content-Type": "application/json"}
    response = requests.post(
        f"https://api-inference.huggingface.co/models/{model_id}",
        headers=request_headers,
        json=payload,
        timeout=timeout,
    )
    if not response.ok:
        # Log a bounded excerpt of the body before raising.
        logger.error("HF POST JSON error %s: %s", response.status_code, response.text[:400])
        response.raise_for_status()

    try:
        decoded = response.json()
    except Exception:
        return response.text
    return decoded
def hf_post_bytes(model_id: str, data: bytes, content_type: str = "application/octet-stream", timeout: int = 180):
    """Send raw bytes (e.g. audio) to the HF inference endpoint of `model_id`.

    Returns the `requests` response object so the caller can decide how to
    decode the body.

    Raises:
        RuntimeError: if HF_API_TOKEN is not configured.
        requests.HTTPError: if the endpoint answers with an error status.
    """
    if not HF_API_TOKEN:
        raise RuntimeError("HF_API_TOKEN missing (set in Secrets).")

    request_headers = {**HF_HEADERS, "Content-Type": content_type}
    response = requests.post(
        f"https://api-inference.huggingface.co/models/{model_id}",
        headers=request_headers,
        data=data,
        timeout=timeout,
    )
    if not response.ok:
        # Log a bounded excerpt of the body before raising.
        logger.error("HF POST bytes error %s: %s", response.status_code, response.text[:400])
        response.raise_for_status()
    return response
| # ----------------- Text generation (LLM) ----------------- | |
def hf_text_generate(prompt: str, model: Optional[str] = None, max_new_tokens: int = 256, temperature: float = 0.7) -> str:
    """Generate text for `prompt` with an HF model (HF_MODEL unless `model` is given).

    The response is unwrapped from the shapes this function knows about:
    a non-empty list (first element's "generated_text", else the element
    stringified), a dict (first of "generated_text" / "text" /
    "summary_text", else the dict dumped as JSON), or anything else
    stringified.
    """
    request_body = {
        "inputs": prompt,
        "parameters": {"max_new_tokens": int(max_new_tokens), "temperature": float(temperature)},
        "options": {"wait_for_model": True},
    }
    result = hf_post_json(model or HF_MODEL, request_body, timeout=120)

    if isinstance(result, list) and result:
        head = result[0]
        if isinstance(head, dict) and "generated_text" in head:
            return head["generated_text"]
        return str(head)

    if isinstance(result, dict):
        known_key = next(
            (key for key in ("generated_text", "text", "summary_text") if key in result),
            None,
        )
        if known_key is None:
            return json.dumps(result)
        return result[known_key]

    return str(result)
| # ----------------- TTS (Text -> audio bytes) ----------------- | |
def hf_tts_get_audio_bytes(text: str, model: Optional[str] = None) -> bytes:
    """Call an HF TTS model and return the raw audio bytes of the response.

    Args:
        text: Text to synthesize.
        model: HF model id; defaults to HF_TTS_MODEL.

    Returns:
        The response body as bytes. The container format is whatever the
        model returns; it is not inspected here.

    Raises:
        RuntimeError: if HF_API_TOKEN is not configured (same contract as the
            other HF helpers, instead of sending an unauthenticated request).
        requests.HTTPError: if the endpoint answers with an error status.
    """
    if not HF_API_TOKEN:
        raise RuntimeError("HF_API_TOKEN missing (set in Secrets).")
    model = model or HF_TTS_MODEL
    r = requests.post(
        f"https://api-inference.huggingface.co/models/{model}",
        headers={**HF_HEADERS, "Content-Type": "application/json"},
        json={"inputs": text},
        timeout=120,
    )
    if not r.ok:
        # Log a bounded excerpt of the body before raising.
        logger.error("HF TTS error %s: %s", r.status_code, r.text[:400])
        r.raise_for_status()
    return r.content
def save_tts_temp(audio_bytes: bytes, ext_hint: str = "mp3") -> str:
    """Write `audio_bytes` to a uniquely named file in TMP_DIR.

    The name is built from the current epoch second, a random UUID hex and
    `ext_hint` as the extension. Returns the bare file name (not the path).
    """
    stamp = int(time.time())
    unique = uuid.uuid4().hex
    fname = f"tts_{stamp}_{unique}.{ext_hint}"
    (TMP_DIR / fname).write_bytes(audio_bytes)
    return fname
| # ----------------- STT (audio bytes -> text) ----------------- | |
def hf_stt_from_bytes(audio_bytes: bytes, model: Optional[str] = None) -> str:
    """Transcribe `audio_bytes` with an HF STT model (HF_STT_MODEL by default).

    Returns the "text" field of the JSON response (top-level dict, or first
    element of a list); any other JSON is stringified. If the body is not
    JSON, the raw response text is returned.
    """
    response = hf_post_bytes(
        model or HF_STT_MODEL,
        audio_bytes,
        content_type="application/octet-stream",
        timeout=180,
    )
    try:
        parsed = response.json()
    except Exception:
        return response.text if hasattr(response, "text") else ""

    # often returns {"text": "..."}
    if isinstance(parsed, dict) and "text" in parsed:
        return parsed["text"]
    if isinstance(parsed, list) and len(parsed) and isinstance(parsed[0], dict) and "text" in parsed[0]:
        return parsed[0]["text"]
    return str(parsed)
| # ----------------- Endpoints for ESP32 / Web ----------------- | |
def health():
    """Return a JSON snapshot of the server configuration.

    Only booleans are reported for the HF token and the Telegram settings;
    the secret values themselves are not included.
    """
    telegram_ready = bool(TELEGRAM_TOKEN and TELEGRAM_CHATID)
    status = {
        "ok": True,
        "hf_api_token": bool(HF_API_TOKEN),
        "hf_model": HF_MODEL,
        "hf_tts_model": HF_TTS_MODEL,
        "hf_stt_model": HF_STT_MODEL,
        "telegram": telegram_ready,
        "tmp_dir": str(TMP_DIR),
    }
    return jsonify(status)
def route_ask():
    """
    Answer a text question with the configured HF LLM.

    POST JSON: { "text": "...", "lang": "vi"|"en"|"auto" (optional) }
    Returns: { "answer": "..." }
    Errors: { "error": "..." } with 400 when no usable text was supplied
    (including a missing, malformed or non-object JSON body), 500 when
    generation fails.
    """
    # silent=True makes a missing/malformed body come back as None instead of
    # raising, so client mistakes are reported as 400 rather than 500.
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        data = {}
    # str() guards against non-string JSON values (numbers, lists, ...).
    text = str(data.get("text") or "").strip()
    lang = str(data.get("lang") or "auto").lower()
    if not text:
        return jsonify({"error": "no text"}), 400

    # build bilingual instruction
    if lang == "vi":
        prompt = f"Bạn là trợ lý thông minh, trả lời bằng tiếng Việt, ngắn gọn và lịch sự:\n\n{text}"
    elif lang == "en":
        prompt = f"You are a helpful assistant. Answer in clear English, concise:\n\n{text}"
    else:
        prompt = f"Bạn là trợ lý thông minh song ngữ (Vietnamese/English). Trả lời bằng ngôn ngữ phù hợp với câu hỏi:\n\n{text}"

    try:
        answer = hf_text_generate(prompt)
        # store conversation and display preview
        CONV.append((text, answer))
        push_display("YOU: " + (text[:40]))
        push_display("BOT: " + (answer[:40]))
        return jsonify({"answer": answer})
    except Exception as e:
        logger.exception("route_ask failed")
        return jsonify({"error": str(e)}), 500
def guess_audio_mime(audio_bytes: bytes):
    """Guess (mimetype, file extension) from the leading magic bytes.

    Recognizes RIFF/WAVE, FLAC and Ogg containers. Anything else is reported
    as ("audio/mpeg", "mp3"), which was the fixed label used before sniffing
    was added.
    """
    head = bytes(audio_bytes[:12])
    if head[:4] == b"RIFF" and head[8:12] == b"WAVE":
        return "audio/wav", "wav"
    if head[:4] == b"fLaC":
        return "audio/flac", "flac"
    if head[:4] == b"OggS":
        return "audio/ogg", "ogg"
    return "audio/mpeg", "mp3"


def route_tts():
    """
    Synthesize speech for the given text.

    POST JSON: { "text":"..." }
    Returns: audio bytes from the HF TTS model. The Content-Type is sniffed
    from the data (wav / flac / ogg) and falls back to audio/mpeg.
    Errors: { "error": "..." } with 400 when no usable text was supplied
    (including a missing or malformed JSON body), 500 when synthesis fails.
    """
    # silent=True: malformed bodies become None instead of raising -> 400.
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        data = {}
    text = str(data.get("text") or "").strip()
    if not text:
        return jsonify({"error": "no text"}), 400

    try:
        audio_bytes = hf_tts_get_audio_bytes(text)
        # Label the payload by what it actually is, so that players that
        # trust the Content-Type header are not handed WAV/FLAC as MP3.
        mime, ext = guess_audio_mime(audio_bytes)
        return send_file(
            io.BytesIO(audio_bytes),
            mimetype=mime,
            as_attachment=False,
            download_name=f"tts.{ext}",
        )
    except Exception as e:
        logger.exception("route_tts failed")
        return jsonify({"error": str(e)}), 500
def route_stt():
    """
    Transcribe uploaded audio.

    Accepts a multipart upload under the field name 'file', or otherwise the
    raw request body as audio bytes.
    Returns JSON: { "text": "recognized text" }; 400 when no audio bytes
    were received, 500 when transcription fails.
    """
    try:
        upload = request.files["file"] if "file" in request.files else None
        if upload is not None:
            audio_bytes = upload.read()
        else:
            audio_bytes = request.get_data() or b""

        if not audio_bytes:
            return jsonify({"error": "no audio"}), 400

        recognized = hf_stt_from_bytes(audio_bytes)
        # Show a short preview of the transcript on the display buffer.
        push_display("UserAudio: " + (recognized[:40]))
        return jsonify({"text": recognized})
    except Exception as err:
        logger.exception("route_stt failed")
        return jsonify({"error": str(err)}), 500
def route_presence():
    """
    Handle a presence event.

    ESP32 radar posts: JSON {"note": "..." } (body and field are optional).
    Server responds with {"greeting": "..."} and, when Telegram is
    configured, also sends an alert message. A Telegram failure never fails
    the request.
    """
    try:
        # silent=True: a missing or malformed body is treated as "no note"
        # instead of turning a bare sensor ping into an HTTP 500.
        data = request.get_json(force=True, silent=True)
        if not isinstance(data, dict):
            data = {}
        raw_note = data.get("note", "Có người tới")
        # An explicit null falls back to the default; other non-string values
        # are stringified so the slice below cannot raise.
        note = "Có người tới" if raw_note is None else str(raw_note)
        greeting = f"Xin chào! {note}"
        CONV.append(("__presence__", greeting))
        push_display("RADAR: " + note[:40])
        # Telegram notify
        if TELEGRAM_TOKEN and TELEGRAM_CHATID:
            try:
                send_telegram_message(f"⚠️ Robot: Phát hiện: {note}")
            except Exception:
                logger.exception("Telegram notify failed")
        return jsonify({"greeting": greeting})
    except Exception as e:
        logger.exception("route_presence failed")
        return jsonify({"error": str(e)}), 500
def route_display():
    """Return the last six display lines and the number of stored conversation entries."""
    recent_lines = DISPLAY_LINES[-6:]
    conversation_count = len(CONV)
    return jsonify({"lines": recent_lines, "conv_len": conversation_count})
| # Serve tts files by filename if needed | |
def serve_tts_file(fname):
    """Serve a previously saved TTS file from TMP_DIR by file name.

    Args:
        fname: File name requested by the client (untrusted input).

    Returns:
        The file as audio/mpeg when the name ends in ".mp3", otherwise as
        audio/wav. Aborts with 404 when the name does not resolve to a
        regular file inside TMP_DIR.
    """
    base = TMP_DIR.resolve()
    target = (base / str(fname)).resolve()
    # Containment check: after resolving "..", absolute paths and symlinks the
    # target must still live under TMP_DIR, otherwise a crafted name could
    # read arbitrary files. is_file() also rejects directories.
    if base not in target.parents or not target.is_file():
        return abort(404)
    # guess mime
    mime = "audio/mpeg" if target.name.lower().endswith(".mp3") else "audio/wav"
    return send_file(str(target), mimetype=mime)
| # ----------------- Simple Web UI for testing ----------------- | |
# Single-page test UI: chat box (/ask), TTS playback (/tts) and audio upload (/stt).
# escapeHtml() lets the browser do the escaping via textContent, so no HTML
# entity literals appear in this source; model output and user input are
# therefore inserted into the page as text, not markup.
INDEX_HTML = """
<!doctype html>
<html>
<head>
<meta charset="utf-8">
<title>KC Robot AI V4.1</title>
<meta name="viewport" content="width=device-width,initial-scale=1">
<style>
body{font-family:Arial,Helvetica, sans-serif; margin:12px; color:#111}
textarea{width:100%; height:90px; padding:8px; font-size:16px}
#chat{border:1px solid #ddd; padding:8px; height:260px; overflow:auto; background:#fbfbfb}
button{padding:8px 12px; margin-top:8px; font-size:15px}
</style>
</head>
<body>
<h2>KC Robot AI V4.1 — Cloud Brain (FPT female)</h2>
<div id="chat"></div>
<textarea id="txt" placeholder="Nhập tiếng Việt hoặc English..."></textarea><br>
<button onclick="ask()">Gửi (Ask)</button>
<button onclick="playLast()">Phát TTS</button>
<hr/>
<input type="file" id="afile" accept="audio/*"><button onclick="uploadAudio()">Upload audio → STT</button>
<hr/>
<div id="log"></div>
<script>
window._lastAnswer = "";
async function ask(){
  let t = document.getElementById('txt').value;
  if(!t) return;
  appendUser(t);
  let res = await fetch('/ask', {method:'POST', headers:{'Content-Type':'application/json'}, body: JSON.stringify({text:t})});
  let j = await res.json();
  if(j.answer){ appendBot(j.answer); window._lastAnswer = j.answer; }
  else appendBot('[Error] ' + JSON.stringify(j));
}
function appendUser(t){ document.getElementById('chat').innerHTML += '<div style="color:#006"><b>You:</b> '+escapeHtml(t)+'</div>'; scroll();}
function appendBot(t){ document.getElementById('chat').innerHTML += '<div style="color:#080"><b>Robot:</b> '+escapeHtml(t)+'</div>'; scroll();}
function escapeHtml(s){ const d = document.createElement('div'); d.textContent = s + ''; return d.innerHTML; }
function scroll(){ let c = document.getElementById('chat'); c.scrollTop = c.scrollHeight; }
async function playLast(){
  const txt = window._lastAnswer || document.getElementById('txt').value;
  if(!txt) return alert('Chưa có câu trả lời');
  let r = await fetch('/tts',{method:'POST', headers:{'Content-Type':'application/json'}, body: JSON.stringify({text: txt})});
  if(!r.ok) return alert('TTS lỗi');
  const b = await r.blob();
  const url = URL.createObjectURL(b);
  const a = new Audio(url);
  a.play();
}
async function uploadAudio(){
  const f = document.getElementById('afile').files[0];
  if(!f) return alert('Chọn file audio');
  const fd = new FormData(); fd.append('file', f);
  const r = await fetch('/stt', {method:'POST', body: fd});
  const j = await r.json();
  if(j.text) appendUser('[voice] '+j.text);
  else appendUser('[stt error] '+JSON.stringify(j));
}
</script>
</body>
</html>
"""
def index():
    """Render the built-in test page held in INDEX_HTML."""
    page = render_template_string(INDEX_HTML)
    return page
| # ----------------- Telegram helpers & poller ----------------- | |
def send_telegram_message(text: str) -> bool:
    """Send `text` to the configured Telegram chat.

    Returns True when Telegram answered with a success status. Returns False
    when Telegram is not configured, the API answered with an error status,
    or the request raised; this function does not propagate exceptions.
    """
    if not (TELEGRAM_TOKEN and TELEGRAM_CHATID):
        logger.debug("Telegram not configured")
        return False

    try:
        resp = requests.post(
            f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage",
            json={"chat_id": TELEGRAM_CHATID, "text": text},
            timeout=10,
        )
        if resp.ok:
            return True
        logger.warning("Telegram send failed: %s %s", resp.status_code, resp.text)
        return False
    except Exception:
        logger.exception("send_telegram_message exception")
        return False
def _tg_reply(base: str, chat_id, text) -> None:
    """Best-effort sendMessage to `chat_id`; failures are logged, never raised."""
    try:
        r = requests.post(
            base + "/sendMessage",
            # Telegram rejects message text longer than 4096 characters.
            json={"chat_id": chat_id, "text": str(text)[:4096]},
            timeout=10,
        )
        if not r.ok:
            logger.warning("tg reply failed: %s %s", r.status_code, r.text[:200])
    except Exception:
        logger.exception("tg reply failed")


def telegram_poll_loop():
    """Long-polling loop to fetch updates and respond to simple commands.

    Commands handled (matched case-insensitively on the message text):
      /ask <q>    -> reply with the LLM answer for <q>
      /say <text> -> reply with synthesized audio for <text>
      /status     -> reply with a fixed "running" message
      other text  -> reply with the command list

    Returns immediately when TELEGRAM_TOKEN is not set; otherwise it never
    returns. Any exception inside one polling round is logged and followed
    by a 3 second pause before the next round.

    NOTE(review): commands are accepted from any chat that messages the bot;
    there is no check against TELEGRAM_CHATID. Confirm whether that is
    intended, since /ask and /say spend the HF token's quota.
    """
    if not TELEGRAM_TOKEN:
        logger.info("telegram_poll_loop: TELEGRAM_TOKEN not set, exiting poller.")
        return
    base = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}"
    offset = None
    logger.info("telegram_poll_loop: starting.")
    while True:
        try:
            params = {"timeout": 30}
            if offset:
                params["offset"] = offset
            # HTTP timeout is slightly longer than the long-poll timeout.
            r = requests.get(base + "/getUpdates", params=params, timeout=35)
            if not r.ok:
                logger.warning("telegram getUpdates failed: %s", r.status_code)
                time.sleep(2)
                continue
            for upd in r.json().get("result", []):
                # Acknowledge this update so it is not delivered again.
                offset = upd["update_id"] + 1
                msg = upd.get("message") or {}
                chat_id = msg.get("chat", {}).get("id")
                text = (msg.get("text") or "").strip()
                if not text:
                    continue
                logger.info("TG msg %s: %s", chat_id, text)
                lower = text.lower()
                if lower.startswith("/ask "):
                    try:
                        ans = hf_text_generate(text[5:].strip())
                    except Exception as e:
                        ans = f"[HF error] {e}"
                    _tg_reply(base, chat_id, ans)
                elif lower.startswith("/say "):
                    try:
                        audio = hf_tts_get_audio_bytes(text[5:].strip())
                        files = {"audio": ("reply.mp3", audio, "audio/mpeg")}
                        resp = requests.post(
                            base + "/sendAudio",
                            files=files,
                            data={"chat_id": chat_id},
                            timeout=30,
                        )
                        if not resp.ok:
                            logger.warning("tg sendAudio failed: %s %s", resp.status_code, resp.text[:200])
                    except Exception:
                        logger.exception("tg say failed")
                elif lower.startswith("/status"):
                    _tg_reply(base, chat_id, "KC Robot AI is running.")
                else:
                    _tg_reply(base, chat_id, "Commands: /ask <q> | /say <text> | /status")
        except Exception:
            logger.exception("telegram_poll_loop exception, sleeping 3s")
            time.sleep(3)
def start_background_tasks():
    """Start the Telegram poller thread if a token is configured.

    Calling this more than once is harmless: when a live poller thread
    already exists, no second one is started. Two pollers would both call
    getUpdates for the same bot and compete for the same updates.
    """
    if not TELEGRAM_TOKEN:
        logger.info("Telegram token not provided; poller disabled.")
        return

    poller_name = "kcrobot-telegram-poller"
    # The thread name is the marker used to detect an existing poller.
    if any(th.name == poller_name and th.is_alive() for th in threading.enumerate()):
        logger.info("Telegram poller already running; not starting another.")
        return

    # daemon=True so the poller does not keep the process alive on shutdown.
    t = threading.Thread(target=telegram_poll_loop, name=poller_name, daemon=True)
    t.start()
    logger.info("Started Telegram poller thread.")
def _startup():
    """Start the background workers by delegating to start_background_tasks().

    NOTE(review): presumably meant to be registered as an application
    startup hook; no registration is visible here — confirm.
    """
    start_background_tasks()
| # ----------------- Run ----------------- | |
if __name__ == "__main__":
    # Direct execution: announce startup, launch the optional background
    # workers, then serve on all interfaces at the configured port.
    logger.info("Starting KC Robot AI V4.1 (FPT female TTS).")
    start_background_tasks()
    app.run(host="0.0.0.0", port=PORT)