PraisonAI / app /telegram_bot.py
Sanyam400's picture
Update app/telegram_bot.py
dc97d11 verified
"""
Telegram Bot v7 β€” Full Featured
- Commands: /model, /settings, /capability, /schedule, etc.
- Cross-platform: web UI sync via SSE push
- Scheduled messages
- All media types
"""
import os, json, asyncio, base64, traceback, socket, re
import httpx
import config as cfg
TELEGRAM_API = "https://api.telegram.org/bot{token}/{method}"
_histories: dict = {}
# Settings per chat
_chat_settings: dict = {} # chat_id -> {"model": ..., "voice": bool}
MODELS = {
"lite": "LongCat-Flash-Lite",
"chat": "LongCat-Flash-Chat",
"thinking": "LongCat-Flash-Thinking-2601",
"flash": "LongCat-Flash-Lite",
}
COMMANDS_HELP = """
*PraisonChat Commands:*
*Chat:*
/start β€” Welcome message
/clear β€” Clear conversation history
/status β€” Show system status
/help β€” This help message
*Model Settings:*
/model β€” Show current model
/model lite β€” Switch to Flash Lite (fastest, 50M/day)
/model chat β€” Switch to Flash Chat
/model thinking β€” Switch to Flash Thinking (deep reasoning)
*Capabilities:*
/can β€” Show what I can do
/memory β€” Show saved memories
/skills β€” Show saved skills
*Scheduling:*
"Send me hello in 2 minutes" β€” schedule a message
"Remind me about X in 1 hour" β€” set a reminder
*Settings:*
/voice on|off β€” Toggle voice responses
/lang en|hi|es β€” Set voice language
"""
def _url(method):
return TELEGRAM_API.format(token=cfg.get_telegram_token(), method=method)
def check_reachable():
try:
socket.setdefaulttimeout(5)
socket.gethostbyname("api.telegram.org")
return True, "OK"
except Exception as e:
return False, str(e)
async def _post(method, timeout=15, **kwargs):
async with httpx.AsyncClient(timeout=timeout) as c:
r = await c.post(_url(method), **kwargs)
return r.json()
async def send_message(chat_id, text, parse_mode="Markdown"):
if not text or not str(text).strip(): return True
text = str(text)
chunks = [text[i:i+3900] for i in range(0, len(text), 3900)]
for chunk in chunks:
try:
r = await _post("sendMessage",
json={"chat_id": chat_id, "text": chunk, "parse_mode": parse_mode})
if not r.get("ok"):
await _post("sendMessage", json={"chat_id": chat_id, "text": chunk})
except Exception as e:
print(f"[TG] send_message error: {e}")
return True
async def send_typing(chat_id):
try:
await _post("sendChatAction", json={"chat_id": chat_id, "action": "typing"})
except Exception:
pass
async def send_voice(chat_id, audio_b64):
try:
audio_bytes = base64.b64decode(audio_b64)
async with httpx.AsyncClient(timeout=60) as c:
r = await c.post(_url("sendVoice"),
files={"voice": ("voice.mp3", audio_bytes, "audio/mpeg")},
data={"chat_id": str(chat_id)})
return r.json().get("ok", False)
except Exception as e:
print(f"[TG] voice error: {e}")
return False
async def send_photo(chat_id, img_b64, filename, ext):
try:
img_bytes = base64.b64decode(img_b64)
async with httpx.AsyncClient(timeout=30) as c:
r = await c.post(_url("sendPhoto"),
files={"photo": (filename, img_bytes, f"image/{ext}")},
data={"chat_id": str(chat_id), "caption": f"πŸ–ΌοΈ {filename}"})
return r.json().get("ok", False)
except Exception as e:
print(f"[TG] photo error: {e}")
return False
async def send_document(chat_id, file_b64, filename):
try:
file_bytes = base64.b64decode(file_b64)
async with httpx.AsyncClient(timeout=60) as c:
r = await c.post(_url("sendDocument"),
files={"document": (filename, file_bytes, "application/octet-stream")},
data={"chat_id": str(chat_id), "caption": f"πŸ“„ {filename}"})
return r.json().get("ok", False)
except Exception as e:
print(f"[TG] doc error: {e}")
return False
async def set_webhook(base_url):
url = base_url.rstrip("/") + "/telegram/webhook"
return await _post("setWebhook",
json={"url": url, "allowed_updates": ["message"], "drop_pending_updates": True})
async def delete_webhook():
return await _post("deleteWebhook", json={})
async def get_bot_info():
return await _post("getMe", timeout=10)
async def get_webhook_info():
return await _post("getWebhookInfo", timeout=10)
def get_chat_model(chat_id):
return _chat_settings.get(chat_id, {}).get("model", cfg.get_model())
def get_voice_lang(chat_id):
return _chat_settings.get(chat_id, {}).get("lang", "en")
def is_voice_on(chat_id):
return _chat_settings.get(chat_id, {}).get("voice", False)
async def handle_command(chat_id, text, username) -> bool:
"""Handle /commands. Returns True if handled."""
parts = text.split()
cmd = parts[0].lower().split("@")[0] # handle /cmd@botname
if cmd == "/start":
await send_message(chat_id,
f"πŸ‘‹ Hello *{username}*! I'm *PraisonChat* 🦞\n\n"
"I'm an autonomous AI agent. I can search the web, generate files, "
"create voice audio, make charts, run code, and much more.\n\n"
"Type /help to see all commands, or just ask me anything!")
return True
if cmd == "/clear":
_histories[chat_id] = []
await send_message(chat_id, "βœ… History cleared!")
return True
if cmd == "/help":
await send_message(chat_id, COMMANDS_HELP)
return True
if cmd == "/status":
model = get_chat_model(chat_id)
api = cfg.get_longcat_key()
await send_message(chat_id,
f"βœ… *PraisonChat Online*\n"
f"Model: `{model}`\n"
f"API: `{'set βœ…' if api else 'missing ❌'}`\n"
f"Voice: `{'on πŸ”Š' if is_voice_on(chat_id) else 'off'}`\n"
f"History: `{len(_histories.get(chat_id,[]))} messages`")
return True
if cmd == "/model":
if len(parts) == 1:
model = get_chat_model(chat_id)
await send_message(chat_id,
f"Current model: `{model}`\n\n"
f"Switch with:\n"
f"/model lite β€” Flash Lite (fastest)\n"
f"/model chat β€” Flash Chat\n"
f"/model thinking β€” Flash Thinking (deep)")
else:
choice = parts[1].lower()
if choice in MODELS:
new_model = MODELS[choice]
if chat_id not in _chat_settings:
_chat_settings[chat_id] = {}
_chat_settings[chat_id]["model"] = new_model
await send_message(chat_id, f"βœ… Model switched to: `{new_model}`")
else:
await send_message(chat_id, f"Unknown model. Options: `lite`, `chat`, `thinking`")
return True
if cmd == "/voice":
if len(parts) < 2:
state = is_voice_on(chat_id)
await send_message(chat_id, f"Voice is currently: `{'on πŸ”Š' if state else 'off'}`\nUse /voice on or /voice off")
else:
on = parts[1].lower() == "on"
if chat_id not in _chat_settings:
_chat_settings[chat_id] = {}
_chat_settings[chat_id]["voice"] = on
await send_message(chat_id, f"Voice: `{'on πŸ”Š' if on else 'off'}`")
return True
if cmd == "/lang":
if len(parts) >= 2:
lang = parts[1].lower()
if chat_id not in _chat_settings:
_chat_settings[chat_id] = {}
_chat_settings[chat_id]["lang"] = lang
await send_message(chat_id, f"βœ… Voice language set to: `{lang}`")
else:
await send_message(chat_id, f"Usage: /lang en|hi|es|fr|de|ja|zh")
return True
if cmd == "/can":
await send_message(chat_id,
"🦞 *What I can do:*\n\n"
"πŸ” Search the web in real-time\n"
"πŸ• Get current date & time\n"
"πŸ“° Find latest news & events\n"
"πŸ—£οΈ Generate voice audio (MP3)\n"
"πŸ–ΌοΈ Create images & charts\n"
"πŸ“„ Generate PDF documents\n"
"πŸ“± Create QR codes\n"
"🐍 Execute Python code\n"
"πŸ“¦ Install Python packages\n"
"⏰ Schedule messages (say 'in 5 minutes')\n"
"πŸ€– Spawn research sub-agents\n"
"🧠 Remember things across conversations\n"
"⚑ Create and reuse skills\n\n"
"Just ask β€” I'll figure out how to do it!")
return True
if cmd == "/memory":
from memory import list_memories, load_memory
mems = list_memories()
if not mems:
await send_message(chat_id, "No memories saved yet.")
else:
text = "*Saved Memories:*\n\n"
for key in mems[:10]:
content = load_memory(key)
text += f"β€’ `{key}`: {content[:80]}…\n"
await send_message(chat_id, text)
return True
if cmd == "/skills":
from memory import list_skills
skills = list_skills()
if not skills:
await send_message(chat_id, "No skills saved yet.")
else:
text = "*Saved Skills:*\n\n"
for s in skills[:10]:
text += f"β€’ `{s['name']}`: {s.get('description','')[:80]}\n"
await send_message(chat_id, text)
return True
return False
async def handle_update(update, api_key, model):
from agent_system import orchestrator
msg = update.get("message") or update.get("edited_message")
if not msg: return
chat_id = msg["chat"]["id"]
text = msg.get("text","").strip()
username = msg.get("from",{}).get("first_name","User")
if not text: return
print(f"[TG] {username} ({chat_id}): {text[:60]}")
# Handle commands
if text.startswith("/"):
if await handle_command(chat_id, text, username):
return
# Unknown command
await send_message(chat_id, f"Unknown command. Type /help for all commands.")
return
# Get per-chat model
chat_model = get_chat_model(chat_id)
voice_on = is_voice_on(chat_id)
voice_lang = get_voice_lang(chat_id)
await send_typing(chat_id)
# Add voice instruction if voice mode on
actual_msg = text
if voice_on and "voice" not in text.lower():
actual_msg = text + " (also generate a voice audio response)"
history = _histories.get(chat_id, [])
full_response = ""
audio_b64 = None
has_media = False
try:
async for chunk_json in orchestrator.stream_response(
actual_msg, history, api_key, chat_model, session_id=f"tg_{chat_id}"
):
try:
ev = json.loads(chunk_json)
except Exception:
continue
t = ev.get("type","")
if t == "token":
full_response += ev.get("content","")
elif t == "audio_response":
audio_b64 = ev.get("audio_b64")
has_media = True
elif t == "image_response":
img_b64 = ev.get("image_b64","")
filename = ev.get("filename","image.png")
ext = ev.get("ext","png")
if img_b64:
await send_typing(chat_id)
await send_photo(chat_id, img_b64, filename, ext)
has_media = True
elif t == "file_response":
file_b64 = ev.get("file_b64","")
filename = ev.get("filename","file")
if file_b64:
await send_typing(chat_id)
await send_document(chat_id, file_b64, filename)
has_media = True
elif t == "error":
await send_message(chat_id, f"❌ {ev.get('message','Error')[:300]}")
return
except Exception as e:
print(f"[TG] exception: {e}\n{traceback.format_exc()}")
await send_message(chat_id, f"❌ Error: {str(e)[:200]}")
return
# Clean and send text
clean = full_response
clean = re.sub(r'<[^>]+>.*?</[^>]+>', '', clean, flags=re.DOTALL)
clean = re.sub(r'\[SPEAK:.*?\]', '', clean, flags=re.DOTALL)
clean = clean.replace("**","*").strip()
if clean:
await send_message(chat_id, clean)
elif not has_media:
await send_message(chat_id, "_(Done)_")
if audio_b64:
await send_typing(chat_id)
await send_voice(chat_id, audio_b64)
# Update history
history.append({"role":"user","content":text})
history.append({"role":"assistant","content":full_response or "(media)"})
_histories[chat_id] = history[-30:]
print(f"[TG] Done β†’ {username}: {len(full_response)} chars")
async def deliver_scheduled(chat_id, message):
"""Deliver a scheduled message to Telegram."""
await send_typing(chat_id)
await send_message(chat_id, f"⏰ *Scheduled delivery:*\n\n{message}")