| |
| |
| """ |
| KCrobot AI — Vmax Final (voice-first) |
| - Default Gemini model: gemini-1.5-pro (fallbacks implemented) |
| - Read secrets from env (HF "New secret"): |
| GEMINI_API_KEY, GEMINI_MODEL, TELEGRAM_TOKEN, TELEGRAM_CHAT_ID, |
| ELEVEN_API_KEY, ELEVEN_VOICE_ID, GOOGLE_SPEECH_LANG |
| - Voice-first: ESP32 uploads audio -> /chat_audio -> STT -> Gemini -> TTS -> MP3 |
| - If STT libs missing, /chat_audio returns 501 (server-side STT optional) |
| - Endpoints: |
| GET / -> simple web UI (chat secondary) |
| POST /chat_text -> {"q":"...","voice":true} |
| POST /chat_audio -> upload wav (multipart 'file' or raw bytes) |
| POST /esp/send_text-> wrapper for /chat_text |
| GET /play_latest -> latest_reply.mp3 |
| GET /_history -> recent history |
| POST /notify -> forward to Telegram |
| GET /health -> health check |
| """ |
| from __future__ import annotations |
|
|
| import os |
| import io |
| import json |
| import re |
| import time |
| import math |
| import logging |
| import threading |
| import pathlib |
| from datetime import datetime |
| from typing import Tuple, Dict, Any, Optional |
|
|
| from flask import Flask, request, jsonify, send_file, render_template_string |
| import requests |
|
|
| |
# Detect which Gemini SDK is installed: prefer the new "google-genai"
# package, fall back to the legacy "google-generativeai" module, and run
# SDK-free (REST only) when neither imports.
USE_GENAI_SDK = False
GENAI_CLIENT = None
try:
    from google import genai  # new SDK
    USE_GENAI_SDK = True
except Exception:
    try:
        import google.generativeai as genai  # legacy SDK
        USE_GENAI_SDK = True
    except Exception:
        genai = None
        USE_GENAI_SDK = False
|
|
| |
# Optional gTTS text-to-speech backend (used when ElevenLabs is not configured).
try:
    from gtts import gTTS
except Exception:
    GTTS_AVAILABLE = False
else:
    GTTS_AVAILABLE = True
|
|
# Optional server-side STT stack; /chat_audio answers 501 when this is missing.
try:
    import speech_recognition as sr
    from pydub import AudioSegment
except Exception:
    sr = None
    AudioSegment = None
    STT_AVAILABLE = False
else:
    STT_AVAILABLE = True
|
|
| |
| |
| |
# Runtime configuration, read once at import time from environment secrets
# (HF Spaces "New secret"). Values are stripped; absent secrets become "".
CFG = {
    "GEMINI_API_KEY": os.getenv("GEMINI_API_KEY", "").strip(),
    "GEMINI_MODEL": os.getenv("GEMINI_MODEL", "gemini-1.5-pro").strip(),
    "TELEGRAM_TOKEN": os.getenv("TELEGRAM_TOKEN", "").strip(),
    "TELEGRAM_CHAT_ID": os.getenv("TELEGRAM_CHAT_ID", "").strip(),
    "ELEVEN_API_KEY": os.getenv("ELEVEN_API_KEY", "").strip(),
    "ELEVEN_VOICE_ID": os.getenv("ELEVEN_VOICE_ID", "").strip(),
    "GOOGLE_SPEECH_LANG": os.getenv("GOOGLE_SPEECH_LANG", "vi-VN").strip(),
}

# Preferred model first, then known-good fallbacks tried in order.
MODEL_FALLBACK_LIST = [
    CFG.get("GEMINI_MODEL") or "gemini-1.5-pro",
    "gemini-1.5-flash",
    "gemini-2.5-flash",
    "gemini-2.5-pro",
]

