Spaces:
Paused
Paused
| import paramiko | |
| import time | |
| import re | |
| from typing import Optional, Tuple | |
| class SSHInteractiveSession: | |
| end_comment = "# @@==>> SSHInteractiveSession End-of-Command <<==@@" | |
| ps1_label = "SSHInteractiveSession CLI>" | |
| def __init__(self, hostname: str, port: int, username: str, password: str): | |
| self.hostname = hostname | |
| self.port = port | |
| self.username = username | |
| self.password = password | |
| self.client = paramiko.SSHClient() | |
| self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) | |
| self.shell = None | |
| self.full_output = '' | |
| def connect(self): | |
| # try 3 times with wait and then except | |
| errors = 0 | |
| while True: | |
| try: | |
| self.client.connect(self.hostname, self.port, self.username, self.password) | |
| self.shell = self.client.invoke_shell(width=160,height=48) | |
| # self.shell.send(f'PS1="{SSHInteractiveSession.ps1_label}"'.encode()) | |
| return | |
| # while True: # wait for end of initial output | |
| # full, part = self.read_output() | |
| # if full and not part: return | |
| # time.sleep(0.1) | |
| except Exception as e: | |
| errors += 1 | |
| if errors < 3: | |
| print(f"SSH Connection attempt {errors}...") | |
| time.sleep(5) | |
| else: | |
| raise e | |
| def close(self): | |
| if self.shell: | |
| self.shell.close() | |
| if self.client: | |
| self.client.close() | |
| def send_command(self, command: str): | |
| if not self.shell: | |
| raise Exception("Shell not connected") | |
| self.full_output = "" | |
| self.shell.send((command + " \\\n" +SSHInteractiveSession.end_comment + "\n").encode()) | |
| def read_output(self) -> Tuple[str, str]: | |
| if not self.shell: | |
| raise Exception("Shell not connected") | |
| partial_output = '' | |
| while self.shell.recv_ready(): | |
| data = self.shell.recv(1024).decode('utf-8') | |
| data = self.clean_string(data) | |
| partial_output += data | |
| self.full_output += data | |
| time.sleep(0.1) # Prevent busy waiting | |
| self.full_output = self.clean_string(self.full_output) | |
| # split output at end_comment | |
| if SSHInteractiveSession.end_comment in self.full_output: | |
| self.full_output = self.full_output.split(SSHInteractiveSession.end_comment)[-1].lstrip("\r\n") | |
| partial_output = partial_output.split(SSHInteractiveSession.end_comment)[-1].lstrip("\r\n") | |
| return self.full_output, partial_output | |
| def clean_string(self, input_string): | |
| # Remove ANSI escape codes | |
| ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') | |
| cleaned = ansi_escape.sub('', input_string) | |
| # Replace '\r\n' with '\n' | |
| cleaned = cleaned.replace('\r\n', '\n') | |
| return cleaned |