# Source: Hugging Face Space "kcai" — app.py (commit 4e16848, uploaded by kcrobot40)
# app.py — KC Robot AI v5.5 FINAL
# Flask server for Hugging Face Space
# - Requirements: see requirements.txt
# - Secrets expected: HF_API_TOKEN (required), optional: HF_MODEL, HF_TTS_MODEL, HF_STT_MODEL, TELEGRAM_TOKEN, TELEGRAM_CHATID
import os
import io
import time
import json
import base64
import threading
import logging
from typing import Optional
from pathlib import Path
import requests
from flask import Flask, request, jsonify, render_template_string
# Fallback TTS
from gtts import gTTS
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kcrobot.v5.5.final")
app = Flask(__name__)
# Config / Secrets (set in Space -> Settings -> Secrets)
# HF_API_TOKEN is required for every Hugging Face Inference API call below;
# the model ids and Telegram credentials are optional overrides.
HF_API_TOKEN = os.getenv("HF_API_TOKEN", "").strip()
HF_MODEL = os.getenv("HF_MODEL", "bkai-foundation-models/vietnamese-llama2-7b").strip()
HF_TTS_MODEL = os.getenv("HF_TTS_MODEL", "doanthang/vietTTS-southern-female").strip() # optional public HF TTS
HF_STT_MODEL = os.getenv("HF_STT_MODEL", "openai/whisper-small").strip() # optional
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", "").strip()
TELEGRAM_CHATID = os.getenv("TELEGRAM_CHATID", "").strip()
# Empty dict when no token; the HF helpers then refuse to call out at all.
HF_HEADERS = {"Authorization": f"Bearer {HF_API_TOKEN}"} if HF_API_TOKEN else {}
# Temp storage for history
# /tmp is the writable path inside the Space container; history is ephemeral.
TMP_DIR = Path("/tmp/kcrobot")
TMP_DIR.mkdir(parents=True, exist_ok=True)
HISTORY_FILE = TMP_DIR / "history.json"
def read_history():
    """Load the persisted conversation history.

    Returns the list stored in HISTORY_FILE, or [] when the file is
    missing or unreadable (errors are logged, never raised).
    """
    try:
        if HISTORY_FILE.exists():
            raw = HISTORY_FILE.read_text(encoding="utf-8")
            return json.loads(raw)
    except Exception:
        logger.exception("read_history")
    return []
