import os
import shutil
import subprocess
import json
import time
from datetime import datetime
from PIL import Image
class API_Connection:
    """Run an image-edit job on a Kaggle notebook via the ``kaggle`` CLI.

    The class stages an input image and a one-record ``data.jsonl`` job
    description in a local dataset folder, hands both files to the supplied
    Google Drive connection, re-pushes the notebook, polls its status and
    finally fetches the produced image through the same Drive connection.

    NOTE(review): every path is assembled with a literal backslash
    separator, which looks Windows-oriented. The strings are kept exactly as
    they were because they are also passed to the Drive connection, whose
    path semantics are not visible from this file.
    """

    def __init__(self, GD_connection, kaggle_username: str = "", kaggle_key: str = ""):
        """Store the Drive connection and export the Kaggle credentials.

        Args:
            GD_connection: Object exposing ``upload_file`` and
                ``download_file``; both are called by ``generate_image``.
            kaggle_username: Value for the ``KAGGLE_USERNAME`` environment
                variable (presumably read by the ``kaggle`` CLI).
            kaggle_key: Value for the ``KAGGLE_KEY`` environment variable.
        """
        # Export only credentials that were actually supplied, so the empty
        # defaults do not wipe values already present in the environment.
        if kaggle_username:
            os.environ["KAGGLE_USERNAME"] = kaggle_username
        if kaggle_key:
            os.environ["KAGGLE_KEY"] = kaggle_key
        self.GoogleDrive_connection = GD_connection
        # Prefix for the local "dataset" and "notebook" folders.
        self.PROJECT_PATH = r""
        # Kernel slug handed to every ``kaggle kernels`` command.
        self.NOTEBOOK_ID = "amirmoris/pix2pix"
        # Name of the local folder that holds the staged job files.
        self.DATASET_NAME = "dataset"

    def execute_terminal_command(self, command: str):
        """Run ``command`` through the system shell and return its stdout.

        Args:
            command: Complete command line. It is handed to the shell
                unmodified, so it must be built from trusted values only.

        Returns:
            The decoded standard output of the command (also when the
            command exits with a non-zero code). If the process could not be
            started at all, the tuple ``(None, error_message)`` is returned
            instead; that shape is kept for backward compatibility.
        """
        try:
            completed = subprocess.run(
                command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except Exception as e:  # this method reports failures, it never raises
            return None, str(e)
        # errors="replace": console output is not guaranteed to be valid
        # UTF-8, and a decoding failure must not hide the command's result.
        output = completed.stdout.decode("utf-8", errors="replace")
        if completed.returncode == 0:
            print(rf"Command executed successfully: {command}")
        else:
            print(rf"Command failed (exit code {completed.returncode}): {command}")
            error = completed.stderr.decode("utf-8", errors="replace").strip()
            if error:
                print(error)
        return output

    def correct_path(self, path: str):
        """Return ``path`` without a single leading backslash, if present."""
        return path[1:] if path.startswith("\\") else path

    def create_folder(self, path: str):
        """Create ``path`` as an empty directory, deleting any previous one.

        Args:
            path: Directory to (re)create; a leading backslash is stripped
                first. An empty path is ignored.
        """
        path = self.correct_path(path)
        if not path:
            return
        if os.path.exists(path):
            shutil.rmtree(path)
        # os.makedirs instead of a shell ``mkdir`` so that paths containing
        # spaces or shell metacharacters are handled correctly.
        os.makedirs(path, exist_ok=True)

    def pull_kaggle_notebook(self, notebook_path: str):
        """Run ``kaggle kernels pull`` (with ``-m``) into ``notebook_path``.

        Returns:
            Whatever ``execute_terminal_command`` returns for the command.
        """
        command = rf"kaggle kernels pull {self.NOTEBOOK_ID} -p {notebook_path} -m"
        return self.execute_terminal_command(command)

    def push_kaggle_notebook(self, notebook_path: str):
        """Run ``kaggle kernels push`` on the folder ``notebook_path``.

        Returns:
            Whatever ``execute_terminal_command`` returns for the command.
        """
        command = rf"kaggle kernels push -p {notebook_path}"
        return self.execute_terminal_command(command)

    def get_notebook_status(self):
        """Return the output of ``kaggle kernels status`` for the notebook."""
        command = rf"kaggle kernels status {self.NOTEBOOK_ID}"
        return self.execute_terminal_command(command)

    def run(self, notebook_path: str):
        """Pull the notebook into ``notebook_path`` and push it back.

        Returns:
            The result of the push command.
        """
        notebook_path = self.correct_path(notebook_path)
        self.pull_kaggle_notebook(notebook_path)
        return self.push_kaggle_notebook(notebook_path)

    def write_file(self, data: list, file_path: str, file_name: str = ""):
        """Write ``data`` as JSON Lines: one JSON document per line.

        Args:
            data: JSON-serialisable records.
            file_path: Target file, or its folder when ``file_name`` is given.
            file_name: Optional file name appended to ``file_path``.
        """
        if file_name:
            file_path = rf"{file_path}\{file_name}"
        file_path = self.correct_path(file_path)
        # Records are newline-separated; no newline follows the last record.
        with open(file_path, "w", encoding="utf-8") as file:
            file.write("\n".join(json.dumps(record) for record in data))

    def read_image(self, image_path: str):
        """Open ``image_path`` with PIL.

        Returns:
            The opened image, or ``None`` when it cannot be loaded.
        """
        try:
            return Image.open(image_path)
        except IOError:
            print("Unable to load image")
            return None

    def get_notebook_output(self, output_path: str):
        """Run ``kaggle kernels output`` for the notebook into ``output_path``.

        Returns:
            Whatever ``execute_terminal_command`` returns for the command.
        """
        output_path = self.correct_path(output_path)
        command = rf"kaggle kernels output {self.NOTEBOOK_ID} -p {output_path}"
        return self.execute_terminal_command(command)

    def _wait_for_notebook(self, poll_interval, max_checks):
        """Poll the notebook status until a terminal state or the check limit.

        Returns:
            ``None`` once the status text contains "complete", otherwise a
            failure message.
        """
        number_of_checks = 0
        while True:
            status = str(self.get_notebook_status()).replace("\n", "")
            print(rf"- status no #{number_of_checks} : {status}")
            number_of_checks += 1
            if "complete" in status:
                return None
            for failed_state in ("error", "cancelAcknowledged"):
                if failed_state in status:
                    return rf"notebook status {failed_state}"
            if max_checks is not None and number_of_checks >= max_checks:
                return "notebook status timeout"
            time.sleep(poll_interval)

    def generate_image(
        self,
        input_image_name: str,
        edit_instruction: str,
        output_image_name: str,
        poll_interval: float = 120,
        max_checks: "int | None" = None,
    ):
        """Run one image-edit job end to end.

        Args:
            input_image_name: File name of the source image; it is copied
                from the ``local_dataset`` folder.
            edit_instruction: Text stored under the ``"edit"`` key of the job
                record.
            output_image_name: File name under which the result is fetched.
            poll_interval: Seconds to sleep between two status checks.
            max_checks: Maximum number of status checks; ``None`` (default)
                polls until the notebook reports a terminal state.

        Returns:
            ``(True, image)`` on success, otherwise ``(False, message)``.
        """
        if not input_image_name or not edit_instruction:
            return False, "Missing Input"
        if not output_image_name:
            return False, "Missing Output"
        current_time = self.get_current_time()
        print(rf"Start Time : {current_time}")
        dataset_path = self.correct_path(rf"{self.PROJECT_PATH}\{self.DATASET_NAME}")
        notebook_path = self.correct_path(rf"{self.PROJECT_PATH}\notebook")
        self.create_folder(dataset_path)
        # copy image to the dataset
        shutil.copyfile(
            rf"local_dataset\{input_image_name}",
            rf"{dataset_path}\{input_image_name}",
        )
        data = [
            {
                "time": current_time,
                "status": "IDLE",
                "edit": edit_instruction,
                "input_image_path": input_image_name,
                "output_image_path": output_image_name,
            }
        ]
        self.write_file(data, dataset_path, "data.jsonl")
        # update dataset
        self.GoogleDrive_connection.upload_file(
            "data.jsonl", rf"{self.DATASET_NAME}\data.jsonl"
        )
        self.GoogleDrive_connection.upload_file(
            input_image_name, rf"{self.DATASET_NAME}\{input_image_name}"
        )
        # run notebook
        print(self.run(notebook_path))
        failure = self._wait_for_notebook(poll_interval, max_checks)
        if failure is not None:
            return False, failure
        # get output
        self.GoogleDrive_connection.download_file(
            output_image_name, rf"{dataset_path}\{output_image_name}"
        )
        output_image = self.read_image(rf"{dataset_path}\{output_image_name}")
        # a missing or unreadable result is reported as a failed job
        if output_image is None:
            return False, "An error occured while running, no output image found"
        return True, output_image

    def get_current_time(self):
        """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def main():
    """Script entry point; currently a placeholder that performs no work."""
    return None
# Run main() only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()