ClaudeCode / app.py
ayoub5550's picture
Upload app.py with huggingface_hub
df2e540 verified
Raw
History Blame Contribute Delete
11.3 kB
#!/usr/bin/env python3
"""
Telegram bot wrapping the REAL Claude Code CLI.
- Streams output to Telegram in real-time by editing messages
- Merges stdout+stderr to capture everything
- Robust timeout handling
"""
import os, asyncio, json, subprocess, logging, threading, time
from flask import Flask, jsonify
from telegram import Update, BotCommand, Bot
from telegram.ext import Application, CommandHandler, MessageHandler, filters
from telegram.constants import ChatAction
from telegram.request import HTTPXRequest
import httpx
# ──── Config ────
# Required: a missing TELEGRAM_BOT_TOKEN raises KeyError at import time.
TELEGRAM_BOT_TOKEN = os.environ["TELEGRAM_BOT_TOKEN"]
# Comma-separated Telegram user IDs, kept as strings and compared against str(user.id).
# NOTE(review): the handlers treat an empty list as "everyone is allowed", which
# exposes /bash and the permission-skipping CLI to any user who finds the bot.
ALLOWED_USERS = [u.strip() for u in os.environ.get("TELEGRAM_ALLOWED_USERS", "").split(",") if u.strip()]
# Optional proxy in front of the Telegram Bot API; empty means "talk to Telegram directly".
PROXY_URL = os.environ.get("PROXY_URL", "")
# Read from the environment but not referenced anywhere else in the visible code.
PROXY_SECRET = os.environ.get("PROXY_SECRET", "")
# Trailing slashes are stripped so PROXY_URL="https://host/" does not produce
# "https://host//bot" as the base URL.
TELEGRAM_BASE_URL = f"{PROXY_URL.rstrip('/')}/bot" if PROXY_URL else "https://api.telegram.org/bot"
# Backend model label reported by /health and /start.
MODEL = os.environ.get("MODEL", "z-ai/glm-5.1")
# Process start timestamp; /health derives the uptime from it.
START_TIME = time.time()
# Claude Code environment
# Environment handed to every `claude` subprocess: a snapshot of the bot's own
# environment with the entries below layered on top (they win over inherited
# values of the same name).
CLAUDE_ENV = dict(os.environ)
CLAUDE_ENV.update(
    # Same localhost:4000 address that the health check probes.
    ANTHROPIC_BASE_URL="http://localhost:4000",
    # Fixed placeholder key; presumably only meaningful to the local endpoint above.
    ANTHROPIC_API_KEY="sk-ant-proxy-key-for-local-translation",
    ANTHROPIC_MODEL="claude-sonnet-4-20250514",
    DISABLE_PROMPT_CACHING="true",
    CLAUDE_CODE_SKIP_OOBE="1",
    HOME="/home/user",
    # Keep the inherited PATH, falling back to a conventional default when unset.
    PATH=os.environ.get("PATH", "/usr/local/bin:/usr/bin:/bin"),
    TERM="xterm-256color",
    LANG="en_US.UTF-8",
)
# Root logging setup: INFO and above, each line prefixed with timestamp and level.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
# Named logger used by the handlers and the CLI runner in this file.
log = logging.getLogger("claude-bot")
# ──── Flask health ────
flask_app = Flask(__name__)


@flask_app.route("/")
@flask_app.route("/health")
def health():
    """Report bot health as JSON on ``/`` and ``/health``.

    Probes ``http://localhost:4000/health`` (5 s timeout) and runs
    ``claude --version`` (10 s timeout). Both probes are best effort: a
    failure is reflected in the payload and never fails the endpoint.

    Returns:
        A JSON response with the keys ``status``, ``engine``,
        ``claude_version``, ``model_backend``, ``proxy`` and ``uptime``.
    """
    uptime = int(time.time() - START_TIME)
    m, s = divmod(uptime, 60)
    h, m = divmod(m, 60)
    try:
        r = httpx.get("http://localhost:4000/health", timeout=5)
        proxy_ok = r.status_code == 200
    except Exception as e:
        # `Exception` rather than a bare except, so SystemExit and
        # KeyboardInterrupt are not swallowed by a health probe.
        log.debug(f"Proxy health probe failed: {e}")
        proxy_ok = False
    try:
        v = subprocess.run(
            ["claude", "--version"],
            capture_output=True, text=True, timeout=10, env=CLAUDE_ENV,
        )
        # Fall back to stderr in case the version is printed there.
        claude_version = v.stdout.strip() or v.stderr.strip() or "?"
    except Exception as e:
        log.debug(f"claude --version failed: {e}")
        claude_version = "?"
    return jsonify({
        "status": "ok",
        "engine": "Claude Code CLI (real)",
        "claude_version": claude_version,
        "model_backend": MODEL,
        "proxy": "ok" if proxy_ok else "down",
        "uptime": f"{h}h{m}m{s}s",
    })
