File size: 3,022 Bytes
deab47c
 
 
 
 
 
ba0986b
 
 
 
 
deab47c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ba0986b
 
 
 
 
 
 
deab47c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ba0986b
deab47c
 
 
 
 
 
 
 
 
 
 
 
 
 
ba0986b
 
 
 
 
 
deab47c
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import codecs
import re
import time
from typing import Optional, Tuple

import paramiko

class SSHInteractiveSession:
    """Interactive shell session over SSH, built on a paramiko channel.

    Typical use: ``connect()``, then ``send_command(...)`` followed by
    repeated ``read_output()`` polls, and finally ``close()``. The instance
    can also be used as a context manager, which connects on entry (if not
    already connected) and closes on exit.

    Every command is sent together with ``end_comment`` on a continuation
    line. The terminal echoes that marker back, and ``read_output`` uses the
    echoed marker to drop the echoed command line from the returned text.
    """

    # Marker sent after every command (as a shell comment); its echo in the
    # stream separates the echoed command from the command's actual output.
    end_comment = "# @@==>> SSHInteractiveSession End-of-Command  <<==@@"

    # Not referenced by any method of this class.
    # NOTE(review): presumably meant as a custom shell prompt (PS1) - confirm.
    ps1_label = "SSHInteractiveSession CLI>"

    # Retry policy used by connect(): total attempts and pause between them.
    connect_attempts = 3
    connect_retry_delay = 5  # seconds

    # Read tuning used by read_output(): bytes requested per recv() call and
    # the pause after each chunk (gives the remote side time to deliver more
    # data so that one call can collect a whole burst of output).
    recv_chunk_size = 1024
    read_poll_interval = 0.1  # seconds

    # ESC followed by a single character in @-Z or \-_, or a CSI sequence
    # (ESC [ parameters, intermediates, final byte). Compiled once.
    _ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')

    # Incremental UTF-8 decoder for the channel; created lazily so instances
    # built without __init__ still work.
    _decoder = None

    def __init__(self, hostname: str, port: int, username: str, password: str):
        """Store the connection parameters and create the paramiko client.

        No network activity happens here; call ``connect()`` to open the
        session.
        """
        self.hostname = hostname
        self.port = port
        self.username = username
        self.password = password
        self.client = paramiko.SSHClient()
        # SECURITY NOTE: AutoAddPolicy accepts unknown host keys without
        # verification. Kept for backward compatibility.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.shell = None  # paramiko channel once connect() has succeeded
        self.full_output = ''
        self._reset_decoder()

    def __enter__(self):
        """Connect if there is no shell yet and return the session."""
        if self.shell is None:
            self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the session; exceptions from the ``with`` body propagate."""
        self.close()

    def _reset_decoder(self):
        """Install and return a fresh incremental UTF-8 decoder."""
        # errors="replace": terminal output may contain arbitrary bytes, and
        # a reader should not crash on them.
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        return self._decoder

    def _require_shell(self):
        """Return the open shell channel or raise if not connected."""
        if self.shell is None:
            # RuntimeError is a subclass of Exception, so callers that catch
            # Exception keep working.
            raise RuntimeError("Shell not connected")
        return self.shell

    def connect(self):
        """Open the SSH connection and start an interactive shell.

        Makes up to ``connect_attempts`` attempts, sleeping
        ``connect_retry_delay`` seconds after each failed one.

        Raises:
            Exception: whatever the last failed attempt raised, with its
                original traceback.
        """
        attempt = 0
        while True:
            attempt += 1
            try:
                self.client.connect(self.hostname, self.port, self.username, self.password)
                self.shell = self.client.invoke_shell(width=160, height=48)
            except Exception:
                if attempt >= self.connect_attempts:
                    raise
                print(f"SSH Connection attempt {attempt}...")
                time.sleep(self.connect_retry_delay)
            else:
                # New byte stream: discard any partial character left over
                # from a previous connection.
                self._reset_decoder()
                return

    def close(self):
        """Close the shell channel and the client.

        The client is closed even if closing the shell raises, and the shell
        reference is cleared so later calls report "Shell not connected".
        """
        try:
            if self.shell is not None:
                self.shell.close()
        finally:
            self.shell = None
            if self.client is not None:
                self.client.close()

    def send_command(self, command: str):
        """Send ``command`` to the shell and reset the accumulated output.

        The command is followed by a backslash-newline and ``end_comment``
        so that the echoed marker can later be located by ``read_output``.

        Raises:
            RuntimeError: if the shell is not connected.
        """
        shell = self._require_shell()
        self.full_output = ""
        payload = f"{command} \\\n{self.end_comment}\n"
        # sendall() keeps sending until the whole payload is transmitted;
        # send() may stop after a partial write.
        shell.sendall(payload.encode())

    def read_output(self) -> Tuple[str, str]:
        """Drain the data currently available on the shell.

        Returns:
            ``(full_output, partial_output)``: the cleaned output accumulated
            since the last ``send_command`` and the cleaned text received
            during this call. Once the echoed ``end_comment`` has arrived,
            everything up to and including it is dropped from both values.

        Raises:
            RuntimeError: if the shell is not connected.
        """
        shell = self._require_shell()
        decoder = self._decoder
        if decoder is None:
            decoder = self._reset_decoder()

        pieces = []
        while shell.recv_ready():
            # The incremental decoder buffers a multi-byte character that is
            # split across two recv() calls instead of failing on it.
            pieces.append(decoder.decode(shell.recv(self.recv_chunk_size)))
            time.sleep(self.read_poll_interval)

        # Clean after joining so escape sequences and "\r\n" pairs that
        # straddle a chunk boundary are handled.
        partial_output = self.clean_string("".join(pieces))
        # Re-clean the accumulated text: a sequence split between two calls
        # only becomes recognisable once both halves are adjacent.
        self.full_output = self.clean_string(self.full_output + partial_output)

        marker = self.end_comment
        if marker in self.full_output:
            self.full_output = self.full_output.rpartition(marker)[2].lstrip("\r\n")
            # The marker is trimmed as soon as it is complete, so everything
            # after it arrived during this call. This also covers a marker
            # that was split across two calls.
            partial_output = self.full_output

        return self.full_output, partial_output

    def clean_string(self, input_string):
        """Return ``input_string`` without ANSI escapes and with LF line ends.

        ANSI escape sequences are removed and every ``\\r\\n`` becomes
        ``\\n``; a lone ``\\r`` is left as is.
        """
        without_escapes = self._ANSI_ESCAPE.sub('', input_string)
        return without_escapes.replace('\r\n', '\n')