# NOTE(review): the original paste began with Hugging Face Spaces UI text
# ("Spaces:" / "Runtime error" x2); it is not part of the program and was removed.
# app.py — KC Robot AI v5.5 FINAL
# Flask server for Hugging Face Space
# - Requirements: see requirements.txt
# - Secrets expected: HF_API_TOKEN (required), optional: HF_MODEL, HF_TTS_MODEL,
#   HF_STT_MODEL, TELEGRAM_TOKEN, TELEGRAM_CHATID
import os
import io
import time
import json
import base64
import threading
import logging
from typing import Optional
from pathlib import Path

import requests
from flask import Flask, request, jsonify, render_template_string

# Fallback TTS (used when no HF TTS model is configured or the HF call fails)
from gtts import gTTS

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kcrobot.v5.5.final")

app = Flask(__name__)

# Config / Secrets (set in Space -> Settings -> Secrets)
HF_API_TOKEN = os.getenv("HF_API_TOKEN", "").strip()
HF_MODEL = os.getenv("HF_MODEL", "bkai-foundation-models/vietnamese-llama2-7b").strip()
HF_TTS_MODEL = os.getenv("HF_TTS_MODEL", "doanthang/vietTTS-southern-female").strip()  # optional public HF TTS
HF_STT_MODEL = os.getenv("HF_STT_MODEL", "openai/whisper-small").strip()  # optional
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", "").strip()
TELEGRAM_CHATID = os.getenv("TELEGRAM_CHATID", "").strip()

# Authorization header shared by all HF Inference API calls (empty when no token set).
HF_HEADERS = {"Authorization": f"Bearer {HF_API_TOKEN}"} if HF_API_TOKEN else {}

# Temp storage for chat history (ephemeral — lost on Space restart)
TMP_DIR = Path("/tmp/kcrobot")
TMP_DIR.mkdir(parents=True, exist_ok=True)
HISTORY_FILE = TMP_DIR / "history.json"
def read_history():
    """Return the saved chat history as a list of records; [] on any failure.

    Any error (missing file, corrupt JSON, permissions) is logged and
    swallowed so callers always get a list.
    """
    try:
        if HISTORY_FILE.exists():
            with open(HISTORY_FILE, "r", encoding="utf-8") as f:
                return json.load(f)
    except Exception:
        logger.exception("read_history")
    return []