# De-duplicate while preserving order. dict.fromkeys keeps insertion order
# and replaces the old manual loop, which leaked `seen` and `m` as module
# globals.
MODEL_FALLBACK = list(dict.fromkeys(m for m in MODEL_FALLBACK_LIST if m))
|
|
| |
# Instantiate a Gemini client at import time when an SDK imported and a key
# is configured; GENAI_CLIENT stays None otherwise (REST fallback is used).
GEMINI_KEY = CFG.get("GEMINI_API_KEY") or ""
if USE_GENAI_SDK and GEMINI_KEY:
    try:
        # New google-genai SDK: explicit Client object.
        GENAI_CLIENT = genai.Client(api_key=GEMINI_KEY)
    except Exception:
        try:
            # Legacy google-generativeai: configure the module and use the
            # module itself as the "client".
            genai.configure(api_key=GEMINI_KEY)
            GENAI_CLIENT = genai
        except Exception:
            GENAI_CLIENT = None
|
|
| |
| |
| |
# Filesystem layout: all mutable state lives in ./data under the working dir.
BASE = pathlib.Path.cwd()
DATA_DIR = BASE / "data"
DATA_DIR.mkdir(exist_ok=True)

USAGE_FILE = DATA_DIR / "usage.json"            # daily request / token counters
HISTORY_FILE = DATA_DIR / "history.json"        # rolling Q/A history
CFG_SNAPSHOT = DATA_DIR / "cfg_snapshot.json"   # which secrets are present (bools only)
LATEST_MP3 = DATA_DIR / "latest_reply.mp3"      # most recent TTS output

# Module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kcrobot_vmax")
|
|
| |
# Record which secrets are present (booleans only — never the values).
# Best-effort: a failed snapshot must not stop startup.
try:
    CFG_SNAPSHOT.write_text(
        json.dumps({name: bool(CFG.get(name)) for name in CFG}, indent=2),
        encoding="utf-8",
    )
except Exception:
    pass
|
|
| |
| |
| |
def load_json_safe(path: pathlib.Path, default):
    """Read and parse JSON from *path*, falling back to *default*.

    Returns *default* when the file is absent or unreadable/corrupt;
    never raises.
    """
    try:
        if not path.exists():
            return default
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception as exc:
        logger.debug("load_json_safe error %s -> %s", path, exc)
        return default
|
|
def save_json_safe(path: pathlib.Path, data) -> bool:
    """Serialize *data* as pretty-printed UTF-8 JSON at *path*.

    Returns True on success, False (after logging the traceback) on any
    failure; never raises.
    """
    try:
        text = json.dumps(data, ensure_ascii=False, indent=2)
        path.write_text(text, encoding="utf-8")
    except Exception as exc:
        logger.exception("save_json_safe failed for %s: %s", path, exc)
        return False
    return True
|
|
def today_str() -> str:
    """Current UTC date as 'YYYY-MM-DD' (used to roll daily usage counters).

    Uses time.gmtime() instead of datetime.utcnow(), which is deprecated
    since Python 3.12; both yield the same UTC calendar date.
    """
    return time.strftime("%Y-%m-%d", time.gmtime())
|
|
def load_usage():
    """Return the persisted usage counters, seeded fresh for today if absent."""
    fresh = {"date": today_str(), "requests_today": 0, "tokens_month": 0}
    return load_json_safe(USAGE_FILE, fresh)
|
|
def save_usage(u):
    """Persist the usage-counter dict to USAGE_FILE; True on success."""
    return save_json_safe(USAGE_FILE, u)
|
|
def increment_usage(tokens=1):
    """Bump today's request count and the monthly token total.

    The daily counter resets when the stored date is not today; the
    monthly token total carries over across days.
    """
    usage = load_usage()
    if usage.get("date") != today_str():
        usage = {
            "date": today_str(),
            "requests_today": 0,
            "tokens_month": usage.get("tokens_month", 0),
        }
    usage["requests_today"] = usage.get("requests_today", 0) + 1
    usage["tokens_month"] = usage.get("tokens_month", 0) + int(tokens)
    save_usage(usage)
