|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import os |
| import io |
| import time |
| import json |
| import uuid |
| import logging |
| import threading |
| from typing import Optional, List, Tuple |
| from pathlib import Path |
|
|
| import requests |
| from flask import Flask, request, jsonify, send_file, render_template_string, abort |
|
|
| |
# ---------------------------------------------------------------------------
# Logging and Flask application object
# ---------------------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kcrobot.v4")

app = Flask(__name__)


def _env(name: str, default: str = "") -> str:
    """Return environment variable `name` (or `default`) with whitespace stripped."""
    return os.getenv(name, default).strip()


# ---------------------------------------------------------------------------
# Scratch directory for generated audio files (created on import)
# ---------------------------------------------------------------------------
if os.name != "nt":
    TMP_DIR = Path("/tmp/kcrobot")
else:
    TMP_DIR = Path.cwd() / "tmp_kcrobot"
TMP_DIR.mkdir(parents=True, exist_ok=True)

# ---------------------------------------------------------------------------
# Settings read from the environment
# ---------------------------------------------------------------------------
HF_API_TOKEN = _env("HF_API_TOKEN")
HF_MODEL = _env("HF_MODEL", "google/flan-t5-large")
HF_TTS_MODEL = _env("HF_TTS_MODEL", "NguyenManhTuan/VietnameseTTS_FPT_AI_Female")
HF_STT_MODEL = _env("HF_STT_MODEL", "openai/whisper-small")
TELEGRAM_TOKEN = _env("TELEGRAM_TOKEN")
TELEGRAM_CHATID = _env("TELEGRAM_CHATID")

# PORT takes precedence over SERVER_PORT; 7860 is used when neither is set.
_fallback_port = os.getenv("SERVER_PORT", 7860)
PORT = int(os.getenv("PORT", _fallback_port))

# Authorization header sent with every HF request; empty when no token is set.
if HF_API_TOKEN:
    HF_HEADERS = {"Authorization": f"Bearer {HF_API_TOKEN}"}
else:
    logger.warning("HF_API_TOKEN is not set — HF inference will fail until you add it in Secrets.")
    HF_HEADERS = {}

# ---------------------------------------------------------------------------
# In-memory state shared by the routes
# ---------------------------------------------------------------------------
CONV: List[Tuple[str, str]] = []   # (user text, bot answer) pairs
DISPLAY_LINES: List[str] = []      # most recent short lines exposed by /display
|
|
def push_display(line: str, limit: int = 6):
    """Append `line` to DISPLAY_LINES and keep only the last `limit` entries.

    The list is trimmed in place (never rebound), so any other reference to
    the same list object keeps seeing the current contents.

    Args:
        line: Text to append.
        limit: Maximum number of entries to retain. A value <= 0 empties the
            list (a plain `lst[-0:]` slice would have kept everything).
    """
    DISPLAY_LINES.append(line)
    if limit <= 0:
        DISPLAY_LINES.clear()
        return
    overflow = len(DISPLAY_LINES) - limit
    if overflow > 0:
        # Drop the oldest entries from the front.
        del DISPLAY_LINES[:overflow]
|
|
| |
def hf_post_json(model_id: str, payload: dict, timeout: int = 120):
    """Send `payload` as JSON to the HF inference endpoint for `model_id`.

    Returns the decoded JSON body, or the raw response text when the body
    cannot be decoded as JSON.

    Raises:
        RuntimeError: when HF_API_TOKEN is empty.
        requests.HTTPError: when the endpoint answers with an error status.
    """
    if not HF_API_TOKEN:
        raise RuntimeError("HF_API_TOKEN missing (set in Secrets).")

    endpoint = "https://api-inference.huggingface.co/models/" + str(model_id)
    request_headers = {**HF_HEADERS, "Content-Type": "application/json"}
    response = requests.post(endpoint, headers=request_headers, json=payload, timeout=timeout)

    if not response.ok:
        # Log a short excerpt of the body before raising the HTTP error.
        logger.error("HF POST JSON error %s: %s", response.status_code, response.text[:400])
        response.raise_for_status()

    try:
        decoded = response.json()
    except Exception:
        return response.text
    return decoded
|
|
def hf_post_bytes(model_id: str, data: bytes, content_type: str = "application/octet-stream", timeout: int = 180):
    """Send raw bytes (e.g. audio) to the HF inference endpoint for `model_id`.

    Returns the `requests` response object so the caller can decide how to
    interpret the body.

    Raises:
        RuntimeError: when HF_API_TOKEN is empty.
        requests.HTTPError: when the endpoint answers with an error status.
    """
    if not HF_API_TOKEN:
        raise RuntimeError("HF_API_TOKEN missing (set in Secrets).")

    endpoint = "https://api-inference.huggingface.co/models/" + str(model_id)
    request_headers = {**HF_HEADERS, "Content-Type": content_type}
    response = requests.post(endpoint, headers=request_headers, data=data, timeout=timeout)

    if not response.ok:
        # Log a short excerpt of the body before raising the HTTP error.
        logger.error("HF POST bytes error %s: %s", response.status_code, response.text[:400])
        response.raise_for_status()
    return response
|
|
| |
def _hf_as_text(value) -> str:
    """Coerce a response field to str (None becomes an empty string)."""
    if value is None:
        return ""
    return value if isinstance(value, str) else str(value)


def _hf_extract_generated_text(out) -> str:
    """Pick the generated text out of a decoded HF inference response."""
    if isinstance(out, list) and out:
        first = out[0]
        if isinstance(first, dict) and "generated_text" in first:
            return _hf_as_text(first["generated_text"])
        return str(first)
    if isinstance(out, dict):
        for key in ("generated_text", "text", "summary_text"):
            if key in out:
                return _hf_as_text(out[key])
        # Keep non-ASCII (e.g. Vietnamese) readable instead of \uXXXX escapes.
        return json.dumps(out, ensure_ascii=False)
    return str(out)


def hf_text_generate(prompt: str, model: Optional[str] = None, max_new_tokens: int = 256, temperature: float = 0.7) -> str:
    """Run text generation on the HF inference API and return the answer text.

    Args:
        prompt: Full prompt sent as the model input.
        model: Model id; falls back to HF_MODEL when omitted or empty.
        max_new_tokens: Passed through as the `max_new_tokens` parameter.
        temperature: Passed through as the `temperature` parameter.

    Returns:
        Always a `str`: the `generated_text` / `text` / `summary_text` field
        when present, otherwise a string rendering of the response.

    Raises:
        Whatever `hf_post_json` raises (missing token, HTTP errors).
    """
    model = model or HF_MODEL
    payload = {
        "inputs": prompt,
        "parameters": {"max_new_tokens": int(max_new_tokens), "temperature": float(temperature)},
        "options": {"wait_for_model": True},
    }
    out = hf_post_json(model, payload, timeout=120)
    return _hf_extract_generated_text(out)
|
|
| |
def hf_tts_get_audio_bytes(text: str, model: Optional[str] = None) -> bytes:
    """Call the HF TTS model and return the raw audio bytes of the response.

    Args:
        text: Text to synthesize.
        model: Model id; falls back to HF_TTS_MODEL when omitted or empty.

    Returns:
        The response body as bytes. The audio container format depends on
        the model and is not inspected here.

    Raises:
        RuntimeError: when HF_API_TOKEN is empty (same fail-fast behavior as
            the other HF helpers in this module).
        requests.HTTPError: when the endpoint answers with an error status.
    """
    if not HF_API_TOKEN:
        raise RuntimeError("HF_API_TOKEN missing (set in Secrets).")
    model = model or HF_TTS_MODEL
    url = f"https://api-inference.huggingface.co/models/{model}"
    headers = {**HF_HEADERS, "Content-Type": "application/json"}
    r = requests.post(url, headers=headers, json={"inputs": text}, timeout=120)
    if not r.ok:
        logger.error("HF TTS error %s: %s", r.status_code, r.text[:400])
        r.raise_for_status()
    return r.content
|
|
def save_tts_temp(audio_bytes: bytes, ext_hint: str = "mp3") -> str:
    """Write `audio_bytes` to a uniquely named file in TMP_DIR.

    The name has the form ``tts_<unix-seconds>_<uuid4-hex>.<ext_hint>``.
    Only the file name (not the full path) is returned.
    """
    stamp = int(time.time())
    unique = uuid.uuid4().hex
    fname = "tts_{}_{}.{}".format(stamp, unique, ext_hint)
    (TMP_DIR / fname).write_bytes(audio_bytes)
    return fname
