GravityFalls / python /helpers /shell_local.py
frdel
Docker + SSH, AgentConfig class
deab47c
raw
history blame
2.07 kB
import select
import subprocess
import time
import sys
from typing import Optional, Tuple
class LocalInteractiveSession:
def __init__(self):
self.process = None
self.full_output = ''
def connect(self):
# Start a new subprocess with the appropriate shell for the OS
if sys.platform.startswith('win'):
# Windows
self.process = subprocess.Popen(
['cmd.exe'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1
)
else:
# macOS and Linux
self.process = subprocess.Popen(
['/bin/bash'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1
)
def close(self):
if self.process:
self.process.terminate()
self.process.wait()
def send_command(self, command: str):
if not self.process:
raise Exception("Shell not connected")
self.full_output = ""
self.process.stdin.write(command + '\n') # type: ignore
self.process.stdin.flush() # type: ignore
def read_output(self) -> Tuple[str, Optional[str]]:
if not self.process:
raise Exception("Shell not connected")
partial_output = ''
while True:
rlist, _, _ = select.select([self.process.stdout], [], [], 0.1)
if rlist:
line = self.process.stdout.readline() # type: ignore
if line:
partial_output += line
self.full_output += line
time.sleep(0.1)
else:
break # No more output
else:
break # No data available
if not partial_output:
return self.full_output, None
return self.full_output, partial_output