| import os |
| import shutil |
| import subprocess |
| import json |
| import time |
| from datetime import datetime |
| from PIL import Image |
|
|
|
|
class API_Connection:
    """Drive a Kaggle notebook through the ``kaggle`` CLI to edit one image.

    The class stages an input image and a one-record ``data.jsonl`` request
    file in a local dataset folder, hands both to a Google Drive connection
    object, pulls and pushes the notebook identified by ``NOTEBOOK_ID``, polls
    its status and finally loads the output image.

    NOTE: local paths are assembled with backslash separators and
    ``correct_path`` strips a leading backslash, so the path handling is
    written for Windows-style paths.
    """

    def __init__(self, GD_connection, kaggle_username: str = "", kaggle_key: str = ""):
        """Store the Drive connection and export Kaggle credentials.

        Args:
            GD_connection: Object used for file transfer; this class calls its
                ``upload_file(a, b)`` and ``download_file(a, b)`` methods.
            kaggle_username: Value for the ``KAGGLE_USERNAME`` environment
                variable. Left untouched when empty.
            kaggle_key: Value for the ``KAGGLE_KEY`` environment variable.
                Left untouched when empty.
        """
        # Only export credentials that were actually supplied: writing the
        # empty defaults would clobber credentials already configured in the
        # environment.
        if kaggle_username:
            os.environ["KAGGLE_USERNAME"] = kaggle_username
        if kaggle_key:
            os.environ["KAGGLE_KEY"] = kaggle_key

        self.GoogleDrive_connection = GD_connection

        # Base directory for the dataset/notebook folders (empty = relative).
        self.PROJECT_PATH = r""
        # Kernel reference passed to every ``kaggle kernels`` command.
        self.NOTEBOOK_ID = "amirmoris/pix2pix"
        # Name of the local dataset folder (also used in Drive paths).
        self.DATASET_NAME = "dataset"

    def execute_terminal_command(self, command: str):
        """Run *command* through the system shell and return its stdout.

        Args:
            command: Complete shell command line. It is passed to the shell
                unmodified, so callers are responsible for any quoting.

        Returns:
            The command's standard output decoded as UTF-8 (undecodable bytes
            are replaced). If the process could not be run at all, the tuple
            ``(None, error_message)`` is returned instead; this shape is kept
            for backward compatibility.
        """
        try:
            completed = subprocess.run(
                command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except Exception as e:
            # Best-effort contract: report the failure to the caller rather
            # than raising.
            return None, str(e)

        # errors="replace" keeps non-UTF-8 console output from raising.
        output = completed.stdout.decode("utf-8", errors="replace")
        error = completed.stderr.decode("utf-8", errors="replace")

        if completed.returncode == 0:
            print(rf"Command executed successfully: {command}")
        else:
            # Do not claim success for a failing command; surface stderr.
            print(rf"Command failed with exit code {completed.returncode}: {command}")
            if error:
                print(error)
        return output

    def correct_path(self, path: str):
        """Return *path* without a single leading backslash, if it has one.

        Joining an empty ``PROJECT_PATH`` with a backslash yields a path that
        starts with a backslash; this strips that separator again.
        """
        return path[1:] if path.startswith("\\") else path

    def create_folder(self, path: str):
        """Create an empty folder at *path*, replacing any existing one.

        Args:
            path: Folder to (re)create; normalised with ``correct_path``.
                An empty path is ignored.
        """
        path = self.correct_path(path)
        if not path:
            return
        if os.path.exists(path):
            shutil.rmtree(path)
        # os.makedirs instead of a shell ``mkdir``: handles spaces in the
        # path, creates intermediate folders on every platform and raises on
        # failure instead of failing silently.
        os.makedirs(path, exist_ok=True)

    def pull_kaggle_notebook(self, notebook_path: str):
        """Run ``kaggle kernels pull`` (with ``-m``) for the notebook into *notebook_path*.

        Returns:
            Whatever ``execute_terminal_command`` returns for the command.
        """
        command = rf"kaggle kernels pull {self.NOTEBOOK_ID} -p {notebook_path} -m"
        return self.execute_terminal_command(command)

    def push_kaggle_notebook(self, notebook_path: str):
        """Run ``kaggle kernels push`` on the folder *notebook_path*.

        Returns:
            Whatever ``execute_terminal_command`` returns for the command.
        """
        command = rf"kaggle kernels push -p {notebook_path}"
        return self.execute_terminal_command(command)

    def get_notebook_status(self):
        """Run ``kaggle kernels status`` for the notebook and return its output."""
        command = rf"kaggle kernels status {self.NOTEBOOK_ID}"
        return self.execute_terminal_command(command)

    def run(self, notebook_path: str):
        """Pull the notebook into *notebook_path*, then push it back.

        Returns:
            The result of the push command. The pull result is discarded.
        """
        notebook_path = self.correct_path(notebook_path)
        self.pull_kaggle_notebook(notebook_path)
        return self.push_kaggle_notebook(notebook_path)

    def write_file(self, data: list, file_path: str, file_name: str = ""):
        """Write *data* as JSON Lines: one JSON document per line.

        Args:
            data: Sequence of JSON-serialisable records.
            file_path: Target file, or the containing folder when *file_name*
                is given.
            file_name: Optional file name appended to *file_path* with a
                backslash separator.

        The file has no trailing newline; an empty *data* yields an empty file.
        """
        if file_name:
            file_path = rf"{file_path}\{file_name}"

        file_path = self.correct_path(file_path)

        with open(file_path, "w", encoding="utf-8") as file:
            file.write("\n".join(json.dumps(record) for record in data))

    def read_image(self, image_path: str):
        """Open the image at *image_path* with PIL.

        Returns:
            The opened image, or ``None`` (after printing a message) when
            opening raises ``IOError``.
        """
        try:
            image = Image.open(image_path)
            return image
        except IOError:
            print("Unable to load image")
            return None

    def get_notebook_output(self, output_path: str):
        """Run ``kaggle kernels output`` for the notebook into *output_path*.

        Returns:
            Whatever ``execute_terminal_command`` returns for the command.
        """
        output_path = self.correct_path(output_path)
        command = rf"kaggle kernels output {self.NOTEBOOK_ID} -p {output_path}"
        return self.execute_terminal_command(command)

    def generate_image(
        self,
        input_image_name: str,
        edit_instruction: str,
        output_image_name: str,
        poll_interval: float = 120,
        max_status_checks: int = 0,
    ):
        """Submit one edit request to the notebook and fetch the result.

        Steps: recreate the local dataset folder, copy the input image from
        ``local_dataset`` into it, write ``data.jsonl`` describing the request,
        upload both through the Drive connection, pull/push the notebook, poll
        its status, then download and open the output image.

        Args:
            input_image_name: File name of the image inside ``local_dataset``.
            edit_instruction: Text stored under the ``"edit"`` key of the
                request record.
            output_image_name: File name of the expected result image.
            poll_interval: Seconds to sleep between status checks.
            max_status_checks: Maximum number of status checks before giving
                up; ``0`` (the default) polls without limit.

        Returns:
            ``(True, image)`` on success, otherwise ``(False, message)``.
        """
        if not input_image_name or not edit_instruction:
            return False, "Missing Input"

        if not output_image_name:
            return False, "Missing Output"

        current_time = self.get_current_time()
        print(rf"Start Time : {current_time}")

        dataset_path = self.correct_path(rf"{self.PROJECT_PATH}\{self.DATASET_NAME}")
        notebook_path = self.correct_path(rf"{self.PROJECT_PATH}\notebook")

        # Start from an empty dataset folder so stale files are never reused.
        self.create_folder(dataset_path)

        shutil.copyfile(
            rf"local_dataset\{input_image_name}",
            rf"{dataset_path}\{input_image_name}",
        )

        data = [
            {
                "time": current_time,
                "status": "IDLE",
                "edit": edit_instruction,
                "input_image_path": input_image_name,
                "output_image_path": output_image_name,
            }
        ]

        self.write_file(data, dataset_path, "data.jsonl")

        self.GoogleDrive_connection.upload_file(
            "data.jsonl", rf"{self.DATASET_NAME}\data.jsonl"
        )
        self.GoogleDrive_connection.upload_file(
            input_image_name, rf"{self.DATASET_NAME}\{input_image_name}"
        )

        print(self.run(notebook_path))

        number_of_checks = 0
        while True:
            status = str(self.get_notebook_status()).replace("\n", "")
            print(rf"- status no #{number_of_checks} : {status}")
            number_of_checks += 1

            if "complete" in status:
                break
            if "error" in status:
                return False, "notebook status error"
            if "cancelAcknowledged" in status:
                return False, "notebook status cancelAcknowledged"
            # Optional upper bound so an unrecognised or empty status cannot
            # keep the loop spinning forever.
            if max_status_checks and number_of_checks >= max_status_checks:
                return False, "notebook status timeout"
            time.sleep(poll_interval)

        self.GoogleDrive_connection.download_file(
            output_image_name, rf"{dataset_path}\{output_image_name}"
        )
        output_image = self.read_image(rf"{dataset_path}\{output_image_name}")

        if output_image is None:
            return False, "An error occured while running, no output image found"

        return True, output_image

    def get_current_time(self):
        """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
def main():
    """Script entry point; currently performs no work."""
    return None


# Call main() only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()
|
|