# Copyright 2023-2024 The SapientML Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import glob import os import re import shutil import subprocess import threading import zipfile from pathlib import Path from typing import Optional import nbformat from p2j.p2j import p2j from .result_extractor import ResultExtractor class MakeResultThread(threading.Thread): def __init__( self, dirpath: Path, csv_name: Optional[str], test_csv_name: Optional[str], csv_encoding: Optional[str], available_file_patterns: list, downloadable_files: list, in_memory_files: list, debug: bool = False, ): self.dirpath = dirpath self.exception = None self.csv_name = csv_name self.test_csv_name = test_csv_name self.csv_encoding = csv_encoding self.files = dict() self.debug = debug self.available_file_patterns = ( available_file_patterns.copy() if available_file_patterns is not None else [ "script_code_explainability.json", ".skeleton.json", "final_predict.py", "final_train.py", "permutation_importance.csv", "run_info.json", "script.ipynb", "script.py", "training.csv", "test.csv", "lib", "candidates", "*.pkl", ] ) self.downloadable_files = ( downloadable_files.copy() if downloadable_files is not None else [ "final_predict.py", "final_train.py", "permutation_importance.csv", "run_info.json", "script.ipynb", "script.py", "test.csv", "training.csv", "lib/sample_dataset.py", "candidates/lib/sample_dataset.py", ] ) self.in_memory_files = ( in_memory_files.copy() if in_memory_files is not None else [ "result.zip", "script_code_explainability.json", ".skeleton.json", "permutation_importance.csv", "run_info.json", "script.ipynb", "script.py", "script.html", ] ) threading.Thread.__init__(self) def run(self): try: dirpath = Path(self.dirpath) lib_path = dirpath / "lib" # if os.path.exists(lib_path / "timeauto.py"): # os.remove(lib_path / "timeauto.py") candidate_path = dirpath / "candidates" candidate_path.mkdir(parents=True, exist_ok=True) if lib_path.exists(): candidate_lib_path = candidate_path / "lib" candidate_lib_path.mkdir(parents=True, exist_ok=True) for file in os.listdir(lib_path): if os.path.isdir(os.path.join(lib_path, file)): continue shutil.copyfile(lib_path / file, candidate_lib_path / file) script_pattern = "[1-3]_script.py" scripts = dirpath.glob(script_pattern) for script in scripts: shutil.move(str(script), str(candidate_path / script.name)) if "script.ipynb" in self.available_file_patterns: p2j( candidate_path / script.name, candidate_path / f"{script.name.split('.')[-2]}.ipynb", overwrite=True, ) json_pattern = "[1-3]_script_code_explainability.json" json_files = dirpath.glob(json_pattern) for json_file in json_files: shutil.move(str(json_file), str(candidate_path / json_file.name)) # final_script.ipynbが存在する場合に、ファイル名を変更する if (dirpath / "final_script.ipynb").exists(): (dirpath / "final_script.ipynb").rename(dirpath / "script_raw.ipynb") explained_notebook_path = Path(dirpath / "final_script.ipynb.out.ipynb") data = "" if explained_notebook_path.exists(): with explained_notebook_path.open("r") as f: data = f.read() # if result.image_url: # data = data.replace(result.image_url, Path(result.image_url).name) # result.image_url = Path(result.image_url # ).replace(dirpath / Path(result.image_url).name) data = data.replace(str(dirpath), ".") with explained_notebook_path.open("w") as f: f.write(data) explained_notebook_path.rename(Path(dirpath / "script.ipynb")) if Path(dirpath / "final_script.py").exists(): Path(dirpath / "final_script.py").rename(Path(dirpath / "script.py")) ResultExtractor.remake_run_info(dirpath) file_names = glob.glob(os.path.join(self.dirpath, "*")) available_files = [] for pattern in self.available_file_patterns: available_file_pattern = os.path.join(self.dirpath, pattern) available_files.extend(glob.glob(available_file_pattern)) if self.debug is False: for file_name in file_names: if file_name not in available_files: if os.path.isfile(file_name): os.remove(file_name) else: shutil.rmtree(file_name) self.downloadable_files += [ os.path.relpath(p, dirpath) for p in (dirpath / "candidates").glob("*_script.py") ] + [os.path.relpath(p, dirpath) for p in (dirpath / "candidates").glob("*_script.ipynb")] zip_dir = dirpath / "zip" zip_dir.mkdir(parents=True, exist_ok=True) (zip_dir / "lib").mkdir(parents=True, exist_ok=True) (zip_dir / "candidates" / "lib").mkdir(parents=True, exist_ok=True) for file in self.downloadable_files: if (dirpath / Path(file)).exists(): shutil.copyfile(dirpath / Path(file), zip_dir / Path(file)) # Rename file training.cs and test.csv if present if Path(zip_dir / "training.csv").exists(): if "https://" not in self.csv_name: Path(zip_dir / "training.csv").rename(zip_dir / self.csv_name) self.downloadable_files.remove("training.csv") self.downloadable_files.append(self.csv_name) if self.test_csv_name is not None and Path(zip_dir / "test.csv").exists(): Path(zip_dir / "test.csv").rename(zip_dir / self.test_csv_name) self.downloadable_files.remove("test.csv") self.downloadable_files.append(self.test_csv_name) self.downloadable_files = [zip_dir / Path(file_name) for file_name in self.downloadable_files] with zipfile.ZipFile(file=dirpath / "result.zip", mode="w") as zf: for filepath in self.downloadable_files: if filepath.is_file(): if os.path.splitext(filepath)[1] == ".py" and self.csv_name is not None: with open(filepath, "r") as f: src = f.read() with open(filepath, "w") as f: src = re.sub(r"r\"/.+/training\.csv", '"training.csv', src) src = re.sub(r"r\"/.+/test\.csv", '"test.csv', src) src = src.replace("training.csv", f"{self.csv_name}") if self.test_csv_name is not None: src = src.replace("test.csv", f"{self.test_csv_name}") f.write(src) if os.path.splitext(filepath)[1] == ".ipynb" and self.csv_name is not None: with open(filepath, "r", encoding="utf-8") as nb_file: notebook_content = nb_file.read() nb = nbformat.reads(notebook_content, as_version=4) for cell in nb.cells: if cell.cell_type == "code": if "source" in cell: cell["source"] = re.sub( r"r\"/.+/training\.csv", '"training.csv', cell["source"] ) cell["source"] = re.sub(r"r\"/.+/test\.csv", '"test.csv', cell["source"]) cell["source"] = ( cell["source"] .replace("training.csv", f"{self.csv_name}") .replace("test.csv", f"{self.test_csv_name}") ) if "outputs" in cell: outputs = cell["outputs"] for output in outputs: if "output_type" in output and output["output_type"] == "stream": output["text"] = re.sub( r"/tmp/.+/.+\.py", "scrip.py", output.get("text", "") ) with open(filepath, "w", encoding="utf-8") as new_nb_file: nbformat.write(nb, new_nb_file) zf.write(filepath, arcname=filepath.relative_to(zip_dir)) if self.debug is False: shutil.rmtree(zip_dir) # convert ipynb to html to display generate result subprocess.run( ["jupyter", "nbconvert", "--to", "html", dirpath / "script.ipynb"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, ) self.in_memory_files += ( [os.path.relpath(p, dirpath) for p in (dirpath / "candidates").glob("*_script.py")] + [os.path.relpath(p, dirpath) for p in (dirpath / "candidates").glob("*_script.ipynb")] + [ os.path.relpath(p, dirpath) for p in (dirpath / "candidates").glob("*_script_code_explainability.json") ] ) for file in self.in_memory_files: if os.path.exists(dirpath / file): with open( dirpath / file, "rb" if file.endswith(".zip") else "r", encoding=self.csv_encoding if file == "training.csv" else None, ) as f: self.files[file] = f.read() if self.debug is False: os.remove(dirpath / file) if self.debug is False: shutil.rmtree(dirpath / "lib") shutil.rmtree(dirpath / "candidates") for filepath in dirpath.glob("*"): if filepath.is_file(): filepath.unlink() except Exception as e: self.exception = e def get_exception(self): return self.exception def get_files(self): return self.files