#!/usr/bin/env python3
"""
Telegram bot wrapping the REAL Claude Code CLI.
- Streams output to Telegram in real-time by editing messages
- Merges stdout+stderr to capture everything
- Robust timeout handling
"""
import asyncio
import codecs
import json
import logging
import os
import signal
import subprocess
import threading
import time

import httpx
from flask import Flask, jsonify
from telegram import Bot, BotCommand, Update
from telegram.constants import ChatAction
from telegram.error import TelegramError
from telegram.ext import Application, CommandHandler, MessageHandler, filters
from telegram.request import HTTPXRequest

# ──── Config ────
TELEGRAM_BOT_TOKEN = os.environ["TELEGRAM_BOT_TOKEN"]
# SECURITY NOTE: an empty allow-list means *every* Telegram user may use the
# bot, including /bash (arbitrary command execution). main() logs a warning.
ALLOWED_USERS = [
    u.strip()
    for u in os.environ.get("TELEGRAM_ALLOWED_USERS", "").split(",")
    if u.strip()
]
PROXY_URL = os.environ.get("PROXY_URL", "")
PROXY_SECRET = os.environ.get("PROXY_SECRET", "")
# Strip a trailing slash so "https://proxy/" does not become "https://proxy//bot".
TELEGRAM_BASE_URL = (
    f"{PROXY_URL.rstrip('/')}/bot" if PROXY_URL else "https://api.telegram.org/bot"
)
MODEL = os.environ.get("MODEL", "z-ai/glm-5.1")
START_TIME = time.time()

HOME_DIR = "/home/user"
WORKSPACE_DIR = "/home/user/workspace"
TELEGRAM_MESSAGE_LIMIT = 4096  # maximum characters per Telegram text message
MAX_REPLY_CHUNKS = 15  # cap on messages sent for one long reply

# Claude Code environment
CLAUDE_ENV = {
    **os.environ,
    "ANTHROPIC_BASE_URL": "http://localhost:4000",
    "ANTHROPIC_API_KEY": "sk-ant-proxy-key-for-local-translation",
    "ANTHROPIC_MODEL": "claude-sonnet-4-20250514",
    "DISABLE_PROMPT_CACHING": "true",
    "CLAUDE_CODE_SKIP_OOBE": "1",
    "HOME": "/home/user",
    "PATH": os.environ.get("PATH", "/usr/local/bin:/usr/bin:/bin"),
    "TERM": "xterm-256color",
    "LANG": "en_US.UTF-8",
}

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger("claude-bot")

# ──── Flask health ────
flask_app = Flask(__name__)


@flask_app.route("/")
@flask_app.route("/health")
def health():
    """Return a JSON status report: uptime, proxy reachability, CLI version.

    Runs in the Flask thread, so the blocking HTTP call and subprocess here do
    not stall the bot's asyncio event loop.
    """
    uptime = int(time.time() - START_TIME)
    minutes, seconds = divmod(uptime, 60)
    hours, minutes = divmod(minutes, 60)

    try:
        response = httpx.get("http://localhost:4000/health", timeout=5)
        proxy_ok = response.status_code == 200
    except httpx.HTTPError:
        proxy_ok = False

    try:
        completed = subprocess.run(
            ["claude", "--version"],
            capture_output=True,
            text=True,
            timeout=10,
            env=CLAUDE_ENV,
        )
        claude_version = completed.stdout.strip() or completed.stderr.strip() or "?"
    except (OSError, subprocess.SubprocessError):
        # CLI missing, not executable, or timed out.
        claude_version = "?"

    return jsonify({
        "status": "ok",
        "engine": "Claude Code CLI (real)",
        "claude_version": claude_version,
        "model_backend": MODEL,
        "proxy": "ok" if proxy_ok else "down",
        "uptime": f"{hours}h{minutes}m{seconds}s",
    })


# ──── Subprocess helpers ────
async def _terminate(proc) -> None:
    """Kill *proc* if it is still running and reap it so no zombie is left."""
    if proc.returncode is not None:
        return
    try:
        proc.kill()
    except ProcessLookupError:
        pass  # exited between the returncode check and the kill
    try:
        await asyncio.wait_for(proc.wait(), timeout=5)
    except asyncio.TimeoutError:
        log.warning("Process %s did not exit after kill", proc.pid)


