Spaces:
Paused
Paused
File size: 11,152 Bytes
7d4338a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 | import asyncio, os, sys, platform, errno
_IS_WIN = platform.system() == "Windows"
if _IS_WIN:
import winpty # pip install pywinpty # type: ignore
import msvcrt
# Make stdin / stdout tolerant to broken UTF-8 so input() never aborts
sys.stdin.reconfigure(errors="replace") # type: ignore
sys.stdout.reconfigure(errors="replace") # type: ignore
# ββββββββββββββββββββββββββββ PUBLIC CLASS ββββββββββββββββββββββββββββ
class TTYSession:
def __init__(self, cmd, *, cwd=None, env=None, encoding="utf-8", echo=False):
self.cmd = cmd if isinstance(cmd, str) else " ".join(cmd)
self.cwd = cwd
self.env = env or os.environ.copy()
self.encoding = encoding
self.echo = echo # β store preference
self._proc = None
self._buf: asyncio.Queue = None # type: ignore
def __del__(self):
# Simple cleanup on object destruction
import nest_asyncio
nest_asyncio.apply()
if hasattr(self, "close"):
try:
asyncio.run(self.close())
except Exception:
pass
# ββ user-facing coroutines ββββββββββββββββββββββββββββββββββββββββ
async def start(self):
self._buf = asyncio.Queue()
if _IS_WIN:
self._proc = await _spawn_winpty(
self.cmd, self.cwd, self.env, self.echo
) # β pass echo
else:
self._proc = await _spawn_posix_pty(
self.cmd, self.cwd, self.env, self.echo
) # β pass echo
self._pump_task = asyncio.create_task(self._pump_stdout())
async def close(self):
# Cancel the pump task if it exists
if hasattr(self, "_pump_task") and self._pump_task:
self._pump_task.cancel()
try:
await self._pump_task
except asyncio.CancelledError:
pass
# Terminate the process if it exists
if self._proc:
self._proc.terminate()
await self._proc.wait()
self._proc = None
self._pump_task = None
async def send(self, data: str | bytes):
if self._proc is None:
raise RuntimeError("TTYSpawn is not started")
if isinstance(data, str):
data = data.encode(self.encoding)
self._proc.stdin.write(data) # type: ignore
await self._proc.stdin.drain() # type: ignore
async def sendline(self, line: str):
await self.send(line + "\n")
async def wait(self):
if self._proc is None:
raise RuntimeError("TTYSpawn is not started")
return await self._proc.wait()
def kill(self):
"""Force-kill the running child process.
This is best-effort: if the process has already terminated (which can
happen if *close()* was called elsewhere or the child exited by
itself) we silently ignore the *ProcessLookupError* raised by
*asyncio.subprocess.Process.kill()*. This prevents race conditions
where multiple coroutines attempt to close the same session.
"""
if self._proc is None:
# Already closed or never started β nothing to do
return
# Only attempt to kill if the process is still running
if getattr(self._proc, "returncode", None) is None:
try:
self._proc.kill()
except ProcessLookupError:
# Child already gone β treat as successfully killed
pass
async def read(self, timeout=None):
# Return any decoded text the child produced, or None on timeout
try:
return await asyncio.wait_for(self._buf.get(), timeout)
except asyncio.TimeoutError:
return None
# backward-compat alias:
readline = read
async def read_full_until_idle(self, idle_timeout, total_timeout):
# Collect child output using iter_until_idle to avoid duplicate logic
return "".join(
[
chunk
async for chunk in self.read_chunks_until_idle(
idle_timeout, total_timeout
)
]
)
async def read_chunks_until_idle(self, idle_timeout, total_timeout):
# Yield each chunk as soon as it arrives until idle or total timeout
import time
start = time.monotonic()
while True:
if time.monotonic() - start > total_timeout:
break
chunk = await self.read(timeout=idle_timeout)
if chunk is None:
break
yield chunk
# ββ internal: stream raw output into the queue ββββββββββββββββββββ
async def _pump_stdout(self):
if self._proc is None:
raise RuntimeError("TTYSpawn is not started")
reader = self._proc.stdout
while True:
chunk = await reader.read(4096) # grab whatever is ready # type: ignore
if not chunk:
break
self._buf.put_nowait(chunk.decode(self.encoding, "replace"))
# ββββββββββββββββββββββββββββ POSIX IMPLEMENTATION ββββββββββββββββββββ
async def _spawn_posix_pty(cmd, cwd, env, echo):
import pty, asyncio, os, termios
master, slave = pty.openpty()
# ββ Disable ECHO on the slave side if requested ββ
if not echo:
attrs = termios.tcgetattr(slave)
attrs[3] &= ~termios.ECHO # lflag
termios.tcsetattr(slave, termios.TCSANOW, attrs)
proc = await asyncio.create_subprocess_shell(
cmd,
stdin=slave,
stdout=slave,
stderr=slave,
cwd=cwd,
env=env,
close_fds=True,
)
os.close(slave)
loop = asyncio.get_running_loop()
reader = asyncio.StreamReader()
def _on_data():
try:
data = os.read(master, 1 << 16)
except OSError as e:
if e.errno != errno.EIO: # EIO == EOF on some systems
raise
data = b""
if data:
reader.feed_data(data)
else:
reader.feed_eof()
loop.remove_reader(master)
loop.add_reader(master, _on_data)
class _Stdin:
def write(self, d):
os.write(master, d)
async def drain(self):
await asyncio.sleep(0)
proc.stdin = _Stdin() # type: ignore
proc.stdout = reader
return proc
# ββββββββββββββββββββββββββββ WINDOWS IMPLEMENTATION ββββββββββββββββββ
async def _spawn_winpty(cmd, cwd, env, echo):
# Clean PowerShell startup: no logo, no profile, bypass execution policy for deterministic behavior
if cmd.strip().lower().startswith("powershell"):
if "-nolog" not in cmd.lower():
cmd = cmd.replace("powershell.exe", "powershell.exe -NoLogo -NoProfile -ExecutionPolicy Bypass", 1)
cols, rows = 80, 25
child = winpty.PtyProcess.spawn(cmd, dimensions=(rows, cols), cwd=cwd or os.getcwd(), env=env) # type: ignore
loop = asyncio.get_running_loop()
reader = asyncio.StreamReader()
async def _on_data():
while child.isalive():
try:
# Run blocking read in executor to not block event loop
data = await loop.run_in_executor(None, child.read, 1 << 16)
if data:
reader.feed_data(data.encode('utf-8') if isinstance(data, str) else data)
except EOFError:
break
except Exception:
await asyncio.sleep(0.01)
reader.feed_eof()
# Start pumping output in background
asyncio.create_task(_on_data())
class _Stdin:
def write(self, d):
# Use winpty's write method, not os.write
if isinstance(d, bytes):
d = d.decode('utf-8', errors='replace')
# Windows needs \r\n for proper line endings
if _IS_WIN:
d = d.replace('\n', '\r\n')
child.write(d)
async def drain(self):
await asyncio.sleep(0.01) # Give write time to complete
class _Proc:
def __init__(self):
self.stdin = _Stdin() # type: ignore
self.stdout = reader
self.pid = child.pid
self.returncode = None
async def wait(self):
while child.isalive():
await asyncio.sleep(0.2)
self.returncode = 0
return 0
def terminate(self):
if child.isalive():
child.terminate()
def kill(self):
if child.isalive():
child.kill()
return _Proc()
# βββββββββββββββββββββββββ INTERACTIVE DRIVER βββββββββββββββββββββββββ
if __name__ == "__main__":
async def interactive_shell():
shell_cmd, prompt_hint = ("powershell.exe", ">") if _IS_WIN else ("/bin/bash", "$")
# echo=False β suppress the shellβs own echo of commands
term = TTYSession(shell_cmd)
await term.start()
timeout = 1.0
print(f"Connected to {shell_cmd}.")
print("Type commands for the shell.")
print("β’ /t=<seconds> β change idle timeout")
print("β’ /exit β quit helper\n")
await term.sendline(" ")
print(await term.read_full_until_idle(timeout, timeout), end="", flush=True)
while True:
try:
user = input(f"(timeout={timeout}) {prompt_hint} ")
except (EOFError, KeyboardInterrupt):
print("\nLeavingβ¦")
break
if user.lower() == "/exit":
break
if user.startswith("/t="):
try:
timeout = float(user.split("=", 1)[1])
print(f"[helper] idle timeout set to {timeout}s")
except ValueError:
print("[helper] invalid number")
continue
idle_timeout = timeout
total_timeout = 10 * idle_timeout
if user == "":
# Just read output, do not send empty line
async for chunk in term.read_chunks_until_idle(
idle_timeout, total_timeout
):
print(chunk, end="", flush=True)
else:
await term.sendline(user)
async for chunk in term.read_chunks_until_idle(
idle_timeout, total_timeout
):
print(chunk, end="", flush=True)
await term.sendline("exit")
await term.wait()
asyncio.run(interactive_shell())
|