# ──── Claude Code runner with streaming ────
async def run_claude_streaming(prompt: str, status_msg, chat) -> str:
    """Run ``claude -p`` and stream its output into a Telegram message.

    The CLI is started in ``/home/user/workspace`` with ``CLAUDE_ENV`` and with
    stderr merged into stdout. While it runs, ``status_msg`` is edited at most
    once every 3 seconds with the tail of the output, and a typing action is
    sent to ``chat`` when no output has arrived for 4 seconds.

    Args:
        prompt: Task text passed as the argument of ``-p``.
        status_msg: Object whose ``edit_text`` coroutine receives the live output.
        chat: Object whose ``send_action`` coroutine shows the typing indicator.

    Returns:
        The stripped combined output, or a "No output" notice carrying the
        exit code when the CLI printed nothing.

    Raises:
        asyncio.CancelledError: When the caller cancels the task (for example
            through ``asyncio.wait_for``); the subprocess is killed first.
    """
    import codecs  # function-scope import: only this runner needs it

    cmd = [
        "claude", "-p", prompt,
        "--dangerously-skip-permissions",
        "--output-format", "text",
    ]
    log.info(f"Running: claude -p '{prompt[:80]}...'")
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,  # Merge stderr → stdout (avoids deadlocks)
        cwd="/home/user/workspace",
        env=CLAUDE_ENV,
    )
    # Output arrives in 1024-byte reads, so a multi-byte UTF-8 character can be
    # split across two chunks. The incremental decoder buffers the partial
    # bytes instead of turning them into replacement characters.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    accumulated = ""
    last_edit = time.time()
    last_typing = time.time()
    try:
        try:
            while True:
                try:
                    chunk = await asyncio.wait_for(proc.stdout.read(1024), timeout=4)
                except asyncio.TimeoutError:
                    # No output for 4 s: keep the typing indicator alive.
                    now = time.time()
                    if now - last_typing > 3:
                        try:
                            await chat.send_action(ChatAction.TYPING)
                        except asyncio.CancelledError:
                            raise
                        except Exception as e:
                            # Best effort. This must not be a bare `except:`,
                            # which would swallow the cancellation used by the
                            # caller's overall timeout.
                            log.debug(f"Typing action failed: {e}")
                        last_typing = now
                    # Stop once the process has exited and nothing is left to read.
                    if proc.returncode is not None:
                        break
                    continue
                if not chunk:
                    break  # EOF: the CLI closed its output
                accumulated += decoder.decode(chunk)
                # Stream to Telegram: edit the message at most every 3 seconds.
                now = time.time()
                if now - last_edit > 3 and accumulated.strip():
                    display = accumulated.strip()
                    # Show only the tail so the edit fits Telegram's 4096-char limit.
                    if len(display) > 3900:
                        display = "...\n" + display[-3900:]
                    try:
                        await status_msg.edit_text(display[:4096])
                    except asyncio.CancelledError:
                        raise
                    except Exception as e:
                        log.debug(f"Edit failed: {e}")
                    last_edit = now
        except asyncio.CancelledError:
            raise  # the `finally` below kills the subprocess
        except Exception as e:
            log.error(f"Stream read error: {e}")
        # Flush any bytes of an incomplete trailing character.
        accumulated += decoder.decode(b"", final=True)
        # Wait for the process to finish; kill and reap it if it lingers so the
        # reported exit code is never None.
        try:
            await asyncio.wait_for(proc.wait(), timeout=10)
        except asyncio.TimeoutError:
            proc.kill()
            await proc.wait()
    finally:
        # Reached with a live process only on cancellation or an unexpected
        # error: never leave the CLI running in the background.
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
    exit_code = proc.returncode
    log.info(f"Claude Code exited with code {exit_code}, output length: {len(accumulated)}")
    result = accumulated.strip()
    if result:
        return result
    return f"❌ No output (exit code: {exit_code})"
