| import os |
| import asyncio |
| import re |
| import tempfile |
| from pyrogram import enums |
| from config import USER_CWD, ACTIVE_PROCESSES |
|
|
| |
| DANGEROUS_REGEX = re.compile(r'\b(rm -rf /|mkfs|dd|:\(\)\{|>\s*/dev/sda)\b') |
|
|
| async def run_shell_command_live(client, message, cmd: str, status_msg, timeout: int = 60): |
| user_id = message.from_user.id |
| current_dir = USER_CWD.get(user_id, "/app") |
|
|
| |
| if cmd.startswith("cd ") or cmd == "cd": |
| target_dir = cmd.split(" ", 1)[1].strip() if " " in cmd else "~" |
| new_dir = os.path.abspath(os.path.expanduser(os.path.join(current_dir, target_dir))) |
| |
| if os.path.isdir(new_dir): |
| USER_CWD[user_id] = new_dir |
| await status_msg.edit_text(f"๐ <b>Directory changed to:</b>\n<code>{new_dir}</code>", parse_mode=enums.ParseMode.HTML) |
| else: |
| await status_msg.edit_text(f"โ <b>Directory not found:</b> <code>{target_dir}</code>", parse_mode=enums.ParseMode.HTML) |
| return |
|
|
| if DANGEROUS_REGEX.search(cmd): |
| await status_msg.edit_text("๐ซ <b>Security Alert:</b> Command blocked.", parse_mode=enums.ParseMode.HTML) |
| return |
|
|
| try: |
| process = await asyncio.create_subprocess_shell( |
| cmd, |
| cwd=current_dir, |
| stdout=asyncio.subprocess.PIPE, |
| stderr=asyncio.subprocess.PIPE |
| ) |
| ACTIVE_PROCESSES[user_id] = process |
|
|
| output_buffer = "" |
| last_edit_time = 0 |
| edit_lock = asyncio.Lock() |
| |
| async def read_stream(stream): |
| nonlocal output_buffer, last_edit_time |
| while True: |
| line = await stream.readline() |
| if not line: |
| break |
| |
| async with edit_lock: |
| output_buffer += line.decode('utf-8', errors='replace') |
| current_time = asyncio.get_running_loop().time() |
| |
| if current_time - last_edit_time > 2.0: |
| display_text = output_buffer[-3500:] |
| try: |
| await status_msg.edit_text(f"<b>$</b> <code>{cmd}</code>\n<pre>{display_text}</pre>", parse_mode=enums.ParseMode.HTML) |
| last_edit_time = current_time |
| except Exception: |
| pass |
|
|
| |
| async def run_process(): |
| await asyncio.gather(read_stream(process.stdout), read_stream(process.stderr)) |
| await process.wait() |
|
|
| await asyncio.wait_for(run_process(), timeout=timeout) |
|
|
| except asyncio.TimeoutError: |
| try: |
| process.kill() |
| except Exception: |
| pass |
| output_buffer += f"\n\nโ ๏ธ Execution timed out after {timeout}s." |
| except Exception as e: |
| output_buffer += f"\n\nโ Error: {str(e)}" |
| finally: |
| ACTIVE_PROCESSES.pop(user_id, None) |
|
|
| if not output_buffer.strip(): |
| output_buffer = "[Command executed silently]" |
|
|
| |
| if len(output_buffer) > 4000: |
| with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt", encoding="utf-8") as temp_file: |
| temp_file.write(output_buffer) |
| temp_filepath = temp_file.name |
| |
| await status_msg.delete() |
| await message.reply_document(temp_filepath, caption=f"<b>Command:</b> <code>{cmd}</code>", parse_mode=enums.ParseMode.HTML) |
| os.remove(temp_filepath) |
| else: |
| try: |
| |
| await status_msg.edit_text(f"<b>$</b> <code>{cmd}</code>\n<pre>{output_buffer}</pre>", parse_mode=enums.ParseMode.HTML) |
| except Exception: |
| pass |