# ==========================================================
# KC ROBOT AI - APP.PY (V2.0 MAX FINAL)
# Cloud AI Robot with Gemini 2.5 Flash + ESP32 + Telegram
# ==========================================================
from flask import Flask, request, jsonify, render_template_string
from google import genai
import requests
import os
import time
from gtts import gTTS
from langdetect import detect
import tempfile
import base64

# ==========================================================
# CONFIGURATION
# ==========================================================
# Load environment variables from secrets (Cloud Run or Hugging Face)
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-2.5-flash")
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")

# Telegram's sendMessage rejects text longer than this, so longer messages
# are truncated before sending instead of being dropped by the API.
TELEGRAM_MAX_CHARS = 4096

# Create Flask app
app = Flask(__name__)

# ==========================================================
# SETUP GEMINI CLIENT
# ==========================================================
if not GEMINI_API_KEY:
    print("❌ ERROR: No Gemini API Key found. Please add GEMINI_API_KEY in Secrets.")
    client = None
else:
    client = genai.Client(api_key=GEMINI_API_KEY)


# ==========================================================
# TELEGRAM UTILITIES
# ==========================================================
def send_telegram_message(text):
    """Send *text* to the configured Telegram chat (best effort).

    Does nothing except print a warning when TELEGRAM_TOKEN or
    TELEGRAM_CHAT_ID is not set. Failures are printed, never raised, so a
    Telegram outage cannot break the calling request handler.
    """
    if not TELEGRAM_TOKEN or not TELEGRAM_CHAT_ID:
        print("⚠️ Telegram not configured.")
        return

    text = str(text)
    if len(text) > TELEGRAM_MAX_CHARS:
        text = text[: TELEGRAM_MAX_CHARS - 1] + "…"

    url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    payload = {"chat_id": TELEGRAM_CHAT_ID, "text": text}
    try:
        resp = requests.post(url, json=payload, timeout=5)
    except requests.RequestException as e:
        # Print only the exception type: the message of a requests error
        # embeds the request URL, which contains the bot token.
        print("Telegram Error:", type(e).__name__)
        return

    # requests does not raise on HTTP error statuses; surface API rejections.
    if not resp.ok:
        print("Telegram Error:", resp.status_code, resp.text)


# ==========================================================
# GEMINI AI RESPONSE
# ==========================================================
def ask_gemini(prompt: str):
    """Send *prompt* to the configured Gemini model and return its reply text.

    Always returns a string: the stripped model reply, or a "⚠️ ..." message
    when the client is not configured, the call fails, or the response
    carries no text.
    """
    if not client:
        return "⚠️ Gemini API key missing. Please configure in Secrets."

    try:
        response = client.models.generate_content(
            model=GEMINI_MODEL,
            contents=prompt,
        )
        # The `text` attribute can exist and still be None (e.g. a blocked or
        # empty candidate), so test the value rather than the attribute.
        text = getattr(response, "text", None)
        if text is None and isinstance(response, dict):
            text = response.get("text")
        if isinstance(text, str) and text.strip():
            return text.strip()
        return "⚠️ No response text from Gemini."
    except Exception as e:
        print("Gemini Error:", e)
        return f"⚠️ Gemini Error: {e}"


# ==========================================================
# LANGUAGE DETECTION & TTS
# ==========================================================
def text_to_speech(text):
    """Synthesize *text* with gTTS and return the MP3 as a base64 string.

    The language is auto-detected; anything other than Vietnamese ("vi") or
    English ("en") falls back to English. Returns None when *text* is empty
    or when detection/synthesis fails (the error is printed).
    """
    if not text or not text.strip():
        return None

    try:
        lang = detect(text)
        if lang not in ("vi", "en"):
            lang = "en"
        tts = gTTS(text=text, lang=lang)

        # Reserve a path, then close our handle so gTTS can reopen the file
        # itself (an open handle blocks that on some platforms).
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
        tmp.close()
        try:
            tts.save(tmp.name)
            with open(tmp.name, "rb") as f:
                return base64.b64encode(f.read()).decode("utf-8")
        finally:
            # Always remove the temp file, including when synthesis fails.
            os.unlink(tmp.name)
    except Exception as e:
        print("TTS Error:", e)
        return None


# ==========================================================
# SIMPLE HTML INTERFACE (for testing)
# ==========================================================
# NOTE(review): this page contains no HTML markup; it looks like the tags were
# stripped somewhere upstream. Restore the original template if available.
HTML_PAGE = """ KC Robot AI v2.0

🤖 KC Robot AI v2.0 MAX FINAL


"""


@app.route("/")
def home():
    """Serve the test page."""
    return render_template_string(HTML_PAGE)


# ==========================================================
# API ENDPOINTS
# ==========================================================
@app.route("/api/chat", methods=["POST"])
def api_chat():
    """Chat endpoint: JSON {"message": str} -> {"reply": str, "audio": str | None}.

    Mirrors both the user message and the AI reply to Telegram. "audio" is
    the base64-encoded MP3 of the reply, or None when TTS failed. Responds
    with HTTP 400 and a JSON error for a missing, malformed or blank message.
    """
    # silent=True yields None for a non-JSON or malformed body, so the client
    # gets the JSON error below rather than Flask's default HTML error page.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or "message" not in data:
        return jsonify({"error": "Missing 'message'"}), 400

    user_message = data["message"]
    if not isinstance(user_message, str) or not user_message.strip():
        return jsonify({"error": "'message' must be a non-empty string"}), 400

    print(f"🧠 User said: {user_message}")
    send_telegram_message(f"User: {user_message}")

    ai_reply = ask_gemini(user_message)
    send_telegram_message(f"Robot: {ai_reply}")

    audio_b64 = text_to_speech(ai_reply)
    return jsonify({"reply": ai_reply, "audio": audio_b64})


# ESP32 sensor endpoint
@app.route("/api/sensor", methods=["POST"])
def sensor_data():
    """Receive a JSON sensor update and forward it to Telegram.

    Responds with HTTP 400 and a JSON error when the body is missing, is not
    valid JSON, or is an empty JSON value.
    """
    data = request.get_json(silent=True)
    if not data:
        return jsonify({"error": "No data"}), 400

    msg = f"👁️ ESP32 Sensor update: {data}"
    send_telegram_message(msg)
    return jsonify({"status": "received"})


# Health check
@app.route("/ping")
def ping():
    """Report service liveness and the configured Gemini model name."""
    return jsonify({"status": "ok", "model": GEMINI_MODEL})


# ==========================================================
# MAIN ENTRY POINT
# ==========================================================
if __name__ == "__main__":
    port = int(os.getenv("PORT", 8080))
    print(f"🚀 KC Robot AI v2.0 running on port {port}")
    app.run(host="0.0.0.0", port=port)