async def _run_command(argv, *, timeout, cwd=None, env=None):
    """Run *argv* without blocking the event loop.

    Returns:
        A ``(stdout, stderr)`` tuple of decoded text.

    Raises:
        asyncio.TimeoutError: the command ran longer than *timeout* seconds
            (the process is killed before the exception propagates).
        OSError: the executable or *cwd* could not be used.
    """
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=cwd,
        env=env,
    )
    try:
        out, err = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except (asyncio.TimeoutError, asyncio.CancelledError):
        await _terminate(proc)
        raise
    return out.decode("utf-8", errors="replace"), err.decode("utf-8", errors="replace")


# ──── Claude Code runner with streaming ────
async def run_claude_streaming(prompt: str, status_msg, chat) -> str:
    """Run ``claude -p`` and stream output to Telegram by editing *status_msg*.

    Args:
        prompt: Text passed to the CLI as the ``-p`` argument.
        status_msg: Telegram message that is edited with the latest output.
        chat: Telegram chat used to keep the "typing" indicator alive.

    Returns:
        The stripped, combined stdout+stderr of the CLI, or a "No output"
        notice containing the exit code when nothing was produced.

    Raises:
        asyncio.CancelledError: re-raised after the child process is killed
            (e.g. when the caller's ``wait_for`` times out).
    """
    cmd = [
        "claude",
        "-p", prompt,
        "--dangerously-skip-permissions",
        "--output-format", "text",
    ]
    log.info("Running: claude -p '%s...'", prompt[:80])

    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,  # Merge stderr → stdout (avoids deadlocks)
        cwd=WORKSPACE_DIR,
        env=CLAUDE_ENV,
    )

    # Incremental decoding keeps a multi-byte UTF-8 character intact even when
    # it is split across two 1024-byte reads.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    accumulated = ""
    last_edit = last_typing = time.monotonic()

    try:
        while True:
            try:
                chunk = await asyncio.wait_for(proc.stdout.read(1024), timeout=4)
            except asyncio.TimeoutError:
                # No output for a while: keep the typing indicator alive.
                now = time.monotonic()
                if now - last_typing > 3:
                    try:
                        await chat.send_action(ChatAction.TYPING)
                    except Exception as exc:  # best-effort; never abort the stream
                        log.debug("Typing action failed: %s", exc)
                    last_typing = now
                # Check if process died
                if proc.returncode is not None:
                    break
                continue

            if not chunk:  # EOF
                break
            accumulated += decoder.decode(chunk)

            # Stream to Telegram: edit message every 3 seconds
            now = time.monotonic()
            if now - last_edit > 3 and accumulated.strip():
                display = accumulated.strip()
                # Take last 3900 chars to fit Telegram limit
                if len(display) > 3900:
                    display = "...\n" + display[-3900:]
                try:
                    await status_msg.edit_text(display[:TELEGRAM_MESSAGE_LIMIT])
                except Exception as exc:  # best-effort; never abort the stream
                    log.debug("Edit failed: %s", exc)
                last_edit = now

        # Flush any bytes the decoder is still holding.
        accumulated += decoder.decode(b"", final=True)
    except asyncio.CancelledError:
        await _terminate(proc)
        raise
    except Exception as exc:
        log.error("Stream read error: %s", exc)

    # Wait for process to finish
    try:
        await asyncio.wait_for(proc.wait(), timeout=10)
    except asyncio.TimeoutError:
        await _terminate(proc)

    exit_code = proc.returncode
    log.info(
        "Claude Code exited with code %s, output length: %d",
        exit_code,
        len(accumulated),
    )

    result = accumulated.strip()
    if result:
        return result
    return f"❌ No output (exit code: {exit_code})"


# ──── Telegram Handlers ────
def _is_authorized(update: Update) -> bool:
    """Return True when the sender may use the bot (empty allow-list = everyone)."""
    if not ALLOWED_USERS:
        return True
    user = update.effective_user
    return user is not None and str(user.id) in ALLOWED_USERS