|
|
| |
def hf_stt_from_bytes(audio_bytes: bytes, model: Optional[str] = None) -> str:
    """Transcribe audio bytes with the HF speech-to-text model.

    Uses HF_STT_MODEL when `model` is omitted or empty. The transcript is
    taken from the ``text`` field of the JSON response (either the object
    itself or the first element of a list); any other JSON shape is returned
    as its string rendering. If handling the response fails, the raw
    response text (or an empty string) is returned instead.
    """
    response = hf_post_bytes(
        model or HF_STT_MODEL,
        audio_bytes,
        content_type="application/octet-stream",
        timeout=180,
    )
    try:
        parsed = response.json()
        # A non-empty list is inspected through its first element.
        candidate = parsed[0] if isinstance(parsed, list) and len(parsed) else parsed
        if isinstance(candidate, dict) and "text" in candidate:
            return candidate["text"]
        return str(parsed)
    except Exception:
        return getattr(response, "text", "")
|
|
| |
@app.route("/health", methods=["GET"])
def health():
    """Report service configuration: model ids, whether secrets are set, temp dir."""
    telegram_ready = bool(TELEGRAM_TOKEN and TELEGRAM_CHATID)
    status = dict(
        ok=True,
        hf_api_token=bool(HF_API_TOKEN),
        hf_model=HF_MODEL,
        hf_tts_model=HF_TTS_MODEL,
        hf_stt_model=HF_STT_MODEL,
        telegram=telegram_ready,
        tmp_dir=str(TMP_DIR),
    )
    return jsonify(status)
|
|
@app.route("/ask", methods=["POST"])
def route_ask():
    """Answer a text question with the HF text-generation model.

    Request JSON: { "text": "...", "lang": "vi"|"en"|"auto" (optional) }
    Response JSON: { "answer": "..." }

    Returns 400 with {"error": "no text"} when the body is missing, is not
    valid JSON, is not a JSON object, or carries no text. Returns 500 with
    {"error": "..."} when generation fails.
    """
    try:
        # silent=True: malformed JSON yields None instead of raising BadRequest,
        # which the broad handler below would otherwise turn into a 500.
        data = request.get_json(force=True, silent=True)
        if not isinstance(data, dict):
            data = {}
        text = str(data.get("text") or "").strip()
        lang = str(data.get("lang") or "auto").lower()
        if not text:
            return jsonify({"error": "no text"}), 400

        if lang == "vi":
            prompt = f"Bạn là trợ lý thông minh, trả lời bằng tiếng Việt, ngắn gọn và lịch sự:\n\n{text}"
        elif lang == "en":
            prompt = f"You are a helpful assistant. Answer in clear English, concise:\n\n{text}"
        else:
            prompt = f"Bạn là trợ lý thông minh song ngữ (Vietnamese/English). Trả lời bằng ngôn ngữ phù hợp với câu hỏi:\n\n{text}"

        answer = hf_text_generate(prompt)

        # NOTE(review): CONV is appended to on every request and never trimmed.
        CONV.append((text, answer))
        push_display("YOU: " + text[:40])
        # str() guards the slice in case the generator returns a non-string.
        push_display("BOT: " + str(answer)[:40])
        return jsonify({"answer": answer})
    except Exception as e:
        logger.exception("route_ask failed")
        return jsonify({"error": str(e)}), 500
|
|
@app.route("/tts", methods=["POST"])
def route_tts():
    """Synthesize speech for the given text.

    Request JSON: { "text": "..." }
    Response: the audio bytes returned by the HF TTS model, served with
    mimetype audio/mpeg.

    Returns 400 with {"error": "no text"} when the body is missing, is not
    valid JSON, is not a JSON object, or carries no text. Returns 500 with
    {"error": "..."} when synthesis fails.
    """
    try:
        # silent=True: malformed JSON yields None instead of raising BadRequest,
        # which the broad handler below would otherwise turn into a 500.
        data = request.get_json(force=True, silent=True)
        if not isinstance(data, dict):
            data = {}
        text = str(data.get("text") or "").strip()
        if not text:
            return jsonify({"error": "no text"}), 400
        audio_bytes = hf_tts_get_audio_bytes(text)
        # NOTE(review): the mimetype is fixed to audio/mpeg although the model
        # may return another container (the original docstring mentions wav).
        return send_file(io.BytesIO(audio_bytes), mimetype="audio/mpeg", as_attachment=False, download_name="tts.mp3")
    except Exception as e:
        logger.exception("route_tts failed")
        return jsonify({"error": str(e)}), 500
