import os
import asyncio
import re
import html # ⚡ FIX: Added html module to sanitize output
from config import USER_CWD, ACTIVE_PROCESSES
# Command blacklist: a command is treated as dangerous when any of these names
# appears in it as a whole word (``\b`` boundaries), at any position in the
# string. Names embedded in longer words (e.g. "review") do not match.
DANGEROUS_REGEX = re.compile(
    r'\b(rm|mkfs|dd|shutdown|reboot|nano|vi|vim)\b'
)
async def run_shell_command_live(client, message, cmd: str, status_msg, timeout: int = 30):
    """Run a user-supplied shell command and stream its output into ``status_msg``.

    The command runs in the caller's remembered working directory
    (``USER_CWD``, defaulting to ``/app``). ``cd`` is handled in-process so the
    directory persists between calls. Commands matching ``DANGEROUS_REGEX`` are
    refused. While the command runs, ``status_msg`` is edited at most once every
    two seconds with the tail of the combined stdout/stderr. When it finishes,
    the full output is shown in the message, or uploaded as a ``.txt`` document
    if it is longer than 4000 characters.

    Args:
        client: Not used by this function; kept for the handler signature.
        message: Incoming message; ``from_user.id``, ``id`` and
            ``reply_document`` are used.
        cmd: The raw command line typed by the user.
        status_msg: Message object that is edited (or deleted) to show progress.
        timeout: Maximum number of seconds the command may run before it is
            killed.

    Returns:
        None. All results are reported through ``status_msg`` / ``message``.
    """
    user_id = message.from_user.id
    current_dir = USER_CWD.get(user_id, "/app")
    # ``cmd`` is echoed back inside HTML-formatted messages, so it has to be
    # escaped just like the command output; otherwise '<', '>' or '&' in the
    # command (redirections, '&&') make the message edit fail.
    safe_cmd = html.escape(cmd)

    # 1. Custom 'cd' handler for persistent directories
    if cmd.startswith("cd ") or cmd == "cd":
        target_dir = cmd.split(" ", 1)[1].strip() if " " in cmd else "/app"
        new_dir = os.path.abspath(os.path.join(current_dir, target_dir))
        if os.path.isdir(new_dir):
            USER_CWD[user_id] = new_dir
            await status_msg.edit_text(
                f"📁 <b>Directory changed to:</b>\n<code>{html.escape(new_dir)}</code>",
                parse_mode="HTML",
            )
        else:
            await status_msg.edit_text(
                f"❌ <b>Directory not found:</b> <code>{html.escape(target_dir)}</code>",
                parse_mode="HTML",
            )
        return

    # 2. Security Check
    # NOTE(review): this is a word blacklist applied to a string that is then
    # handed to a shell; it is best-effort only and can be bypassed (other
    # destructive binaries, quoting tricks, interpreters). Restrict who may
    # call this handler rather than relying on the regex.
    if DANGEROUS_REGEX.search(cmd):
        await status_msg.edit_text("🚫 <b>Security Alert:</b> Command blocked by security policy.", parse_mode="HTML")
        return

    # 3. Execution Setup
    # Initialised before the ``try`` so the error handlers can always append
    # to the buffer, even when spawning the process itself fails.
    output_buffer = ""
    last_edit_time = 0.0
    process = None
    loop = asyncio.get_running_loop()

    # 4. Live Streaming Reader (Updates Telegram message every 2 seconds)
    async def read_stream(stream):
        nonlocal output_buffer, last_edit_time
        while True:
            line = await stream.readline()
            if not line:
                break
            output_buffer += line.decode('utf-8', errors='replace')

            # Rate limit edits to prevent API flood
            current_time = loop.time()
            if current_time - last_edit_time > 2.0:
                # Claim the slot *before* awaiting: the stdout and stderr
                # readers run concurrently, and a failed edit must not be
                # retried on every subsequent line.
                last_edit_time = current_time
                display_text = html.escape(output_buffer[-3500:])
                try:
                    await status_msg.edit_text(f"<b>$</b> <code>{safe_cmd}</code>\n<pre>{display_text}</pre>", parse_mode="HTML")
                except Exception:
                    # Deliberate best-effort: a failed progress update (e.g.
                    # "Message is not modified") must not abort the command.
                    pass

    async def stream_until_exit():
        # Drain both pipes to EOF, then collect the exit status.
        await asyncio.gather(read_stream(process.stdout), read_stream(process.stderr))
        await process.wait()

    try:
        process = await asyncio.create_subprocess_shell(
            cmd,
            cwd=current_dir,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        ACTIVE_PROCESSES[user_id] = process
        # The timeout has to cover the streaming phase as well: the readers
        # only return at EOF, i.e. when the command has already finished.
        await asyncio.wait_for(stream_until_exit(), timeout=timeout)
    except asyncio.TimeoutError:
        output_buffer += f"\n\n⚠️ Execution timed out after {timeout}s."
    except Exception as e:
        output_buffer += f"\n\n❌ Error: {str(e)}"
    finally:
        ACTIVE_PROCESSES.pop(user_id, None)
        # Whatever went wrong (timeout, reader error, cancellation), do not
        # leave the child running or unreaped.
        if process is not None and process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass  # Already exited between the check and the kill.
            try:
                # Bounded: a grandchild may still hold the pipes open.
                await asyncio.wait_for(process.wait(), timeout=5)
            except asyncio.TimeoutError:
                pass

    # 5. Final Output Handling
    if not output_buffer.strip():
        output_buffer = "[Command executed silently]"

    # If the output is massive, send as a uniquely named .txt file
    if len(output_buffer) > 4000:
        filename = f"output_{message.id}.txt"
        try:
            with open(filename, "w", encoding="utf-8") as f:
                f.write(output_buffer)
            await status_msg.delete()
            await message.reply_document(filename, caption=f"<b>Command:</b> <code>{safe_cmd}</code>", parse_mode="HTML")
        finally:
            # Remove the temporary file even when the upload fails.
            try:
                os.remove(filename)
            except FileNotFoundError:
                pass
    else:
        safe_final = html.escape(output_buffer)
        await status_msg.edit_text(f"<b>$</b> <code>{safe_cmd}</code>\n<pre>{safe_final}</pre>", parse_mode="HTML")