# ──── Telegram Handlers ────
async def cmd_start(update: Update, context):
    """Handle ``/start``: reply with the CLI version, backend model and command list.

    Users outside a non-empty ``ALLOWED_USERS`` list get a refusal message.
    The version probe is best effort and falls back to ``"?"``.
    """
    uid = str(update.effective_user.id)
    if ALLOWED_USERS and uid not in ALLOWED_USERS:
        await update.message.reply_text("⛔")
        return

    def _probe_version() -> str:
        # Same probe as the health endpoint, including the stderr fallback.
        v = subprocess.run(
            ["claude", "--version"],
            capture_output=True, text=True, timeout=10, env=CLAUDE_ENV,
        )
        return v.stdout.strip() or v.stderr.strip() or "?"

    try:
        # Run in a worker thread: the probe can block for up to 10 s and must
        # not stall the event loop that serves every other chat.
        version = await asyncio.get_running_loop().run_in_executor(None, _probe_version)
    except Exception as e:
        log.debug(f"claude --version failed: {e}")
        version = "?"
    await update.message.reply_text(
        "🤖 *Claude Code* — Real CLI Agent\n\n"
        f"📦 Version: `{version}`\n"
        f"🧠 Backend: `{MODEL}`\n"
        "💻 Workspace: `/home/user/workspace`\n"
        "🔧 Permissions: Full (bash, read, write, edit)\n\n"
        "*Commands:*\n"
        "/start — New session\n"
        "/clear — Clear workspace & session\n"
        "/files — List workspace files\n"
        "/bash `cmd` — Quick bash command\n\n"
        "Send any coding task! 🚀",
        parse_mode="Markdown",
    )
async def cmd_clear(update: Update, context):
    """Handle ``/clear``: delete everything inside the workspace and session dirs.

    Removes all entries, hidden ones included, below ``/home/user/workspace``
    and ``/home/user/.claude/projects`` while keeping the two directories
    themselves. Users outside a non-empty ``ALLOWED_USERS`` list are ignored
    silently.
    """
    uid = str(update.effective_user.id)
    if ALLOWED_USERS and uid not in ALLOWED_USERS:
        return

    def _wipe():
        # `find -mindepth 1 -delete` also removes dotfiles, which the shell
        # glob `dir/*` skips, and needs no shell at all.
        return subprocess.run(
            ["find", "/home/user/workspace", "/home/user/.claude/projects",
             "-mindepth", "1", "-delete"],
            capture_output=True, text=True, cwd="/home/user", timeout=60,
        )

    try:
        # Worker thread: deleting a large tree must not block the event loop.
        r = await asyncio.get_running_loop().run_in_executor(None, _wipe)
    except (OSError, subprocess.TimeoutExpired) as e:
        log.error(f"Clear failed: {e}")
        await update.message.reply_text(f"❌ Clear failed: {e}")
        return
    if r.returncode != 0:
        # Typically a start directory that does not exist yet; find still
        # processes the other one, so this is only logged.
        log.warning(f"Clear finished with exit code {r.returncode}: {r.stderr.strip()}")
    await update.message.reply_text("🧹 Workspace and session cleared.")
async def cmd_files(update: Update, context):
    """Handle ``/files``: list up to 50 regular files below the workspace.

    Users outside a non-empty ``ALLOWED_USERS`` list are ignored silently.
    """
    uid = str(update.effective_user.id)
    if ALLOWED_USERS and uid not in ALLOWED_USERS:
        return

    def _list_files() -> str:
        r = subprocess.run(
            ["bash", "-c", "find /home/user/workspace -type f 2>/dev/null | head -50 || echo '(empty)'"],
            capture_output=True, text=True, timeout=10,
        )
        return r.stdout.strip() or "(empty)"

    try:
        # Worker thread: keeps the event loop responsive during the listing.
        files = await asyncio.get_running_loop().run_in_executor(None, _list_files)
    except subprocess.TimeoutExpired:
        await update.message.reply_text("⏰ Timed out (10s).")
        return
    # Leave room for the header and code fence inside Telegram's 4096-char limit.
    files = files[:4000]
    try:
        await update.message.reply_text(f"📁 Workspace:\n```\n{files}\n```", parse_mode="Markdown")
    except Exception as e:
        # A path containing Markdown control characters can break parsing;
        # fall back to plain text like send_long does.
        log.debug(f"Markdown reply failed, sending plain text: {e}")
        await update.message.reply_text(f"📁 Workspace:\n{files}")
