|
|
import re |
|
|
import json |
|
|
import os |
|
|
from pathlib import Path |
|
|
from typing import List, Dict, Any |
|
|
|
|
|
class ColabNotebookProcessor:
    """Processes Jupyter notebooks to replace Google Colab specific code with local equivalents."""

    def __init__(self, notebook_dir: str = "/tmp/Notebook"):
        """Initialize the processor.

        Args:
            notebook_dir: Directory scanned for local dataset files that will
                substitute for Colab uploads / Drive paths.
        """
        self.notebook_dir = Path(notebook_dir)
        self.dataset_files = self._get_available_datasets()
        self.dataset_mapping = self._create_dataset_mapping()

    def _get_available_datasets(self) -> List[str]:
        """Get list of available dataset files in the notebook directory.

        Returns:
            Filenames (not paths) of data-like files; empty list if the
            directory does not exist.
        """
        if not self.notebook_dir.exists():
            return []

        dataset_extensions = {'.csv', '.xlsx', '.xls', '.json', '.txt', '.parquet'}
        return [f.name for f in self.notebook_dir.iterdir()
                if f.suffix.lower() in dataset_extensions and f.is_file()]

    def _create_dataset_mapping(self) -> Dict[str, str]:
        """Create mapping for common dataset references.

        Maps the exact filename, its stem, their lowercase forms, and a few
        generic CSV names to each real local file. NOTE: if several CSVs are
        present, the last one scanned wins the generic names.
        """
        mapping: Dict[str, str] = {}

        for filename in self.dataset_files:
            name_without_ext = Path(filename).stem

            # Exact name, stem, and case-insensitive variants.
            mapping[filename] = filename
            mapping[name_without_ext] = filename
            mapping[filename.lower()] = filename
            mapping[name_without_ext.lower()] = filename

            # Generic names notebooks commonly hard-code for CSV data.
            if filename.lower().endswith('.csv'):
                mapping['data.csv'] = filename
                mapping['dataset.csv'] = filename
                mapping['train.csv'] = filename
                mapping['test.csv'] = filename

        return mapping

    def process_notebook(self, notebook_path: str) -> str:
        """Process notebook and return path to modified notebook.

        Reads the .ipynb JSON, rewrites every code cell, and writes the result
        next to the input as ``modified_<name>``.

        Args:
            notebook_path: Path to the source .ipynb file.

        Returns:
            Path of the modified notebook file.
        """
        with open(notebook_path, 'r', encoding='utf-8') as f:
            notebook = json.load(f)

        for cell in notebook.get('cells', []):
            if cell.get('cell_type') == 'code':
                cell['source'] = self._process_code_cell(cell.get('source', []))

        modified_path = str(Path(notebook_path).parent / f"modified_{Path(notebook_path).name}")
        with open(modified_path, 'w', encoding='utf-8') as f:
            json.dump(notebook, f, indent=2)

        return modified_path

    def _process_code_cell(self, source_lines: List[str]) -> List[str]:
        """Process individual code cell to replace Colab-specific code.

        Args:
            source_lines: Cell source as a list of lines or a single string
                (nbformat allows both).

        Returns:
            Rewritten source as a list of lines.
        """
        if isinstance(source_lines, str):
            # Keep line endings so the rewritten cell round-trips cleanly.
            source_lines = source_lines.splitlines(True)

        processed_lines = []
        for line in source_lines:
            # Whole-line replacements take precedence over token rewrites.
            if self._is_colab_drive_mount(line):
                processed_lines.append("# Google Drive mount replaced with local file access\n")
                continue
            if self._is_colab_files_upload(line):
                processed_lines.append(self._replace_file_upload(line))
                continue

            processed_lines.append(self._process_line(line))

        return processed_lines

    def _process_line(self, line: str) -> str:
        """Process individual line for Colab replacements."""
        # Comment out Colab-only imports (unless already commented).
        if self._is_colab_import(line):
            return f"# {line}" if not line.strip().startswith('#') else line

        line = self._replace_drive_paths(line)
        line = self._replace_file_operations(line)
        line = self._replace_uploaded_files(line)

        return line

    def _is_colab_import(self, line: str) -> bool:
        """Check if line contains Colab-specific imports."""
        colab_imports = [
            'from google.colab import drive',
            'from google.colab import files',
            'from google.colab import auth',
            'import google.colab'
        ]

        line_stripped = line.strip()
        return any(imp in line_stripped for imp in colab_imports)

    def _is_colab_drive_mount(self, line: str) -> bool:
        """Check if line is a drive mount operation."""
        return 'drive.mount(' in line or 'drive.mount (' in line

    def _is_colab_files_upload(self, line: str) -> bool:
        """Check if line is a files upload operation."""
        return 'files.upload(' in line or 'files.upload (' in line

    def _replace_drive_paths(self, line: str) -> str:
        """Replace Google Drive paths with local paths.

        Quoted full paths are mapped to a local dataset first; remaining
        unquoted Drive/content prefixes are then rewritten to ``./``.
        """
        # Quoted paths MUST run before the prefix rewrites below: once
        # '/content/drive/' has been replaced with './' these patterns could
        # never match.
        for quoted_pattern in (r'"/content/drive/[^"]*"', r"'/content/drive/[^']*'"):
            line = re.sub(quoted_pattern,
                          lambda m: self._find_dataset_match(m.group()), line)

        # Plain prefix rewrites, most specific first.
        prefix_rewrites = [
            (r'/content/drive/My Drive/', './'),
            (r'/content/drive/MyDrive/', './'),
            (r'/content/drive/', './'),
            (r'/content/', './'),
        ]
        for pattern, replacement in prefix_rewrites:
            line = re.sub(pattern, replacement, line)

        return line

    def _replace_file_operations(self, line: str) -> str:
        """Replace file operations with local equivalents."""
        if 'pd.read_csv(' in line:
            line = self._replace_pandas_read(line, 'csv')
        elif 'pd.read_excel(' in line:
            line = self._replace_pandas_read(line, 'excel')

        return line

    def _replace_pandas_read(self, line: str, file_type: str) -> str:
        """Replace pandas read operations with local file paths.

        Only the first quoted string on the line is considered the path.
        """
        pattern = r'["\']([^"\']+)["\']'
        matches = re.findall(pattern, line)

        if matches:
            original_path = matches[0]
            local_file = self._find_best_dataset_match(original_path, file_type)
            if local_file:
                line = line.replace(original_path, local_file)

        return line

    def _replace_uploaded_files(self, line: str) -> str:
        """Replace references to uploaded files with local dataset files."""
        if 'uploaded[' in line and self.dataset_files:
            # Capture the user's code BEFORE overwriting `line`, otherwise the
            # "Original:" comment would echo the replacement text.
            original = line.strip()
            line = f"# Uploaded file replaced with local dataset: {self.dataset_files[0]}\n"
            line += f"# Original: {original}\n"
            line += f"# Use: '{self.dataset_files[0]}' instead\n"

        return line

    def _replace_file_upload(self, line: str) -> str:
        """Replace file upload with comment about available datasets."""
        comment = "# File upload replaced with local datasets\n"
        if self.dataset_files:
            comment += f"# Available datasets: {', '.join(self.dataset_files)}\n"
        else:
            comment += "# No datasets found in directory\n"
        return comment

    def _find_dataset_match(self, quoted_path: str) -> str:
        """Find best matching dataset for a quoted path.

        Args:
            quoted_path: Path including its surrounding quotes.

        Returns:
            A double-quoted local filename, or the input unchanged if nothing
            is available.
        """
        path = quoted_path.strip('\'"')
        filename = os.path.basename(path)

        # Exact filename present locally.
        if filename in self.dataset_files:
            return f'"{filename}"'

        # Known alias (stem / lowercase / generic name).
        if filename in self.dataset_mapping:
            return f'"{self.dataset_mapping[filename]}"'

        # Loose substring match in either direction.
        for dataset in self.dataset_files:
            if filename.lower() in dataset.lower() or dataset.lower() in filename.lower():
                return f'"{dataset}"'

        # Last resort: first available dataset.
        if self.dataset_files:
            return f'"{self.dataset_files[0]}"'

        return quoted_path

    def _find_best_dataset_match(self, original_path: str, file_type: str) -> str:
        """Find the best matching dataset file.

        Args:
            original_path: Path referenced in the notebook.
            file_type: 'csv', 'excel', or anything else (no filtering).

        Returns:
            A local filename, falling back to the referenced basename when no
            local dataset exists.
        """
        filename = os.path.basename(original_path)

        # Restrict candidates to the requested file type.
        if file_type == 'csv':
            type_filtered = [f for f in self.dataset_files if f.lower().endswith('.csv')]
        elif file_type == 'excel':
            type_filtered = [f for f in self.dataset_files if f.lower().endswith(('.xlsx', '.xls'))]
        else:
            type_filtered = self.dataset_files

        # Exact filename match.
        if filename in type_filtered:
            return filename

        # Same stem, different extension.
        name_without_ext = os.path.splitext(filename)[0]
        for dataset in type_filtered:
            if os.path.splitext(dataset)[0] == name_without_ext:
                return dataset

        # Fall back to any file of the right type, then to any file at all.
        if type_filtered:
            return type_filtered[0]
        if self.dataset_files:
            return self.dataset_files[0]

        return filename