|
|
@app.route("/stt", methods=["POST"])
def route_stt():
    """Transcribe uploaded audio.

    The audio is taken from the multipart field ``file`` when present,
    otherwise from the raw request body.
    Response JSON: { "text": "recognized text" }; 400 when no audio was
    supplied, 500 when transcription fails.
    """
    try:
        upload = request.files["file"] if "file" in request.files else None
        if upload is not None:
            audio_bytes = upload.read()
        else:
            audio_bytes = request.get_data() or b""

        if not audio_bytes:
            return jsonify({"error": "no audio"}), 400

        transcript = hf_stt_from_bytes(audio_bytes)
        push_display("UserAudio: " + transcript[:40])
        return jsonify({"text": transcript})
    except Exception as err:
        logger.exception("route_stt failed")
        return jsonify({"error": str(err)}), 500
|
|
@app.route("/presence", methods=["POST"])
def route_presence():
    """Handle a presence notification (documented sender: ESP32 radar).

    Request JSON: { "note": "..." } (optional; a default note is used when
    the body is missing, malformed, not an object, or has no/null note).
    Response JSON: { "greeting": "..." }. A Telegram alert is also sent
    when both TELEGRAM_TOKEN and TELEGRAM_CHATID are configured.
    """
    try:
        # silent=True: malformed JSON yields None instead of raising BadRequest,
        # which the broad handler below would otherwise turn into a 500.
        data = request.get_json(force=True, silent=True)
        if not isinstance(data, dict):
            data = {}
        note = data.get("note")
        # Coerce to str so slicing/formatting below cannot fail on numbers etc.
        note = "Có người tới" if note is None else str(note)
        greeting = f"Xin chào! {note}"
        # NOTE(review): CONV is appended to on every request and never trimmed.
        CONV.append(("__presence__", greeting))
        push_display("RADAR: " + note[:40])

        if TELEGRAM_TOKEN and TELEGRAM_CHATID:
            # Best effort: a failed alert must not fail the greeting response.
            try:
                send_telegram_message(f"⚠️ Robot: Phát hiện: {note}")
            except Exception:
                logger.exception("Telegram notify failed")
        return jsonify({"greeting": greeting})
    except Exception as e:
        logger.exception("route_presence failed")
        return jsonify({"error": str(e)}), 500
|
|
@app.route("/display", methods=["GET"])
def route_display():
    """Return the latest display lines (at most six) and the conversation length."""
    recent = DISPLAY_LINES[-6:]
    return jsonify(lines=recent, conv_len=len(CONV))
|
|
| |
@app.route("/tts_file/<path:fname>", methods=["GET"])
def serve_tts_file(fname):
    """Serve a previously saved audio file from TMP_DIR.

    `fname` comes straight from the URL, so it is resolved and checked to
    lie inside TMP_DIR before anything is read; `..` segments, absolute
    paths and symlinks pointing elsewhere all yield 404. Directories and
    missing files yield 404 as well.
    """
    base = TMP_DIR.resolve()
    target = (base / fname).resolve()
    # `base in target.parents` is true only for paths strictly below base.
    if base not in target.parents or not target.is_file():
        abort(404)

    mime = "audio/mpeg" if str(fname).lower().endswith(".mp3") else "audio/wav"
    return send_file(str(target), mimetype=mime)