async def cmd_bash(update: Update, context):
    """Handle ``/bash <cmd>``: run a shell command in the workspace and reply with its output.

    The command runs through ``bash -c`` in ``/home/user/workspace`` with a
    60-second timeout; stdout and stderr are concatenated and truncated to
    4000 characters. Users outside a non-empty ``ALLOWED_USERS`` list are
    ignored silently.

    NOTE(review): this is remote shell access by design. It is gated only by
    ``ALLOWED_USERS``, and an empty list lets every Telegram user through.
    """
    uid = str(update.effective_user.id)
    if ALLOWED_USERS and uid not in ALLOWED_USERS:
        return
    # Drop the first whitespace-delimited token, which is the command itself.
    # In group chats it arrives as "/bash@BotName", which a plain
    # replace("/bash", "") would leave glued to the shell command.
    parts = update.message.text.split(None, 1)
    cmd = parts[1].strip() if len(parts) > 1 else ""
    if not cmd:
        await update.message.reply_text("Usage: `/bash ls -la`", parse_mode="Markdown")
        return
    await update.message.chat.send_action(ChatAction.TYPING)

    def _run():
        return subprocess.run(
            ["bash", "-c", cmd], capture_output=True, text=True,
            timeout=60, cwd="/home/user/workspace",
        )

    try:
        # Worker thread: a command may run for the full 60 s, during which a
        # blocking call would freeze every other handler and streaming edit.
        r = await asyncio.get_running_loop().run_in_executor(None, _run)
    except subprocess.TimeoutExpired:
        await update.message.reply_text("⏰ Timed out (60s).")
        return
    except OSError as e:
        # e.g. bash or the workspace directory is missing
        await update.message.reply_text(f"❌ Error: {e}")
        return
    output = (r.stdout + r.stderr).strip() or "(no output)"
    await send_long(update, f"```\n{output[:4000]}\n```")
def _is_not_modified(exc: Exception) -> bool:
    """Return True when Telegram rejected an edit because the text is unchanged."""
    return "not modified" in str(exc).lower()


async def handle_message(update: Update, context):
    """Handle a plain text message by running it through the Claude Code CLI.

    Posts a status message, streams the CLI output into it via
    ``run_claude_streaming`` (overall timeout 330 s), then delivers the final
    result: edited into the status message when it fits in 4096 characters,
    otherwise sent in chunks through ``send_long``. Users outside a non-empty
    ``ALLOWED_USERS`` list get a refusal message.
    """
    uid = str(update.effective_user.id)
    if ALLOWED_USERS and uid not in ALLOWED_USERS:
        await update.message.reply_text("⛔")
        return
    text = update.message.text
    if not text:
        return
    # Send initial streaming message
    status_msg = await update.message.reply_text("⏳ Claude Code is working...")
    try:
        # Run with streaming (edits the status_msg in real-time)
        result = await asyncio.wait_for(
            run_claude_streaming(text, status_msg, update.message.chat),
            timeout=330,  # 5.5 min total timeout
        )
        if len(result) <= 4096:
            # Try Markdown first, then plain text, then fresh messages. An
            # "is not modified" rejection means the last streaming edit already
            # shows this exact text, so nothing needs to be re-sent.
            try:
                await status_msg.edit_text(result, parse_mode="Markdown")
            except Exception as md_err:
                if not _is_not_modified(md_err):
                    try:
                        await status_msg.edit_text(result)
                    except Exception as plain_err:
                        if not _is_not_modified(plain_err):
                            await send_long(update, result)
        else:
            # Too long for one message: drop the status message and send chunks.
            try:
                await status_msg.delete()
            except Exception as e:
                log.debug(f"Could not delete status message: {e}")
            await send_long(update, result)
    except asyncio.TimeoutError:
        try:
            await status_msg.edit_text("⏰ Timed out after 5 minutes.")
        except Exception:
            # The status message may be gone or uneditable; still tell the user.
            await update.message.reply_text("⏰ Timed out after 5 minutes.")
    except Exception as e:
        log.exception("Error in handle_message")
        try:
            await status_msg.edit_text(f"❌ Error: {e}")
        except Exception:
            await update.message.reply_text(f"❌ Error: {e}")