async def cmd_start(update: Update, context):
    """Handle /start: reply with the CLI version and a command overview."""
    message = update.message
    if message is None:  # e.g. an edited message or channel post
        return
    if not _is_authorized(update):
        await message.reply_text("⛔")
        return

    try:
        out, _err = await _run_command(["claude", "--version"], timeout=10, env=CLAUDE_ENV)
        version = out.strip() or "?"
    except (OSError, asyncio.TimeoutError):
        version = "?"

    await message.reply_text(
        "🤖 *Claude Code* — Real CLI Agent\n\n"
        f"📦 Version: `{version}`\n"
        f"🧠 Backend: `{MODEL}`\n"
        "💻 Workspace: `/home/user/workspace`\n"
        "🔧 Permissions: Full (bash, read, write, edit)\n\n"
        "*Commands:*\n"
        "/start — New session\n"
        "/clear — Clear workspace & session\n"
        "/files — List workspace files\n"
        "/bash `cmd` — Quick bash command\n\n"
        "Send any coding task! 🚀",
        parse_mode="Markdown",
    )


async def cmd_clear(update: Update, context):
    """Handle /clear: delete workspace contents and Claude project sessions."""
    message = update.message
    if message is None or not _is_authorized(update):
        return
    try:
        _out, err = await _run_command(
            ["bash", "-c", "rm -rf /home/user/workspace/* /home/user/.claude/projects/*"],
            timeout=60,
            cwd=HOME_DIR,
        )
    except asyncio.TimeoutError:
        await message.reply_text("⏰ Timed out (60s).")
        return
    if err.strip():
        log.warning("Workspace clear reported: %s", err.strip())
    await message.reply_text("🧹 Workspace and session cleared.")


async def cmd_files(update: Update, context):
    """Handle /files: list up to 50 files found in the workspace."""
    message = update.message
    if message is None or not _is_authorized(update):
        return
    try:
        out, _err = await _run_command(
            [
                "bash",
                "-c",
                "find /home/user/workspace -type f 2>/dev/null | head -50 || echo '(empty)'",
            ],
            timeout=10,
        )
    except asyncio.TimeoutError:
        await message.reply_text("⏰ Timed out (10s).")
        return
    files = out.strip() or "(empty)"
    # send_long chunks oversized listings and falls back to plain text when a
    # file name breaks Markdown parsing.
    await send_long(update, f"📁 Workspace:\n```\n{files}\n```")


async def cmd_bash(update: Update, context):
    """Handle /bash: run the rest of the message as a bash command (60 s limit)."""
    message = update.message
    if message is None or not _is_authorized(update):
        return

    # Drop the first token ("/bash" or "/bash@botname"); keep the rest verbatim.
    parts = (message.text or "").split(None, 1)
    cmd = parts[1].strip() if len(parts) > 1 else ""
    if not cmd:
        await message.reply_text("Usage: `/bash ls -la`", parse_mode="Markdown")
        return

    await message.chat.send_action(ChatAction.TYPING)
    try:
        out, err = await _run_command(["bash", "-c", cmd], timeout=60, cwd=WORKSPACE_DIR)
    except asyncio.TimeoutError:
        await message.reply_text("⏰ Timed out (60s).")
        return
    output = (out + err).strip() or "(no output)"
    await send_long(update, f"```\n{output[:4000]}\n```")


