AmirMoris's picture
fix time
ef94047
raw
history blame
5.56 kB
import os
import shutil
import subprocess
import json
import time
from datetime import datetime
from PIL import Image
class API_Connection:
    """Run a Kaggle notebook through the ``kaggle`` CLI and move its data via Google Drive.

    The class shells out to the ``kaggle`` command-line tool to pull, push and
    poll the notebook identified by ``NOTEBOOK_ID``. Input and output files are
    exchanged through the ``GD_connection`` object given to the constructor,
    which must provide ``upload_file(name, path)`` and
    ``download_file(name, path)``.

    NOTE(review): every path in this class is assembled with a literal
    backslash separator. These strings are kept unchanged on purpose, since
    other modules may build the same names; confirm before switching to
    ``os.path.join``.
    """

    def __init__(self, GD_connection, kaggle_username: str = "", kaggle_key: str = ""):
        """Store the Drive connection and export any supplied Kaggle credentials.

        Args:
            GD_connection: Object used for ``upload_file`` / ``download_file``.
            kaggle_username: Value for the ``KAGGLE_USERNAME`` environment variable.
            kaggle_key: Value for the ``KAGGLE_KEY`` environment variable.
        """
        # Only export credentials that were actually provided, so the empty
        # defaults do not overwrite values already present in the environment.
        if kaggle_username:
            os.environ["KAGGLE_USERNAME"] = kaggle_username
        if kaggle_key:
            os.environ["KAGGLE_KEY"] = kaggle_key
        self.GoogleDrive_connection = GD_connection
        self.PROJECT_PATH = r""
        self.NOTEBOOK_ID = "amirmoris/pix2pix"
        self.DATASET_NAME = "dataset"

    def execute_terminal_command(self, command: str):
        """Run ``command`` through the system shell and return its stdout.

        Args:
            command: Full command line, executed with ``shell=True``.

        Returns:
            The decoded standard output as ``str``. If starting or reading the
            process raises, the tuple ``(None, error_message)`` is returned
            instead (shape kept for backward compatibility with callers).
        """
        try:
            process = subprocess.Popen(
                command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            raw_output, raw_error = process.communicate()
            # Undecodable bytes are replaced instead of raising, so output that
            # was captured successfully is not thrown away.
            output = raw_output.decode("utf-8", errors="replace")
            error = raw_error.decode("utf-8", errors="replace")
        except Exception as e:
            return None, str(e)

        # Report success only when the command really exited with status 0,
        # and surface stderr otherwise instead of silently dropping it.
        if process.returncode == 0:
            print(rf"Command executed successfully: {command}")
        else:
            print(f"Command failed with exit code {process.returncode}: {command}")
            if error.strip():
                print(error.strip())
        return output

    def correct_path(self, path: str):
        """Return ``path`` without a single leading backslash, if it has one."""
        return path[1:] if path.startswith("\\") else path

    def create_folder(self, path: str):
        """Create an empty folder at ``path``, deleting any existing tree first.

        Raises:
            OSError: If the existing tree cannot be removed or the folder
                cannot be created.
        """
        path = self.correct_path(path)
        if os.path.exists(path):
            shutil.rmtree(path)
        # os.makedirs avoids the shell, so paths containing spaces or shell
        # metacharacters work and failures are raised instead of ignored.
        os.makedirs(path, exist_ok=True)

    def pull_kaggle_notebook(self, notebook_path: str):
        """Run ``kaggle kernels pull`` for ``NOTEBOOK_ID`` into ``notebook_path``.

        Returns whatever ``execute_terminal_command`` returns.
        """
        command = rf"kaggle kernels pull {self.NOTEBOOK_ID} -p {notebook_path} -m"
        return self.execute_terminal_command(command)

    def push_kaggle_notebook(self, notebook_path: str):
        """Run ``kaggle kernels push`` on the folder ``notebook_path``.

        Returns whatever ``execute_terminal_command`` returns.
        """
        command = rf"kaggle kernels push -p {notebook_path}"
        return self.execute_terminal_command(command)

    def get_notebook_status(self):
        """Run ``kaggle kernels status`` for ``NOTEBOOK_ID`` and return its output."""
        command = rf"kaggle kernels status {self.NOTEBOOK_ID}"
        return self.execute_terminal_command(command)

    def run(self, notebook_path: str):
        """Pull the notebook into ``notebook_path`` and push it back.

        Returns the result of the push command.
        """
        notebook_path = self.correct_path(notebook_path)
        self.pull_kaggle_notebook(notebook_path)
        return self.push_kaggle_notebook(notebook_path)

    def write_file(self, data: list, file_path: str, file_name: str = ""):
        """Write ``data`` as JSON Lines: one JSON document per line.

        Args:
            data: Sequence of JSON-serialisable objects.
            file_path: Target file, or the containing folder when ``file_name``
                is given.
            file_name: Optional file name appended to ``file_path`` with a
                backslash separator.

        The file is overwritten and no trailing newline is written.
        """
        if file_name:
            file_path = rf"{file_path}\{file_name}"
        file_path = self.correct_path(file_path)

        # Joining with "\n" yields exactly one separator between records and
        # none after the last one.
        with open(file_path, "w") as file:
            file.write("\n".join(json.dumps(record) for record in data))

    def read_image(self, image_path: str):
        """Open ``image_path`` with PIL and return the image, or ``None`` on ``IOError``."""
        try:
            return Image.open(image_path)
        except IOError:
            print("Unable to load image")
            return None

    def get_notebook_output(self, output_path: str):
        """Run ``kaggle kernels output`` for ``NOTEBOOK_ID`` into ``output_path``."""
        output_path = self.correct_path(output_path)
        command = rf"kaggle kernels output {self.NOTEBOOK_ID} -p {output_path}"
        return self.execute_terminal_command(command)

    def generate_image(
        self,
        input_image_name: str,
        edit_instruction: str,
        output_image_name: str,
        poll_interval: float = 120,
        max_status_checks: int = 0,
    ):
        """Submit one edit request to the notebook and fetch the resulting image.

        Steps: recreate the local dataset folder, copy the input image from the
        ``local_dataset`` folder into it, write a one-record ``data.jsonl``,
        upload both files through the Drive connection, run the notebook, poll
        its status until it finishes, then download and open the output image.

        Args:
            input_image_name: File name of the input image.
            edit_instruction: Text stored under the ``"edit"`` key of the request.
            output_image_name: File name of the image to download afterwards.
            poll_interval: Seconds to sleep between status checks.
            max_status_checks: Give up after this many status checks; ``0``
                (the default) polls without limit.

        Returns:
            ``(True, image)`` on success, otherwise ``(False, message)``.
        """
        # Truthiness checks also reject None instead of raising TypeError.
        if not input_image_name or not edit_instruction:
            return False, "Missing Input"
        if not output_image_name:
            return False, "Missing Output"

        current_time = self.get_current_time()
        print(rf"Start Time : {current_time}")

        dataset_path = self.correct_path(rf"{self.PROJECT_PATH}\{self.DATASET_NAME}")
        notebook_path = self.correct_path(rf"{self.PROJECT_PATH}\notebook")
        self.create_folder(dataset_path)

        # Copy the input image into the freshly created dataset folder.
        shutil.copyfile(
            rf"local_dataset\{input_image_name}",
            rf"{dataset_path}\{input_image_name}",
        )

        data = [
            {
                "time": current_time,
                "status": "IDLE",
                "edit": edit_instruction,
                "input_image_path": input_image_name,
                "output_image_path": output_image_name,
            }
        ]
        self.write_file(data, dataset_path, "data.jsonl")

        # Publish the request file and the input image through Google Drive.
        # NOTE(review): these paths use DATASET_NAME rather than dataset_path;
        # the two only coincide while PROJECT_PATH is empty. Confirm intent.
        self.GoogleDrive_connection.upload_file(
            "data.jsonl", rf"{self.DATASET_NAME}\data.jsonl"
        )
        self.GoogleDrive_connection.upload_file(
            input_image_name, rf"{self.DATASET_NAME}\{input_image_name}"
        )

        # Run the notebook, then wait for it to reach a terminal status.
        print(self.run(notebook_path))
        failure = self._wait_for_notebook(poll_interval, max_status_checks)
        if failure is not None:
            return False, failure

        # Fetch the generated image and load it.
        self.GoogleDrive_connection.download_file(
            output_image_name, rf"{dataset_path}\{output_image_name}"
        )
        output_image = self.read_image(rf"{dataset_path}\{output_image_name}")
        if output_image is None:
            return False, "An error occured while running, no output image found"
        return True, output_image

    def _wait_for_notebook(self, poll_interval: float = 120, max_status_checks: int = 0):
        """Poll the notebook status; return ``None`` on completion or an error message."""
        number_of_checks = 0
        while True:
            status = str(self.get_notebook_status()).replace("\n", "")
            print(rf"- status no #{number_of_checks} : {status}")
            number_of_checks += 1
            if "complete" in status:
                return None
            if "error" in status:
                return "notebook status error"
            if "cancelAcknowledged" in status:
                return "notebook status cancelAcknowledged"
            # Optional upper bound so a notebook that never reports a terminal
            # status cannot block the caller forever.
            if 0 < max_status_checks <= number_of_checks:
                return "notebook status check timed out"
            time.sleep(poll_interval)

    def get_current_time(self):
        """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def main():
    """Script entry point; currently a placeholder that performs no work."""
    return None


# Run main() only when the file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()