|
|
| |
# Single-page chat UI served at "/". The page talks to /ask, /tts and /stt.
# escapeHtml() must emit HTML entities: its output is concatenated into
# innerHTML, so un-escaped model or user text would be parsed as markup.
INDEX_HTML = """
<!doctype html>
<html>
<head>
  <meta charset="utf-8">
  <title>KC Robot AI V4.1</title>
  <meta name="viewport" content="width=device-width,initial-scale=1">
  <style>
    body{font-family:Arial,Helvetica, sans-serif; margin:12px; color:#111}
    textarea{width:100%; height:90px; padding:8px; font-size:16px}
    #chat{border:1px solid #ddd; padding:8px; height:260px; overflow:auto; background:#fbfbfb}
    button{padding:8px 12px; margin-top:8px; font-size:15px}
  </style>
</head>
<body>
  <h2>KC Robot AI V4.1 — Cloud Brain (FPT female)</h2>
  <div id="chat"></div>
  <textarea id="txt" placeholder="Nhập tiếng Việt hoặc English..."></textarea><br>
  <button onclick="ask()">Gửi (Ask)</button>
  <button onclick="playLast()">Phát TTS</button>
  <hr/>
  <input type="file" id="afile" accept="audio/*"><button onclick="uploadAudio()">Upload audio → STT</button>
  <hr/>
  <div id="log"></div>
<script>
window._lastAnswer = "";
async function ask(){
  let t = document.getElementById('txt').value;
  if(!t) return;
  appendUser(t);
  let res = await fetch('/ask', {method:'POST', headers:{'Content-Type':'application/json'}, body: JSON.stringify({text:t})});
  let j = await res.json();
  if(j.answer){ appendBot(j.answer); window._lastAnswer = j.answer; }
  else appendBot('[Error] ' + JSON.stringify(j));
}
function appendUser(t){ document.getElementById('chat').innerHTML += '<div style="color:#006"><b>You:</b> '+escapeHtml(t)+'</div>'; scroll();}
function appendBot(t){ document.getElementById('chat').innerHTML += '<div style="color:#080"><b>Robot:</b> '+escapeHtml(t)+'</div>'; scroll();}
function escapeHtml(s){ return (s+'').replace(/&/g,'&amp;').replace(/</g,'&lt;').replace(/>/g,'&gt;'); }
function scroll(){ let c = document.getElementById('chat'); c.scrollTop = c.scrollHeight; }
async function playLast(){
  const txt = window._lastAnswer || document.getElementById('txt').value;
  if(!txt) return alert('Chưa có câu trả lời');
  let r = await fetch('/tts',{method:'POST', headers:{'Content-Type':'application/json'}, body: JSON.stringify({text: txt})});
  if(!r.ok) return alert('TTS lỗi');
  const b = await r.blob();
  const url = URL.createObjectURL(b);
  const a = new Audio(url);
  a.play();
}
async function uploadAudio(){
  const f = document.getElementById('afile').files[0];
  if(!f) return alert('Chọn file audio');
  const fd = new FormData(); fd.append('file', f);
  const r = await fetch('/stt', {method:'POST', body: fd});
  const j = await r.json();
  if(j.text) appendUser('[voice] '+j.text);
  else appendUser('[stt error] '+JSON.stringify(j));
}
</script>
</body>
</html>
"""
@app.route("/", methods=["GET"])
def index():
    """Render the chat page held in INDEX_HTML."""
    page = render_template_string(INDEX_HTML)
    return page
|
|
| |
def send_telegram_message(text: str) -> bool:
    """Send `text` to the configured Telegram chat.

    Returns True when Telegram accepted the message; False when Telegram is
    not configured, the API answered with an error status, or the request
    raised. Never raises.
    """
    if not (TELEGRAM_TOKEN and TELEGRAM_CHATID):
        logger.debug("Telegram not configured")
        return False

    try:
        endpoint = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
        body = {"chat_id": TELEGRAM_CHATID, "text": text}
        resp = requests.post(endpoint, json=body, timeout=10)
        if resp.ok:
            return True
        logger.warning("Telegram send failed: %s %s", resp.status_code, resp.text)
        return False
    except Exception:
        logger.exception("send_telegram_message exception")
        return False
|
|
def _tg_send_text(base: str, chat_id, text: str) -> None:
    """Send a text reply via sendMessage; log (never raise) on any failure."""
    try:
        r = requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": text}, timeout=10)
        if not r.ok:
            logger.warning("tg reply rejected: %s %s", r.status_code, r.text[:200])
    except Exception:
        logger.exception("tg reply failed")