|
|
def append_history(entry: dict):
    """Append one Q/A record to HISTORY_FILE, keeping only the last 500."""
    history = load_json_safe(HISTORY_FILE, [])
    history.append(entry)
    save_json_safe(HISTORY_FILE, history[-500:])
|
|
| |
| |
| |
# Any Vietnamese-specific (diacritic) letter marks text as Vietnamese;
# re.I also covers the uppercase forms.
VIET_CHAR_RE = re.compile(
    r"[àáạảãâầấậẩẫăằắặẳẵđèéẹẻẽêềếệểễìíịỉĩòóọỏõôồốộổỗơờớợởỡùúụủũưừứựửữỳýỵỷỹ]",
    re.I,
)


def detect_lang(text: str) -> str:
    """Classify *text* as 'vi' or 'en'.

    Vietnamese is detected by the presence of any diacritic letter;
    everything else — including empty or non-string input — is treated
    as English. (The previous English-keyword scan was removed: both the
    loop and the fallback returned 'en', so it had no effect.)
    """
    if not text or not isinstance(text, str):
        return "en"
    return "vi" if VIET_CHAR_RE.search(text) else "en"
|
|
| |
| |
| |
def gemini_call_single(model: str, prompt: str, max_output_tokens: int = 1024, temperature: float = 0.2) -> Tuple[bool, str, int]:
    """Call one Gemini model; return (ok, text_or_error, http_status_or_0).

    Tries the SDK client created at import time first, then falls back to
    the public REST endpoint. Never raises: every failure is reported
    through the tuple so the caller can try the next fallback model.
    """
    # --- SDK path -----------------------------------------------------
    if GENAI_CLIENT:
        try:
            # New google-genai SDK: client.models.generate_content(...).
            if hasattr(GENAI_CLIENT, "models") and hasattr(GENAI_CLIENT.models, "generate_content"):
                # FIX: the new SDK takes generation options via `config`,
                # not as bare kwargs (bare kwargs raised TypeError, which
                # silently disabled the SDK path).
                resp = GENAI_CLIENT.models.generate_content(
                    model=model,
                    contents=prompt,
                    config={"max_output_tokens": max_output_tokens, "temperature": temperature},
                )
                txt = getattr(resp, "text", None)
                return True, (txt if txt else str(resp)), 200
            # Legacy google-generativeai: build a GenerativeModel.
            # FIX: the module has no top-level generate_content(), so the
            # old hasattr check could never succeed (dead code).
            if hasattr(GENAI_CLIENT, "GenerativeModel"):
                gm = GENAI_CLIENT.GenerativeModel(model)
                resp = gm.generate_content(
                    prompt,
                    generation_config={"max_output_tokens": max_output_tokens, "temperature": temperature},
                )
                txt = getattr(resp, "text", None)
                return True, (txt if txt else str(resp)), 200
        except requests.exceptions.HTTPError as he:
            try:
                code = he.response.status_code
            except Exception:
                code = 0
            return False, str(he), code
        except Exception as e:
            return False, str(e), 0

    # --- REST fallback ------------------------------------------------
    key = CFG.get("GEMINI_API_KEY") or ""
    if not key:
        return False, "Gemini API key not configured", 0
    # FIX: Gemini models are served via ":generateContent" with a
    # `contents`/`generationConfig` payload. The old ":generate" URL and
    # PaLM-style `prompt.messages` body always 404'd for Gemini models.
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent"
    headers = {"Content-Type": "application/json"}
    payload = {
        "contents": [{"role": "user", "parts": [{"text": prompt}]}],
        "generationConfig": {"maxOutputTokens": max_output_tokens, "temperature": temperature},
    }
    try:
        r = requests.post(url, params={"key": key}, json=payload, headers=headers, timeout=30)
        status = r.status_code
        if status == 404:
            return False, f"404 model not found: {model}", 404
        if status >= 400:
            return False, f"HTTP {status}: {r.text}", status
        j = r.json()
        # Extract the text parts of the first candidate, if present.
        cand = j.get("candidates")
        if cand and isinstance(cand, list):
            content = cand[0].get("content") or {}
            parts = content.get("parts") if isinstance(content, dict) else None
            if isinstance(parts, list):
                texts = [p["text"] for p in parts if isinstance(p, dict) and "text" in p]
                if texts:
                    return True, "".join(texts), status
        if isinstance(j, dict) and "output" in j and isinstance(j["output"], str):
            return True, j["output"], status
        # Last resort: return a prefix of the raw JSON so the caller sees something.
        return True, json.dumps(j)[:2000], status
    except requests.exceptions.HTTPError as he:
        try:
            code = he.response.status_code
        except Exception:
            code = 0
        return False, str(he), code
    except Exception as e:
        return False, str(e), 0
