import argparse
import ast
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path

import nbformat
import pandas as pd

from colab_handler import ColabNotebookProcessor

# -------- Logging --------
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s: %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)


# -------- Executor Class --------
class NotebookExecutor:
    """Execute Jupyter notebooks inside a fresh, isolated virtual environment.

    For each notebook the executor: rewrites Colab-specific code via
    ``ColabNotebookProcessor``, builds a throwaway venv, installs pinned
    tooling (papermill/ipykernel/jupyter) plus common data-science packages
    and any third-party imports inferred from the notebook's AST, registers a
    per-notebook kernel, runs the notebook with papermill, and records a
    Pass/Fail row in an incrementally-updated CSV report.
    """

    def __init__(self, working_dir: str | None = None, verbose: bool = False,
                 output_csv: str = "/tmp/Notebook/notebook_execution_report.csv"):
        """Create an executor.

        Args:
            working_dir: directory holding notebooks/datasets and used for the
                scratch venv; defaults to ``/tmp/Notebook``.
            verbose: enable extra progress logging.
            output_csv: path of the incremental CSV report.
        """
        self.working_dir = Path(working_dir) if working_dir else Path("/tmp/Notebook")
        self.verbose = verbose
        self.results = []  # list of {'notebook', 'status', 'error_message'} dicts
        self.output_csv = output_csv
        self.colab_processor = ColabNotebookProcessor(str(self.working_dir))

    def setup_working_dir(self):
        """Create working directory"""
        self.working_dir.mkdir(parents=True, exist_ok=True)
        if self.verbose:
            logger.info(f"Working directory: {self.working_dir}")

    def clean_working_dir(self):
        """Remove working directory"""
        try:
            if self.working_dir.exists():
                shutil.rmtree(self.working_dir)
                if self.verbose:
                    logger.info(f"Cleaned working directory: {self.working_dir}")
        except Exception as e:
            # best-effort: a locked/busy file should not abort the caller
            logger.warning(f"Unable to fully clean working dir: {e}")

    def _read_notebook_json(self, notebook_path: Path):
        """Load a notebook file as raw JSON; raise RuntimeError on any failure."""
        try:
            with open(str(notebook_path), "r", encoding="utf-8") as f:
                return json.load(f)
        except Exception as e:
            raise RuntimeError(f"Failed to read notebook JSON: {e}")

    def list_available_datasets(self):
        """List available datasets in the working directory"""
        dataset_extensions = {'.csv', '.xlsx', '.xls', '.json', '.txt', '.parquet'}
        datasets = [f.name for f in self.working_dir.iterdir()
                    if f.suffix.lower() in dataset_extensions and f.is_file()]
        if self.verbose and datasets:
            logger.info(f"Available datasets: {', '.join(datasets)}")
        return datasets

    def extract_imports_from_notebook(self, notebook_path: Path):
        """Extract third-party imports from notebook via AST (best-effort).

        Returns a set of top-level module names that are neither (known)
        standard library nor Colab-specific; an unreadable notebook yields
        an empty set.
        """
        imports = set()
        try:
            nb_json = self._read_notebook_json(notebook_path)
        except Exception:
            return set()
        for cell in nb_json.get("cells", []):
            if cell.get("cell_type") != "code":
                continue
            source = cell.get("source", "")
            if isinstance(source, list):
                source = "\n".join(source)
            try:
                tree = ast.parse(source)
                for node in ast.walk(tree):
                    if isinstance(node, ast.Import):
                        for alias in node.names:
                            imports.add(alias.name.split(".")[0])
                    elif isinstance(node, ast.ImportFrom):
                        if node.module:
                            imports.add(node.module.split(".")[0])
            except Exception:
                # ignore parsing errors in individual cells
                continue
        # Filter standard-library modules (non-exhaustive)
        stdlib = {
            'os', 'sys', 'json', 're', 'math', 'random', 'datetime', 'time', 'collections',
            'itertools', 'functools', 'operator', 'pathlib', 'urllib', 'http', 'xml', 'html',
            'csv', 'sqlite3', 'pickle', 'logging', 'unittest', 'argparse', 'configparser',
            'io', 'typing', 'warnings', 'copy', 'string', 'textwrap', 'unicodedata', 'struct',
            'codecs', 'calendar', 'hashlib', 'hmac', 'secrets', 'statistics', 'subprocess'
        }
        # Filter out Google Colab specific imports as they're handled separately
        colab_modules = {'google', 'colab'}
        third_party = imports - stdlib - colab_modules
        return third_party

    def install_packages(self, python_executable: Path, packages: set | list):
        """Install packages into the environment (best-effort).

        Packages are installed one-by-one so a single failure is isolated and
        reported. Returns (success, stderr_text); stops at the first failure.
        """
        if not packages:
            return True, ""
        # map common import names -> their actual pip distribution names
        package_mapping = {
            'sklearn': 'scikit-learn',
            'cv2': 'opencv-python',
            'PIL': 'Pillow',
            'bs4': 'beautifulsoup4',
        }
        mapped = [package_mapping.get(p, p) for p in packages]
        # Install packages one-by-one so errors are isolated
        for pkg in mapped:
            try:
                proc = subprocess.run([str(python_executable), "-m", "pip", "install", pkg],
                                      capture_output=True, text=True, timeout=600)
                if proc.returncode != 0:
                    stderr = proc.stderr or proc.stdout or f"pip install returned {proc.returncode}"
                    logger.warning(f"Failed to install {pkg}: {stderr.strip()[:400]}")
                    return False, stderr
            except subprocess.TimeoutExpired:
                msg = f"Timeout while installing {pkg}"
                logger.warning(msg)
                return False, msg
            except Exception as e:
                msg = f"Error while installing {pkg}: {e}"
                logger.warning(msg)
                return False, msg
        return True, ""

    def extract_notebook_error(self, stderr_text: str):
        """Attempt to extract concise error message from papermill/pip stderr."""
        if not stderr_text:
            return "Unknown error occurred"
        lines = stderr_text.strip().splitlines()
        # Look for Traceback or Exception lines, last occurrence first
        for line in reversed(lines):
            if any(keyword in line for keyword in ("Traceback", "Error", "Exception",
                                                   "ModuleNotFoundError", "ImportError")):
                return line.strip()
        # fallback to last non-empty line
        for line in reversed(lines):
            if line.strip():
                return line.strip()
        # only reachable when splitlines() produced no lines at all
        return "Unknown error"

    def display_cell_execution_details(self, output_notebook_path: Path):
        """Verbose: show last executed cells (best-effort)."""
        try:
            nb = nbformat.read(str(output_notebook_path), as_version=4)
        except Exception as e:
            logger.info(f"Could not read output notebook for cell details: {e}")
            return
        logger.info("CELL-BY-CELL EXECUTION DETAILS (showing up to 10 code cells)")
        shown = 0
        for i, cell in enumerate(nb.cells, start=1):
            if cell.cell_type != "code":
                continue
            shown += 1
            logger.info(f"--- CELL {i} ---")
            # show only the first 6 source lines to keep logs readable
            src_preview = ("\n".join(cell.source.splitlines()[:6]) +
                           ("\n..." if len(cell.source.splitlines()) > 6 else ""))
            logger.info("SOURCE (first lines):\n" + src_preview)
            if getattr(cell, "outputs", None):
                for output in cell.outputs[-2:]:  # show last two outputs per cell
                    if output.output_type == "stream":
                        text = getattr(output, "text", "")
                        logger.info("STREAM OUTPUT:\n" + ("\n".join(text.splitlines()[-4:])))
                    elif output.output_type == "error":
                        ename = getattr(output, "ename", "")
                        evalue = getattr(output, "evalue", "")
                        logger.info(f"ERROR: {ename}: {evalue}")
            if shown >= 10:
                break

    def run_notebook(self, notebook_path: str | Path, timeout: int = 1800):
        """Run a single notebook with Colab code replacement and dataset support.

        Args:
            notebook_path: absolute path, or a name resolved against the
                working directory.
            timeout: papermill execution timeout in seconds.

        Returns:
            dict: {'notebook': name, 'status': 'Pass'|'Fail', 'error_message': msg}
        """
        try:
            if isinstance(notebook_path, str):
                if notebook_path.startswith('/'):
                    notebook_full_path = Path(notebook_path)
                else:
                    # FIX: resolve relative names against self.working_dir
                    # (was hard-coded to /tmp/Notebook, ignoring the
                    # working_dir passed to the constructor).
                    notebook_full_path = self.working_dir / notebook_path
            else:
                notebook_full_path = Path(notebook_path)
        except Exception as e:
            return {"notebook": str(notebook_path), "status": "Fail",
                    "error_message": f"Invalid path: {e}"}

        notebook_name = notebook_full_path.name
        # FIX: scratch venv/output live in the configured working dir,
        # not a hard-coded /tmp/Notebook.
        notebook_dir = self.working_dir

        # Check if notebook exists
        if not notebook_full_path.exists():
            return {"notebook": notebook_name, "status": "Fail",
                    "error_message": f"Notebook not found at: {notebook_full_path}"}

        # List available datasets
        datasets = self.list_available_datasets()
        if datasets:
            logger.info(f"Processing notebook with {len(datasets)} available dataset(s)")

        # Process notebook for Colab compatibility; fall back to the original on failure
        try:
            processed_notebook_path = self.colab_processor.process_notebook(str(notebook_full_path))
            if self.verbose:
                logger.info(f"Processed notebook for Colab compatibility: {processed_notebook_path}")
        except Exception as e:
            logger.warning(f"Failed to process Colab compatibility: {e}")
            processed_notebook_path = str(notebook_full_path)

        # create fresh venv in the notebook folder
        env_path = notebook_dir / "venv"
        if env_path.exists():
            try:
                shutil.rmtree(env_path)
            except Exception:
                pass

        # create venv
        # NOTE(review): early returns below leave this venv behind; a
        # try/finally around the remainder would guarantee cleanup — TODO.
        try:
            venv_proc = subprocess.run([sys.executable, "-m", "venv", str(env_path)],
                                       capture_output=True, text=True, timeout=120)
            if venv_proc.returncode != 0:
                stderr = venv_proc.stderr or venv_proc.stdout
                return {"notebook": notebook_name, "status": "Fail",
                        "error_message": f"Failed to create venv: {stderr.strip()[:400]}"}
        except subprocess.TimeoutExpired:
            return {"notebook": notebook_name, "status": "Fail",
                    "error_message": "Timeout while creating virtual environment"}
        except Exception as e:
            return {"notebook": notebook_name, "status": "Fail",
                    "error_message": f"Error creating venv: {e}"}

        # python executable inside venv (layout differs on Windows)
        if os.name == "nt":
            python_exec = env_path / "Scripts" / "python.exe"
        else:
            python_exec = env_path / "bin" / "python"
        if not python_exec.exists():
            return {"notebook": notebook_name, "status": "Fail",
                    "error_message": f"Python executable not found in venv: {python_exec}"}

        # Upgrade pip and install pinned minimal tooling
        try:
            # Upgrade pip (non-fatal if it fails)
            up_proc = subprocess.run([str(python_exec), "-m", "pip", "install", "--upgrade", "pip"],
                                     capture_output=True, text=True, timeout=120)
            if up_proc.returncode != 0:
                logger.warning("pip upgrade returned non-zero; continuing if possible")
            # Install pinned papermill / ipykernel / jupyter (stable versions)
            pinned = [
                "papermill==2.5.0",
                "ipykernel==6.29.5",
                "jupyter==1.0.0"
            ]
            install_proc = subprocess.run([str(python_exec), "-m", "pip", "install"] + pinned,
                                          capture_output=True, text=True, timeout=600)
            if install_proc.returncode != 0:
                stderr_text = install_proc.stderr or install_proc.stdout or "pip install returned non-zero"
                return {"notebook": notebook_name, "status": "Fail",
                        "error_message": f"Failed to setup environment (pip install core packages): {stderr_text.strip()[:800]}"}
        except subprocess.TimeoutExpired:
            return {"notebook": notebook_name, "status": "Fail",
                    "error_message": "Timeout installing core packages"}
        except Exception as e:
            return {"notebook": notebook_name, "status": "Fail",
                    "error_message": f"Error installing core packages: {e}"}

        # Install common data-science packages (helps many notebooks run without per-notebook pip)
        common_packages = ["numpy", "pandas", "matplotlib", "seaborn", "scikit-learn", "plotly"]
        try:
            common_proc = subprocess.run([str(python_exec), "-m", "pip", "install"] + common_packages,
                                         capture_output=True, text=True, timeout=600)
            if common_proc.returncode != 0:
                logger.warning("Installing common packages returned non-zero; continuing")
        except Exception:
            logger.warning("Unexpected error during common package install; continuing")

        # Extract inferred imports and try to install them (best-effort)
        # Use the original notebook for import detection, not the processed one
        inferred = self.extract_imports_from_notebook(notebook_full_path)
        if inferred:
            success, stderr_text = self.install_packages(python_exec, inferred)
            if not success:
                return {"notebook": notebook_name, "status": "Fail",
                        "error_message": f"Failed to install inferred packages: {stderr_text.strip()[:800]}"}

        # Create kernel name (sanitized for kernelspec naming rules) and install kernel
        kernel_name = f"nb_{re.sub(r'[^A-Za-z0-9_]', '_', notebook_name)}"
        try:
            kernel_proc = subprocess.run([str(python_exec), "-m", "ipykernel", "install", "--user",
                                          "--name", kernel_name,
                                          "--display-name", f"Python ({kernel_name})"],
                                         capture_output=True, text=True, timeout=60)
            if kernel_proc.returncode != 0:
                stderr_text = kernel_proc.stderr or kernel_proc.stdout or "ipykernel install returned non-zero"
                return {"notebook": notebook_name, "status": "Fail",
                        "error_message": f"Failed to install kernel: {stderr_text.strip()[:800]}"}
        except subprocess.TimeoutExpired:
            return {"notebook": notebook_name, "status": "Fail",
                    "error_message": "Timeout while installing kernel"}
        except Exception as e:
            return {"notebook": notebook_name, "status": "Fail",
                    "error_message": f"Error installing kernel: {e}"}

        # Execute notebook with papermill (use the processed notebook)
        output_path = notebook_dir / "output.ipynb"
        try:
            exec_proc = subprocess.run([str(python_exec), "-m", "papermill",
                                        processed_notebook_path, str(output_path),
                                        "--kernel", kernel_name, "--no-progress-bar"],
                                       capture_output=True, text=True, timeout=timeout,
                                       cwd=str(notebook_dir))
            if exec_proc.returncode == 0:
                status = "Pass"
                error_message = ""
                if self.verbose:
                    logger.info("Notebook executed successfully")
                    if output_path.exists():
                        self.display_cell_execution_details(output_path)
            else:
                status = "Fail"
                stderr_text = (exec_proc.stderr or "") + "\n" + (exec_proc.stdout or "")
                concise = self.extract_notebook_error(stderr_text)
                error_message = f"Execution failed: {concise}"
                if self.verbose:
                    logger.error(error_message)
                    if output_path.exists():
                        self.display_cell_execution_details(output_path)
        except subprocess.TimeoutExpired:
            status = "Fail"
            error_message = f"Execution timed out after {timeout} seconds"
        except Exception as e:
            status = "Fail"
            error_message = f"Papermill execution error: {e}"

        # cleanup processed notebook
        try:
            if processed_notebook_path != str(notebook_full_path) and os.path.exists(processed_notebook_path):
                os.remove(processed_notebook_path)
        except Exception:
            pass

        # FIX: deregister the per-notebook kernelspec (best-effort) so --user
        # kernels don't accumulate pointing at venvs we are about to delete.
        try:
            subprocess.run([str(python_exec), "-m", "jupyter", "kernelspec",
                            "remove", "-f", kernel_name],
                           capture_output=True, text=True, timeout=60)
        except Exception:
            pass

        # cleanup venv if present (best-effort)
        try:
            if env_path.exists():
                shutil.rmtree(env_path)
        except Exception:
            logger.info("Could not remove venv directory (non-fatal)")

        result = {"notebook": notebook_name, "status": status, "error_message": error_message}
        # store result
        self.results.append(result)
        # update CSV incrementally
        self._update_csv_report()
        return result

    def _update_csv_report(self):
        """Write incremental CSV with columns notebook,status,error_message"""
        try:
            df = pd.DataFrame(self.results)
            # Ensure consistent column ordering
            cols = ['notebook', 'status', 'error_message']
            for c in cols:
                if c not in df.columns:
                    df[c] = ""
            df = df[cols]
            df.to_csv(self.output_csv, index=False)
            if self.verbose:
                logger.info(f"Wrote report to {self.output_csv}")
        except Exception as e:
            logger.warning(f"Failed to write CSV report: {e}")
