Spaces:
Sleeping
Sleeping
| """ | |
| RobotAI v9.9 — Gemini Brain + gTTS + Telegram + ESP32 API + Hugging Face | |
| Cập nhật 2025-11 | |
| Features: | |
| - Web UI (song ngữ vi/en) | |
| - ESP32 endpoints: /chat, /tts, /stt | |
| - Telegram integration (polling) | |
| - Gemini cloud AI + gTTS speech | |
| """ | |
| import os | |
| import json | |
| import uuid | |
| import re | |
| import time | |
| import logging | |
| import threading | |
| import base64 | |
| from flask import Flask, request, jsonify, render_template_string, send_file, redirect, url_for | |
| # Gemini SDK | |
| import google.generativeai as genai | |
| # gTTS for TTS (female-like voice) | |
| from gtts import gTTS | |
| # Telegram support | |
| try: | |
| from telegram import Bot | |
| from telegram.ext import ApplicationBuilder, MessageHandler, CommandHandler, filters | |
| TELEGRAM_LIB = "v20" | |
| except Exception: | |
| try: | |
| from telegram import Bot | |
| from telegram.ext import Updater, MessageHandler, Filters, CommandHandler | |
| TELEGRAM_LIB = "v13" | |
| except Exception: | |
| TELEGRAM_LIB = None | |
| # ---------------- Config ---------------- | |
| CONFIG_FILE = "config.json" | |
| AUDIO_DIR = "audio_cache" | |
| os.makedirs(AUDIO_DIR, exist_ok=True) | |
| app = Flask(__name__) | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger("RobotAI") | |
| DEFAULT_MODEL = "gemini-2.5-flash" | |
| USE_GEMINI = False | |
| # ----------------- Config helpers ----------------- | |
| def load_config(): | |
| cfg = { | |
| "GEMINI_API_KEY": os.environ.get("GEMINI_API_KEY", ""), | |
| "GEMINI_MODEL": os.environ.get("GEMINI_MODEL", DEFAULT_MODEL), | |
| "TELEGRAM_TOKEN": os.environ.get("TELEGRAM_TOKEN", ""), | |
| "TELEGRAM_CHAT_ID": os.environ.get("TELEGRAM_CHAT_ID", "") | |
| } | |
| if os.path.exists(CONFIG_FILE): | |
| try: | |
| with open(CONFIG_FILE, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| cfg.update(data) | |
| except Exception: | |
| logger.exception("Load config error") | |
| return cfg | |
| def save_config(cfg): | |
| try: | |
| with open(CONFIG_FILE, "w", encoding="utf-8") as f: | |
| json.dump(cfg, f, ensure_ascii=False, indent=2) | |
| return True | |
| except Exception: | |
| logger.exception("Save config failed") | |
| return False | |
| # ----------------- Gemini init + wrapper ----------------- | |
| def init_gemini(): | |
| global USE_GEMINI | |
| cfg = load_config() | |
| key = cfg.get("GEMINI_API_KEY") or "" | |
| if not key: | |
| logger.warning("Gemini API key missing.") | |
| USE_GEMINI = False | |
| return | |
| try: | |
| genai.configure(api_key=key) | |
| USE_GEMINI = True | |
| logger.info("✅ Gemini connected OK.") | |
| except Exception: | |
| USE_GEMINI = False | |
| logger.exception("Gemini init error") | |
| init_gemini() | |
| def gemini_answer(prompt: str) -> str: | |
| cfg = load_config() | |
| model = cfg.get("GEMINI_MODEL", DEFAULT_MODEL) | |
| try: | |
| if hasattr(genai, "GenerativeModel"): | |
| m = genai.GenerativeModel(model) | |
| resp = m.generate_content(prompt) | |
| return getattr(resp, "text", str(resp)) | |
| elif hasattr(genai, "responses") and hasattr(genai.responses, "create"): | |
| r = genai.responses.create(model=model, input=prompt) | |
| return getattr(r, "output_text", str(r)) | |
| except Exception: | |
| logger.exception("Gemini call error") | |
| return "⚠️ Gemini không phản hồi — kiểm tra API key / library." | |
| # ----------------- Language detection & TTS ----------------- | |
| VIET_CHARS = "ăâđêôơưáàảãạắằẳẵặấầẩẫậéèẻẽẹíìỉĩịóòỏõọốồổỗộớờởỡợúùủũụưứừửữựýỳỷỹỵ" | |
| def detect_lang(text: str) -> str: | |
| return "vi" if any(ch in VIET_CHARS for ch in text.lower()) else "en" | |
| def clean_text_for_tts(text: str) -> str: | |
| if not text: | |
| return "Xin chào" | |
| cleaned = re.sub(r"[.,!?;:()\"'“”‘’\[\]{}<>\/\\\|@#\$%\^&\*\+=~`–—\-]", " ", text) | |
| cleaned = re.sub(r"\s+", " ", cleaned).strip() | |
| return cleaned or "Xin chào" | |
| def speak_text(text: str, lang: str = "vi") -> str: | |
| try: | |
| tts_text = clean_text_for_tts(text) | |
| fname = f"tts_{uuid.uuid4().hex[:8]}.mp3" | |
| path = os.path.join(AUDIO_DIR, fname) | |
| tts = gTTS(text=tts_text, lang=lang, slow=False) | |
| tts.save(path) | |
| return path | |
| except Exception: | |
| logger.exception("TTS generation failed") | |
| return None | |
| def cleanup_audio_older_than(seconds: int = 3600): | |
| now = time.time() | |
| for f in os.listdir(AUDIO_DIR): | |
| p = os.path.join(AUDIO_DIR, f) | |
| try: | |
| if os.path.isfile(p) and (now - os.path.getmtime(p) > seconds): | |
| os.remove(p) | |
| except Exception: | |
| pass | |
| # ----------------- Web UI ----------------- | |
| INDEX_HTML = """<!doctype html><html><head> | |
| <meta charset="utf-8"><title>RobotAI v9.9</title> | |
| <style> | |
| body{font-family:Arial;background:#f5faff;padding:12px} | |
| #chat{background:#fff;border:1px solid #ddd;padding:12px;min-height:260px;border-radius:8px;overflow:auto} | |
| .you{color:#0b66c3;margin:6px 0} | |
| .bot{color:#0b8a5f;margin:6px 0} | |
| .button{background:#0b66c3;color:white;border:none;padding:8px 12px;border-radius:6px;cursor:pointer} | |
| .audio-controls{margin-top:6px} | |
| </style> | |
| </head><body> | |
| <h2>🤖 RobotAI v9.9 — Gemini Brain + Voice + Telegram</h2> | |
| <div>Gemini: <b>{{ gemini_status }}</b> | Model: <b>{{ model }}</b> | <a href="/config">Config</a></div> | |
| <hr> | |
| <textarea id="text" rows="4" style="width:100%;padding:8px;border-radius:6px;border:1px solid #ccc"></textarea><br><br> | |
| <button class="button" onclick="send()">Gửi</button> <button class="button" onclick="clearChat()">Xóa</button> | |
| <div id="chat" style="margin-top:12px"></div> | |
| <script> | |
| let currentAudio = null; | |
| function append(cls, txt){const c=document.getElementById('chat');c.innerHTML+='<div class="'+cls+'">'+txt+'</div>';c.scrollTop=c.scrollHeight;} | |
| function stopCurrentAudio(){if(currentAudio){try{currentAudio.pause();currentAudio.currentTime=0;}catch(e){}currentAudio=null;}} | |
| function clearChat(){document.getElementById('chat').innerHTML='';stopCurrentAudio();} | |
| async function send(){ | |
| let txt=document.getElementById('text').value.trim();if(!txt)return; | |
| append('you','Bạn: '+txt);document.getElementById('text').value=''; | |
| stopCurrentAudio(); | |
| try{ | |
| const res=await fetch('/api/chat',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({text:txt})}); | |
| const j=await res.json(); | |
| append('bot','🤖: '+(j.reply||'(no reply)')); | |
| if(j.tts_url){ | |
| currentAudio=new Audio(j.tts_url);currentAudio.autoplay=true; | |
| const audioEl=document.createElement('audio');audioEl.src=j.tts_url;audioEl.controls=true;audioEl.className='audio-controls'; | |
| document.getElementById('chat').appendChild(audioEl); | |
| try{currentAudio.play();}catch(e){} | |
| } | |
| }catch(e){append('bot','[Lỗi mạng] '+e);} | |
| } | |
| </script> | |
| </body></html> | |
| """ | |
| CONFIG_HTML = """<!doctype html><html><head><meta charset="utf-8"><title>Config</title></head> | |
| <body style="font-family:Arial;padding:12px"> | |
| <h3>⚙️ Config RobotAI</h3> | |
| <form method="post" action="/config"> | |
| Gemini API Key:<br><textarea name="GEMINI_API_KEY" rows="2" cols="80">{{ GEMINI_API_KEY }}</textarea><br><br> | |
| Gemini Model:<br><input name="GEMINI_MODEL" value="{{ GEMINI_MODEL }}" size="50"><br><br> | |
| Telegram Token:<br><input name="TELEGRAM_TOKEN" value="{{ TELEGRAM_TOKEN }}" size="60"><br><br> | |
| Telegram Chat ID:<br><input name="TELEGRAM_CHAT_ID" value="{{ TELEGRAM_CHAT_ID }}" size="30"><br><br> | |
| <button type="submit">Lưu</button> | |
| </form> | |
| <p><a href="/">⬅ Trở về</a></p> | |
| </body></html> | |
| """ | |
| # ----------------- Routes ----------------- | |
| def home(): | |
| cfg = load_config() | |
| return render_template_string(INDEX_HTML, | |
| gemini_status="✅ Kết nối" if USE_GEMINI else "❌ Chưa kết nối", | |
| model=cfg.get("GEMINI_MODEL")) | |
| def config_page(): | |
| if request.method == "POST": | |
| data = {k: request.form.get(k,"").strip() for k in ["GEMINI_API_KEY","GEMINI_MODEL","TELEGRAM_TOKEN","TELEGRAM_CHAT_ID"]} | |
| save_config(data); init_gemini() | |
| try: start_telegram_bot_thread() | |
| except: logger.exception("start telegram thread failed") | |
| return redirect(url_for("config_page")) | |
| return render_template_string(CONFIG_HTML, **load_config()) | |
| def api_chat(): | |
| payload = request.get_json(force=True) | |
| text = (payload.get("text") or "").strip() | |
| if not text: return jsonify({"error":"empty"}),400 | |
| lang = detect_lang(text) | |
| try: | |
| reply = gemini_answer(("Trả lời bằng tiếng Việt:" if lang=="vi" else "Answer in English:")+text) if USE_GEMINI else "⚠️ Chưa kết nối Gemini." | |
| except Exception: | |
| logger.exception("gemini call"); reply="⚠️ Lỗi khi gọi Gemini." | |
| tts_path=None | |
| try: | |
| if reply: tts_path=speak_text(reply,lang) | |
| except: logger.exception("tts failed") | |
| tts_url=f"/api/tts/{os.path.basename(tts_path)}" if tts_path else None | |
| threading.Thread(target=cleanup_audio_older_than,daemon=True).start() | |
| return jsonify({"reply":reply,"tts_url":tts_url}) | |
| def get_tts(fname): | |
| path=os.path.join(AUDIO_DIR,fname) | |
| if not os.path.exists(path): return jsonify({"error":"not found"}),404 | |
| return send_file(path,mimetype="audio/mpeg") | |
| # ----------------- ESP32 API endpoints ----------------- | |
| def esp32_chat(): | |
| data=request.get_json(force=True) | |
| text=(data.get("text") or "").strip() | |
| if not text: return jsonify({"error":"empty text"}),400 | |
| lang=detect_lang(text) | |
| try: | |
| prefix="Trả lời bằng tiếng Việt:" if lang=="vi" else "Answer in English:" | |
| reply=gemini_answer(prefix+text) if USE_GEMINI else "⚠️ Chưa kết nối Gemini." | |
| except Exception: | |
| logger.exception("ESP32 chat failed"); reply="⚠️ Lỗi khi gọi Gemini." | |
| return jsonify({"reply":reply}) | |
| def esp32_tts(): | |
| data=request.get_json(force=True) | |
| text=(data.get("text") or "").strip() | |
| if not text: return jsonify({"error":"empty text"}),400 | |
| lang=detect_lang(text) | |
| try: | |
| path=speak_text(text,lang) | |
| with open(path,"rb") as f: audio_b64=base64.b64encode(f.read()).decode("utf-8") | |
| return jsonify({"audioContent":audio_b64}) | |
| except Exception: | |
| logger.exception("ESP32 tts failed") | |
| return jsonify({"error":"tts failed"}),500 | |
| def esp32_stt(): | |
| return jsonify({"text":"xin chào"}) | |
| # ----------------- Telegram ----------------- | |
| TG_THREAD=None | |
| def send_to_telegram_sync(token,chat_id,text,tts_path=None): | |
| try: | |
| bot=Bot(token=token) | |
| bot.send_message(chat_id=chat_id,text=text) | |
| if tts_path and os.path.exists(tts_path): | |
| with open(tts_path,"rb") as fh: bot.send_audio(chat_id=chat_id,audio=fh) | |
| except Exception: logger.exception("telegram send failed") | |
| def start_telegram_bot_thread(): | |
| global TG_THREAD | |
| cfg=load_config(); token=cfg.get("TELEGRAM_TOKEN","") | |
| if not token: return | |
| if TG_THREAD and TG_THREAD.is_alive(): return | |
| def runner(): | |
| try: | |
| if TELEGRAM_LIB=="v20": | |
| app_builder=ApplicationBuilder().token(token).build() | |
| async def handle(update,context): | |
| txt=update.message.text or "" | |
| lang=detect_lang(txt) | |
| reply=gemini_answer(("Trả lời bằng tiếng Việt:" if lang=="vi" else "Answer in English:")+txt) | |
| await update.message.reply_text(reply) | |
| app_builder.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle)) | |
| app_builder.run_polling() | |
| elif TELEGRAM_LIB=="v13": | |
| updater=Updater(token=token,use_context=True) | |
| dp=updater.dispatcher | |
| def handle(update,context): | |
| txt=update.message.text or "" | |
| lang=detect_lang(txt) | |
| reply=gemini_answer(("Trả lời bằng tiếng Việt:" if lang=="vi" else "Answer in English:")+txt) | |
| update.message.reply_text(reply) | |
| dp.add_handler(MessageHandler(Filters.text & (~Filters.command),handle)) | |
| updater.start_polling(); updater.idle() | |
| except Exception: logger.exception("telegram thread error") | |
| TG_THREAD=threading.Thread(target=runner,daemon=True) | |
| TG_THREAD.start() | |
| try: start_telegram_bot_thread() | |
| except: logger.exception("Starting telegram failed") | |
| # ----------------- Run ----------------- | |
| if __name__=="__main__": | |
| port=int(os.environ.get("PORT",7860)) | |
| logger.info(f"Starting RobotAI v9.9 on port {port}") | |
| app.run(host="0.0.0.0",port=port) | |