"""
Telegram Bot v7 — Full Featured
- Commands: /start, /clear, /help, /status, /model, /voice, /lang,
  /can, /memory, /skills
- Cross-platform: web UI sync via SSE push
- Scheduled messages
- All media types
"""
import asyncio
import base64
import json
import os
import re
import socket
import traceback

import httpx

import config as cfg

TELEGRAM_API = "https://api.telegram.org/bot{token}/{method}"

# Telegram rejects messages longer than 4096 characters; chunk below that
# limit with some headroom.
_MAX_CHUNK = 3900

# Number of history entries (user + assistant messages) kept per chat.
_HISTORY_LIMIT = 30

# Conversation history per chat: chat_id -> list of {"role", "content"} dicts.
_histories: dict = {}

# Settings per chat
_chat_settings: dict = {}  # chat_id -> {"model": ..., "voice": bool, "lang": str}

MODELS = {
    "lite": "LongCat-Flash-Lite",
    "chat": "LongCat-Flash-Chat",
    "thinking": "LongCat-Flash-Thinking-2601",
    "flash": "LongCat-Flash-Lite",
}

COMMANDS_HELP = """
*PraisonChat Commands:*

*Chat:*
/start — Welcome message
/clear — Clear conversation history
/status — Show system status
/help — This help message

*Model Settings:*
/model — Show current model
/model lite — Switch to Flash Lite (fastest, 50M/day)
/model chat — Switch to Flash Chat
/model thinking — Switch to Flash Thinking (deep reasoning)

*Capabilities:*
/can — Show what I can do
/memory — Show saved memories
/skills — Show saved skills

*Scheduling:*
"Send me hello in 2 minutes" — schedule a message
"Remind me about X in 1 hour" — set a reminder

*Settings:*
/voice on|off — Toggle voice responses
/lang en|hi|es — Set voice language
"""

# Paired-tag blocks (e.g. <think>…</think>) that must not reach the user.
_TAG_BLOCK_RE = re.compile(r"<[^>]+>.*?</[^>]+>", re.DOTALL)
# Inline [SPEAK:…] directives embedded in the model output.
_SPEAK_RE = re.compile(r"\[SPEAK:.*?\]", re.DOTALL)


def _url(method):
    """Return the Bot API URL for *method* using the configured token."""
    return TELEGRAM_API.format(token=cfg.get_telegram_token(), method=method)


def check_reachable():
    """Check that ``api.telegram.org`` resolves.

    Returns:
        ``(True, "OK")`` on success, ``(False, <error text>)`` otherwise.
    """
    # The default socket timeout is process-wide state; restore it afterwards
    # so this probe does not change the behavior of unrelated sockets.
    previous_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(5)
        socket.gethostbyname("api.telegram.org")
        return True, "OK"
    except Exception as e:
        return False, str(e)
    finally:
        socket.setdefaulttimeout(previous_timeout)


async def _post(method, timeout=15, **kwargs):
    """POST to Bot API *method* and return the decoded JSON response.

    Extra keyword arguments are forwarded to ``httpx.AsyncClient.post``.
    """
    async with httpx.AsyncClient(timeout=timeout) as c:
        r = await c.post(_url(method), **kwargs)
        return r.json()


async def send_message(chat_id, text, parse_mode="Markdown"):
    """Send *text* to *chat_id*, split into chunks of ``_MAX_CHUNK`` chars.

    Each chunk is first sent with *parse_mode*; if Telegram answers with
    ``ok`` false, the chunk is resent as plain text. Errors are logged and
    never raised. Empty or whitespace-only text is skipped.

    Returns:
        Always ``True`` (delivery is best-effort).
    """
    if not text or not str(text).strip():
        return True
    text = str(text)
    for start in range(0, len(text), _MAX_CHUNK):
        chunk = text[start:start + _MAX_CHUNK]
        try:
            r = await _post(
                "sendMessage",
                json={"chat_id": chat_id, "text": chunk, "parse_mode": parse_mode},
            )
            if not r.get("ok"):
                # Retry without parse_mode so the text is sent unformatted.
                await _post("sendMessage", json={"chat_id": chat_id, "text": chunk})
        except Exception as e:
            print(f"[TG] send_message error: {e}")
    return True


