File size: 5,555 Bytes
5d09ff5
 
 
 
 
88b8c75
5d09ff5
 
 
 
88b8c75
5d09ff5
 
 
88b8c75
 
5d09ff5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ef94047
 
 
5d09ff5
 
 
 
 
 
 
 
 
 
 
 
 
ef94047
295e086
5d09ff5
 
 
 
 
 
 
 
88b8c75
5d09ff5
 
88b8c75
5d09ff5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88b8c75
5d09ff5
 
 
 
 
 
 
 
 
 
88b8c75
 
 
5d09ff5
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import os
import shutil
import subprocess
import json
import time
from datetime import datetime
from PIL import Image


class API_Connection:
    def __init__(self, GD_connection, kaggle_username: str = "", kaggle_key: str = ""):
        os.environ["KAGGLE_USERNAME"] = kaggle_username
        os.environ["KAGGLE_KEY"] = kaggle_key

        self.GoogleDrive_connection = GD_connection

        self.PROJECT_PATH = r""
        self.NOTEBOOK_ID = "amirmoris/pix2pix"
        self.DATASET_NAME = "dataset"

    def execute_terminal_command(self, command: str):
        try:
            process = subprocess.Popen(
                command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            output, error = process.communicate()
            output = str(output.decode("utf-8"))
            error = str(error.decode("utf-8"))
            print(rf"Command executed successfully: {command}")
            return output
        except Exception as e:
            return None, str(e)

    def correct_path(self, path: str):
        return path[1:] if path.startswith("\\") else path

    def create_folder(self, path: str):
        path = self.correct_path(path)
        if os.path.exists(path):
            shutil.rmtree(path)
        self.execute_terminal_command(rf"mkdir {path}")

    def pull_kaggle_notebook(self, notebook_path: str):
        command = rf"kaggle kernels pull {self.NOTEBOOK_ID} -p {notebook_path} -m"
        return self.execute_terminal_command(command)

    def push_kaggle_notebook(self, notebook_path: str):
        command = rf"kaggle kernels push -p {notebook_path}"
        return self.execute_terminal_command(command)

    def get_notebook_status(self):
        command = rf"kaggle kernels status {self.NOTEBOOK_ID}"
        return self.execute_terminal_command(command)

    def run(self, notebook_path: str):
        notebook_path = self.correct_path(notebook_path)
        self.pull_kaggle_notebook(notebook_path)
        return self.push_kaggle_notebook(notebook_path)

    def write_file(self, data: list, file_path: str, file_name: str = ""):
        if len(file_name) > 0:
            file_path = rf"{file_path}\{file_name}"

        file_path = self.correct_path(file_path)
        # Writing JSON data
        with open(file_path, "w") as file:
            for idx in range(len(data)):
                json_string = json.dumps(data[idx]) + (
                    "\n" if idx < len(data) - 1 else ""
                )
                file.write(json_string)

    def read_image(self, image_path: str):
        try:
            image = Image.open(image_path)
            return image
        except IOError:
            print("Unable to load image")
            return None

    def get_notebook_output(self, output_path: str):
        output_path = self.correct_path(output_path)
        command = rf"kaggle kernels output {self.NOTEBOOK_ID} -p {output_path}"
        return self.execute_terminal_command(command)

    def generate_image(
        self,
        input_image_name: str,
        edit_instruction: str,
        output_image_name: str,
    ):
        if len(input_image_name) == 0 or len(edit_instruction) == 0:
            return False, rf"Missing Input"

        if len(output_image_name) == 0:
            return False, rf"Missing Output"

        current_time = self.get_current_time()
        print(rf"Start Time : {current_time}")

        dataset_path = self.correct_path(rf"{self.PROJECT_PATH}\{self.DATASET_NAME}")
        notebook_path = self.correct_path(rf"{self.PROJECT_PATH}\notebook")

        self.create_folder(dataset_path)

        # copy image to the dataset
        shutil.copyfile(
            rf"local_dataset\{input_image_name}",
            rf"{dataset_path}\{input_image_name}",
        )

        data = [
            {
                "time": current_time,
                "status": "IDLE",
                "edit": edit_instruction,
                "input_image_path": input_image_name,
                "output_image_path": output_image_name,
            }
        ]

        self.write_file(data, dataset_path, "data.jsonl")
        # update dataset
        self.GoogleDrive_connection.upload_file(
            "data.jsonl", rf"{self.DATASET_NAME}\data.jsonl"
        )
        self.GoogleDrive_connection.upload_file(
            input_image_name, rf"{self.DATASET_NAME}\{input_image_name}"
        )

        # run notebook
        print(self.run(notebook_path))

        number_of_checks = 0
        while True:
            status = str(self.get_notebook_status()).replace("\n", "")
            print(rf"- status no #{number_of_checks} : {status}")
            number_of_checks += 1
            if "complete" in status:
                break

            if "error" in status:
                return False, "notebook status error"
            if "cancelAcknowledged" in status:
                return False, "notebook status cancelAcknowledged"
            time.sleep(120)

        # get output
        self.GoogleDrive_connection.download_file(
            output_image_name, rf"{dataset_path}\{output_image_name}"
        )
        output_image = self.read_image(rf"{dataset_path}\{output_image_name}")
        # clear input and output

        if output_image is None:
            return False, "An error occured while running, no output image found"

        return True, output_image

    def get_current_time(self):
        return str(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


def main():
    pass


if __name__ == "__main__":
    main()