import os
import asyncio
import re
import html # ā” FIX: Added html module to sanitize output
from config import USER_CWD, ACTIVE_PROCESSES
# Regex Blacklist: Catches dangerous base commands regardless of spacing or flags
DANGEROUS_REGEX = re.compile(r'\b(rm|mkfs|dd|shutdown|reboot|nano|vi|vim)\b')
async def run_shell_command_live(client, message, cmd: str, status_msg, timeout: int = 30):
user_id = message.from_user.id
current_dir = USER_CWD.get(user_id, "/app")
# 1. Custom 'cd' handler for persistent directories
if cmd.startswith("cd ") or cmd == "cd":
target_dir = cmd.split(" ", 1)[1].strip() if " " in cmd else "/app"
new_dir = os.path.abspath(os.path.join(current_dir, target_dir))
if os.path.isdir(new_dir):
USER_CWD[user_id] = new_dir
await status_msg.edit_text(f"š Directory changed to:\n{new_dir}", parse_mode="HTML")
else:
await status_msg.edit_text(f"ā Directory not found: {target_dir}", parse_mode="HTML")
return
# 2. Security Check
if DANGEROUS_REGEX.search(cmd):
await status_msg.edit_text("š« Security Alert: Command blocked by security policy.", parse_mode="HTML")
return
# 3. Execution Setup
try:
process = await asyncio.create_subprocess_shell(
cmd,
cwd=current_dir,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
ACTIVE_PROCESSES[user_id] = process
output_buffer = ""
last_edit_time = 0
# 4. Live Streaming Reader (Updates Telegram message every 2 seconds)
async def read_stream(stream):
nonlocal output_buffer, last_edit_time
while True:
line = await stream.readline()
if not line:
break
output_buffer += line.decode('utf-8', errors='replace')
# Rate limit edits to prevent API flood
current_time = asyncio.get_event_loop().time()
if current_time - last_edit_time > 2.0:
# ā” FIX: Escape HTML characters to prevent Telegram API crash
display_text = html.escape(output_buffer[-3500:])
try:
await status_msg.edit_text(f"$ {cmd}\n
{display_text}", parse_mode="HTML")
last_edit_time = current_time
except Exception:
pass # Ignore "Message is not modified" errors
await asyncio.gather(read_stream(process.stdout), read_stream(process.stderr))
await asyncio.wait_for(process.wait(), timeout=timeout)
except asyncio.TimeoutError:
try:
process.kill()
except Exception:
pass
output_buffer += f"\n\nā ļø Execution timed out after {timeout}s."
except Exception as e:
output_buffer += f"\n\nā Error: {str(e)}"
finally:
ACTIVE_PROCESSES.pop(user_id, None)
# 5. Final Output Handling
if not output_buffer.strip():
output_buffer = "[Command executed silently]"
# If the output is massive, send as a uniquely named .txt file
if len(output_buffer) > 4000:
filename = f"output_{message.id}.txt"
with open(filename, "w", encoding="utf-8") as f:
f.write(output_buffer)
await status_msg.delete()
await message.reply_document(filename, caption=f"Command: {cmd}", parse_mode="HTML")
os.remove(filename)
else:
# ā” FIX: Escape HTML characters for the final output as well
safe_final = html.escape(output_buffer)
await status_msg.edit_text(f"$ {cmd}\n{safe_final}", parse_mode="HTML")