JARVIS / telegram_bot.py
Khanna, Videh Rakesh Rakesh
feat: JARVIS security, cross-device, intelligence & polish overhaul
e2a2dda
"""
JARVIS Telegram Bot — Talk to JARVIS from any phone.
Send text or voice messages. No app install needed.
Setup:
1. Message @BotFather on Telegram → /newbot → name it "JARVIS"
2. Copy the token → paste in .env as TELEGRAM_BOT_TOKEN
3. Run this script
"""
import os
import json
import asyncio
import tempfile
import httpx
from dotenv import load_dotenv
load_dotenv()
# Bot token issued by @BotFather; an empty string means "not configured".
TELEGRAM_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
# Base URLs probed (in this order) when looking for a JARVIS server.
JARVIS_URL = os.getenv("JARVIS_URL", "http://localhost:8000")
HF_SPACE_URL = os.getenv("HF_SPACE_URL", "https://v1deh-jarvis.hf.space")
# Prefix shared by every Telegram Bot API method call.
TELEGRAM_API = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}"


def parse_chat_ids(raw: str) -> set[int]:
    """Parse a comma-separated list of Telegram chat IDs.

    An entry is accepted only when it is an optional single leading "-"
    followed by ASCII digits. Anything else (blank entries, words, "--5",
    non-ASCII digits such as "²") is skipped instead of being handed to
    ``int()``, which would raise ``ValueError`` while the module is imported.

    Args:
        raw: The raw configuration value, e.g. ``"123, -1001234"``.

    Returns:
        The set of chat IDs that could be parsed; empty if there are none.
    """
    chat_ids: set[int] = set()
    for entry in raw.split(","):
        entry = entry.strip()
        digits = entry[1:] if entry.startswith("-") else entry
        if digits.isascii() and digits.isdigit():
            chat_ids.add(int(entry))
    return chat_ids


# Whitelist: comma-separated Telegram chat IDs allowed to use the bot.
# If empty, ALL users can interact (insecure for public bots).
# NOTE: a value made up solely of invalid entries also yields an empty set,
# i.e. the bot stays open to everyone.
_ALLOWED_IDS_RAW = os.getenv("TELEGRAM_ALLOWED_CHAT_IDS", "")
_ALLOWED_CHAT_IDS = parse_chat_ids(_ALLOWED_IDS_RAW)
# ─── Helpers ─────────────────────────────────────
async def get_jarvis_url():
    """Return the base URL of the first reachable JARVIS server, or None.

    JARVIS_URL is probed first, then HF_SPACE_URL. A candidate counts as
    reachable when ``GET {url}/api/status`` answers with HTTP 200 within the
    5-second timeout.
    """
    candidates = (JARVIS_URL, HF_SPACE_URL)
    for base in candidates:
        try:
            async with httpx.AsyncClient() as http:
                status = await http.get(f"{base}/api/status", timeout=5)
        except Exception:
            # Any failure means this candidate is unusable; move on to the next.
            continue
        if status.status_code == 200:
            return base
    return None
async def ask_jarvis(text: str, user_id: str = "default") -> str:
    """Send a message to JARVIS via WebSocket and return its full reply.

    The reply arrives as a sequence of JSON frames: "stream" frames carry text
    fragments, "stream_end" finishes the reply, and "error" carries an error
    text and also finishes it. Frames of any other type are ignored.

    Args:
        text: The prompt to send.
        user_id: Identifier forwarded to the server in the "user_id" field.

    Returns:
        The concatenated reply, "No response." if nothing was received, a
        fixed apology if no server is reachable, or "Connection error: ..."
        if the exchange fails (including a 60-second wait without a frame).
    """
    import websockets

    base_url = await get_jarvis_url()
    if not base_url:
        return "I can't reach the JARVIS server right now, sir."
    ws_url = base_url.replace("http://", "ws://").replace("https://", "wss://") + "/ws"
    response_parts: list[str] = []
    try:
        async with websockets.connect(ws_url) as ws:
            await ws.send(json.dumps({
                "type": "message",
                "content": text,
                "backend": "auto",
                "stm": True,
                "user_id": user_id,
            }))
            while True:
                msg = await asyncio.wait_for(ws.recv(), timeout=60)
                data = json.loads(msg)
                frame_type = data.get("type")
                if frame_type == "stream":
                    fragment = data.get("content")
                    # A null or non-string fragment must not reach "".join(),
                    # which runs outside this try block and would raise.
                    if fragment is not None:
                        response_parts.append(str(fragment))
                elif frame_type == "stream_end":
                    break
                elif frame_type == "error":
                    response_parts.append(str(data.get("content") or "Error"))
                    break
    except Exception as e:
        return f"Connection error: {e}"
    return "".join(response_parts) or "No response."
