"""
RobotAI v9.9 — Gemini Brain + gTTS + Telegram + ESP32 API + Hugging Face
Updated 2025-11
Features:
- Web UI (bilingual vi/en)
- ESP32 endpoints: /chat, /tts, /stt
- Telegram integration (polling)
- Gemini cloud AI + gTTS speech
"""
import asyncio
import base64
import json
import logging
import os
import re
import threading
import time
import unicodedata
import uuid

from flask import Flask, request, jsonify, render_template_string, send_file, redirect, url_for
# Gemini SDK
import google.generativeai as genai
# gTTS for TTS (female-like voice)
from gtts import gTTS

# Telegram support
try:
    from telegram import Bot
    from telegram.ext import ApplicationBuilder, MessageHandler, CommandHandler, filters
    TELEGRAM_LIB = "v20"
except Exception:
    try:
        from telegram import Bot
        from telegram.ext import Updater, MessageHandler, Filters, CommandHandler
        TELEGRAM_LIB = "v13"
    except Exception:
        TELEGRAM_LIB = None
# ---------------- Config ----------------
# JSON file with saved settings; load_config() merges it over the
# environment-variable defaults and save_config() writes it.
CONFIG_FILE = "config.json"
# Directory that receives the mp3 files generated by speak_text().
AUDIO_DIR = "audio_cache"
# Created at import time so later writes do not have to check for it.
os.makedirs(AUDIO_DIR, exist_ok=True)
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("RobotAI")
# Model name used when GEMINI_MODEL is not configured.
DEFAULT_MODEL = "gemini-2.5-flash"
# Updated by init_gemini(); the chat routes check it before calling Gemini.
USE_GEMINI = False
# ----------------- Config helpers -----------------
def load_config():
    """Return the effective configuration as a dict.

    Defaults come from the environment variables GEMINI_API_KEY,
    GEMINI_MODEL (falling back to DEFAULT_MODEL), TELEGRAM_TOKEN and
    TELEGRAM_CHAT_ID. If CONFIG_FILE exists and holds a JSON object, its
    non-empty values are merged over those defaults.

    Read or parse failures are logged and the environment defaults are
    returned; this function does not raise for a bad config file.
    """
    cfg = {
        "GEMINI_API_KEY": os.environ.get("GEMINI_API_KEY", ""),
        "GEMINI_MODEL": os.environ.get("GEMINI_MODEL", DEFAULT_MODEL),
        "TELEGRAM_TOKEN": os.environ.get("TELEGRAM_TOKEN", ""),
        "TELEGRAM_CHAT_ID": os.environ.get("TELEGRAM_CHAT_ID", ""),
    }
    if not os.path.exists(CONFIG_FILE):
        return cfg
    try:
        with open(CONFIG_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both malformed JSON and undecodable bytes.
        logger.exception("Load config error")
        return cfg
    if not isinstance(data, dict):
        logger.error("Load config error: %s must contain a JSON object", CONFIG_FILE)
        return cfg
    # The /config form saves every field, including the ones left blank.
    # Skipping empty values keeps a blank field from wiping out a setting
    # that was supplied through the environment.
    cfg.update({k: v for k, v in data.items() if v not in ("", None)})
    return cfg
def save_config(cfg):
    """Write ``cfg`` to CONFIG_FILE as indented UTF-8 JSON.

    Returns True when the file was written, False when any error occurred
    (the error is logged, not raised).
    """
    saved = False
    try:
        with open(CONFIG_FILE, mode="w", encoding="utf-8") as out:
            json.dump(cfg, out, ensure_ascii=False, indent=2)
    except Exception:
        logger.exception("Save config failed")
    else:
        saved = True
    return saved
# ----------------- Gemini init + wrapper -----------------
def init_gemini():
    """Configure the Gemini SDK from the current config and set USE_GEMINI.

    USE_GEMINI becomes True only when an API key is present and
    ``genai.configure`` accepts it; otherwise it is set to False and the
    reason is logged.
    """
    global USE_GEMINI
    api_key = load_config().get("GEMINI_API_KEY") or ""
    if api_key:
        try:
            genai.configure(api_key=api_key)
        except Exception:
            USE_GEMINI = False
            logger.exception("Gemini init error")
        else:
            USE_GEMINI = True
            logger.info("✅ Gemini connected OK.")
        return
    logger.warning("Gemini API key missing.")
    USE_GEMINI = False


# Configure Gemini once when the module is imported.
init_gemini()
def gemini_answer(prompt: str) -> str:
    """Send ``prompt`` to the configured Gemini model and return its reply text.

    The model name comes from the GEMINI_MODEL setting; a missing or blank
    value falls back to DEFAULT_MODEL. Any exception raised by the SDK is
    logged and a fixed warning message is returned instead, so this
    function does not raise.
    """
    cfg = load_config()
    # `or` rather than a .get() default: the /config form can store "" for
    # the model, and an empty model name must not reach the SDK.
    model = cfg.get("GEMINI_MODEL") or DEFAULT_MODEL
    try:
        if hasattr(genai, "GenerativeModel"):
            resp = genai.GenerativeModel(model).generate_content(prompt)
            text = getattr(resp, "text", None)
            # Only stringify the whole response when it carries no text.
            return str(resp) if text is None else text
        if hasattr(genai, "responses") and hasattr(genai.responses, "create"):
            r = genai.responses.create(model=model, input=prompt)
            text = getattr(r, "output_text", None)
            return str(r) if text is None else text
    except Exception:
        logger.exception("Gemini call error")
    return "⚠️ Gemini không phản hồi — kiểm tra API key / library."
# ----------------- Language detection & TTS -----------------
# Lower-case letters used to recognise Vietnamese text. Normalised to NFC so
# the table holds precomposed code points however this file was saved.
VIET_CHARS = unicodedata.normalize(
    "NFC",
    "ăâđêôơư"
    "áàảãạắằẳẵặấầẩẫậ"
    "éèẻẽẹếềểễệ"
    "íìỉĩị"
    "óòỏõọốồổỗộớờởỡợ"
    "úùủũụứừửữự"
    "ýỳỷỹỵ",
)
# Set form of VIET_CHARS for constant-time membership tests.
_VIET_CHAR_SET = frozenset(VIET_CHARS)


def detect_lang(text: str) -> str:
    """Return "vi" if ``text`` contains any letter from VIET_CHARS, else "en".

    The text is normalised to NFC and lower-cased first, so upper-case
    letters and decomposed input (base letter followed by combining marks)
    are recognised as well. Empty text yields "en".
    """
    normalized = unicodedata.normalize("NFC", text).lower()
    return "en" if _VIET_CHAR_SET.isdisjoint(normalized) else "vi"
def clean_text_for_tts(text: str) -> str:
    """Prepare ``text`` for speech synthesis.

    Punctuation and symbol characters are replaced by spaces, runs of
    whitespace are collapsed to one space and the ends are trimmed. When
    the input is empty or nothing is left after cleaning, the greeting
    "Xin chào" is returned instead.
    """
    fallback = "Xin chào"
    if not text:
        return fallback
    punctuation = re.compile(r"[.,!?;:()\"'“”‘’\[\]{}<>\/\\\|@#\$%\^&\*\+=~`–—\-]")
    whitespace_run = re.compile(r"\s+")
    spaced = punctuation.sub(" ", text)
    collapsed = whitespace_run.sub(" ", spaced).strip()
    if collapsed:
        return collapsed
    return fallback
def speak_text(text: str, lang: str = "vi") -> str:
    """Synthesise ``text`` with gTTS and save it as an mp3 in AUDIO_DIR.

    The text is passed through clean_text_for_tts() first and the file gets
    a random ``tts_<8 hex chars>.mp3`` name. Returns the path of the saved
    file, or None when generation fails (the error is logged).
    """
    try:
        spoken = clean_text_for_tts(text)
        out_path = os.path.join(AUDIO_DIR, f"tts_{uuid.uuid4().hex[:8]}.mp3")
        gTTS(text=spoken, lang=lang, slow=False).save(out_path)
    except Exception:
        logger.exception("TTS generation failed")
        return None
    return out_path
def cleanup_audio_older_than(seconds: int = 3600):
    """Delete files in AUDIO_DIR whose modification time is older than ``seconds``.

    This is best-effort: failures are logged as warnings and never raised,
    so it is safe to run in a background thread.
    """
    now = time.time()
    try:
        entries = os.listdir(AUDIO_DIR)
    except OSError:
        logger.warning("Audio cache cleanup skipped: cannot list %s", AUDIO_DIR, exc_info=True)
        return
    for name in entries:
        path = os.path.join(AUDIO_DIR, name)
        try:
            if os.path.isfile(path) and now - os.path.getmtime(path) > seconds:
                os.remove(path)
        except FileNotFoundError:
            # A concurrent cleanup run removed the file first; nothing to do.
            continue
        except OSError:
            logger.warning("Could not remove cached audio %s", path, exc_info=True)
# ----------------- Web UI -----------------
# Template for the home page; home() renders it with the `gemini_status` and
# `model` variables.
# NOTE(review): the text below contains no HTML tags, form or script. It looks
# like the markup was stripped from this copy of the file; confirm against
# the original template.
INDEX_HTML = """
RobotAI v9.9
🤖 RobotAI v9.9 — Gemini Brain + Voice + Telegram
Gemini:
{{ gemini_status }} | Model:
{{ model }} |
Config
"""
# Template for the /config page; config_page() renders it with the config
# values as variables.
# NOTE(review): no form or input fields are present here although
# config_page() reads GEMINI_API_KEY, GEMINI_MODEL, TELEGRAM_TOKEN and
# TELEGRAM_CHAT_ID from a submitted form. Presumably the markup was lost; verify.
CONFIG_HTML = """Config
⚙️ Config RobotAI
⬅ Trở về
"""
# ----------------- Routes -----------------
@app.route("/")
def home():
    """Render the landing page with the Gemini connection status and model name."""
    model_name = load_config().get("GEMINI_MODEL")
    if USE_GEMINI:
        status_label = "✅ Kết nối"
    else:
        status_label = "❌ Chưa kết nối"
    return render_template_string(
        INDEX_HTML,
        gemini_status=status_label,
        model=model_name,
    )
@app.route("/config", methods=["GET", "POST"])
def config_page():
    """Show the configuration page (GET) or save submitted settings (POST).

    On POST the four known form fields are stripped and saved, Gemini is
    re-initialised, the Telegram bot thread is started if needed, and the
    client is redirected back to this page.
    """
    # NOTE(review): this endpoint has no authentication and renders the stored
    # API key and bot token into the page; restrict access before exposing
    # the server publicly.
    if request.method != "POST":
        return render_template_string(CONFIG_HTML, **load_config())

    fields = ("GEMINI_API_KEY", "GEMINI_MODEL", "TELEGRAM_TOKEN", "TELEGRAM_CHAT_ID")
    data = {k: request.form.get(k, "").strip() for k in fields}
    save_config(data)
    init_gemini()
    try:
        start_telegram_bot_thread()
    except Exception:
        # `except Exception` instead of a bare `except`, so SystemExit and
        # KeyboardInterrupt are not swallowed.
        logger.exception("start telegram thread failed")
    return redirect(url_for("config_page"))
@app.route("/api/chat", methods=["POST"])
def api_chat():
    """Web UI chat endpoint: answer ``text`` with Gemini and synthesise the reply.

    Expects a JSON object with a string ``text`` field. Responds with
    ``{"reply": ..., "tts_url": ...}``; ``tts_url`` is None when no audio
    could be generated. A missing, blank or non-string ``text`` (or a JSON
    body that is not an object) yields ``{"error": "empty"}`` with status 400.
    """
    payload = request.get_json(force=True)
    # Valid JSON is not necessarily an object, and "text" is not necessarily
    # a string; treat anything else as empty instead of failing with a 500.
    raw = payload.get("text") if isinstance(payload, dict) else None
    text = raw.strip() if isinstance(raw, str) else ""
    if not text:
        return jsonify({"error": "empty"}), 400

    lang = detect_lang(text)
    if USE_GEMINI:
        prefix = "Trả lời bằng tiếng Việt:" if lang == "vi" else "Answer in English:"
        try:
            reply = gemini_answer(prefix + text)
        except Exception:
            logger.exception("gemini call")
            reply = "⚠️ Lỗi khi gọi Gemini."
    else:
        reply = "⚠️ Chưa kết nối Gemini."

    tts_path = None
    if reply:
        try:
            tts_path = speak_text(reply, lang)
        except Exception:
            logger.exception("tts failed")
    tts_url = f"/api/tts/{os.path.basename(tts_path)}" if tts_path else None

    # Prune old audio files in the background so the response is not delayed.
    threading.Thread(target=cleanup_audio_older_than, daemon=True).start()
    return jsonify({"reply": reply, "tts_url": tts_url})
@app.route("/api/tts/<fname>")
def get_tts(fname):
    """Serve a generated mp3 from AUDIO_DIR by file name.

    The route declares the ``fname`` URL variable that this view takes as
    its argument, matching the ``/api/tts/<file>`` URLs built by api_chat().
    Responds with ``{"error": "not found"}`` and status 404 when the name is
    not a plain file name or no such file exists.
    """
    # Accept bare file names only, so a request cannot reach outside AUDIO_DIR.
    if fname in (".", "..") or os.path.basename(fname) != fname:
        return jsonify({"error": "not found"}), 404
    path = os.path.join(AUDIO_DIR, fname)
    if not os.path.isfile(path):
        return jsonify({"error": "not found"}), 404
    # Pass an absolute path so send_file opens exactly the file checked above,
    # regardless of which base directory it applies to relative paths.
    return send_file(os.path.abspath(path), mimetype="audio/mpeg")
# ----------------- ESP32 API endpoints -----------------
@app.route("/chat", methods=["POST"])
def esp32_chat():
    """ESP32 chat endpoint: answer ``text`` with Gemini, text reply only.

    Expects a JSON object with a string ``text`` field and responds with
    ``{"reply": ...}``. A missing, blank or non-string ``text`` (or a JSON
    body that is not an object) yields ``{"error": "empty text"}`` with
    status 400.
    """
    data = request.get_json(force=True)
    # Guard against JSON that is not an object or a "text" that is not a
    # string; both would otherwise raise and produce a 500.
    raw = data.get("text") if isinstance(data, dict) else None
    text = raw.strip() if isinstance(raw, str) else ""
    if not text:
        return jsonify({"error": "empty text"}), 400

    lang = detect_lang(text)
    if not USE_GEMINI:
        return jsonify({"reply": "⚠️ Chưa kết nối Gemini."})
    prefix = "Trả lời bằng tiếng Việt:" if lang == "vi" else "Answer in English:"
    try:
        reply = gemini_answer(prefix + text)
    except Exception:
        logger.exception("ESP32 chat failed")
        reply = "⚠️ Lỗi khi gọi Gemini."
    return jsonify({"reply": reply})
@app.route("/tts", methods=["POST"])
def esp32_tts():
    """ESP32 text-to-speech endpoint: return the spoken ``text`` as base64 mp3.

    Expects a JSON object with a string ``text`` field and responds with
    ``{"audioContent": <base64>}``. A missing, blank or non-string ``text``
    yields ``{"error": "empty text"}`` with status 400; a synthesis or read
    failure yields ``{"error": "tts failed"}`` with status 500.
    """
    data = request.get_json(force=True)
    raw = data.get("text") if isinstance(data, dict) else None
    text = raw.strip() if isinstance(raw, str) else ""
    if not text:
        return jsonify({"error": "empty text"}), 400

    lang = detect_lang(text)
    path = None
    try:
        path = speak_text(text, lang)
        if not path:
            # speak_text() signals failure by returning None.
            return jsonify({"error": "tts failed"}), 500
        with open(path, "rb") as f:
            audio_b64 = base64.b64encode(f.read()).decode("utf-8")
        return jsonify({"audioContent": audio_b64})
    except Exception:
        logger.exception("ESP32 tts failed")
        return jsonify({"error": "tts failed"}), 500
    finally:
        # The audio is returned inline and its path is never handed to the
        # client, so the file is removed right away; this endpoint does not
        # trigger the cache cleanup that api_chat starts.
        if path:
            try:
                os.remove(path)
            except OSError:
                logger.warning("Could not remove temporary audio %s", path, exc_info=True)
@app.route("/stt", methods=["POST"])
def esp32_stt():
    """Placeholder speech-to-text endpoint.

    The request body is not read; the response is always the same fixed text.
    """
    stub_result = {"text": "xin chào"}
    return jsonify(stub_result)
# ----------------- Telegram -----------------
# Background thread running the Telegram poller (see start_telegram_bot_thread).
TG_THREAD = None


def send_to_telegram_sync(token, chat_id, text, tts_path=None):
    """Send ``text`` to a Telegram chat, plus the audio file when one is given.

    Args:
        token: Bot token used to create the Bot.
        chat_id: Target chat id.
        text: Message text.
        tts_path: Optional path to an audio file; it is sent only when the
            path exists.

    Failures are logged and never raised.
    """
    try:
        bot = Bot(token=token)
        if TELEGRAM_LIB == "v20":
            # In python-telegram-bot v20 the Bot methods are coroutines, so a
            # plain call would create a coroutine and send nothing. Run them
            # to completion on a temporary event loop instead.
            async def _send():
                async with bot:
                    await bot.send_message(chat_id=chat_id, text=text)
                    if tts_path and os.path.exists(tts_path):
                        with open(tts_path, "rb") as fh:
                            await bot.send_audio(chat_id=chat_id, audio=fh)

            asyncio.run(_send())
        else:
            bot.send_message(chat_id=chat_id, text=text)
            if tts_path and os.path.exists(tts_path):
                with open(tts_path, "rb") as fh:
                    bot.send_audio(chat_id=chat_id, audio=fh)
    except Exception:
        logger.exception("telegram send failed")
def start_telegram_bot_thread():
    """Start the Telegram polling bot in a daemon thread if it is not running.

    Does nothing when no TELEGRAM_TOKEN is configured or when the previously
    started thread is still alive. Each incoming non-command text message is
    answered with gemini_answer(), asking for Vietnamese or English
    according to detect_lang(). Errors inside the thread are logged.
    """
    global TG_THREAD
    token = load_config().get("TELEGRAM_TOKEN", "")
    if not token:
        return
    if TG_THREAD and TG_THREAD.is_alive():
        return

    def build_prompt(txt):
        # Same language-prefix convention as the HTTP chat endpoints.
        prefix = "Trả lời bằng tiếng Việt:" if detect_lang(txt) == "vi" else "Answer in English:"
        return prefix + txt

    def runner():
        try:
            if TELEGRAM_LIB == "v20":
                # A worker thread has no event loop by default; give it one
                # before the application starts polling.
                asyncio.set_event_loop(asyncio.new_event_loop())
                application = ApplicationBuilder().token(token).build()

                async def handle(update, context):
                    message = update.effective_message
                    if message is None:
                        return
                    txt = message.text or ""
                    # gemini_answer() blocks on network I/O; run it in the
                    # default executor so the bot's event loop stays responsive.
                    loop = asyncio.get_running_loop()
                    reply = await loop.run_in_executor(None, gemini_answer, build_prompt(txt))
                    await message.reply_text(reply)

                application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle))
                # Signal handlers can only be installed from the main thread,
                # so polling must not register stop signals here.
                application.run_polling(stop_signals=None)
            elif TELEGRAM_LIB == "v13":
                updater = Updater(token=token, use_context=True)
                dp = updater.dispatcher

                def handle(update, context):
                    message = update.effective_message
                    if message is None:
                        return
                    txt = message.text or ""
                    message.reply_text(gemini_answer(build_prompt(txt)))

                dp.add_handler(MessageHandler(Filters.text & (~Filters.command), handle))
                updater.start_polling()
                # updater.idle() installs signal handlers, which is only
                # possible in the main thread. Block here instead, so this
                # thread stays alive and the is_alive() guard above keeps
                # preventing a second poller.
                threading.Event().wait()
        except Exception:
            logger.exception("telegram thread error")

    TG_THREAD = threading.Thread(target=runner, daemon=True)
    TG_THREAD.start()


# Start the bot at import time when a token is already configured.
try:
    start_telegram_bot_thread()
except Exception:
    logger.exception("Starting telegram failed")
# ----------------- Run -----------------
if __name__ == "__main__":
    # Listen on the port given by the PORT environment variable (7860 when
    # unset) and accept connections on all interfaces.
    port = int(os.getenv("PORT", 7860))
    logger.info(f"Starting RobotAI v9.9 on port {port}")
    app.run(port=port, host="0.0.0.0")