def append_history(user_text, bot_text):
    """Append one {user, bot, ts} record to the history JSON file.

    Read-modify-write of the whole file; errors are logged, never raised.
    """
    rec = {"user": user_text, "bot": bot_text, "ts": time.time()}
    data = read_history()
    data.append(rec)
    try:
        with open(HISTORY_FILE, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
    except Exception:
        logger.exception("append_history")
def clear_history():
    """Delete the history file if it exists; errors are logged, never raised."""
    try:
        if HISTORY_FILE.exists():
            HISTORY_FILE.unlink()
    except Exception:
        logger.exception("clear_history")
# Language detection heuristic: presence of any Vietnamese diacritic => "vi"
VI_CHARS = set("ăâđêôơưáàảãạắằẳẵặấầẩẫậéèẻẽẹíìỉĩịóòỏõọúùủũụýỳỷỹỵ")


def detect_lang(text: str) -> str:
    """Guess "vi" if *text* contains a Vietnamese diacritic character, else "en".

    Empty/None input defaults to "en". Lowercases first so the uppercase
    diacritics also match.
    """
    if not text:
        return "en"
    for ch in text.lower():
        if ch in VI_CHARS:
            return "vi"
    return "en"
| # ---------------- Hugging Face helpers ---------------- | |
# ---------------- Hugging Face helpers ----------------
def hf_post_json(model_id: str, payload: dict, timeout: int = 120):
    """POST a JSON payload to the HF Inference API for *model_id*.

    Returns the decoded JSON response, or raw bytes if the body is not JSON.
    Raises RuntimeError when HF_API_TOKEN is missing and HTTPError on non-2xx.
    """
    if not HF_API_TOKEN:
        raise RuntimeError("HF_API_TOKEN not set in Space Secrets.")
    url = f"https://api-inference.huggingface.co/models/{model_id}"
    headers = {**HF_HEADERS, "Content-Type": "application/json"}
    r = requests.post(url, headers=headers, json=payload, timeout=timeout)
    if not r.ok:
        logger.warning("HF json POST %s returned %s: %s", model_id, r.status_code, r.text[:300])
    r.raise_for_status()
    try:
        return r.json()
    except Exception:
        # Some models answer with binary payloads (e.g. audio) — return raw bytes.
        return r.content
def hf_post_bytes(model_id: str, bytes_data: bytes, content_type: str = "application/octet-stream", timeout: int = 180):
    """POST raw bytes (e.g. audio) to the HF Inference API for *model_id*.

    Returns the requests.Response so callers can choose .json()/.content.
    Raises RuntimeError when HF_API_TOKEN is missing and HTTPError on non-2xx.
    """
    if not HF_API_TOKEN:
        raise RuntimeError("HF_API_TOKEN not set in Space Secrets.")
    url = f"https://api-inference.huggingface.co/models/{model_id}"
    headers = dict(HF_HEADERS)
    headers["Content-Type"] = content_type
    r = requests.post(url, headers=headers, data=bytes_data, timeout=timeout)
    if not r.ok:
        logger.warning("HF bytes POST %s returned %s: %s", model_id, r.status_code, r.text[:300])
    r.raise_for_status()
    return r
def hf_text_generate(prompt: str, model: Optional[str] = None, max_new_tokens: int = 256, temperature: float = 0.7) -> str:
    """Generate text for *prompt* via the HF Inference API; return a string.

    Handles the common response shapes: list-of-dicts, dict with
    generated_text/text, and OpenAI-style "choices". Falls back to str(out)
    when the shape is unrecognized or parsing fails.
    """
    model = model or HF_MODEL
    payload = {
        "inputs": prompt,
        "parameters": {"max_new_tokens": int(max_new_tokens), "temperature": float(temperature)},
        # Block until the model is loaded instead of getting a 503.
        "options": {"wait_for_model": True},
    }
    out = hf_post_json(model, payload, timeout=120)
    # Parse the common response shapes.
    try:
        if isinstance(out, list) and len(out) and isinstance(out[0], dict):
            return out[0].get("generated_text") or out[0].get("text") or str(out[0])
        if isinstance(out, dict):
            if "generated_text" in out:
                return out.get("generated_text")
            if "text" in out:
                return out.get("text")
            # Some models return OpenAI-style "choices".
            if "choices" in out and isinstance(out["choices"], list) and out["choices"]:
                c = out["choices"][0]
                return c.get("text") or c.get("message", {}).get("content", "") or str(c)
        return str(out)
    except Exception:
        logger.exception("hf_text_generate parse")
        return str(out)
def hf_tts_bytes(text: str, model: Optional[str] = None) -> Optional[bytes]:
    """Synthesize *text* via an HF TTS model; return audio bytes or None.

    Returns None when no TTS model is configured or on any error (logged),
    so callers can fall back to gTTS.
    """
    model = model or HF_TTS_MODEL
    if not model:
        return None
    try:
        payload = {"inputs": text}
        url = f"https://api-inference.huggingface.co/models/{model}"
        r = requests.post(url, headers={**HF_HEADERS, "Content-Type": "application/json"}, json=payload, timeout=120)
        if r.ok:
            return r.content
        logger.warning("hf_tts_bytes returned %s: %s", r.status_code, r.text[:200])
        return None
    except Exception:
        logger.exception("hf_tts_bytes")
        return None
def hf_stt_from_bytes(audio_bytes: bytes, model: Optional[str] = None) -> str:
    """Transcribe *audio_bytes* with an HF STT model (e.g. Whisper).

    Accepts both {"text": ...} and [{"text": ...}] response shapes; falls
    back to the raw response text when the JSON shape is unrecognized.
    Propagates errors from hf_post_bytes (missing token, HTTP failure).
    """
    model = model or HF_STT_MODEL
    r = hf_post_bytes(model, audio_bytes, content_type="application/octet-stream", timeout=180)
    try:
        j = r.json()
        if isinstance(j, dict) and "text" in j:
            return j["text"]
        if isinstance(j, list) and len(j) and isinstance(j[0], dict) and "text" in j[0]:
            return j[0]["text"]
        return str(j)
    except Exception:
        return r.text if hasattr(r, "text") else ""
# ---------------- TTS fallback using gTTS ----------------
def tts_gtts_base64(text: str, lang: str = "vi") -> str:
    """Synthesize *text* with gTTS and return base64-encoded MP3 ("" on error)."""
    try:
        tts = gTTS(text=text, lang=lang)
        bio = io.BytesIO()
        tts.write_to_fp(bio)
        bio.seek(0)
        return base64.b64encode(bio.read()).decode("ascii")
    except Exception:
        logger.exception("tts_gtts_base64 failed")
        return ""
def tts_get_audio_for_text(text: str, detected_lang: str = "vi"):
    """Return {"audio_base64", "mime"} for *text*, trying HF TTS then gTTS.

    1. If HF_TTS_MODEL is configured, try it first.
    2. Otherwise (or on failure) fall back to gTTS with vi/en.
    Returns empty strings for both keys when no TTS succeeds.
    """
    audio_bytes = None
    if HF_TTS_MODEL:
        audio_bytes = hf_tts_bytes(text, HF_TTS_MODEL)
    if audio_bytes:
        return {"audio_base64": base64.b64encode(audio_bytes).decode("ascii"), "mime": "audio/mpeg"}
    # Fallback to gTTS; only vi/en are produced by detect_lang.
    lang = "vi" if detected_lang == "vi" else "en"
    b64 = tts_gtts_base64(text, lang=lang)
    if b64:
        return {"audio_base64": b64, "mime": "audio/mpeg"}
    return {"audio_base64": "", "mime": ""}
# ---------------- Telegram ----------------
def send_telegram_message(text: str):
    """Send *text* to the configured Telegram chat; return True on success.

    No-op (returns False) when TELEGRAM_TOKEN/TELEGRAM_CHATID are not both set.
    """
    if not (TELEGRAM_TOKEN and TELEGRAM_CHATID):
        return False
    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
        r = requests.post(url, json={"chat_id": TELEGRAM_CHATID, "text": text}, timeout=10)
        if not r.ok:
            logger.warning("Telegram send failed: %s %s", r.status_code, r.text[:200])
        return r.ok
    except Exception:
        logger.exception("send_telegram_message")
        return False
def telegram_poll_loop():
    """Long-poll Telegram getUpdates forever, answering a few commands.

    Commands: /ask <q> (LLM answer), /say <text> (TTS audio reply),
    /status (liveness ping); anything else gets a usage hint.
    Intended to run in a daemon thread; all errors are logged and retried.
    """
    if not TELEGRAM_TOKEN:
        logger.info("telegram poll disabled")
        return
    base = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}"
    offset = None
    logger.info("Starting telegram poller")
    while True:
        try:
            params = {"timeout": 30}  # server-side long-poll window
            if offset:
                params["offset"] = offset
            r = requests.get(base + "/getUpdates", params=params, timeout=35)
            if not r.ok:
                time.sleep(2)
                continue
            j = r.json()
            for upd in j.get("result", []):
                # Advance the offset so Telegram does not resend this update.
                offset = upd.get("update_id", 0) + 1
                msg = upd.get("message") or {}
                chat = msg.get("chat", {})
                chat_id = chat.get("id")
                text = (msg.get("text") or "").strip()
                if not text:
                    continue
                logger.info("TG msg %s: %s", chat_id, text)
                low = text.lower()
                if low.startswith("/ask "):
                    q = text[5:].strip()
                    ans = hf_text_generate(q)
                    try:
                        requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": ans}, timeout=10)
                    except Exception:
                        logger.exception("telegram reply failed")
                elif low.startswith("/say "):
                    phrase = text[5:].strip()
                    # TTS and send audio: prefer the HF model, fall back to gTTS.
                    try:
                        audio = hf_tts_bytes(phrase) or base64.b64decode(
                            tts_gtts_base64(phrase, lang="vi" if detect_lang(phrase) == "vi" else "en")
                        )
                        files = {"audio": ("say.mp3", audio, "audio/mpeg")}
                        requests.post(base + "/sendAudio", files=files, data={"chat_id": chat_id}, timeout=30)
                    except Exception:
                        logger.exception("telegram say failed")
                elif low.startswith("/status"):
                    try:
                        requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": "KC Robot brain running."}, timeout=10)
                    except Exception:
                        pass
                else:
                    try:
                        requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": "Commands: /ask <q> | /say <text> | /status"}, timeout=10)
                    except Exception:
                        pass
        except Exception:
            logger.exception("telegram poll loop error")
            # NOTE(review): indentation was lost in the paste — sleeping only in
            # the error path (backoff before retrying) is the assumed intent; confirm.
            time.sleep(3)
