import json
import os
import shutil
import subprocess
import time
from datetime import datetime
from typing import Optional

from PIL import Image


class API_Connection:
    """Run an image-editing Kaggle notebook and exchange its files via a drive connection.

    The workflow implemented by :meth:`generate_image` is: stage the input
    image and a ``data.jsonl`` job description in a local dataset folder,
    upload both through ``GD_connection``, re-push the Kaggle notebook so it
    runs again, poll its status, and finally download and open the result.
    """

    def __init__(self, GD_connection, kaggle_username: str = "", kaggle_key: str = ""):
        """Store the drive connection and publish Kaggle credentials to the environment.

        Args:
            GD_connection: Object exposing ``upload_file(name, path)`` and
                ``download_file(name, path)``; it is only ever used through
                those two calls in this class.
            kaggle_username: Value for the ``KAGGLE_USERNAME`` environment variable.
            kaggle_key: Value for the ``KAGGLE_KEY`` environment variable.
        """
        # Only export credentials that were actually supplied, so values
        # already present in the environment are not overwritten with "".
        if kaggle_username:
            os.environ["KAGGLE_USERNAME"] = kaggle_username
        if kaggle_key:
            os.environ["KAGGLE_KEY"] = kaggle_key
        self.GoogleDrive_connection = GD_connection
        self.PROJECT_PATH = r""
        self.NOTEBOOK_ID = "amirmoris/pix2pix"
        self.DATASET_NAME = "dataset"

    def execute_terminal_command(self, command: str):
        """Run ``command`` through the system shell and return its standard output.

        Args:
            command: Full command line, interpreted by the shell.

        Returns:
            The decoded standard output as ``str`` whenever the process could
            be started (regardless of its exit code). If the process could not
            be started at all, the tuple ``(None, error_message)`` is returned
            instead; this shape is kept for backward compatibility.
        """
        # NOTE: the command is a shell string, so arguments containing spaces
        # or shell metacharacters must be quoted by the caller.
        try:
            completed = subprocess.run(
                command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=False,
            )
        except Exception as e:  # deliberate best-effort boundary: never raise
            return None, str(e)

        # errors="replace" keeps non-UTF-8 console output from turning a
        # successful command into a failure.
        output = completed.stdout.decode("utf-8", errors="replace")
        error = completed.stderr.decode("utf-8", errors="replace")
        if completed.returncode == 0:
            print(rf"Command executed successfully: {command}")
        else:
            print(f"Command failed with exit code {completed.returncode}: {command}")
            if error:
                print(error)
        return output

    def correct_path(self, path: str):
        """Return ``path`` without a single leading backslash, if it has one."""
        return path[1:] if path.startswith("\\") else path

    def create_folder(self, path: str):
        """Create an empty folder at ``path``, deleting any existing tree first.

        Raises:
            OSError: If the existing tree cannot be removed or the folder
                cannot be created.
        """
        path = self.correct_path(path)
        if os.path.exists(path):
            shutil.rmtree(path)
        # os.makedirs instead of a shell ``mkdir``: works for paths with
        # spaces, on every platform, and reports failures as exceptions.
        os.makedirs(path)

    def pull_kaggle_notebook(self, notebook_path: str):
        """Pull the notebook (with metadata, ``-m``) into ``notebook_path``."""
        command = rf"kaggle kernels pull {self.NOTEBOOK_ID} -p {notebook_path} -m"
        return self.execute_terminal_command(command)

    def push_kaggle_notebook(self, notebook_path: str):
        """Push the notebook stored in ``notebook_path`` to Kaggle."""
        command = rf"kaggle kernels push -p {notebook_path}"
        return self.execute_terminal_command(command)

    def get_notebook_status(self):
        """Return the output of ``kaggle kernels status`` for the notebook."""
        command = rf"kaggle kernels status {self.NOTEBOOK_ID}"
        return self.execute_terminal_command(command)

    def run(self, notebook_path: str):
        """Pull the notebook into ``notebook_path`` and push it back.

        Returns:
            Whatever :meth:`push_kaggle_notebook` returns.
        """
        notebook_path = self.correct_path(notebook_path)
        self.pull_kaggle_notebook(notebook_path)
        return self.push_kaggle_notebook(notebook_path)

    def write_file(self, data: list, file_path: str, file_name: str = ""):
        """Write ``data`` as JSON Lines: one JSON document per line, no trailing newline.

        Args:
            data: JSON-serialisable records.
            file_path: Target file, or its directory when ``file_name`` is given.
            file_name: Optional file name joined onto ``file_path``.
        """
        if file_name:
            file_path = os.path.join(file_path, file_name)
        file_path = self.correct_path(file_path)
        with open(file_path, "w", encoding="utf-8") as file:
            file.write("\n".join(json.dumps(record) for record in data))

    def read_image(self, image_path: str):
        """Open the image at ``image_path``; return ``None`` if it cannot be opened."""
        try:
            image = Image.open(image_path)
            return image
        except IOError:
            print("Unable to load image")
            return None

    def get_notebook_output(self, output_path: str):
        """Download the notebook's output files into ``output_path``."""
        output_path = self.correct_path(output_path)
        command = rf"kaggle kernels output {self.NOTEBOOK_ID} -p {output_path}"
        return self.execute_terminal_command(command)

    def _wait_for_notebook(self, poll_interval: float, max_status_checks: Optional[int]):
        """Poll the notebook status until it finishes; return a failure message or ``None``."""
        number_of_checks = 0
        while True:
            status = str(self.get_notebook_status()).replace("\n", "")
            print(rf"- status no #{number_of_checks} : {status}")
            number_of_checks += 1
            if "complete" in status:
                return None
            if "error" in status:
                return "notebook status error"
            if "cancelAcknowledged" in status:
                return "notebook status cancelAcknowledged"
            if max_status_checks is not None and number_of_checks >= max_status_checks:
                return "notebook status timeout"
            time.sleep(poll_interval)

    def generate_image(
        self,
        input_image_name: str,
        edit_instruction: str,
        output_image_name: str,
        poll_interval: float = 120,
        max_status_checks: Optional[int] = None,
    ):
        """Edit ``input_image_name`` on Kaggle according to ``edit_instruction``.

        Args:
            input_image_name: File name of the image inside ``local_dataset``.
            edit_instruction: Text instruction stored in the job description.
            output_image_name: File name under which the result is fetched.
            poll_interval: Seconds to sleep between notebook status checks.
            max_status_checks: Maximum number of status checks before giving
                up; ``None`` (the default) polls without limit.

        Returns:
            ``(True, image)`` on success, otherwise ``(False, message)``.
        """
        if not input_image_name or not edit_instruction:
            return False, "Missing Input"
        if not output_image_name:
            return False, "Missing Output"

        current_time = self.get_current_time()
        print(rf"Start Time : {current_time}")

        dataset_path = self.correct_path(
            os.path.join(self.PROJECT_PATH, self.DATASET_NAME)
        )
        notebook_path = self.correct_path(os.path.join(self.PROJECT_PATH, "notebook"))
        self.create_folder(dataset_path)

        # copy image to the dataset
        shutil.copyfile(
            os.path.join("local_dataset", input_image_name),
            os.path.join(dataset_path, input_image_name),
        )
        data = [
            {
                "time": current_time,
                "status": "IDLE",
                "edit": edit_instruction,
                "input_image_path": input_image_name,
                "output_image_path": output_image_name,
            }
        ]
        self.write_file(data, dataset_path, "data.jsonl")

        # update dataset
        # NOTE(review): assumes the second argument is a local file path, as
        # it is for download_file below - confirm against the connection class.
        self.GoogleDrive_connection.upload_file(
            "data.jsonl", os.path.join(self.DATASET_NAME, "data.jsonl")
        )
        self.GoogleDrive_connection.upload_file(
            input_image_name, os.path.join(self.DATASET_NAME, input_image_name)
        )

        # run notebook
        print(self.run(notebook_path))
        failure = self._wait_for_notebook(poll_interval, max_status_checks)
        if failure is not None:
            return False, failure

        # get output
        output_path = os.path.join(dataset_path, output_image_name)
        self.GoogleDrive_connection.download_file(output_image_name, output_path)
        output_image = self.read_image(output_path)
        if output_image is None:
            return False, "An error occured while running, no output image found"
        return True, output_image

    def get_current_time(self):
        """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def main():
    """Script entry point; intentionally does nothing."""
    pass


if __name__ == "__main__":
    main()