"""KC Robot AI v7.5: Flask server with HF text/STT/TTS, a Telegram bot, and a web UI."""

import os
import io
import sys
import time
import json
import re
import logging
import threading
from typing import Any, List, Tuple, Optional
from pathlib import Path

import requests
from flask import Flask, request, jsonify, Response, render_template_string

# Optional gTTS fallback for speech synthesis, used when HF_TTS_MODEL is not
# configured or the HF TTS call fails.
try:
    from gtts import gTTS
    _HAS_GTTS = True
except Exception:
    _HAS_GTTS = False

logging.basicConfig(stream=sys.stdout, level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("kcrobot.v7.5")

# Configuration comes from environment variables (e.g. Space Secrets).
HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
HF_MODEL = os.getenv("HF_MODEL", "").strip()
HF_TTS_MODEL = os.getenv("HF_TTS_MODEL", "").strip()
HF_STT_MODEL = os.getenv("HF_STT_MODEL", "openai/whisper-small").strip()
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", "").strip()
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "").strip()
PORT = int(os.getenv("PORT", "7860"))

HF_HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}

TMPDIR = Path("/tmp/kcrobot") if os.name != "nt" else Path.cwd() / "tmp_kcrobot"
TMPDIR.mkdir(parents=True, exist_ok=True)
CONV_LOG = TMPDIR / "conversation_log.jsonl"

CONVERSATION: List[Tuple[str, str]] = []
DISPLAY_BUFFER: List[str] = []
DISPLAY_LIMIT = 6

def push_display(line: str):
    """Append a line to the rolling display buffer, keeping the last DISPLAY_LIMIT lines."""
    global DISPLAY_BUFFER
    DISPLAY_BUFFER.append(line)
    if len(DISPLAY_BUFFER) > DISPLAY_LIMIT:
        DISPLAY_BUFFER = DISPLAY_BUFFER[-DISPLAY_LIMIT:]

def save_conv(user: str, bot: str):
    """Append one exchange to the JSONL conversation log."""
    try:
        with open(CONV_LOG, "a", encoding="utf-8") as f:
            f.write(json.dumps({"time": time.time(), "user": user, "bot": bot}, ensure_ascii=False) + "\n")
    except Exception:
        logger.exception("save_conv failed")

def clean_text(text: Any) -> str:
    """Strip control characters and collapse whitespace."""
    if text is None:
        return ""
    s = str(text)
    s = re.sub(r'[\x00-\x08\x0b-\x0c\x0e-\x1f]+', ' ', s)
    s = re.sub(r'\s+', ' ', s).strip()
    return s

VI_CHARS = set("ăâđêôơưáàảãạắằẳẵặấầẩẫậéèẻẽẹíìỉĩịóòỏõọúùủũụứừửữựýỳỷỹỵ")

def detect_language(text: str) -> str:
    """Heuristic language detection: 'vi' if any Vietnamese diacritic is present, else 'en'."""
    t = (text or "").lower()
    for ch in t:
        if ch in VI_CHARS:
            return "vi"
    return "en"
def hf_post_json(model_id: str, payload: dict, timeout: int = 90) -> requests.Response:
    """POST a JSON payload to the HF Inference API for the given model."""
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN not configured in Secrets")
    url = f"https://api-inference.huggingface.co/models/{model_id}"
    headers = dict(HF_HEADERS)
    headers["Content-Type"] = "application/json"
    return requests.post(url, headers=headers, json=payload, timeout=timeout)

def hf_post_bytes(model_id: str, data: bytes, content_type: str = "application/octet-stream", timeout: int = 180) -> requests.Response:
    """POST raw bytes (e.g. audio) to the HF Inference API for the given model."""
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN not configured in Secrets")
    url = f"https://api-inference.huggingface.co/models/{model_id}"
    headers = dict(HF_HEADERS)
    headers["Content-Type"] = content_type
    return requests.post(url, headers=headers, data=data, timeout=timeout)

def parse_hf_text_output(obj: Any) -> str:
    """Extract a text answer from the various response shapes HF models return."""
    try:
        if isinstance(obj, dict):
            for k in ("generated_text", "text", "answer"):
                if k in obj:
                    return obj.get(k, "")
            if "choices" in obj and isinstance(obj["choices"], list) and obj["choices"]:
                c0 = obj["choices"][0]
                return c0.get("text") or c0.get("message", {}).get("content", "") or str(c0)
            return json.dumps(obj, ensure_ascii=False)
        if isinstance(obj, list) and obj:
            first = obj[0]
            if isinstance(first, dict):
                for k in ("generated_text", "text"):
                    if k in first:
                        return first.get(k, "")
            return str(first)
        return str(obj)
    except Exception:
        logger.exception("parse_hf_text_output")
        return str(obj)
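
# For reference, typical response shapes this parser handles (exact fields
# vary by task and model):
#   text generation: [{"generated_text": "..."}]
#   speech-to-text:  {"text": "..."}
#   chat-style APIs: {"choices": [{"message": {"content": "..."}}]}
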
# Candidate chat models to try, in order, when HF_MODEL is unset or unusable.
DEFAULT_MODEL_CANDIDATES = [
    "mistralai/Mistral-7B-Instruct-v0.3",
    "google/gemma-2b-it",
    "databricks/dolly-v2-3b",
    "tiiuae/falcon-7b-instruct",
    "facebook/blenderbot-400M-distill",
    # Vietnamese-capable candidate
    "vinai/PhoGPT-4B",
]

def test_model_working(model_id: str, sample_prompt: str = "Xin chào, bạn khỏe không?") -> Tuple[bool, dict]:
    """
    Return (ok, info). ok is True if the model answered with HTTP 200 and
    produced some parseable textual output.
    """
    try:
        payload = {"inputs": sample_prompt, "parameters": {"max_new_tokens": 20}, "options": {"wait_for_model": True}}
        r = hf_post_json(model_id, payload, timeout=30)
        info = {"status": r.status_code, "text": (r.text[:500] if r.text else "")}
        if r.status_code == 200:
            # Prefer parsed JSON output.
            try:
                j = r.json()
                out = parse_hf_text_output(j)
                if out and out.strip():
                    info["result"] = out
                    return True, info
            except Exception:
                # Non-JSON 200 response: accept any non-empty body as output.
                if r.text and r.text.strip():
                    info["result"] = r.text
                    return True, info
        return False, info
    except requests.exceptions.RequestException as e:
        logger.warning("test_model_working request exception for %s: %s", model_id, e)
        return False, {"error": str(e)}
    except Exception:
        logger.exception("test_model_working unexpected")
        return False, {"error": "unexpected"}

def auto_select_model(preferred: Optional[str] = None) -> Optional[str]:
    """
    Try the preferred model first; if it fails, iterate over
    DEFAULT_MODEL_CANDIDATES. Returns the selected model id, or None.
    """
    tried = []
    if preferred:
        logger.info("Auto-check preferred model: %s", preferred)
        ok, info = test_model_working(preferred)
        tried.append((preferred, ok, info))
        if ok:
            logger.info("Preferred model OK: %s", preferred)
            return preferred
    logger.info("Preferred model not usable or not provided; scanning candidates...")
    for m in DEFAULT_MODEL_CANDIDATES:
        if m == preferred:
            continue
        logger.info("Testing candidate: %s", m)
        ok, info = test_model_working(m)
        tried.append((m, ok, info))
        if ok:
            logger.info("Selected fallback model: %s", m)
            return m
    logger.warning("Auto-select found no usable model. Tried: %s", [(t[0], t[1]) for t in tried])
    return None

# Initially the configured HF_MODEL; may be replaced by auto-selection at
# startup or via /model_check.
SELECTED_MODEL = HF_MODEL if HF_MODEL else None

def hf_text_generate(prompt: str, model_override: Optional[str] = None, max_new_tokens: int = 256, temperature: float = 0.7) -> str:
    model = model_override or SELECTED_MODEL
    if not model:
        raise RuntimeError("No HF model selected")
    payload = {"inputs": prompt, "parameters": {"max_new_tokens": int(max_new_tokens), "temperature": float(temperature)}, "options": {"wait_for_model": True}}
    r = hf_post_json(model, payload, timeout=120)
    if r.status_code == 200:
        try:
            j = r.json()
            return parse_hf_text_output(j)
        except Exception:
            return r.text
    elif r.status_code == 403:
        raise RuntimeError("HF returned 403 (forbidden) — token or access rights issue")
    elif r.status_code == 404:
        raise RuntimeError("HF returned 404 (model not found) — check HF_MODEL or model access")
    else:
        raise RuntimeError(f"HF text gen returned {r.status_code}: {r.text[:300]}")
def hf_stt_from_bytes(audio_bytes: bytes, model_override: Optional[str] = None) -> str:
    model = model_override or HF_STT_MODEL
    if not model:
        raise RuntimeError("HF_STT_MODEL not configured")
    r = hf_post_bytes(model, audio_bytes, content_type="application/octet-stream", timeout=180)
    if r.status_code == 200:
        try:
            j = r.json()
            if isinstance(j, dict) and "text" in j:
                return j["text"]
            return parse_hf_text_output(j)
        except Exception:
            return r.text
    else:
        raise RuntimeError(f"HF STT returned {r.status_code}: {r.text[:300]}")
def hf_tts_get_bytes(text: str, model_override: Optional[str] = None) -> bytes:
    text = text.strip()
    if not text:
        raise RuntimeError("TTS text empty")
    model = model_override or HF_TTS_MODEL
    if model:
        # First choice: the configured HF TTS model.
        try:
            payload = {"inputs": text}
            r = hf_post_json(model, payload, timeout=120)
            if r.status_code == 200 and r.content:
                return r.content
            # 200 with an empty body: try to salvage something from JSON.
            if r.status_code == 200:
                try:
                    j = r.json()
                    return parse_hf_text_output(j).encode("utf-8")
                except Exception:
                    return r.content
            logger.warning("HF TTS returned %s: %s", r.status_code, r.text[:200])
        except Exception:
            logger.exception("HF TTS call failed")
    # Fallback: gTTS, if installed.
    if _HAS_GTTS:
        try:
            lang = "vi" if detect_language(text) == "vi" else "en"
            tts = gTTS(text=text, lang=lang)
            bio = io.BytesIO()
            tts.write_to_fp(bio)
            bio.seek(0)
            return bio.read()
        except Exception:
            logger.exception("gTTS fallback failed")
            raise RuntimeError("gTTS fallback failed")
    raise RuntimeError("No TTS available (no HF_TTS_MODEL and gTTS not installed)")
def telegram_send_message(chat_id: str, text: str) -> bool:
    if not TELEGRAM_TOKEN or not chat_id:
        return False
    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
        r = requests.post(url, json={"chat_id": chat_id, "text": text}, timeout=8)
        if r.status_code != 200:
            logger.warning("Telegram sendMessage failed %s: %s", r.status_code, r.text[:300])
            return False
        return True
    except Exception:
        logger.exception("telegram_send_message")
        return False

def telegram_send_audio(chat_id: str, audio_bytes: bytes, filename: str = "reply.mp3") -> bool:
    if not TELEGRAM_TOKEN or not chat_id:
        return False
    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendAudio"
        files = {"audio": (filename, io.BytesIO(audio_bytes), "audio/mpeg")}
        data = {"chat_id": chat_id}
        r = requests.post(url, files=files, data=data, timeout=30)
        if r.status_code != 200:
            logger.warning("Telegram sendAudio failed %s: %s", r.status_code, r.text[:300])
            return False
        return True
    except Exception:
        logger.exception("telegram_send_audio")
        return False
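
# The poller below long-polls Telegram's getUpdates endpoint: each request
# blocks for up to 30 s, and advancing offset to update_id + 1 acknowledges
# processed updates so Telegram does not redeliver them.
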
def telegram_poller_loop():
    if not TELEGRAM_TOKEN:
        logger.info("Telegram token not set; poller disabled")
        return
    logger.info("Starting Telegram poller")
    base = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}"
    offset = None
    while True:
        try:
            params = {"timeout": 30}
            if offset:
                params["offset"] = offset
            r = requests.get(base + "/getUpdates", params=params, timeout=35)
            if r.status_code != 200:
                logger.warning("Telegram getUpdates failed: %s", r.status_code)
                time.sleep(2)
                continue
            j = r.json()
            for upd in j.get("result", []):
                offset = upd.get("update_id", 0) + 1
                msg = upd.get("message") or {}
                chat = msg.get("chat", {})
                chat_id = str(chat.get("id"))
                text = (msg.get("text") or "").strip()
                if not text:
                    continue
                logger.info("TG msg %s: %s", chat_id, text[:200])
                lower = text.lower()
                if lower.startswith("/ask "):
                    q = text[5:].strip()
                    try:
                        ans = hf_text_generate(q)
                    except Exception as e:
                        ans = f"[HF error] {e}"
                    try:
                        requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": ans}, timeout=10)
                    except Exception:
                        logger.exception("tg reply failed")
                elif lower.startswith("/say "):
                    phrase = text[5:].strip()
                    try:
                        audio = hf_tts_get_bytes(phrase)
                        telegram_send_audio(chat_id, audio, filename="say.mp3")
                    except Exception:
                        logger.exception("tg say failed")
                elif lower.startswith("/status"):
                    try:
                        requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": "KC Robot v7.5 running"}, timeout=10)
                    except Exception:
                        pass
                else:
                    try:
                        requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": "Commands: /ask <q> | /say <text> | /status"}, timeout=10)
                    except Exception:
                        pass
        except Exception:
            logger.exception("telegram poller crashed; sleeping 3s")
            time.sleep(3)

if TELEGRAM_TOKEN:
    try:
        t = threading.Thread(target=telegram_poller_loop, daemon=True)
        t.start()
    except Exception:
        logger.exception("start telegram thread failed")

app = Flask(__name__)

INDEX_HTML = """ |
|
|
<!doctype html> |
|
|
<html> |
|
|
<head> |
|
|
<meta charset="utf-8"> |
|
|
<title>KC Robot AI v7.5</title> |
|
|
<meta name="viewport" content="width=device-width,initial-scale=1"> |
|
|
<style> |
|
|
body{font-family:Arial,Helvetica,sans-serif;margin:12px;color:#111} |
|
|
.box{max-width:960px;margin:auto} |
|
|
textarea{width:100%;height:90px;padding:10px;font-size:16px;border-radius:8px;border:1px solid #ddd} |
|
|
button{padding:10px 14px;margin:6px 4px;border-radius:8px;background:#0b74de;color:white;border:none;cursor:pointer;font-weight:700} |
|
|
#chat{border:1px solid #eee;padding:10px;height:360px;overflow:auto;background:#fafafa;border-radius:8px} |
|
|
.you{color:#0b63d6;margin:6px 0} |
|
|
.bot{color:#0b8a3f;margin:6px 0} |
|
|
.small{font-size:13px;color:#666} |
|
|
</style> |
|
|
</head> |
|
|
<body> |
|
|
<div class="box"> |
|
|
<h2>🤖 KC Robot AI v7.5 — Final (Auto-model)</h2> |
|
|
<div class="small">Model: <span id="modelName">loading...</span> | Telegram: <span id="tgstatus">checking...</span></div> |
|
|
<textarea id="userText" placeholder="Nhập tiếng Việt hoặc English..."></textarea> |
|
|
<div> |
|
|
<select id="lang"><option value="auto">Auto</option><option value="vi">Vietnamese</option><option value="en">English</option></select> |
|
|
<button onclick="send()">Gửi</button> |
|
|
<button onclick="playLast()">Phát âm</button> |
|
|
<button onclick="clearChat()">Xóa</button> |
|
|
</div> |
|
|
<div id="chat"></div> |
|
|
<div style="margin-top:10px"> |
|
|
<input type="file" id="afile" accept="audio/*"><button onclick="uploadAudio()">Upload → STT</button> |
|
|
</div> |
|
|
<hr> |
|
|
<div class="small">Diagnostics: <button onclick="modelCheck()">Kiểm tra model</button><span id="diag"></span></div> |
|
|
</div> |
|
|
<script> |
|
|
let lastAnswer = ""; |
|
|
async function loadStatus(){ try{ let r=await fetch('/health'); let j=await r.json(); document.getElementById('modelName').innerText=j.hf_model||'(not set)'; document.getElementById('tgstatus').innerText=j.telegram ? 'enabled' : 'disabled'; }catch(e){ console.log(e); } } |
|
|
function escapeHtml(s){ return (s+'').replace(/&/g,'&').replace(/</g,'<').replace(/>/g,'>'); } |
|
|
function appendYou(t){ document.getElementById('chat').innerHTML += '<div class="you"><b>You:</b> '+escapeHtml(t)+'</div>'; scroll(); } |
|
|
function appendBot(t){ document.getElementById('chat').innerHTML += '<div class="bot"><b>Robot:</b> '+escapeHtml(t)+'</div>'; scroll(); } |
|
|
function scroll(){ let c=document.getElementById('chat'); c.scrollTop = c.scrollHeight; } |
|
|
async function send(){ |
|
|
let t=document.getElementById('userText').value.trim(); if(!t) return; appendYou(t); document.getElementById('userText').value=''; |
|
|
let lang=document.getElementById('lang').value; |
|
|
try{ |
|
|
let r=await fetch('/ask',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({text:t,lang:lang})}); |
|
|
let j=await r.json(); |
|
|
if(j.answer){ lastAnswer=j.answer; appendBot(j.answer); } else appendBot('[error] '+JSON.stringify(j)); |
|
|
}catch(e){ appendBot('[network error] '+e); } |
|
|
} |
|
|
async function playLast(){ |
|
|
if(!lastAnswer) return alert('Chưa có câu trả lời'); |
|
|
try{ |
|
|
let r=await fetch('/tts',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({text:lastAnswer})}); |
|
|
if(!r.ok){ alert('TTS lỗi'); return; } |
|
|
const blob = await r.blob(); |
|
|
const url=URL.createObjectURL(blob); |
|
|
const audio=new Audio(url); audio.play(); |
|
|
}catch(e){ alert('Play error: '+e); } |
|
|
} |
|
|
async function uploadAudio(){ |
|
|
const f=document.getElementById('afile').files[0]; if(!f) return alert('Chọn file audio'); |
|
|
const fd=new FormData(); fd.append('file', f); |
|
|
const r=await fetch('/stt',{method:'POST', body: fd}); |
|
|
const j=await r.json(); |
|
|
if(j.text){ appendYou('[voice] '+j.text); } else appendYou('[stt error] '+JSON.stringify(j)); |
|
|
} |
|
|
function clearChat(){ document.getElementById('chat').innerHTML=''; lastAnswer=''; } |
|
|
async function modelCheck(){ |
|
|
document.getElementById('diag').innerText=' checking...'; |
|
|
try{ |
|
|
let r=await fetch('/model_check'); |
|
|
let j=await r.json(); |
|
|
document.getElementById('diag').innerText = ' ' + JSON.stringify(j).slice(0,200); |
|
|
loadStatus(); |
|
|
}catch(e){ document.getElementById('diag').innerText=' error'; } |
|
|
} |
|
|
loadStatus(); |
|
|
</script> |
|
|
</body> |
|
|
</html> |
|
|
""" |

@app.route("/", methods=["GET"])
def index():
    return render_template_string(INDEX_HTML)

@app.route("/health", methods=["GET"])
def health():
    return jsonify({
        "ok": True,
        "hf_token": bool(HF_TOKEN),
        "hf_model": SELECTED_MODEL or HF_MODEL or "",
        "hf_tts_model": HF_TTS_MODEL,
        "hf_stt_model": HF_STT_MODEL,
        "telegram": bool(TELEGRAM_TOKEN and TELEGRAM_CHAT_ID),
        "conv_len": len(CONVERSATION),
        "display_len": len(DISPLAY_BUFFER)
    })

@app.route("/ask", methods=["POST"])
def route_ask():
    try:
        j = request.get_json(force=True) or {}
        text = clean_text(j.get("text", "") or "")
        lang = (j.get("lang", "auto") or "auto")
        if not text:
            return jsonify({"error": "no text"}), 400
        if lang == "vi":
            prompt = f"Bạn là trợ lý thông minh, trả lời bằng tiếng Việt, rõ ràng và ngắn gọn:\n\n{text}"
        elif lang == "en":
            prompt = f"You are a helpful assistant. Answer in clear, concise English:\n\n{text}"
        else:
            prompt = f"You are a bilingual assistant (Vietnamese/English). Answer in the same language as the user, clearly and concisely:\n\n{text}"
        try:
            ans = hf_text_generate(prompt)
        except Exception as e:
            logger.exception("hf_text_generate failed")
            return jsonify({"error": str(e)}), 500
        CONVERSATION.append((text, ans))
        save_conv(text, ans)
        push_display("YOU: " + text[:60])
        push_display("BOT: " + (ans[:60] if isinstance(ans, str) else str(ans)[:60]))
        if TELEGRAM_TOKEN and TELEGRAM_CHAT_ID:
            try:
                telegram_send_message(TELEGRAM_CHAT_ID, f"You: {text}\nBot: {ans[:300]}")
            except Exception:
                logger.exception("telegram notify failed")
        return jsonify({"answer": ans})
    except Exception as e:
        logger.exception("route_ask exception")
        return jsonify({"error": str(e)}), 500

@app.route("/tts", methods=["POST"])
def route_tts():
    try:
        j = request.get_json(force=True) or {}
        text = clean_text(j.get("text", "") or "")
        if not text:
            return jsonify({"error": "no text"}), 400
        try:
            audio_bytes = hf_tts_get_bytes(text)
        except Exception as e:
            logger.exception("tts generation failed")
            return jsonify({"error": str(e)}), 500
        return Response(audio_bytes, mimetype="audio/mpeg")
    except Exception as e:
        logger.exception("route_tts exception")
        return jsonify({"error": str(e)}), 500

@app.route("/stt", methods=["POST"])
def route_stt():
    try:
        # Accept either a multipart upload ("file") or a raw request body.
        if "file" in request.files:
            f = request.files["file"]
            audio_bytes = f.read()
        else:
            audio_bytes = request.get_data()
        if not audio_bytes:
            return jsonify({"error": "no audio provided"}), 400
        try:
            txt = hf_stt_from_bytes(audio_bytes)
        except Exception as e:
            logger.exception("STT failed")
            return jsonify({"error": str(e)}), 500
        CONVERSATION.append((f"[voice] {txt}", ""))
        save_conv(f"[voice] {txt}", "")
        push_display("VOICE: " + (txt[:60] if isinstance(txt, str) else str(txt)))
        return jsonify({"text": txt})
    except Exception as e:
        logger.exception("route_stt exception")
        return jsonify({"error": str(e)}), 500

@app.route("/presence", methods=["POST"])
def route_presence():
    """
    The ESP32 radar POSTs JSON {"note": "..."}.
    The server returns greeting audio (if TTS is available) or a JSON greeting,
    and sends a Telegram notification if configured.
    """
    try:
        j = request.get_json(force=True) or {}
        note = clean_text(j.get("note", "Có người phía trước") or "Có người phía trước")
        greeting = f"Xin chào! {note}"
        CONVERSATION.append(("__presence__", greeting))
        save_conv("__presence__", greeting)
        push_display("RADAR: " + note[:60])
        if TELEGRAM_TOKEN and TELEGRAM_CHAT_ID:
            try:
                telegram_send_message(TELEGRAM_CHAT_ID, f"⚠️ Robot: Phát hiện người - {note}")
            except Exception:
                logger.exception("telegram notify failed")
        try:
            audio_bytes = hf_tts_get_bytes(greeting)
            return Response(audio_bytes, mimetype="audio/mpeg")
        except Exception:
            return jsonify({"greeting": greeting})
    except Exception as e:
        logger.exception("route_presence exception")
        return jsonify({"error": str(e)}), 500

@app.route("/display", methods=["GET"])
def route_display():
    return jsonify({"lines": DISPLAY_BUFFER.copy(), "conv_len": len(CONVERSATION)})

@app.route("/model_check", methods=["GET"])
def model_check():
    """
    Verify the currently selected model, or try to select a fallback.
    Returns diagnostic JSON.
    """
    global SELECTED_MODEL
    results = {}
    try:
        # If a model is already selected, just re-test it.
        if SELECTED_MODEL:
            results["selected_model"] = SELECTED_MODEL
            ok, info = test_model_working(SELECTED_MODEL)
            results["selected_ok"] = ok
            results["selected_info"] = info
            return jsonify(results)
        # Otherwise run auto-selection, preferring HF_MODEL if set.
        chosen = auto_select_model(HF_MODEL if HF_MODEL else None)
        if chosen:
            SELECTED_MODEL = chosen
            results["selected_model"] = chosen
            results["note"] = "Model selected"
            return jsonify(results)
        else:
            results["error"] = "No usable model found in candidates"
            return jsonify(results), 404
    except Exception as e:
        logger.exception("model_check failed")
        return jsonify({"error": str(e)}), 500

@app.route("/config", methods=["GET", "POST"])
def config():
    """
    GET returns the current config.
    POST JSON can change HF_MODEL / HF_TTS_MODEL / HF_STT_MODEL at runtime
    (not persisted). Example: {"hf_model": "...", "hf_tts_model": "..."}
    """
    global HF_MODEL, HF_TTS_MODEL, HF_STT_MODEL, SELECTED_MODEL
    if request.method == "GET":
        return jsonify({"hf_model": HF_MODEL, "hf_tts_model": HF_TTS_MODEL, "hf_stt_model": HF_STT_MODEL, "selected_model": SELECTED_MODEL})
    try:
        j = request.get_json(force=True) or {}
        changed = {}
        if "hf_model" in j:
            HF_MODEL = j["hf_model"]
            changed["hf_model"] = HF_MODEL
            SELECTED_MODEL = None  # force re-selection on the next /model_check
        if "hf_tts_model" in j:
            HF_TTS_MODEL = j["hf_tts_model"]
            changed["hf_tts_model"] = HF_TTS_MODEL
        if "hf_stt_model" in j:
            HF_STT_MODEL = j["hf_stt_model"]
            changed["hf_stt_model"] = HF_STT_MODEL
        return jsonify({"changed": changed})
    except Exception as e:
        logger.exception("config post failed")
        return jsonify({"error": str(e)}), 500

def startup_model_check():
    global SELECTED_MODEL
    logger.info("Startup: checking/selecting model...")
    try:
        chosen = auto_select_model(HF_MODEL if HF_MODEL else None)
        if chosen:
            SELECTED_MODEL = chosen
            logger.info("Startup: selected model = %s", SELECTED_MODEL)
        else:
            logger.warning("Startup: no usable HF model found yet. Use /model_check or set the HF_MODEL secret.")
    except Exception:
        logger.exception("startup_model_check failed")


# Run the model check in the background so the server starts immediately.
t_start = threading.Thread(target=startup_model_check, daemon=True)
t_start.start()

if __name__ == "__main__":
    logger.info("KC Robot AI v7.5 starting. PREF_HF_MODEL=%s HF_TTS=%s HF_STT=%s Telegram=%s",
                HF_MODEL or "(none)", HF_TTS_MODEL or "(none)", HF_STT_MODEL or "(none)",
                bool(TELEGRAM_TOKEN and TELEGRAM_CHAT_ID))
    app.run(host="0.0.0.0", port=PORT)