def _tg_handle_message(base: str, chat_id, text: str) -> None:
    """Dispatch one incoming message text to /ask, /say, /status or the help reply."""
    lower = text.lower()
    if lower.startswith("/ask "):
        question = text[5:].strip()
        try:
            ans = hf_text_generate(question)
        except Exception as e:
            ans = f"[HF error] {e}"
        # Telegram rejects empty message text, so never send an empty answer.
        _tg_send_text(base, chat_id, str(ans) or "[empty answer]")
    elif lower.startswith("/say "):
        tts_text = text[5:].strip()
        try:
            audio = hf_tts_get_audio_bytes(tts_text)
            files = {"audio": ("reply.mp3", audio, "audio/mpeg")}
            r = requests.post(base + "/sendAudio", files=files, data={"chat_id": chat_id}, timeout=30)
            if not r.ok:
                logger.warning("tg sendAudio rejected: %s %s", r.status_code, r.text[:200])
        except Exception:
            logger.exception("tg say failed")
    elif lower.startswith("/status"):
        _tg_send_text(base, chat_id, "KC Robot AI is running.")
    else:
        _tg_send_text(base, chat_id, "Commands: /ask <q> | /say <text> | /status")


def telegram_poll_loop():
    """Long-poll Telegram getUpdates forever and answer simple commands.

    Returns immediately when TELEGRAM_TOKEN is empty; otherwise it never
    returns (intended to run in a background thread). Supported commands:
    ``/ask <q>``, ``/say <text>``, ``/status``; anything else gets a help
    reply. Errors are logged and the loop keeps going after a short sleep.

    NOTE(review): commands are accepted from any chat that messages the bot;
    the sender is not compared with TELEGRAM_CHATID.
    """
    if not TELEGRAM_TOKEN:
        logger.info("telegram_poll_loop: TELEGRAM_TOKEN not set, exiting poller.")
        return
    base = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}"
    offset = None
    logger.info("telegram_poll_loop: starting.")
    while True:
        try:
            params = {"timeout": 30}
            if offset:
                params["offset"] = offset
            # HTTP timeout is slightly longer than the long-poll timeout.
            r = requests.get(base + "/getUpdates", params=params, timeout=35)
            if not r.ok:
                logger.warning("telegram getUpdates failed: %s", r.status_code)
                time.sleep(2)
                continue
            j = r.json()
            for upd in j.get("result", []):
                # Advance the offset first so a failing update is not re-fetched.
                offset = upd["update_id"] + 1
                msg = upd.get("message") or {}
                chat_id = (msg.get("chat") or {}).get("id")
                text = (msg.get("text") or "").strip()
                if not text:
                    continue
                logger.info("TG msg %s: %s", chat_id, text)
                _tg_handle_message(base, chat_id, text)
        except Exception:
            logger.exception("telegram_poll_loop exception, sleeping 3s")
            time.sleep(3)
|
|
# Guards so the Telegram poller is started at most once per process, no
# matter how many times start_background_tasks() is called.
_BACKGROUND_LOCK = threading.Lock()
_BACKGROUND_STARTED = False


def start_background_tasks():
    """Start the Telegram poller thread once (no-op on repeated calls).

    Both the script entry point and the request hook below call this; without
    the guard two pollers would run against the same bot token.
    """
    global _BACKGROUND_STARTED
    with _BACKGROUND_LOCK:
        if _BACKGROUND_STARTED:
            return
        _BACKGROUND_STARTED = True

    if TELEGRAM_TOKEN:
        t = threading.Thread(target=telegram_poll_loop, daemon=True)
        t.start()
        logger.info("Started Telegram poller thread.")
    else:
        logger.info("Telegram token not provided; poller disabled.")


# `before_first_request` was removed in Flask 2.3; `before_request` exists in
# every Flask version and, combined with the once-only guard above, gives the
# same "start on first request" behavior when the app is served by a WSGI
# server instead of being run as a script.
@app.before_request
def _startup():
    """Ensure background tasks are running; returns None so the request proceeds."""
    start_background_tasks()
|
|
| |
def _main() -> None:
    """Script entry point: log the banner, start background tasks, run Flask."""
    logger.info("Starting KC Robot AI V4.1 (FPT female TTS).")
    start_background_tasks()
    app.run(host="0.0.0.0", port=PORT)


if __name__ == "__main__":
    _main()
|
|
|
|