File size: 9,925 Bytes
7f5c744
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
import re
import json
import os
from pathlib import Path
from typing import List, Dict, Any

class ColabNotebookProcessor:
    """Rewrites Jupyter notebooks so Google-Colab-specific code runs locally.

    Scans ``notebook_dir`` once for dataset files and uses them to replace
    Colab constructs: ``google.colab`` imports, ``drive.mount`` calls,
    ``files.upload()`` calls, ``/content/drive`` paths, and references to
    Colab-uploaded files.
    """

    def __init__(self, notebook_dir: str = "/tmp/Notebook"):
        self.notebook_dir = Path(notebook_dir)
        # Discovered once up front; every replacement helper consults these.
        self.dataset_files = self._get_available_datasets()
        self.dataset_mapping = self._create_dataset_mapping()

    def _get_available_datasets(self) -> List[str]:
        """Return names of dataset-like files in the notebook directory."""
        if not self.notebook_dir.exists():
            return []

        dataset_extensions = {'.csv', '.xlsx', '.xls', '.json', '.txt', '.parquet'}
        return [f.name for f in self.notebook_dir.iterdir()
                if f.suffix.lower() in dataset_extensions and f.is_file()]

    def _create_dataset_mapping(self) -> Dict[str, str]:
        """Map common name variants of each dataset to the actual local file."""
        mapping: Dict[str, str] = {}

        for filename in self.dataset_files:
            name_without_ext = Path(filename).stem

            # Direct name variants all resolve to the real file.
            mapping[filename] = filename
            mapping[name_without_ext] = filename
            mapping[filename.lower()] = filename
            mapping[name_without_ext.lower()] = filename

            # Generic placeholder names frequently used in tutorials.
            if filename.lower().endswith('.csv'):
                mapping['data.csv'] = filename
                mapping['dataset.csv'] = filename
                mapping['train.csv'] = filename
                mapping['test.csv'] = filename

        return mapping

    def process_notebook(self, notebook_path: str) -> str:
        """Process *notebook_path* and return the path of the modified copy.

        The original notebook is left untouched; the rewritten version is
        saved next to it with a ``modified_`` filename prefix.
        """
        with open(notebook_path, 'r', encoding='utf-8') as f:
            notebook = json.load(f)

        # Only code cells can contain Colab calls; markdown is left alone.
        for cell in notebook.get('cells', []):
            if cell.get('cell_type') == 'code':
                cell['source'] = self._process_code_cell(cell.get('source', []))

        modified_path = str(Path(notebook_path).parent / f"modified_{Path(notebook_path).name}")
        with open(modified_path, 'w', encoding='utf-8') as f:
            json.dump(notebook, f, indent=2)

        return modified_path

    def _process_code_cell(self, source_lines: List[str]) -> List[str]:
        """Rewrite one code cell, replacing Colab-specific lines."""
        # Notebook JSON may store a cell as one string instead of a list.
        if isinstance(source_lines, str):
            source_lines = source_lines.splitlines(True)

        processed_lines = []
        for line in source_lines:
            # Whole-line replacements are decided on the raw line, before
            # any path/import rewriting is applied to it.
            if self._is_colab_drive_mount(line):
                processed_lines.append("# Google Drive mount replaced with local file access\n")
            elif self._is_colab_files_upload(line):
                processed_lines.append(self._replace_file_upload(line))
            else:
                processed_lines.append(self._process_line(line))

        return processed_lines

    def _process_line(self, line: str) -> str:
        """Apply all single-line Colab replacements to *line*."""
        # Comment out Colab-only imports (they fail outside Colab).
        if self._is_colab_import(line):
            return f"# {line}" if not line.strip().startswith('#') else line

        line = self._replace_drive_paths(line)
        line = self._replace_file_operations(line)
        line = self._replace_uploaded_files(line)

        return line

    def _is_colab_import(self, line: str) -> bool:
        """Return True if *line* imports a google.colab module."""
        colab_imports = (
            'from google.colab import drive',
            'from google.colab import files',
            'from google.colab import auth',
            'import google.colab',
        )
        line_stripped = line.strip()
        return any(imp in line_stripped for imp in colab_imports)

    def _is_colab_drive_mount(self, line: str) -> bool:
        """Return True if *line* calls drive.mount()."""
        return 'drive.mount(' in line or 'drive.mount (' in line

    def _is_colab_files_upload(self, line: str) -> bool:
        """Return True if *line* calls files.upload()."""
        return 'files.upload(' in line or 'files.upload (' in line

    def _replace_drive_paths(self, line: str) -> str:
        """Replace Google Drive paths in *line* with local equivalents.

        Quoted full-path patterns run first so a matching local dataset can
        be substituted; the bare prefix rewrites run afterwards.  (Running
        the prefix rewrites first would strip ``/content/drive/`` and make
        the quoted patterns unreachable.)
        """
        drive_patterns = [
            # Quoted drive paths: try to map to a real local dataset.
            (r'"/content/drive/[^"]*"', lambda m: self._find_dataset_match(m.group())),
            (r"'/content/drive/[^']*'", lambda m: self._find_dataset_match(m.group())),
            # Bare prefixes: fall back to the current directory.
            (r'/content/drive/My Drive/', './'),
            (r'/content/drive/MyDrive/', './'),
            (r'/content/drive/', './'),
            (r'/content/', './'),
        ]

        for pattern, replacement in drive_patterns:
            # re.sub accepts both string and callable replacements.
            line = re.sub(pattern, replacement, line)

        return line

    def _replace_file_operations(self, line: str) -> str:
        """Point pandas read calls at local dataset files."""
        if 'pd.read_csv(' in line:
            line = self._replace_pandas_read(line, 'csv')
        elif 'pd.read_excel(' in line:
            line = self._replace_pandas_read(line, 'excel')

        return line

    def _replace_pandas_read(self, line: str, file_type: str) -> str:
        """Swap the first quoted path in a pandas read call for a local file."""
        matches = re.findall(r'["\']([^"\']+)["\']', line)

        if matches:
            original_path = matches[0]
            local_file = self._find_best_dataset_match(original_path, file_type)
            if local_file:
                line = line.replace(original_path, local_file)

        return line

    def _replace_uploaded_files(self, line: str) -> str:
        """Replace references to Colab-uploaded files with a local dataset."""
        if 'uploaded[' in line and self.dataset_files:
            # Capture the original text BEFORE overwriting `line`, so the
            # "Original:" comment actually shows what was replaced.
            original = line.strip()
            line = (
                f"# Uploaded file replaced with local dataset: {self.dataset_files[0]}\n"
                f"# Original: {original}\n"
                f"# Use: '{self.dataset_files[0]}' instead\n"
            )

        return line

    def _replace_file_upload(self, line: str) -> str:
        """Return a comment block listing the locally available datasets."""
        comment = "# File upload replaced with local datasets\n"
        if self.dataset_files:
            comment += f"# Available datasets: {', '.join(self.dataset_files)}\n"
        else:
            comment += "# No datasets found in directory\n"
        return comment

    def _find_dataset_match(self, quoted_path: str) -> str:
        """Return the best local dataset for *quoted_path*, double-quoted.

        Falls back through direct match, name mapping, substring match, and
        finally the first available dataset; returns the input unchanged
        when no datasets exist.
        """
        path = quoted_path.strip('\'"')
        filename = os.path.basename(path)

        # Direct match.  (Bug fix: previously returned the literal string
        # "(unknown)" — a leftover placeholder — instead of the filename.)
        if filename in self.dataset_files:
            return f'"{filename}"'

        # Known name variant (e.g. "train.csv" mapped to a real file).
        if filename in self.dataset_mapping:
            return f'"{self.dataset_mapping[filename]}"'

        # Partial (substring) match in either direction.
        for dataset in self.dataset_files:
            if filename.lower() in dataset.lower() or dataset.lower() in filename.lower():
                return f'"{dataset}"'

        # Last resort: first available dataset.
        if self.dataset_files:
            return f'"{self.dataset_files[0]}"'

        return quoted_path  # No datasets at all: leave the path alone.

    def _find_best_dataset_match(self, original_path: str, file_type: str) -> str:
        """Return the local dataset that best matches *original_path*.

        Candidates are filtered by *file_type* ('csv' or 'excel'); falls
        back to any dataset, and finally to the original filename when the
        directory has no datasets at all.
        """
        filename = os.path.basename(original_path)

        if file_type == 'csv':
            type_filtered = [f for f in self.dataset_files if f.lower().endswith('.csv')]
        elif file_type == 'excel':
            type_filtered = [f for f in self.dataset_files if f.lower().endswith(('.xlsx', '.xls'))]
        else:
            type_filtered = self.dataset_files

        # Exact filename match within the right type.
        if filename in type_filtered:
            return filename

        # Same stem, different extension.
        name_without_ext = os.path.splitext(filename)[0]
        for dataset in type_filtered:
            if os.path.splitext(dataset)[0] == name_without_ext:
                return dataset

        # First file of the right type, then any dataset at all.
        if type_filtered:
            return type_filtered[0]
        if self.dataset_files:
            return self.dataset_files[0]

        return filename  # No datasets available.