# kc20ai / app.py — initial commit 84ad4e7 (kcrobot20).
# NOTE: the Hugging Face file-viewer header lines were pasted into the source;
# they are kept here as a comment so the file parses as valid Python.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
KCrobot AI — Vmax Final (voice-first)
- Default Gemini model: gemini-1.5-pro (fallbacks implemented)
- Read secrets from env (HF "New secret"):
GEMINI_API_KEY, GEMINI_MODEL, TELEGRAM_TOKEN, TELEGRAM_CHAT_ID,
ELEVEN_API_KEY, ELEVEN_VOICE_ID, GOOGLE_SPEECH_LANG
- Voice-first: ESP32 uploads audio -> /chat_audio -> STT -> Gemini -> TTS -> MP3
- If STT libs missing, /chat_audio returns 501 (server-side STT optional)
- Endpoints:
GET / -> simple web UI (chat secondary)
POST /chat_text -> {"q":"...","voice":true}
POST /chat_audio -> upload wav (multipart 'file' or raw bytes)
POST /esp/send_text-> wrapper for /chat_text
GET /play_latest -> latest_reply.mp3
GET /_history -> recent history
POST /notify -> forward to Telegram
GET /health -> health check
"""
from __future__ import annotations

# Standard library
import io
import json
import logging
import math
import os
import pathlib
import re
import threading
import time
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Tuple

# Third-party
import requests
from flask import Flask, jsonify, render_template_string, request, send_file
# --- Attempt to import Google GenAI SDK (new) or older lib
# Prefer the new `google-genai` package (`from google import genai`); fall back
# to the legacy `google-generativeai` package. If neither is installed the app
# still runs — Gemini calls then rely solely on the REST fallback path.
USE_GENAI_SDK = False  # True when either SDK variant imported successfully
GENAI_CLIENT = None    # populated later, once an API key is available
try:
    # new official package pattern: `from google import genai`
    from google import genai  # type: ignore
    USE_GENAI_SDK = True
except Exception:
    try:
        import google.generativeai as genai  # type: ignore
        USE_GENAI_SDK = True
    except Exception:
        genai = None
        USE_GENAI_SDK = False
# --- TTS/STT libs (optional)
# gTTS is the text-to-speech fallback used when ElevenLabs is not configured.
try:
    from gtts import gTTS  # type: ignore
    GTTS_AVAILABLE = True
except Exception:
    GTTS_AVAILABLE = False
# SpeechRecognition + pydub enable server-side speech-to-text; when they are
# missing, /chat_audio answers 501 instead of transcribing.
try:
    import speech_recognition as sr  # type: ignore
    from pydub import AudioSegment  # type: ignore
    STT_AVAILABLE = True
except Exception:
    sr = None  # type: ignore
    AudioSegment = None  # type: ignore
    STT_AVAILABLE = False
# -------------------------
# Config from env (HF New secret)
# -------------------------
# Every secret/setting comes from environment variables (HF "New secret");
# an empty string means "feature not configured".
CFG = {
    "GEMINI_API_KEY": os.getenv("GEMINI_API_KEY", "").strip(),
    "GEMINI_MODEL": os.getenv("GEMINI_MODEL", "gemini-1.5-pro").strip(),
    "TELEGRAM_TOKEN": os.getenv("TELEGRAM_TOKEN", "").strip(),
    "TELEGRAM_CHAT_ID": os.getenv("TELEGRAM_CHAT_ID", "").strip(),
    "ELEVEN_API_KEY": os.getenv("ELEVEN_API_KEY", "").strip(),
    "ELEVEN_VOICE_ID": os.getenv("ELEVEN_VOICE_ID", "").strip(),
    "GOOGLE_SPEECH_LANG": os.getenv("GOOGLE_SPEECH_LANG", "vi-VN").strip(),
}
# Model fallback list: prefer configured model, then alternatives
MODEL_FALLBACK_LIST = [
    CFG.get("GEMINI_MODEL") or "gemini-1.5-pro",
    "gemini-1.5-flash",
    "gemini-2.5-flash",
    "gemini-2.5-pro",
]
# dedupe keep order (the configured model may duplicate a hard-coded entry)
seen = set()
MODEL_FALLBACK = []
for m in MODEL_FALLBACK_LIST:
    if m and m not in seen:
        seen.add(m)
        MODEL_FALLBACK.append(m)
# Setup genai client if SDK present and key present
GEMINI_KEY = CFG.get("GEMINI_API_KEY") or ""
if USE_GENAI_SDK and GEMINI_KEY:
    try:
        # Try new SDK client style (google-genai: explicit Client object)
        GENAI_CLIENT = genai.Client(api_key=GEMINI_KEY)  # type: ignore
    except Exception:
        try:
            # fallback older style configure (google-generativeai: module-level)
            genai.configure(api_key=GEMINI_KEY)  # type: ignore
            GENAI_CLIENT = genai  # type: ignore
        except Exception:
            GENAI_CLIENT = None
# -------------------------
# Storage & logging
# -------------------------
# All mutable state lives under ./data so it survives process restarts within
# the same container filesystem.
BASE = pathlib.Path.cwd()
DATA_DIR = BASE / "data"
DATA_DIR.mkdir(exist_ok=True)
USAGE_FILE = DATA_DIR / "usage.json"        # daily request / monthly token counters
HISTORY_FILE = DATA_DIR / "history.json"    # rolling Q/A history (capped at 500)
CFG_SNAPSHOT = DATA_DIR / "cfg_snapshot.json"
LATEST_MP3 = DATA_DIR / "latest_reply.mp3"  # most recent TTS output
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kcrobot_vmax")
# save non-secret snapshot: records only WHICH settings are present (booleans),
# never the secret values themselves
try:
    CFG_SNAPSHOT.write_text(json.dumps({k: bool(CFG.get(k)) for k in CFG}, indent=2), encoding="utf-8")
except Exception:
    pass
# -------------------------
# Helpers: json safe, usage, history
# -------------------------
def load_json_safe(path: pathlib.Path, default):
    """Read and parse a JSON file, returning *default* on any failure."""
    try:
        if not path.exists():
            return default
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception as e:
        logger.debug("load_json_safe error %s -> %s", path, e)
        return default
def save_json_safe(path: pathlib.Path, data):
    """Serialize *data* as pretty-printed UTF-8 JSON to *path*; True on success."""
    try:
        serialized = json.dumps(data, ensure_ascii=False, indent=2)
        path.write_text(serialized, encoding="utf-8")
    except Exception as e:
        logger.exception("save_json_safe failed for %s: %s", path, e)
        return False
    return True
def today_str():
    """Return the current UTC date as 'YYYY-MM-DD'.

    Uses the timezone-aware ``datetime.now(timezone.utc)`` instead of the
    deprecated ``datetime.utcnow()`` (removed-path API since Python 3.12);
    the formatted output is identical.
    """
    return datetime.now(timezone.utc).strftime("%Y-%m-%d")
def load_usage():
    """Return the persisted usage counters, seeding defaults when absent."""
    fallback = {"date": today_str(), "requests_today": 0, "tokens_month": 0}
    return load_json_safe(USAGE_FILE, fallback)
def save_usage(u):
    """Persist the usage-counters dict; returns True/False from save_json_safe."""
    ok = save_json_safe(USAGE_FILE, u)
    return ok
def increment_usage(tokens=1):
    """Bump today's request count and the monthly token tally.

    Rolls the daily counter over when the UTC date changes, while keeping
    the running monthly token total.
    """
    usage = load_usage()
    today = today_str()
    if usage.get("date") != today:
        # New day: reset the request counter, carry the monthly token total.
        usage = {"date": today, "requests_today": 0,
                 "tokens_month": usage.get("tokens_month", 0)}
    usage["requests_today"] = usage.get("requests_today", 0) + 1
    usage["tokens_month"] = usage.get("tokens_month", 0) + int(tokens)
    save_usage(usage)
def append_history(entry: dict):
    """Append one Q/A record to the history file, keeping only the newest 500."""
    records = load_json_safe(HISTORY_FILE, [])
    records.append(entry)
    # Slicing to the last 500 is a no-op for shorter lists, so this matches
    # the conditional-truncate behavior exactly.
    save_json_safe(HISTORY_FILE, records[-500:])
# -------------------------
# Language detection
# -------------------------
# Any Vietnamese diacritic letter marks the text as Vietnamese.
VIET_CHAR_RE = re.compile(
    r"[àáạảãâầấậẩẫăằắặẳẵđèéẹẻẽêềếệểễìíịỉĩòóọỏõôồốộổỗơờớợởỡùúụủũưừứựửữỳýỵỷỹ]",
    re.I,
)
def detect_lang(text: str) -> str:
    """Classify *text* as "vi" (Vietnamese) or "en" (everything else).

    Non-string or empty input defaults to "en". The previous English-keyword
    scan was dead code — every path after the Vietnamese check returned "en"
    regardless — so it has been removed with no behavior change.
    """
    if not text or not isinstance(text, str):
        return "en"
    return "vi" if VIET_CHAR_RE.search(text) else "en"
# -------------------------
# Gemini single model call (SDK preferred, REST fallback)
# -------------------------
def gemini_call_single(model: str, prompt: str, max_output_tokens: int = 1024, temperature: float = 0.2) -> Tuple[bool, str, int]:
    """
    Call a single Gemini model; return (ok, text_or_error, http_status_or_0).

    Tries the installed SDK first. On any SDK failure it now falls through to
    the REST API instead of returning immediately, so an SDK/version mismatch
    cannot mask a working REST path.

    Fix: the REST fallback previously POSTed to a nonexistent ``:generate``
    action with a PaLM-style ``prompt.messages`` payload, which the v1beta
    Generative Language API rejects. It now uses ``:generateContent`` with
    ``contents`` + ``generationConfig`` and parses ``candidates[].content.parts``.
    """
    # --- SDK path (preferred when a client was constructed at startup)
    if GENAI_CLIENT:
        try:
            resp = None
            if hasattr(GENAI_CLIENT, "models") and hasattr(GENAI_CLIENT.models, "generate_content"):
                try:
                    resp = GENAI_CLIENT.models.generate_content(
                        model=model, contents=prompt,
                        max_output_tokens=max_output_tokens, temperature=temperature)  # type: ignore
                except TypeError:
                    # google-genai takes generation options via `config=` instead
                    # of bare keyword arguments.
                    resp = GENAI_CLIENT.models.generate_content(
                        model=model, contents=prompt,
                        config={"max_output_tokens": max_output_tokens,
                                "temperature": temperature})  # type: ignore
            elif hasattr(GENAI_CLIENT, "generate_content"):
                resp = GENAI_CLIENT.generate_content(
                    prompt, max_output_tokens=max_output_tokens, temperature=temperature)
            if resp is not None:
                txt = getattr(resp, "text", None)
                return True, (txt if txt else str(resp)), 200
        except Exception as e:
            # Do not fail hard here — the REST path below may still succeed.
            logger.warning("SDK call to %s failed (%s); trying REST", model, e)
    # --- REST fallback
    key = CFG.get("GEMINI_API_KEY") or ""
    if not key:
        return False, "Gemini API key not configured", 0
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent"
    headers = {"Content-Type": "application/json"}
    payload = {
        "systemInstruction": {"parts": [{"text": "You are a helpful assistant."}]},
        "contents": [{"role": "user", "parts": [{"text": prompt}]}],
        "generationConfig": {
            "maxOutputTokens": max_output_tokens,
            "temperature": temperature,
        },
    }
    try:
        r = requests.post(url, params={"key": key}, json=payload, headers=headers, timeout=30)
        status = r.status_code
        if status == 404:
            return False, f"404 model not found: {model}", 404
        if status >= 400:
            return False, f"HTTP {status}: {r.text}", status
        j = r.json()
        # Expected shape: {"candidates": [{"content": {"parts": [{"text": ...}]}}]}
        candidates = j.get("candidates")
        if candidates and isinstance(candidates, list):
            content = (candidates[0] or {}).get("content") or {}
            parts = content.get("parts") if isinstance(content, dict) else None
            if isinstance(parts, list):
                texts = [p["text"] for p in parts if isinstance(p, dict) and "text" in p]
                if texts:
                    return True, "".join(texts), status
        if isinstance(j, dict) and "output" in j and isinstance(j["output"], str):
            return True, j["output"], status
        # Unknown-but-successful response: surface a truncated JSON dump.
        return True, json.dumps(j)[:2000], status
    except requests.exceptions.HTTPError as he:
        try:
            code = he.response.status_code
        except Exception:
            code = 0
        return False, str(he), code
    except Exception as e:
        return False, str(e), 0
def call_gemini_with_fallbacks(prompt: str, max_output_tokens: int = 1024, temperature: float = 0.2) -> Dict[str, Any]:
    """Walk MODEL_FALLBACK until one model answers.

    Returns {"ok": True, "text": ..., "model": ...} on success, otherwise
    {"ok": False, "error": ..., "last": ...} describing the final failure.
    """
    if not CFG.get("GEMINI_API_KEY"):
        return {"ok": False, "error": "Gemini API key not configured (set GEMINI_API_KEY in New secret)"}
    last_error = None
    for candidate in filter(None, MODEL_FALLBACK):
        ok, payload, status = gemini_call_single(candidate, prompt, max_output_tokens, temperature)
        if ok:
            return {"ok": True, "text": payload, "model": candidate}
        last_error = {"model": candidate, "status": status, "error": payload}
        logger.warning("Model %s failed: %s (status=%s)", candidate, payload, status)
        # Keep trying the next model; even errors likely to persist (e.g. 403)
        # are retried for robustness.
    return {"ok": False, "error": f"All models failed. Last: {last_error}", "last": last_error}
# -------------------------
# TTS: ElevenLabs optional -> gTTS fallback
# -------------------------
def tts_elevenlabs_bytes(text: str, voice_id: str, api_key: str) -> bytes:
    """Render *text* to MP3 bytes via the ElevenLabs text-to-speech REST API.

    Raises requests.HTTPError on a non-2xx response.
    """
    endpoint = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}"
    body = {"text": text, "voice_settings": {"stability": 0.6, "similarity_boost": 0.75}}
    resp = requests.post(
        endpoint,
        json=body,
        headers={"xi-api-key": api_key, "Content-Type": "application/json"},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.content
def tts_gtts_bytes(text: str, lang: str = "vi") -> bytes:
    """Render *text* to MP3 bytes with gTTS.

    Raises RuntimeError when the gTTS package is not installed.
    """
    if not GTTS_AVAILABLE:
        raise RuntimeError("gTTS not available in environment")
    buf = io.BytesIO()
    gTTS(text=text, lang=lang).write_to_fp(buf)
    return buf.getvalue()
def synthesize_and_save(answer: str, lang_hint: str = "vi") -> Tuple[bool, str]:
    """Convert *answer* to speech and persist it as latest_reply.mp3.

    Prefers ElevenLabs when both key and voice are configured, falling back
    to gTTS. Returns (True, mp3_path) or (False, error_message).
    """
    try:
        audio = None
        if CFG.get("ELEVEN_API_KEY") and CFG.get("ELEVEN_VOICE_ID"):
            try:
                audio = tts_elevenlabs_bytes(answer, CFG["ELEVEN_VOICE_ID"], CFG["ELEVEN_API_KEY"])
                logger.info("TTS: used ElevenLabs")
            except Exception:
                # ElevenLabs is best-effort; drop to the free gTTS path.
                logger.exception("ElevenLabs TTS failed; falling back to gTTS")
                audio = None
        if audio is None:
            code = "vi" if lang_hint and str(lang_hint).startswith("vi") else "en"
            audio = tts_gtts_bytes(answer, lang=code)
            logger.info("TTS: used gTTS")
        LATEST_MP3.write_bytes(audio)
        return True, str(LATEST_MP3)
    except Exception as e:
        logger.exception("synthesize_and_save failed")
        return False, f"TTS error: {e}"
# -------------------------
# STT: server-side speech-to-text (optional)
# -------------------------
def speech_to_text(wav_bytes: bytes) -> Tuple[bool, str]:
    """Transcribe WAV bytes with Google's free recognizer; (ok, text_or_error).

    Requires the optional SpeechRecognition/pydub stack; reports a clear
    error when it is absent.
    """
    if not STT_AVAILABLE:
        return False, "STT not available on server (SpeechRecognition & pydub missing)"
    try:
        rec = sr.Recognizer()
        with sr.AudioFile(io.BytesIO(wav_bytes)) as source:
            clip = rec.record(source)
        transcript = rec.recognize_google(clip, language=CFG.get("GOOGLE_SPEECH_LANG", "vi-VN"))
        return True, transcript
    except Exception as e:
        logger.exception("speech_to_text failed")
        return False, str(e)
# -------------------------
# Telegram helper (optional)
# -------------------------
def send_telegram_message(text: str) -> bool:
    """Best-effort push of *text* to the configured Telegram chat.

    Returns False (without raising) when Telegram is unconfigured or the
    request fails.
    """
    token = CFG.get("TELEGRAM_TOKEN") or ""
    chat_id = CFG.get("TELEGRAM_CHAT_ID") or ""
    if not (token and chat_id):
        logger.debug("Telegram not configured")
        return False
    try:
        requests.post(
            f"https://api.telegram.org/bot{token}/sendMessage",
            json={"chat_id": chat_id, "text": text},
            timeout=8,
        )
        return True
    except Exception:
        logger.exception("send_telegram_message failed")
        return False
# -------------------------
# Flask app & minimal UI (chat is secondary)
# -------------------------
app = Flask(__name__)
# Jinja template for the single-page UI. Placeholders {{models}}, {{gemini}}
# and {{tg}} are filled by root(). The page POSTs to /chat_text and, when a
# play_url comes back, streams /play_latest into the <audio> element.
INDEX_HTML = """
<!doctype html>
<html>
<head><meta charset="utf-8"><title>KCrobot AI — Vmax (Voice-first)</title>
<style>
body{font-family:Arial;background:#071025;color:#fff;padding:18px}
.container{max-width:900px;margin:0 auto}
textarea{width:100%;padding:10px;border-radius:8px;background:#061427;color:#fff;border:1px solid #133}
button{padding:10px 14px;border-radius:8px;background:#0ea5ff;color:#012;border:none;cursor:pointer}
#resp{white-space:pre-wrap;margin-top:12px;background:#021220;padding:12px;border-radius:6px}
.small{font-size:0.9rem;color:#9fb3c8}
</style></head><body>
<div class="container">
<h1>🤖 KCrobot AI — Vmax (Voice-first)</h1>
<p class="small">Model fallback: {{models}} — Gemini key: {{gemini}} — Telegram: {{tg}}</p>
<p>Giao diện chat là phụ — ưu tiên voice (ESP32 gửi audio). Bạn có thể thử gõ "Xin chào" để nghe trả lời.</p>
<textarea id="q" rows="4" placeholder="Nhập tiếng Việt / English..."></textarea>
<p><label><input id="voice" type="checkbox" checked> Voice ON</label>
<button onclick="send()">Gửi & Nghe</button></p>
<div id="resp"></div>
<audio id="audio" controls style="display:none"></audio>
<script>
async function send(){
const q=document.getElementById('q').value;
const voice=document.getElementById('voice').checked;
if(!q){ alert('Nhập nội dung'); return; }
document.getElementById('resp').innerText='⏳ Đang xử lý...';
const res = await fetch('/chat_text', {
method:'POST', headers:{'Content-Type':'application/json'},
body: JSON.stringify({q:q, voice:voice})
});
const j = await res.json();
if(j.error){ document.getElementById('resp').innerText='Error: '+j.error; return; }
document.getElementById('resp').innerText = j.answer;
if(j.play_url){
const audio=document.getElementById('audio');
audio.src = j.play_url + '?t=' + Date.now();
audio.style.display='block';
audio.play();
}
}
</script>
</div></body></html>
"""
@app.route("/", methods=["GET"])
def root():
    """Serve the minimal web UI with configuration-status badges."""
    has_gemini = bool(CFG.get("GEMINI_API_KEY"))
    has_tg = bool(CFG.get("TELEGRAM_TOKEN") and CFG.get("TELEGRAM_CHAT_ID"))
    return render_template_string(
        INDEX_HTML,
        models=", ".join(MODEL_FALLBACK),
        gemini="✅" if has_gemini else "❌",
        tg="✅" if has_tg else "❌",
    )
@app.route("/_history", methods=["GET"])
def history_endpoint():
    """Return the 50 most recent Q/A history records as JSON."""
    recent = load_json_safe(HISTORY_FILE, [])[-50:]
    return jsonify(recent)
@app.route("/chat_text", methods=["POST"])
def chat_text():
    """Text chat endpoint: JSON {"q": ..., "voice": bool} -> answer (+ audio URL)."""
    body = request.get_json(silent=True) or {}
    question = body.get("q") or body.get("question") or ""
    want_voice = bool(body.get("voice", True))
    if not question or not str(question).strip():
        return jsonify({"error": "missing 'q'"}), 400
    lang = detect_lang(question)
    if lang == "vi":
        prompt = f"Bạn là robot trợ lý KCrobot, trả lời bằng tiếng Việt tự nhiên: {question}"
    else:
        prompt = f"You are KCrobot assistant. Answer naturally in English: {question}"
    gem = call_gemini_with_fallbacks(prompt)
    if gem.get("ok"):
        answer = gem.get("text", "")
        # Rough token proxy: word count (never less than 1).
        increment_usage(max(1, len(answer.split())))
    else:
        answer = f"[Gemini error] {gem.get('error')}"
        logger.warning("Gemini failed: %s", gem.get("error"))
    append_history({"ts": time.time(), "q": question, "answer": answer, "lang": lang, "model": gem.get("model")})
    play_url = None
    if want_voice:
        synth_ok, _ = synthesize_and_save(answer, lang_hint=lang)
        if synth_ok:
            play_url = "/play_latest"
    # Telegram notification is fire-and-forget on a background thread.
    try:
        threading.Thread(target=send_telegram_message, args=(f"Q: {question}\nA: {answer}",)).start()
    except Exception:
        pass
    resp = {"answer": answer}
    if play_url:
        resp["play_url"] = play_url
    return jsonify(resp)
@app.route("/esp/send_text", methods=["POST"])
def esp_send_text():
    """ESP32-friendly alias that delegates straight to /chat_text."""
    return chat_text()
@app.route("/chat_audio", methods=["POST"])
def chat_audio():
    """Voice endpoint: WAV upload -> STT -> Gemini -> TTS -> JSON reply.

    Accepts either a multipart 'file' field or raw body bytes. Responds 501
    when server-side STT libraries are not installed.
    """
    if 'file' in request.files:
        wav_bytes = request.files['file'].read()
    else:
        wav_bytes = request.get_data()
    if not wav_bytes:
        return jsonify({"error": "no audio provided"}), 400
    # Keep a timestamped copy of every upload for debugging (best-effort).
    try:
        stamp = int(time.time())
        (DATA_DIR / f"uploaded_{stamp}.wav").write_bytes(wav_bytes)
    except Exception:
        logger.exception("saving uploaded wav failed")
    if not STT_AVAILABLE:
        return jsonify({"error": "STT not available on server (SpeechRecognition & pydub missing)"}), 501
    ok, text_or_err = speech_to_text(wav_bytes)
    if not ok:
        return jsonify({"error": "STT failed", "details": text_or_err}), 500
    text = text_or_err
    lang = detect_lang(text)
    if lang == "vi":
        prompt = f"Bạn là robot trợ lý KCrobot, trả lời bằng tiếng Việt tự nhiên: {text}"
    else:
        prompt = f"You are KCrobot assistant. Answer naturally in English: {text}"
    gem = call_gemini_with_fallbacks(prompt)
    if gem.get("ok"):
        answer = gem.get("text", "")
    else:
        answer = f"[Gemini error] {gem.get('error')}"
        logger.warning("Gemini failed on audio: %s", gem.get("error"))
    synth_ok, _ = synthesize_and_save(answer, lang_hint=lang)
    append_history({"ts": time.time(), "q": text, "answer": answer, "lang": lang, "model": gem.get("model")})
    try:
        threading.Thread(target=send_telegram_message, args=(f"Q(STT): {text}\nA: {answer}",)).start()
    except Exception:
        pass
    resp = {"question": text, "answer": answer}
    if synth_ok:
        resp["play_url"] = "/play_latest"
    return jsonify(resp)
@app.route("/play_latest", methods=["GET"])
def play_latest():
    """Stream the most recently synthesized MP3 reply (404 when none exists)."""
    if LATEST_MP3.exists():
        return send_file(str(LATEST_MP3), mimetype="audio/mpeg")
    return jsonify({"error": "no audio"}), 404
@app.route("/notify", methods=["POST"])
def notify():
    """Forward an arbitrary robot event to Telegram (fire-and-forget)."""
    payload = request.get_json(silent=True) or {}
    event = payload.get("event", "event")
    msg = payload.get("msg", "")
    try:
        threading.Thread(target=send_telegram_message, args=(f"[Robot Notify] {event}: {msg}",)).start()
    except Exception:
        pass
    # Always reports sent:true — delivery itself is best-effort.
    return jsonify({"sent": True})
@app.route("/health", methods=["GET"])
def health():
    """Liveness probe plus a snapshot of which optional features are enabled."""
    report = {
        "status": "ok",
        "time": time.time(),
        "gemini_key_present": bool(CFG.get("GEMINI_API_KEY")),
        "model_list": MODEL_FALLBACK,
        "stt_available": STT_AVAILABLE,
        "gtts_available": GTTS_AVAILABLE,
        "sdk_present": USE_GENAI_SDK,
    }
    return jsonify(report)
# -------------------------
# Start server
# -------------------------
if __name__ == "__main__":
    # Touch the data files once so the first requests don't race on creation.
    load_json_safe(HISTORY_FILE, [])
    load_json_safe(USAGE_FILE, {})
    logger.info("KCrobot Vmax starting. Gemini key present: %s, SDK present: %s, STT available: %s",
                bool(CFG.get("GEMINI_API_KEY")), USE_GENAI_SDK, STT_AVAILABLE)
    # Port 7860 is the Hugging Face Spaces default; bind all interfaces.
    app.run(host="0.0.0.0", port=int(os.getenv("PORT", "7860")), debug=False)