# -------- Public entrypoint --------
def execute_notebook(path: str, timeout: int = 1800, verbose: bool = False,
                     output_csv: str = "notebook_execution_report.csv"):
    """Run one uploaded .ipynb file and return its execution result.

    Builds a ``NotebookExecutor``, ensures the working directory exists, and
    delegates execution (Colab rewriting, venv setup, papermill run, CSV
    reporting) to :meth:`NotebookExecutor.run_notebook`.

    Args:
        path: path to the uploaded .ipynb file (string)
        timeout: execution timeout in seconds (default 1800)
        verbose: enable verbose logging
        output_csv: path to write CSV report (default notebook_execution_report.csv)

    Returns:
        result dict: {'notebook': , 'status': 'Pass'|'Fail', 'error_message': }
    """
    runner = NotebookExecutor(verbose=verbose, output_csv=output_csv)
    runner.setup_working_dir()
    return runner.run_notebook(path, timeout=timeout)


# -------- CLI main (optional) --------
def main_call(notebook):
    """CLI-style entry point: execute *notebook* with verbose logging.

    Uses the executor defaults (working dir, report path) and a fixed
    30-minute timeout; returns the result dict from ``run_notebook``.
    """
    runner = NotebookExecutor(verbose=True)  # verbose on for better debugging
    runner.setup_working_dir()
    return runner.run_notebook(notebook, timeout=1800)