File size: 3,910 Bytes
f040e57 ebd396c b50691a f040e57 74f44de ebd396c f040e57 ebd396c f040e57 74f44de f040e57 ebd396c f040e57 b50691a f040e57 74f44de f040e57 ebd396c 74f44de ebd396c f040e57 74f44de ebd396c f040e57 74f44de f040e57 ebd396c f040e57 ebd396c f040e57 74f44de | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 | import os
import asyncio
import re
import tempfile
from pyrogram import enums
from config import USER_CWD, ACTIVE_PROCESSES
# Expanded Security Regex
DANGEROUS_REGEX = re.compile(r'\b(rm -rf /|mkfs|dd|:\(\)\{|>\s*/dev/sda)\b')
async def run_shell_command_live(client, message, cmd: str, status_msg, timeout: int = 60):
user_id = message.from_user.id
current_dir = USER_CWD.get(user_id, "/app")
# Handle directory changes with Tilde expansion
if cmd.startswith("cd ") or cmd == "cd":
target_dir = cmd.split(" ", 1)[1].strip() if " " in cmd else "~"
new_dir = os.path.abspath(os.path.expanduser(os.path.join(current_dir, target_dir)))
if os.path.isdir(new_dir):
USER_CWD[user_id] = new_dir
await status_msg.edit_text(f"📁 <b>Directory changed to:</b>\n<code>{new_dir}</code>", parse_mode=enums.ParseMode.HTML)
else:
await status_msg.edit_text(f"❌ <b>Directory not found:</b> <code>{target_dir}</code>", parse_mode=enums.ParseMode.HTML)
return
if DANGEROUS_REGEX.search(cmd):
await status_msg.edit_text("🚫 <b>Security Alert:</b> Command blocked.", parse_mode=enums.ParseMode.HTML)
return
try:
process = await asyncio.create_subprocess_shell(
cmd,
cwd=current_dir,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
ACTIVE_PROCESSES[user_id] = process
output_buffer = ""
last_edit_time = 0
edit_lock = asyncio.Lock()
async def read_stream(stream):
nonlocal output_buffer, last_edit_time
while True:
line = await stream.readline()
if not line:
break
async with edit_lock:
output_buffer += line.decode('utf-8', errors='replace')
current_time = asyncio.get_running_loop().time()
if current_time - last_edit_time > 2.0:
display_text = output_buffer[-3500:]
try:
await status_msg.edit_text(f"<b>$</b> <code>{cmd}</code>\n<pre>{display_text}</pre>", parse_mode=enums.ParseMode.HTML)
last_edit_time = current_time
except Exception:
pass
# Wrap streams AND wait in the timeout block
async def run_process():
await asyncio.gather(read_stream(process.stdout), read_stream(process.stderr))
await process.wait()
await asyncio.wait_for(run_process(), timeout=timeout)
except asyncio.TimeoutError:
try:
process.kill()
except Exception:
pass
output_buffer += f"\n\n⚠️ Execution timed out after {timeout}s."
except Exception as e:
output_buffer += f"\n\n❌ Error: {str(e)}"
finally:
ACTIVE_PROCESSES.pop(user_id, None)
if not output_buffer.strip():
output_buffer = "[Command executed silently]"
# Use unique temporary files to prevent collisions
if len(output_buffer) > 4000:
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt", encoding="utf-8") as temp_file:
temp_file.write(output_buffer)
temp_filepath = temp_file.name
await status_msg.delete()
await message.reply_document(temp_filepath, caption=f"<b>Command:</b> <code>{cmd}</code>", parse_mode=enums.ParseMode.HTML)
os.remove(temp_filepath)
else:
try:
# Safely attempt the final edit, ignoring MessageNotModified errors
await status_msg.edit_text(f"<b>$</b> <code>{cmd}</code>\n<pre>{output_buffer}</pre>", parse_mode=enums.ParseMode.HTML)
except Exception:
pass |