# Start the Telegram poller in a daemon thread (only when a token is configured).
if TELEGRAM_TOKEN:
    try:
        t = threading.Thread(target=telegram_poll_loop, daemon=True)
        t.start()
    except Exception:
        logger.exception("start telegram thread failed")
# ---------------- Web UI HTML ----------------
# Single-page UI: text chat, mic recording -> /stt -> /ask, greeting button,
# history view, and an <audio> player for the base64 TTS responses.
INDEX_HTML = """
<!doctype html><html><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>KC Robot AI v5.5 Final</title>
<style>
body{font-family:Arial;background:#06111a;color:#dff; padding:12px}
.container{max-width:980px;margin:auto}
#chat{background:#04101a;padding:10px;border-radius:8px;height:420px;overflow:auto;border:1px solid #223344}
.user{color:#bfe7ff;text-align:right;margin:6px}
.bot{color:#dfffdc;text-align:left;margin:6px}
.controls{display:flex;gap:8px;margin-top:8px}
input[type=text]{flex:1;padding:10px;border-radius:8px;border:1px solid #223344;background:#021427;color:#e6eef6}
button{padding:10px 12px;border-radius:8px;border:none;background:#0ea5a4;color:#fff;cursor:pointer}
small{color:#99a0b0}
</style></head><body>
<div class="container">
<h2>🤖 KC Robot AI v5.5 — Final (Miền Nam voice, song ngữ)</h2>
<div id="chat"></div>
<div class="controls">
<input id="txt" placeholder="Gõ câu hỏi (VN/EN) hoặc bấm Ghi..." type="text"/>
<button id="sendBtn">Gửi</button>
<button id="recBtn">🎙 Ghi</button>
<button id="greetBtn">▶ Chào</button>
<button id="historyBtn">🗂 Lịch sử</button>
</div>
<audio id="player" controls style="width:100%;margin-top:10px"></audio>
<p><small>Secrets: HF_API_TOKEN (required). Optionals: HF_MODEL, HF_TTS_MODEL, TELEGRAM_TOKEN, TELEGRAM_CHATID</small></p>
</div>
<script>
let mediaRecorder, audioChunks=[];
const chat=document.getElementById('chat'), player=document.getElementById('player');
function appendUser(t){ chat.innerHTML += '<div class="user"><b>You:</b> '+escapeHtml(t)+'</div>'; chat.scrollTop = chat.scrollHeight; }
function appendBot(t){ chat.innerHTML += '<div class="bot"><b>Robot:</b> '+escapeHtml(t)+'</div>'; chat.scrollTop = chat.scrollHeight; }
// FIX: the pasted source had the HTML entities decoded (replacing '&' with '&'
// is a no-op), which disabled escaping entirely. Restore the entity references.
function escapeHtml(s){ return String(s).replace(/&/g,'&amp;').replace(/</g,'&lt;').replace(/>/g,'&gt;'); }
document.getElementById('sendBtn').onclick = async ()=>{
const v = document.getElementById('txt').value.trim(); if(!v) return;
appendUser(v); document.getElementById('txt').value='';
const res = await fetch('/ask',{method:'POST',headers:{'Content-Type':'application/json'},body: JSON.stringify({text:v})});
const j = await res.json();
const ans = j.answer || j.error || 'No answer';
appendBot(ans);
if(j.audio_base64){
const blob = base64ToBlob(j.audio_base64, j.mime || 'audio/mpeg');
const url = URL.createObjectURL(blob); player.src = url; player.play();
}
};
document.getElementById('greetBtn').onclick = async ()=>{
const r = await fetch('/presence',{method:'POST',headers:{'Content-Type':'application/json'},body: JSON.stringify({note:'Xin chào chủ nhân'})});
const j = await r.json();
appendBot(j.greeting || j.error || '');
if(j.audio_base64){ const blob = base64ToBlob(j.audio_base64, j.mime||'audio/mpeg'); player.src = URL.createObjectURL(blob); player.play(); }
else if(j.music_url){ player.src = j.music_url; player.play(); }
};
document.getElementById('historyBtn').onclick = async ()=>{
const r = await fetch('/history'); const j = await r.json(); chat.innerHTML=''; j.forEach(it=>{ appendUser(it.user); appendBot(it.bot); });
};
document.getElementById('recBtn').onclick = async ()=>{
if(mediaRecorder && mediaRecorder.state === 'recording'){ mediaRecorder.stop(); return; }
if(!navigator.mediaDevices) return alert('No mic support');
try{
const stream = await navigator.mediaDevices.getUserMedia({audio:true});
mediaRecorder = new MediaRecorder(stream);
audioChunks=[];
mediaRecorder.ondataavailable = e => audioChunks.push(e.data);
mediaRecorder.onstop = async ()=>{
const blob = new Blob(audioChunks, {type:'audio/webm'});
const fd = new FormData(); fd.append('file', blob, 'rec.webm');
const r = await fetch('/stt',{method:'POST', body: fd});
const j = await r.json();
if(j.text){
appendUser('[voice] '+ j.text);
const res = await fetch('/ask',{method:'POST',headers:{'Content-Type':'application/json'},body: JSON.stringify({text: j.text})});
const aj = await res.json(); const ans = aj.answer || aj.error || 'No answer';
appendBot(ans);
if(aj.audio_base64){ const blob2 = base64ToBlob(aj.audio_base64, aj.mime||'audio/mpeg'); player.src = URL.createObjectURL(blob2); player.play();}
} else { appendBot('[STT error] '+JSON.stringify(j)); }
};
mediaRecorder.start(); document.getElementById('recBtn').textContent='■ Dừng';
} catch(e){ alert('Mic error: '+e); }
};
function base64ToBlob(b64, mime){ const bytes = atob(b64); let len = bytes.length; const buf = new Uint8Array(len); for(let i=0;i<len;i++) buf[i]=bytes.charCodeAt(i); return new Blob([buf], {type:mime}); }
</script>
</body></html>
"""
# ---------------- Endpoints ----------------
# NOTE(review): the pasted source had no @app.route decorators, so no endpoint
# was registered (the UI fetches would all 404). Restored from the frontend's
# fetch() calls where possible.
@app.route("/")
def index():
    """Serve the single-page web UI."""
    return render_template_string(INDEX_HTML)
