Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| """ | |
| Telegram bot wrapping the REAL Claude Code CLI. | |
| - Streams output to Telegram in real-time by editing messages | |
| - Merges stdout+stderr to capture everything | |
| - Robust timeout handling | |
| """ | |
| import os, asyncio, json, subprocess, logging, threading, time | |
| from flask import Flask, jsonify | |
| from telegram import Update, BotCommand, Bot | |
| from telegram.ext import Application, CommandHandler, MessageHandler, filters | |
| from telegram.constants import ChatAction | |
| from telegram.request import HTTPXRequest | |
| import httpx | |
| # ββββ Config ββββ | |
| TELEGRAM_BOT_TOKEN = os.environ["TELEGRAM_BOT_TOKEN"] | |
| ALLOWED_USERS = [u.strip() for u in os.environ.get("TELEGRAM_ALLOWED_USERS", "").split(",") if u.strip()] | |
| PROXY_URL = os.environ.get("PROXY_URL", "") | |
| PROXY_SECRET = os.environ.get("PROXY_SECRET", "") | |
| TELEGRAM_BASE_URL = f"{PROXY_URL}/bot" if PROXY_URL else "https://api.telegram.org/bot" | |
| MODEL = os.environ.get("MODEL", "z-ai/glm-5.1") | |
| START_TIME = time.time() | |
| # Claude Code environment | |
| CLAUDE_ENV = { | |
| **os.environ, | |
| "ANTHROPIC_BASE_URL": "http://localhost:4000", | |
| "ANTHROPIC_API_KEY": "sk-ant-proxy-key-for-local-translation", | |
| "ANTHROPIC_MODEL": "claude-sonnet-4-20250514", | |
| "DISABLE_PROMPT_CACHING": "true", | |
| "CLAUDE_CODE_SKIP_OOBE": "1", | |
| "HOME": "/home/user", | |
| "PATH": os.environ.get("PATH", "/usr/local/bin:/usr/bin:/bin"), | |
| "TERM": "xterm-256color", | |
| "LANG": "en_US.UTF-8", | |
| } | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") | |
| log = logging.getLogger("claude-bot") | |
| # ββββ Flask health ββββ | |
| flask_app = Flask(__name__) | |
| def health(): | |
| uptime = int(time.time() - START_TIME) | |
| m, s = divmod(uptime, 60) | |
| h, m = divmod(m, 60) | |
| try: | |
| r = httpx.get("http://localhost:4000/health", timeout=5) | |
| proxy_ok = r.status_code == 200 | |
| except: | |
| proxy_ok = False | |
| try: | |
| v = subprocess.run(["claude", "--version"], capture_output=True, text=True, timeout=10, env=CLAUDE_ENV) | |
| claude_version = v.stdout.strip() or v.stderr.strip() or "?" | |
| except: | |
| claude_version = "?" | |
| return jsonify({ | |
| "status": "ok", | |
| "engine": "Claude Code CLI (real)", | |
| "claude_version": claude_version, | |
| "model_backend": MODEL, | |
| "proxy": "ok" if proxy_ok else "down", | |
| "uptime": f"{h}h{m}m{s}s", | |
| }) | |
| # ββββ Claude Code runner with streaming ββββ | |
| async def run_claude_streaming(prompt: str, status_msg, chat) -> str: | |
| """Run claude -p and stream output to Telegram by editing the status message.""" | |
| cmd = [ | |
| "claude", "-p", prompt, | |
| "--dangerously-skip-permissions", | |
| "--output-format", "text", | |
| ] | |
| log.info(f"Running: claude -p '{prompt[:80]}...'") | |
| proc = await asyncio.create_subprocess_exec( | |
| *cmd, | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.STDOUT, # Merge stderr β stdout (avoids deadlocks) | |
| cwd="/home/user/workspace", | |
| env=CLAUDE_ENV, | |
| ) | |
| accumulated = "" | |
| last_edit = time.time() | |
| last_typing = time.time() | |
| try: | |
| while True: | |
| try: | |
| chunk = await asyncio.wait_for(proc.stdout.read(1024), timeout=4) | |
| except asyncio.TimeoutError: | |
| # Keep typing indicator alive | |
| now = time.time() | |
| if now - last_typing > 3: | |
| try: | |
| await chat.send_action(ChatAction.TYPING) | |
| except: | |
| pass | |
| last_typing = now | |
| # Check if process died | |
| if proc.returncode is not None: | |
| break | |
| continue | |
| if not chunk: | |
| break | |
| text = chunk.decode("utf-8", errors="replace") | |
| accumulated += text | |
| # Stream to Telegram: edit message every 3 seconds | |
| now = time.time() | |
| if now - last_edit > 3 and accumulated.strip(): | |
| display = accumulated.strip() | |
| # Take last 3900 chars to fit Telegram limit | |
| if len(display) > 3900: | |
| display = "...\n" + display[-3900:] | |
| try: | |
| await status_msg.edit_text(display[:4096]) | |
| except Exception as e: | |
| log.debug(f"Edit failed: {e}") | |
| last_edit = now | |
| except asyncio.CancelledError: | |
| proc.kill() | |
| raise | |
| except Exception as e: | |
| log.error(f"Stream read error: {e}") | |
| # Wait for process to finish | |
| try: | |
| await asyncio.wait_for(proc.wait(), timeout=10) | |
| except asyncio.TimeoutError: | |
| proc.kill() | |
| exit_code = proc.returncode | |
| log.info(f"Claude Code exited with code {exit_code}, output length: {len(accumulated)}") | |
| result = accumulated.strip() | |
| if result: | |
| return result | |
| else: | |
| return f"β No output (exit code: {exit_code})" | |
| # ββββ Telegram Handlers ββββ | |
| async def cmd_start(update: Update, context): | |
| uid = str(update.effective_user.id) | |
| if ALLOWED_USERS and uid not in ALLOWED_USERS: | |
| await update.message.reply_text("β") | |
| return | |
| try: | |
| v = subprocess.run(["claude", "--version"], capture_output=True, text=True, timeout=10, env=CLAUDE_ENV) | |
| version = v.stdout.strip() or "?" | |
| except: | |
| version = "?" | |
| await update.message.reply_text( | |
| "π€ *Claude Code* β Real CLI Agent\n\n" | |
| f"π¦ Version: `{version}`\n" | |
| f"π§ Backend: `{MODEL}`\n" | |
| "π» Workspace: `/home/user/workspace`\n" | |
| "π§ Permissions: Full (bash, read, write, edit)\n\n" | |
| "*Commands:*\n" | |
| "/start β New session\n" | |
| "/clear β Clear workspace & session\n" | |
| "/files β List workspace files\n" | |
| "/bash `cmd` β Quick bash command\n\n" | |
| "Send any coding task! π", | |
| parse_mode="Markdown", | |
| ) | |
| async def cmd_clear(update: Update, context): | |
| uid = str(update.effective_user.id) | |
| if ALLOWED_USERS and uid not in ALLOWED_USERS: | |
| return | |
| subprocess.run(["bash", "-c", "rm -rf /home/user/workspace/* /home/user/.claude/projects/*"], | |
| capture_output=True, cwd="/home/user") | |
| await update.message.reply_text("π§Ή Workspace and session cleared.") | |
| async def cmd_files(update: Update, context): | |
| uid = str(update.effective_user.id) | |
| if ALLOWED_USERS and uid not in ALLOWED_USERS: | |
| return | |
| r = subprocess.run( | |
| ["bash", "-c", "find /home/user/workspace -type f 2>/dev/null | head -50 || echo '(empty)'"], | |
| capture_output=True, text=True, timeout=10 | |
| ) | |
| files = r.stdout.strip() or "(empty)" | |
| await update.message.reply_text(f"π Workspace:\n```\n{files}\n```", parse_mode="Markdown") | |
| async def cmd_bash(update: Update, context): | |
| uid = str(update.effective_user.id) | |
| if ALLOWED_USERS and uid not in ALLOWED_USERS: | |
| return | |
| cmd = update.message.text.replace("/bash", "", 1).strip() | |
| if not cmd: | |
| await update.message.reply_text("Usage: `/bash ls -la`", parse_mode="Markdown") | |
| return | |
| await update.message.chat.send_action(ChatAction.TYPING) | |
| try: | |
| r = subprocess.run(["bash", "-c", cmd], capture_output=True, text=True, | |
| timeout=60, cwd="/home/user/workspace") | |
| output = (r.stdout + r.stderr).strip() or "(no output)" | |
| await send_long(update, f"```\n{output[:4000]}\n```") | |
| except subprocess.TimeoutExpired: | |
| await update.message.reply_text("β° Timed out (60s).") | |
| async def handle_message(update: Update, context): | |
| uid = str(update.effective_user.id) | |
| if ALLOWED_USERS and uid not in ALLOWED_USERS: | |
| await update.message.reply_text("β") | |
| return | |
| text = update.message.text | |
| if not text: | |
| return | |
| # Send initial streaming message | |
| status_msg = await update.message.reply_text("β³ Claude Code is working...") | |
| try: | |
| # Run with streaming (edits the status_msg in real-time) | |
| result = await asyncio.wait_for( | |
| run_claude_streaming(text, status_msg, update.message.chat), | |
| timeout=330, # 5.5 min total timeout | |
| ) | |
| # Send final result | |
| if len(result) <= 4096: | |
| try: | |
| await status_msg.edit_text(result, parse_mode="Markdown") | |
| except: | |
| try: | |
| await status_msg.edit_text(result) | |
| except: | |
| await send_long(update, result) | |
| else: | |
| # Delete status msg and send in chunks | |
| try: | |
| await status_msg.delete() | |
| except: | |
| pass | |
| await send_long(update, result) | |
| except asyncio.TimeoutError: | |
| await status_msg.edit_text("β° Timed out after 5 minutes.") | |
| except Exception as e: | |
| log.exception("Error in handle_message") | |
| try: | |
| await status_msg.edit_text(f"β Error: {e}") | |
| except: | |
| await update.message.reply_text(f"β Error: {e}") | |
| async def send_long(update: Update, text: str): | |
| """Send long text in chunks.""" | |
| if not text: | |
| text = "β Done." | |
| chunks = [text[i:i + 4096] for i in range(0, len(text), 4096)] | |
| for chunk in chunks[:15]: | |
| try: | |
| await update.message.reply_text(chunk, parse_mode="Markdown") | |
| except: | |
| try: | |
| await update.message.reply_text(chunk) | |
| except Exception as e: | |
| log.error(f"Send failed: {e}") | |
| # ββββ Main ββββ | |
| def run_flask(): | |
| port = int(os.environ.get("FLASK_PORT", "7860")) | |
| flask_app.run(host="0.0.0.0", port=port, use_reloader=False) | |
| async def main(): | |
| log.info("Starting Claude Code Telegram Bot") | |
| flask_thread = threading.Thread(target=run_flask, daemon=True) | |
| flask_thread.start() | |
| request = HTTPXRequest( | |
| connection_pool_size=8, | |
| connect_timeout=30.0, | |
| read_timeout=60.0, | |
| write_timeout=30.0, | |
| ) | |
| bot = Bot( | |
| token=TELEGRAM_BOT_TOKEN, | |
| base_url=TELEGRAM_BASE_URL, | |
| request=request, | |
| ) | |
| application = Application.builder().bot(bot).build() | |
| application.add_handler(CommandHandler("start", cmd_start)) | |
| application.add_handler(CommandHandler("clear", cmd_clear)) | |
| application.add_handler(CommandHandler("files", cmd_files)) | |
| application.add_handler(CommandHandler("bash", cmd_bash)) | |
| application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) | |
| await application.initialize() | |
| await application.start() | |
| await application.updater.start_polling(drop_pending_updates=True) | |
| log.info("β Bot started β polling Telegram") | |
| try: | |
| await bot.set_my_commands([ | |
| BotCommand("start", "New session"), | |
| BotCommand("clear", "Clear workspace"), | |
| BotCommand("files", "List files"), | |
| BotCommand("bash", "Run bash command"), | |
| ]) | |
| except: | |
| pass | |
| stop = asyncio.Event() | |
| await stop.wait() | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |