""" JARVIS Telegram Bot — Talk to JARVIS from any phone. Send text or voice messages. No app install needed. Setup: 1. Message @BotFather on Telegram → /newbot → name it "JARVIS" 2. Copy the token → paste in .env as TELEGRAM_BOT_TOKEN 3. Run this script """ import os import json import asyncio import tempfile import httpx from dotenv import load_dotenv load_dotenv() TELEGRAM_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "") JARVIS_URL = os.getenv("JARVIS_URL", "http://localhost:8000") HF_SPACE_URL = os.getenv("HF_SPACE_URL", "https://v1deh-jarvis.hf.space") TELEGRAM_API = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}" # Whitelist: comma-separated Telegram chat IDs allowed to use the bot. # If empty, ALL users can interact (insecure for public bots). _ALLOWED_IDS_RAW = os.getenv("TELEGRAM_ALLOWED_CHAT_IDS", "") _ALLOWED_CHAT_IDS = { int(x.strip()) for x in _ALLOWED_IDS_RAW.split(",") if x.strip().lstrip("-").isdigit() } if _ALLOWED_IDS_RAW.strip() else set() # ─── Helpers ───────────────────────────────────── async def get_jarvis_url(): """Find working JARVIS server.""" for url in [JARVIS_URL, HF_SPACE_URL]: try: async with httpx.AsyncClient() as client: resp = await client.get(f"{url}/api/status", timeout=5) if resp.status_code == 200: return url except Exception: continue return None async def ask_jarvis(text: str, user_id: str = "default") -> str: """Send a message to JARVIS via WebSocket and get response.""" import websockets base_url = await get_jarvis_url() if not base_url: return "I can't reach the JARVIS server right now, sir." ws_url = base_url.replace("http://", "ws://").replace("https://", "wss://") + "/ws" response_parts = [] try: async with websockets.connect(ws_url) as ws: await ws.send(json.dumps({ "type": "message", "content": text, "backend": "auto", "stm": True, "user_id": user_id, })) while True: msg = await asyncio.wait_for(ws.recv(), timeout=60) data = json.loads(msg) if data.get("type") == "stream": response_parts.append(data.get("content", "")) elif data.get("type") == "stream_end": break elif data.get("type") == "error": response_parts.append(data.get("content", "Error")) break except Exception as e: return f"Connection error: {e}" return "".join(response_parts) or "No response." # ─── Telegram API ──────────────────────────────── async def send_message(chat_id: int, text: str): """Send a message via Telegram.""" # Split long messages (Telegram limit: 4096 chars) chunks = [text[i:i+4000] for i in range(0, len(text), 4000)] async with httpx.AsyncClient() as client: for chunk in chunks: await client.post(f"{TELEGRAM_API}/sendMessage", json={ "chat_id": chat_id, "text": chunk, "parse_mode": "Markdown", }) async def send_typing(chat_id: int): """Show 'typing...' indicator.""" async with httpx.AsyncClient() as client: await client.post(f"{TELEGRAM_API}/sendChatAction", json={ "chat_id": chat_id, "action": "typing", }) async def download_voice(file_id: str) -> str | None: """Download a Telegram voice message and transcribe it.""" async with httpx.AsyncClient() as client: # Get file path resp = await client.get(f"{TELEGRAM_API}/getFile", params={"file_id": file_id}) file_path = resp.json().get("result", {}).get("file_path") if not file_path: return None # Download file file_url = f"https://api.telegram.org/file/bot{TELEGRAM_TOKEN}/{file_path}" resp = await client.get(file_url) # Save to temp file with tempfile.NamedTemporaryFile(suffix=".ogg", delete=False) as f: f.write(resp.content) temp_path = f.name # Transcribe using speech_recognition try: import subprocess # Convert OGG to WAV wav_path = temp_path.replace(".ogg", ".wav") subprocess.run( ["ffmpeg", "-i", temp_path, "-ar", "16000", "-ac", "1", wav_path, "-y"], capture_output=True, timeout=10, ) import speech_recognition as sr recognizer = sr.Recognizer() with sr.AudioFile(wav_path) as source: audio = recognizer.record(source) text = recognizer.recognize_google(audio) return text except Exception as e: return None finally: os.unlink(temp_path) if os.path.exists(temp_path.replace(".ogg", ".wav")): os.unlink(temp_path.replace(".ogg", ".wav")) # ─── Main Bot Loop ────────────────────────────── async def handle_update(update: dict): """Handle a single Telegram update.""" message = update.get("message") if not message: return chat_id = message["chat"]["id"] user_name = message.get("from", {}).get("first_name", "Sir") # Enforce chat ID whitelist when configured if _ALLOWED_CHAT_IDS and chat_id not in _ALLOWED_CHAT_IDS: await send_message(chat_id, "Unauthorized. Your chat ID is not in the allow list.") return # Handle voice messages if "voice" in message: await send_typing(chat_id) text = await download_voice(message["voice"]["file_id"]) if not text: await send_message(chat_id, "I couldn't understand the voice message, sir. Try sending text instead.") return await send_message(chat_id, f"_Heard: {text}_") # Handle text messages elif "text" in message: text = message["text"] # Handle /start command if text == "/start": await send_message(chat_id, ( "*J.A.R.V.I.S. Online* 🔵\n\n" "Good to see you. I'm your AI assistant.\n\n" "Just send me a message — text or voice.\n\n" "Commands:\n" "/status — Check system status\n" "/weather [city] — Get weather\n" "/search [query] — Web search\n" "/memory — What I remember about you\n\n" "_Say anything, sir. I'm listening._" )) return if text == "/status": text = "What is the system status?" elif text.startswith("/weather"): city = text.replace("/weather", "").strip() or "New York" text = f"What is the weather in {city}?" elif text.startswith("/search"): query = text.replace("/search", "").strip() text = f"Search the web for: {query}" elif text == "/memory": text = "What do you remember about me?" else: return # Send to JARVIS await send_typing(chat_id) tg_user_id = f"telegram_{chat_id}" response = await ask_jarvis(text, user_id=tg_user_id) await send_message(chat_id, response) async def poll(): """Long-polling loop for Telegram updates.""" print("=" * 50) print(" J.A.R.V.I.S. Telegram Bot") print(f" Server: {JARVIS_URL}") print(" Waiting for messages...") print("=" * 50) offset = 0 async with httpx.AsyncClient(timeout=60) as client: while True: try: resp = await client.get( f"{TELEGRAM_API}/getUpdates", params={"offset": offset, "timeout": 30}, ) updates = resp.json().get("result", []) for update in updates: offset = update["update_id"] + 1 asyncio.create_task(handle_update(update)) except httpx.ReadTimeout: continue except Exception as e: print(f"Poll error: {e}") await asyncio.sleep(5) def main(): if not TELEGRAM_TOKEN: print("ERROR: Set TELEGRAM_BOT_TOKEN in .env") print() print("How to get a token:") print("1. Open Telegram → search @BotFather") print("2. Send /newbot") print("3. Name it: JARVIS") print("4. Copy the token → paste in ~/jarvis/.env") return asyncio.run(poll()) if __name__ == "__main__": main()