import os import asyncio import re import tempfile from pyrogram import enums from config import USER_CWD, ACTIVE_PROCESSES # Expanded Security Regex DANGEROUS_REGEX = re.compile(r'\b(rm -rf /|mkfs|dd|:\(\)\{|>\s*/dev/sda)\b') async def run_shell_command_live(client, message, cmd: str, status_msg, timeout: int = 60): user_id = message.from_user.id current_dir = USER_CWD.get(user_id, "/app") # Handle directory changes with Tilde expansion if cmd.startswith("cd ") or cmd == "cd": target_dir = cmd.split(" ", 1)[1].strip() if " " in cmd else "~" new_dir = os.path.abspath(os.path.expanduser(os.path.join(current_dir, target_dir))) if os.path.isdir(new_dir): USER_CWD[user_id] = new_dir await status_msg.edit_text(f"šŸ“ Directory changed to:\n{new_dir}", parse_mode=enums.ParseMode.HTML) else: await status_msg.edit_text(f"āŒ Directory not found: {target_dir}", parse_mode=enums.ParseMode.HTML) return if DANGEROUS_REGEX.search(cmd): await status_msg.edit_text("🚫 Security Alert: Command blocked.", parse_mode=enums.ParseMode.HTML) return try: process = await asyncio.create_subprocess_shell( cmd, cwd=current_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) ACTIVE_PROCESSES[user_id] = process output_buffer = "" last_edit_time = 0 edit_lock = asyncio.Lock() async def read_stream(stream): nonlocal output_buffer, last_edit_time while True: line = await stream.readline() if not line: break async with edit_lock: output_buffer += line.decode('utf-8', errors='replace') current_time = asyncio.get_running_loop().time() if current_time - last_edit_time > 2.0: display_text = output_buffer[-3500:] try: await status_msg.edit_text(f"$ {cmd}\n
{display_text}
", parse_mode=enums.ParseMode.HTML) last_edit_time = current_time except Exception: pass # Wrap streams AND wait in the timeout block async def run_process(): await asyncio.gather(read_stream(process.stdout), read_stream(process.stderr)) await process.wait() await asyncio.wait_for(run_process(), timeout=timeout) except asyncio.TimeoutError: try: process.kill() except Exception: pass output_buffer += f"\n\nāš ļø Execution timed out after {timeout}s." except Exception as e: output_buffer += f"\n\nāŒ Error: {str(e)}" finally: ACTIVE_PROCESSES.pop(user_id, None) if not output_buffer.strip(): output_buffer = "[Command executed silently]" # Use unique temporary files to prevent collisions if len(output_buffer) > 4000: with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt", encoding="utf-8") as temp_file: temp_file.write(output_buffer) temp_filepath = temp_file.name await status_msg.delete() await message.reply_document(temp_filepath, caption=f"Command: {cmd}", parse_mode=enums.ParseMode.HTML) os.remove(temp_filepath) else: try: # Safely attempt the final edit, ignoring MessageNotModified errors await status_msg.edit_text(f"$ {cmd}\n
{output_buffer}
", parse_mode=enums.ParseMode.HTML) except Exception: pass