|
|
import os,sys,shutil,subprocess,json,re,ast,tempfile,nbformat,logging,argparse |
|
|
import pandas as pd |
|
|
from pathlib import Path |
|
|
from colab_handler import ColabNotebookProcessor |
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
|
level=logging.INFO, |
|
|
format='[%(asctime)s] %(levelname)s: %(message)s', |
|
|
handlers=[logging.StreamHandler(sys.stdout)] |
|
|
) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class NotebookExecutor:
    """Execute a single Jupyter/Colab notebook inside a throwaway virtualenv.

    Workflow (see :meth:`run_notebook`): resolve the notebook path, rewrite
    Colab-specific code via ``ColabNotebookProcessor``, build a fresh venv,
    install papermill/ipykernel plus inferred third-party packages, register a
    per-notebook kernelspec, run the notebook with papermill, then tear the
    environment down again.  Each run appends a result dict to
    ``self.results`` and rewrites the CSV report at ``self.output_csv``.
    """

    def __init__(self, working_dir: str | None = None, verbose: bool = False, output_csv: str = "/tmp/Notebook/notebook_execution_report.csv"):
        # Default working directory is /tmp/Notebook when none is supplied.
        self.working_dir = Path(working_dir) if working_dir else Path("/tmp/Notebook")
        self.verbose = verbose
        # One result dict per executed notebook: {'notebook', 'status', 'error_message'}.
        self.results = []
        self.output_csv = output_csv
        # Project-local helper that rewrites Colab-only constructs before execution.
        # NOTE(review): semantics of process_notebook() are defined in colab_handler —
        # assumed here to return the path of a processed copy; confirm there.
        self.colab_processor = ColabNotebookProcessor(str(self.working_dir))

    def setup_working_dir(self) -> None:
        """Create working directory"""
        # parents/exist_ok make this idempotent across repeated runs.
        self.working_dir.mkdir(parents=True, exist_ok=True)
        if self.verbose:
            logger.info(f"Working directory: {self.working_dir}")

    def clean_working_dir(self) -> None:
        """Remove working directory"""
        try:
            if self.working_dir.exists():
                shutil.rmtree(self.working_dir)
                if self.verbose:
                    logger.info(f"Cleaned working directory: {self.working_dir}")
        except Exception as e:
            # Best-effort cleanup: a locked/busy directory should not fail the run.
            logger.warning(f"Unable to fully clean working dir: {e}")

    def _read_notebook_json(self, notebook_path: Path):
        # Raw JSON read (not nbformat) — used only for lightweight inspection
        # such as import extraction.  Raises RuntimeError on any read failure.
        try:
            with open(str(notebook_path), "r", encoding="utf-8") as f:
                return json.load(f)
        except Exception as e:
            raise RuntimeError(f"Failed to read notebook JSON: {e}")

    def list_available_datasets(self) -> list:
        """List available datasets in the working directory"""
        # Anything with a data-like extension directly inside working_dir counts;
        # subdirectories are not searched.
        dataset_extensions = {'.csv', '.xlsx', '.xls', '.json', '.txt', '.parquet'}
        datasets = [f.name for f in self.working_dir.iterdir()
                    if f.suffix.lower() in dataset_extensions and f.is_file()]
        if self.verbose and datasets:
            logger.info(f"Available datasets: {', '.join(datasets)}")
        return datasets

    def extract_imports_from_notebook(self, notebook_path: Path) -> set:
        """Extract third-party imports from notebook via AST (best-effort)."""
        imports = set()
        try:
            nb_json = self._read_notebook_json(notebook_path)
        except Exception:
            # Unreadable notebook: nothing to infer, let the run proceed.
            return set()

        for cell in nb_json.get("cells", []):
            if cell.get("cell_type") != "code":
                continue
            source = cell.get("source", "")
            # nbformat stores source as either a string or a list of lines.
            if isinstance(source, list):
                source = "\n".join(source)
            try:
                tree = ast.parse(source)
                for node in ast.walk(tree):
                    if isinstance(node, ast.Import):
                        for alias in node.names:
                            # Keep only the top-level package name (e.g. "pkg.sub" -> "pkg").
                            imports.add(alias.name.split(".")[0])
                    elif isinstance(node, ast.ImportFrom):
                        if node.module:
                            imports.add(node.module.split(".")[0])
            except Exception:
                # Cells with magics or syntax errors are simply skipped.
                continue

        # Hand-maintained subset of the standard library; anything listed here
        # is excluded from pip installation.
        stdlib = {
            'os','sys','json','re','math','random','datetime','time','collections',
            'itertools','functools','operator','pathlib','urllib','http','xml','html',
            'csv','sqlite3','pickle','logging','unittest','argparse','configparser',
            'io','typing','warnings','copy','string','textwrap','unicodedata','struct',
            'codecs','calendar','hashlib','hmac','secrets','statistics', 'subprocess'
        }

        # Colab-runtime-only modules: never installable via pip in this context.
        colab_modules = {'google', 'colab'}

        third_party = imports - stdlib - colab_modules
        return third_party

    def install_packages(self, python_executable: Path, packages: set | list):
        """Install packages into the environment (best-effort). Returns (success, stderr_text)"""
        if not packages:
            return True, ""

        # Map import names to their differing PyPI distribution names.
        package_mapping = {
            'sklearn': 'scikit-learn',
            'cv2': 'opencv-python',
            'PIL': 'Pillow',
            'bs4': 'beautifulsoup4',
        }

        mapped = [package_mapping.get(p, p) for p in packages]

        # Install one at a time so the first failure can be reported precisely;
        # any failure aborts the whole batch.
        for pkg in mapped:
            try:
                proc = subprocess.run([str(python_executable), "-m", "pip", "install", pkg],
                                      capture_output=True, text=True, timeout=600)
                if proc.returncode != 0:
                    stderr = proc.stderr or proc.stdout or f"pip install returned {proc.returncode}"
                    logger.warning(f"Failed to install {pkg}: {stderr.strip()[:400]}")
                    return False, stderr
            except subprocess.TimeoutExpired:
                msg = f"Timeout while installing {pkg}"
                logger.warning(msg)
                return False, msg
            except Exception as e:
                msg = f"Error while installing {pkg}: {e}"
                logger.warning(msg)
                return False, msg

        return True, ""

    def extract_notebook_error(self, stderr_text: str) -> str:
        """Attempt to extract concise error message from papermill/pip stderr."""
        if not stderr_text:
            return "Unknown error occurred"
        lines = stderr_text.strip().splitlines()

        # Prefer the last line mentioning an error keyword (most specific in a traceback).
        for line in reversed(lines):
            if any(keyword in line for keyword in ("Traceback", "Error", "Exception", "ModuleNotFoundError", "ImportError")):
                return line.strip()

        # Otherwise fall back to the last non-empty line.
        for line in reversed(lines):
            if line.strip():
                return line.strip()
        return lines[-1] if lines else "Unknown error"

    def display_cell_execution_details(self, output_notebook_path: Path) -> None:
        """Verbose: show last executed cells (best-effort)."""
        try:
            nb = nbformat.read(str(output_notebook_path), as_version=4)
        except Exception as e:
            logger.info(f"Could not read output notebook for cell details: {e}")
            return

        logger.info("CELL-BY-CELL EXECUTION DETAILS (showing up to 10 code cells)")
        shown = 0
        for i, cell in enumerate(nb.cells, start=1):
            if cell.cell_type != "code":
                continue
            shown += 1
            logger.info(f"--- CELL {i} ---")
            # First six source lines, with an ellipsis marker when truncated.
            src_preview = ("\n".join(cell.source.splitlines()[:6]) + ("\n..." if len(cell.source.splitlines()) > 6 else ""))
            logger.info("SOURCE (first lines):\n" + src_preview)
            if getattr(cell, "outputs", None):
                # Only the last two outputs per cell, to keep the log compact.
                for output in cell.outputs[-2:]:
                    if output.output_type == "stream":
                        text = getattr(output, "text", "")
                        logger.info("STREAM OUTPUT:\n" + ("\n".join(text.splitlines()[-4:])))
                    elif output.output_type == "error":
                        ename = getattr(output, "ename", "")
                        evalue = getattr(output, "evalue", "")
                        logger.info(f"ERROR: {ename}: {evalue}")
            if shown >= 10:
                break

    def run_notebook(self, notebook_path: str | Path, timeout: int = 1800):
        """
        Run a single notebook with Colab code replacement and dataset support.
        Returns a dict: {'notebook': <name>, 'status': 'Pass'|'Fail', 'error_message': <msg>}
        """
        # Resolve the notebook path: absolute strings are used as-is,
        # relative strings are resolved against /tmp/Notebook.
        try:
            if isinstance(notebook_path, str):
                if notebook_path.startswith('/'):
                    notebook_full_path = Path(notebook_path)
                else:
                    notebook_full_path = Path('/tmp/Notebook') / notebook_path
            else:
                notebook_full_path = Path(notebook_path)
        except Exception as e:
            return {"notebook": str(notebook_path), "status": "Fail", "error_message": f"Invalid path: {e}"}

        notebook_name = notebook_full_path.name
        # NOTE(review): hardcoded /tmp/Notebook here ignores self.working_dir — the
        # venv, papermill cwd, and output.ipynb always land in /tmp/Notebook even
        # when the executor was constructed with a different working_dir. Confirm
        # whether that is intentional before relying on custom working dirs.
        notebook_dir = Path('/tmp/Notebook')

        if not notebook_full_path.exists():
            return {"notebook": notebook_name, "status": "Fail", "error_message": f"Notebook not found at: {notebook_full_path}"}

        # Log dataset availability (informational only; datasets are picked up
        # by the notebook via the papermill cwd).
        datasets = self.list_available_datasets()
        if datasets:
            logger.info(f"Processing notebook with {len(datasets)} available dataset(s)")

        # Rewrite Colab-specific code; on failure fall back to the original file.
        try:
            processed_notebook_path = self.colab_processor.process_notebook(str(notebook_full_path))
            if self.verbose:
                logger.info(f"Processed notebook for Colab compatibility: {processed_notebook_path}")
        except Exception as e:
            logger.warning(f"Failed to process Colab compatibility: {e}")
            processed_notebook_path = str(notebook_full_path)

        # Remove any stale venv from a previous run before creating a fresh one.
        env_path = notebook_dir / "venv"
        if env_path.exists():
            try:
                shutil.rmtree(env_path)
            except Exception:
                pass

        # Create an isolated virtual environment for this notebook.
        try:
            venv_proc = subprocess.run([sys.executable, "-m", "venv", str(env_path)], capture_output=True, text=True, timeout=120)
            if venv_proc.returncode != 0:
                stderr = venv_proc.stderr or venv_proc.stdout
                return {"notebook": notebook_name, "status": "Fail",
                        "error_message": f"Failed to create venv: {stderr.strip()[:400]}"}
        except subprocess.TimeoutExpired:
            return {"notebook": notebook_name, "status": "Fail",
                    "error_message": "Timeout while creating virtual environment"}
        except Exception as e:
            return {"notebook": notebook_name, "status": "Fail",
                    "error_message": f"Error creating venv: {e}"}

        # Platform-dependent location of the venv's Python interpreter.
        if os.name == "nt":
            python_exec = env_path / "Scripts" / "python.exe"
        else:
            python_exec = env_path / "bin" / "python"

        if not python_exec.exists():
            return {"notebook": notebook_name, "status": "Fail",
                    "error_message": f"Python executable not found in venv: {python_exec}"}

        # Upgrade pip (non-fatal) and install the pinned execution toolchain.
        try:
            up_proc = subprocess.run([str(python_exec), "-m", "pip", "install", "--upgrade", "pip"],
                                     capture_output=True, text=True, timeout=120)
            if up_proc.returncode != 0:
                logger.warning("pip upgrade returned non-zero; continuing if possible")

            # Pinned versions keep runs reproducible across environments.
            pinned = [
                "papermill==2.5.0",
                "ipykernel==6.29.5",
                "jupyter==1.0.0"
            ]
            install_proc = subprocess.run([str(python_exec), "-m", "pip", "install"] + pinned,
                                          capture_output=True, text=True, timeout=600)
            if install_proc.returncode != 0:
                stderr_text = install_proc.stderr or install_proc.stdout or "pip install returned non-zero"
                return {"notebook": notebook_name, "status": "Fail",
                        "error_message": f"Failed to setup environment (pip install core packages): {stderr_text.strip()[:800]}"}
        except subprocess.TimeoutExpired:
            return {"notebook": notebook_name, "status": "Fail",
                    "error_message": "Timeout installing core packages"}
        except Exception as e:
            return {"notebook": notebook_name, "status": "Fail",
                    "error_message": f"Error installing core packages: {e}"}

        # Pre-install common data-science packages (best-effort, non-fatal).
        common_packages = ["numpy", "pandas", "matplotlib", "seaborn", "scikit-learn", "plotly"]
        try:
            common_proc = subprocess.run([str(python_exec), "-m", "pip", "install"] + common_packages,
                                         capture_output=True, text=True, timeout=600)
            if common_proc.returncode != 0:
                logger.warning("Installing common packages returned non-zero; continuing")
        except Exception:
            logger.warning("Unexpected error during common package install; continuing")

        # Install whatever third-party imports the notebook itself declares.
        # Unlike the common packages above, a failure here is fatal.
        inferred = self.extract_imports_from_notebook(notebook_full_path)
        if inferred:
            success, stderr_text = self.install_packages(python_exec, inferred)
            if not success:
                return {"notebook": notebook_name, "status": "Fail",
                        "error_message": f"Failed to install inferred packages: {stderr_text.strip()[:800]}"}

        # Register a per-notebook kernelspec pointing at the venv interpreter;
        # the name is sanitized so it is a valid kernelspec identifier.
        # NOTE(review): kernelspecs installed with --user are never uninstalled,
        # so they accumulate across runs — confirm whether cleanup is wanted.
        kernel_name = f"nb_{re.sub(r'[^A-Za-z0-9_]', '_', notebook_name)}"
        try:
            kernel_proc = subprocess.run([str(python_exec), "-m", "ipykernel", "install", "--user",
                                          "--name", kernel_name, "--display-name", f"Python ({kernel_name})"],
                                         capture_output=True, text=True, timeout=60)
            if kernel_proc.returncode != 0:
                stderr_text = kernel_proc.stderr or kernel_proc.stdout or "ipykernel install returned non-zero"
                return {"notebook": notebook_name, "status": "Fail",
                        "error_message": f"Failed to install kernel: {stderr_text.strip()[:800]}"}
        except subprocess.TimeoutExpired:
            return {"notebook": notebook_name, "status": "Fail",
                    "error_message": "Timeout while installing kernel"}
        except Exception as e:
            return {"notebook": notebook_name, "status": "Fail",
                    "error_message": f"Error installing kernel: {e}"}

        # Execute via papermill with cwd=notebook_dir so datasets resolve by
        # relative path; the executed copy is written to output.ipynb.
        output_path = notebook_dir / "output.ipynb"
        try:
            exec_proc = subprocess.run([str(python_exec), "-m", "papermill",
                                        processed_notebook_path, str(output_path),
                                        "--kernel", kernel_name, "--no-progress-bar"],
                                       capture_output=True, text=True, timeout=timeout, cwd=str(notebook_dir))
            if exec_proc.returncode == 0:
                status = "Pass"
                error_message = ""
                if self.verbose:
                    logger.info("Notebook executed successfully")
                    if output_path.exists():
                        self.display_cell_execution_details(output_path)
            else:
                status = "Fail"
                # Combine stderr and stdout: papermill may report on either stream.
                stderr_text = (exec_proc.stderr or "") + "\n" + (exec_proc.stdout or "")
                concise = self.extract_notebook_error(stderr_text)
                error_message = f"Execution failed: {concise}"
                if self.verbose:
                    logger.error(error_message)
                    if output_path.exists():
                        self.display_cell_execution_details(output_path)
        except subprocess.TimeoutExpired:
            status = "Fail"
            error_message = f"Execution timed out after {timeout} seconds"
        except Exception as e:
            status = "Fail"
            error_message = f"Papermill execution error: {e}"

        # Remove the processed copy, but never the user's original notebook.
        try:
            if processed_notebook_path != str(notebook_full_path) and os.path.exists(processed_notebook_path):
                os.remove(processed_notebook_path)
        except Exception:
            pass

        # Tear down the throwaway venv (best-effort).
        try:
            if env_path.exists():
                shutil.rmtree(env_path)
        except Exception:
            logger.info("Could not remove venv directory (non-fatal)")

        result = {"notebook": notebook_name, "status": status, "error_message": error_message}
        self.results.append(result)
        # Rewrite the CSV after every run so partial progress survives a crash.
        self._update_csv_report()
        return result

    def _update_csv_report(self) -> None:
        """Write incremental CSV with columns notebook,status,error_message"""
        try:
            df = pd.DataFrame(self.results)
            # Guarantee the fixed column set and order even when results is empty.
            cols = ['notebook', 'status', 'error_message']
            for c in cols:
                if c not in df.columns:
                    df[c] = ""
            df = df[cols]
            df.to_csv(self.output_csv, index=False)
            if self.verbose:
                logger.info(f"Wrote report to {self.output_csv}")
        except Exception as e:
            # Reporting failures must not abort notebook execution.
            logger.warning(f"Failed to write CSV report: {e}")