@app.route("/config")  # NOTE(review): path inferred from the function name — confirm
def get_config():
    """Report configuration presence/booleans only — never secret values."""
    return jsonify({
        "hf_token": bool(HF_API_TOKEN),
        "hf_model": HF_MODEL,
        "hf_tts_model": HF_TTS_MODEL,
        "hf_stt_model": HF_STT_MODEL,
        "telegram": bool(TELEGRAM_TOKEN and TELEGRAM_CHATID),
    })
@app.route("/ask", methods=["POST"])  # path matches the UI's fetch('/ask')
def ask_route():
    """Answer a question: detect language, generate text, log history, add TTS.

    Body: {"text": str}. Returns {"answer", "audio_base64", "mime"} or an
    error payload with 400 (no text) / 500 (generation failure).
    """
    data = request.get_json(force=True, silent=True) or {}
    text = (data.get("text") or "").strip()
    if not text:
        return jsonify({"error": "no text"}), 400
    lang = detect_lang(text)
    # FIX: the paste had doubled backslashes ("\\n") which would put literal
    # backslash-n into the prompt instead of newlines.
    if lang == "vi":
        prompt = f"Bạn là trợ lý thông minh, trả lời bằng tiếng Việt, rõ ràng và ngắn gọn:\n\n{text}"
    else:
        prompt = f"You are a helpful assistant. Answer in clear English:\n\n{text}"
    try:
        answer = hf_text_generate(prompt)
    except Exception as e:
        logger.exception("hf_text_generate error")
        return jsonify({"error": str(e)}), 500
    append_history(text, answer)
    # Prepare spoken audio for the answer.
    tts = tts_get_audio_for_text(answer, detected_lang=lang)
    result = {"answer": answer}
    result.update(tts)
    return jsonify(result)
