import os import asyncio import re import html # ⚔ FIX: Added html module to sanitize output from config import USER_CWD, ACTIVE_PROCESSES # Regex Blacklist: Catches dangerous base commands regardless of spacing or flags DANGEROUS_REGEX = re.compile(r'\b(rm|mkfs|dd|shutdown|reboot|nano|vi|vim)\b') async def run_shell_command_live(client, message, cmd: str, status_msg, timeout: int = 30): user_id = message.from_user.id current_dir = USER_CWD.get(user_id, "/app") # 1. Custom 'cd' handler for persistent directories if cmd.startswith("cd ") or cmd == "cd": target_dir = cmd.split(" ", 1)[1].strip() if " " in cmd else "/app" new_dir = os.path.abspath(os.path.join(current_dir, target_dir)) if os.path.isdir(new_dir): USER_CWD[user_id] = new_dir await status_msg.edit_text(f"šŸ“ Directory changed to:\n{new_dir}", parse_mode="HTML") else: await status_msg.edit_text(f"āŒ Directory not found: {target_dir}", parse_mode="HTML") return # 2. Security Check if DANGEROUS_REGEX.search(cmd): await status_msg.edit_text("🚫 Security Alert: Command blocked by security policy.", parse_mode="HTML") return # 3. Execution Setup try: process = await asyncio.create_subprocess_shell( cmd, cwd=current_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) ACTIVE_PROCESSES[user_id] = process output_buffer = "" last_edit_time = 0 # 4. Live Streaming Reader (Updates Telegram message every 2 seconds) async def read_stream(stream): nonlocal output_buffer, last_edit_time while True: line = await stream.readline() if not line: break output_buffer += line.decode('utf-8', errors='replace') # Rate limit edits to prevent API flood current_time = asyncio.get_event_loop().time() if current_time - last_edit_time > 2.0: # ⚔ FIX: Escape HTML characters to prevent Telegram API crash display_text = html.escape(output_buffer[-3500:]) try: await status_msg.edit_text(f"$ {cmd}\n
{display_text}
", parse_mode="HTML") last_edit_time = current_time except Exception: pass # Ignore "Message is not modified" errors await asyncio.gather(read_stream(process.stdout), read_stream(process.stderr)) await asyncio.wait_for(process.wait(), timeout=timeout) except asyncio.TimeoutError: try: process.kill() except Exception: pass output_buffer += f"\n\nāš ļø Execution timed out after {timeout}s." except Exception as e: output_buffer += f"\n\nāŒ Error: {str(e)}" finally: ACTIVE_PROCESSES.pop(user_id, None) # 5. Final Output Handling if not output_buffer.strip(): output_buffer = "[Command executed silently]" # If the output is massive, send as a uniquely named .txt file if len(output_buffer) > 4000: filename = f"output_{message.id}.txt" with open(filename, "w", encoding="utf-8") as f: f.write(output_buffer) await status_msg.delete() await message.reply_document(filename, caption=f"Command: {cmd}", parse_mode="HTML") os.remove(filename) else: # ⚔ FIX: Escape HTML characters for the final output as well safe_final = html.escape(output_buffer) await status_msg.edit_text(f"$ {cmd}\n
{safe_final}
", parse_mode="HTML")