|
|
def call_gemini_with_fallbacks(prompt: str, max_output_tokens: int = 1024, temperature: float = 0.2) -> Dict[str, Any]:
    """Ask each model in MODEL_FALLBACK in turn until one answers.

    Returns {"ok": True, "text": ..., "model": ...} on the first success,
    or {"ok": False, "error": ..., "last": ...} when every model fails.
    Short-circuits with an error when no API key is configured.
    """
    if not CFG.get("GEMINI_API_KEY"):
        return {"ok": False, "error": "Gemini API key not configured (set GEMINI_API_KEY in New secret)"}
    last_error = None
    for candidate in MODEL_FALLBACK:
        if not candidate:
            continue
        ok, reply, status = gemini_call_single(candidate, prompt, max_output_tokens, temperature)
        if ok:
            return {"ok": True, "text": reply, "model": candidate}
        last_error = {"model": candidate, "status": status, "error": reply}
        logger.warning("Model %s failed: %s (status=%s)", candidate, reply, status)
    return {"ok": False, "error": f"All models failed. Last: {last_error}", "last": last_error}
|
|
| |
| |
| |
def tts_elevenlabs_bytes(text: str, voice_id: str, api_key: str) -> bytes:
    """Synthesize *text* with ElevenLabs; return raw MP3 bytes.

    Raises requests.HTTPError on a non-2xx response (caller falls back).
    """
    endpoint = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}"
    response = requests.post(
        endpoint,
        json={"text": text, "voice_settings": {"stability": 0.6, "similarity_boost": 0.75}},
        headers={"xi-api-key": api_key, "Content-Type": "application/json"},
        timeout=30,
    )
    response.raise_for_status()
    return response.content
|
|
def tts_gtts_bytes(text: str, lang: str = "vi") -> bytes:
    """Synthesize *text* with gTTS and return MP3 bytes.

    Raises RuntimeError when gTTS is not installed in this environment.
    """
    if not GTTS_AVAILABLE:
        raise RuntimeError("gTTS not available in environment")
    buf = io.BytesIO()
    gTTS(text=text, lang=lang).write_to_fp(buf)
    return buf.getvalue()
|
|
def synthesize_and_save(answer: str, lang_hint: str = "vi") -> Tuple[bool, str]:
    """Render *answer* to LATEST_MP3 (ElevenLabs first, gTTS fallback).

    Returns (True, mp3_path) on success or (False, error_message) when
    every backend failed.
    """
    try:
        audio = None
        # Prefer ElevenLabs when both key and voice are configured;
        # any failure there falls through to gTTS.
        if CFG.get("ELEVEN_API_KEY") and CFG.get("ELEVEN_VOICE_ID"):
            try:
                audio = tts_elevenlabs_bytes(answer, CFG["ELEVEN_VOICE_ID"], CFG["ELEVEN_API_KEY"])
                logger.info("TTS: used ElevenLabs")
            except Exception:
                logger.exception("ElevenLabs TTS failed; falling back to gTTS")
                audio = None
        if audio is None:
            code = "vi" if lang_hint and str(lang_hint).startswith("vi") else "en"
            audio = tts_gtts_bytes(answer, lang=code)
            logger.info("TTS: used gTTS")
        LATEST_MP3.write_bytes(audio)
        return True, str(LATEST_MP3)
    except Exception as e:
        logger.exception("synthesize_and_save failed")
        return False, f"TTS error: {e}"