@app.route("/tts", methods=["POST"])  # NOTE(review): path inferred from the function name — confirm
def tts_route():
    """Synthesize speech for {"text": str}; returns {"audio_base64", "mime"}."""
    data = request.get_json(force=True, silent=True) or {}
    text = (data.get("text") or "").strip()
    if not text:
        return jsonify({"error": "no text"}), 400
    lang = detect_lang(text)
    return jsonify(tts_get_audio_for_text(text, detected_lang=lang))
@app.route("/stt", methods=["POST"])  # path matches the UI's fetch('/stt')
def stt_route():
    """Transcribe uploaded audio (multipart "file" field or raw body).

    Returns {"text": transcription}, 400 when no audio was supplied,
    500 on STT/internal failure.
    """
    try:
        if "file" in request.files:
            f = request.files["file"]
            audio_bytes = f.read()
        else:
            # Allow a raw-body POST as well as multipart upload.
            audio_bytes = request.get_data() or b""
        if not audio_bytes:
            return jsonify({"error": "no audio"}), 400
        try:
            txt = hf_stt_from_bytes(audio_bytes)
        except Exception as e:
            logger.exception("hf_stt failed")
            return jsonify({"error": str(e)}), 500
        return jsonify({"text": txt})
    except Exception:
        logger.exception("stt_route")
        return jsonify({"error": "stt internal error"}), 500
