|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import os, io, json, time, logging, threading |
| from typing import List, Any, Tuple, Optional |
| from flask import Flask, request, jsonify, send_file, render_template_string |
| import requests |
|
|
| |
# Module-wide logger; INFO level so model-selection and fallback events
# are visible in the Space logs.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kcrobot.v7.2")
|
|
| |
# --- Configuration, read once from environment at import time ---
HF_TOKEN = os.getenv("HF_TOKEN", "").strip()    # HF inference API token (required for all calls)
HF_MODEL = os.getenv("HF_MODEL", "").strip()    # optional single-model override; bypasses language routing
# Vietnamese primary model, comma-separated backup list, then English fallback.
HF_MODEL_VI = os.getenv("HF_MODEL_VI", "VietnamAIHub/Vietnamese_llama2_7B_8K_SFT_General_domain").strip()
HF_MODEL_VI_BACKUP = os.getenv("HF_MODEL_VI_BACKUP", "TheBloke/vietnamese-llama2-7B-40GB-AWQ,TheBloke/vietnamese-llama2-7B-40GB-GPTQ").strip()
HF_MODEL_EN = os.getenv("HF_MODEL_EN", "meta-llama/Llama-3.1-8B-Instruct").strip()
HF_TTS_MODEL = os.getenv("HF_TTS_MODEL", "NguyenManhTuan/VietnameseTTS_FPT_AI_Female").strip()
HF_STT_MODEL = os.getenv("HF_STT_MODEL", "openai/whisper-small").strip()

# Optional Telegram mirroring; both values must be set for sends to happen.
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", "").strip()
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "").strip()

# Shared auth header for every HF request; empty dict when no token is set.
HF_HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
|
|
| |
# Rolling in-memory state: the full transcript plus a short line buffer
# intended for a small status display.
CONVERSATION = []
DISPLAY = []
DISPLAY_LIMIT = 6


def push_display(line: str):
    """Append *line* to DISPLAY, dropping the oldest entries so the
    buffer never holds more than DISPLAY_LIMIT lines."""
    DISPLAY.append(line)
    while len(DISPLAY) > DISPLAY_LIMIT:
        del DISPLAY[0]
|
|
| |
# Accented characters that occur only in Vietnamese orthography.
VI_CHARS = set("ăâđêôơưáàảãạắằẳẵặấầẩẫậéèẻẽẹíìỉĩịóòỏõọúùủũụứừửữựýỳỷỹỵ")


def detect_lang(text: str) -> str:
    """Return "vi" when *text* contains any Vietnamese-specific character,
    otherwise "en".  Empty/None input counts as English."""
    if not text:
        return "en"
    return "vi" if any(ch in VI_CHARS for ch in text.lower()) else "en"
|
|
def models_from_env() -> Tuple[List[str], List[str]]:
    """Build the (text_models, tts_models) candidate lists from env config.

    When HF_MODEL is set it overrides routing entirely; otherwise the
    Vietnamese primary model, its comma-separated backups, and the English
    model are tried in that order.
    """
    if HF_MODEL:
        text_models = [HF_MODEL]
    else:
        backups = [m.strip() for m in HF_MODEL_VI_BACKUP.split(",") if m.strip()]
        text_models = [HF_MODEL_VI, *backups, HF_MODEL_EN]
    tts_models = [HF_TTS_MODEL] if HF_TTS_MODEL else []
    return text_models, tts_models