|
|
| |
| |
| |
def speech_to_text(wav_bytes: bytes) -> Tuple[bool, str]:
    """Transcribe in-memory WAV audio via the free Google recognizer.

    Returns (True, transcript) or (False, error_message). Requires the
    optional SpeechRecognition stack; language comes from
    CFG["GOOGLE_SPEECH_LANG"].
    """
    if not STT_AVAILABLE:
        return False, "STT not available on server (SpeechRecognition & pydub missing)"
    try:
        recognizer = sr.Recognizer()
        with sr.AudioFile(io.BytesIO(wav_bytes)) as source:
            captured = recognizer.record(source)
        transcript = recognizer.recognize_google(captured, language=CFG.get("GOOGLE_SPEECH_LANG", "vi-VN"))
        return True, transcript
    except Exception as e:
        logger.exception("speech_to_text failed")
        return False, str(e)
|
|
| |
| |
| |
def send_telegram_message(text: str) -> bool:
    """Send *text* to the configured Telegram chat.

    Returns True only when Telegram accepted the request; False when the
    bot is unconfigured or delivery failed. Never raises (callers run
    this fire-and-forget in a thread).
    """
    token = CFG.get("TELEGRAM_TOKEN") or ""
    chat = CFG.get("TELEGRAM_CHAT_ID") or ""
    if not token or not chat:
        logger.debug("Telegram not configured")
        return False
    try:
        url = f"https://api.telegram.org/bot{token}/sendMessage"
        r = requests.post(url, json={"chat_id": chat, "text": text}, timeout=8)
        # FIX: report delivery failures instead of returning True blindly;
        # r.ok is False for any 4xx/5xx response from the Bot API.
        return r.ok
    except Exception:
        logger.exception("send_telegram_message failed")
        return False
|
|
| |
| |
| |
# Flask application object; routes are registered below.
app = Flask(__name__)

# Single-page browser UI (secondary to the voice path). Rendered by "/"
# with {{models}}, {{gemini}} and {{tg}} status placeholders; its script
# POSTs to /chat_text and plays /play_latest when a play_url is returned.
INDEX_HTML = """
<!doctype html>
<html>
<head><meta charset="utf-8"><title>KCrobot AI — Vmax (Voice-first)</title>
<style>
body{font-family:Arial;background:#071025;color:#fff;padding:18px}
.container{max-width:900px;margin:0 auto}
textarea{width:100%;padding:10px;border-radius:8px;background:#061427;color:#fff;border:1px solid #133}
button{padding:10px 14px;border-radius:8px;background:#0ea5ff;color:#012;border:none;cursor:pointer}
#resp{white-space:pre-wrap;margin-top:12px;background:#021220;padding:12px;border-radius:6px}
.small{font-size:0.9rem;color:#9fb3c8}
</style></head><body>
<div class="container">
<h1>🤖 KCrobot AI — Vmax (Voice-first)</h1>
<p class="small">Model fallback: {{models}} — Gemini key: {{gemini}} — Telegram: {{tg}}</p>
<p>Giao diện chat là phụ — ưu tiên voice (ESP32 gửi audio). Bạn có thể thử gõ "Xin chào" để nghe trả lời.</p>
<textarea id="q" rows="4" placeholder="Nhập tiếng Việt / English..."></textarea>
<p><label><input id="voice" type="checkbox" checked> Voice ON</label>
<button onclick="send()">Gửi & Nghe</button></p>
<div id="resp"></div>
<audio id="audio" controls style="display:none"></audio>
<script>
async function send(){
const q=document.getElementById('q').value;
const voice=document.getElementById('voice').checked;
if(!q){ alert('Nhập nội dung'); return; }
document.getElementById('resp').innerText='⏳ Đang xử lý...';
const res = await fetch('/chat_text', {
method:'POST', headers:{'Content-Type':'application/json'},
body: JSON.stringify({q:q, voice:voice})
});
const j = await res.json();
if(j.error){ document.getElementById('resp').innerText='Error: '+j.error; return; }
document.getElementById('resp').innerText = j.answer;
if(j.play_url){
const audio=document.getElementById('audio');
audio.src = j.play_url + '?t=' + Date.now();
audio.style.display='block';
audio.play();
}
}
</script>
</div></body></html>
"""
|
|
@app.route("/", methods=["GET"])
def root():
    """Render the web UI with the model list and secret-presence indicators."""
    have_gemini = bool(CFG.get("GEMINI_API_KEY"))
    have_telegram = bool(CFG.get("TELEGRAM_TOKEN") and CFG.get("TELEGRAM_CHAT_ID"))
    return render_template_string(
        INDEX_HTML,
        models=", ".join(MODEL_FALLBACK),
        gemini="✅" if have_gemini else "❌",
        tg="✅" if have_telegram else "❌",
    )
