| """ |
| Telegram Bot v7 β Full Featured |
| - Commands: /model, /settings, /capability, /schedule, etc. |
| - Cross-platform: web UI sync via SSE push |
| - Scheduled messages |
| - All media types |
| """ |
| import os, json, asyncio, base64, traceback, socket, re |
| import httpx |
| import config as cfg |
|
|
| TELEGRAM_API = "https://api.telegram.org/bot{token}/{method}" |
| _histories: dict = {} |
|
|
| |
| _chat_settings: dict = {} |
|
|
| MODELS = { |
| "lite": "LongCat-Flash-Lite", |
| "chat": "LongCat-Flash-Chat", |
| "thinking": "LongCat-Flash-Thinking-2601", |
| "flash": "LongCat-Flash-Lite", |
| } |
|
|
| COMMANDS_HELP = """ |
| *PraisonChat Commands:* |
| |
| *Chat:* |
| /start β Welcome message |
| /clear β Clear conversation history |
| /status β Show system status |
| /help β This help message |
| |
| *Model Settings:* |
| /model β Show current model |
| /model lite β Switch to Flash Lite (fastest, 50M/day) |
| /model chat β Switch to Flash Chat |
| /model thinking β Switch to Flash Thinking (deep reasoning) |
| |
| *Capabilities:* |
| /can β Show what I can do |
| /memory β Show saved memories |
| /skills β Show saved skills |
| |
| *Scheduling:* |
| "Send me hello in 2 minutes" β schedule a message |
| "Remind me about X in 1 hour" β set a reminder |
| |
| *Settings:* |
| /voice on|off β Toggle voice responses |
| /lang en|hi|es β Set voice language |
| """ |
|
|
|
|
| def _url(method): |
| return TELEGRAM_API.format(token=cfg.get_telegram_token(), method=method) |
|
|
| def check_reachable(): |
| try: |
| socket.setdefaulttimeout(5) |
| socket.gethostbyname("api.telegram.org") |
| return True, "OK" |
| except Exception as e: |
| return False, str(e) |
|
|
| async def _post(method, timeout=15, **kwargs): |
| async with httpx.AsyncClient(timeout=timeout) as c: |
| r = await c.post(_url(method), **kwargs) |
| return r.json() |
|
|
| async def send_message(chat_id, text, parse_mode="Markdown"): |
| if not text or not str(text).strip(): return True |
| text = str(text) |
| chunks = [text[i:i+3900] for i in range(0, len(text), 3900)] |
| for chunk in chunks: |
| try: |
| r = await _post("sendMessage", |
| json={"chat_id": chat_id, "text": chunk, "parse_mode": parse_mode}) |
| if not r.get("ok"): |
| await _post("sendMessage", json={"chat_id": chat_id, "text": chunk}) |
| except Exception as e: |
| print(f"[TG] send_message error: {e}") |
| return True |
|
|
| async def send_typing(chat_id): |
| try: |
| await _post("sendChatAction", json={"chat_id": chat_id, "action": "typing"}) |
| except Exception: |
| pass |
|
|
| async def send_voice(chat_id, audio_b64): |
| try: |
| audio_bytes = base64.b64decode(audio_b64) |
| async with httpx.AsyncClient(timeout=60) as c: |
| r = await c.post(_url("sendVoice"), |
| files={"voice": ("voice.mp3", audio_bytes, "audio/mpeg")}, |
| data={"chat_id": str(chat_id)}) |
| return r.json().get("ok", False) |
| except Exception as e: |
| print(f"[TG] voice error: {e}") |
| return False |
|
|
| async def send_photo(chat_id, img_b64, filename, ext): |
| try: |
| img_bytes = base64.b64decode(img_b64) |
| async with httpx.AsyncClient(timeout=30) as c: |
| r = await c.post(_url("sendPhoto"), |
| files={"photo": (filename, img_bytes, f"image/{ext}")}, |
| data={"chat_id": str(chat_id), "caption": f"πΌοΈ {filename}"}) |
| return r.json().get("ok", False) |
| except Exception as e: |
| print(f"[TG] photo error: {e}") |
| return False |
|
|
| async def send_document(chat_id, file_b64, filename): |
| try: |
| file_bytes = base64.b64decode(file_b64) |
| async with httpx.AsyncClient(timeout=60) as c: |
| r = await c.post(_url("sendDocument"), |
| files={"document": (filename, file_bytes, "application/octet-stream")}, |
| data={"chat_id": str(chat_id), "caption": f"π {filename}"}) |
| return r.json().get("ok", False) |
| except Exception as e: |
| print(f"[TG] doc error: {e}") |
| return False |
|
|
| async def set_webhook(base_url): |
| url = base_url.rstrip("/") + "/telegram/webhook" |
| return await _post("setWebhook", |
| json={"url": url, "allowed_updates": ["message"], "drop_pending_updates": True}) |
|
|
| async def delete_webhook(): |
| return await _post("deleteWebhook", json={}) |
|
|
| async def get_bot_info(): |
| return await _post("getMe", timeout=10) |
|
|
| async def get_webhook_info(): |
| return await _post("getWebhookInfo", timeout=10) |
|
|
|
|
| def get_chat_model(chat_id): |
| return _chat_settings.get(chat_id, {}).get("model", cfg.get_model()) |
|
|
| def get_voice_lang(chat_id): |
| return _chat_settings.get(chat_id, {}).get("lang", "en") |
|
|
| def is_voice_on(chat_id): |
| return _chat_settings.get(chat_id, {}).get("voice", False) |
|
|
|
|
| async def handle_command(chat_id, text, username) -> bool: |
| """Handle /commands. Returns True if handled.""" |
| parts = text.split() |
| cmd = parts[0].lower().split("@")[0] |
|
|
| if cmd == "/start": |
| await send_message(chat_id, |
| f"π Hello *{username}*! I'm *PraisonChat* π¦\n\n" |
| "I'm an autonomous AI agent. I can search the web, generate files, " |
| "create voice audio, make charts, run code, and much more.\n\n" |
| "Type /help to see all commands, or just ask me anything!") |
| return True |
|
|
| if cmd == "/clear": |
| _histories[chat_id] = [] |
| await send_message(chat_id, "β
History cleared!") |
| return True |
|
|
| if cmd == "/help": |
| await send_message(chat_id, COMMANDS_HELP) |
| return True |
|
|
| if cmd == "/status": |
| model = get_chat_model(chat_id) |
| api = cfg.get_longcat_key() |
| await send_message(chat_id, |
| f"β
*PraisonChat Online*\n" |
| f"Model: `{model}`\n" |
| f"API: `{'set β
' if api else 'missing β'}`\n" |
| f"Voice: `{'on π' if is_voice_on(chat_id) else 'off'}`\n" |
| f"History: `{len(_histories.get(chat_id,[]))} messages`") |
| return True |
|
|
| if cmd == "/model": |
| if len(parts) == 1: |
| model = get_chat_model(chat_id) |
| await send_message(chat_id, |
| f"Current model: `{model}`\n\n" |
| f"Switch with:\n" |
| f"/model lite β Flash Lite (fastest)\n" |
| f"/model chat β Flash Chat\n" |
| f"/model thinking β Flash Thinking (deep)") |
| else: |
| choice = parts[1].lower() |
| if choice in MODELS: |
| new_model = MODELS[choice] |
| if chat_id not in _chat_settings: |
| _chat_settings[chat_id] = {} |
| _chat_settings[chat_id]["model"] = new_model |
| await send_message(chat_id, f"β
Model switched to: `{new_model}`") |
| else: |
| await send_message(chat_id, f"Unknown model. Options: `lite`, `chat`, `thinking`") |
| return True |
|
|
| if cmd == "/voice": |
| if len(parts) < 2: |
| state = is_voice_on(chat_id) |
| await send_message(chat_id, f"Voice is currently: `{'on π' if state else 'off'}`\nUse /voice on or /voice off") |
| else: |
| on = parts[1].lower() == "on" |
| if chat_id not in _chat_settings: |
| _chat_settings[chat_id] = {} |
| _chat_settings[chat_id]["voice"] = on |
| await send_message(chat_id, f"Voice: `{'on π' if on else 'off'}`") |
| return True |
|
|
| if cmd == "/lang": |
| if len(parts) >= 2: |
| lang = parts[1].lower() |
| if chat_id not in _chat_settings: |
| _chat_settings[chat_id] = {} |
| _chat_settings[chat_id]["lang"] = lang |
| await send_message(chat_id, f"β
Voice language set to: `{lang}`") |
| else: |
| await send_message(chat_id, f"Usage: /lang en|hi|es|fr|de|ja|zh") |
| return True |
|
|
| if cmd == "/can": |
| await send_message(chat_id, |
| "π¦ *What I can do:*\n\n" |
| "π Search the web in real-time\n" |
| "π Get current date & time\n" |
| "π° Find latest news & events\n" |
| "π£οΈ Generate voice audio (MP3)\n" |
| "πΌοΈ Create images & charts\n" |
| "π Generate PDF documents\n" |
| "π± Create QR codes\n" |
| "π Execute Python code\n" |
| "π¦ Install Python packages\n" |
| "β° Schedule messages (say 'in 5 minutes')\n" |
| "π€ Spawn research sub-agents\n" |
| "π§ Remember things across conversations\n" |
| "β‘ Create and reuse skills\n\n" |
| "Just ask β I'll figure out how to do it!") |
| return True |
|
|
| if cmd == "/memory": |
| from memory import list_memories, load_memory |
| mems = list_memories() |
| if not mems: |
| await send_message(chat_id, "No memories saved yet.") |
| else: |
| text = "*Saved Memories:*\n\n" |
| for key in mems[:10]: |
| content = load_memory(key) |
| text += f"β’ `{key}`: {content[:80]}β¦\n" |
| await send_message(chat_id, text) |
| return True |
|
|
| if cmd == "/skills": |
| from memory import list_skills |
| skills = list_skills() |
| if not skills: |
| await send_message(chat_id, "No skills saved yet.") |
| else: |
| text = "*Saved Skills:*\n\n" |
| for s in skills[:10]: |
| text += f"β’ `{s['name']}`: {s.get('description','')[:80]}\n" |
| await send_message(chat_id, text) |
| return True |
|
|
| return False |
|
|
|
|
| async def handle_update(update, api_key, model): |
| from agent_system import orchestrator |
|
|
| msg = update.get("message") or update.get("edited_message") |
| if not msg: return |
|
|
| chat_id = msg["chat"]["id"] |
| text = msg.get("text","").strip() |
| username = msg.get("from",{}).get("first_name","User") |
|
|
| if not text: return |
| print(f"[TG] {username} ({chat_id}): {text[:60]}") |
|
|
| |
| if text.startswith("/"): |
| if await handle_command(chat_id, text, username): |
| return |
| |
| await send_message(chat_id, f"Unknown command. Type /help for all commands.") |
| return |
|
|
| |
| chat_model = get_chat_model(chat_id) |
| voice_on = is_voice_on(chat_id) |
| voice_lang = get_voice_lang(chat_id) |
|
|
| await send_typing(chat_id) |
|
|
| |
| actual_msg = text |
| if voice_on and "voice" not in text.lower(): |
| actual_msg = text + " (also generate a voice audio response)" |
|
|
| history = _histories.get(chat_id, []) |
| full_response = "" |
| audio_b64 = None |
| has_media = False |
|
|
| try: |
| async for chunk_json in orchestrator.stream_response( |
| actual_msg, history, api_key, chat_model, session_id=f"tg_{chat_id}" |
| ): |
| try: |
| ev = json.loads(chunk_json) |
| except Exception: |
| continue |
|
|
| t = ev.get("type","") |
|
|
| if t == "token": |
| full_response += ev.get("content","") |
| elif t == "audio_response": |
| audio_b64 = ev.get("audio_b64") |
| has_media = True |
| elif t == "image_response": |
| img_b64 = ev.get("image_b64","") |
| filename = ev.get("filename","image.png") |
| ext = ev.get("ext","png") |
| if img_b64: |
| await send_typing(chat_id) |
| await send_photo(chat_id, img_b64, filename, ext) |
| has_media = True |
| elif t == "file_response": |
| file_b64 = ev.get("file_b64","") |
| filename = ev.get("filename","file") |
| if file_b64: |
| await send_typing(chat_id) |
| await send_document(chat_id, file_b64, filename) |
| has_media = True |
| elif t == "error": |
| await send_message(chat_id, f"β {ev.get('message','Error')[:300]}") |
| return |
|
|
| except Exception as e: |
| print(f"[TG] exception: {e}\n{traceback.format_exc()}") |
| await send_message(chat_id, f"β Error: {str(e)[:200]}") |
| return |
|
|
| |
| clean = full_response |
| clean = re.sub(r'<[^>]+>.*?</[^>]+>', '', clean, flags=re.DOTALL) |
| clean = re.sub(r'\[SPEAK:.*?\]', '', clean, flags=re.DOTALL) |
| clean = clean.replace("**","*").strip() |
|
|
| if clean: |
| await send_message(chat_id, clean) |
| elif not has_media: |
| await send_message(chat_id, "_(Done)_") |
|
|
| if audio_b64: |
| await send_typing(chat_id) |
| await send_voice(chat_id, audio_b64) |
|
|
| |
| history.append({"role":"user","content":text}) |
| history.append({"role":"assistant","content":full_response or "(media)"}) |
| _histories[chat_id] = history[-30:] |
| print(f"[TG] Done β {username}: {len(full_response)} chars") |
|
|
|
|
| async def deliver_scheduled(chat_id, message): |
| """Deliver a scheduled message to Telegram.""" |
| await send_typing(chat_id) |
| await send_message(chat_id, f"β° *Scheduled delivery:*\n\n{message}") |