# gLens/src/v2/notebook.py — notebook execution utility
import os,sys,shutil,subprocess,json,re,ast,tempfile,nbformat,logging,argparse
import pandas as pd
from pathlib import Path
from colab_handler import ColabNotebookProcessor
# -------- Logging --------
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] %(levelname)s: %(message)s',
handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)
# -------- Executor Class --------
class NotebookExecutor:
def __init__(self, working_dir: str | None = None, verbose: bool = False, output_csv: str = "/tmp/Notebook/notebook_execution_report.csv"):
self.working_dir = Path(working_dir) if working_dir else Path("/tmp/Notebook")
self.verbose = verbose
self.results = []
self.output_csv = output_csv
self.colab_processor = ColabNotebookProcessor(str(self.working_dir))
def setup_working_dir(self):
"""Create working directory"""
self.working_dir.mkdir(parents=True, exist_ok=True)
if self.verbose:
logger.info(f"Working directory: {self.working_dir}")
def clean_working_dir(self):
"""Remove working directory"""
try:
if self.working_dir.exists():
shutil.rmtree(self.working_dir)
if self.verbose:
logger.info(f"Cleaned working directory: {self.working_dir}")
except Exception as e:
logger.warning(f"Unable to fully clean working dir: {e}")
def _read_notebook_json(self, notebook_path: Path):
try:
with open(str(notebook_path), "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
raise RuntimeError(f"Failed to read notebook JSON: {e}")
def list_available_datasets(self):
"""List available datasets in the working directory"""
dataset_extensions = {'.csv', '.xlsx', '.xls', '.json', '.txt', '.parquet'}
datasets = [f.name for f in self.working_dir.iterdir()
if f.suffix.lower() in dataset_extensions and f.is_file()]
if self.verbose and datasets:
logger.info(f"Available datasets: {', '.join(datasets)}")
return datasets
def extract_imports_from_notebook(self, notebook_path: Path):
"""Extract third-party imports from notebook via AST (best-effort)."""
imports = set()
try:
nb_json = self._read_notebook_json(notebook_path)
except Exception:
return set()
for cell in nb_json.get("cells", []):
if cell.get("cell_type") != "code":
continue
source = cell.get("source", "")
if isinstance(source, list):
source = "\n".join(source)
try:
tree = ast.parse(source)
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
imports.add(alias.name.split(".")[0])
elif isinstance(node, ast.ImportFrom):
if node.module:
imports.add(node.module.split(".")[0])
except Exception:
# ignore parsing errors in individual cells
continue
# Filter standard-library modules (non-exhaustive)
stdlib = {
'os','sys','json','re','math','random','datetime','time','collections',
'itertools','functools','operator','pathlib','urllib','http','xml','html',
'csv','sqlite3','pickle','logging','unittest','argparse','configparser',
'io','typing','warnings','copy','string','textwrap','unicodedata','struct',
'codecs','calendar','hashlib','hmac','secrets','statistics', 'subprocess'
}
# Filter out Google Colab specific imports as they're handled separately
colab_modules = {'google', 'colab'}
third_party = imports - stdlib - colab_modules
return third_party
def install_packages(self, python_executable: Path, packages: set | list):
"""Install packages into the environment (best-effort). Returns (success, stderr_text)"""
if not packages:
return True, ""
# map common names -> pip packages
package_mapping = {
'sklearn': 'scikit-learn',
'cv2': 'opencv-python',
'PIL': 'Pillow',
'bs4': 'beautifulsoup4',
}
mapped = [package_mapping.get(p, p) for p in packages]
# Install packages one-by-one so errors are isolated
for pkg in mapped:
try:
proc = subprocess.run([str(python_executable), "-m", "pip", "install", pkg],
capture_output=True, text=True, timeout=600)
if proc.returncode != 0:
stderr = proc.stderr or proc.stdout or f"pip install returned {proc.returncode}"
logger.warning(f"Failed to install {pkg}: {stderr.strip()[:400]}")
return False, stderr
except subprocess.TimeoutExpired:
msg = f"Timeout while installing {pkg}"
logger.warning(msg)
return False, msg
except Exception as e:
msg = f"Error while installing {pkg}: {e}"
logger.warning(msg)
return False, msg
return True, ""
def extract_notebook_error(self, stderr_text: str):
"""Attempt to extract concise error message from papermill/pip stderr."""
if not stderr_text:
return "Unknown error occurred"
lines = stderr_text.strip().splitlines()
# Look for Traceback or Exception lines
for line in reversed(lines):
if any(keyword in line for keyword in ("Traceback", "Error", "Exception", "ModuleNotFoundError", "ImportError")):
return line.strip()
# fallback to last non-empty line
for line in reversed(lines):
if line.strip():
return line.strip()
return lines[-1] if lines else "Unknown error"
def display_cell_execution_details(self, output_notebook_path: Path):
"""Verbose: show last executed cells (best-effort)."""
try:
nb = nbformat.read(str(output_notebook_path), as_version=4)
except Exception as e:
logger.info(f"Could not read output notebook for cell details: {e}")
return
logger.info("CELL-BY-CELL EXECUTION DETAILS (showing up to 10 code cells)")
shown = 0
for i, cell in enumerate(nb.cells, start=1):
if cell.cell_type != "code":
continue
shown += 1
logger.info(f"--- CELL {i} ---")
src_preview = ("\n".join(cell.source.splitlines()[:6]) + ("\n..." if len(cell.source.splitlines()) > 6 else ""))
logger.info("SOURCE (first lines):\n" + src_preview)
if getattr(cell, "outputs", None):
for output in cell.outputs[-2:]: # show last two outputs per cell
if output.output_type == "stream":
text = getattr(output, "text", "")
logger.info("STREAM OUTPUT:\n" + ("\n".join(text.splitlines()[-4:])))
elif output.output_type == "error":
ename = getattr(output, "ename", "")
evalue = getattr(output, "evalue", "")
logger.info(f"ERROR: {ename}: {evalue}")
if shown >= 10:
break
def run_notebook(self, notebook_path: str | Path, timeout: int = 1800):
"""
Run a single notebook with Colab code replacement and dataset support.
Returns a dict: {'notebook': <name>, 'status': 'Pass'|'Fail', 'error_message': <msg>}
"""
try:
if isinstance(notebook_path, str):
if notebook_path.startswith('/'):
notebook_full_path = Path(notebook_path)
else:
notebook_full_path = Path('/tmp/Notebook') / notebook_path
else:
notebook_full_path = Path(notebook_path)
except Exception as e:
return {"notebook": str(notebook_path), "status": "Fail", "error_message": f"Invalid path: {e}"}
notebook_name = notebook_full_path.name
notebook_dir = Path('/tmp/Notebook')
# Check if notebook exists
if not notebook_full_path.exists():
return {"notebook": notebook_name, "status": "Fail", "error_message": f"Notebook not found at: {notebook_full_path}"}
# List available datasets
datasets = self.list_available_datasets()
if datasets:
logger.info(f"Processing notebook with {len(datasets)} available dataset(s)")
# Process notebook for Colab compatibility
try:
processed_notebook_path = self.colab_processor.process_notebook(str(notebook_full_path))
if self.verbose:
logger.info(f"Processed notebook for Colab compatibility: {processed_notebook_path}")
except Exception as e:
logger.warning(f"Failed to process Colab compatibility: {e}")
processed_notebook_path = str(notebook_full_path)
# create fresh venv in the notebook folder
env_path = notebook_dir / "venv"
if env_path.exists():
try:
shutil.rmtree(env_path)
except Exception:
pass
# create venv
try:
venv_proc = subprocess.run([sys.executable, "-m", "venv", str(env_path)], capture_output=True, text=True, timeout=120)
if venv_proc.returncode != 0:
stderr = venv_proc.stderr or venv_proc.stdout
return {"notebook": notebook_name, "status": "Fail",
"error_message": f"Failed to create venv: {stderr.strip()[:400]}"}
except subprocess.TimeoutExpired:
return {"notebook": notebook_name, "status": "Fail",
"error_message": "Timeout while creating virtual environment"}
except Exception as e:
return {"notebook": notebook_name, "status": "Fail",
"error_message": f"Error creating venv: {e}"}
# python executable inside venv
if os.name == "nt":
python_exec = env_path / "Scripts" / "python.exe"
else:
python_exec = env_path / "bin" / "python"
if not python_exec.exists():
return {"notebook": notebook_name, "status": "Fail",
"error_message": f"Python executable not found in venv: {python_exec}"}
# Upgrade pip and install pinned minimal tooling
try:
# Upgrade pip
up_proc = subprocess.run([str(python_exec), "-m", "pip", "install", "--upgrade", "pip"],
capture_output=True, text=True, timeout=120)
if up_proc.returncode != 0:
logger.warning("pip upgrade returned non-zero; continuing if possible")
# Install pinned papermill / ipykernel / jupyter (stable versions)
pinned = [
"papermill==2.5.0",
"ipykernel==6.29.5",
"jupyter==1.0.0"
]
install_proc = subprocess.run([str(python_exec), "-m", "pip", "install"] + pinned,
capture_output=True, text=True, timeout=600)
if install_proc.returncode != 0:
stderr_text = install_proc.stderr or install_proc.stdout or "pip install returned non-zero"
return {"notebook": notebook_name, "status": "Fail",
"error_message": f"Failed to setup environment (pip install core packages): {stderr_text.strip()[:800]}"}
except subprocess.TimeoutExpired:
return {"notebook": notebook_name, "status": "Fail",
"error_message": "Timeout installing core packages"}
except Exception as e:
return {"notebook": notebook_name, "status": "Fail",
"error_message": f"Error installing core packages: {e}"}
# Install common data-science packages (helps many notebooks run without per-notebook pip)
common_packages = ["numpy", "pandas", "matplotlib", "seaborn", "scikit-learn", "plotly"]
try:
common_proc = subprocess.run([str(python_exec), "-m", "pip", "install"] + common_packages,
capture_output=True, text=True, timeout=600)
if common_proc.returncode != 0:
logger.warning("Installing common packages returned non-zero; continuing")
except Exception:
logger.warning("Unexpected error during common package install; continuing")
# Extract inferred imports and try to install them (best-effort)
# Use the original notebook for import detection, not the processed one
inferred = self.extract_imports_from_notebook(notebook_full_path)
if inferred:
success, stderr_text = self.install_packages(python_exec, inferred)
if not success:
return {"notebook": notebook_name, "status": "Fail",
"error_message": f"Failed to install inferred packages: {stderr_text.strip()[:800]}"}
# Create kernel name and install kernel
kernel_name = f"nb_{re.sub(r'[^A-Za-z0-9_]', '_', notebook_name)}"
try:
kernel_proc = subprocess.run([str(python_exec), "-m", "ipykernel", "install", "--user",
"--name", kernel_name, "--display-name", f"Python ({kernel_name})"],
capture_output=True, text=True, timeout=60)
if kernel_proc.returncode != 0:
stderr_text = kernel_proc.stderr or kernel_proc.stdout or "ipykernel install returned non-zero"
return {"notebook": notebook_name, "status": "Fail",
"error_message": f"Failed to install kernel: {stderr_text.strip()[:800]}"}
except subprocess.TimeoutExpired:
return {"notebook": notebook_name, "status": "Fail",
"error_message": "Timeout while installing kernel"}
except Exception as e:
return {"notebook": notebook_name, "status": "Fail",
"error_message": f"Error installing kernel: {e}"}
# Execute notebook with papermill (use the processed notebook)
output_path = notebook_dir / "output.ipynb"
try:
exec_proc = subprocess.run([str(python_exec), "-m", "papermill",
processed_notebook_path, str(output_path),
"--kernel", kernel_name, "--no-progress-bar"],
capture_output=True, text=True, timeout=timeout, cwd=str(notebook_dir))
if exec_proc.returncode == 0:
status = "Pass"
error_message = ""
if self.verbose:
logger.info("Notebook executed successfully")
if output_path.exists():
self.display_cell_execution_details(output_path)
else:
status = "Fail"
stderr_text = (exec_proc.stderr or "") + "\n" + (exec_proc.stdout or "")
concise = self.extract_notebook_error(stderr_text)
error_message = f"Execution failed: {concise}"
if self.verbose:
logger.error(error_message)
if output_path.exists():
self.display_cell_execution_details(output_path)
except subprocess.TimeoutExpired:
status = "Fail"
error_message = f"Execution timed out after {timeout} seconds"
except Exception as e:
status = "Fail"
error_message = f"Papermill execution error: {e}"
# cleanup processed notebook
try:
if processed_notebook_path != str(notebook_full_path) and os.path.exists(processed_notebook_path):
os.remove(processed_notebook_path)
except Exception:
pass
# cleanup venv if present (best-effort)
try:
if env_path.exists():
shutil.rmtree(env_path)
except Exception:
logger.info("Could not remove venv directory (non-fatal)")
result = {"notebook": notebook_name, "status": status, "error_message": error_message}
# store result
self.results.append(result)
# update CSV incrementally
self._update_csv_report()
return result
def _update_csv_report(self):
"""Write incremental CSV with columns notebook,status,error_message"""
try:
df = pd.DataFrame(self.results)
# Ensure consistent column ordering
cols = ['notebook', 'status', 'error_message']
for c in cols:
if c not in df.columns:
df[c] = ""
df = df[cols]
df.to_csv(self.output_csv, index=False)
if self.verbose:
logger.info(f"Wrote report to {self.output_csv}")
except Exception as e:
logger.warning(f"Failed to write CSV report: {e}")
# -------- Public entrypoint --------
def execute_notebook(path: str, timeout: int = 1800, verbose: bool = False, output_csv: str = "notebook_execution_report.csv"):
"""
Public function to execute a single notebook with Colab support and dataset integration.
Args:
path: path to the uploaded .ipynb file (string)
timeout: execution timeout in seconds (default 1800)
verbose: enable verbose logging
output_csv: path to write CSV report (default notebook_execution_report.csv)
Returns:
result dict: {'notebook': <name>, 'status': 'Pass'|'Fail', 'error_message': <msg>}
"""
executor = NotebookExecutor(verbose=verbose, output_csv=output_csv)
executor.setup_working_dir()
result = executor.run_notebook(path, timeout=timeout)
return result
# -------- CLI main (optional) --------
def main_call(notebook):
"""Main function for executing notebook with enhanced Colab and dataset support"""
executor = NotebookExecutor(verbose=True) # Enable verbose for better debugging
executor.setup_working_dir()
result = executor.run_notebook(notebook, timeout=1800)
return result