async def send_long(update: Update, text: str):
    """Reply with ``text`` split into messages of at most 4096 characters.

    Each chunk is sent as Markdown first and re-sent as plain text if that
    fails. At most 15 chunks are sent; anything beyond that is dropped and the
    drop is logged. Empty text is replaced by a short "done" notice.

    Args:
        update: Update whose ``message`` the chunks are sent in reply to.
        text: Text to deliver.
    """
    if not text:
        text = "✅ Done."
    chunks = [text[i:i + 4096] for i in range(0, len(text), 4096)]
    if len(chunks) > 15:
        log.warning(f"Output truncated: sending 15 of {len(chunks)} chunks")
    for chunk in chunks[:15]:
        try:
            await update.message.reply_text(chunk, parse_mode="Markdown")
        except Exception:
            # Usually a Markdown parse error: retry as plain text. `Exception`
            # rather than a bare except so task cancellation is not swallowed.
            try:
                await update.message.reply_text(chunk)
            except Exception as e:
                log.error(f"Send failed: {e}")
# ──── Main ────
def run_flask():
    """Serve the Flask health app on all interfaces (blocking call).

    The port is read from ``FLASK_PORT`` and defaults to 7860; the reloader is
    switched off.
    """
    flask_app.run(
        host="0.0.0.0",
        port=int(os.environ.get("FLASK_PORT", "7860")),
        use_reloader=False,
    )
async def main():
    """Start the health server and the Telegram bot, then run until cancelled.

    Launches Flask in a daemon thread, builds the bot against
    ``TELEGRAM_BASE_URL``, registers the handlers, starts polling and
    publishes the command menu. On cancellation (for example Ctrl+C under
    ``asyncio.run``) the updater and application are stopped and shut down
    before the coroutine exits.
    """
    log.info("Starting Claude Code Telegram Bot")
    flask_thread = threading.Thread(target=run_flask, daemon=True)
    flask_thread.start()
    request = HTTPXRequest(
        connection_pool_size=8,
        connect_timeout=30.0,
        read_timeout=60.0,
        write_timeout=30.0,
    )
    bot = Bot(
        token=TELEGRAM_BOT_TOKEN,
        base_url=TELEGRAM_BASE_URL,
        request=request,
    )
    application = Application.builder().bot(bot).build()
    application.add_handler(CommandHandler("start", cmd_start))
    application.add_handler(CommandHandler("clear", cmd_clear))
    application.add_handler(CommandHandler("files", cmd_files))
    application.add_handler(CommandHandler("bash", cmd_bash))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
    await application.initialize()
    try:
        await application.start()
        await application.updater.start_polling(drop_pending_updates=True)
        log.info("✅ Bot started — polling Telegram")
        try:
            await bot.set_my_commands([
                BotCommand("start", "New session"),
                BotCommand("clear", "Clear workspace"),
                BotCommand("files", "List files"),
                BotCommand("bash", "Run bash command"),
            ])
        except Exception as e:
            # The command menu is cosmetic; the bot works without it.
            log.warning(f"Could not register bot commands: {e}")
        # Nothing ever sets this event: it parks the coroutine until cancelled.
        stop = asyncio.Event()
        await stop.wait()
    finally:
        # Tear down in reverse order of start-up. The `running` checks avoid
        # the RuntimeError that stop() raises on a component that never started.
        if application.updater.running:
            await application.updater.stop()
        if application.running:
            await application.stop()
        await application.shutdown()
# Script entry point: drive the async main() on a fresh event loop when the
# file is executed directly (and not when it is imported).
if __name__ == "__main__":
    asyncio.run(main())