async def send_typing(chat_id):
    """Show the "typing" chat action in *chat_id*; failures are ignored."""
    try:
        await _post("sendChatAction", json={"chat_id": chat_id, "action": "typing"})
    except Exception:
        # Purely cosmetic indicator: never let it interrupt the real work.
        pass


async def _send_file(method, field, payload_b64, filename, mime, data, timeout, label):
    """Upload a base64-encoded payload as a multipart file via *method*.

    Returns the ``ok`` flag from Telegram's response, or ``False`` if
    decoding, the request, or JSON parsing raised (the error is logged
    under *label*).
    """
    try:
        payload = base64.b64decode(payload_b64)
        async with httpx.AsyncClient(timeout=timeout) as c:
            r = await c.post(
                _url(method),
                files={field: (filename, payload, mime)},
                data=data,
            )
            return r.json().get("ok", False)
    except Exception as e:
        print(f"[TG] {label} error: {e}")
        return False


async def send_voice(chat_id, audio_b64):
    """Send base64-encoded audio as a voice message; return success flag."""
    return await _send_file(
        "sendVoice", "voice", audio_b64, "voice.mp3", "audio/mpeg",
        {"chat_id": str(chat_id)}, 60, "voice",
    )


async def send_photo(chat_id, img_b64, filename, ext):
    """Send a base64-encoded image captioned with its filename.

    *ext* is used to build the ``image/<ext>`` content type.
    """
    return await _send_file(
        "sendPhoto", "photo", img_b64, filename, f"image/{ext}",
        {"chat_id": str(chat_id), "caption": f"🖼️ {filename}"}, 30, "photo",
    )


async def send_document(chat_id, file_b64, filename):
    """Send a base64-encoded file as a document captioned with its filename."""
    return await _send_file(
        "sendDocument", "document", file_b64, filename, "application/octet-stream",
        {"chat_id": str(chat_id), "caption": f"📄 {filename}"}, 60, "doc",
    )


async def set_webhook(base_url):
    """Register ``<base_url>/telegram/webhook`` as the bot's webhook.

    Pending updates are dropped and only ``message`` updates are requested.
    """
    url = base_url.rstrip("/") + "/telegram/webhook"
    return await _post(
        "setWebhook",
        json={"url": url, "allowed_updates": ["message"], "drop_pending_updates": True},
    )


async def delete_webhook():
    """Remove the bot's webhook and return Telegram's JSON response."""
    return await _post("deleteWebhook", json={})


async def get_bot_info():
    """Return Telegram's ``getMe`` JSON response."""
    return await _post("getMe", timeout=10)


async def get_webhook_info():
    """Return Telegram's ``getWebhookInfo`` JSON response."""
    return await _post("getWebhookInfo", timeout=10)


def _set_chat_setting(chat_id, key, value):
    """Store one per-chat setting, creating the chat's entry if needed."""
    _chat_settings.setdefault(chat_id, {})[key] = value


def get_chat_model(chat_id):
    """Return the model chosen for *chat_id*, or the configured default."""
    return _chat_settings.get(chat_id, {}).get("model", cfg.get_model())


def get_voice_lang(chat_id):
    """Return the voice language stored for *chat_id* (default ``"en"``)."""
    return _chat_settings.get(chat_id, {}).get("lang", "en")


def is_voice_on(chat_id):
    """Return whether voice responses are enabled for *chat_id*."""
    return _chat_settings.get(chat_id, {}).get("voice", False)


