import os
import asyncio
import re
import tempfile
from pyrogram import enums
from config import USER_CWD, ACTIVE_PROCESSES
# Expanded Security Regex
DANGEROUS_REGEX = re.compile(r'\b(rm -rf /|mkfs|dd|:\(\)\{|>\s*/dev/sda)\b')
async def run_shell_command_live(client, message, cmd: str, status_msg, timeout: int = 60):
user_id = message.from_user.id
current_dir = USER_CWD.get(user_id, "/app")
# Handle directory changes with Tilde expansion
if cmd.startswith("cd ") or cmd == "cd":
target_dir = cmd.split(" ", 1)[1].strip() if " " in cmd else "~"
new_dir = os.path.abspath(os.path.expanduser(os.path.join(current_dir, target_dir)))
if os.path.isdir(new_dir):
USER_CWD[user_id] = new_dir
await status_msg.edit_text(f"š Directory changed to:\n{new_dir}", parse_mode=enums.ParseMode.HTML)
else:
await status_msg.edit_text(f"ā Directory not found: {target_dir}", parse_mode=enums.ParseMode.HTML)
return
if DANGEROUS_REGEX.search(cmd):
await status_msg.edit_text("š« Security Alert: Command blocked.", parse_mode=enums.ParseMode.HTML)
return
try:
process = await asyncio.create_subprocess_shell(
cmd,
cwd=current_dir,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
ACTIVE_PROCESSES[user_id] = process
output_buffer = ""
last_edit_time = 0
edit_lock = asyncio.Lock()
async def read_stream(stream):
nonlocal output_buffer, last_edit_time
while True:
line = await stream.readline()
if not line:
break
async with edit_lock:
output_buffer += line.decode('utf-8', errors='replace')
current_time = asyncio.get_running_loop().time()
if current_time - last_edit_time > 2.0:
display_text = output_buffer[-3500:]
try:
await status_msg.edit_text(f"$ {cmd}\n
{display_text}", parse_mode=enums.ParseMode.HTML)
last_edit_time = current_time
except Exception:
pass
# Wrap streams AND wait in the timeout block
async def run_process():
await asyncio.gather(read_stream(process.stdout), read_stream(process.stderr))
await process.wait()
await asyncio.wait_for(run_process(), timeout=timeout)
except asyncio.TimeoutError:
try:
process.kill()
except Exception:
pass
output_buffer += f"\n\nā ļø Execution timed out after {timeout}s."
except Exception as e:
output_buffer += f"\n\nā Error: {str(e)}"
finally:
ACTIVE_PROCESSES.pop(user_id, None)
if not output_buffer.strip():
output_buffer = "[Command executed silently]"
# Use unique temporary files to prevent collisions
if len(output_buffer) > 4000:
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt", encoding="utf-8") as temp_file:
temp_file.write(output_buffer)
temp_filepath = temp_file.name
await status_msg.delete()
await message.reply_document(temp_filepath, caption=f"Command: {cmd}", parse_mode=enums.ParseMode.HTML)
os.remove(temp_filepath)
else:
try:
# Safely attempt the final edit, ignoring MessageNotModified errors
await status_msg.edit_text(f"$ {cmd}\n{output_buffer}", parse_mode=enums.ParseMode.HTML)
except Exception:
pass