# ─── Telegram API ────────────────────────────────
async def send_message(chat_id: int, text: str):
    """Send a message via Telegram, split into chunks of at most 4000 chars.

    Each chunk is first sent with ``parse_mode="Markdown"``. If Telegram
    answers HTTP 400 (which is what it returns when the markup cannot be
    parsed, e.g. an unbalanced "_" or an entity cut in half by the chunking),
    the same chunk is re-sent as plain text so the content is not lost.

    Args:
        chat_id: Target chat.
        text: Message text; an empty string results in no request at all.
    """
    endpoint = f"{TELEGRAM_API}/sendMessage"
    # Split long messages (Telegram limit: 4096 chars)
    chunks = [text[i:i + 4000] for i in range(0, len(text), 4000)]
    async with httpx.AsyncClient() as client:
        for chunk in chunks:
            payload = {"chat_id": chat_id, "text": chunk}
            resp = await client.post(endpoint, json={**payload, "parse_mode": "Markdown"})
            if resp.status_code == 400:
                # Most likely a Markdown parse failure; fall back to plain text.
                await client.post(endpoint, json=payload)
async def send_typing(chat_id: int):
    """Post the "typing" chat action to Telegram for the given chat."""
    endpoint = f"{TELEGRAM_API}/sendChatAction"
    payload = {"chat_id": chat_id, "action": "typing"}
    async with httpx.AsyncClient() as http:
        await http.post(endpoint, json=payload)
def _transcribe_ogg(ogg_path: str) -> str | None:
    """Convert an OGG file to WAV with ffmpeg and transcribe it (blocking).

    Both the OGG input and the intermediate WAV file are deleted before
    returning, whether or not transcription succeeded.

    Returns:
        The recognized text, or None if conversion or recognition failed.
    """
    import subprocess

    # splitext only touches the extension; str.replace would rewrite every
    # ".ogg" occurring anywhere in the path.
    wav_path = os.path.splitext(ogg_path)[0] + ".wav"
    try:
        # Convert OGG to 16 kHz mono WAV
        converted = subprocess.run(
            ["ffmpeg", "-i", ogg_path, "-ar", "16000", "-ac", "1", wav_path, "-y"],
            capture_output=True, timeout=10,
        )
        if converted.returncode != 0:
            return None
        import speech_recognition as sr
        recognizer = sr.Recognizer()
        with sr.AudioFile(wav_path) as source:
            audio = recognizer.record(source)
        return recognizer.recognize_google(audio)
    except Exception:
        # Best effort: missing ffmpeg, a conversion timeout, or a recognition
        # failure all mean "no transcript".
        return None
    finally:
        for path in (ogg_path, wav_path):
            try:
                os.unlink(path)
            except OSError:
                # Already gone or not removable; nothing useful to do here.
                pass


async def download_voice(file_id: str) -> str | None:
    """Download a Telegram voice message and transcribe it.

    Args:
        file_id: Telegram file identifier of the voice message.

    Returns:
        The transcribed text, or None if the file could not be fetched,
        converted, or recognized.
    """
    try:
        async with httpx.AsyncClient() as client:
            # Get file path
            resp = await client.get(f"{TELEGRAM_API}/getFile", params={"file_id": file_id})
            file_path = (resp.json().get("result") or {}).get("file_path")
            if not file_path:
                return None
            # Download file
            file_url = f"https://api.telegram.org/file/bot{TELEGRAM_TOKEN}/{file_path}"
            resp = await client.get(file_url)
            if resp.status_code != 200:
                # Do not feed an error body to ffmpeg as if it were audio.
                return None
            audio_bytes = resp.content
    except (httpx.HTTPError, ValueError):
        # Network failure or a non-JSON getFile reply.
        return None
    # Save to temp file
    with tempfile.NamedTemporaryFile(suffix=".ogg", delete=False) as f:
        f.write(audio_bytes)
        temp_path = f.name
    # ffmpeg and the speech recognizer are blocking calls; run them in a
    # worker thread so the event loop keeps serving other updates.
    return await asyncio.to_thread(_transcribe_ogg, temp_path)