async def handle_command(chat_id, text, username) -> bool:
    """Handle /commands. Returns True if handled.

    Args:
        chat_id: Chat the command came from; replies are sent there.
        text: Full message text, beginning with the command word.
        username: Display name used in the /start greeting.

    Returns:
        ``True`` when the command was recognized and answered, ``False``
        for unknown commands or empty text.
    """
    parts = text.split()
    if not parts:
        return False
    cmd = parts[0].lower().split("@")[0]  # handle /cmd@botname

    if cmd == "/start":
        await send_message(
            chat_id,
            f"👋 Hello *{username}*! I'm *PraisonChat* 🦞\n\n"
            "I'm an autonomous AI agent. I can search the web, generate files, "
            "create voice audio, make charts, run code, and much more.\n\n"
            "Type /help to see all commands, or just ask me anything!",
        )
        return True

    if cmd == "/clear":
        _histories[chat_id] = []
        await send_message(chat_id, "✅ History cleared!")
        return True

    if cmd == "/help":
        await send_message(chat_id, COMMANDS_HELP)
        return True

    if cmd == "/status":
        model = get_chat_model(chat_id)
        api = cfg.get_longcat_key()
        await send_message(
            chat_id,
            f"✅ *PraisonChat Online*\n"
            f"Model: `{model}`\n"
            f"API: `{'set ✅' if api else 'missing ❌'}`\n"
            f"Voice: `{'on 🔊' if is_voice_on(chat_id) else 'off'}`\n"
            f"History: `{len(_histories.get(chat_id, []))} messages`",
        )
        return True

    if cmd == "/model":
        if len(parts) == 1:
            model = get_chat_model(chat_id)
            await send_message(
                chat_id,
                f"Current model: `{model}`\n\n"
                "Switch with:\n"
                "/model lite — Flash Lite (fastest)\n"
                "/model chat — Flash Chat\n"
                "/model thinking — Flash Thinking (deep)",
            )
        else:
            choice = parts[1].lower()
            if choice in MODELS:
                new_model = MODELS[choice]
                _set_chat_setting(chat_id, "model", new_model)
                await send_message(chat_id, f"✅ Model switched to: `{new_model}`")
            else:
                await send_message(
                    chat_id, "Unknown model. Options: `lite`, `chat`, `thinking`"
                )
        return True

    if cmd == "/voice":
        choice = parts[1].lower() if len(parts) >= 2 else ""
        if choice in ("on", "off"):
            on = choice == "on"
            _set_chat_setting(chat_id, "voice", on)
            await send_message(chat_id, f"Voice: `{'on 🔊' if on else 'off'}`")
        else:
            # Missing or unrecognized argument: report the state and usage
            # rather than silently switching voice off.
            state = is_voice_on(chat_id)
            await send_message(
                chat_id,
                f"Voice is currently: `{'on 🔊' if state else 'off'}`\n"
                "Use /voice on or /voice off",
            )
        return True

    if cmd == "/lang":
        if len(parts) >= 2:
            lang = parts[1].lower()
            _set_chat_setting(chat_id, "lang", lang)
            await send_message(chat_id, f"✅ Voice language set to: `{lang}`")
        else:
            await send_message(chat_id, "Usage: /lang en|hi|es|fr|de|ja|zh")
        return True

    if cmd == "/can":
        await send_message(
            chat_id,
            "🦞 *What I can do:*\n\n"
            "🔍 Search the web in real-time\n"
            "🕐 Get current date & time\n"
            "📰 Find latest news & events\n"
            "🗣️ Generate voice audio (MP3)\n"
            "🖼️ Create images & charts\n"
            "📄 Generate PDF documents\n"
            "📱 Create QR codes\n"
            "🐍 Execute Python code\n"
            "📦 Install Python packages\n"
            "⏰ Schedule messages (say 'in 5 minutes')\n"
            "🤖 Spawn research sub-agents\n"
            "🧠 Remember things across conversations\n"
            "⚡ Create and reuse skills\n\n"
            "Just ask — I'll figure out how to do it!",
        )
        return True

    if cmd == "/memory":
        from memory import list_memories, load_memory

        mems = list_memories()
        if not mems:
            await send_message(chat_id, "No memories saved yet.")
        else:
            entries = []
            for key in mems[:10]:
                content = load_memory(key)
                # Guard against a missing or non-string value before slicing.
                preview = "" if content is None else str(content)
                entries.append(f"• `{key}`: {preview[:80]}…\n")
            await send_message(chat_id, "*Saved Memories:*\n\n" + "".join(entries))
        return True

    if cmd == "/skills":
        from memory import list_skills

        skills = list_skills()
        if not skills:
            await send_message(chat_id, "No skills saved yet.")
        else:
            entries = [
                f"• `{s['name']}`: {str(s.get('description') or '')[:80]}\n"
                for s in skills[:10]
            ]
            await send_message(chat_id, "*Saved Skills:*\n\n" + "".join(entries))
        return True

    return False