|
|
@app.route("/_history", methods=["GET"])
def history_endpoint():
    """Return the most recent 50 history entries as JSON."""
    return jsonify(load_json_safe(HISTORY_FILE, [])[-50:])
|
|
@app.route("/chat_text", methods=["POST"])
def chat_text():
    """Main text endpoint: JSON {"q": str, "voice": bool} -> {"answer", "play_url"?}.

    Flow: detect language -> ask Gemini (with model fallbacks) -> record
    usage and history -> optionally synthesize speech -> mirror to Telegram.
    """
    data = request.get_json(silent=True) or {}
    # Accept either "q" or the alternative "question" key.
    q = data.get("q") or data.get("question") or ""
    voice_on = bool(data.get("voice", True))
    if not q or not str(q).strip():
        return jsonify({"error": "missing 'q'"}), 400
    lang = detect_lang(q)
    # Prompt in the detected language so the model answers in kind.
    prompt = (f"Bạn là robot trợ lý KCrobot, trả lời bằng tiếng Việt tự nhiên: {q}" if lang == "vi"
              else f"You are KCrobot assistant. Answer naturally in English: {q}")
    gem = call_gemini_with_fallbacks(prompt)
    if not gem.get("ok"):
        # Surface the failure in the answer text instead of failing the request.
        answer = f"[Gemini error] {gem.get('error')}"
        logger.warning("Gemini failed: %s", gem.get("error"))
    else:
        answer = gem.get("text", "")
        # Rough token proxy: whitespace-separated word count (minimum 1);
        # only counted on success.
        increment_usage(max(1, len(answer.split())))
    append_history({"ts": time.time(), "q": q, "answer": answer, "lang": lang, "model": gem.get("model")})
    play_url = None
    if voice_on:
        # TTS failures are non-fatal: the text answer is still returned.
        ok, path_or_err = synthesize_and_save(answer, lang_hint=lang)
        if ok:
            play_url = "/play_latest"
    try:
        # Fire-and-forget Telegram mirror of the conversation.
        threading.Thread(target=send_telegram_message, args=(f"Q: {q}\nA: {answer}",)).start()
    except Exception:
        pass
    resp = {"answer": answer}
    if play_url:
        resp["play_url"] = play_url
    return jsonify(resp)
|
|
@app.route("/esp/send_text", methods=["POST"])
def esp_send_text():
    """ESP32-facing alias for /chat_text (same JSON body and response)."""
    return chat_text()