|
|
| |
def hf_post_json_try(models: List[str], payload: dict, timeout: int = 90):
    """POST *payload* as JSON to each HF inference model in turn.

    Returns ``(model_name, decoded_json_or_raw_bytes)`` for the first model
    that answers 200.  Network errors and non-200 statuses are logged and
    the next model is tried.

    Raises:
        RuntimeError: when HF_TOKEN is unset, or every model failed (the
            message includes the last recorded error).
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN missing in Secrets.")
    headers = dict(HF_HEADERS)
    headers["Content-Type"] = "application/json"
    last_err = None
    for model in models:
        url = f"https://api-inference.huggingface.co/models/{model}"
        try:
            r = requests.post(url, headers=headers, json=payload, timeout=timeout)
        except requests.RequestException as e:  # narrowed from bare Exception
            last_err = f"network error {e}"
            logger.warning("Network error for %s: %s", model, e)
            continue
        if r.status_code == 200:
            try:
                return model, r.json()
            except ValueError:
                # Some models return raw (non-JSON) bytes on success.
                return model, r.content
        # Consolidated failure handling; messages kept identical to before.
        if r.status_code in (401, 403):
            last_err = f"auth error {r.status_code} for {model}"
            logger.warning(last_err)
        elif r.status_code == 404:
            last_err = f"not found 404 for {model}"
            logger.warning(last_err)
        elif r.status_code == 400:
            last_err = f"bad request 400 for {model}"
            logger.warning(last_err + " | " + r.text[:200])
        else:
            last_err = f"HTTP {r.status_code} for {model}: {r.text[:200]}"
            logger.warning(last_err)
    raise RuntimeError(f"All model attempts failed. Last error: {last_err}")
|
|
| |
def hf_post_bytes_try(models: List[str], data: bytes, content_type: str = "application/octet-stream", timeout: int = 120):
    """POST raw *data* to each HF model endpoint in turn and return
    ``(model, response)`` for the first 200, logging and skipping failures.

    Raises RuntimeError if HF_TOKEN is missing or no model succeeds.
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN missing in Secrets.")
    headers = dict(HF_HEADERS)
    headers["Content-Type"] = content_type
    last_err = None
    for model in models:
        endpoint = f"https://api-inference.huggingface.co/models/{model}"
        try:
            resp = requests.post(endpoint, headers=headers, data=data, timeout=timeout)
        except Exception as exc:
            last_err = f"network error {exc}"
            logger.warning("Network error for %s: %s", model, exc)
            continue
        status = resp.status_code
        if status == 200:
            return model, resp
        if status in (401, 403):
            last_err = f"auth error {status} for {model}"
        elif status == 404:
            last_err = f"not found 404 for {model}"
        elif status == 400:
            last_err = f"bad request 400 for {model}: {resp.text[:200]}"
        else:
            last_err = f"HTTP {status} for {model}: {resp.text[:200]}"
        logger.warning(last_err)
    raise RuntimeError(f"All byte-post attempts failed. Last error: {last_err}")
|
|
def parse_text_out(obj):
    """Extract a plain-text answer from the many response shapes the HF
    inference API can return (list of dicts, dict, raw bytes, anything else).

    Never raises: any unexpected failure is reported as a
    "[parse error] ..." string.
    """
    try:
        if isinstance(obj, list) and obj and isinstance(obj[0], dict):
            first = obj[0]
            for key in ("generated_text", "text"):
                if key in first:
                    return first[key]
            return str(first)
        if isinstance(obj, dict):
            for key in ("generated_text", "text", "summary_text"):
                if key in obj:
                    return obj[key]
            if "choices" in obj and isinstance(obj["choices"], list) and obj["choices"]:
                head = obj["choices"][0]
                return head.get("text") or head.get("message", {}).get("content", "")
            return json.dumps(obj)
        if isinstance(obj, (bytes, bytearray)):
            return obj.decode(errors="ignore")
        return str(obj)
    except Exception as e:
        return f"[parse error] {e}"
|
|
def text_generate(prompt: str, lang_hint: str = "auto", max_new_tokens: int = 512, temperature: float = 0.7):
    """Generate a text completion for *prompt* via the HF inference API.

    *lang_hint* selects model ordering: Vietnamese models first for "vi"
    (or when auto-detection finds Vietnamese characters), English-first
    otherwise.  HF_MODEL, when set, overrides all routing.
    Returns the parsed text answer.
    """
    text_models, _ = models_from_env()
    if HF_MODEL:
        candidates = [HF_MODEL]
    elif lang_hint == "vi" or (lang_hint == "auto" and detect_lang(prompt) == "vi"):
        candidates = text_models
    else:
        # English-first ordering.  text_models already ends with HF_MODEL_EN,
        # so without deduplication the same model would be queried twice
        # (bug in the original list construction).
        candidates = [m for m in [HF_MODEL_EN] if m] + text_models
    models = list(dict.fromkeys(candidates))  # order-preserving dedupe
    payload = {
        "inputs": prompt,
        "parameters": {"max_new_tokens": int(max_new_tokens), "temperature": float(temperature)},
        "options": {"wait_for_model": True},
    }
    model_used, out = hf_post_json_try(models, payload, timeout=120)
    logger.info("text_generate used model: %s", model_used)
    return parse_text_out(out)