|
|
|
|
|
|
|
|
def execute_notebook(path: str, timeout: int = 1800, verbose: bool = False, output_csv: str = "notebook_execution_report.csv"):
    """Run one uploaded ``.ipynb`` file through the notebook executor.

    Convenience wrapper around :class:`NotebookExecutor`: builds an executor,
    ensures its working directory exists, and executes the notebook with
    Colab-compatibility processing and dataset support.

    Args:
        path: path to the uploaded .ipynb file (string)
        timeout: execution timeout in seconds (default 1800)
        verbose: enable verbose logging
        output_csv: path to write CSV report (default notebook_execution_report.csv)

    Returns:
        result dict: {'notebook': <name>, 'status': 'Pass'|'Fail', 'error_message': <msg>}
    """
    runner = NotebookExecutor(verbose=verbose, output_csv=output_csv)
    runner.setup_working_dir()
    return runner.run_notebook(path, timeout=timeout)
|
|
|
|
|
|
|
|
def main_call(notebook):
    """Execute *notebook* with verbose logging and return its result dict.

    Entry point used for direct invocation: runs with the default working
    directory, a fixed 1800-second timeout, and Colab/dataset support enabled.
    """
    runner = NotebookExecutor(verbose=True)
    runner.setup_working_dir()
    return runner.run_notebook(notebook, timeout=1800)