|
|
@app.route("/chat_audio", methods=["POST"])
def chat_audio():
    """Voice-first endpoint: WAV upload -> STT -> Gemini -> TTS -> JSON.

    Accepts either a multipart form field 'file' or raw request bytes.
    Returns 501 when the optional server-side STT stack is missing, 500
    when transcription fails.
    """
    wav_bytes = None
    if 'file' in request.files:
        f = request.files['file']
        wav_bytes = f.read()
    else:
        # No multipart part: treat the whole request body as audio.
        wav_bytes = request.get_data()
    if not wav_bytes:
        return jsonify({"error": "no audio provided"}), 400
    try:
        # Keep a timestamped copy of every upload (best-effort, for debugging).
        ts = int(time.time())
        (DATA_DIR / f"uploaded_{ts}.wav").write_bytes(wav_bytes)
    except Exception:
        logger.exception("saving uploaded wav failed")
    if not STT_AVAILABLE:
        return jsonify({"error": "STT not available on server (SpeechRecognition & pydub missing)"}), 501
    ok, text_or_err = speech_to_text(wav_bytes)
    if not ok:
        return jsonify({"error": "STT failed", "details": text_or_err}), 500
    text = text_or_err
    lang = detect_lang(text)
    # Same prompt shaping as /chat_text, driven by the transcript.
    prompt = (f"Bạn là robot trợ lý KCrobot, trả lời bằng tiếng Việt tự nhiên: {text}" if lang == "vi"
              else f"You are KCrobot assistant. Answer naturally in English: {text}")
    gem = call_gemini_with_fallbacks(prompt)
    if not gem.get("ok"):
        answer = f"[Gemini error] {gem.get('error')}"
        logger.warning("Gemini failed on audio: %s", gem.get("error"))
    else:
        answer = gem.get("text", "")
    # Voice replies are always synthesized on the audio endpoint.
    synth_ok, synth_path = synthesize_and_save(answer, lang_hint=lang)
    append_history({"ts": time.time(), "q": text, "answer": answer, "lang": lang, "model": gem.get("model")})
    try:
        # Fire-and-forget Telegram mirror of the conversation.
        threading.Thread(target=send_telegram_message, args=(f"Q(STT): {text}\nA: {answer}",)).start()
    except Exception:
        pass
    resp = {"question": text, "answer": answer}
    if synth_ok:
        resp["play_url"] = "/play_latest"
    return jsonify(resp)
|
|
@app.route("/play_latest", methods=["GET"])
def play_latest():
    """Stream the most recently synthesized reply as MP3 (404 if none yet)."""
    if LATEST_MP3.exists():
        return send_file(str(LATEST_MP3), mimetype="audio/mpeg")
    return jsonify({"error": "no audio"}), 404
|
|
@app.route("/notify", methods=["POST"])
def notify():
    """Forward an {"event", "msg"} pair to Telegram, fire-and-forget."""
    payload = request.get_json(silent=True) or {}
    event = payload.get("event", "event")
    msg = payload.get("msg", "")
    try:
        threading.Thread(
            target=send_telegram_message,
            args=(f"[Robot Notify] {event}: {msg}",),
        ).start()
    except Exception:
        pass  # notification is best-effort; never fail the request
    return jsonify({"sent": True})
|
|
@app.route("/health", methods=["GET"])
def health():
    """Liveness probe plus a summary of which optional features are enabled."""
    report = {
        "status": "ok",
        "time": time.time(),
        "gemini_key_present": bool(CFG.get("GEMINI_API_KEY")),
        "model_list": MODEL_FALLBACK,
        "stt_available": STT_AVAILABLE,
        "gtts_available": GTTS_AVAILABLE,
        "sdk_present": USE_GENAI_SDK,
    }
    return jsonify(report)
|
|
| |
| |
| |
if __name__ == "__main__":
    # Best-effort sanity read of the state files before serving.
    load_json_safe(HISTORY_FILE, [])
    load_json_safe(USAGE_FILE, {})
    logger.info(
        "KCrobot Vmax starting. Gemini key present: %s, SDK present: %s, STT available: %s",
        bool(CFG.get("GEMINI_API_KEY")),
        USE_GENAI_SDK,
        STT_AVAILABLE,
    )
    # PORT is provided by the hosting platform; default matches HF Spaces.
    app.run(host="0.0.0.0", port=int(os.getenv("PORT", "7860")), debug=False)