@app.route("/presence", methods=["POST"])  # path matches the UI's fetch('/presence')
def presence_route():
    """Handle a presence event: greet, log, notify Telegram, return audio.

    Body: {"note": str} (optional). Returns {"greeting", audio fields} or a
    fallback {"music_url"} when no TTS is available.
    """
    data = request.get_json(force=True, silent=True) or {}
    note = (data.get("note") or "Có người đến gần robot").strip()
    greeting_vi = f"Xin chào! {note}"
    greeting_en = "Hello! Someone is near the robot."
    # FIX: the paste had a doubled backslash ("\\n"); a real newline is intended.
    combined = f"{greeting_vi}\n{greeting_en}"
    append_history("__presence__", combined)
    # Prepare greeting audio (Vietnamese greeting only).
    tts = tts_get_audio_for_text(greeting_vi, detected_lang="vi")
    # Telegram notify (best-effort).
    if TELEGRAM_TOKEN and TELEGRAM_CHATID:
        try:
            send_telegram_message("⚠️ Robot phát hiện: " + note)
        except Exception:
            logger.exception("telegram notify failed")
    resp = {"greeting": combined}
    if tts.get("audio_base64"):
        resp.update(tts)
    else:
        # No TTS available: return a sample music URL the client can play instead.
        resp["music_url"] = os.getenv("HF_MUSIC_URL", "https://www.soundhelix.com/examples/mp3/SoundHelix-Song-1.mp3")
    return jsonify(resp)
@app.route("/history")  # path matches the UI's fetch('/history')
def history_route():
    """Return the full chat history as a JSON list."""
    return jsonify(read_history())
@app.route("/clear_history", methods=["POST"])  # NOTE(review): path inferred — confirm
def clear_history_route():
    """Delete the stored history file and confirm."""
    clear_history()
    return jsonify({"cleared": True})
# startup warmup
def warmup():
    """Fire non-blocking warmup calls so HF models load before the first user.

    Runs a daemon thread that makes one small text-generation call and one
    TTS call (when configured); every failure is swallowed — warmup is
    best-effort only.
    """
    logger.info("Warmup: attempting lightweight calls (non-blocking)")

    def _w():
        try:
            if HF_API_TOKEN:
                try:
                    hf_text_generate("Xin chào. Hãy trả lời ngắn gọn: Xin chào!")
                except Exception:
                    pass
            try:
                if HF_TTS_MODEL:
                    hf_tts_bytes("Xin chào chủ nhân")
            except Exception:
                pass
        except Exception:
            logger.exception("warmup errors")

    threading.Thread(target=_w, daemon=True).start()
# One-shot guard so warmup runs exactly once.
_warmup_done = threading.Event()


@app.before_request
def before_first():
    """Trigger warmup() once, before the first request is handled.

    NOTE(review): the pasted source had no registration for this hook, so
    warmup never ran; Flask's @app.before_first_request was removed in
    Flask 2.3, so emulate it with a one-shot flag on before_request.
    """
    if not _warmup_done.is_set():
        _warmup_done.set()
        warmup()
if __name__ == "__main__":
    logger.info("Starting KC Robot AI v5.5 FINAL")
    # Hugging Face Spaces injects PORT; 7860 is the Spaces default.
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))