File size: 3,684 Bytes
5d09ff5
c93cbfc
 
ff60762
5d09ff5
 
ff60762
5d09ff5
 
 
933c00b
88b8c75
c93cbfc
5d09ff5
 
 
 
c93cbfc
5d09ff5
 
 
c93cbfc
5d09ff5
 
 
c93cbfc
5d09ff5
 
 
 
 
c93cbfc
147c862
c93cbfc
 
 
 
 
 
 
5d09ff5
 
c93cbfc
5d09ff5
c93cbfc
ef94047
 
c93cbfc
 
5d09ff5
c93cbfc
5d09ff5
 
c93cbfc
5d09ff5
 
 
ef94047
c93cbfc
5d09ff5
 
c93cbfc
 
 
 
 
5d09ff5
 
 
c93cbfc
 
5d09ff5
9aa7219
 
5d09ff5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88b8c75
5d09ff5
 
c93cbfc
5d09ff5
 
 
 
 
 
 
 
 
 
 
 
147c862
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import time

from Helper_functions import *


class API_Connection:
    def __init__(self, gd_connection, kaggle_username: str = "", kaggle_key: str = ""):
        os.environ["KAGGLE_USERNAME"] = kaggle_username
        os.environ["KAGGLE_KEY"] = kaggle_key

        self.GoogleDrive_connection = gd_connection

        self.NOTEBOOK_ID = "amirmorris/pix2pix-model"
        self.DATASET_NAME = "dataset"

    def pull_kaggle_notebook(self, notebook_path: str):
        command = rf"kaggle kernels pull {self.NOTEBOOK_ID} -p {notebook_path} -m"
        return execute_terminal_command(command)

    def push_kaggle_notebook(self, notebook_path: str):
        command = rf"kaggle kernels push -p {notebook_path}"
        return execute_terminal_command(command)

    def get_notebook_status(self):
        command = rf"kaggle kernels status {self.NOTEBOOK_ID}"
        return execute_terminal_command(command)

    def run(self, notebook_path: str):
        self.pull_kaggle_notebook(notebook_path)
        return self.push_kaggle_notebook(notebook_path)

    def generate_image(self, input_image_name: str, edit_instruction: str, output_image_name: str,
                       steps: int, seed: int, cfgtext: float, cfgimage: float, resolution: int
                       ):

        if len(input_image_name) == 0:
            return False, rf"Missing Input: input_image"

        if len(edit_instruction) == 0:
            return False, rf"Missing Input: edit_instruction"

        if len(output_image_name) == 0:
            return False, rf"Output Error: Missing output_image path"

        current_time = get_current_time()
        print(rf"Start Time : {current_time}")

        dataset_path = correct_path(self.DATASET_NAME)
        notebook_path = correct_path("notebook")

        create_folder(dataset_path)

        # copy image to the dataset
        copy_file(rf"local_dataset\{input_image_name}", rf"{dataset_path}\{input_image_name}")

        data = [
            {
                "time": current_time,
                "edit_instruction": edit_instruction,
                "input_image_path": input_image_name,
                "output_image_path": output_image_name,
                "steps": steps,
                "seed": seed,
                "cfg-text": cfgtext,
                "cfg-image": cfgimage,
                "resolution": resolution
            }
        ]

        write_file(data, dataset_path, "data.json")

        # update dataset
        self.GoogleDrive_connection.upload_file("data.json", rf"{self.DATASET_NAME}\data.json")
        self.GoogleDrive_connection.upload_file(input_image_name, rf"{self.DATASET_NAME}\{input_image_name}")

        # run notebook
        print(self.run(notebook_path))

        number_of_checks = 0
        while True:
            status = str(self.get_notebook_status()).replace("\n", "")
            print(rf"- status no #{number_of_checks} : {status}")
            number_of_checks += 1
            if "complete" in status:
                break

            if "error" in status:
                return False, "notebook status error"
            if "cancelAcknowledged" in status:
                return False, "notebook status cancelAcknowledged"
            time.sleep(120)

        # get output
        self.GoogleDrive_connection.download_file(
            output_image_name, rf"{dataset_path}\{output_image_name}"
        )
        output_image = read_image(rf"{dataset_path}\{output_image_name}")

        if output_image is None:
            return False, "An error occured while running, no output image found"

        return True, output_image


def main():
    pass


if __name__ == "__main__":
    main()