# ─── Main Bot Loop ──────────────────────────────
async def handle_update(update: dict):
    """Handle a single Telegram update.

    Updates without a "message" are ignored. When a chat-ID whitelist is
    configured, other chats get an "Unauthorized" reply. Voice messages are
    transcribed first; text messages are checked for the bot's slash commands,
    which are rewritten into natural-language prompts. The resulting prompt is
    sent to JARVIS and the answer is posted back to the chat. Messages that
    carry neither voice nor text are ignored.

    Args:
        update: One element of the "result" list returned by getUpdates.
    """
    message = update.get("message")
    if not message:
        return
    chat_id = message["chat"]["id"]
    # Enforce chat ID whitelist when configured
    if _ALLOWED_CHAT_IDS and chat_id not in _ALLOWED_CHAT_IDS:
        await send_message(chat_id, "Unauthorized. Your chat ID is not in the allow list.")
        return
    # Handle voice messages
    if "voice" in message:
        await send_typing(chat_id)
        text = await download_voice(message["voice"]["file_id"])
        if not text:
            await send_message(chat_id, "I couldn't understand the voice message, sir. Try sending text instead.")
            return
        await send_message(chat_id, f"_Heard: {text}_")
    # Handle text messages
    elif "text" in message:
        text = message["text"]
        # Handle /start command
        if text == "/start":
            await send_message(chat_id, (
                "*J.A.R.V.I.S. Online* 🔵\n\n"
                "Good to see you. I'm your AI assistant.\n\n"
                "Just send me a message — text or voice.\n\n"
                "Commands:\n"
                "/status — Check system status\n"
                "/weather [city] — Get weather\n"
                "/search [query] — Web search\n"
                "/memory — What I remember about you\n\n"
                "_Say anything, sir. I'm listening._"
            ))
            return
        if text == "/status":
            text = "What is the system status?"
        elif text.startswith("/weather"):
            # removeprefix drops only the leading command; str.replace would
            # also delete "/weather" if it appeared inside the argument.
            city = text.removeprefix("/weather").strip() or "New York"
            text = f"What is the weather in {city}?"
        elif text.startswith("/search"):
            query = text.removeprefix("/search").strip()
            text = f"Search the web for: {query}"
        elif text == "/memory":
            text = "What do you remember about me?"
    else:
        # Neither voice nor text (photo, sticker, ...): nothing to do.
        return
    # Send to JARVIS
    await send_typing(chat_id)
    tg_user_id = f"telegram_{chat_id}"
    response = await ask_jarvis(text, user_id=tg_user_id)
    await send_message(chat_id, response)
async def poll():
    """Long-polling loop for Telegram updates; runs until cancelled.

    Each received update is handled in its own task so a slow handler does not
    delay polling. Failed polls (network errors or an ``ok: false`` reply from
    Telegram) are reported and retried after a 5-second pause.
    """
    print("=" * 50)
    print(" J.A.R.V.I.S. Telegram Bot")
    print(f" Server: {JARVIS_URL}")
    print(" Waiting for messages...")
    print("=" * 50)
    offset = 0
    # The event loop keeps only weak references to tasks, so hold strong ones
    # here until each handler has finished.
    in_flight: set[asyncio.Task] = set()

    def _on_done(task: asyncio.Task) -> None:
        """Forget a finished handler task and report its failure, if any."""
        in_flight.discard(task)
        if not task.cancelled() and task.exception() is not None:
            print(f"Handler error: {task.exception()!r}")

    async with httpx.AsyncClient(timeout=60) as client:
        while True:
            try:
                resp = await client.get(
                    f"{TELEGRAM_API}/getUpdates",
                    params={"offset": offset, "timeout": 30},
                )
                payload = resp.json()
                if not payload.get("ok"):
                    # Error replies come back immediately; without a pause the
                    # loop would hammer the API.
                    print(f"Poll error: {payload.get('description', resp.status_code)}")
                    await asyncio.sleep(5)
                    continue
                for update in payload.get("result", []):
                    offset = update["update_id"] + 1
                    task = asyncio.create_task(handle_update(update))
                    in_flight.add(task)
                    task.add_done_callback(_on_done)
            except httpx.ReadTimeout:
                continue
            except Exception as e:
                print(f"Poll error: {e}")
                await asyncio.sleep(5)
def main():
    """Start the bot, or print setup instructions if no token is configured."""
    if not TELEGRAM_TOKEN:
        print("ERROR: Set TELEGRAM_BOT_TOKEN in .env")
        print()
        print("How to get a token:")
        print("1. Open Telegram → search @BotFather")
        print("2. Send /newbot")
        print("3. Name it: JARVIS")
        print("4. Copy the token → paste in ~/jarvis/.env")
        return
    asyncio.run(poll())


if __name__ == "__main__":
    main()