|
|
def tts_bytes(text: str):
    """Synthesize *text* with the configured HF TTS model and return the
    raw audio bytes.  Raises RuntimeError when no TTS model is configured."""
    _, tts_models = models_from_env()
    if not tts_models:
        raise RuntimeError("No TTS model configured.")
    body = json.dumps({"inputs": text}).encode("utf-8")
    model_used, resp = hf_post_bytes_try(tts_models, body, content_type="application/json", timeout=120)
    logger.info("tts used model: %s", model_used)
    return resp.content
|
|
def stt_from_bytes(audio_bytes: bytes):
    """Transcribe raw audio bytes with the configured HF STT model.

    Prefers the JSON {"text": ...} field of the response; otherwise falls
    back to the generic parser, or to the raw response text when the body
    is not JSON at all.  Raises RuntimeError when no STT model is set.
    """
    models = [HF_STT_MODEL] if HF_STT_MODEL else []
    if not models:
        raise RuntimeError("No STT model configured.")
    model_used, resp = hf_post_bytes_try(models, audio_bytes, content_type="application/octet-stream", timeout=180)
    logger.info("stt used model: %s", model_used)
    try:
        payload = resp.json()
    except Exception:
        return resp.text if hasattr(resp, "text") else str(resp)
    if isinstance(payload, dict) and "text" in payload:
        return payload["text"]
    return parse_text_out(payload)
|
|
| |
def send_telegram(msg: str):
    """Best-effort push of *msg* to the configured Telegram chat.

    Returns True when a send was attempted, False when Telegram is not
    configured or the request raised; never propagates an exception.
    """
    if not (TELEGRAM_TOKEN and TELEGRAM_CHAT_ID):
        logger.debug("telegram not configured")
        return False
    try:
        endpoint = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
        requests.post(endpoint, json={"chat_id": TELEGRAM_CHAT_ID, "text": msg}, timeout=10)
        return True
    except Exception:
        logger.exception("telegram send failed")
        return False