async def handle_message(update: Update, context):
    """Handle a plain text message: run it through Claude Code and reply."""
    message = update.message
    if message is None:  # e.g. an edited message or channel post
        return
    if not _is_authorized(update):
        await message.reply_text("⛔")
        return

    text = message.text
    if not text:
        return

    # Send initial streaming message
    status_msg = await message.reply_text("⏳ Claude Code is working...")

    try:
        # Run with streaming (edits the status_msg in real-time)
        result = await asyncio.wait_for(
            run_claude_streaming(text, status_msg, message.chat),
            timeout=330,  # 5.5 min total timeout
        )

        # Send final result
        if len(result) <= TELEGRAM_MESSAGE_LIMIT:
            try:
                await status_msg.edit_text(result, parse_mode="Markdown")
            except TelegramError:
                # Usually a Markdown parse failure: retry as plain text.
                try:
                    await status_msg.edit_text(result)
                except TelegramError:
                    await send_long(update, result)
        else:
            # Delete status msg and send in chunks
            try:
                await status_msg.delete()
            except TelegramError as exc:
                log.debug("Could not delete status message: %s", exc)
            await send_long(update, result)

    except asyncio.TimeoutError:
        await status_msg.edit_text("⏰ Timed out after 5 minutes.")
    except Exception as e:
        log.exception("Error in handle_message")
        error_text = f"❌ Error: {e}"[:TELEGRAM_MESSAGE_LIMIT]
        try:
            await status_msg.edit_text(error_text)
        except TelegramError:
            await message.reply_text(error_text)


async def send_long(update: Update, text: str):
    """Send long text in chunks.

    Each chunk is tried as Markdown first and as plain text second. At most
    MAX_REPLY_CHUNKS messages are sent; anything beyond that is dropped and
    logged.
    """
    if not text:
        text = "✅ Done."
    chunks = [
        text[i:i + TELEGRAM_MESSAGE_LIMIT]
        for i in range(0, len(text), TELEGRAM_MESSAGE_LIMIT)
    ]
    if len(chunks) > MAX_REPLY_CHUNKS:
        log.warning(
            "Reply truncated: sending %d of %d chunks", MAX_REPLY_CHUNKS, len(chunks)
        )
    for chunk in chunks[:MAX_REPLY_CHUNKS]:
        try:
            await update.message.reply_text(chunk, parse_mode="Markdown")
        except TelegramError:
            try:
                await update.message.reply_text(chunk)
            except Exception as e:
                log.error("Send failed: %s", e)


# ──── Main ────
def run_flask():
    """Serve the health endpoint on FLASK_PORT (default 7860); blocks forever."""
    port = int(os.environ.get("FLASK_PORT", "7860"))
    flask_app.run(host="0.0.0.0", port=port, use_reloader=False)


async def main():
    """Start the health server and the Telegram bot, then run until signalled."""
    log.info("Starting Claude Code Telegram Bot")
    if not ALLOWED_USERS:
        log.warning(
            "TELEGRAM_ALLOWED_USERS is empty: every Telegram user can run commands"
        )

    flask_thread = threading.Thread(target=run_flask, daemon=True)
    flask_thread.start()

    request = HTTPXRequest(
        connection_pool_size=8,
        connect_timeout=30.0,
        read_timeout=60.0,
        write_timeout=30.0,
    )
    bot = Bot(
        token=TELEGRAM_BOT_TOKEN,
        base_url=TELEGRAM_BASE_URL,
        request=request,
    )
    application = Application.builder().bot(bot).build()

    application.add_handler(CommandHandler("start", cmd_start))
    application.add_handler(CommandHandler("clear", cmd_clear))
    application.add_handler(CommandHandler("files", cmd_files))
    application.add_handler(CommandHandler("bash", cmd_bash))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    # Let SIGINT/SIGTERM end the wait below so the shutdown sequence runs.
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, stop.set)
        except NotImplementedError:
            pass  # signal handlers are unavailable on this platform

    await application.initialize()
    await application.start()
    await application.updater.start_polling(drop_pending_updates=True)
    try:
        log.info("✅ Bot started — polling Telegram")

        try:
            await bot.set_my_commands([
                BotCommand("start", "New session"),
                BotCommand("clear", "Clear workspace"),
                BotCommand("files", "List files"),
                BotCommand("bash", "Run bash command"),
            ])
        except TelegramError as exc:
            # The command menu is cosmetic; the bot works without it.
            log.warning("Could not register bot commands: %s", exc)

        await stop.wait()
    finally:
        log.info("Shutting down")
        await application.updater.stop()
        await application.stop()
        await application.shutdown()


if __name__ == "__main__":
    asyncio.run(main())