def append_history(user_text, bot_text):
    """Append one user/bot exchange (with a Unix timestamp) to the history file."""
    entries = read_history()
    entries.append({"user": user_text, "bot": bot_text, "ts": time.time()})
    try:
        HISTORY_FILE.write_text(
            json.dumps(entries, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
    except Exception:
        logger.exception("append_history")
def clear_history():
    """Delete the persisted history file; failures are logged, not raised."""
    try:
        if HISTORY_FILE.is_file():
            HISTORY_FILE.unlink()
    except Exception:
        logger.exception("clear_history")
# Language detection heuristic: Vietnamese-specific (diacritic) letters.
VI_CHARS = set("ăâđêôơưáàảãạắằẳẵặấầẩẫậéèẻẽẹíìỉĩịóòỏõọúùủũụýỳỷỹỵ")
def detect_lang(text: str) -> str:
    """Return "vi" when *text* contains any Vietnamese diacritic letter, else "en"."""
    if not text:
        return "en"
    return "vi" if any(ch in VI_CHARS for ch in text.lower()) else "en"
# ---------------- Hugging Face helpers ----------------
def hf_post_json(model_id: str, payload: dict, timeout: int = 120):
    """POST a JSON payload to the HF Inference API for *model_id*.

    Returns the parsed JSON response, or the raw bytes when the body is
    not JSON. Raises RuntimeError when no token is configured and
    requests.HTTPError on a non-2xx response.
    """
    if not HF_API_TOKEN:
        raise RuntimeError("HF_API_TOKEN not set in Space Secrets.")
    resp = requests.post(
        f"https://api-inference.huggingface.co/models/{model_id}",
        headers={**HF_HEADERS, "Content-Type": "application/json"},
        json=payload,
        timeout=timeout,
    )
    if not resp.ok:
        logger.warning("HF json POST %s returned %s: %s", model_id, resp.status_code, resp.text[:300])
    resp.raise_for_status()
    try:
        return resp.json()
    except Exception:
        return resp.content
def hf_post_bytes(model_id: str, bytes_data: bytes, content_type: str = "application/octet-stream", timeout: int = 180):
    """POST raw bytes (e.g. audio) to the HF Inference API for *model_id*.

    Returns the requests.Response object. Raises RuntimeError when no
    token is configured and requests.HTTPError on a non-2xx response.
    """
    if not HF_API_TOKEN:
        raise RuntimeError("HF_API_TOKEN not set in Space Secrets.")
    resp = requests.post(
        f"https://api-inference.huggingface.co/models/{model_id}",
        headers={**HF_HEADERS, "Content-Type": content_type},
        data=bytes_data,
        timeout=timeout,
    )
    if not resp.ok:
        logger.warning("HF bytes POST %s returned %s: %s", model_id, resp.status_code, resp.text[:300])
    resp.raise_for_status()
    return resp
def hf_text_generate(prompt: str, model: Optional[str] = None, max_new_tokens: int = 256, temperature: float = 0.7) -> str:
    """Generate text for *prompt* via the HF Inference API.

    Falls back to HF_MODEL when *model* is None. Always returns a string:
    the generated text when a known response shape is recognized,
    otherwise str() of the raw response.
    """
    model = model or HF_MODEL
    payload = {
        "inputs": prompt,
        "parameters": {"max_new_tokens": int(max_new_tokens), "temperature": float(temperature)},
        # wait_for_model: block until the model is loaded instead of 503ing
        "options": {"wait_for_model": True}
    }
    out = hf_post_json(model, payload, timeout=120)
    # parse common shapes
    try:
        # shape 1: [{"generated_text": ...}] (classic text-generation output)
        if isinstance(out, list) and len(out) and isinstance(out[0], dict):
            return out[0].get("generated_text") or out[0].get("text") or str(out[0])
        if isinstance(out, dict):
            # shape 2: {"generated_text": ...} / {"text": ...}
            if "generated_text" in out:
                return out.get("generated_text")
            if "text" in out:
                return out.get("text")
            # some models return choices...
            # shape 3: OpenAI-style {"choices": [{"text"|"message": ...}]}
            if "choices" in out and isinstance(out["choices"], list) and out["choices"]:
                c = out["choices"][0]
                return c.get("text") or c.get("message", {}).get("content", "") or str(c)
        # unrecognized shape: stringify as a last resort
        return str(out)
    except Exception:
        logger.exception("hf_text_generate parse")
    return str(out)
def hf_tts_bytes(text: str, model: Optional[str] = None) -> Optional[bytes]:
    """Synthesize *text* with an HF-hosted TTS model.

    Returns the raw audio bytes, or None when no model is configured or
    the request fails (failures are logged, never raised).
    """
    model = model or HF_TTS_MODEL
    if not model:
        return None
    try:
        resp = requests.post(
            f"https://api-inference.huggingface.co/models/{model}",
            headers={**HF_HEADERS, "Content-Type": "application/json"},
            json={"inputs": text},
            timeout=120,
        )
        if resp.ok:
            return resp.content
        logger.warning("hf_tts_bytes returned %s: %s", resp.status_code, resp.text[:200])
    except Exception:
        logger.exception("hf_tts_bytes")
    return None
def hf_stt_from_bytes(audio_bytes: bytes, model: Optional[str] = None) -> str:
    """Transcribe raw audio bytes with an HF speech-to-text model.

    Extracts the "text" field from the usual Whisper-style responses;
    falls back to the raw response text when the body is not JSON.
    """
    resp = hf_post_bytes(model or HF_STT_MODEL, audio_bytes,
                         content_type="application/octet-stream", timeout=180)
    try:
        parsed = resp.json()
        if isinstance(parsed, dict) and "text" in parsed:
            return parsed["text"]
        if isinstance(parsed, list) and len(parsed) and isinstance(parsed[0], dict) and "text" in parsed[0]:
            return parsed[0]["text"]
        return str(parsed)
    except Exception:
        return resp.text if hasattr(resp, "text") else ""
# ---------------- TTS fallback using gTTS ----------------
def tts_gtts_base64(text: str, lang: str = "vi") -> str:
    """Render *text* to MP3 with gTTS and return it base64-encoded.

    Returns "" on any failure (gTTS needs network access to Google TTS).
    """
    try:
        buf = io.BytesIO()
        gTTS(text=text, lang=lang).write_to_fp(buf)
        return base64.b64encode(buf.getvalue()).decode("ascii")
    except Exception:
        logger.exception("tts_gtts_base64 failed")
        return ""
def tts_get_audio_for_text(text: str, detected_lang: str = "vi"):
    """Produce spoken audio for *text* as {"audio_base64": ..., "mime": ...}.

    Tries the configured HF TTS model first, then falls back to gTTS
    (vi or en). Both fields are empty strings when nothing succeeded.
    """
    # 1. preferred path: the HF-hosted TTS model, when configured
    if HF_TTS_MODEL:
        raw = hf_tts_bytes(text, HF_TTS_MODEL)
        if raw:
            return {"audio_base64": base64.b64encode(raw).decode("ascii"), "mime": "audio/mpeg"}
    # 2. fallback: gTTS supports only vi/en here
    encoded = tts_gtts_base64(text, lang="vi" if detected_lang == "vi" else "en")
    if encoded:
        return {"audio_base64": encoded, "mime": "audio/mpeg"}
    return {"audio_base64": "", "mime": ""}
# ---------------- Telegram ----------------
def send_telegram_message(text: str):
    """Send *text* to the configured Telegram chat.

    Returns True on success, False when credentials are missing or the
    request fails (failures are logged, never raised).
    """
    if not TELEGRAM_TOKEN or not TELEGRAM_CHATID:
        return False
    try:
        resp = requests.post(
            f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage",
            json={"chat_id": TELEGRAM_CHATID, "text": text},
            timeout=10,
        )
        if not resp.ok:
            logger.warning("Telegram send failed: %s %s", resp.status_code, resp.text[:200])
        return resp.ok
    except Exception:
        logger.exception("send_telegram_message")
        return False
def telegram_poll_loop():
    """Long-poll the Telegram Bot API forever and answer simple commands.

    Supported commands: /ask <q> (LLM answer), /say <text> (TTS audio),
    /status. Intended to run in a daemon thread; returns immediately
    when no TELEGRAM_TOKEN is configured.
    """
    if not TELEGRAM_TOKEN:
        logger.info("telegram poll disabled")
        return
    base = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}"
    offset = None  # next update_id to request, so processed updates are skipped
    logger.info("Starting telegram poller")
    while True:
        try:
            params = {"timeout": 30}  # server-side long-poll window (seconds)
            if offset:
                params["offset"] = offset
            # client timeout slightly above the long-poll window
            r = requests.get(base + "/getUpdates", params=params, timeout=35)
            if not r.ok:
                time.sleep(2)
                continue
            j = r.json()
            for upd in j.get("result", []):
                # acknowledge this update so the next getUpdates skips it
                offset = upd.get("update_id", 0) + 1
                msg = upd.get("message") or {}
                chat = msg.get("chat", {})
                chat_id = chat.get("id")
                text = (msg.get("text") or "").strip()
                if not text:
                    continue
                logger.info("TG msg %s: %s", chat_id, text)
                low = text.lower()
                if low.startswith("/ask "):
                    # forward the question to the LLM, reply with its answer
                    q = text[5:].strip()
                    ans = hf_text_generate(q)
                    try:
                        requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": ans}, timeout=10)
                    except Exception:
                        logger.exception("telegram reply failed")
                elif low.startswith("/say "):
                    phrase = text[5:].strip()
                    # TTS and send audio
                    try:
                        # HF TTS first; gTTS fallback decoded back to raw bytes
                        audio = hf_tts_bytes(phrase) or base64.b64decode(tts_gtts_base64(phrase, lang="vi" if detect_lang(phrase)=="vi" else "en"))
                        files = {"audio": ("say.mp3", audio, "audio/mpeg")}
                        requests.post(base + "/sendAudio", files=files, data={"chat_id": chat_id}, timeout=30)
                    except Exception:
                        logger.exception("telegram say failed")
                elif low.startswith("/status"):
                    try:
                        requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": "KC Robot brain running."}, timeout=10)
                    except Exception:
                        pass
                else:
                    # anything else: reply with usage help
                    try:
                        requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": "Commands: /ask <q> | /say <text> | /status"}, timeout=10)
                    except Exception:
                        pass
        except Exception:
            logger.exception("telegram poll loop error")
            time.sleep(3)  # brief back-off after an unexpected error
# start telegram poller thread
# Launched at import time; daemon=True so it never blocks interpreter shutdown.
if TELEGRAM_TOKEN:
    try:
        t = threading.Thread(target=telegram_poll_loop, daemon=True)
        t.start()
    except Exception:
        logger.exception("start telegram thread failed")
# ---------------- Web UI HTML ----------------
# Inline single-page chat UI served at "/": text chat, MediaRecorder mic
# capture posted to /stt, base64 TTS audio playback, greeting and history
# buttons. Kept as one template string so the Space needs no static files.
INDEX_HTML = """
<!doctype html><html><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>KC Robot AI v5.5 Final</title>
<style>
body{font-family:Arial;background:#06111a;color:#dff; padding:12px}
.container{max-width:980px;margin:auto}
#chat{background:#04101a;padding:10px;border-radius:8px;height:420px;overflow:auto;border:1px solid #223344}
.user{color:#bfe7ff;text-align:right;margin:6px}
.bot{color:#dfffdc;text-align:left;margin:6px}
.controls{display:flex;gap:8px;margin-top:8px}
input[type=text]{flex:1;padding:10px;border-radius:8px;border:1px solid #223344;background:#021427;color:#e6eef6}
button{padding:10px 12px;border-radius:8px;border:none;background:#0ea5a4;color:#fff;cursor:pointer}
small{color:#99a0b0}
</style></head><body>
<div class="container">
<h2>🤖 KC Robot AI v5.5 — Final (Miền Nam voice, song ngữ)</h2>
<div id="chat"></div>
<div class="controls">
<input id="txt" placeholder="Gõ câu hỏi (VN/EN) hoặc bấm Ghi..." type="text"/>
<button id="sendBtn">Gửi</button>
<button id="recBtn">🎙 Ghi</button>
<button id="greetBtn">▶ Chào</button>
<button id="historyBtn">🗂 Lịch sử</button>
</div>
<audio id="player" controls style="width:100%;margin-top:10px"></audio>
<p><small>Secrets: HF_API_TOKEN (required). Optionals: HF_MODEL, HF_TTS_MODEL, TELEGRAM_TOKEN, TELEGRAM_CHATID</small></p>
</div>
<script>
let mediaRecorder, audioChunks=[];
const chat=document.getElementById('chat'), player=document.getElementById('player');
function appendUser(t){ chat.innerHTML += '<div class="user"><b>You:</b> '+escapeHtml(t)+'</div>'; chat.scrollTop = chat.scrollHeight; }
function appendBot(t){ chat.innerHTML += '<div class="bot"><b>Robot:</b> '+escapeHtml(t)+'</div>'; chat.scrollTop = chat.scrollHeight; }
function escapeHtml(s){ return String(s).replace(/&/g,'&amp;').replace(/</g,'&lt;').replace(/>/g,'&gt;'); }
document.getElementById('sendBtn').onclick = async ()=>{
const v = document.getElementById('txt').value.trim(); if(!v) return;
appendUser(v); document.getElementById('txt').value='';
const res = await fetch('/ask',{method:'POST',headers:{'Content-Type':'application/json'},body: JSON.stringify({text:v})});
const j = await res.json();
const ans = j.answer || j.error || 'No answer';
appendBot(ans);
if(j.audio_base64){
const blob = base64ToBlob(j.audio_base64, j.mime || 'audio/mpeg');
const url = URL.createObjectURL(blob); player.src = url; player.play();
}
};
document.getElementById('greetBtn').onclick = async ()=>{
const r = await fetch('/presence',{method:'POST',headers:{'Content-Type':'application/json'},body: JSON.stringify({note:'Xin chào chủ nhân'})});
const j = await r.json();
appendBot(j.greeting || j.error || '');
if(j.audio_base64){ const blob = base64ToBlob(j.audio_base64, j.mime||'audio/mpeg'); player.src = URL.createObjectURL(blob); player.play(); }
else if(j.music_url){ player.src = j.music_url; player.play(); }
};
document.getElementById('historyBtn').onclick = async ()=>{
const r = await fetch('/history'); const j = await r.json(); chat.innerHTML=''; j.forEach(it=>{ appendUser(it.user); appendBot(it.bot); });
};
document.getElementById('recBtn').onclick = async ()=>{
if(mediaRecorder && mediaRecorder.state === 'recording'){ mediaRecorder.stop(); return; }
if(!navigator.mediaDevices) return alert('No mic support');
try{
const stream = await navigator.mediaDevices.getUserMedia({audio:true});
mediaRecorder = new MediaRecorder(stream);
audioChunks=[];
mediaRecorder.ondataavailable = e => audioChunks.push(e.data);
mediaRecorder.onstop = async ()=>{
const blob = new Blob(audioChunks, {type:'audio/webm'});
const fd = new FormData(); fd.append('file', blob, 'rec.webm');
const r = await fetch('/stt',{method:'POST', body: fd});
const j = await r.json();
if(j.text){
appendUser('[voice] '+ j.text);
const res = await fetch('/ask',{method:'POST',headers:{'Content-Type':'application/json'},body: JSON.stringify({text: j.text})});
const aj = await res.json(); const ans = aj.answer || aj.error || 'No answer';
appendBot(ans);
if(aj.audio_base64){ const blob2 = base64ToBlob(aj.audio_base64, aj.mime||'audio/mpeg'); player.src = URL.createObjectURL(blob2); player.play();}
} else { appendBot('[STT error] '+JSON.stringify(j)); }
};
mediaRecorder.start(); document.getElementById('recBtn').textContent='■ Dừng';
} catch(e){ alert('Mic error: '+e); }
};
function base64ToBlob(b64, mime){ const bytes = atob(b64); let len = bytes.length; const buf = new Uint8Array(len); for(let i=0;i<len;i++) buf[i]=bytes.charCodeAt(i); return new Blob([buf], {type:mime}); }
</script>
</body></html>
"""
# ---------------- Endpoints ----------------
@app.route("/", methods=["GET"])
def index():
    # Serve the inline single-page chat UI.
    return render_template_string(INDEX_HTML)
@app.route("/config", methods=["GET"])
def get_config():
    """Report which models are configured; secrets are exposed as booleans only."""
    cfg = {
        "hf_token": bool(HF_API_TOKEN),
        "hf_model": HF_MODEL,
        "hf_tts_model": HF_TTS_MODEL,
        "hf_stt_model": HF_STT_MODEL,
        "telegram": bool(TELEGRAM_TOKEN and TELEGRAM_CHATID),
    }
    return jsonify(cfg)
@app.route("/ask", methods=["POST"])
def ask_route():
    """Answer a chat message: detect the language, query the LLM, attach TTS.

    Expects JSON {"text": ...}. Returns {"answer", "audio_base64", "mime"},
    400 when text is missing, 500 when generation fails.
    """
    data = request.get_json(force=True, silent=True) or {}
    text = (data.get("text") or "").strip()
    if not text:
        return jsonify({"error":"no text"}), 400
    lang = detect_lang(text)
    # Build a language-matched instruction prompt. FIX: the original used
    # "\\n\\n", which injected literal backslash-n into the prompt instead of
    # blank lines between the instruction and the question.
    if lang == "vi":
        prompt = f"Bạn là trợ lý thông minh, trả lời bằng tiếng Việt, rõ ràng và ngắn gọn:\n\n{text}"
    else:
        prompt = f"You are a helpful assistant. Answer in clear English:\n\n{text}"
    try:
        answer = hf_text_generate(prompt)
    except Exception as e:
        logger.exception("hf_text_generate error")
        return jsonify({"error": str(e)}), 500
    append_history(text, answer)
    # prepare audio (fields are empty strings when no TTS backend succeeded)
    result = {"answer": answer}
    result.update(tts_get_audio_for_text(answer, detected_lang=lang))
    return jsonify(result)
@app.route("/tts", methods=["POST"])
def tts_route():
    """Synthesize speech for JSON {"text": ...}; 400 when text is missing."""
    payload = request.get_json(force=True, silent=True) or {}
    phrase = (payload.get("text") or "").strip()
    if not phrase:
        return jsonify({"error":"no text"}), 400
    return jsonify(tts_get_audio_for_text(phrase, detected_lang=detect_lang(phrase)))
@app.route("/stt", methods=["POST"])
def stt_route():
    """Transcribe uploaded audio (multipart "file" field, or the raw body)."""
    try:
        upload = request.files.get("file")
        if upload is not None:
            audio_bytes = upload.read()
        else:
            audio_bytes = request.get_data() or b""
        if not audio_bytes:
            return jsonify({"error":"no audio"}), 400
        try:
            transcript = hf_stt_from_bytes(audio_bytes)
        except Exception as e:
            logger.exception("hf_stt failed")
            return jsonify({"error": str(e)}), 500
        return jsonify({"text": transcript})
    except Exception:
        logger.exception("stt_route")
        return jsonify({"error":"stt internal error"}), 500
@app.route("/presence", methods=["POST"])
def presence_route():
    """Handle a proximity event: greet, log to history, notify Telegram.

    Returns the bilingual greeting plus TTS audio when available, or a
    fallback music URL the client can play instead.
    """
    data = request.get_json(force=True, silent=True) or {}
    note = (data.get("note") or "Có người đến gần robot").strip()
    greeting_vi = f"Xin chào! {note}"
    greeting_en = "Hello! Someone is near the robot."
    # FIX: the original used "\\n", which put a literal backslash-n between
    # the greetings instead of a newline.
    combined = f"{greeting_vi}\n{greeting_en}"
    append_history("__presence__", combined)
    # prepare greeting audio (Vietnamese greeting only is spoken)
    tts = tts_get_audio_for_text(greeting_vi, detected_lang="vi")
    # telegram notify — best effort, never fails the request
    if TELEGRAM_TOKEN and TELEGRAM_CHATID:
        try:
            send_telegram_message("⚠️ Robot phát hiện: " + note)
        except Exception:
            logger.exception("telegram notify failed")
    resp = {"greeting": combined}
    if tts.get("audio_base64"):
        resp.update(tts)
    else:
        # if no TTS available, return a sample music url (client can play)
        resp["music_url"] = os.getenv("HF_MUSIC_URL", "https://www.soundhelix.com/examples/mp3/SoundHelix-Song-1.mp3")
    return jsonify(resp)
@app.route("/history", methods=["GET"])
def history_route():
    # Return the full stored conversation history as a JSON array.
    return jsonify(read_history())
@app.route("/clear_history", methods=["POST"])
def clear_history_route():
    # Delete the persisted history file; always reports success.
    clear_history()
    return jsonify({"cleared": True})
# startup warmup
def warmup():
    """Kick off background requests that nudge HF to load the models early."""
    logger.info("Warmup: attempting lightweight calls (non-blocking)")

    def _probe():
        # Each probe is independent and best-effort; failures are swallowed.
        try:
            if HF_API_TOKEN:
                try:
                    hf_text_generate("Xin chào. Hãy trả lời ngắn gọn: Xin chào!")
                except Exception:
                    pass
            if HF_TTS_MODEL:
                try:
                    hf_tts_bytes("Xin chào chủ nhân")
                except Exception:
                    pass
        except Exception:
            logger.exception("warmup errors")

    threading.Thread(target=_probe, daemon=True).start()
# FIX: Flask 2.3 removed the before_first_request decorator; emulate it with
# a guarded before_request hook so warmup still runs exactly once.
_warmup_started = False

@app.before_request
def before_first():
    """Run warmup() once, just before the first request is handled."""
    global _warmup_started
    if not _warmup_started:
        _warmup_started = True
        warmup()
if __name__ == "__main__":
    # Direct-run entrypoint; HF Spaces route external traffic to port 7860.
    logger.info("Starting KC Robot AI v5.5 FINAL")
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))