|
|
| |
| app = Flask(__name__) |
|
|
| INDEX_HTML = """ |
| <!doctype html> |
| <html><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1"> |
| <title>KC Robot AI v7.2</title> |
| <style>body{font-family:Arial;margin:12px}textarea,input{width:100%;padding:8px;margin:6px 0}button{padding:8px 12px;background:#1976d2;color:#fff;border:none;border-radius:6px}</style> |
| </head><body> |
| <h2>KC Robot AI v7.2 — Cloud Brain</h2> |
| <textarea id="q" rows="3" placeholder="Nhập tiếng Việt hoặc English..."></textarea><br> |
| <select id="lang"><option value="auto">Auto</option><option value="vi">Vietnamese</option><option value="en">English</option></select> |
| <button onclick="send()">Send</button> <button onclick="play()">Play TTS</button> |
| <div id="out" style="margin-top:14px"></div> |
| <script> |
| let lastAns=""; |
| async function send(){ |
| let t=document.getElementById('q').value; if(!t) return; |
| document.getElementById('out').innerHTML += '<div><b>You:</b> '+t+'</div>'; |
| let res=await fetch('/ask',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({text:t,lang:document.getElementById('lang').value})}); |
| let j=await res.json(); |
| if(j.answer){ lastAns=j.answer; document.getElementById('out').innerHTML += '<div><b>Robot:</b> '+j.answer+'</div>'; } |
| else document.getElementById('out').innerHTML += '<div><b>Error:</b> '+JSON.stringify(j)+'</div>'; |
| } |
| async function play(){ |
| if(!lastAns) return alert('No answer'); |
| let r=await fetch('/tts',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({text:lastAns})}); |
| if(!r.ok){ alert('TTS failed'); return;} |
| let b=await r.blob(); let url=URL.createObjectURL(b); let a=new Audio(url); a.play(); |
| } |
| </script> |
| </body></html> |
| """ |
|
|
| @app.route("/", methods=["GET"]) |
| def index(): |
| return INDEX_HTML |
|
|
| @app.route("/health", methods=["GET"]) |
| def health(): |
| return jsonify({ |
| "ok": True, |
| "hf_token_set": bool(HF_TOKEN), |
| "hf_model": HF_MODEL or HF_MODEL_VI, |
| "tts_model": HF_TTS_MODEL, |
| "stt_model": HF_STT_MODEL, |
| "telegram": bool(TELEGRAM_TOKEN and TELEGRAM_CHAT_ID), |
| }) |
|
|
| @app.route("/ask", methods=["POST"]) |
| def api_ask(): |
| try: |
| j = request.get_json(force=True) or {} |
| text = (j.get("text","") or "").strip() |
| lang = (j.get("lang","auto") or "auto") |
| if not text: |
| return jsonify({"error":"no text"}), 400 |
| if lang=="vi": |
| prompt = "Bạn là trợ lý thông minh, trả lời bằng tiếng Việt, lịch sự và ngắn gọn:\\n\\n" + text |
| elif lang=="en": |
| prompt = "You are a helpful assistant. Answer in English, concise:\\n\\n" + text |
| else: |
| prompt = "You are a bilingual assistant. Answer in the same language as the user:\\n\\n" + text |
| ans = text_generate(prompt, lang_hint=(lang if lang in ("vi","en") else "auto")) |
| CONVERSATION.append((text, ans)) |
| push_display("YOU: " + text[:60]) |
| push_display("BOT: " + (ans[:60] if isinstance(ans,str) else str(ans)[:60])) |
| |
| if TELEGRAM_TOKEN and TELEGRAM_CHAT_ID: |
| try: |
| send_telegram(f"You: {text}\\nBot: {ans[:400]}") |
| except Exception: |
| logger.exception("telegram notify failed") |
| return jsonify({"answer": ans}) |
| except Exception as e: |
| logger.exception("ask error") |
| return jsonify({"error": str(e)}), 500 |
|
|
| @app.route("/tts", methods=["POST"]) |
| def api_tts(): |
| try: |
| j = request.get_json(force=True) or {} |
| text = (j.get("text","") or "").strip() |
| if not text: |
| return jsonify({"error":"no text"}), 400 |
| audio_bytes = tts_bytes(text) |
| return send_file(io.BytesIO(audio_bytes), mimetype="audio/mpeg", as_attachment=False, download_name="tts.mp3") |
| except Exception as e: |
| logger.exception("tts error") |
| return jsonify({"error": str(e)}), 500 |
|
|
| @app.route("/stt", methods=["POST"]) |
| def api_stt(): |
| try: |
| if "file" in request.files: |
| f = request.files["file"]; audio_bytes = f.read() |
| else: |
| audio_bytes = request.get_data() |
| if not audio_bytes: |
| return jsonify({"error":"no audio"}), 400 |
| text = stt_from_bytes(audio_bytes) |
| CONVERSATION.append((f"[voice] {text}", "")) |
| push_display("Voice: " + (text[:60] if isinstance(text,str) else str(text))) |
| return jsonify({"text": text}) |
| except Exception as e: |
| logger.exception("stt error") |
| return jsonify({"error": str(e)}), 500 |
|
|
| @app.route("/presence", methods=["POST"]) |
| def api_presence(): |
| try: |
| j = request.get_json(force=True) or {} |
| note = (j.get("note","Có người phía trước") or "Có người phía trước") |
| greeting = f"Xin chào! {note}" |
| CONVERSATION.append(("__presence__", greeting)) |
| push_display("RADAR: " + note[:60]) |
| if TELEGRAM_TOKEN and TELEGRAM_CHAT_ID: |
| try: send_telegram(f"⚠️ Robot: Phát hiện người - {note}") |
| except Exception: logger.exception("tg notify failed") |
| try: |
| audio_bytes = tts_bytes(greeting) |
| return send_file(io.BytesIO(audio_bytes), mimetype="audio/mpeg", as_attachment=False, download_name="presence.mp3") |
| except Exception: |
| return jsonify({"greeting": greeting}) |
| except Exception as e: |
| logger.exception("presence error") |
| return jsonify({"error": str(e)}), 500 |
|
|
| @app.route("/display", methods=["GET"]) |
| def api_display(): |
| return jsonify({"lines": DISPLAY.copy(), "conv_len": len(CONVERSATION)}) |
|
|
| |
| if __name__ == "__main__": |
| logger.info("Starting KC Robot AI v7.2") |
| if not HF_TOKEN: |
| logger.warning("HF_TOKEN missing — add it to Space Secrets.") |
| app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 7860)), debug=False) |
|
|