def _clean_response(text):
    """Strip tag blocks and [SPEAK:…] directives; map ``**`` to ``*``."""
    text = _TAG_BLOCK_RE.sub("", text)
    text = _SPEAK_RE.sub("", text)
    return text.replace("**", "*").strip()


async def handle_update(update, api_key, model):
    """Process one Telegram update: run a command or stream an agent reply.

    Args:
        update: Decoded update dict; only ``message`` / ``edited_message``
            entries carrying text are processed.
        api_key: Key forwarded to ``orchestrator.stream_response``.
        model: Not read by this function; the effective model comes from
            ``get_chat_model`` (per-chat choice or configured default).

    NOTE(review): ``set_webhook`` only requests ``message`` updates, so the
    ``edited_message`` fallback is presumably never hit via the webhook.
    NOTE(review): the /lang setting is stored but not forwarded to the
    orchestrator from here — confirm where it is meant to be consumed.
    """
    from agent_system import orchestrator

    msg = update.get("message") or update.get("edited_message")
    if not msg:
        return
    chat_id = msg["chat"]["id"]
    text = msg.get("text", "").strip()
    username = msg.get("from", {}).get("first_name", "User")
    if not text:
        return
    print(f"[TG] {username} ({chat_id}): {text[:60]}")

    # Handle commands
    if text.startswith("/"):
        if await handle_command(chat_id, text, username):
            return
        # Unknown command
        await send_message(chat_id, "Unknown command. Type /help for all commands.")
        return

    # Get per-chat model
    chat_model = get_chat_model(chat_id)
    voice_on = is_voice_on(chat_id)
    await send_typing(chat_id)

    # Add voice instruction if voice mode on
    actual_msg = text
    if voice_on and "voice" not in text.lower():
        actual_msg = text + " (also generate a voice audio response)"

    history = _histories.get(chat_id, [])
    full_response = ""
    audio_b64 = None
    has_media = False

    try:
        async for chunk_json in orchestrator.stream_response(
            actual_msg, history, api_key, chat_model, session_id=f"tg_{chat_id}"
        ):
            try:
                ev = json.loads(chunk_json)
            except Exception:
                # Skip chunks that are not valid JSON events.
                continue
            t = ev.get("type", "")
            if t == "token":
                full_response += ev.get("content", "")
            elif t == "audio_response":
                audio_b64 = ev.get("audio_b64")
                # Count as media only when a payload exists, so an empty
                # audio event cannot suppress the "(Done)" fallback.
                has_media = has_media or bool(audio_b64)
            elif t == "image_response":
                img_b64 = ev.get("image_b64", "")
                filename = ev.get("filename", "image.png")
                ext = ev.get("ext", "png")
                if img_b64:
                    await send_typing(chat_id)
                    await send_photo(chat_id, img_b64, filename, ext)
                    has_media = True
            elif t == "file_response":
                file_b64 = ev.get("file_b64", "")
                filename = ev.get("filename", "file")
                if file_b64:
                    await send_typing(chat_id)
                    await send_document(chat_id, file_b64, filename)
                    has_media = True
            elif t == "error":
                detail = str(ev.get("message") or "Error")
                await send_message(chat_id, f"❌ {detail[:300]}")
                return
    except Exception as e:
        print(f"[TG] exception: {e}\n{traceback.format_exc()}")
        await send_message(chat_id, f"❌ Error: {str(e)[:200]}")
        return

    # Clean and send text
    clean = _clean_response(full_response)
    if clean:
        await send_message(chat_id, clean)
    elif not has_media:
        await send_message(chat_id, "_(Done)_")

    if audio_b64:
        await send_typing(chat_id)
        await send_voice(chat_id, audio_b64)

    # Update history
    history.append({"role": "user", "content": text})
    history.append({"role": "assistant", "content": full_response or "(media)"})
    _histories[chat_id] = history[-_HISTORY_LIMIT:]
    print(f"[TG] Done → {username}: {len(full_response)} chars")


async def deliver_scheduled(chat_id, message):
    """Deliver a scheduled message to Telegram."""
    await send_typing(chat_id)
    await send_message(chat_id, f"⏰ *Scheduled delivery:*\n\n{message}")